feat: Added the ComfyUI engine #12

Merged
sayan merged 1 commits from sayan/Project_Velocity:feat/#11 into main 2026-03-27 22:48:36 +05:30
74 changed files with 9390 additions and 7119 deletions

View File

@@ -0,0 +1,91 @@
### **Part 1: First Principles Page Structure Analysis**
*If "The Catalyst" is an autonomous digital marketing agency, it cannot look like a standard Facebook Ads Manager spreadsheet. It must feel like an executive command center.*
Here is the optimized page architecture for the WebOS:
**1\. The Studio (Visual AI Engine)**
* **Purpose:** Where your ComfyUI workflows (Wan 2.2 and Qwen-Image 2512\) live on the frontend.
* **Function:** Users input raw property data/images, and the system generates variations of cinematic videos and multi-lingual posters.
* **Meta Link:** Automatically pushes these assets to the Meta Asset Library via API.
**2\. Campaign Command (Control Surface)**
* **Purpose:** The strategic deployment center.
* **Function:** Instead of manually setting up ads, the user defines the "Objective" (e.g., "Sell 3BHKs fast") and "Budget". The Claw Agent uses the **Meta Marketing API** to execute Ad Campaign Management & Automation, handling ad sets, bulk creation, and dynamic creative rotation using the assets generated in The Studio.
**3\. Intelligence & ROI (Analytics Dashboard)**
* **Purpose:** Real-time visual justification of marketing spend.
* **Function:** Uses the **Ads Insights API** to visualize CPA (Cost Per Acquisition) and ROI. Features a "Live Optimization" log showing what the Claw Agent is doing in real-time (e.g., *"Agent paused Ad Set B due to high CPA. Shifted budget to Ad Set A."*).
**4\. The War Room (Market Research & Audiences)**
* **Purpose:** AI-driven competitor and audience strategy.
* **Function:** Integrates the **Ad Library API** to spy on competitor ads (e.g., Emaar/Damac) and uses the completed Brave Search integration to track real estate trends. Connects directly to the **Audience Management API** to build Custom/Lookalike audiences dynamically from the Supabase CRM leads.
**5\. System & Meta Graph (Settings)**
* **Purpose:** The plumbing.
* **Function:** Uses the **Meta Business API** for Client Onboarding, Business Asset Management (linking WhatsApp, Instagram, FB Pages), and Permissions Management.
---
### **Part 2: Software Requirements Specification (SRS)**
#### **A. Tech Stack & State Management**
* **Frontend (Sayan):** Next JS (Latest Version), Tailwind CSS, Framer Motion, Recharts, Zustand (for `useMarketingStore`).
* **Backend (Sayan):** Python (FastAPI) utilizing the official `facebook-business` Python SDK.
* **Visual Generation (Sagnik):** ComfyUI API mode running locally, utilizing Flux-Schnell/Qwen-Image 2512 (Images) and Wan 2.2 14B/1.3B (Video).
* **Autonomous Logic (Sourik/Sayan):** PicoClaw/IronClaw agent triggered via FastAPI endpoints to manage bidding and rotation.
#### **B. Core API Mappings (Meta to FastAPI)**
Sayan must build these specific Python routes:
* `POST /api/catalyst/campaigns/create`: Triggers Meta Marketing API to build campaigns in bulk.
* `POST /api/catalyst/creative/sync`: Uploads ComfyUI outputs (Wan 2.2 mp4s / Qwen pngs) directly to the Meta Asset Manager.
* `GET /api/catalyst/insights/realtime`: Polls Meta Ads Insights API for click-through rates, CPA, and spend limits.
* `POST /api/catalyst/audiences/lookalike`: Pushes "Qualified" or "Whale" leads from your CRM (Supabase) into Meta to create High-Net-Worth Lookalike Audiences.
---
### **Part 3: Scientific Breakdown of Tasks (Sprint Plan)**
Here are the actionable tasks for you and Sayan to drop directly into your Taiga board for "The Catalyst" module.
#### **👑 Sagniks Tasks (Visual AI & Architecture)**
1. **Expose ComfyUI Endpoints for Catalyst Integration:**
* **Action:** Ensure the `catalyst_poster_qwen.json` (Qwen-Image 2512\) and `cinematic_wan22_14b.json` (Wan 2.2) workflows are exposed via the ComfyUI Asynchronous Queue API.
* **Output:** Provide Sayan with the exact JSON payload structures required to trigger these renders programmatically from the WebOS.
2. **Define Agentic System Prompts for Dynamic Creative:**
* **Action:** Write the system prompts for the Claw agent. Instruct the agent on how to prompt Qwen-Image 2512 for A/B testing (e.g., "Generate Image A with English typography, Image B with Arabic typography").
#### **💻 Sayans Tasks (Full-Stack & Meta API)**
1. **Build the Catalyst Zustand Store & Layout (`app/src/components/modules/Catalyst.tsx`):**
* **Action:** Create `useMarketingStore.ts` in Zustand to manage `campaigns[]`, `activeAssets[]`, and `adInsights{}`.
* **UI Build:** Construct the 4-column Bento grid for the Analytics Dashboard using Recharts (similar to the LeadVelocityChart in the Dashboard).
2. **Meta Business API \- Auth & Settings Interface:**
* **Action:** Build the Settings & API entry fields using `DarkInput` and `GhostButton` components.
* **Backend:** Implement the OAuth flow in FastAPI to acquire and securely store the Meta System User Access Token in Supabase.
3. **Meta Marketing API \- The Integration Engine (`backend/api/routes_catalyst.py`):**
* **Action:** Write the Python wrappers for the Meta Ads API.
* **Function 1:** Implement **Audience Management**—write a script that automatically queries the Supabase CRM for leads tagged "Closed/Won" and pushes their hashed emails to Meta to update a Custom Audience.
* **Function 2:** Implement **Dynamic Creative Rotation**—write an endpoint that accepts Sagnik's ComfyUI image/video URLs, uploads them to Meta's Ad Library, and assigns them to an active Ad Set for A/B testing.
4. **Design the UI/UX Components:**
* **Action:** Design the frontend component states for "Asset Generation". Use "Apple/Steve Jobs" aesthetic (glassmorphism, `backdrop-blur-xl`).
* **Detail:** Must include a visual "Generation Queue" that uses latency-hiding (e.g., shimmering loading states that say *"Wan 2.2 is rendering cinematic lighting..."*).
5. **Real-Time Optimization Dashboard (WebOS):**
* **Action:** Create a `<LiveOptimizationFeed />` component using Framer Motion (`AnimatePresence`).
* **Details:** It should stream WebSocket updates from the backend showing what the autonomous bot is doing (e.g., *"Agent Sourik paused Campaign X due to CPA \> $50"*).
### **First-Principles Summary for Development**
By structuring the Meta API exactly like this, you are not building an Ads Manager. You are building an **Autonomous Deployment Engine**. Sagnik's models (Wan/Qwen) act as the infinite creative agency, Sayan's React/FastAPI stack acts as the pipes, and Sourik's Claw agent acts as the media buyer turning those pipes on and off based on the real-time Meta Insights data.

40
backend/.env.example Normal file
View File

@@ -0,0 +1,40 @@
# ── Meta Graph API ────────────────────────────────────────────────────────────
# Long-lived System User Access Token from Meta Business Manager
# Business Settings → System Users → Generate Token
META_ACCESS_TOKEN=PLACEHOLDER_your_meta_system_user_token
# Ad Account ID — format: act_XXXXXXXXXX
# Meta Business Manager → Ad Accounts
META_AD_ACCOUNT_ID=PLACEHOLDER_act_1234567890
# Business Portfolio ID
# Meta Business Settings → Business Info → Business ID
META_BUSINESS_ID=PLACEHOLDER_1234567890
# App ID & Secret — from Meta Developers → Your App → Basic Settings
META_APP_ID=PLACEHOLDER_9876543210
META_APP_SECRET=PLACEHOLDER_your_app_secret
# API Version (use latest stable)
META_API_VERSION=v21.0
# ── Supabase (CRM) ────────────────────────────────────────────────────────────
# Project URL from Supabase Dashboard → Settings → API
SUPABASE_URL=PLACEHOLDER_https://xxxxxxxxxxx.supabase.co
# Anon/Public key (for server-side reads)
SUPABASE_ANON_KEY=PLACEHOLDER_your_supabase_anon_key
# Service Role key (for elevated writes — keep secret!)
SUPABASE_SERVICE_ROLE_KEY=PLACEHOLDER_your_supabase_service_role_key
# ── ComfyUI ───────────────────────────────────────────────────────────────────
# Base URL of ComfyUI server running locally or on GPU node
COMFY_BASE_URL=http://localhost:8188
# ── Backend ───────────────────────────────────────────────────────────────────
# CORS origins — comma-separated list of allowed frontend origins
CORS_ORIGINS=http://localhost:5173,http://localhost:3000
# Secret key for signing internal JWTs/sessions
SECRET_KEY=PLACEHOLDER_generate_with_openssl_rand_hex_32

View File

@@ -0,0 +1,435 @@
"""
routes_catalyst.py
Meta Marketing API wrappers for The Catalyst module.
Routes:
POST /api/catalyst/campaigns/create — Bulk campaign creation
POST /api/catalyst/creative/sync — Upload ComfyUI assets to Meta
GET /api/catalyst/insights/realtime — Poll Ads Insights API
POST /api/catalyst/audiences/lookalike — Push CRM leads → Meta Custom Audience
POST /api/catalyst/auth/meta — OAuth token acquisition
"""
import os
import hashlib
import logging
from typing import Any
from datetime import datetime
from fastapi import APIRouter, HTTPException, Request, status
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
router = APIRouter()
# ── Helpers ───────────────────────────────────────────────────────────────────
def _get_sdk() -> tuple[Any, str]:
"""
Initialise the facebook-business SDK lazily.
Returns (FacebookAdsApi instance, ad_account_id).
Raises HTTPException 503 if credentials are missing or SDK init fails.
"""
try:
from facebook_business.api import FacebookAdsApi # type: ignore
access_token = os.getenv("META_ACCESS_TOKEN", "")
app_id = os.getenv("META_APP_ID", "")
app_secret = os.getenv("META_APP_SECRET", "")
account_id = os.getenv("META_AD_ACCOUNT_ID", "")
if not access_token or access_token.startswith("PLACEHOLDER"):
raise ValueError("META_ACCESS_TOKEN is not configured.")
if not account_id or account_id.startswith("PLACEHOLDER"):
raise ValueError("META_AD_ACCOUNT_ID is not configured.")
FacebookAdsApi.init(app_id, app_secret, access_token)
return FacebookAdsApi.get_default_api(), account_id
except ImportError:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="facebook-business SDK not installed. Run: pip install facebook-business",
)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=str(exc),
)
def _get_supabase():
"""Initialise the Supabase client lazily."""
try:
from supabase import create_client # type: ignore
url = os.getenv("SUPABASE_URL", "")
key = os.getenv("SUPABASE_SERVICE_ROLE_KEY", "")
if not url or url.startswith("PLACEHOLDER"):
raise ValueError("SUPABASE_URL is not configured.")
return create_client(url, key)
except ImportError:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="supabase SDK not installed. Run: pip install supabase",
)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=str(exc),
)
def _ok(data: Any, meta: dict | None = None) -> dict:
return {"status": "ok", "data": data, "meta": meta or {}}
def _sha256_hash(value: str) -> str:
"""SHA-256 hash an email for Meta's hashed audience upload."""
return hashlib.sha256(value.strip().lower().encode()).hexdigest()
# ── Request / Response Models ─────────────────────────────────────────────────
class CampaignCreateRequest(BaseModel):
name: str = Field(..., description="Campaign display name")
objective: str = Field("OUTCOME_LEADS", description="Meta campaign objective enum")
budget_daily: int = Field(..., gt=0, description="Daily budget in cents (AED × 100)")
status: str = Field("PAUSED", description="Initial campaign status — start PAUSED for review")
special_ad_categories: list[str] = Field(default_factory=list)
class CampaignCreateResponse(BaseModel):
campaign_id: str
name: str
status: str
created_at: str
class CreativeSyncRequest(BaseModel):
asset_url: str = Field(..., description="Public URL of the ComfyUI-rendered image or video")
asset_name: str = Field(..., description="Human-readable asset name")
asset_type: str = Field(..., description="'image' or 'video'")
ad_account_id: str | None = Field(None, description="Override ad account ID (optional)")
class LookalikeAudienceRequest(BaseModel):
country: str = Field("AE", description="ISO 3166-1 alpha-2 country code for lookalike")
ratio: float = Field(0.01, ge=0.01, le=0.20, description="Lookalike ratio (1%20%)")
crm_filter_status: str = Field("Closed/Won", description="Supabase lead status to filter on")
class MetaAuthRequest(BaseModel):
short_lived_token: str = Field(..., description="Short-lived user access token from Meta OAuth")
# ── 1. POST /campaigns/create ─────────────────────────────────────────────────
@router.post("/campaigns/create", summary="Bulk-create Meta Marketing campaigns")
async def create_campaigns(
request: Request,
payload: CampaignCreateRequest,
) -> dict:
"""
Triggers `facebook_business.adobjects.campaign.Campaign` to create a campaign
under the configured Ad Account.
Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID
"""
_api, account_id = _get_sdk()
try:
from facebook_business.adobjects.adaccount import AdAccount # type: ignore
from facebook_business.adobjects.campaign import Campaign # type: ignore
account = AdAccount(account_id)
params = {
Campaign.Field.name: payload.name,
Campaign.Field.objective: payload.objective,
Campaign.Field.status: payload.status,
Campaign.Field.daily_budget: payload.budget_daily,
Campaign.Field.special_ad_categories: payload.special_ad_categories,
}
campaign = account.create_campaign(params=params)
# Broadcast live event via WebSocket
if hasattr(request.app.state, "broadcast_live_event"):
await request.app.state.broadcast_live_event(
"create",
f"Created campaign '{payload.name}' (objective: {payload.objective}).",
payload.name,
f"Budget: AED {payload.budget_daily / 100:.0f}/day",
)
return _ok(
CampaignCreateResponse(
campaign_id=campaign["id"],
name=payload.name,
status=payload.status,
created_at=datetime.utcnow().isoformat(),
).model_dump(),
meta={"account_id": account_id},
)
except Exception as exc:
logger.error("Campaign creation failed: %s", exc)
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc))
# ── 2. POST /creative/sync ────────────────────────────────────────────────────
@router.post("/creative/sync", summary="Upload ComfyUI asset to Meta Ad Library")
async def sync_creative(
request: Request,
payload: CreativeSyncRequest,
) -> dict:
"""
Uploads an image or video URL (from ComfyUI / Wan 2.2 / Qwen-Image 2512) to
the Meta Ad Library (Creative Hub) and returns the Meta Asset ID.
Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID
"""
_api, account_id = _get_sdk()
account_id = payload.ad_account_id or account_id
try:
from facebook_business.adobjects.adaccount import AdAccount # type: ignore
from facebook_business.adobjects.advideo import AdVideo # type: ignore
from facebook_business.adobjects.adimage import AdImage # type: ignore
account = AdAccount(account_id)
if payload.asset_type == "video":
# Video upload via file_url
result = account.create_ad_video(params={
AdVideo.Field.name: payload.asset_name,
AdVideo.Field.file_url: payload.asset_url,
})
meta_asset_id = result["id"]
else:
# Image upload via url
result = account.create_ad_image(params={
"filename": payload.asset_name,
"url": payload.asset_url,
})
# AdImage returns a hash dict — extract hash key
meta_asset_id = list(result["images"].values())[0]["hash"] \
if "images" in result else result.get("id", "unknown")
return _ok({
"meta_asset_id": meta_asset_id,
"asset_name": payload.asset_name,
"asset_type": payload.asset_type,
"uploaded_at": datetime.utcnow().isoformat(),
})
except Exception as exc:
logger.error("Creative sync failed: %s", exc)
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc))
# ── 3. GET /insights/realtime ─────────────────────────────────────────────────
@router.get("/insights/realtime", summary="Poll Meta Ads Insights API")
async def get_realtime_insights(
date_preset: str = "last_7_days",
level: str = "adset",
) -> dict:
"""
Polls `AdAccount.get_insights()` for CTR, CPA, spend, impressions across Ad Sets.
Supports `date_preset` (e.g. 'today', 'last_7_days', 'last_30_days') and
`level` ('campaign', 'adset', 'ad').
Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID
"""
_api, account_id = _get_sdk()
try:
from facebook_business.adobjects.adaccount import AdAccount # type: ignore
from facebook_business.adobjects.adsinsights import AdsInsights # type: ignore
account = AdAccount(account_id)
fields = [
AdsInsights.Field.campaign_name,
AdsInsights.Field.adset_name,
AdsInsights.Field.spend,
AdsInsights.Field.impressions,
AdsInsights.Field.clicks,
AdsInsights.Field.ctr,
AdsInsights.Field.cpp, # cost per purchase (proxy for CPA)
AdsInsights.Field.date_start,
AdsInsights.Field.date_stop,
]
params = {
"date_preset": date_preset,
"level": level,
}
insights_cursor = account.get_insights(fields=fields, params=params)
results = [dict(row) for row in insights_cursor]
return _ok(results, meta={
"account_id": account_id,
"date_preset": date_preset,
"level": level,
"count": len(results),
})
except Exception as exc:
logger.error("Insights fetch failed: %s", exc)
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc))
# ── 4. POST /audiences/lookalike ──────────────────────────────────────────────
@router.post("/audiences/lookalike", summary="Push Supabase CRM leads → Meta Lookalike Audience")
async def create_lookalike_audience(
request: Request,
payload: LookalikeAudienceRequest,
) -> dict:
"""
1. Queries the Supabase `leads` table for rows matching `status = payload.crm_filter_status`.
2. SHA-256 hashes their email addresses.
3. Creates (or updates) a Meta Custom Audience with the hashed emails.
4. Creates a Lookalike Audience from that Custom Audience.
Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID, SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY
"""
_api, account_id = _get_sdk()
supabase = _get_supabase()
# ── Step 1: Fetch qualified leads from Supabase CRM ──
try:
response = supabase.table("leads") \
.select("id, email, name") \
.eq("status", payload.crm_filter_status) \
.execute()
leads = response.data or []
except Exception as exc:
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Supabase query failed: {exc}")
if not leads:
return _ok({"message": f"No leads found with status '{payload.crm_filter_status}'."})
# ── Step 2: Hash emails ──
hashed_emails = [
_sha256_hash(lead["email"])
for lead in leads
if lead.get("email")
]
if not hashed_emails:
raise HTTPException(status_code=422, detail="No valid email addresses found in the filtered leads.")
# ── Step 3: Create / update Meta Custom Audience ──
try:
from facebook_business.adobjects.adaccount import AdAccount # type: ignore
from facebook_business.adobjects.customaudience import CustomAudience # type: ignore
account = AdAccount(account_id)
audience_name = f"Velocity CRM — {payload.crm_filter_status} Leads"
# Create custom audience
custom_audience = account.create_custom_audience(params={
CustomAudience.Field.name: audience_name,
CustomAudience.Field.subtype: "CUSTOM",
CustomAudience.Field.description: f"Auto-generated from Velocity CRM — {len(hashed_emails)} leads",
"customer_file_source": "USER_PROVIDED_ONLY",
})
audience_id = custom_audience["id"]
# Add users via hashed emails
custom_audience.create_users_replace(params={
"payload": {
"schema": ["EMAIL_SHA256"],
"data": [[h] for h in hashed_emails],
}
})
# ── Step 4: Create Lookalike Audience ──
lookalike = account.create_lookalike_audience(params={
"name": f"Velocity Lookalike — {payload.crm_filter_status} ({int(payload.ratio * 100)}%)",
"origin_audience_id": audience_id,
"lookalike_spec": {
"type": "similarity",
"ratio": payload.ratio,
"country": payload.country,
},
})
# Broadcast live event
if hasattr(request.app.state, "broadcast_live_event"):
await request.app.state.broadcast_live_event(
"create",
f"Created Lookalike Audience from {len(hashed_emails)} CRM Closed/Won leads.",
None,
f"+{len(hashed_emails):,} leads",
)
return _ok({
"custom_audience_id": audience_id,
"lookalike_audience_id": lookalike["id"],
"leads_processed": len(hashed_emails),
"country": payload.country,
"ratio": payload.ratio,
})
except Exception as exc:
logger.error("Audience creation failed: %s", exc)
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc))
# ── 5. POST /auth/meta ────────────────────────────────────────────────────────
@router.post("/auth/meta", summary="Exchange short-lived token for System User token")
async def meta_oauth(payload: MetaAuthRequest) -> dict:
"""
Exchanges a short-lived Meta user token for a long-lived token using the
`/oauth/access_token` endpoint, then stores it in Supabase for persistence.
Requires: META_APP_ID, META_APP_SECRET
"""
import httpx
app_id = os.getenv("META_APP_ID", "")
app_secret = os.getenv("META_APP_SECRET", "")
api_ver = os.getenv("META_API_VERSION", "v21.0")
if app_id.startswith("PLACEHOLDER") or app_secret.startswith("PLACEHOLDER"):
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="META_APP_ID or META_APP_SECRET not configured.",
)
url = f"https://graph.facebook.com/{api_ver}/oauth/access_token"
params = {
"grant_type": "fb_exchange_token",
"client_id": app_id,
"client_secret": app_secret,
"fb_exchange_token": payload.short_lived_token,
}
async with httpx.AsyncClient() as client:
resp = await client.get(url, params=params, timeout=15.0)
if resp.status_code != 200:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Meta OAuth error: {resp.text}",
)
token_data = resp.json()
long_lived_token = token_data.get("access_token")
if not long_lived_token:
raise HTTPException(status_code=502, detail="No access_token in Meta response.")
# Persist to Supabase (best-effort — don't block on failure)
try:
supabase = _get_supabase()
supabase.table("catalyst_settings").upsert({
"key": "META_ACCESS_TOKEN",
"value": long_lived_token,
"updated_at": datetime.utcnow().isoformat(),
}).execute()
except Exception as exc:
logger.warning("Could not persist Meta token to Supabase: %s", exc)
return _ok({
"access_token": long_lived_token,
"token_type": token_data.get("token_type", "bearer"),
"expires_in": token_data.get("expires_in"),
})

View File

@@ -0,0 +1,115 @@
"""
The Catalyst — FastAPI Backend
Autonomous Digital Marketing Agency powered by Meta Marketing API.
"""
import os
import json
import asyncio
from datetime import datetime
from typing import Set
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
from api.routes_catalyst import router as catalyst_router
load_dotenv()
# ── App ───────────────────────────────────────────────────────────────────────
app = FastAPI(
title="Velocity — Catalyst Backend",
description="Meta Marketing API integration for autonomous campaign management.",
version="1.0.0",
)
# ── CORS ──────────────────────────────────────────────────────────────────────
origins = [o.strip() for o in os.getenv("CORS_ORIGINS", "http://localhost:5173").split(",")]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ── Routers ───────────────────────────────────────────────────────────────────
app.include_router(catalyst_router, prefix="/api/catalyst", tags=["Catalyst"])
# ── WebSocket — Live Optimization Feed ────────────────────────────────────────
class ConnectionManager:
"""Manages active WebSocket connections for live optimization broadcasts."""
def __init__(self) -> None:
self.active: Set[WebSocket] = set()
async def connect(self, ws: WebSocket) -> None:
await ws.accept()
self.active.add(ws)
def disconnect(self, ws: WebSocket) -> None:
self.active.discard(ws)
async def broadcast(self, payload: dict) -> None:
dead: Set[WebSocket] = set()
for ws in self.active:
try:
await ws.send_text(json.dumps(payload))
except Exception:
dead.add(ws)
self.active -= dead
manager = ConnectionManager()
@app.websocket("/ws/catalyst")
async def websocket_endpoint(ws: WebSocket) -> None:
"""
WebSocket endpoint for streaming live Claw Agent optimization events.
Clients connect from <LiveOptimizationFeed /> in Catalyst.tsx.
"""
await manager.connect(ws)
try:
while True:
# Keep-alive: wait for any incoming ping/message
data = await ws.receive_text()
# Echo back as acknowledgment (clients may send heartbeat pings)
await ws.send_text(json.dumps({"type": "ack", "data": data}))
except WebSocketDisconnect:
manager.disconnect(ws)
# ── Helper: broadcast a live event (called from routes after API mutations) ───
async def broadcast_live_event(
event_type: str,
message: str,
campaign_name: str | None = None,
value: str | None = None,
) -> None:
payload = {
"type": event_type,
"message": message,
"campaignName": campaign_name,
"value": value,
"timestamp": datetime.utcnow().isoformat(),
}
await manager.broadcast(payload)
# Attach broadcaster so routes can call it
app.state.broadcast_live_event = broadcast_live_event
# ── Health check ──────────────────────────────────────────────────────────────
@app.get("/health", tags=["Health"])
async def health() -> dict:
return {"status": "ok", "service": "catalyst-backend", "version": "1.0.0"}

View File

@@ -0,0 +1,8 @@
fastapi>=0.115.0
uvicorn[standard]>=0.32.0
facebook-business>=21.0.0
supabase>=2.10.0
python-dotenv>=1.0.0
httpx>=0.27.0
pydantic>=2.9.0
python-multipart>=0.0.12

View File

@@ -0,0 +1,159 @@
import os
import requests
import time
import json
from pathlib import Path
# Config
GATEWAY_URL = "http://54.91.19.60:8082" # Active IP
INPUT_DIR = Path(r"f:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine\test_inputs\Abantika Test Sample")
OUTPUT_DIR = Path(r"f:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine\test_outputs\Abantika Test Samples - Test 2")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# 4 Styles requested by user
STYLES = [
{
"id": "gothic_industrial",
"keywords": "Gothic, Indusrial Design, Black, Wood Textures and Accents, Bare Metal Edison Bulbs, Red Silk",
"denoise": 0.75
},
{
"id": "greek_minimal",
"keywords": "Greek Aesthetic, Minimal, Grand, White, Deep Blue Silk, Emral Green Marbel",
"denoise": 0.75
},
{
"id": "turkish_vintage",
"keywords": "Turkish Interioir, Mosaic Work, Vintage, Intricate Work, Royal",
"denoise": 0.75
},
{
"id": "bali_modern",
"keywords": "Bali Aesthetic, Modern, Minimal, Stone, Live Indoor House Plants, Indonesian Suar wood (Samanea saman)",
"denoise": 0.75
}
]
def map_filename_to_room_type(filename: str) -> str:
name = filename.lower()
if "bed-room" in name or "bedroom" in name or "bed" in name:
return "bedroom"
elif "bath-room" in name or "bathroom" in name or "bath" in name:
return "bathroom"
elif "kitchen" in name:
return "kitchen"
elif "dining" in name:
return "dining_room"
elif "balcony" in name:
return "balcony"
return "living_room" # default
def run_job(image_path: Path, style_cfg: dict):
room_type = map_filename_to_room_type(image_path.name)
style_id = style_cfg["id"]
keywords = style_cfg["keywords"]
print(f"\n--- Processing {image_path.name} with style {style_id.upper()} ({room_type}) ---")
# Check if already processed
img_out = OUTPUT_DIR / f"{image_path.stem}_{style_id}.png"
if img_out.exists():
print(f"[{style_id}] Already processed {img_out.name}, skipping.")
return
# 1. Start generation
try:
with open(image_path, "rb") as f:
files = {"image": (image_path.name, f, "image/jpeg")}
data = {
"keywords": keywords,
"room_type": room_type,
"denoise": style_cfg["denoise"]
}
res = requests.post(f"{GATEWAY_URL}/dream-weaver", files=files, data=data, timeout=180)
res.raise_for_status()
job_data = res.json()
job_id = job_data["job_id"]
prompt_preview = job_data.get("prompt_preview", "")
print(f"[{style_id}] Job submitted: {job_id}")
print(f"[{style_id}] Prompt Preview: {prompt_preview[:150]}...")
except requests.exceptions.RequestException as e:
print(f"Failed to submit {image_path.name}: {e}")
return
# 2. Poll status
status_url = f"{GATEWAY_URL}/dream-weaver/status/{job_id}"
ready = False
# Poll for up to 6 minutes
s_data = {}
for i in range(180):
time.sleep(2)
try:
s_res = requests.get(status_url, timeout=10)
if s_res.status_code == 200:
s_data = s_res.json()
if s_data.get("ready"):
ready = True
break
elif s_data.get("status") == "error":
print(f"Job failed: {s_data.get('error')}")
return
except requests.exceptions.RequestException as e:
print(f"Polling error: {e}")
if not ready:
print(f"Timeout waiting for job {job_id}")
return
# 3. Download image and save prompt
print(f"[{style_id}] Job ready, downloading...")
# Save prompt data locally
prompt_file = OUTPUT_DIR / f"{image_path.stem}_{style_id}_prompt.json"
with open(prompt_file, "w") as pf:
json.dump(s_data, pf, indent=2)
result_url = f"{GATEWAY_URL}/dream-weaver/result/{job_id}"
try:
r_res = requests.get(result_url, stream=True, timeout=60)
r_res.raise_for_status()
with open(img_out, "wb") as f:
for chunk in r_res.iter_content(chunk_size=8192):
f.write(chunk)
print(f"[{style_id}] Saved {img_out.name}")
except requests.exceptions.RequestException as e:
print(f"Failed to download result for {job_id}: {e}")
def main():
# Wait for gateway and Ollama to be fully ready
print(f"Checking Gateway at {GATEWAY_URL}/health...")
for _ in range(3):
try:
h = requests.get(f"{GATEWAY_URL}/health", timeout=5)
if h.status_code == 200:
print("Gateway is UP.")
break
except requests.exceptions.RequestException:
print("Waiting for gateway...")
time.sleep(5)
else:
print("Gateway failed to answer health check. Exiting.")
return
# Get images
target_imgs = sorted([f for f in INPUT_DIR.iterdir() if f.suffix.lower() in [".jpg", ".png", ".jpeg"]])
print(f"Found {len(target_imgs)} target images.")
for img_path in target_imgs:
for style in STYLES:
run_job(img_path, style)
time.sleep(1) # Small pause between submissions
if __name__ == "__main__":
main()

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python3
#!/usr/bin/env python3
"""
Dream Weaver API Gateway v2 — Dynamic Keyword → Local LLM → ComfyUI Pipeline
========================================================================
@@ -15,7 +15,7 @@ Environment variables:
OLLAMA_URL — Ollama server (default: http://localhost:11434)
OLLAMA_MODEL — Model name (default: qwen3.5:27b)
"""
import asyncio, json, time, uuid, io, sys, os, logging
import asyncio, json, time, uuid, io, sys, os, logging, traceback
from pathlib import Path
from typing import Optional, List
@@ -31,7 +31,7 @@ SCRIPTS_DIR = Path(__file__).parent / "scripts"
sys.path.insert(0, str(SCRIPTS_DIR))
try:
from prompt_expander import expand_prompt, expand_prompt_simple, ROOM_CONTEXTS, ExpandedPrompt
from prompt_expander import expand_prompt, ROOM_CONTEXTS, ExpandedPrompt
LLM_AVAILABLE = True
except ImportError:
LLM_AVAILABLE = False
@@ -40,7 +40,7 @@ except ImportError:
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("DreamWeaverGateway")
COMFY = "http://127.0.0.1:8188"
COMFY = "http://127.0.0.1:8118"
COMFY_ROOT = "/opt/dlami/nvme/ComfyUI"
app = FastAPI(
@@ -210,7 +210,7 @@ async def expand_endpoint(req: ExpandRequest):
additional_notes=req.additional_notes
)
else:
result = expand_prompt_simple(req.keywords, req.room_type)
raise RuntimeError("Local LLM model is not available or disabled.")
except Exception as e:
logger.error(f"Prompt expansion failed: {e}")
raise HTTPException(status_code=500, detail=f"LLM expansion failed: {str(e)}")
@@ -291,7 +291,7 @@ async def dream_weaver(
additional_notes=additional_notes
)
else:
expanded = expand_prompt_simple(kw_list, room_type)
raise HTTPException(status_code=500, detail="LLM model is not available or disabled.")
# Apply manual overrides if provided
if denoise > 0:
@@ -335,6 +335,7 @@ async def dream_weaver(
except Exception as e:
jobs[job_id] = {"status": "error", "error": str(e)}
logger.error(f"Generation failed: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
@@ -396,8 +397,10 @@ async def dream_weaver_sync(
expanded = _P()
elif keywords:
kw_list = [k.strip() for k in keywords.split(",") if k.strip()]
expanded = (expand_prompt(kw_list, room_type, additional_notes)
if LLM_AVAILABLE else expand_prompt_simple(kw_list, room_type))
if LLM_AVAILABLE:
expanded = expand_prompt(kw_list, room_type, additional_notes)
else:
raise RuntimeError("Local LLM model is not available or disabled.")
else:
raise HTTPException(status_code=400, detail="Provide keywords or custom_positive")

View File

@@ -96,7 +96,7 @@ Generate JSON containing:
1. "positive_prompt" (rich, photorealistic, 80-120 words)
2. "negative_prompt" (preventing artifacts, 30-50 words)
3. "cfg" (float 6.0-9.0)
4. "denoise" (float 0.5-0.85)
4. "denoise" (float 0.45-0.65) - CRITICAL: Must be kept low to preserve input image structure
5. "steps" (int 25-40)
RULES FOR POSITIVE PROMPT:
@@ -144,7 +144,8 @@ def _call_ollama(user_message: str) -> str:
timeout=180 # Large models take time
)
r.raise_for_status()
return r.json()["response"]
resp_json = r.json()
return resp_json["response"]
def expand_prompt(keywords: list[str], room_type: str = "living_room", additional_notes: str = "") -> ExpandedPrompt:
@@ -166,38 +167,68 @@ AVOID: {ctx['avoid']}
logger.info("Calling local Ollama LLM...")
raw = _call_ollama(user_message).strip()
json_match = re.search(r'\{[\s\S]*\}', raw)
if json_match:
raw_json = json_match.group(0)
else:
raw_json = raw
# Log the raw response for debugging
logger.info(f"Raw Ollama response length: {len(raw)}")
data = json.loads(raw_json)
# Handle empty response
if not raw:
logger.error("Empty response from Ollama")
raise ValueError("Ollama returned an empty response")
# Clean string of common junk (control characters, leading/trailing non-bracket junk)
raw_cleaned = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', raw)
# More robust JSON block extraction
# Try finding the first '{' and last '}'
start_idx = raw_cleaned.find('{')
end_idx = raw_cleaned.rfind('}')
if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
raw_json = raw_cleaned[start_idx:end_idx+1]
else:
raw_json = raw_cleaned
try:
data = json.loads(raw_json)
except json.JSONDecodeError as je:
logger.error(f"JSON Decode failed. Raw tail: {raw_json[:100]}...")
# Emergency fallback: if we can't parse, try to create a basic structure from keywords
return ExpandedPrompt(
style_name="fallback_" + (keywords[0] if keywords else "custom"),
positive_prompt=", ".join(keywords) + f", photorealistic, high quality, {room_type}",
negative_prompt="blurry, distorted, low quality",
cfg=7.5,
denoise=0.55,
steps=30,
reasoning="Fallback due to LLM parsing error",
source="fallback"
)
return ExpandedPrompt(
style_name=data.get("style_name", "custom_local"),
positive_prompt=data["positive_prompt"],
negative_prompt=data["negative_prompt"],
positive_prompt=data.get("positive_prompt", ", ".join(keywords)),
negative_prompt=data.get("negative_prompt", "blurry, distorted, low quality"),
cfg=float(data.get("cfg", 7.5)),
denoise=float(data.get("denoise", 0.72)),
denoise=float(data.get("denoise", 0.55)),
steps=int(data.get("steps", 30)),
reasoning=data.get("reasoning", ""),
source="ollama_local"
)
except Exception as e:
logger.warning(f"Ollama failed, using sync fallback: {e}")
return expand_prompt_simple(keywords, room_type)
def expand_prompt_simple(keywords: list[str], room_type: str = "living_room") -> ExpandedPrompt:
ctx = ROOM_CONTEXTS.get(room_type.replace(" ", "_"), ROOM_CONTEXTS["living_room"])
kw_str = ", ".join(keywords)
positive = f"{kw_str} interior design, {', '.join(ctx['key_elements'][:4])}, photorealistic {room_type.replace('_', ' ')} interior, architectural photography, 8k resolution, photorealistic"
negative = "(worst quality, low quality, illustration, 3d render, 2d, painting, cartoon, sketch), blurry, distorted, extra windows, unrealistic lighting, structural changes"
return ExpandedPrompt(
style_name="fallback", positive_prompt=positive, negative_prompt=negative,
cfg=7.5, denoise=0.72, steps=30, reasoning="No LLM", source="fallback"
)
logger.error(f"Ollama LLM expansion failed: {e}")
import traceback
traceback.print_exc()
# Full fallback if anything goes wrong
return ExpandedPrompt(
style_name="emergency_fallback",
positive_prompt=", ".join(keywords) + f", photorealistic, {room_type}",
negative_prompt="blurry, distorted",
cfg=7.5,
denoise=0.55,
steps=30,
reasoning=f"Emergency fallback due to: {str(e)}",
source="emergency"
)
if __name__ == "__main__":
import sys

View File

@@ -0,0 +1,159 @@
import os
import requests
import time
import json
from pathlib import Path
# Config
GATEWAY_URL = "http://54.91.19.60:8288" # Active IP
INPUT_DIR = Path(r"f:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine\test_inputs\Sagnik Test Sample")
OUTPUT_DIR = Path(r"f:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine\test_outputs\Sagnik Test Sample")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# 4 Styles requested by user
STYLES = [
{
"id": "bali_minimal_stone",
"keywords": "Bali Aesthetic, Modern, Minimal, Stone, Live Indoor House Plants, Indonesian, Reclaimed Wood, Eco Friendly Materials, Bare Stone",
"denoise": 0.55
},
{
"id": "gothic_industrial",
"keywords": "Gothic, Indusrial Design, Black, Wood Textures and Accents, Bare Metal Edison Bulbs, Red Silk",
"denoise": 0.55
},
{
"id": "greek_grand",
"keywords": "Greek Aesthetic, Minimal, Grand, Emral Green Marbel, Gold paint highlits, Deep Blue Silk-Muslin-Soft",
"denoise": 0.55
},
{
"id": "turkish_fusion",
"keywords": "Turkish Interioir, Mosaic Work, Vintage, Intricate Work, Royal, Minimal, Fusion",
"denoise": 0.55
}
]
def map_filename_to_room_type(filename: str) -> str:
name = filename.lower()
if "bed-room" in name or "bedroom" in name or "bed" in name:
return "bedroom"
elif "bath-room" in name or "bathroom" in name or "bath" in name:
return "bathroom"
elif "kitchen" in name:
return "kitchen"
elif "dining" in name:
return "dining_room"
elif "balcony" in name:
return "balcony"
return "living_room" # default
def run_job(image_path: Path, style_cfg: dict):
room_type = map_filename_to_room_type(image_path.name)
style_id = style_cfg["id"]
keywords = style_cfg["keywords"]
print(f"\n--- Processing {image_path.name} with style {style_id.upper()} ({room_type}) ---")
# Check if already processed
img_out = OUTPUT_DIR / f"{image_path.stem}_{style_id}.png"
if img_out.exists():
print(f"[{style_id}] Already processed {img_out.name}, skipping.")
return
# 1. Start generation
try:
with open(image_path, "rb") as f:
files = {"image": (image_path.name, f, "image/jpeg")}
data = {
"keywords": keywords,
"room_type": room_type,
"denoise": style_cfg["denoise"]
}
res = requests.post(f"{GATEWAY_URL}/dream-weaver", files=files, data=data, timeout=180)
res.raise_for_status()
job_data = res.json()
job_id = job_data["job_id"]
prompt_preview = job_data.get("prompt_preview", "")
print(f"[{style_id}] Job submitted: {job_id}")
print(f"[{style_id}] Prompt Preview: {prompt_preview[:150]}...")
except requests.exceptions.RequestException as e:
print(f"Failed to submit {image_path.name}: {e}")
return
# 2. Poll status
status_url = f"{GATEWAY_URL}/dream-weaver/status/{job_id}"
ready = False
# Poll for up to 6 minutes
s_data = {}
for i in range(180):
time.sleep(2)
try:
s_res = requests.get(status_url, timeout=10)
if s_res.status_code == 200:
s_data = s_res.json()
if s_data.get("ready"):
ready = True
break
elif s_data.get("status") == "error":
print(f"Job failed: {s_data.get('error')}")
return
except requests.exceptions.RequestException as e:
print(f"Polling error: {e}")
if not ready:
print(f"Timeout waiting for job {job_id}")
return
# 3. Download image and save prompt
print(f"[{style_id}] Job ready, downloading...")
# Save prompt data locally
prompt_file = OUTPUT_DIR / f"{image_path.stem}_{style_id}_prompt.json"
with open(prompt_file, "w") as pf:
json.dump(s_data, pf, indent=2)
result_url = f"{GATEWAY_URL}/dream-weaver/result/{job_id}"
try:
r_res = requests.get(result_url, stream=True, timeout=60)
r_res.raise_for_status()
with open(img_out, "wb") as f:
for chunk in r_res.iter_content(chunk_size=8192):
f.write(chunk)
print(f"[{style_id}] Saved {img_out.name}")
except requests.exceptions.RequestException as e:
print(f"Failed to download result for {job_id}: {e}")
def main():
# Wait for gateway and Ollama to be fully ready
print(f"Checking Gateway at {GATEWAY_URL}/health...")
for _ in range(3):
try:
h = requests.get(f"{GATEWAY_URL}/health", timeout=5)
if h.status_code == 200:
print("Gateway is UP.")
break
except requests.exceptions.RequestException:
print("Waiting for gateway...")
time.sleep(5)
else:
print("Gateway failed to answer health check. Exiting.")
return
# Get images
target_imgs = sorted([f for f in INPUT_DIR.iterdir() if f.suffix.lower() in [".jpg", ".png", ".jpeg"]])
print(f"Found {len(target_imgs)} target images.")
for img_path in target_imgs:
for style in STYLES:
run_job(img_path, style)
time.sleep(1) # Small pause between submissions
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,506 @@
#!/usr/bin/env python3
"""
test_catalyst_batch.py — 5-Prompt Batch Test for Catalyst Poster Generation
============================================================================
Sends 5 distinct social media marketing poster generation requests to the
ComfyUI server running Qwen-Image-2512. Each test uses:
- A different Ground Truth image (room photo from the property)
- A Style Reference image (professional real estate marketing poster)
- A unique "Prompt Keyword" set that an end-user would type
The script demonstrates the full end-user flow:
User enters: Keywords + Ground Truth Image + Style Reference → Gets Poster
Test Matrix:
1. "luxury modern kitchen" → Kitchen photo + Orange card reference
2. "cozy master bedroom" → Bedroom photo + SOLD poster reference
3. "elegant living space" → Balcony bedroom + Magazine editorial ref
4. "premium apartment lifestyle" → Room with AC + Bellagio luxury ad ref
5. "smart home investment" → Corridor/room + Social media grid ref
Environment: ComfyUI + Qwen-Image-2512 on AWS EC2 (4x NVIDIA L4)
"""
import os
import sys
import json
import re
import copy
import time
from pathlib import Path
from typing import Tuple, Optional, Dict, List
import requests
from PIL import Image
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# CONFIGURATION
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
COMFYUI_SERVER_URL: str = "http://54.91.19.60:8118"
"""
ComfyUI server URL. Options:
- Direct (if port open via SG): http://54.91.19.60:8118
- SSH tunnel: ssh -L 8118:127.0.0.1:8118 ubuntu@54.91.19.60 -p 443
then use: http://127.0.0.1:8118
"""
BASE_DIR = Path(r"F:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine")
INPUT_DIR = BASE_DIR / "test_inputs" / "Sagnik Test Sample New"
REF_DIR = INPUT_DIR / "Sample Reference"
OUTPUT_DIR = BASE_DIR / "test_outputs" / "catalyst_batch_results"
WORKFLOW_PATH = BASE_DIR / "workflows" / "catalyst_poster_qwen.json"
# Node IDs matching catalyst_poster_qwen.json
NODE_GROUND_TRUTH = "1"
NODE_STYLE_REF = "2"
NODE_POS_PROMPT = "9"
NODE_NEG_PROMPT = "10"
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 5 TEST CASES — Each simulates what an end-user would enter
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
TEST_CASES: List[Dict] = [
{
"name": "Test 1: Luxury Modern Kitchen",
"ground_truth": "IMG_20210330_154502.jpg", # Kitchen with wooden cabinets
"style_ref": "6301510fa187aca16f680b2a525ed6de.jpg", # Orange card-style poster
"user_keywords": "luxury modern kitchen",
"marketing_copy": "Cook Your Dreams to Life",
"description": "A modular kitchen showcase — warm tones, premium finishes."
},
{
"name": "Test 2: Cozy Master Bedroom",
"ground_truth": "IMG_20210330_154512.jpg", # Bedroom with accent wall
"style_ref": "79c9a52c9af0c1d94df025dd1505db83.jpg", # Bold SOLD poster
"user_keywords": "cozy master bedroom",
"marketing_copy": "Where Comfort Meets Elegance",
"description": "Master bedroom with designer wallpaper — aspirational lifestyle."
},
{
"name": "Test 3: Elegant Living Space",
"ground_truth": "IMG_20210330_160420.jpg", # Balcony view bedroom
"style_ref": "7bb67cbc287300b78b4e8da3da7de242.jpg", # Magazine editorial
"user_keywords": "elegant living space",
"marketing_copy": "A New Beginning Starts Here",
"description": "Bedroom with balcony view — editorial magazine style."
},
{
"name": "Test 4: Premium Apartment Lifestyle",
"ground_truth": "IMG_20210330_154534.jpg", # Another room view
"style_ref": "ee9d7efdf9303342480d5cb57cec8400.jpg", # Bellagio luxury ad
"user_keywords": "premium apartment lifestyle",
"marketing_copy": "Live Above the Ordinary",
"description": "Premium apartment showcase — Dubai-style luxury marketing."
},
{
"name": "Test 5: Smart Home Investment",
"ground_truth": "IMG_20210330_160212.jpg", # Compact room
"style_ref": "fd0586727e1b43e9c346a6f851fb50f9.jpg", # Social media grid
"user_keywords": "smart home investment",
"marketing_copy": "Your Dream Home Is Waiting",
"description": "Investment-focused social media post — modern minimalist grid."
}
]
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# CORE FUNCTIONS
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def process_prompt(user_keywords: str, marketing_copy: str) -> Tuple[str, str]:
"""Parse user input into aesthetic keywords and marketing copy.
In the production Catalyst UI, the user types keywords and a marketing
headline separately. This function validates and returns them.
Args:
user_keywords: Style/aesthetic descriptors (e.g., 'luxury modern kitchen').
marketing_copy: Headline text to render on the poster.
Returns:
Validated (aesthetic_keywords, marketing_copy) tuple.
Raises:
ValueError: If either input is empty.
"""
if not user_keywords.strip():
raise ValueError("Keyword prompt cannot be empty")
if not marketing_copy.strip():
raise ValueError("Marketing copy cannot be empty")
return user_keywords.strip(), marketing_copy.strip()
def expand_prompt(aesthetic_keywords: str, marketing_copy: str) -> str:
"""Expand user keywords into a full Qwen-Image-2512-optimized prompt.
Takes simple user keywords and transforms them into a richly detailed
prompt that leverages Qwen-Image-2512's strengths: precise typography
rendering, cinematic lighting, and photorealistic quality.
The expanded prompt follows this structure:
1. Scene type declaration (marketing poster)
2. Aesthetic keyword injection (user's style preferences)
3. Typography instruction (exact text + font style)
4. Technical quality boosters (8k, photorealistic, etc.)
Args:
aesthetic_keywords: User's style keywords (e.g., 'luxury modern kitchen').
marketing_copy: Exact text to appear in the poster.
Returns:
A fully expanded prompt string ready for CLIPTextEncode.
Example:
>>> expand_prompt('luxury modern kitchen', 'Cook Your Dreams')
'A stunning, high-end real estate social media marketing poster...'
"""
return (
f"A stunning, high-end real estate social media marketing poster. "
f"Style: {aesthetic_keywords}, warm ambient lighting, premium materials, "
f"cinematic composition, professional interior photography. "
f"The poster must prominently display the exact text "
f"'{marketing_copy}' rendered in elegant, bold, modern sans-serif "
f"typography with high contrast against the background, crisp edges, "
f"perfectly aligned, highly legible. "
f"8k resolution, photorealistic quality, detailed textures, "
f"architectural magazine aesthetic, ultra-sharp focus, "
f"golden hour warmth, depth of field bokeh, premium brand feel, "
f"social media optimized layout, clean negative space for text."
)
def upload_image(image_path: Path) -> str:
"""Upload an image to the ComfyUI server's input directory.
Opens the image, ensures RGB mode, and uploads via /upload/image.
Args:
image_path: Path to the image file.
Returns:
The server-assigned filename for use in workflow JSON.
Raises:
FileNotFoundError: If the image doesn't exist.
ConnectionError: If ComfyUI server is unreachable.
"""
if not image_path.exists():
raise FileNotFoundError(f"Image not found: {image_path}")
img = Image.open(image_path)
if img.mode != "RGB":
img = img.convert("RGB")
import tempfile
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
tmp_path = tmp.name
img.save(tmp_path, format="PNG")
try:
with open(tmp_path, "rb") as f:
response = requests.post(
f"{COMFYUI_SERVER_URL}/upload/image",
files={"image": (image_path.name, f, "image/png")},
data={"overwrite": "true"},
timeout=60
)
response.raise_for_status()
result = response.json()
server_name = result.get("name", "")
if not server_name:
raise RuntimeError(f"Upload failed: {result}")
return server_name
finally:
try:
os.unlink(tmp_path)
except OSError:
pass
def execute_workflow(
workflow: dict,
prompt_text: str,
gt_filename: str,
sr_filename: str
) -> str:
"""Inject dynamic values and queue workflow on ComfyUI.
Updates LoadImage nodes with uploaded filenames and CLIPTextEncode
with the expanded prompt, then sends to /prompt endpoint.
Args:
workflow: The loaded workflow JSON dict.
prompt_text: Expanded prompt from expand_prompt().
gt_filename: Server filename of ground truth image.
sr_filename: Server filename of style reference image.
Returns:
The prompt_id from the queue response.
"""
wf = copy.deepcopy(workflow)
wf[NODE_GROUND_TRUTH]["inputs"]["image"] = gt_filename
wf[NODE_STYLE_REF]["inputs"]["image"] = sr_filename
wf[NODE_POS_PROMPT]["inputs"]["text"] = prompt_text
payload = {
"prompt": wf,
"client_id": f"catalyst_batch_{int(time.time())}"
}
response = requests.post(
f"{COMFYUI_SERVER_URL}/prompt",
json=payload,
timeout=30
)
response.raise_for_status()
result = response.json()
prompt_id = result.get("prompt_id", "")
if not prompt_id:
raise RuntimeError(f"Queue failed: {result}")
return prompt_id
def wait_for_completion(prompt_id: str, timeout: int = 600, poll_interval: int = 3) -> dict:
"""Poll /history/{prompt_id} until workflow completes.
Qwen-Image-2512 with 50 steps on L4 GPUs may take 60-180 seconds.
Args:
prompt_id: The queued prompt ID.
timeout: Max wait time in seconds.
poll_interval: Seconds between polls.
Returns:
The history dict containing output image metadata.
"""
start = time.time()
polls = 0
while time.time() - start < timeout:
time.sleep(poll_interval)
polls += 1
try:
r = requests.get(
f"{COMFYUI_SERVER_URL}/history/{prompt_id}",
timeout=10
)
if r.status_code == 200:
history = r.json()
prompt_data = history.get(prompt_id, {})
# Check for error
status = prompt_data.get("status", {})
if status.get("status_str") == "error":
msgs = status.get("messages", ["Unknown"])
raise RuntimeError(f"Workflow error: {msgs}")
# Check for outputs
for node_id, node_out in prompt_data.get("outputs", {}).items():
if "images" in node_out and node_out["images"]:
elapsed = time.time() - start
print(f" ✓ Done in {elapsed:.1f}s ({polls} polls)")
return prompt_data
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
if polls % 10 == 0:
print(f" ⏳ Still waiting... ({polls} polls)")
raise TimeoutError(f"Timed out after {timeout}s (prompt: {prompt_id})")
def download_output(history: dict, output_dir: Path, test_name: str) -> str:
"""Download the generated poster from ComfyUI.
Args:
history: The prompt history dict.
output_dir: Local directory to save to.
test_name: Name for the output file.
Returns:
Path to the saved image.
"""
for node_id, node_out in history.get("outputs", {}).items():
images = node_out.get("images", [])
if images:
img_info = images[0]
break
else:
raise RuntimeError("No output images in history")
view_url = (
f"{COMFYUI_SERVER_URL}/view"
f"?filename={img_info['filename']}"
f"&subfolder={img_info.get('subfolder', '')}"
f"&type={img_info.get('type', 'output')}"
)
r = requests.get(view_url, stream=True, timeout=60)
r.raise_for_status()
safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', test_name).lower()
timestamp = time.strftime("%Y%m%d_%H%M%S")
out_path = output_dir / f"{safe_name}_{timestamp}.png"
with open(out_path, "wb") as f:
for chunk in r.iter_content(8192):
f.write(chunk)
size_mb = out_path.stat().st_size / (1024 * 1024)
print(f" 💾 Saved: {out_path.name} ({size_mb:.1f} MB)")
return str(out_path)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# MAIN EXECUTION
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
if __name__ == "__main__":
print("=" * 72)
print(" CATALYST BATCH TEST — 5 Social Media Poster Prompts")
print(" Model: Qwen-Image-2512 | Server: " + COMFYUI_SERVER_URL)
print("=" * 72)
# ── Setup ──
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
print(f"\n📂 Output: {OUTPUT_DIR}")
print(f"📄 Workflow: {WORKFLOW_PATH}")
print(f"🖼️ Inputs: {INPUT_DIR}")
print(f"🎨 Refs: {REF_DIR}")
# ── Verify server connectivity ──
print(f"\n🔌 Testing connection to {COMFYUI_SERVER_URL} ...")
try:
r = requests.get(f"{COMFYUI_SERVER_URL}/system_stats", timeout=10)
r.raise_for_status()
stats = r.json()
gpu_info = stats.get("devices", [])
print(f" ✓ Connected! GPUs: {len(gpu_info)}")
for gpu in gpu_info:
name = gpu.get("name", "unknown")
vram_total = gpu.get("vram_total", 0) / (1024**3)
vram_free = gpu.get("vram_free", 0) / (1024**3)
print(f"{name}: {vram_free:.1f}/{vram_total:.1f} GB free")
except requests.exceptions.ConnectionError:
print(f" ✗ FAILED: Cannot reach {COMFYUI_SERVER_URL}")
print(f" Ensure ComfyUI is running and the port is accessible.")
print(f" Try: ssh -L 8118:127.0.0.1:8118 ubuntu@54.91.19.60 -p 443")
sys.exit(1)
except Exception as e:
print(f" ⚠ Warning: {e}")
# ── Load workflow ──
try:
with open(WORKFLOW_PATH, "r", encoding="utf-8") as f:
workflow = json.load(f)
print(f"\n📋 Workflow loaded ({len(workflow)} nodes)")
except Exception as e:
print(f"\n✗ Failed to load workflow: {e}")
sys.exit(1)
# ── Run 5 tests ──
results = []
total_start = time.time()
for i, test in enumerate(TEST_CASES, 1):
print(f"\n{'' * 72}")
print(f" TEST {i}/5: {test['name']}")
print(f" {test['description']}")
print(f"{'' * 72}")
gt_path = INPUT_DIR / test["ground_truth"]
sr_path = REF_DIR / test["style_ref"]
print(f" 📸 Ground Truth: {test['ground_truth']}")
print(f" 🎨 Style Ref: {test['style_ref']}")
print(f" 🏷️ Keywords: {test['user_keywords']}")
print(f" ✍️ Copy: \"{test['marketing_copy']}\"")
try:
# Step 1: Parse
keywords, copy_text = process_prompt(
test["user_keywords"],
test["marketing_copy"]
)
# Step 2: Expand
expanded = expand_prompt(keywords, copy_text)
print(f"\n 📝 Expanded prompt ({len(expanded)} chars):")
print(f" {expanded[:100]}...")
# Step 3: Upload
print(f"\n ⬆️ Uploading images...")
gt_name = upload_image(gt_path)
print(f" GT → {gt_name}")
sr_name = upload_image(sr_path)
print(f" SR → {sr_name}")
# Step 4: Execute
print(f"\n 🚀 Queuing workflow...")
prompt_id = execute_workflow(workflow, expanded, gt_name, sr_name)
print(f" prompt_id: {prompt_id}")
# Step 5: Wait
print(f"\n ⏳ Waiting for generation...")
history = wait_for_completion(prompt_id, timeout=600)
# Step 6: Download
print(f"\n ⬇️ Downloading result...")
out_path = download_output(history, OUTPUT_DIR, test["name"])
results.append({
"test": test["name"],
"status": "✅ SUCCESS",
"output": out_path,
"prompt_id": prompt_id
})
except FileNotFoundError as e:
print(f"\n ✗ FILE NOT FOUND: {e}")
results.append({"test": test["name"], "status": "❌ FILE NOT FOUND", "error": str(e)})
except requests.exceptions.ConnectionError as e:
print(f"\n ✗ CONNECTION ERROR: {e}")
results.append({"test": test["name"], "status": "❌ CONNECTION ERROR", "error": str(e)})
except TimeoutError as e:
print(f"\n ✗ TIMEOUT: {e}")
results.append({"test": test["name"], "status": "❌ TIMEOUT", "error": str(e)})
except RuntimeError as e:
print(f"\n ✗ RUNTIME ERROR: {e}")
results.append({"test": test["name"], "status": "❌ RUNTIME ERROR", "error": str(e)})
except Exception as e:
print(f"\n ✗ UNEXPECTED ERROR: {type(e).__name__}: {e}")
results.append({"test": test["name"], "status": "❌ ERROR", "error": str(e)})
# ── Summary ──
total_time = time.time() - total_start
print(f"\n\n{'=' * 72}")
print(f" BATCH TEST SUMMARY")
print(f" Total time: {total_time:.1f}s | Tests: {len(TEST_CASES)}")
print(f"{'=' * 72}")
successes = 0
for r in results:
print(f" {r['status']} {r['test']}")
if "output" in r:
print(f"{r['output']}")
successes += 1
elif "error" in r:
print(f"{r['error'][:80]}")
print(f"\n Result: {successes}/{len(TEST_CASES)} passed")
print(f"{'=' * 72}")
# Save results to JSON
results_file = OUTPUT_DIR / "batch_results.json"
with open(results_file, "w") as f:
json.dump(results, f, indent=2, default=str)
print(f"\n 📊 Results saved: {results_file}")

View File

@@ -0,0 +1,569 @@
#!/usr/bin/env python3
"""
test_catalyst_workflow.py — Catalyst Poster Generation via ComfyUI + Qwen-Image-2512
=====================================================================================
This script tests the Catalyst real-estate poster generation workflow by:
1. Uploading a Ground Truth (architectural/floorplan) image to ComfyUI
2. Uploading a Style Reference image (from Google/Pinterest) to ComfyUI
3. Parsing raw UI input to extract marketing copy and aesthetic keywords
4. Expanding the parsed input into a full Qwen-Image-2512-tuned prompt
5. Dynamically injecting filenames and prompts into the workflow JSON
6. Queuing the workflow on the ComfyUI server and polling for completion
7. Downloading the final poster to a local output directory
Environment:
- ComfyUI backend running Qwen-Image-2512 on AWS EC2 (4x NVIDIA L4, 96GB VRAM)
- Model location: /home/ubuntu/models/Qwen-Image-2512 (diffusers sharded format)
- ComfyUI location: /home/ubuntu/velocity/
- Internal ComfyUI port: 8118 | External gateway port: 8288
Usage:
python test_catalyst_workflow.py
"""
import os
import json
import re
import time
import base64
from pathlib import Path
from typing import Tuple, Optional
import requests
from PIL import Image
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# CONFIGURATION — Update COMFYUI_SERVER_URL with your AWS instance IP
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
COMFYUI_SERVER_URL: str = "http://<AWS_INSTANCE_IP>:8188"
"""
ComfyUI server URL. Replace <AWS_INSTANCE_IP> with the actual IP address.
- For direct access (SSH tunnel): http://127.0.0.1:8118
- For external access (if port 8188 is open): http://54.91.19.60:8188
- Via Dream Weaver gateway (does NOT apply here): http://54.91.19.60:8288
Note: The internal ComfyUI port on the AWS instance is 8118. If SSH-tunnelling,
map local port 8188 to remote port 8118.
"""
INPUT_DIR: str = r"F:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine\test_inputs\Sagnik Test Sample New"
"""Base directory containing Ground Truth and Style Reference test images."""
OUTPUT_DIR: str = r"F:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine\test_outputs\Sagnik Test Sample New"
"""Directory to save generated poster outputs."""
WORKFLOW_JSON_PATH: str = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"..", "workflows", "catalyst_poster_qwen.json"
)
"""Path to the catalyst_poster_qwen.json workflow file (relative to this script)."""
# Node IDs in the workflow JSON (must match catalyst_poster_qwen.json)
NODE_ID_GROUND_TRUTH: str = "1" # LoadImage node for Ground Truth
NODE_ID_STYLE_REF: str = "2" # LoadImage node for Style Reference
NODE_ID_POSITIVE_PROMPT: str = "9" # CLIPTextEncode node for positive prompt
NODE_ID_NEGATIVE_PROMPT: str = "10" # CLIPTextEncode node for negative prompt
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# FUNCTION 1: Prompt Parsing
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def process_prompt(raw_ui_input: str) -> Tuple[str, str]:
"""Parse raw UI input into aesthetic keywords and marketing copy.
The raw input must contain marketing copy enclosed in double quotes.
Everything outside the quotes is treated as aesthetic/style keywords.
Args:
raw_ui_input: Raw string from the UI, e.g.:
'modern luxury warm lighting "Your Dream Home Awaits"'
Returns:
A tuple of (aesthetic_keywords, marketing_copy).
Raises:
ValueError: If no text enclosed in double quotes is found.
Example:
>>> process_prompt('art deco gold "Live in Elegance"')
('art deco gold', 'Live in Elegance')
"""
match = re.search(r'"([^"]+)"', raw_ui_input)
if not match:
raise ValueError(
"Marketing copy must be enclosed in double quotes. "
"Example: 'modern luxury \"Your Dream Home Awaits\"'"
)
marketing_copy: str = match.group(1).strip()
# Extract everything outside the quotes as aesthetic keywords
aesthetic_keywords: str = raw_ui_input[:match.start()] + raw_ui_input[match.end():]
aesthetic_keywords = re.sub(r'\s+', ' ', aesthetic_keywords).strip()
return aesthetic_keywords, marketing_copy
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# FUNCTION 2: Prompt Expansion
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def expand_prompt(aesthetic_keywords: str, marketing_copy: str) -> str:
"""Expand parsed inputs into a full Qwen-Image-2512-optimized prompt.
Constructs a semantically rich prompt formatted for Qwen-Image-2512's
typography rendering capabilities. The prompt explicitly instructs the
model to render text within the generated image.
Args:
aesthetic_keywords: Style descriptors (e.g., 'modern luxury warm lighting').
marketing_copy: Exact text to render in the poster (e.g., 'Your Dream Home Awaits').
Returns:
A complete prompt string ready for CLIPTextEncode.
Example:
>>> expand_prompt('modern luxury', 'Live in Style')
'A highly realistic, cinematic realestate marketing poster...'
"""
return (
f"A highly realistic, cinematic realestate marketing poster. "
f"Interior style: {aesthetic_keywords}. "
f"The image must prominently feature the exact text "
f"'{marketing_copy}' written in elegant, modern, highly legible "
f"typography. Professional lighting, 8k resolution, photorealistic "
f"quality, detailed textures."
)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# FUNCTION 3: Image Upload
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def upload_image(image_path: str) -> str:
"""Upload an image to the ComfyUI server for use in workflows.
Opens the image, converts to RGB if necessary, saves as a
temporary PNG, and uploads via the /upload/image endpoint.
Args:
image_path: Absolute path to the image file on disk.
Returns:
The server-side filename assigned by ComfyUI (used in workflow JSON).
Raises:
FileNotFoundError: If the image file does not exist.
requests.exceptions.ConnectionError: If the ComfyUI server is unreachable.
requests.exceptions.Timeout: If the upload times out.
RuntimeError: If the server returns an unexpected response.
"""
path = Path(image_path)
if not path.exists():
raise FileNotFoundError(f"Image not found: {image_path}")
# Open and ensure RGB mode
img = Image.open(path)
if img.mode != "RGB":
img = img.convert("RGB")
# Save to a temporary PNG buffer for upload
import tempfile
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
tmp_path = tmp.name
img.save(tmp_path, format="PNG")
try:
with open(tmp_path, "rb") as f:
files = {
"image": (path.name, f, "image/png")
}
data = {
"overwrite": "true"
}
response = requests.post(
f"{COMFYUI_SERVER_URL}/upload/image",
files=files,
data=data,
timeout=60
)
response.raise_for_status()
result = response.json()
server_filename: str = result.get("name", "")
if not server_filename:
raise RuntimeError(
f"ComfyUI upload returned unexpected response: {result}"
)
print(f" ✓ Uploaded '{path.name}' → server filename: '{server_filename}'")
return server_filename
finally:
# Clean up temp file
try:
os.unlink(tmp_path)
except OSError:
pass
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# FUNCTION 4: Execute Workflow
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def execute_workflow(
workflow_json: dict,
prompt_text: str,
ground_truth_filename: str,
style_ref_filename: str
) -> str:
"""Inject dynamic values into the workflow JSON and queue it on ComfyUI.
Updates the following nodes in the workflow:
- Node 1 (LoadImage): Sets Ground Truth filename
- Node 2 (LoadImage): Sets Style Reference filename
- Node 9 (CLIPTextEncode): Sets the expanded positive prompt
Args:
workflow_json: The loaded workflow JSON dict (API format).
prompt_text: The expanded prompt string from expand_prompt().
ground_truth_filename: Server-side filename of the ground truth image.
style_ref_filename: Server-side filename of the style reference image.
Returns:
The prompt_id string from ComfyUI's queue response.
Raises:
requests.exceptions.ConnectionError: If the server is unreachable.
requests.exceptions.Timeout: If the request times out.
KeyError: If expected node IDs are missing from the workflow JSON.
"""
# Deep copy to avoid mutating the original
import copy
wf = copy.deepcopy(workflow_json)
# Inject Ground Truth image filename
wf[NODE_ID_GROUND_TRUTH]["inputs"]["image"] = ground_truth_filename
# Inject Style Reference image filename
wf[NODE_ID_STYLE_REF]["inputs"]["image"] = style_ref_filename
# Inject expanded positive prompt
wf[NODE_ID_POSITIVE_PROMPT]["inputs"]["text"] = prompt_text
# Build the API payload
payload = {
"prompt": wf,
"client_id": f"catalyst_{int(time.time())}"
}
print(f" → Queuing workflow on {COMFYUI_SERVER_URL}/prompt ...")
response = requests.post(
f"{COMFYUI_SERVER_URL}/prompt",
json=payload,
timeout=30
)
response.raise_for_status()
result = response.json()
prompt_id: str = result.get("prompt_id", "")
if not prompt_id:
raise RuntimeError(
f"ComfyUI /prompt returned unexpected response: {result}"
)
print(f" ✓ Queued successfully. prompt_id: {prompt_id}")
return prompt_id
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# FUNCTION 5: Poll for Completion
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def wait_for_completion(
prompt_id: str,
timeout: int = 300,
poll_interval: int = 2
) -> dict:
"""Poll the ComfyUI history endpoint until the workflow completes.
Repeatedly checks /history/{prompt_id} for output images. The Qwen-Image-2512
model with 50 inference steps on 4x L4 GPUs typically takes 30-90 seconds.
Args:
prompt_id: The prompt ID returned by execute_workflow().
timeout: Maximum seconds to wait before raising TimeoutError.
poll_interval: Seconds between poll requests.
Returns:
The history dict for this prompt_id (contains output image metadata).
Raises:
TimeoutError: If the workflow doesn't complete within timeout seconds.
requests.exceptions.ConnectionError: If the server is unreachable.
RuntimeError: If the workflow reports an error status.
"""
history_url = f"{COMFYUI_SERVER_URL}/history/{prompt_id}"
start_time = time.time()
poll_count = 0
print(f" ⏳ Polling for completion (timeout: {timeout}s) ...")
while time.time() - start_time < timeout:
time.sleep(poll_interval)
poll_count += 1
try:
response = requests.get(history_url, timeout=10)
if response.status_code == 200:
history = response.json()
prompt_history = history.get(prompt_id, {})
# Check for error status
status_info = prompt_history.get("status", {})
if status_info.get("status_str") == "error":
error_msgs = status_info.get("messages", ["Unknown error"])
raise RuntimeError(
f"Workflow execution failed: {error_msgs}"
)
# Check for output images
outputs = prompt_history.get("outputs", {})
for node_id, node_output in outputs.items():
if "images" in node_output and node_output["images"]:
elapsed = time.time() - start_time
print(
f" ✓ Completed in {elapsed:.1f}s "
f"({poll_count} polls)"
)
return prompt_history
except requests.exceptions.ConnectionError:
# Server might be busy with GPU inference, retry
print(f" Poll #{poll_count}: Connection interrupted, retrying...")
except requests.exceptions.Timeout:
print(f" Poll #{poll_count}: Timeout, retrying...")
raise TimeoutError(
f"Workflow did not complete within {timeout} seconds "
f"(prompt_id: {prompt_id})"
)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# FUNCTION 6: Download Output
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def download_output(history: dict, output_dir: str) -> str:
"""Extract and download the generated poster from ComfyUI history.
Reads the output image metadata from the history response, constructs
the /view URL, downloads the image, and saves it with a timestamped
filename.
Args:
history: The prompt history dict returned by wait_for_completion().
output_dir: Local directory to save the output image.
Returns:
The absolute path to the saved output image.
Raises:
RuntimeError: If no output images are found in the history.
requests.exceptions.ConnectionError: If the server is unreachable.
"""
# Find the output image in the history
output_image: Optional[dict] = None
for node_id, node_output in history.get("outputs", {}).items():
images = node_output.get("images", [])
if images:
output_image = images[0]
break
if not output_image:
raise RuntimeError("No output images found in workflow history")
# Construct the ComfyUI /view URL
filename = output_image["filename"]
subfolder = output_image.get("subfolder", "")
img_type = output_image.get("type", "output")
view_url = (
f"{COMFYUI_SERVER_URL}/view"
f"?filename={filename}"
f"&subfolder={subfolder}"
f"&type={img_type}"
)
print(f" ⬇ Downloading: {filename} ...")
response = requests.get(view_url, stream=True, timeout=60)
response.raise_for_status()
# Save with timestamp
timestamp = time.strftime("%Y%m%d_%H%M%S")
output_filename = f"catalyst_poster_{timestamp}.png"
output_path = os.path.join(output_dir, output_filename)
with open(output_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
file_size_mb = os.path.getsize(output_path) / (1024 * 1024)
print(f" ✓ Saved: {output_path} ({file_size_mb:.1f} MB)")
return output_path
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# MAIN EXECUTION
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
if __name__ == "__main__":
print("=" * 72)
print(" CATALYST POSTER GENERATION — Qwen-Image-2512 Workflow Test")
print("=" * 72)
# ── Ensure output directory exists ──
os.makedirs(OUTPUT_DIR, exist_ok=True)
print(f"\n📂 Output dir: {OUTPUT_DIR}")
# ── Load workflow JSON ──
workflow_path = os.path.normpath(WORKFLOW_JSON_PATH)
print(f"📄 Workflow: {workflow_path}")
try:
with open(workflow_path, "r", encoding="utf-8") as f:
workflow = json.load(f)
print(f" ✓ Loaded workflow ({len(workflow)} nodes)")
except FileNotFoundError:
print(f" ✗ ERROR: Workflow file not found: {workflow_path}")
print(" Ensure catalyst_poster_qwen.json is in ../workflows/")
exit(1)
except json.JSONDecodeError as e:
print(f" ✗ ERROR: Invalid JSON in workflow file: {e}")
exit(1)
# ── Define test inputs ──
# Update these paths to your actual test images
ground_truth_image = os.path.join(INPUT_DIR, "ground_truth.jpg")
style_reference_image = os.path.join(INPUT_DIR, "style_reference.jpg")
raw_prompt = (
'modern luxury warm ambient lighting premium materials '
'golden hour cinematic architectural photography '
'"Your Dream Home Awaits"'
)
print(f"\n🖼️ Ground Truth: {ground_truth_image}")
print(f"🎨 Style Reference: {style_reference_image}")
print(f"📝 Raw Prompt: {raw_prompt}")
# ── Step 1: Parse the prompt ──
print("\n── Step 1: Parsing prompt ──")
try:
aesthetic_keywords, marketing_copy = process_prompt(raw_prompt)
print(f" Keywords: {aesthetic_keywords}")
print(f" Copy: \"{marketing_copy}\"")
except ValueError as e:
print(f" ✗ ERROR: {e}")
exit(1)
# ── Step 2: Expand the prompt ──
print("\n── Step 2: Expanding prompt ──")
expanded = expand_prompt(aesthetic_keywords, marketing_copy)
print(f" Expanded ({len(expanded)} chars):")
print(f" {expanded[:120]}...")
# ── Step 3: Upload images ──
print("\n── Step 3: Uploading images ──")
try:
gt_filename = upload_image(ground_truth_image)
sr_filename = upload_image(style_reference_image)
except FileNotFoundError as e:
print(f" ✗ ERROR: {e}")
print(" Place test images in the INPUT_DIR directory.")
exit(1)
except requests.exceptions.ConnectionError as e:
print(f" ✗ CONNECTION ERROR: Cannot reach {COMFYUI_SERVER_URL}")
print(f" Details: {e}")
print(" Ensure ComfyUI is running and the URL is correct.")
print(" If using SSH tunnel: ssh -L 8188:127.0.0.1:8118 ...")
exit(1)
except requests.exceptions.Timeout:
print(f" ✗ TIMEOUT: Upload timed out to {COMFYUI_SERVER_URL}")
exit(1)
except Exception as e:
print(f" ✗ UNEXPECTED ERROR during upload: {e}")
exit(1)
# ── Step 4: Execute workflow ──
print("\n── Step 4: Executing workflow ──")
try:
prompt_id = execute_workflow(
workflow_json=workflow,
prompt_text=expanded,
ground_truth_filename=gt_filename,
style_ref_filename=sr_filename
)
except requests.exceptions.ConnectionError as e:
print(f" ✗ CONNECTION ERROR: {e}")
exit(1)
except requests.exceptions.Timeout:
print(f" ✗ TIMEOUT: Could not queue workflow")
exit(1)
except KeyError as e:
print(f" ✗ WORKFLOW ERROR: Missing node ID {e} in workflow JSON")
exit(1)
except Exception as e:
print(f" ✗ UNEXPECTED ERROR: {e}")
exit(1)
# ── Step 5: Poll for completion ──
print("\n── Step 5: Waiting for completion ──")
try:
history = wait_for_completion(
prompt_id=prompt_id,
timeout=300,
poll_interval=2
)
except TimeoutError as e:
print(f" ✗ TIMEOUT: {e}")
exit(1)
except RuntimeError as e:
print(f" ✗ EXECUTION ERROR: {e}")
exit(1)
except Exception as e:
print(f" ✗ UNEXPECTED ERROR: {e}")
exit(1)
# ── Step 6: Download output ──
print("\n── Step 6: Downloading output ──")
try:
output_path = download_output(
history=history,
output_dir=OUTPUT_DIR
)
except RuntimeError as e:
print(f" ✗ ERROR: {e}")
exit(1)
except requests.exceptions.ConnectionError as e:
print(f" ✗ DOWNLOAD ERROR: {e}")
exit(1)
except Exception as e:
print(f" ✗ UNEXPECTED ERROR: {e}")
exit(1)
# ── Success ──
print("\n" + "=" * 72)
print(f" ✅ SUCCESS — Poster saved to:")
print(f" {output_path}")
print("=" * 72)

Binary file not shown.

After

Width:  |  Height:  |  Size: 107 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 76 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 67 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 70 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 88 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 58 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 88 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.0 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.9 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.4 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.5 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 243 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 273 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 234 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 194 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 386 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 320 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.6 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 271 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 173 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 214 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 282 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 209 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 274 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 280 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 101 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 157 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 280 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 345 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 210 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.0 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 174 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 213 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 264 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 267 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 243 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 273 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 234 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 194 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 386 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 320 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 271 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 173 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 214 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 282 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 209 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 274 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 280 KiB

View File

@@ -0,0 +1,155 @@
{
"1": {
"class_type": "LoadImage",
"_meta": {"title": "Ground Truth (Architectural/Floorplan)"},
"inputs": {
"image": "ground_truth_input.png",
"upload": "image"
}
},
"2": {
"class_type": "LoadImage",
"_meta": {"title": "Style Reference (Google/Pinterest)"},
"inputs": {
"image": "style_reference_input.png",
"upload": "image"
}
},
"3": {
"class_type": "DiffusersLoader",
"_meta": {"title": "Qwen-Image-2512 Model Loader"},
"inputs": {
"model_path": "/home/ubuntu/models/Qwen-Image-2512"
}
},
"4": {
"class_type": "ImageScale",
"_meta": {"title": "Scale Ground Truth"},
"inputs": {
"image": ["1", 0],
"upscale_method": "lanczos",
"width": 1104,
"height": 1472,
"crop": "center"
}
},
"5": {
"class_type": "CannyEdgePreprocessor",
"_meta": {"title": "Canny Edge (Spatial Geometry)"},
"inputs": {
"image": ["4", 0],
"low_threshold": 80,
"high_threshold": 160,
"resolution": 1024
}
},
"6": {
"class_type": "ControlNetLoader",
"_meta": {"title": "ControlNet Canny Loader"},
"inputs": {
"control_net_name": "control_v11p_sd15_canny.pth"
}
},
"7": {
"class_type": "CLIPVisionLoader",
"_meta": {"title": "CLIP Vision for IP-Adapter"},
"inputs": {
"clip_name": "CLIP-ViT-H-14-laion2B-s32B-b79K.safetensors"
}
},
"8": {
"class_type": "IPAdapterModelLoader",
"_meta": {"title": "IP-Adapter Model"},
"inputs": {
"ipadapter_file": "ip-adapter_sd15.bin"
}
},
"9": {
"class_type": "CLIPTextEncode",
"_meta": {"title": "Positive Prompt (Typography + Aesthetic)"},
"inputs": {
"text": "A highly realistic, cinematic real estate marketing poster. Interior style: modern luxury, warm ambient lighting, premium materials. The image must prominently feature the exact text 'YOUR DREAM HOME AWAITS' written in elegant, modern, highly legible typography positioned at the lower third of the image, clean sans-serif font, crisp rendering, high contrast text. Professional cinematic lighting, 8k resolution, photorealistic quality, detailed textures, architectural photography, ultra-sharp focus, golden hour warmth, premium real estate aesthetic.",
"clip": ["3", 1]
}
},
"10": {
"class_type": "CLIPTextEncode",
"_meta": {"title": "Negative Prompt"},
"inputs": {
"text": "deformed, blurry, bad anatomy, watermark, logo, extra text, distorted text, blurry text, misaligned text, low quality, worst quality, illustration, 3d render, painting, cartoon, sketch, artifacts, noise, oversaturated, unrealistic lighting, structural changes, extra windows, extra doors, warped perspective, low resolution, pixelated",
"clip": ["3", 1]
}
},
"11": {
"class_type": "EmptyLatentImage",
"_meta": {"title": "Poster Canvas (3:4 Portrait)"},
"inputs": {
"width": 1104,
"height": 1472,
"batch_size": 1
}
},
"12": {
"class_type": "IPAdapterAdvanced",
"_meta": {"title": "Style Transfer from Reference"},
"inputs": {
"model": ["3", 0],
"ipadapter": ["8", 0],
"image": ["2", 0],
"clip_vision": ["7", 0],
"weight": 0.55,
"weight_type": "linear",
"combine_embeds": "concat",
"start_at": 0.0,
"end_at": 0.5,
"embeds_scaling": "V only",
"noise": 0.0
}
},
"13": {
"class_type": "ControlNetApplyAdvanced",
"_meta": {"title": "ControlNet Apply (Canny Geometry)"},
"inputs": {
"positive": ["9", 0],
"negative": ["10", 0],
"control_net": ["6", 0],
"image": ["5", 0],
"strength": 0.75,
"start_percent": 0.0,
"end_percent": 0.65
}
},
"14": {
"class_type": "KSampler",
"_meta": {"title": "Qwen Sampler (true_cfg compatible)"},
"inputs": {
"model": ["12", 0],
"positive": ["13", 0],
"negative": ["13", 1],
"latent_image": ["11", 0],
"seed": 42,
"control_after_generate": "randomize",
"steps": 50,
"cfg": 4.0,
"sampler_name": "euler",
"scheduler": "normal",
"denoise": 1.0
}
},
"15": {
"class_type": "VAEDecode",
"_meta": {"title": "Decode Latent to Image"},
"inputs": {
"samples": ["14", 0],
"vae": ["3", 2]
}
},
"16": {
"class_type": "SaveImage",
"_meta": {"title": "Save Catalyst Poster Output"},
"inputs": {
"images": ["15", 0],
"filename_prefix": "catalyst_poster_qwen"
}
}
}