"""
routes_catalyst.py

Meta Marketing API wrappers for The Catalyst module.

Routes:
    POST /api/catalyst/campaigns/create     — Bulk campaign creation
    POST /api/catalyst/creative/sync        — Upload ComfyUI assets to Meta
    GET  /api/catalyst/insights/realtime    — Poll Ads Insights API
    POST /api/catalyst/audiences/lookalike  — Push CRM leads → Meta Custom Audience
    POST /api/catalyst/auth/meta            — OAuth token acquisition
"""

import os
import hashlib
import logging
from typing import Any
from datetime import datetime

from fastapi import APIRouter, HTTPException, Request, status
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)
router = APIRouter()


# ── Helpers ───────────────────────────────────────────────────────────────────

def _is_unset(value: str) -> bool:
    """Return True when an environment value is empty or still a PLACEHOLDER."""
    return not value or value.startswith("PLACEHOLDER")


def _get_sdk() -> tuple[Any, str]:
    """
    Initialise the facebook-business SDK lazily.

    Reads META_ACCESS_TOKEN, META_APP_ID, META_APP_SECRET and
    META_AD_ACCOUNT_ID from the environment on every call.

    Returns:
        (FacebookAdsApi instance, ad_account_id).

    Raises:
        HTTPException 503: the SDK is not installed, META_ACCESS_TOKEN or
            META_AD_ACCOUNT_ID is missing / a placeholder, or SDK
            initialisation raises ValueError.
    """
    try:
        # Imported lazily so the module still loads when the SDK is absent.
        from facebook_business.api import FacebookAdsApi  # type: ignore

        access_token = os.getenv("META_ACCESS_TOKEN", "")
        app_id = os.getenv("META_APP_ID", "")
        app_secret = os.getenv("META_APP_SECRET", "")
        account_id = os.getenv("META_AD_ACCOUNT_ID", "")

        if _is_unset(access_token):
            raise ValueError("META_ACCESS_TOKEN is not configured.")
        if _is_unset(account_id):
            raise ValueError("META_AD_ACCOUNT_ID is not configured.")

        FacebookAdsApi.init(app_id, app_secret, access_token)
        return FacebookAdsApi.get_default_api(), account_id

    except ImportError as exc:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="facebook-business SDK not installed. Run: pip install facebook-business",
        ) from exc
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=str(exc),
        ) from exc


def _get_supabase() -> Any:
    """
    Initialise the Supabase client lazily.

    Returns:
        A client built by ``supabase.create_client`` from SUPABASE_URL and
        SUPABASE_SERVICE_ROLE_KEY.

    Raises:
        HTTPException 503: the supabase package is not installed, or either
            environment variable is missing / a placeholder.
    """
    try:
        from supabase import create_client  # type: ignore

        url = os.getenv("SUPABASE_URL", "")
        key = os.getenv("SUPABASE_SERVICE_ROLE_KEY", "")

        if _is_unset(url):
            raise ValueError("SUPABASE_URL is not configured.")
        # Validate the key as well, so a missing key yields the same 503 as a
        # missing URL instead of an opaque client-library error.
        if _is_unset(key):
            raise ValueError("SUPABASE_SERVICE_ROLE_KEY is not configured.")

        return create_client(url, key)

    except ImportError as exc:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="supabase SDK not installed. Run: pip install supabase",
        ) from exc
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=str(exc),
        ) from exc


def _ok(data: Any, meta: dict | None = None) -> dict:
    """Wrap *data* in the standard success envelope used by every route."""
    return {"status": "ok", "data": data, "meta": meta or {}}


def _sha256_hash(value: str) -> str:
    """SHA-256 hash an email for Meta's hashed audience upload.

    The value is stripped and lower-cased before hashing; the hex digest is
    returned.
    """
    return hashlib.sha256(value.strip().lower().encode()).hexdigest()


# ── Request / Response Models ─────────────────────────────────────────────────

class CampaignCreateRequest(BaseModel):
    """Request body for POST /campaigns/create."""

    name: str = Field(..., description="Campaign display name")
    objective: str = Field("OUTCOME_LEADS", description="Meta campaign objective enum")
    budget_daily: int = Field(..., gt=0, description="Daily budget in cents (AED × 100)")
    status: str = Field("PAUSED", description="Initial campaign status — start PAUSED for review")
    special_ad_categories: list[str] = Field(default_factory=list)


class CampaignCreateResponse(BaseModel):
    """Payload returned after a campaign has been created."""

    campaign_id: str
    name: str
    status: str
    created_at: str


class CreativeSyncRequest(BaseModel):
    """Request body for POST /creative/sync."""

    asset_url: str = Field(..., description="Public URL of the ComfyUI-rendered image or video")
    asset_name: str = Field(..., description="Human-readable asset name")
    asset_type: str = Field(..., description="'image' or 'video'")
    ad_account_id: str | None = Field(None, description="Override ad account ID (optional)")
class LookalikeAudienceRequest(BaseModel):
    """Request body for POST /audiences/lookalike."""

    country: str = Field("AE", description="ISO 3166-1 alpha-2 country code for lookalike")
    ratio: float = Field(0.01, ge=0.01, le=0.20, description="Lookalike ratio (1%–20%)")
    crm_filter_status: str = Field("Closed/Won", description="Supabase lead status to filter on")


class MetaAuthRequest(BaseModel):
    """Request body for POST /auth/meta."""

    short_lived_token: str = Field(..., description="Short-lived user access token from Meta OAuth")


# ── 1. POST /campaigns/create ─────────────────────────────────────────────────

@router.post("/campaigns/create", summary="Bulk-create Meta Marketing campaigns")
async def create_campaigns(
    request: Request,
    payload: CampaignCreateRequest,
) -> dict:
    """
    Create a campaign under the configured Ad Account via
    `AdAccount.create_campaign`, then announce it on the live-event channel
    when the app exposes `broadcast_live_event`.

    Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID

    Returns:
        The standard `_ok` envelope holding a `CampaignCreateResponse` dump,
        with the ad account id in `meta`.

    Raises:
        HTTPException 503: SDK / credentials unavailable (from `_get_sdk`).
        HTTPException 502: the SDK call failed.
    """
    import asyncio

    _api, account_id = _get_sdk()

    try:
        from facebook_business.adobjects.adaccount import AdAccount  # type: ignore
        from facebook_business.adobjects.campaign import Campaign  # type: ignore

        params = {
            Campaign.Field.name: payload.name,
            Campaign.Field.objective: payload.objective,
            Campaign.Field.status: payload.status,
            Campaign.Field.daily_budget: payload.budget_daily,
            Campaign.Field.special_ad_categories: payload.special_ad_categories,
        }
        # The SDK performs blocking HTTP; run it off the event loop.
        campaign = await asyncio.to_thread(
            AdAccount(account_id).create_campaign, params=params
        )
        campaign_id = campaign["id"]
    except Exception as exc:
        logger.exception("Campaign creation failed: %s", exc)
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc

    # Broadcast live event via WebSocket. Best-effort: the campaign already
    # exists at this point, so a broadcast failure must not surface as a 502
    # (a client retry would create a duplicate campaign).
    broadcast = getattr(request.app.state, "broadcast_live_event", None)
    if broadcast is not None:
        try:
            await broadcast(
                "create",
                f"Created campaign '{payload.name}' (objective: {payload.objective}).",
                payload.name,
                f"Budget: AED {payload.budget_daily / 100:.0f}/day",
            )
        except Exception as exc:
            logger.warning("Live-event broadcast failed after campaign creation: %s", exc)

    return _ok(
        CampaignCreateResponse(
            campaign_id=campaign_id,
            name=payload.name,
            status=payload.status,
            created_at=datetime.utcnow().isoformat(),
        ).model_dump(),
        meta={"account_id": account_id},
    )


# ── 2. POST /creative/sync ────────────────────────────────────────────────────

@router.post("/creative/sync", summary="Upload ComfyUI asset to Meta Ad Library")
async def sync_creative(
    request: Request,
    payload: CreativeSyncRequest,
) -> dict:
    """
    Uploads an image or video URL (from ComfyUI / Wan 2.2 / Qwen-Image 2512)
    to the Meta ad account and returns the Meta asset identifier (video id,
    or image hash).

    `asset_type` is matched case-insensitively; any value other than
    "video" takes the image upload path.

    Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID

    Raises:
        HTTPException 503: SDK / credentials unavailable (from `_get_sdk`).
        HTTPException 502: the SDK call failed or returned an unexpected shape.
    """
    import asyncio

    _api, default_account_id = _get_sdk()
    account_id = payload.ad_account_id or default_account_id

    # Normalise so "Video" / " video " are not silently uploaded as images.
    asset_kind = payload.asset_type.strip().lower()
    if asset_kind not in ("image", "video"):
        logger.warning(
            "Unrecognised asset_type %r; falling back to image upload.",
            payload.asset_type,
        )

    try:
        from facebook_business.adobjects.adaccount import AdAccount  # type: ignore
        from facebook_business.adobjects.advideo import AdVideo  # type: ignore

        account = AdAccount(account_id)

        if asset_kind == "video":
            # Video upload via file_url (blocking SDK call, run off-loop).
            result = await asyncio.to_thread(
                account.create_ad_video,
                params={
                    AdVideo.Field.name: payload.asset_name,
                    AdVideo.Field.file_url: payload.asset_url,
                },
            )
            meta_asset_id = result["id"]
        else:
            # Image upload via url (blocking SDK call, run off-loop).
            result = await asyncio.to_thread(
                account.create_ad_image,
                params={
                    "filename": payload.asset_name,
                    "url": payload.asset_url,
                },
            )
            # AdImage returns a hash dict — extract hash key
            if "images" in result:
                meta_asset_id = list(result["images"].values())[0]["hash"]
            else:
                meta_asset_id = result.get("id", "unknown")

        return _ok({
            "meta_asset_id": meta_asset_id,
            "asset_name": payload.asset_name,
            "asset_type": payload.asset_type,
            "uploaded_at": datetime.utcnow().isoformat(),
        })

    except Exception as exc:
        logger.exception("Creative sync failed: %s", exc)
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc


# ── 3.
# GET /insights/realtime ─────────────────────────────────────────────────

@router.get("/insights/realtime", summary="Poll Meta Ads Insights API")
async def get_realtime_insights(
    date_preset: str = "last_7_days",
    level: str = "adset",
) -> dict:
    """
    Polls `AdAccount.get_insights()` for spend, impressions, clicks, CTR and
    CPP across Ad Sets.

    Supports `date_preset` (e.g. 'today', 'last_7_days', 'last_30_days') and
    `level` ('campaign', 'adset', 'ad'). The long `last_N_days` spellings are
    translated to Meta's `last_Nd` enum values before the request is sent;
    any other value is passed through unchanged.

    Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID

    Raises:
        HTTPException 503: SDK / credentials unavailable (from `_get_sdk`).
        HTTPException 502: the Insights request failed.
    """
    import asyncio

    # Meta's date_preset enum uses "last_7d"-style names; the route's own
    # default ("last_7_days") is not a valid enum value, so map the friendly
    # spellings to what the API accepts.
    preset_aliases = {
        "last_3_days": "last_3d",
        "last_7_days": "last_7d",
        "last_14_days": "last_14d",
        "last_28_days": "last_28d",
        "last_30_days": "last_30d",
        "last_90_days": "last_90d",
    }
    api_date_preset = preset_aliases.get(date_preset, date_preset)

    _api, account_id = _get_sdk()

    try:
        from facebook_business.adobjects.adaccount import AdAccount  # type: ignore
        from facebook_business.adobjects.adsinsights import AdsInsights  # type: ignore

        fields = [
            AdsInsights.Field.campaign_name,
            AdsInsights.Field.adset_name,
            AdsInsights.Field.spend,
            AdsInsights.Field.impressions,
            AdsInsights.Field.clicks,
            AdsInsights.Field.ctr,
            # cpp is Meta's cost per 1,000 people reached, not cost per
            # purchase; it is only a rough stand-in for CPA.
            AdsInsights.Field.cpp,
            AdsInsights.Field.date_start,
            AdsInsights.Field.date_stop,
        ]
        params = {
            "date_preset": api_date_preset,
            "level": level,
        }

        def _fetch() -> list[dict]:
            # Iterating the cursor issues further blocking page requests, so
            # the whole materialisation happens in the worker thread.
            cursor = AdAccount(account_id).get_insights(fields=fields, params=params)
            return [dict(row) for row in cursor]

        results = await asyncio.to_thread(_fetch)

        return _ok(results, meta={
            "account_id": account_id,
            "date_preset": date_preset,
            "level": level,
            "count": len(results),
        })

    except Exception as exc:
        logger.exception("Insights fetch failed: %s", exc)
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc


# ── 4. POST /audiences/lookalike ──────────────────────────────────────────────

@router.post("/audiences/lookalike", summary="Push Supabase CRM leads → Meta Lookalike Audience")
async def create_lookalike_audience(
    request: Request,
    payload: LookalikeAudienceRequest,
) -> dict:
    """
    1. Queries the Supabase `leads` table for rows matching `status = payload.crm_filter_status`.
    2. SHA-256 hashes their email addresses (blank and duplicate emails are dropped).
    3. Creates a new Meta Custom Audience and uploads the hashed emails to it.
       A fresh audience is created on every call; existing ones are not reused.
    4. Creates a Lookalike Audience from that Custom Audience.

    Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID, SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY

    Returns:
        `_ok` envelope with the custom / lookalike audience ids and the number
        of unique hashed emails uploaded, or a `message` when no lead matches.

    Raises:
        HTTPException 503: SDK / Supabase unavailable or unconfigured.
        HTTPException 502: the Supabase query or a Meta call failed.
        HTTPException 422: matching leads exist but none has a usable email.
    """
    import asyncio
    import secrets

    # Meta documents a cap of 10,000 records per user-upload request.
    upload_batch_size = 10_000

    _api, account_id = _get_sdk()
    supabase = _get_supabase()

    # ── Step 1: Fetch qualified leads from Supabase CRM ──
    def _fetch_leads() -> list:
        response = (
            supabase.table("leads")
            .select("id, email, name")
            .eq("status", payload.crm_filter_status)
            .execute()
        )
        return response.data or []

    try:
        # The Supabase client is synchronous; keep it off the event loop.
        leads = await asyncio.to_thread(_fetch_leads)
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"Supabase query failed: {exc}",
        ) from exc

    if not leads:
        return _ok({"message": f"No leads found with status '{payload.crm_filter_status}'."})

    # ── Step 2: Hash emails ──
    # dict.fromkeys de-duplicates while preserving first-seen order.
    hashed_emails = list(dict.fromkeys(
        _sha256_hash(lead["email"])
        for lead in leads
        if isinstance(lead.get("email"), str) and lead["email"].strip()
    ))

    if not hashed_emails:
        raise HTTPException(status_code=422, detail="No valid email addresses found in the filtered leads.")

    # ── Step 3: Create / update Meta Custom Audience ──
    try:
        from facebook_business.adobjects.adaccount import AdAccount  # type: ignore
        from facebook_business.adobjects.customaudience import CustomAudience  # type: ignore

        account = AdAccount(account_id)
        audience_name = f"Velocity CRM — {payload.crm_filter_status} Leads"

        def _build_audiences() -> tuple:
            # Create custom audience
            custom_audience = account.create_custom_audience(params={
                CustomAudience.Field.name: audience_name,
                CustomAudience.Field.subtype: "CUSTOM",
                CustomAudience.Field.description: f"Auto-generated from Velocity CRM — {len(hashed_emails)} leads",
                "customer_file_source": "USER_PROVIDED_ONLY",
            })
            seed_id = custom_audience["id"]

            # Add users via hashed emails. The replace endpoint works per
            # upload session, so every batch carries the same session_id with
            # an increasing batch_seq and a flag on the final batch.
            total = len(hashed_emails)
            batch_count = (total + upload_batch_size - 1) // upload_batch_size
            session_id = secrets.randbits(62) + 1  # positive 64-bit integer
            for batch_index in range(batch_count):
                start = batch_index * upload_batch_size
                batch = hashed_emails[start:start + upload_batch_size]
                custom_audience.create_users_replace(params={
                    "session": {
                        "session_id": session_id,
                        "batch_seq": batch_index + 1,
                        "last_batch_flag": batch_index == batch_count - 1,
                        "estimated_num_total": total,
                    },
                    "payload": {
                        "schema": ["EMAIL_SHA256"],
                        "data": [[h] for h in batch],
                    },
                })

            # ── Step 4: Create Lookalike Audience ──
            # Lookalikes are created through the customaudiences edge with
            # subtype LOOKALIKE and an origin_audience_id.
            lookalike = account.create_custom_audience(params={
                "name": f"Velocity Lookalike — {payload.crm_filter_status} ({int(payload.ratio * 100)}%)",
                "subtype": "LOOKALIKE",
                "origin_audience_id": seed_id,
                "lookalike_spec": {
                    "type": "similarity",
                    "ratio": payload.ratio,
                    "country": payload.country,
                },
            })
            return seed_id, lookalike["id"]

        # All SDK calls are blocking HTTP; run the whole sequence off-loop.
        audience_id, lookalike_id = await asyncio.to_thread(_build_audiences)

    except Exception as exc:
        logger.exception("Audience creation failed: %s", exc)
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc

    # Broadcast live event. Best-effort: both audiences already exist, so a
    # broadcast failure must not be reported as a failed creation.
    broadcast = getattr(request.app.state, "broadcast_live_event", None)
    if broadcast is not None:
        try:
            await broadcast(
                "create",
                f"Created Lookalike Audience from {len(hashed_emails)} CRM {payload.crm_filter_status} leads.",
                None,
                f"+{len(hashed_emails):,} leads",
            )
        except Exception as exc:
            logger.warning("Live-event broadcast failed after audience creation: %s", exc)

    return _ok({
        "custom_audience_id": audience_id,
        "lookalike_audience_id": lookalike_id,
        "leads_processed": len(hashed_emails),
        "country": payload.country,
        "ratio": payload.ratio,
    })


# ── 5. POST /auth/meta ────────────────────────────────────────────────────────

@router.post("/auth/meta", summary="Exchange short-lived token for System User token")
async def meta_oauth(payload: MetaAuthRequest) -> dict:
    """
    Exchanges a short-lived Meta user token for a long-lived token using the
    `/oauth/access_token` endpoint, then stores it in Supabase for persistence.
    Persistence is best-effort: a Supabase failure is logged and the token is
    still returned.

    Requires: META_APP_ID, META_APP_SECRET

    Raises:
        HTTPException 503: META_APP_ID or META_APP_SECRET missing / placeholder.
        HTTPException 502: the request to Meta failed, returned a non-200
            status, a non-JSON body, or a body without `access_token`.
    """
    import asyncio

    import httpx

    app_id = os.getenv("META_APP_ID", "")
    app_secret = os.getenv("META_APP_SECRET", "")
    api_ver = os.getenv("META_API_VERSION", "v21.0")

    # An unset variable yields "", which never starts with "PLACEHOLDER";
    # check emptiness explicitly so missing credentials are rejected too.
    if (
        not app_id
        or not app_secret
        or app_id.startswith("PLACEHOLDER")
        or app_secret.startswith("PLACEHOLDER")
    ):
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="META_APP_ID or META_APP_SECRET not configured.",
        )

    url = f"https://graph.facebook.com/{api_ver}/oauth/access_token"
    params = {
        "grant_type": "fb_exchange_token",
        "client_id": app_id,
        "client_secret": app_secret,
        "fb_exchange_token": payload.short_lived_token,
    }

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(url, params=params, timeout=15.0)
    except httpx.HTTPError as exc:
        # Log only the error class: the request carries the app secret and the
        # user token, which must not end up in logs or responses.
        logger.error("Meta OAuth request failed: %s", type(exc).__name__)
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Meta OAuth request failed.",
        ) from exc

    if resp.status_code != 200:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"Meta OAuth error: {resp.text}",
        )

    try:
        token_data = resp.json()
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Meta OAuth returned a non-JSON response.",
        ) from exc

    long_lived_token = token_data.get("access_token") if isinstance(token_data, dict) else None

    if not long_lived_token:
        raise HTTPException(status_code=502, detail="No access_token in Meta response.")

    # Persist to Supabase (best-effort — don't block on failure)
    def _persist_token() -> None:
        supabase = _get_supabase()
        supabase.table("catalyst_settings").upsert({
            "key": "META_ACCESS_TOKEN",
            "value": long_lived_token,
            "updated_at": datetime.utcnow().isoformat(),
        }).execute()

    try:
        # The Supabase client is synchronous; keep it off the event loop.
        await asyncio.to_thread(_persist_token)
    except Exception as exc:
        logger.warning("Could not persist Meta token to Supabase: %s", exc)

    return _ok({
        "access_token": long_lived_token,
        "token_type": token_data.get("token_type", "bearer"),
        "expires_in": token_data.get("expires_in"),
    })