#38 Ipad app production readiness, Colony orchestration, Social posting Co-authored-by: Sayan Datta <sayan@Sayans-MacBook-Air.local> Reviewed-on: #44
509 lines
18 KiB
Python
509 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from enum import Enum
|
|
from typing import Any
|
|
|
|
import asyncpg
|
|
import httpx
|
|
from pydantic import BaseModel, Field
|
|
|
|
|
|
class SocialPostingConfigurationError(RuntimeError):
    """Raised when a credential or setting required for social posting is missing."""
|
|
|
|
|
|
class SocialPostingError(RuntimeError):
    """Raised when a post is rejected by validation or by a platform API."""
|
|
|
|
|
|
class SocialPlatform(str, Enum):
    """Social networks that a post can target.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    FACEBOOK = "facebook"
    INSTAGRAM = "instagram"
    LINKEDIN = "linkedin"
    TWITTER = "twitter"
|
|
|
|
|
|
class PostStatus(str, Enum):
    """Lifecycle states recorded for a social post."""

    SCHEDULED = "scheduled"
    PUBLISHING = "publishing"
    PUBLISHED = "published"
    FAILED = "failed"
|
|
|
|
|
|
class PostType(str, Enum):
    """Content formats a post request can declare."""

    IMAGE = "image"
    VIDEO = "video"
    CAROUSEL = "carousel"
    TEXT = "text"
    LINK = "link"
|
|
|
|
|
|
class PostRequest(BaseModel):
    """Validated request body describing one piece of content to publish.

    The same caption, hashtags and media are used for every platform listed
    in ``platforms``.
    """

    # Target networks: at least one, at most eight.
    platforms: list[SocialPlatform] = Field(..., min_length=1, max_length=8)
    post_type: PostType = PostType.IMAGE
    caption: str = Field(..., min_length=1, max_length=4000)
    hashtags: list[str] = Field(default_factory=list, max_length=40)
    # Optional media and link references, each capped at 2048 characters.
    media_url: str | None = Field(default=None, max_length=2048)
    media_path: str | None = Field(default=None, max_length=2048)
    link_url: str | None = Field(default=None, max_length=2048)
    schedule_time: str | None = Field(
        default=None,
        description="ISO-8601 timestamp. Future timestamps persist as scheduled.",
    )
|
|
|
|
|
|
def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    utc = timezone.utc
    return datetime.now(tz=utc)
|
|
|
|
|
|
def _env(name: str) -> str:
    """Return the stripped value of environment variable ``name``.

    Raises:
        SocialPostingConfigurationError: If the variable is unset, blank, or
            still holds a value starting with ``PLACEHOLDER``.
    """
    configured = os.environ.get(name, "").strip()
    if configured and not configured.startswith("PLACEHOLDER"):
        return configured
    raise SocialPostingConfigurationError(f"{name} is not configured.")
|
|
|
|
|
|
def _meta_version() -> str:
    """Return the Meta Graph API version from ``META_API_VERSION``.

    Falls back to ``v21.0`` when the variable is unset or blank.
    """
    fallback = "v21.0"
    configured = os.getenv("META_API_VERSION", fallback).strip()
    if configured:
        return configured
    return fallback
|
|
|
|
|
|
def _parse_schedule(value: str | None) -> datetime | None:
    """Parse an ISO-8601 timestamp into a UTC-aware datetime.

    Args:
        value: Timestamp text; a ``Z`` suffix is accepted. ``None`` or an
            empty string means "no schedule".

    Returns:
        The timestamp converted to UTC, or ``None`` when no value was given.
        Timestamps without an offset are treated as already being in UTC.

    Raises:
        ValueError: If ``value`` is not a valid ISO-8601 timestamp.
    """
    if value:
        # Rewrite "Z" as an explicit offset before handing the text to fromisoformat().
        moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
        aware = moment if moment.tzinfo is not None else moment.replace(tzinfo=timezone.utc)
        return aware.astimezone(timezone.utc)
    return None
|
|
|
|
|
|
def _caption_with_hashtags(caption: str, hashtags: list[str]) -> str:
    """Append the non-blank hashtags to ``caption``.

    Tags are stripped, joined with single spaces and separated from the
    caption by a blank line. The caption is returned unchanged when no
    usable tag remains.
    """
    tags = []
    for raw in hashtags:
        tag = raw.strip()
        if tag:
            tags.append(tag)
    if not tags:
        return caption
    return caption + "\n\n" + " ".join(tags)
|
|
|
|
|
|
def _serialize_row(row: asyncpg.Record | dict[str, Any]) -> dict[str, Any]:
    """Copy a database row into a plain dict with decoded JSON and text timestamps.

    JSON columns that arrive as strings are parsed, and timestamp columns
    exposing ``isoformat`` are converted to ISO-8601 strings. Other values
    are copied through unchanged; the input row is not modified.
    """
    json_columns = ("hashtags", "engagement", "platform_response")
    time_columns = ("created_at", "scheduled_at", "published_at", "updated_at")

    record = dict(row)
    for column in json_columns:
        cell = record.get(column)
        if isinstance(cell, str):
            record[column] = json.loads(cell)
    for column in time_columns:
        cell = record.get(column)
        if cell is not None and hasattr(cell, "isoformat"):
            record[column] = cell.isoformat()
    return record
|
|
|
|
|
|
def _required_env_for_platform(platform: SocialPlatform) -> tuple[str, ...]:
    """Return the environment variable names that ``platform`` needs.

    Raises:
        SocialPostingConfigurationError: If the platform is not one of the
            supported networks.
    """
    requirements = (
        (SocialPlatform.FACEBOOK, ("META_PAGE_ACCESS_TOKEN", "META_PAGE_ID")),
        (SocialPlatform.INSTAGRAM, ("META_PAGE_ACCESS_TOKEN", "META_INSTAGRAM_BUSINESS_ID")),
        (SocialPlatform.LINKEDIN, ("LINKEDIN_ACCESS_TOKEN", "LINKEDIN_ORG_ID")),
        (SocialPlatform.TWITTER, ("TWITTER_BEARER_TOKEN",)),
    )
    for candidate, names in requirements:
        if platform == candidate:
            return names
    raise SocialPostingConfigurationError(f"Unsupported social platform: {platform.value}")
|
|
|
|
|
|
def validate_platform_configuration(platforms: list[SocialPlatform]) -> None:
    """Ensure every credential needed by ``platforms`` is present in the environment.

    A variable counts as missing when it is unset, blank, or starts with
    ``PLACEHOLDER``.

    Raises:
        SocialPostingConfigurationError: Listing all missing variable names,
            de-duplicated and sorted.
    """
    unconfigured: set[str] = set()
    for platform in platforms:
        for name in _required_env_for_platform(platform):
            configured = os.getenv(name, "").strip()
            if configured and not configured.startswith("PLACEHOLDER"):
                continue
            unconfigured.add(name)
    if not unconfigured:
        return
    joined = ", ".join(sorted(unconfigured))
    raise SocialPostingConfigurationError(f"Social posting credentials are not configured: {joined}.")
|
|
|
|
|
|
def validate_payload_contract(payload: PostRequest) -> None:
    """Reject payloads that a requested platform cannot accept.

    Checks that Instagram and video posts carry a ``media_url`` and that the
    text composed for Twitter/X (caption, hashtags, link and media URL) fits
    within 280 characters.

    Raises:
        SocialPostingError: If any of these rules is violated.
    """
    lacks_media = not payload.media_url
    if lacks_media and SocialPlatform.INSTAGRAM in payload.platforms:
        raise SocialPostingError("media_url is required for Instagram publishing.")
    if lacks_media and payload.post_type == PostType.VIDEO:
        raise SocialPostingError("media_url is required for video publishing.")
    if SocialPlatform.TWITTER not in payload.platforms:
        return

    # Compose the same text that is sent to Twitter/X, then measure it.
    pieces = [_caption_with_hashtags(payload.caption, payload.hashtags)]
    if payload.link_url:
        pieces.append(f"{payload.link_url}")
    if payload.media_url:
        pieces.append(f"{payload.media_url}")
    if len("\n".join(pieces)) > 280:
        raise SocialPostingError("Twitter/X post exceeds 280 characters after links and hashtags.")
|
|
|
|
|
|
async def _insert_post(
    conn: asyncpg.Connection,
    *,
    tenant_id: str,
    actor_id: str,
    request_id: str,
    platform: SocialPlatform,
    payload: PostRequest,
    status: PostStatus,
    scheduled_at: datetime | None,
) -> dict[str, Any]:
    """Insert one ``catalyst_social_posts`` row for ``platform`` and return it.

    A fresh UUID is generated for ``post_id``; hashtags are stored as JSON and
    ``engagement`` starts as an empty JSON object.

    Returns:
        The inserted row, serialized by ``_serialize_row``.
    """
    insert_sql = """
        INSERT INTO catalyst_social_posts (
            post_id, request_id, tenant_id, actor_id, platform, post_type,
            caption, hashtags, media_url, media_path, link_url, status,
            scheduled_at, engagement, created_at, updated_at
        ) VALUES (
            $1::uuid, $2::uuid, $3, $4, $5, $6,
            $7, $8::jsonb, $9, $10, $11, $12,
            $13, '{}'::jsonb, NOW(), NOW()
        )
        RETURNING *
        """
    # Positional arguments, in the same order as $1..$13 above.
    arguments = (
        str(uuid.uuid4()),
        request_id,
        tenant_id,
        actor_id,
        platform.value,
        payload.post_type.value,
        payload.caption,
        json.dumps(payload.hashtags),
        payload.media_url,
        payload.media_path,
        payload.link_url,
        status.value,
        scheduled_at,
    )
    inserted = await conn.fetchrow(insert_sql, *arguments)
    return _serialize_row(inserted)
|
|
|
|
|
|
async def _update_post(
    conn: asyncpg.Connection,
    *,
    post_id: str,
    tenant_id: str,
    status: PostStatus,
    platform_post_id: str | None = None,
    platform_response: dict[str, Any] | None = None,
    error: str | None = None,
) -> dict[str, Any]:
    """Update the status fields of one post owned by ``tenant_id``.

    ``platform_post_id`` and ``platform_response`` keep their stored values
    when passed as ``None``; ``error`` is always overwritten (so ``None``
    clears it). ``published_at`` is stamped when the new status is
    ``published``.

    Returns:
        The updated row, serialized by ``_serialize_row``.

    Raises:
        SocialPostingError: If no row matches ``post_id`` and ``tenant_id``.
    """
    update_sql = """
        UPDATE catalyst_social_posts
        SET status = $3,
            platform_post_id = COALESCE($4, platform_post_id),
            platform_response = COALESCE($5::jsonb, platform_response),
            error = $6,
            published_at = CASE WHEN $3 = 'published' THEN NOW() ELSE published_at END,
            updated_at = NOW()
        WHERE post_id = $1::uuid
          AND tenant_id = $2
        RETURNING *
        """
    encoded_response = None
    if platform_response is not None:
        encoded_response = json.dumps(platform_response)

    updated = await conn.fetchrow(
        update_sql,
        post_id,
        tenant_id,
        status.value,
        platform_post_id,
        encoded_response,
        error,
    )
    if updated is None:
        raise SocialPostingError(f"Social post '{post_id}' not found.")
    return _serialize_row(updated)
|
|
|
|
|
|
class FacebookPublisher:
    """Publishes posts to a Facebook Page through the Meta Graph API."""

    base = "https://graph.facebook.com"

    async def publish(self, post: dict[str, Any]) -> tuple[str, dict[str, Any]]:
        """Send ``post`` to the page's videos, photos or feed edge.

        Video posts go to ``/videos``; image and carousel posts that have a
        ``media_url`` go to ``/photos``; everything else is posted to
        ``/feed`` with the link (if any) appended to the message.

        Returns:
            ``(platform_post_id, response_json)``; the id is taken from
            ``id``, then ``post_id``, else an empty string.

        Raises:
            SocialPostingConfigurationError: If Meta credentials are missing.
            SocialPostingError: If a video post has no ``media_url`` or the
                API answers with a status of 400 or above.
        """
        token = _env("META_PAGE_ACCESS_TOKEN")
        page_id = _env("META_PAGE_ID")
        caption = _caption_with_hashtags(post["caption"], post.get("hashtags") or [])
        kind = post["post_type"]
        media_url = post.get("media_url")

        # Choose the Graph API edge and its form fields before opening a connection.
        if kind == PostType.VIDEO.value:
            if not media_url:
                raise SocialPostingError("media_url is required for Facebook video posts.")
            edge = "videos"
            form = {"file_url": post["media_url"], "description": caption, "access_token": token}
        elif kind in {PostType.IMAGE.value, PostType.CAROUSEL.value} and media_url:
            edge = "photos"
            form = {"url": post["media_url"], "message": caption, "access_token": token}
        else:
            message = f"{caption}\n{post['link_url']}" if post.get("link_url") else caption
            edge = "feed"
            form = {"message": message, "access_token": token}

        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(
                f"{self.base}/{_meta_version()}/{page_id}/{edge}",
                data=form,
            )
        if response.status_code >= 400:
            raise SocialPostingError(f"Facebook publish failed: {response.text}")
        data = response.json()
        return data.get("id", data.get("post_id", "")), data
|
|
|
|
|
|
class InstagramPublisher:
    """Publishes posts to an Instagram business account via the Meta Graph API.

    Publishing takes two steps: create a media container, then publish it.
    Video containers are processed asynchronously, so the container status is
    polled until it is ready before the publish call is made.
    """

    base = "https://graph.facebook.com"
    # Polling budget for video containers: 36 checks, 5 seconds apart (about 3 minutes).
    status_poll_interval = 5.0
    status_poll_attempts = 36

    async def _wait_for_container(
        self,
        client: httpx.AsyncClient,
        container_id: str,
        token: str,
    ) -> None:
        """Block until the media container reports ``FINISHED``.

        Raises:
            SocialPostingError: If the status check fails, the container ends
                in ``ERROR`` or ``EXPIRED``, or it is not ready within the
                polling budget.
        """
        import asyncio  # local import: the module header does not import asyncio

        for attempt in range(self.status_poll_attempts):
            checked = await client.get(
                f"{self.base}/{_meta_version()}/{container_id}",
                params={"fields": "status_code", "access_token": token},
            )
            if checked.status_code >= 400:
                raise SocialPostingError(f"Instagram container status check failed: {checked.text}")
            state = checked.json().get("status_code")
            if state == "FINISHED":
                return
            if state in {"ERROR", "EXPIRED"}:
                raise SocialPostingError(f"Instagram container processing failed: {checked.text}")
            if attempt + 1 < self.status_poll_attempts:
                await asyncio.sleep(self.status_poll_interval)
        raise SocialPostingError("Instagram media container was not ready for publishing in time.")

    async def publish(self, post: dict[str, Any]) -> tuple[str, dict[str, Any]]:
        """Create a media container for ``post`` and publish it.

        Video posts are uploaded as ``REELS``; every other post type is sent
        as a single image.

        Returns:
            ``(media_id, {"container": ..., "publish": ...})`` holding the
            JSON bodies of both API calls.

        Raises:
            SocialPostingConfigurationError: If Meta credentials are missing.
            SocialPostingError: If ``media_url`` is missing, an API call
                answers with a status of 400 or above, no container id is
                returned, or a video container never becomes ready.
        """
        token = _env("META_PAGE_ACCESS_TOKEN")
        instagram_id = _env("META_INSTAGRAM_BUSINESS_ID")
        if not post.get("media_url"):
            raise SocialPostingError("media_url is required for Instagram posts.")
        caption = _caption_with_hashtags(post["caption"], post.get("hashtags") or [])
        is_video = post["post_type"] == PostType.VIDEO.value

        create_payload: dict[str, Any] = {
            "caption": caption,
            "access_token": token,
        }
        if is_video:
            create_payload.update({"video_url": post["media_url"], "media_type": "REELS"})
        else:
            create_payload["image_url"] = post["media_url"]

        async with httpx.AsyncClient(timeout=180.0) as client:
            created = await client.post(
                f"{self.base}/{_meta_version()}/{instagram_id}/media",
                data=create_payload,
            )
            if created.status_code >= 400:
                raise SocialPostingError(f"Instagram container creation failed: {created.text}")
            container = created.json()
            container_id = container.get("id")
            if not container_id:
                raise SocialPostingError("Instagram did not return a media container id.")

            # Publishing a video container before Meta finishes processing it is
            # rejected, so wait for it to report FINISHED first.
            if is_video:
                await self._wait_for_container(client, container_id, token)

            published = await client.post(
                f"{self.base}/{_meta_version()}/{instagram_id}/media_publish",
                data={"creation_id": container_id, "access_token": token},
            )
            if published.status_code >= 400:
                raise SocialPostingError(f"Instagram publish failed: {published.text}")
            data = published.json()
        return data.get("id", ""), {"container": container, "publish": data}
|
|
|
|
|
|
class LinkedInPublisher:
    """Publishes text shares for a LinkedIn organization via the UGC Posts API."""

    base = "https://api.linkedin.com/v2"

    async def publish(self, post: dict[str, Any]) -> tuple[str, dict[str, Any]]:
        """Post ``post`` as a public text share authored by the organization.

        The link and media URLs, when present, are appended to the caption as
        extra lines; the share is sent with media category ``NONE``.

        Returns:
            ``(share_id, response_json)``; the id is an empty string when the
            response has no ``id``.

        Raises:
            SocialPostingConfigurationError: If LinkedIn credentials are missing.
            SocialPostingError: If the API answers with a status of 400 or above.
        """
        token = _env("LINKEDIN_ACCESS_TOKEN")
        org_id = _env("LINKEDIN_ORG_ID")

        lines = [_caption_with_hashtags(post["caption"], post.get("hashtags") or [])]
        for key in ("link_url", "media_url"):
            if post.get(key):
                lines.append(f"{post[key]}")
        commentary = "\n".join(lines)

        share_content = {
            "shareCommentary": {"text": commentary},
            "shareMediaCategory": "NONE",
        }
        payload = {
            "author": f"urn:li:organization:{org_id}",
            "lifecycleState": "PUBLISHED",
            "specificContent": {"com.linkedin.ugc.ShareContent": share_content},
            "visibility": {"com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"},
        }
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "X-Restli-Protocol-Version": "2.0.0",
        }

        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(f"{self.base}/ugcPosts", headers=headers, json=payload)
        if response.status_code >= 400:
            raise SocialPostingError(f"LinkedIn publish failed: {response.text}")
        data = response.json()
        return data.get("id", ""), data
|
|
|
|
|
|
class TwitterPublisher:
    """Publishes text posts to Twitter/X through the v2 tweets endpoint."""

    base = "https://api.twitter.com/2"

    async def publish(self, post: dict[str, Any]) -> tuple[str, dict[str, Any]]:
        """Post ``post`` as a single tweet.

        The tweet text is the caption with hashtags, followed by the link and
        media URLs (when present) on their own lines.

        Returns:
            ``(tweet_id, response_json)``; the id is an empty string when the
            response carries no ``data.id``.

        Raises:
            SocialPostingConfigurationError: If the bearer token is missing.
            SocialPostingError: If the text exceeds 280 characters or the API
                answers with a status of 400 or above.
        """
        token = _env("TWITTER_BEARER_TOKEN")

        lines = [_caption_with_hashtags(post["caption"], post.get("hashtags") or [])]
        for key in ("link_url", "media_url"):
            if post.get(key):
                lines.append(f"{post[key]}")
        text = "\n".join(lines)
        if len(text) > 280:
            raise SocialPostingError("Twitter/X post exceeds 280 characters after links and hashtags.")

        headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(f"{self.base}/tweets", headers=headers, json={"text": text})
        if response.status_code >= 400:
            raise SocialPostingError(f"Twitter/X publish failed: {response.text}")
        data = response.json()
        return data.get("data", {}).get("id", ""), data
|
|
|
|
|
|
# One shared publisher instance per platform, keyed by the platform's string value.
_PUBLISHERS = {
    platform.value: publisher_cls()
    for platform, publisher_cls in (
        (SocialPlatform.FACEBOOK, FacebookPublisher),
        (SocialPlatform.INSTAGRAM, InstagramPublisher),
        (SocialPlatform.LINKEDIN, LinkedInPublisher),
        (SocialPlatform.TWITTER, TwitterPublisher),
    )
}
|
|
|
|
|
|
async def _publish_post(conn: asyncpg.Connection, post: dict[str, Any]) -> dict[str, Any]:
    """Publish one stored post and record the outcome.

    The row is first marked ``publishing``, then handed to the publisher
    registered for its platform, and finally marked ``published`` together
    with the platform's post id and response.

    Returns:
        The serialized row after it has been marked ``published``.

    Raises:
        SocialPostingError: If no publisher is registered for the platform;
            publisher errors propagate to the caller unchanged.
    """
    in_flight = await _update_post(
        conn,
        post_id=post["post_id"],
        tenant_id=post["tenant_id"],
        status=PostStatus.PUBLISHING,
    )
    platform_name = in_flight["platform"]
    publisher = _PUBLISHERS.get(platform_name)
    if publisher is None:
        raise SocialPostingError(f"Unsupported social platform: {in_flight['platform']}")

    remote_id, remote_response = await publisher.publish(in_flight)
    finished = await _update_post(
        conn,
        post_id=in_flight["post_id"],
        tenant_id=in_flight["tenant_id"],
        status=PostStatus.PUBLISHED,
        platform_post_id=remote_id,
        platform_response=remote_response,
    )
    return finished
|
|
|
|
|
|
async def publish_content(
    *,
    pool: asyncpg.Pool,
    tenant_id: str,
    actor_id: str,
    payload: PostRequest,
) -> dict[str, Any]:
    """Persist one post per requested platform and publish or schedule them.

    Configuration and payload are validated first. One row per platform is
    then inserted in a single transaction under a shared ``request_id``. When
    ``schedule_time`` lies in the future the rows are stored as ``scheduled``
    and returned as-is; otherwise each row is published immediately and
    per-platform failures are recorded on the row rather than raised.

    Args:
        pool: Connection pool used for all database access.
        tenant_id: Tenant that owns the posts.
        actor_id: Identifier stored with each post as its author.
        payload: Validated post request.

    Returns:
        A summary dict with ``request_id``, ``total``, ``published``,
        ``scheduled``, ``failed`` counts and the serialized ``posts``.

    Raises:
        SocialPostingConfigurationError: If a platform credential is missing.
        SocialPostingError: If the payload violates a platform rule.
        ValueError: If ``schedule_time`` is not a valid ISO-8601 timestamp.
    """
    validate_platform_configuration(payload.platforms)
    validate_payload_contract(payload)
    request_id = str(uuid.uuid4())
    scheduled_at = _parse_schedule(payload.schedule_time)

    # Decide scheduled-vs-immediate exactly once. Re-checking the clock per row
    # and again after the insert could give rows of the same request different
    # statuses, or publish rows that were just stored as "scheduled", when the
    # schedule time passes while this call is running.
    deferred = scheduled_at is not None and scheduled_at > _utcnow()
    initial_status = PostStatus.SCHEDULED if deferred else PostStatus.PUBLISHING

    posts: list[dict[str, Any]] = []
    async with pool.acquire() as conn:
        async with conn.transaction():
            for platform in payload.platforms:
                post = await _insert_post(
                    conn,
                    tenant_id=tenant_id,
                    actor_id=actor_id,
                    request_id=request_id,
                    platform=platform,
                    payload=payload,
                    status=initial_status,
                    scheduled_at=scheduled_at,
                )
                posts.append(post)

    if deferred:
        return {
            "request_id": request_id,
            "total": len(posts),
            "published": 0,
            "scheduled": len(posts),
            "failed": 0,
            "posts": posts,
        }

    published: list[dict[str, Any]] = []
    failed: list[dict[str, Any]] = []
    async with pool.acquire() as conn:
        for post in posts:
            try:
                published.append(await _publish_post(conn, post))
            except (SocialPostingConfigurationError, SocialPostingError, httpx.HTTPError) as exc:
                # One platform failing must not stop the others; record the error on its row.
                failed.append(
                    await _update_post(
                        conn,
                        post_id=post["post_id"],
                        tenant_id=tenant_id,
                        status=PostStatus.FAILED,
                        error=str(exc),
                    )
                )

    return {
        "request_id": request_id,
        "total": len(posts),
        "published": len(published),
        "scheduled": 0,
        "failed": len(failed),
        "posts": published + failed,
    }
|
|
|
|
|
|
async def get_post(*, pool: asyncpg.Pool, tenant_id: str, post_id: str) -> dict[str, Any] | None:
    """Fetch one post belonging to ``tenant_id``.

    Returns:
        The serialized row, or ``None`` when no post matches.
    """
    lookup_sql = "SELECT * FROM catalyst_social_posts WHERE post_id = $1::uuid AND tenant_id = $2"
    async with pool.acquire() as conn:
        found = await conn.fetchrow(lookup_sql, post_id, tenant_id)
        if not found:
            return None
        return _serialize_row(found)
|
|
|
|
|
|
async def list_posts(
    *,
    pool: asyncpg.Pool,
    tenant_id: str,
    platform: SocialPlatform | None = None,
    status: PostStatus | None = None,
    limit: int = 50,
) -> list[dict[str, Any]]:
    """List a tenant's posts, newest first.

    Args:
        pool: Connection pool used for the query.
        tenant_id: Tenant whose posts are listed.
        platform: Optional platform filter.
        status: Optional status filter.
        limit: Maximum number of rows to return.

    Returns:
        The matching rows, each serialized by ``_serialize_row``.
    """
    params: list[Any] = [tenant_id]
    clauses = ["tenant_id = $1"]
    # Each active filter takes the next positional placeholder; values are never inlined.
    for column, choice in (("platform", platform), ("status", status)):
        if choice:
            params.append(choice.value)
            clauses.append(f"{column} = ${len(params)}")
    params.append(limit)

    where_clause = " AND ".join(clauses)
    limit_slot = len(params)
    query = f"""
        SELECT *
        FROM catalyst_social_posts
        WHERE {where_clause}
        ORDER BY created_at DESC
        LIMIT ${limit_slot}
        """
    async with pool.acquire() as conn:
        rows = await conn.fetch(query, *params)
    return [_serialize_row(row) for row in rows]
|
|
|
|
|
|
async def publish_due_scheduled(*, pool: asyncpg.Pool, tenant_id: str, limit: int = 20) -> dict[str, Any]:
    """Publish a tenant's scheduled posts whose time has come.

    Up to ``limit`` rows with status ``scheduled`` and ``scheduled_at`` no
    later than the database's ``NOW()`` are published, oldest first. A post
    that fails is marked ``failed`` with the error text and processing
    continues with the next one.

    Returns:
        A dict with ``published`` and ``failed`` counts and the serialized
        ``posts`` (published ones first).
    """
    due_sql = """
        SELECT *
        FROM catalyst_social_posts
        WHERE tenant_id = $1
          AND status = 'scheduled'
          AND scheduled_at <= NOW()
        ORDER BY scheduled_at ASC
        LIMIT $2
        """
    succeeded: list[dict[str, Any]] = []
    errored: list[dict[str, Any]] = []

    async with pool.acquire() as conn:
        due_rows = await conn.fetch(due_sql, tenant_id, limit)
        for post in map(_serialize_row, due_rows):
            try:
                outcome = await _publish_post(conn, post)
            except (SocialPostingConfigurationError, SocialPostingError, httpx.HTTPError) as exc:
                outcome = await _update_post(
                    conn,
                    post_id=post["post_id"],
                    tenant_id=tenant_id,
                    status=PostStatus.FAILED,
                    error=str(exc),
                )
                errored.append(outcome)
            else:
                succeeded.append(outcome)

    return {"published": len(succeeded), "failed": len(errored), "posts": succeeded + errored}
|