feat: Added the ComfyUI engine #12
91
.Agent Context/Bibels/The Catalyst_ Brief.md
Normal file
@@ -0,0 +1,91 @@
|
||||
### **Part 1: First Principles Page Structure Analysis**
|
||||
|
||||
*If "The Catalyst" is an autonomous digital marketing agency, it cannot look like a standard Facebook Ads Manager spreadsheet. It must feel like an executive command center.*
|
||||
|
||||
Here is the optimized page architecture for the WebOS:
|
||||
|
||||
**1\. The Studio (Visual AI Engine)**
|
||||
|
||||
* **Purpose:** Where your ComfyUI workflows (Wan 2.2 and Qwen-Image 2512\) live on the frontend.
|
||||
* **Function:** Users input raw property data/images, and the system generates variations of cinematic videos and multi-lingual posters.
|
||||
* **Meta Link:** Automatically pushes these assets to the Meta Asset Library via API.
|
||||
|
||||
**2\. Campaign Command (Control Surface)**
|
||||
|
||||
* **Purpose:** The strategic deployment center.
|
||||
* **Function:** Instead of manually setting up ads, the user defines the "Objective" (e.g., "Sell 3BHKs fast") and "Budget". The Claw Agent uses the **Meta Marketing API** to execute Ad Campaign Management & Automation, handling ad sets, bulk creation, and dynamic creative rotation using the assets generated in The Studio.
|
||||
|
||||
**3\. Intelligence & ROI (Analytics Dashboard)**
|
||||
|
||||
* **Purpose:** Real-time visual justification of marketing spend.
|
||||
* **Function:** Uses the **Ads Insights API** to visualize CPA (Cost Per Acquisition) and ROI. Features a "Live Optimization" log showing what the Claw Agent is doing in real-time (e.g., *"Agent paused Ad Set B due to high CPA. Shifted budget to Ad Set A."*).
|
||||
|
||||
**4\. The War Room (Market Research & Audiences)**
|
||||
|
||||
* **Purpose:** AI-driven competitor and audience strategy.
|
||||
* **Function:** Integrates the **Ad Library API** to spy on competitor ads (e.g., Emaar/Damac) and uses the completed Brave Search integration to track real estate trends. Connects directly to the **Audience Management API** to build Custom/Lookalike audiences dynamically from the Supabase CRM leads.
|
||||
|
||||
**5\. System & Meta Graph (Settings)**
|
||||
|
||||
* **Purpose:** The plumbing.
|
||||
* **Function:** Uses the **Meta Business API** for Client Onboarding, Business Asset Management (linking WhatsApp, Instagram, FB Pages), and Permissions Management.
|
||||
|
||||
---
|
||||
|
||||
### **Part 2: Software Requirements Specification (SRS)**
|
||||
|
||||
#### **A. Tech Stack & State Management**
|
||||
|
||||
* **Frontend (Sayan):** Next JS (Latest Version), Tailwind CSS, Framer Motion, Recharts, Zustand (for `useMarketingStore`).
|
||||
* **Backend (Sayan):** Python (FastAPI) utilizing the official `facebook-business` Python SDK.
|
||||
* **Visual Generation (Sagnik):** ComfyUI API mode running locally, utilizing Flux-Schnell/Qwen-Image 2512 (Images) and Wan 2.2 14B/1.3B (Video).
|
||||
* **Autonomous Logic (Sourik/Sayan):** PicoClaw/IronClaw agent triggered via FastAPI endpoints to manage bidding and rotation.
|
||||
|
||||
#### **B. Core API Mappings (Meta to FastAPI)**
|
||||
|
||||
Sayan must build these specific Python routes:
|
||||
|
||||
* `POST /api/catalyst/campaigns/create`: Triggers Meta Marketing API to build campaigns in bulk.
|
||||
* `POST /api/catalyst/creative/sync`: Uploads ComfyUI outputs (Wan 2.2 mp4s / Qwen pngs) directly to the Meta Asset Manager.
|
||||
* `GET /api/catalyst/insights/realtime`: Polls Meta Ads Insights API for click-through rates, CPA, and spend limits.
|
||||
* `POST /api/catalyst/audiences/lookalike`: Pushes "Qualified" or "Whale" leads from your CRM (Supabase) into Meta to create High-Net-Worth Lookalike Audiences.
|
||||
|
||||
---
|
||||
|
||||
### **Part 3: Scientific Breakdown of Tasks (Sprint Plan)**
|
||||
|
||||
Here are the actionable tasks for you and Sayan to drop directly into your Taiga board for "The Catalyst" module.
|
||||
|
||||
#### **👑 Sagnik’s Tasks (Visual AI & Architecture)**
|
||||
|
||||
1. **Expose ComfyUI Endpoints for Catalyst Integration:**
|
||||
* **Action:** Ensure the `catalyst_poster_qwen.json` (Qwen-Image 2512\) and `cinematic_wan22_14b.json` (Wan 2.2) workflows are exposed via the ComfyUI Asynchronous Queue API.
|
||||
* **Output:** Provide Sayan with the exact JSON payload structures required to trigger these renders programmatically from the WebOS.
|
||||
2. **Define Agentic System Prompts for Dynamic Creative:**
|
||||
* **Action:** Write the system prompts for the Claw agent. Instruct the agent on how to prompt Qwen-Image 2512 for A/B testing (e.g., "Generate Image A with English typography, Image B with Arabic typography").
|
||||
|
||||
#### **💻 Sayan’s Tasks (Full-Stack & Meta API)**
|
||||
|
||||
1. **Build the Catalyst Zustand Store & Layout (`app/src/components/modules/Catalyst.tsx`):**
|
||||
* **Action:** Create `useMarketingStore.ts` in Zustand to manage `campaigns[]`, `activeAssets[]`, and `adInsights{}`.
|
||||
* **UI Build:** Construct the 4-column Bento grid for the Analytics Dashboard using Recharts (similar to the LeadVelocityChart in the Dashboard).
|
||||
2. **Meta Business API \- Auth & Settings Interface:**
|
||||
* **Action:** Build the Settings & API entry fields using `DarkInput` and `GhostButton` components.
|
||||
* **Backend:** Implement the OAuth flow in FastAPI to acquire and securely store the Meta System User Access Token in Supabase.
|
||||
3. **Meta Marketing API \- The Integration Engine (`backend/api/routes_catalyst.py`):**
|
||||
* **Action:** Write the Python wrappers for the Meta Ads API.
|
||||
* **Function 1:** Implement **Audience Management**—write a script that automatically queries the Supabase CRM for leads tagged "Closed/Won" and pushes their hashed emails to Meta to update a Custom Audience.
|
||||
* **Function 2:** Implement **Dynamic Creative Rotation**—write an endpoint that accepts Sagnik's ComfyUI image/video URLs, uploads them to Meta's Ad Library, and assigns them to an active Ad Set for A/B testing.
|
||||
|
||||
4. **Design the UI/UX Components:**
|
||||
* **Action:** Design the frontend component states for "Asset Generation". Use "Apple/Steve Jobs" aesthetic (glassmorphism, `backdrop-blur-xl`).
|
||||
* **Detail:** Must include a visual "Generation Queue" that uses latency-hiding (e.g., shimmering loading states that say *"Wan 2.2 is rendering cinematic lighting..."*).
|
||||
|
||||
5. **Real-Time Optimization Dashboard (WebOS):**
|
||||
* **Action:** Create a `<LiveOptimizationFeed />` component using Framer Motion (`AnimatePresence`).
|
||||
* **Details:** It should stream WebSocket updates from the backend showing what the autonomous bot is doing (e.g., *"Agent Sourik paused Campaign X due to CPA \> $50"*).
|
||||
|
||||
### **First-Principles Summary for Development**
|
||||
|
||||
By structuring the Meta API exactly like this, you are not building an Ads Manager. You are building an **Autonomous Deployment Engine**. Sagnik's models (Wan/Qwen) act as the infinite creative agency, Sayan's React/FastAPI stack acts as the pipes, and Sourik's Claw agent acts as the media buyer turning those pipes on and off based on the real-time Meta Insights data.
|
||||
|
||||
40
backend/.env.example
Normal file
@@ -0,0 +1,40 @@
|
||||
# ── Meta Graph API ────────────────────────────────────────────────────────────
|
||||
# Long-lived System User Access Token from Meta Business Manager
|
||||
# Business Settings → System Users → Generate Token
|
||||
META_ACCESS_TOKEN=PLACEHOLDER_your_meta_system_user_token
|
||||
|
||||
# Ad Account ID — format: act_XXXXXXXXXX
|
||||
# Meta Business Manager → Ad Accounts
|
||||
META_AD_ACCOUNT_ID=PLACEHOLDER_act_1234567890
|
||||
|
||||
# Business Portfolio ID
|
||||
# Meta Business Settings → Business Info → Business ID
|
||||
META_BUSINESS_ID=PLACEHOLDER_1234567890
|
||||
|
||||
# App ID & Secret — from Meta Developers → Your App → Basic Settings
|
||||
META_APP_ID=PLACEHOLDER_9876543210
|
||||
META_APP_SECRET=PLACEHOLDER_your_app_secret
|
||||
|
||||
# API Version (use latest stable)
|
||||
META_API_VERSION=v21.0
|
||||
|
||||
# ── Supabase (CRM) ────────────────────────────────────────────────────────────
|
||||
# Project URL from Supabase Dashboard → Settings → API
|
||||
SUPABASE_URL=PLACEHOLDER_https://xxxxxxxxxxx.supabase.co
|
||||
|
||||
# Anon/Public key (for server-side reads)
|
||||
SUPABASE_ANON_KEY=PLACEHOLDER_your_supabase_anon_key
|
||||
|
||||
# Service Role key (for elevated writes — keep secret!)
|
||||
SUPABASE_SERVICE_ROLE_KEY=PLACEHOLDER_your_supabase_service_role_key
|
||||
|
||||
# ── ComfyUI ───────────────────────────────────────────────────────────────────
|
||||
# Base URL of ComfyUI server running locally or on GPU node
|
||||
COMFY_BASE_URL=http://localhost:8188
|
||||
|
||||
# ── Backend ───────────────────────────────────────────────────────────────────
|
||||
# CORS origins — comma-separated list of allowed frontend origins
|
||||
CORS_ORIGINS=http://localhost:5173,http://localhost:3000
|
||||
|
||||
# Secret key for signing internal JWTs/sessions
|
||||
SECRET_KEY=PLACEHOLDER_generate_with_openssl_rand_hex_32
|
||||
435
backend/api/routes_catalyst.py
Normal file
@@ -0,0 +1,435 @@
|
||||
"""
|
||||
routes_catalyst.py
|
||||
Meta Marketing API wrappers for The Catalyst module.
|
||||
|
||||
Routes:
|
||||
POST /api/catalyst/campaigns/create — Bulk campaign creation
|
||||
POST /api/catalyst/creative/sync — Upload ComfyUI assets to Meta
|
||||
GET /api/catalyst/insights/realtime — Poll Ads Insights API
|
||||
POST /api/catalyst/audiences/lookalike — Push CRM leads → Meta Custom Audience
|
||||
POST /api/catalyst/auth/meta — OAuth token acquisition
|
||||
"""
|
||||
|
||||
import os
|
||||
import hashlib
|
||||
import logging
|
||||
from typing import Any
|
||||
from datetime import datetime
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Request, status
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _get_sdk() -> tuple[Any, str]:
|
||||
"""
|
||||
Initialise the facebook-business SDK lazily.
|
||||
Returns (FacebookAdsApi instance, ad_account_id).
|
||||
Raises HTTPException 503 if credentials are missing or SDK init fails.
|
||||
"""
|
||||
try:
|
||||
from facebook_business.api import FacebookAdsApi # type: ignore
|
||||
access_token = os.getenv("META_ACCESS_TOKEN", "")
|
||||
app_id = os.getenv("META_APP_ID", "")
|
||||
app_secret = os.getenv("META_APP_SECRET", "")
|
||||
account_id = os.getenv("META_AD_ACCOUNT_ID", "")
|
||||
|
||||
if not access_token or access_token.startswith("PLACEHOLDER"):
|
||||
raise ValueError("META_ACCESS_TOKEN is not configured.")
|
||||
if not account_id or account_id.startswith("PLACEHOLDER"):
|
||||
raise ValueError("META_AD_ACCOUNT_ID is not configured.")
|
||||
|
||||
FacebookAdsApi.init(app_id, app_secret, access_token)
|
||||
return FacebookAdsApi.get_default_api(), account_id
|
||||
except ImportError:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||||
detail="facebook-business SDK not installed. Run: pip install facebook-business",
|
||||
)
|
||||
except ValueError as exc:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||||
detail=str(exc),
|
||||
)
|
||||
|
||||
|
||||
def _get_supabase():
|
||||
"""Initialise the Supabase client lazily."""
|
||||
try:
|
||||
from supabase import create_client # type: ignore
|
||||
url = os.getenv("SUPABASE_URL", "")
|
||||
key = os.getenv("SUPABASE_SERVICE_ROLE_KEY", "")
|
||||
if not url or url.startswith("PLACEHOLDER"):
|
||||
raise ValueError("SUPABASE_URL is not configured.")
|
||||
return create_client(url, key)
|
||||
except ImportError:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||||
detail="supabase SDK not installed. Run: pip install supabase",
|
||||
)
|
||||
except ValueError as exc:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||||
detail=str(exc),
|
||||
)
|
||||
|
||||
|
||||
def _ok(data: Any, meta: dict | None = None) -> dict:
|
||||
return {"status": "ok", "data": data, "meta": meta or {}}
|
||||
|
||||
|
||||
def _sha256_hash(value: str) -> str:
|
||||
"""SHA-256 hash an email for Meta's hashed audience upload."""
|
||||
return hashlib.sha256(value.strip().lower().encode()).hexdigest()
|
||||
|
||||
|
||||
# ── Request / Response Models ─────────────────────────────────────────────────
|
||||
|
||||
class CampaignCreateRequest(BaseModel):
|
||||
name: str = Field(..., description="Campaign display name")
|
||||
objective: str = Field("OUTCOME_LEADS", description="Meta campaign objective enum")
|
||||
budget_daily: int = Field(..., gt=0, description="Daily budget in cents (AED × 100)")
|
||||
status: str = Field("PAUSED", description="Initial campaign status — start PAUSED for review")
|
||||
special_ad_categories: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
class CampaignCreateResponse(BaseModel):
|
||||
campaign_id: str
|
||||
name: str
|
||||
status: str
|
||||
created_at: str
|
||||
|
||||
|
||||
class CreativeSyncRequest(BaseModel):
|
||||
asset_url: str = Field(..., description="Public URL of the ComfyUI-rendered image or video")
|
||||
asset_name: str = Field(..., description="Human-readable asset name")
|
||||
asset_type: str = Field(..., description="'image' or 'video'")
|
||||
ad_account_id: str | None = Field(None, description="Override ad account ID (optional)")
|
||||
|
||||
|
||||
class LookalikeAudienceRequest(BaseModel):
|
||||
country: str = Field("AE", description="ISO 3166-1 alpha-2 country code for lookalike")
|
||||
ratio: float = Field(0.01, ge=0.01, le=0.20, description="Lookalike ratio (1%–20%)")
|
||||
crm_filter_status: str = Field("Closed/Won", description="Supabase lead status to filter on")
|
||||
|
||||
|
||||
class MetaAuthRequest(BaseModel):
|
||||
short_lived_token: str = Field(..., description="Short-lived user access token from Meta OAuth")
|
||||
|
||||
|
||||
# ── 1. POST /campaigns/create ─────────────────────────────────────────────────
|
||||
|
||||
@router.post("/campaigns/create", summary="Bulk-create Meta Marketing campaigns")
|
||||
async def create_campaigns(
|
||||
request: Request,
|
||||
payload: CampaignCreateRequest,
|
||||
) -> dict:
|
||||
"""
|
||||
Triggers `facebook_business.adobjects.campaign.Campaign` to create a campaign
|
||||
under the configured Ad Account.
|
||||
|
||||
Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID
|
||||
"""
|
||||
_api, account_id = _get_sdk()
|
||||
|
||||
try:
|
||||
from facebook_business.adobjects.adaccount import AdAccount # type: ignore
|
||||
from facebook_business.adobjects.campaign import Campaign # type: ignore
|
||||
|
||||
account = AdAccount(account_id)
|
||||
params = {
|
||||
Campaign.Field.name: payload.name,
|
||||
Campaign.Field.objective: payload.objective,
|
||||
Campaign.Field.status: payload.status,
|
||||
Campaign.Field.daily_budget: payload.budget_daily,
|
||||
Campaign.Field.special_ad_categories: payload.special_ad_categories,
|
||||
}
|
||||
campaign = account.create_campaign(params=params)
|
||||
|
||||
# Broadcast live event via WebSocket
|
||||
if hasattr(request.app.state, "broadcast_live_event"):
|
||||
await request.app.state.broadcast_live_event(
|
||||
"create",
|
||||
f"Created campaign '{payload.name}' (objective: {payload.objective}).",
|
||||
payload.name,
|
||||
f"Budget: AED {payload.budget_daily / 100:.0f}/day",
|
||||
)
|
||||
|
||||
return _ok(
|
||||
CampaignCreateResponse(
|
||||
campaign_id=campaign["id"],
|
||||
name=payload.name,
|
||||
status=payload.status,
|
||||
created_at=datetime.utcnow().isoformat(),
|
||||
).model_dump(),
|
||||
meta={"account_id": account_id},
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.error("Campaign creation failed: %s", exc)
|
||||
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc))
|
||||
|
||||
|
||||
# ── 2. POST /creative/sync ────────────────────────────────────────────────────
|
||||
|
||||
@router.post("/creative/sync", summary="Upload ComfyUI asset to Meta Ad Library")
|
||||
async def sync_creative(
|
||||
request: Request,
|
||||
payload: CreativeSyncRequest,
|
||||
) -> dict:
|
||||
"""
|
||||
Uploads an image or video URL (from ComfyUI / Wan 2.2 / Qwen-Image 2512) to
|
||||
the Meta Ad Library (Creative Hub) and returns the Meta Asset ID.
|
||||
|
||||
Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID
|
||||
"""
|
||||
_api, account_id = _get_sdk()
|
||||
account_id = payload.ad_account_id or account_id
|
||||
|
||||
try:
|
||||
from facebook_business.adobjects.adaccount import AdAccount # type: ignore
|
||||
from facebook_business.adobjects.advideo import AdVideo # type: ignore
|
||||
from facebook_business.adobjects.adimage import AdImage # type: ignore
|
||||
|
||||
account = AdAccount(account_id)
|
||||
|
||||
if payload.asset_type == "video":
|
||||
# Video upload via file_url
|
||||
result = account.create_ad_video(params={
|
||||
AdVideo.Field.name: payload.asset_name,
|
||||
AdVideo.Field.file_url: payload.asset_url,
|
||||
})
|
||||
meta_asset_id = result["id"]
|
||||
else:
|
||||
# Image upload via url
|
||||
result = account.create_ad_image(params={
|
||||
"filename": payload.asset_name,
|
||||
"url": payload.asset_url,
|
||||
})
|
||||
# AdImage returns a hash dict — extract hash key
|
||||
meta_asset_id = list(result["images"].values())[0]["hash"] \
|
||||
if "images" in result else result.get("id", "unknown")
|
||||
|
||||
return _ok({
|
||||
"meta_asset_id": meta_asset_id,
|
||||
"asset_name": payload.asset_name,
|
||||
"asset_type": payload.asset_type,
|
||||
"uploaded_at": datetime.utcnow().isoformat(),
|
||||
})
|
||||
except Exception as exc:
|
||||
logger.error("Creative sync failed: %s", exc)
|
||||
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc))
|
||||
|
||||
|
||||
# ── 3. GET /insights/realtime ─────────────────────────────────────────────────
|
||||
|
||||
@router.get("/insights/realtime", summary="Poll Meta Ads Insights API")
|
||||
async def get_realtime_insights(
|
||||
date_preset: str = "last_7_days",
|
||||
level: str = "adset",
|
||||
) -> dict:
|
||||
"""
|
||||
Polls `AdAccount.get_insights()` for CTR, CPA, spend, impressions across Ad Sets.
|
||||
Supports `date_preset` (e.g. 'today', 'last_7_days', 'last_30_days') and
|
||||
`level` ('campaign', 'adset', 'ad').
|
||||
|
||||
Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID
|
||||
"""
|
||||
_api, account_id = _get_sdk()
|
||||
|
||||
try:
|
||||
from facebook_business.adobjects.adaccount import AdAccount # type: ignore
|
||||
from facebook_business.adobjects.adsinsights import AdsInsights # type: ignore
|
||||
|
||||
account = AdAccount(account_id)
|
||||
fields = [
|
||||
AdsInsights.Field.campaign_name,
|
||||
AdsInsights.Field.adset_name,
|
||||
AdsInsights.Field.spend,
|
||||
AdsInsights.Field.impressions,
|
||||
AdsInsights.Field.clicks,
|
||||
AdsInsights.Field.ctr,
|
||||
AdsInsights.Field.cpp, # cost per purchase (proxy for CPA)
|
||||
AdsInsights.Field.date_start,
|
||||
AdsInsights.Field.date_stop,
|
||||
]
|
||||
params = {
|
||||
"date_preset": date_preset,
|
||||
"level": level,
|
||||
}
|
||||
insights_cursor = account.get_insights(fields=fields, params=params)
|
||||
results = [dict(row) for row in insights_cursor]
|
||||
|
||||
return _ok(results, meta={
|
||||
"account_id": account_id,
|
||||
"date_preset": date_preset,
|
||||
"level": level,
|
||||
"count": len(results),
|
||||
})
|
||||
except Exception as exc:
|
||||
logger.error("Insights fetch failed: %s", exc)
|
||||
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc))
|
||||
|
||||
|
||||
# ── 4. POST /audiences/lookalike ──────────────────────────────────────────────
|
||||
|
||||
@router.post("/audiences/lookalike", summary="Push Supabase CRM leads → Meta Lookalike Audience")
|
||||
async def create_lookalike_audience(
|
||||
request: Request,
|
||||
payload: LookalikeAudienceRequest,
|
||||
) -> dict:
|
||||
"""
|
||||
1. Queries the Supabase `leads` table for rows matching `status = payload.crm_filter_status`.
|
||||
2. SHA-256 hashes their email addresses.
|
||||
3. Creates (or updates) a Meta Custom Audience with the hashed emails.
|
||||
4. Creates a Lookalike Audience from that Custom Audience.
|
||||
|
||||
Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID, SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY
|
||||
"""
|
||||
_api, account_id = _get_sdk()
|
||||
supabase = _get_supabase()
|
||||
|
||||
# ── Step 1: Fetch qualified leads from Supabase CRM ──
|
||||
try:
|
||||
response = supabase.table("leads") \
|
||||
.select("id, email, name") \
|
||||
.eq("status", payload.crm_filter_status) \
|
||||
.execute()
|
||||
leads = response.data or []
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY,
|
||||
detail=f"Supabase query failed: {exc}")
|
||||
|
||||
if not leads:
|
||||
return _ok({"message": f"No leads found with status '{payload.crm_filter_status}'."})
|
||||
|
||||
# ── Step 2: Hash emails ──
|
||||
hashed_emails = [
|
||||
_sha256_hash(lead["email"])
|
||||
for lead in leads
|
||||
if lead.get("email")
|
||||
]
|
||||
if not hashed_emails:
|
||||
raise HTTPException(status_code=422, detail="No valid email addresses found in the filtered leads.")
|
||||
|
||||
# ── Step 3: Create / update Meta Custom Audience ──
|
||||
try:
|
||||
from facebook_business.adobjects.adaccount import AdAccount # type: ignore
|
||||
from facebook_business.adobjects.customaudience import CustomAudience # type: ignore
|
||||
|
||||
account = AdAccount(account_id)
|
||||
audience_name = f"Velocity CRM — {payload.crm_filter_status} Leads"
|
||||
|
||||
# Create custom audience
|
||||
custom_audience = account.create_custom_audience(params={
|
||||
CustomAudience.Field.name: audience_name,
|
||||
CustomAudience.Field.subtype: "CUSTOM",
|
||||
CustomAudience.Field.description: f"Auto-generated from Velocity CRM — {len(hashed_emails)} leads",
|
||||
"customer_file_source": "USER_PROVIDED_ONLY",
|
||||
})
|
||||
audience_id = custom_audience["id"]
|
||||
|
||||
# Add users via hashed emails
|
||||
custom_audience.create_users_replace(params={
|
||||
"payload": {
|
||||
"schema": ["EMAIL_SHA256"],
|
||||
"data": [[h] for h in hashed_emails],
|
||||
}
|
||||
})
|
||||
|
||||
# ── Step 4: Create Lookalike Audience ──
|
||||
lookalike = account.create_lookalike_audience(params={
|
||||
"name": f"Velocity Lookalike — {payload.crm_filter_status} ({int(payload.ratio * 100)}%)",
|
||||
"origin_audience_id": audience_id,
|
||||
"lookalike_spec": {
|
||||
"type": "similarity",
|
||||
"ratio": payload.ratio,
|
||||
"country": payload.country,
|
||||
},
|
||||
})
|
||||
|
||||
# Broadcast live event
|
||||
if hasattr(request.app.state, "broadcast_live_event"):
|
||||
await request.app.state.broadcast_live_event(
|
||||
"create",
|
||||
f"Created Lookalike Audience from {len(hashed_emails)} CRM Closed/Won leads.",
|
||||
None,
|
||||
f"+{len(hashed_emails):,} leads",
|
||||
)
|
||||
|
||||
return _ok({
|
||||
"custom_audience_id": audience_id,
|
||||
"lookalike_audience_id": lookalike["id"],
|
||||
"leads_processed": len(hashed_emails),
|
||||
"country": payload.country,
|
||||
"ratio": payload.ratio,
|
||||
})
|
||||
except Exception as exc:
|
||||
logger.error("Audience creation failed: %s", exc)
|
||||
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc))
|
||||
|
||||
|
||||
# ── 5. POST /auth/meta ────────────────────────────────────────────────────────
|
||||
|
||||
@router.post("/auth/meta", summary="Exchange short-lived token for System User token")
|
||||
async def meta_oauth(payload: MetaAuthRequest) -> dict:
|
||||
"""
|
||||
Exchanges a short-lived Meta user token for a long-lived token using the
|
||||
`/oauth/access_token` endpoint, then stores it in Supabase for persistence.
|
||||
|
||||
Requires: META_APP_ID, META_APP_SECRET
|
||||
"""
|
||||
import httpx
|
||||
|
||||
app_id = os.getenv("META_APP_ID", "")
|
||||
app_secret = os.getenv("META_APP_SECRET", "")
|
||||
api_ver = os.getenv("META_API_VERSION", "v21.0")
|
||||
|
||||
if app_id.startswith("PLACEHOLDER") or app_secret.startswith("PLACEHOLDER"):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||||
detail="META_APP_ID or META_APP_SECRET not configured.",
|
||||
)
|
||||
|
||||
url = f"https://graph.facebook.com/{api_ver}/oauth/access_token"
|
||||
params = {
|
||||
"grant_type": "fb_exchange_token",
|
||||
"client_id": app_id,
|
||||
"client_secret": app_secret,
|
||||
"fb_exchange_token": payload.short_lived_token,
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
resp = await client.get(url, params=params, timeout=15.0)
|
||||
|
||||
if resp.status_code != 200:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_502_BAD_GATEWAY,
|
||||
detail=f"Meta OAuth error: {resp.text}",
|
||||
)
|
||||
|
||||
token_data = resp.json()
|
||||
long_lived_token = token_data.get("access_token")
|
||||
|
||||
if not long_lived_token:
|
||||
raise HTTPException(status_code=502, detail="No access_token in Meta response.")
|
||||
|
||||
# Persist to Supabase (best-effort — don't block on failure)
|
||||
try:
|
||||
supabase = _get_supabase()
|
||||
supabase.table("catalyst_settings").upsert({
|
||||
"key": "META_ACCESS_TOKEN",
|
||||
"value": long_lived_token,
|
||||
"updated_at": datetime.utcnow().isoformat(),
|
||||
}).execute()
|
||||
except Exception as exc:
|
||||
logger.warning("Could not persist Meta token to Supabase: %s", exc)
|
||||
|
||||
return _ok({
|
||||
"access_token": long_lived_token,
|
||||
"token_type": token_data.get("token_type", "bearer"),
|
||||
"expires_in": token_data.get("expires_in"),
|
||||
})
|
||||
115
backend/main.py
@@ -0,0 +1,115 @@
|
||||
"""
|
||||
The Catalyst — FastAPI Backend
|
||||
Autonomous Digital Marketing Agency powered by Meta Marketing API.
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
from typing import Set
|
||||
|
||||
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from api.routes_catalyst import router as catalyst_router
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# ── App ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
app = FastAPI(
|
||||
title="Velocity — Catalyst Backend",
|
||||
description="Meta Marketing API integration for autonomous campaign management.",
|
||||
version="1.0.0",
|
||||
)
|
||||
|
||||
# ── CORS ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
origins = [o.strip() for o in os.getenv("CORS_ORIGINS", "http://localhost:5173").split(",")]
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=origins,
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# ── Routers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
app.include_router(catalyst_router, prefix="/api/catalyst", tags=["Catalyst"])
|
||||
|
||||
# ── WebSocket — Live Optimization Feed ────────────────────────────────────────
|
||||
|
||||
class ConnectionManager:
|
||||
"""Manages active WebSocket connections for live optimization broadcasts."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.active: Set[WebSocket] = set()
|
||||
|
||||
async def connect(self, ws: WebSocket) -> None:
|
||||
await ws.accept()
|
||||
self.active.add(ws)
|
||||
|
||||
def disconnect(self, ws: WebSocket) -> None:
|
||||
self.active.discard(ws)
|
||||
|
||||
async def broadcast(self, payload: dict) -> None:
|
||||
dead: Set[WebSocket] = set()
|
||||
for ws in self.active:
|
||||
try:
|
||||
await ws.send_text(json.dumps(payload))
|
||||
except Exception:
|
||||
dead.add(ws)
|
||||
self.active -= dead
|
||||
|
||||
|
||||
manager = ConnectionManager()
|
||||
|
||||
|
||||
@app.websocket("/ws/catalyst")
|
||||
async def websocket_endpoint(ws: WebSocket) -> None:
|
||||
"""
|
||||
WebSocket endpoint for streaming live Claw Agent optimization events.
|
||||
Clients connect from <LiveOptimizationFeed /> in Catalyst.tsx.
|
||||
"""
|
||||
await manager.connect(ws)
|
||||
try:
|
||||
while True:
|
||||
# Keep-alive: wait for any incoming ping/message
|
||||
data = await ws.receive_text()
|
||||
# Echo back as acknowledgment (clients may send heartbeat pings)
|
||||
await ws.send_text(json.dumps({"type": "ack", "data": data}))
|
||||
except WebSocketDisconnect:
|
||||
manager.disconnect(ws)
|
||||
|
||||
|
||||
# ── Helper: broadcast a live event (called from routes after API mutations) ───
|
||||
|
||||
async def broadcast_live_event(
|
||||
event_type: str,
|
||||
message: str,
|
||||
campaign_name: str | None = None,
|
||||
value: str | None = None,
|
||||
) -> None:
|
||||
payload = {
|
||||
"type": event_type,
|
||||
"message": message,
|
||||
"campaignName": campaign_name,
|
||||
"value": value,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
}
|
||||
await manager.broadcast(payload)
|
||||
|
||||
|
||||
# Attach broadcaster so routes can call it
|
||||
app.state.broadcast_live_event = broadcast_live_event
|
||||
|
||||
|
||||
# ── Health check ──────────────────────────────────────────────────────────────
|
||||
|
||||
@app.get("/health", tags=["Health"])
|
||||
async def health() -> dict:
|
||||
return {"status": "ok", "service": "catalyst-backend", "version": "1.0.0"}
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
fastapi>=0.115.0
|
||||
uvicorn[standard]>=0.32.0
|
||||
facebook-business>=21.0.0
|
||||
supabase>=2.10.0
|
||||
python-dotenv>=1.0.0
|
||||
httpx>=0.27.0
|
||||
pydantic>=2.9.0
|
||||
python-multipart>=0.0.12
|
||||
|
||||
159
comfy_engine/scripts/abantika_batch_test.py
Normal file
@@ -0,0 +1,159 @@
|
||||
import os
|
||||
import requests
|
||||
import time
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
# Config
|
||||
GATEWAY_URL = "http://54.91.19.60:8082" # Active IP
|
||||
INPUT_DIR = Path(r"f:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine\test_inputs\Abantika Test Sample")
|
||||
OUTPUT_DIR = Path(r"f:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine\test_outputs\Abantika Test Samples - Test 2")
|
||||
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 4 Styles requested by user
|
||||
STYLES = [
|
||||
{
|
||||
"id": "gothic_industrial",
|
||||
"keywords": "Gothic, Indusrial Design, Black, Wood Textures and Accents, Bare Metal Edison Bulbs, Red Silk",
|
||||
"denoise": 0.75
|
||||
},
|
||||
{
|
||||
"id": "greek_minimal",
|
||||
"keywords": "Greek Aesthetic, Minimal, Grand, White, Deep Blue Silk, Emral Green Marbel",
|
||||
"denoise": 0.75
|
||||
},
|
||||
{
|
||||
"id": "turkish_vintage",
|
||||
"keywords": "Turkish Interioir, Mosaic Work, Vintage, Intricate Work, Royal",
|
||||
"denoise": 0.75
|
||||
},
|
||||
{
|
||||
"id": "bali_modern",
|
||||
"keywords": "Bali Aesthetic, Modern, Minimal, Stone, Live Indoor House Plants, Indonesian Suar wood (Samanea saman)",
|
||||
"denoise": 0.75
|
||||
}
|
||||
]
|
||||
|
||||
def map_filename_to_room_type(filename: str) -> str:
|
||||
name = filename.lower()
|
||||
if "bed-room" in name or "bedroom" in name or "bed" in name:
|
||||
return "bedroom"
|
||||
elif "bath-room" in name or "bathroom" in name or "bath" in name:
|
||||
return "bathroom"
|
||||
elif "kitchen" in name:
|
||||
return "kitchen"
|
||||
elif "dining" in name:
|
||||
return "dining_room"
|
||||
elif "balcony" in name:
|
||||
return "balcony"
|
||||
return "living_room" # default
|
||||
|
||||
def run_job(image_path: Path, style_cfg: dict):
|
||||
room_type = map_filename_to_room_type(image_path.name)
|
||||
style_id = style_cfg["id"]
|
||||
keywords = style_cfg["keywords"]
|
||||
|
||||
print(f"\n--- Processing {image_path.name} with style {style_id.upper()} ({room_type}) ---")
|
||||
|
||||
# Check if already processed
|
||||
img_out = OUTPUT_DIR / f"{image_path.stem}_{style_id}.png"
|
||||
if img_out.exists():
|
||||
print(f"[{style_id}] Already processed {img_out.name}, skipping.")
|
||||
return
|
||||
|
||||
# 1. Start generation
|
||||
try:
|
||||
with open(image_path, "rb") as f:
|
||||
files = {"image": (image_path.name, f, "image/jpeg")}
|
||||
data = {
|
||||
"keywords": keywords,
|
||||
"room_type": room_type,
|
||||
"denoise": style_cfg["denoise"]
|
||||
}
|
||||
res = requests.post(f"{GATEWAY_URL}/dream-weaver", files=files, data=data, timeout=180)
|
||||
res.raise_for_status()
|
||||
|
||||
job_data = res.json()
|
||||
job_id = job_data["job_id"]
|
||||
prompt_preview = job_data.get("prompt_preview", "")
|
||||
print(f"[{style_id}] Job submitted: {job_id}")
|
||||
print(f"[{style_id}] Prompt Preview: {prompt_preview[:150]}...")
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"Failed to submit {image_path.name}: {e}")
|
||||
return
|
||||
|
||||
# 2. Poll status
|
||||
status_url = f"{GATEWAY_URL}/dream-weaver/status/{job_id}"
|
||||
ready = False
|
||||
|
||||
# Poll for up to 6 minutes
|
||||
s_data = {}
|
||||
for i in range(180):
|
||||
time.sleep(2)
|
||||
try:
|
||||
s_res = requests.get(status_url, timeout=10)
|
||||
if s_res.status_code == 200:
|
||||
s_data = s_res.json()
|
||||
if s_data.get("ready"):
|
||||
ready = True
|
||||
break
|
||||
elif s_data.get("status") == "error":
|
||||
print(f"Job failed: {s_data.get('error')}")
|
||||
return
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"Polling error: {e}")
|
||||
|
||||
if not ready:
|
||||
print(f"Timeout waiting for job {job_id}")
|
||||
return
|
||||
|
||||
# 3. Download image and save prompt
|
||||
print(f"[{style_id}] Job ready, downloading...")
|
||||
|
||||
# Save prompt data locally
|
||||
prompt_file = OUTPUT_DIR / f"{image_path.stem}_{style_id}_prompt.json"
|
||||
with open(prompt_file, "w") as pf:
|
||||
json.dump(s_data, pf, indent=2)
|
||||
|
||||
result_url = f"{GATEWAY_URL}/dream-weaver/result/{job_id}"
|
||||
try:
|
||||
r_res = requests.get(result_url, stream=True, timeout=60)
|
||||
r_res.raise_for_status()
|
||||
|
||||
with open(img_out, "wb") as f:
|
||||
for chunk in r_res.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
|
||||
print(f"[{style_id}] Saved {img_out.name}")
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"Failed to download result for {job_id}: {e}")
|
||||
|
||||
def main():
|
||||
# Wait for gateway and Ollama to be fully ready
|
||||
print(f"Checking Gateway at {GATEWAY_URL}/health...")
|
||||
for _ in range(3):
|
||||
try:
|
||||
h = requests.get(f"{GATEWAY_URL}/health", timeout=5)
|
||||
if h.status_code == 200:
|
||||
print("Gateway is UP.")
|
||||
break
|
||||
except requests.exceptions.RequestException:
|
||||
print("Waiting for gateway...")
|
||||
time.sleep(5)
|
||||
else:
|
||||
print("Gateway failed to answer health check. Exiting.")
|
||||
return
|
||||
|
||||
# Get images
|
||||
target_imgs = sorted([f for f in INPUT_DIR.iterdir() if f.suffix.lower() in [".jpg", ".png", ".jpeg"]])
|
||||
|
||||
print(f"Found {len(target_imgs)} target images.")
|
||||
|
||||
for img_path in target_imgs:
|
||||
for style in STYLES:
|
||||
run_job(img_path, style)
|
||||
time.sleep(1) # Small pause between submissions
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/usr/bin/env python3
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Dream Weaver API Gateway v2 — Dynamic Keyword → Local LLM → ComfyUI Pipeline
|
||||
========================================================================
|
||||
@@ -15,7 +15,7 @@ Environment variables:
|
||||
OLLAMA_URL — Ollama server (default: http://localhost:11434)
|
||||
OLLAMA_MODEL — Model name (default: qwen3.5:27b)
|
||||
"""
|
||||
import asyncio, json, time, uuid, io, sys, os, logging
|
||||
import asyncio, json, time, uuid, io, sys, os, logging, traceback
|
||||
from pathlib import Path
|
||||
from typing import Optional, List
|
||||
|
||||
@@ -31,7 +31,7 @@ SCRIPTS_DIR = Path(__file__).parent / "scripts"
|
||||
sys.path.insert(0, str(SCRIPTS_DIR))
|
||||
|
||||
try:
|
||||
from prompt_expander import expand_prompt, expand_prompt_simple, ROOM_CONTEXTS, ExpandedPrompt
|
||||
from prompt_expander import expand_prompt, ROOM_CONTEXTS, ExpandedPrompt
|
||||
LLM_AVAILABLE = True
|
||||
except ImportError:
|
||||
LLM_AVAILABLE = False
|
||||
@@ -40,7 +40,7 @@ except ImportError:
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
||||
logger = logging.getLogger("DreamWeaverGateway")
|
||||
|
||||
COMFY = "http://127.0.0.1:8188"
|
||||
COMFY = "http://127.0.0.1:8118"
|
||||
COMFY_ROOT = "/opt/dlami/nvme/ComfyUI"
|
||||
|
||||
app = FastAPI(
|
||||
@@ -210,7 +210,7 @@ async def expand_endpoint(req: ExpandRequest):
|
||||
additional_notes=req.additional_notes
|
||||
)
|
||||
else:
|
||||
result = expand_prompt_simple(req.keywords, req.room_type)
|
||||
raise RuntimeError("Local LLM model is not available or disabled.")
|
||||
except Exception as e:
|
||||
logger.error(f"Prompt expansion failed: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"LLM expansion failed: {str(e)}")
|
||||
@@ -291,7 +291,7 @@ async def dream_weaver(
|
||||
additional_notes=additional_notes
|
||||
)
|
||||
else:
|
||||
expanded = expand_prompt_simple(kw_list, room_type)
|
||||
raise HTTPException(status_code=500, detail="LLM model is not available or disabled.")
|
||||
|
||||
# Apply manual overrides if provided
|
||||
if denoise > 0:
|
||||
@@ -335,6 +335,7 @@ async def dream_weaver(
|
||||
except Exception as e:
|
||||
jobs[job_id] = {"status": "error", "error": str(e)}
|
||||
logger.error(f"Generation failed: {e}")
|
||||
traceback.print_exc()
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@@ -396,8 +397,10 @@ async def dream_weaver_sync(
|
||||
expanded = _P()
|
||||
elif keywords:
|
||||
kw_list = [k.strip() for k in keywords.split(",") if k.strip()]
|
||||
expanded = (expand_prompt(kw_list, room_type, additional_notes)
|
||||
if LLM_AVAILABLE else expand_prompt_simple(kw_list, room_type))
|
||||
if LLM_AVAILABLE:
|
||||
expanded = expand_prompt(kw_list, room_type, additional_notes)
|
||||
else:
|
||||
raise RuntimeError("Local LLM model is not available or disabled.")
|
||||
else:
|
||||
raise HTTPException(status_code=400, detail="Provide keywords or custom_positive")
|
||||
|
||||
|
||||
@@ -96,7 +96,7 @@ Generate JSON containing:
|
||||
1. "positive_prompt" (rich, photorealistic, 80-120 words)
|
||||
2. "negative_prompt" (preventing artifacts, 30-50 words)
|
||||
3. "cfg" (float 6.0-9.0)
|
||||
4. "denoise" (float 0.5-0.85)
|
||||
4. "denoise" (float 0.45-0.65) - CRITICAL: Must be kept low to preserve input image structure
|
||||
5. "steps" (int 25-40)
|
||||
|
||||
RULES FOR POSITIVE PROMPT:
|
||||
@@ -144,7 +144,8 @@ def _call_ollama(user_message: str) -> str:
|
||||
timeout=180 # Large models take time
|
||||
)
|
||||
r.raise_for_status()
|
||||
return r.json()["response"]
|
||||
resp_json = r.json()
|
||||
return resp_json["response"]
|
||||
|
||||
|
||||
def expand_prompt(keywords: list[str], room_type: str = "living_room", additional_notes: str = "") -> ExpandedPrompt:
|
||||
@@ -166,38 +167,68 @@ AVOID: {ctx['avoid']}
|
||||
logger.info("Calling local Ollama LLM...")
|
||||
raw = _call_ollama(user_message).strip()
|
||||
|
||||
json_match = re.search(r'\{[\s\S]*\}', raw)
|
||||
if json_match:
|
||||
raw_json = json_match.group(0)
|
||||
else:
|
||||
raw_json = raw
|
||||
# Log the raw response for debugging
|
||||
logger.info(f"Raw Ollama response length: {len(raw)}")
|
||||
|
||||
data = json.loads(raw_json)
|
||||
# Handle empty response
|
||||
if not raw:
|
||||
logger.error("Empty response from Ollama")
|
||||
raise ValueError("Ollama returned an empty response")
|
||||
|
||||
# Clean string of common junk (control characters, leading/trailing non-bracket junk)
|
||||
raw_cleaned = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', raw)
|
||||
|
||||
# More robust JSON block extraction
|
||||
# Try finding the first '{' and last '}'
|
||||
start_idx = raw_cleaned.find('{')
|
||||
end_idx = raw_cleaned.rfind('}')
|
||||
|
||||
if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
|
||||
raw_json = raw_cleaned[start_idx:end_idx+1]
|
||||
else:
|
||||
raw_json = raw_cleaned
|
||||
|
||||
try:
|
||||
data = json.loads(raw_json)
|
||||
except json.JSONDecodeError as je:
|
||||
logger.error(f"JSON Decode failed. Raw tail: {raw_json[:100]}...")
|
||||
# Emergency fallback: if we can't parse, try to create a basic structure from keywords
|
||||
return ExpandedPrompt(
|
||||
style_name="fallback_" + (keywords[0] if keywords else "custom"),
|
||||
positive_prompt=", ".join(keywords) + f", photorealistic, high quality, {room_type}",
|
||||
negative_prompt="blurry, distorted, low quality",
|
||||
cfg=7.5,
|
||||
denoise=0.55,
|
||||
steps=30,
|
||||
reasoning="Fallback due to LLM parsing error",
|
||||
source="fallback"
|
||||
)
|
||||
|
||||
return ExpandedPrompt(
|
||||
style_name=data.get("style_name", "custom_local"),
|
||||
positive_prompt=data["positive_prompt"],
|
||||
negative_prompt=data["negative_prompt"],
|
||||
positive_prompt=data.get("positive_prompt", ", ".join(keywords)),
|
||||
negative_prompt=data.get("negative_prompt", "blurry, distorted, low quality"),
|
||||
cfg=float(data.get("cfg", 7.5)),
|
||||
denoise=float(data.get("denoise", 0.72)),
|
||||
denoise=float(data.get("denoise", 0.55)),
|
||||
steps=int(data.get("steps", 30)),
|
||||
reasoning=data.get("reasoning", ""),
|
||||
source="ollama_local"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Ollama failed, using sync fallback: {e}")
|
||||
return expand_prompt_simple(keywords, room_type)
|
||||
|
||||
|
||||
def expand_prompt_simple(keywords: list[str], room_type: str = "living_room") -> ExpandedPrompt:
|
||||
ctx = ROOM_CONTEXTS.get(room_type.replace(" ", "_"), ROOM_CONTEXTS["living_room"])
|
||||
kw_str = ", ".join(keywords)
|
||||
positive = f"{kw_str} interior design, {', '.join(ctx['key_elements'][:4])}, photorealistic {room_type.replace('_', ' ')} interior, architectural photography, 8k resolution, photorealistic"
|
||||
negative = "(worst quality, low quality, illustration, 3d render, 2d, painting, cartoon, sketch), blurry, distorted, extra windows, unrealistic lighting, structural changes"
|
||||
return ExpandedPrompt(
|
||||
style_name="fallback", positive_prompt=positive, negative_prompt=negative,
|
||||
cfg=7.5, denoise=0.72, steps=30, reasoning="No LLM", source="fallback"
|
||||
)
|
||||
logger.error(f"Ollama LLM expansion failed: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
# Full fallback if anything goes wrong
|
||||
return ExpandedPrompt(
|
||||
style_name="emergency_fallback",
|
||||
positive_prompt=", ".join(keywords) + f", photorealistic, {room_type}",
|
||||
negative_prompt="blurry, distorted",
|
||||
cfg=7.5,
|
||||
denoise=0.55,
|
||||
steps=30,
|
||||
reasoning=f"Emergency fallback due to: {str(e)}",
|
||||
source="emergency"
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
159
comfy_engine/scripts/sagnik_batch_test.py
Normal file
@@ -0,0 +1,159 @@
|
||||
import os
|
||||
import requests
|
||||
import time
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
# Config
|
||||
GATEWAY_URL = "http://54.91.19.60:8288" # Active IP
|
||||
INPUT_DIR = Path(r"f:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine\test_inputs\Sagnik Test Sample")
|
||||
OUTPUT_DIR = Path(r"f:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine\test_outputs\Sagnik Test Sample")
|
||||
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 4 Styles requested by user
|
||||
STYLES = [
|
||||
{
|
||||
"id": "bali_minimal_stone",
|
||||
"keywords": "Bali Aesthetic, Modern, Minimal, Stone, Live Indoor House Plants, Indonesian, Reclaimed Wood, Eco Friendly Materials, Bare Stone",
|
||||
"denoise": 0.55
|
||||
},
|
||||
{
|
||||
"id": "gothic_industrial",
|
||||
"keywords": "Gothic, Indusrial Design, Black, Wood Textures and Accents, Bare Metal Edison Bulbs, Red Silk",
|
||||
"denoise": 0.55
|
||||
},
|
||||
{
|
||||
"id": "greek_grand",
|
||||
"keywords": "Greek Aesthetic, Minimal, Grand, Emral Green Marbel, Gold paint highlits, Deep Blue Silk-Muslin-Soft",
|
||||
"denoise": 0.55
|
||||
},
|
||||
{
|
||||
"id": "turkish_fusion",
|
||||
"keywords": "Turkish Interioir, Mosaic Work, Vintage, Intricate Work, Royal, Minimal, Fusion",
|
||||
"denoise": 0.55
|
||||
}
|
||||
]
|
||||
|
||||
def map_filename_to_room_type(filename: str) -> str:
|
||||
name = filename.lower()
|
||||
if "bed-room" in name or "bedroom" in name or "bed" in name:
|
||||
return "bedroom"
|
||||
elif "bath-room" in name or "bathroom" in name or "bath" in name:
|
||||
return "bathroom"
|
||||
elif "kitchen" in name:
|
||||
return "kitchen"
|
||||
elif "dining" in name:
|
||||
return "dining_room"
|
||||
elif "balcony" in name:
|
||||
return "balcony"
|
||||
return "living_room" # default
|
||||
|
||||
def run_job(image_path: Path, style_cfg: dict):
|
||||
room_type = map_filename_to_room_type(image_path.name)
|
||||
style_id = style_cfg["id"]
|
||||
keywords = style_cfg["keywords"]
|
||||
|
||||
print(f"\n--- Processing {image_path.name} with style {style_id.upper()} ({room_type}) ---")
|
||||
|
||||
# Check if already processed
|
||||
img_out = OUTPUT_DIR / f"{image_path.stem}_{style_id}.png"
|
||||
if img_out.exists():
|
||||
print(f"[{style_id}] Already processed {img_out.name}, skipping.")
|
||||
return
|
||||
|
||||
# 1. Start generation
|
||||
try:
|
||||
with open(image_path, "rb") as f:
|
||||
files = {"image": (image_path.name, f, "image/jpeg")}
|
||||
data = {
|
||||
"keywords": keywords,
|
||||
"room_type": room_type,
|
||||
"denoise": style_cfg["denoise"]
|
||||
}
|
||||
res = requests.post(f"{GATEWAY_URL}/dream-weaver", files=files, data=data, timeout=180)
|
||||
res.raise_for_status()
|
||||
|
||||
job_data = res.json()
|
||||
job_id = job_data["job_id"]
|
||||
prompt_preview = job_data.get("prompt_preview", "")
|
||||
print(f"[{style_id}] Job submitted: {job_id}")
|
||||
print(f"[{style_id}] Prompt Preview: {prompt_preview[:150]}...")
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"Failed to submit {image_path.name}: {e}")
|
||||
return
|
||||
|
||||
# 2. Poll status
|
||||
status_url = f"{GATEWAY_URL}/dream-weaver/status/{job_id}"
|
||||
ready = False
|
||||
|
||||
# Poll for up to 6 minutes
|
||||
s_data = {}
|
||||
for i in range(180):
|
||||
time.sleep(2)
|
||||
try:
|
||||
s_res = requests.get(status_url, timeout=10)
|
||||
if s_res.status_code == 200:
|
||||
s_data = s_res.json()
|
||||
if s_data.get("ready"):
|
||||
ready = True
|
||||
break
|
||||
elif s_data.get("status") == "error":
|
||||
print(f"Job failed: {s_data.get('error')}")
|
||||
return
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"Polling error: {e}")
|
||||
|
||||
if not ready:
|
||||
print(f"Timeout waiting for job {job_id}")
|
||||
return
|
||||
|
||||
# 3. Download image and save prompt
|
||||
print(f"[{style_id}] Job ready, downloading...")
|
||||
|
||||
# Save prompt data locally
|
||||
prompt_file = OUTPUT_DIR / f"{image_path.stem}_{style_id}_prompt.json"
|
||||
with open(prompt_file, "w") as pf:
|
||||
json.dump(s_data, pf, indent=2)
|
||||
|
||||
result_url = f"{GATEWAY_URL}/dream-weaver/result/{job_id}"
|
||||
try:
|
||||
r_res = requests.get(result_url, stream=True, timeout=60)
|
||||
r_res.raise_for_status()
|
||||
|
||||
with open(img_out, "wb") as f:
|
||||
for chunk in r_res.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
|
||||
print(f"[{style_id}] Saved {img_out.name}")
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"Failed to download result for {job_id}: {e}")
|
||||
|
||||
def main():
|
||||
# Wait for gateway and Ollama to be fully ready
|
||||
print(f"Checking Gateway at {GATEWAY_URL}/health...")
|
||||
for _ in range(3):
|
||||
try:
|
||||
h = requests.get(f"{GATEWAY_URL}/health", timeout=5)
|
||||
if h.status_code == 200:
|
||||
print("Gateway is UP.")
|
||||
break
|
||||
except requests.exceptions.RequestException:
|
||||
print("Waiting for gateway...")
|
||||
time.sleep(5)
|
||||
else:
|
||||
print("Gateway failed to answer health check. Exiting.")
|
||||
return
|
||||
|
||||
# Get images
|
||||
target_imgs = sorted([f for f in INPUT_DIR.iterdir() if f.suffix.lower() in [".jpg", ".png", ".jpeg"]])
|
||||
|
||||
print(f"Found {len(target_imgs)} target images.")
|
||||
|
||||
for img_path in target_imgs:
|
||||
for style in STYLES:
|
||||
run_job(img_path, style)
|
||||
time.sleep(1) # Small pause between submissions
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
506
comfy_engine/scripts/test_catalyst_batch.py
Normal file
@@ -0,0 +1,506 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
test_catalyst_batch.py — 5-Prompt Batch Test for Catalyst Poster Generation
|
||||
============================================================================
|
||||
|
||||
Sends 5 distinct social media marketing poster generation requests to the
|
||||
ComfyUI server running Qwen-Image-2512. Each test uses:
|
||||
- A different Ground Truth image (room photo from the property)
|
||||
- A Style Reference image (professional real estate marketing poster)
|
||||
- A unique "Prompt Keyword" set that an end-user would type
|
||||
|
||||
The script demonstrates the full end-user flow:
|
||||
User enters: Keywords + Ground Truth Image + Style Reference → Gets Poster
|
||||
|
||||
Test Matrix:
|
||||
1. "luxury modern kitchen" → Kitchen photo + Orange card reference
|
||||
2. "cozy master bedroom" → Bedroom photo + SOLD poster reference
|
||||
3. "elegant living space" → Balcony bedroom + Magazine editorial ref
|
||||
4. "premium apartment lifestyle" → Room with AC + Bellagio luxury ad ref
|
||||
5. "smart home investment" → Corridor/room + Social media grid ref
|
||||
|
||||
Environment: ComfyUI + Qwen-Image-2512 on AWS EC2 (4x NVIDIA L4)
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import re
|
||||
import copy
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Tuple, Optional, Dict, List
|
||||
|
||||
import requests
|
||||
from PIL import Image
|
||||
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
# CONFIGURATION
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
|
||||
COMFYUI_SERVER_URL: str = "http://54.91.19.60:8118"
|
||||
"""
|
||||
ComfyUI server URL. Options:
|
||||
- Direct (if port open via SG): http://54.91.19.60:8118
|
||||
- SSH tunnel: ssh -L 8118:127.0.0.1:8118 ubuntu@54.91.19.60 -p 443
|
||||
then use: http://127.0.0.1:8118
|
||||
"""
|
||||
|
||||
BASE_DIR = Path(r"F:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine")
|
||||
INPUT_DIR = BASE_DIR / "test_inputs" / "Sagnik Test Sample New"
|
||||
REF_DIR = INPUT_DIR / "Sample Reference"
|
||||
OUTPUT_DIR = BASE_DIR / "test_outputs" / "catalyst_batch_results"
|
||||
WORKFLOW_PATH = BASE_DIR / "workflows" / "catalyst_poster_qwen.json"
|
||||
|
||||
# Node IDs matching catalyst_poster_qwen.json
|
||||
NODE_GROUND_TRUTH = "1"
|
||||
NODE_STYLE_REF = "2"
|
||||
NODE_POS_PROMPT = "9"
|
||||
NODE_NEG_PROMPT = "10"
|
||||
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
# 5 TEST CASES — Each simulates what an end-user would enter
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
|
||||
TEST_CASES: List[Dict] = [
|
||||
{
|
||||
"name": "Test 1: Luxury Modern Kitchen",
|
||||
"ground_truth": "IMG_20210330_154502.jpg", # Kitchen with wooden cabinets
|
||||
"style_ref": "6301510fa187aca16f680b2a525ed6de.jpg", # Orange card-style poster
|
||||
"user_keywords": "luxury modern kitchen",
|
||||
"marketing_copy": "Cook Your Dreams to Life",
|
||||
"description": "A modular kitchen showcase — warm tones, premium finishes."
|
||||
},
|
||||
{
|
||||
"name": "Test 2: Cozy Master Bedroom",
|
||||
"ground_truth": "IMG_20210330_154512.jpg", # Bedroom with accent wall
|
||||
"style_ref": "79c9a52c9af0c1d94df025dd1505db83.jpg", # Bold SOLD poster
|
||||
"user_keywords": "cozy master bedroom",
|
||||
"marketing_copy": "Where Comfort Meets Elegance",
|
||||
"description": "Master bedroom with designer wallpaper — aspirational lifestyle."
|
||||
},
|
||||
{
|
||||
"name": "Test 3: Elegant Living Space",
|
||||
"ground_truth": "IMG_20210330_160420.jpg", # Balcony view bedroom
|
||||
"style_ref": "7bb67cbc287300b78b4e8da3da7de242.jpg", # Magazine editorial
|
||||
"user_keywords": "elegant living space",
|
||||
"marketing_copy": "A New Beginning Starts Here",
|
||||
"description": "Bedroom with balcony view — editorial magazine style."
|
||||
},
|
||||
{
|
||||
"name": "Test 4: Premium Apartment Lifestyle",
|
||||
"ground_truth": "IMG_20210330_154534.jpg", # Another room view
|
||||
"style_ref": "ee9d7efdf9303342480d5cb57cec8400.jpg", # Bellagio luxury ad
|
||||
"user_keywords": "premium apartment lifestyle",
|
||||
"marketing_copy": "Live Above the Ordinary",
|
||||
"description": "Premium apartment showcase — Dubai-style luxury marketing."
|
||||
},
|
||||
{
|
||||
"name": "Test 5: Smart Home Investment",
|
||||
"ground_truth": "IMG_20210330_160212.jpg", # Compact room
|
||||
"style_ref": "fd0586727e1b43e9c346a6f851fb50f9.jpg", # Social media grid
|
||||
"user_keywords": "smart home investment",
|
||||
"marketing_copy": "Your Dream Home Is Waiting",
|
||||
"description": "Investment-focused social media post — modern minimalist grid."
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
# CORE FUNCTIONS
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
|
||||
def process_prompt(user_keywords: str, marketing_copy: str) -> Tuple[str, str]:
|
||||
"""Parse user input into aesthetic keywords and marketing copy.
|
||||
|
||||
In the production Catalyst UI, the user types keywords and a marketing
|
||||
headline separately. This function validates and returns them.
|
||||
|
||||
Args:
|
||||
user_keywords: Style/aesthetic descriptors (e.g., 'luxury modern kitchen').
|
||||
marketing_copy: Headline text to render on the poster.
|
||||
|
||||
Returns:
|
||||
Validated (aesthetic_keywords, marketing_copy) tuple.
|
||||
|
||||
Raises:
|
||||
ValueError: If either input is empty.
|
||||
"""
|
||||
if not user_keywords.strip():
|
||||
raise ValueError("Keyword prompt cannot be empty")
|
||||
if not marketing_copy.strip():
|
||||
raise ValueError("Marketing copy cannot be empty")
|
||||
|
||||
return user_keywords.strip(), marketing_copy.strip()
|
||||
|
||||
|
||||
def expand_prompt(aesthetic_keywords: str, marketing_copy: str) -> str:
|
||||
"""Expand user keywords into a full Qwen-Image-2512-optimized prompt.
|
||||
|
||||
Takes simple user keywords and transforms them into a richly detailed
|
||||
prompt that leverages Qwen-Image-2512's strengths: precise typography
|
||||
rendering, cinematic lighting, and photorealistic quality.
|
||||
|
||||
The expanded prompt follows this structure:
|
||||
1. Scene type declaration (marketing poster)
|
||||
2. Aesthetic keyword injection (user's style preferences)
|
||||
3. Typography instruction (exact text + font style)
|
||||
4. Technical quality boosters (8k, photorealistic, etc.)
|
||||
|
||||
Args:
|
||||
aesthetic_keywords: User's style keywords (e.g., 'luxury modern kitchen').
|
||||
marketing_copy: Exact text to appear in the poster.
|
||||
|
||||
Returns:
|
||||
A fully expanded prompt string ready for CLIPTextEncode.
|
||||
|
||||
Example:
|
||||
>>> expand_prompt('luxury modern kitchen', 'Cook Your Dreams')
|
||||
'A stunning, high-end real estate social media marketing poster...'
|
||||
"""
|
||||
return (
|
||||
f"A stunning, high-end real estate social media marketing poster. "
|
||||
f"Style: {aesthetic_keywords}, warm ambient lighting, premium materials, "
|
||||
f"cinematic composition, professional interior photography. "
|
||||
f"The poster must prominently display the exact text "
|
||||
f"'{marketing_copy}' rendered in elegant, bold, modern sans-serif "
|
||||
f"typography with high contrast against the background, crisp edges, "
|
||||
f"perfectly aligned, highly legible. "
|
||||
f"8k resolution, photorealistic quality, detailed textures, "
|
||||
f"architectural magazine aesthetic, ultra-sharp focus, "
|
||||
f"golden hour warmth, depth of field bokeh, premium brand feel, "
|
||||
f"social media optimized layout, clean negative space for text."
|
||||
)
|
||||
|
||||
|
||||
def upload_image(image_path: Path) -> str:
|
||||
"""Upload an image to the ComfyUI server's input directory.
|
||||
|
||||
Opens the image, ensures RGB mode, and uploads via /upload/image.
|
||||
|
||||
Args:
|
||||
image_path: Path to the image file.
|
||||
|
||||
Returns:
|
||||
The server-assigned filename for use in workflow JSON.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the image doesn't exist.
|
||||
ConnectionError: If ComfyUI server is unreachable.
|
||||
"""
|
||||
if not image_path.exists():
|
||||
raise FileNotFoundError(f"Image not found: {image_path}")
|
||||
|
||||
img = Image.open(image_path)
|
||||
if img.mode != "RGB":
|
||||
img = img.convert("RGB")
|
||||
|
||||
import tempfile
|
||||
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
|
||||
tmp_path = tmp.name
|
||||
img.save(tmp_path, format="PNG")
|
||||
|
||||
try:
|
||||
with open(tmp_path, "rb") as f:
|
||||
response = requests.post(
|
||||
f"{COMFYUI_SERVER_URL}/upload/image",
|
||||
files={"image": (image_path.name, f, "image/png")},
|
||||
data={"overwrite": "true"},
|
||||
timeout=60
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
result = response.json()
|
||||
server_name = result.get("name", "")
|
||||
if not server_name:
|
||||
raise RuntimeError(f"Upload failed: {result}")
|
||||
|
||||
return server_name
|
||||
finally:
|
||||
try:
|
||||
os.unlink(tmp_path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def execute_workflow(
|
||||
workflow: dict,
|
||||
prompt_text: str,
|
||||
gt_filename: str,
|
||||
sr_filename: str
|
||||
) -> str:
|
||||
"""Inject dynamic values and queue workflow on ComfyUI.
|
||||
|
||||
Updates LoadImage nodes with uploaded filenames and CLIPTextEncode
|
||||
with the expanded prompt, then sends to /prompt endpoint.
|
||||
|
||||
Args:
|
||||
workflow: The loaded workflow JSON dict.
|
||||
prompt_text: Expanded prompt from expand_prompt().
|
||||
gt_filename: Server filename of ground truth image.
|
||||
sr_filename: Server filename of style reference image.
|
||||
|
||||
Returns:
|
||||
The prompt_id from the queue response.
|
||||
"""
|
||||
wf = copy.deepcopy(workflow)
|
||||
|
||||
wf[NODE_GROUND_TRUTH]["inputs"]["image"] = gt_filename
|
||||
wf[NODE_STYLE_REF]["inputs"]["image"] = sr_filename
|
||||
wf[NODE_POS_PROMPT]["inputs"]["text"] = prompt_text
|
||||
|
||||
payload = {
|
||||
"prompt": wf,
|
||||
"client_id": f"catalyst_batch_{int(time.time())}"
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
f"{COMFYUI_SERVER_URL}/prompt",
|
||||
json=payload,
|
||||
timeout=30
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
result = response.json()
|
||||
prompt_id = result.get("prompt_id", "")
|
||||
if not prompt_id:
|
||||
raise RuntimeError(f"Queue failed: {result}")
|
||||
|
||||
return prompt_id
|
||||
|
||||
|
||||
def wait_for_completion(prompt_id: str, timeout: int = 600, poll_interval: int = 3) -> dict:
|
||||
"""Poll /history/{prompt_id} until workflow completes.
|
||||
|
||||
Qwen-Image-2512 with 50 steps on L4 GPUs may take 60-180 seconds.
|
||||
|
||||
Args:
|
||||
prompt_id: The queued prompt ID.
|
||||
timeout: Max wait time in seconds.
|
||||
poll_interval: Seconds between polls.
|
||||
|
||||
Returns:
|
||||
The history dict containing output image metadata.
|
||||
"""
|
||||
start = time.time()
|
||||
polls = 0
|
||||
|
||||
while time.time() - start < timeout:
|
||||
time.sleep(poll_interval)
|
||||
polls += 1
|
||||
|
||||
try:
|
||||
r = requests.get(
|
||||
f"{COMFYUI_SERVER_URL}/history/{prompt_id}",
|
||||
timeout=10
|
||||
)
|
||||
if r.status_code == 200:
|
||||
history = r.json()
|
||||
prompt_data = history.get(prompt_id, {})
|
||||
|
||||
# Check for error
|
||||
status = prompt_data.get("status", {})
|
||||
if status.get("status_str") == "error":
|
||||
msgs = status.get("messages", ["Unknown"])
|
||||
raise RuntimeError(f"Workflow error: {msgs}")
|
||||
|
||||
# Check for outputs
|
||||
for node_id, node_out in prompt_data.get("outputs", {}).items():
|
||||
if "images" in node_out and node_out["images"]:
|
||||
elapsed = time.time() - start
|
||||
print(f" ✓ Done in {elapsed:.1f}s ({polls} polls)")
|
||||
return prompt_data
|
||||
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
if polls % 10 == 0:
|
||||
print(f" ⏳ Still waiting... ({polls} polls)")
|
||||
|
||||
raise TimeoutError(f"Timed out after {timeout}s (prompt: {prompt_id})")
|
||||
|
||||
|
||||
def download_output(history: dict, output_dir: Path, test_name: str) -> str:
|
||||
"""Download the generated poster from ComfyUI.
|
||||
|
||||
Args:
|
||||
history: The prompt history dict.
|
||||
output_dir: Local directory to save to.
|
||||
test_name: Name for the output file.
|
||||
|
||||
Returns:
|
||||
Path to the saved image.
|
||||
"""
|
||||
for node_id, node_out in history.get("outputs", {}).items():
|
||||
images = node_out.get("images", [])
|
||||
if images:
|
||||
img_info = images[0]
|
||||
break
|
||||
else:
|
||||
raise RuntimeError("No output images in history")
|
||||
|
||||
view_url = (
|
||||
f"{COMFYUI_SERVER_URL}/view"
|
||||
f"?filename={img_info['filename']}"
|
||||
f"&subfolder={img_info.get('subfolder', '')}"
|
||||
f"&type={img_info.get('type', 'output')}"
|
||||
)
|
||||
|
||||
r = requests.get(view_url, stream=True, timeout=60)
|
||||
r.raise_for_status()
|
||||
|
||||
safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', test_name).lower()
|
||||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||||
out_path = output_dir / f"{safe_name}_{timestamp}.png"
|
||||
|
||||
with open(out_path, "wb") as f:
|
||||
for chunk in r.iter_content(8192):
|
||||
f.write(chunk)
|
||||
|
||||
size_mb = out_path.stat().st_size / (1024 * 1024)
|
||||
print(f" 💾 Saved: {out_path.name} ({size_mb:.1f} MB)")
|
||||
return str(out_path)
|
||||
|
||||
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
# MAIN EXECUTION
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("=" * 72)
|
||||
print(" CATALYST BATCH TEST — 5 Social Media Poster Prompts")
|
||||
print(" Model: Qwen-Image-2512 | Server: " + COMFYUI_SERVER_URL)
|
||||
print("=" * 72)
|
||||
|
||||
# ── Setup ──
|
||||
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
print(f"\n📂 Output: {OUTPUT_DIR}")
|
||||
print(f"📄 Workflow: {WORKFLOW_PATH}")
|
||||
print(f"🖼️ Inputs: {INPUT_DIR}")
|
||||
print(f"🎨 Refs: {REF_DIR}")
|
||||
|
||||
# ── Verify server connectivity ──
|
||||
print(f"\n🔌 Testing connection to {COMFYUI_SERVER_URL} ...")
|
||||
try:
|
||||
r = requests.get(f"{COMFYUI_SERVER_URL}/system_stats", timeout=10)
|
||||
r.raise_for_status()
|
||||
stats = r.json()
|
||||
gpu_info = stats.get("devices", [])
|
||||
print(f" ✓ Connected! GPUs: {len(gpu_info)}")
|
||||
for gpu in gpu_info:
|
||||
name = gpu.get("name", "unknown")
|
||||
vram_total = gpu.get("vram_total", 0) / (1024**3)
|
||||
vram_free = gpu.get("vram_free", 0) / (1024**3)
|
||||
print(f" • {name}: {vram_free:.1f}/{vram_total:.1f} GB free")
|
||||
except requests.exceptions.ConnectionError:
|
||||
print(f" ✗ FAILED: Cannot reach {COMFYUI_SERVER_URL}")
|
||||
print(f" Ensure ComfyUI is running and the port is accessible.")
|
||||
print(f" Try: ssh -L 8118:127.0.0.1:8118 ubuntu@54.91.19.60 -p 443")
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
print(f" ⚠ Warning: {e}")
|
||||
|
||||
# ── Load workflow ──
|
||||
try:
|
||||
with open(WORKFLOW_PATH, "r", encoding="utf-8") as f:
|
||||
workflow = json.load(f)
|
||||
print(f"\n📋 Workflow loaded ({len(workflow)} nodes)")
|
||||
except Exception as e:
|
||||
print(f"\n✗ Failed to load workflow: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# ── Run 5 tests ──
|
||||
results = []
|
||||
total_start = time.time()
|
||||
|
||||
for i, test in enumerate(TEST_CASES, 1):
|
||||
print(f"\n{'━' * 72}")
|
||||
print(f" TEST {i}/5: {test['name']}")
|
||||
print(f" {test['description']}")
|
||||
print(f"{'━' * 72}")
|
||||
|
||||
gt_path = INPUT_DIR / test["ground_truth"]
|
||||
sr_path = REF_DIR / test["style_ref"]
|
||||
|
||||
print(f" 📸 Ground Truth: {test['ground_truth']}")
|
||||
print(f" 🎨 Style Ref: {test['style_ref']}")
|
||||
print(f" 🏷️ Keywords: {test['user_keywords']}")
|
||||
print(f" ✍️ Copy: \"{test['marketing_copy']}\"")
|
||||
|
||||
try:
|
||||
# Step 1: Parse
|
||||
keywords, copy_text = process_prompt(
|
||||
test["user_keywords"],
|
||||
test["marketing_copy"]
|
||||
)
|
||||
|
||||
# Step 2: Expand
|
||||
expanded = expand_prompt(keywords, copy_text)
|
||||
print(f"\n 📝 Expanded prompt ({len(expanded)} chars):")
|
||||
print(f" {expanded[:100]}...")
|
||||
|
||||
# Step 3: Upload
|
||||
print(f"\n ⬆️ Uploading images...")
|
||||
gt_name = upload_image(gt_path)
|
||||
print(f" GT → {gt_name}")
|
||||
sr_name = upload_image(sr_path)
|
||||
print(f" SR → {sr_name}")
|
||||
|
||||
# Step 4: Execute
|
||||
print(f"\n 🚀 Queuing workflow...")
|
||||
prompt_id = execute_workflow(workflow, expanded, gt_name, sr_name)
|
||||
print(f" prompt_id: {prompt_id}")
|
||||
|
||||
# Step 5: Wait
|
||||
print(f"\n ⏳ Waiting for generation...")
|
||||
history = wait_for_completion(prompt_id, timeout=600)
|
||||
|
||||
# Step 6: Download
|
||||
print(f"\n ⬇️ Downloading result...")
|
||||
out_path = download_output(history, OUTPUT_DIR, test["name"])
|
||||
|
||||
results.append({
|
||||
"test": test["name"],
|
||||
"status": "✅ SUCCESS",
|
||||
"output": out_path,
|
||||
"prompt_id": prompt_id
|
||||
})
|
||||
|
||||
except FileNotFoundError as e:
|
||||
print(f"\n ✗ FILE NOT FOUND: {e}")
|
||||
results.append({"test": test["name"], "status": "❌ FILE NOT FOUND", "error": str(e)})
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
print(f"\n ✗ CONNECTION ERROR: {e}")
|
||||
results.append({"test": test["name"], "status": "❌ CONNECTION ERROR", "error": str(e)})
|
||||
except TimeoutError as e:
|
||||
print(f"\n ✗ TIMEOUT: {e}")
|
||||
results.append({"test": test["name"], "status": "❌ TIMEOUT", "error": str(e)})
|
||||
except RuntimeError as e:
|
||||
print(f"\n ✗ RUNTIME ERROR: {e}")
|
||||
results.append({"test": test["name"], "status": "❌ RUNTIME ERROR", "error": str(e)})
|
||||
except Exception as e:
|
||||
print(f"\n ✗ UNEXPECTED ERROR: {type(e).__name__}: {e}")
|
||||
results.append({"test": test["name"], "status": "❌ ERROR", "error": str(e)})
|
||||
|
||||
# ── Summary ──
|
||||
total_time = time.time() - total_start
|
||||
print(f"\n\n{'=' * 72}")
|
||||
print(f" BATCH TEST SUMMARY")
|
||||
print(f" Total time: {total_time:.1f}s | Tests: {len(TEST_CASES)}")
|
||||
print(f"{'=' * 72}")
|
||||
|
||||
successes = 0
|
||||
for r in results:
|
||||
print(f" {r['status']} {r['test']}")
|
||||
if "output" in r:
|
||||
print(f" → {r['output']}")
|
||||
successes += 1
|
||||
elif "error" in r:
|
||||
print(f" → {r['error'][:80]}")
|
||||
|
||||
print(f"\n Result: {successes}/{len(TEST_CASES)} passed")
|
||||
print(f"{'=' * 72}")
|
||||
|
||||
# Save results to JSON
|
||||
results_file = OUTPUT_DIR / "batch_results.json"
|
||||
with open(results_file, "w") as f:
|
||||
json.dump(results, f, indent=2, default=str)
|
||||
print(f"\n 📊 Results saved: {results_file}")
|
||||
569
comfy_engine/scripts/test_catalyst_workflow.py
Normal file
@@ -0,0 +1,569 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
test_catalyst_workflow.py — Catalyst Poster Generation via ComfyUI + Qwen-Image-2512
|
||||
=====================================================================================
|
||||
|
||||
This script tests the Catalyst real-estate poster generation workflow by:
|
||||
1. Uploading a Ground Truth (architectural/floorplan) image to ComfyUI
|
||||
2. Uploading a Style Reference image (from Google/Pinterest) to ComfyUI
|
||||
3. Parsing raw UI input to extract marketing copy and aesthetic keywords
|
||||
4. Expanding the parsed input into a full Qwen-Image-2512-tuned prompt
|
||||
5. Dynamically injecting filenames and prompts into the workflow JSON
|
||||
6. Queuing the workflow on the ComfyUI server and polling for completion
|
||||
7. Downloading the final poster to a local output directory
|
||||
|
||||
Environment:
|
||||
- ComfyUI backend running Qwen-Image-2512 on AWS EC2 (4x NVIDIA L4, 96GB VRAM)
|
||||
- Model location: /home/ubuntu/models/Qwen-Image-2512 (diffusers sharded format)
|
||||
- ComfyUI location: /home/ubuntu/velocity/
|
||||
- Internal ComfyUI port: 8118 | External gateway port: 8288
|
||||
|
||||
Usage:
|
||||
python test_catalyst_workflow.py
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
import base64
|
||||
from pathlib import Path
|
||||
from typing import Tuple, Optional
|
||||
|
||||
import requests
|
||||
from PIL import Image
|
||||
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
# CONFIGURATION — Update COMFYUI_SERVER_URL with your AWS instance IP
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
|
||||
COMFYUI_SERVER_URL: str = "http://<AWS_INSTANCE_IP>:8188"
|
||||
"""
|
||||
ComfyUI server URL. Replace <AWS_INSTANCE_IP> with the actual IP address.
|
||||
- For direct access (SSH tunnel): http://127.0.0.1:8118
|
||||
- For external access (if port 8188 is open): http://54.91.19.60:8188
|
||||
- Via Dream Weaver gateway (does NOT apply here): http://54.91.19.60:8288
|
||||
Note: The internal ComfyUI port on the AWS instance is 8118. If SSH-tunnelling,
|
||||
map local port 8188 to remote port 8118.
|
||||
"""
|
||||
|
||||
INPUT_DIR: str = r"F:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine\test_inputs\Sagnik Test Sample New"
|
||||
"""Base directory containing Ground Truth and Style Reference test images."""
|
||||
|
||||
OUTPUT_DIR: str = r"F:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine\test_outputs\Sagnik Test Sample New"
|
||||
"""Directory to save generated poster outputs."""
|
||||
|
||||
WORKFLOW_JSON_PATH: str = os.path.join(
|
||||
os.path.dirname(os.path.abspath(__file__)),
|
||||
"..", "workflows", "catalyst_poster_qwen.json"
|
||||
)
|
||||
"""Path to the catalyst_poster_qwen.json workflow file (relative to this script)."""
|
||||
|
||||
# Node IDs in the workflow JSON (must match catalyst_poster_qwen.json)
|
||||
NODE_ID_GROUND_TRUTH: str = "1" # LoadImage node for Ground Truth
|
||||
NODE_ID_STYLE_REF: str = "2" # LoadImage node for Style Reference
|
||||
NODE_ID_POSITIVE_PROMPT: str = "9" # CLIPTextEncode node for positive prompt
|
||||
NODE_ID_NEGATIVE_PROMPT: str = "10" # CLIPTextEncode node for negative prompt
|
||||
|
||||
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
# FUNCTION 1: Prompt Parsing
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
|
||||
def process_prompt(raw_ui_input: str) -> Tuple[str, str]:
|
||||
"""Parse raw UI input into aesthetic keywords and marketing copy.
|
||||
|
||||
The raw input must contain marketing copy enclosed in double quotes.
|
||||
Everything outside the quotes is treated as aesthetic/style keywords.
|
||||
|
||||
Args:
|
||||
raw_ui_input: Raw string from the UI, e.g.:
|
||||
'modern luxury warm lighting "Your Dream Home Awaits"'
|
||||
|
||||
Returns:
|
||||
A tuple of (aesthetic_keywords, marketing_copy).
|
||||
|
||||
Raises:
|
||||
ValueError: If no text enclosed in double quotes is found.
|
||||
|
||||
Example:
|
||||
>>> process_prompt('art deco gold "Live in Elegance"')
|
||||
('art deco gold', 'Live in Elegance')
|
||||
"""
|
||||
match = re.search(r'"([^"]+)"', raw_ui_input)
|
||||
|
||||
if not match:
|
||||
raise ValueError(
|
||||
"Marketing copy must be enclosed in double quotes. "
|
||||
"Example: 'modern luxury \"Your Dream Home Awaits\"'"
|
||||
)
|
||||
|
||||
marketing_copy: str = match.group(1).strip()
|
||||
|
||||
# Extract everything outside the quotes as aesthetic keywords
|
||||
aesthetic_keywords: str = raw_ui_input[:match.start()] + raw_ui_input[match.end():]
|
||||
aesthetic_keywords = re.sub(r'\s+', ' ', aesthetic_keywords).strip()
|
||||
|
||||
return aesthetic_keywords, marketing_copy
|
||||
|
||||
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
# FUNCTION 2: Prompt Expansion
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
|
||||
def expand_prompt(aesthetic_keywords: str, marketing_copy: str) -> str:
|
||||
"""Expand parsed inputs into a full Qwen-Image-2512-optimized prompt.
|
||||
|
||||
Constructs a semantically rich prompt formatted for Qwen-Image-2512's
|
||||
typography rendering capabilities. The prompt explicitly instructs the
|
||||
model to render text within the generated image.
|
||||
|
||||
Args:
|
||||
aesthetic_keywords: Style descriptors (e.g., 'modern luxury warm lighting').
|
||||
marketing_copy: Exact text to render in the poster (e.g., 'Your Dream Home Awaits').
|
||||
|
||||
Returns:
|
||||
A complete prompt string ready for CLIPTextEncode.
|
||||
|
||||
Example:
|
||||
>>> expand_prompt('modern luxury', 'Live in Style')
|
||||
'A highly realistic, cinematic realestate marketing poster...'
|
||||
"""
|
||||
return (
|
||||
f"A highly realistic, cinematic realestate marketing poster. "
|
||||
f"Interior style: {aesthetic_keywords}. "
|
||||
f"The image must prominently feature the exact text "
|
||||
f"'{marketing_copy}' written in elegant, modern, highly legible "
|
||||
f"typography. Professional lighting, 8k resolution, photorealistic "
|
||||
f"quality, detailed textures."
|
||||
)
|
||||
|
||||
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
# FUNCTION 3: Image Upload
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
|
||||
def upload_image(image_path: str) -> str:
|
||||
"""Upload an image to the ComfyUI server for use in workflows.
|
||||
|
||||
Opens the image, converts to RGB if necessary, saves as a
|
||||
temporary PNG, and uploads via the /upload/image endpoint.
|
||||
|
||||
Args:
|
||||
image_path: Absolute path to the image file on disk.
|
||||
|
||||
Returns:
|
||||
The server-side filename assigned by ComfyUI (used in workflow JSON).
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the image file does not exist.
|
||||
requests.exceptions.ConnectionError: If the ComfyUI server is unreachable.
|
||||
requests.exceptions.Timeout: If the upload times out.
|
||||
RuntimeError: If the server returns an unexpected response.
|
||||
"""
|
||||
path = Path(image_path)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"Image not found: {image_path}")
|
||||
|
||||
# Open and ensure RGB mode
|
||||
img = Image.open(path)
|
||||
if img.mode != "RGB":
|
||||
img = img.convert("RGB")
|
||||
|
||||
# Save to a temporary PNG buffer for upload
|
||||
import tempfile
|
||||
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
|
||||
tmp_path = tmp.name
|
||||
img.save(tmp_path, format="PNG")
|
||||
|
||||
try:
|
||||
with open(tmp_path, "rb") as f:
|
||||
files = {
|
||||
"image": (path.name, f, "image/png")
|
||||
}
|
||||
data = {
|
||||
"overwrite": "true"
|
||||
}
|
||||
response = requests.post(
|
||||
f"{COMFYUI_SERVER_URL}/upload/image",
|
||||
files=files,
|
||||
data=data,
|
||||
timeout=60
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
result = response.json()
|
||||
server_filename: str = result.get("name", "")
|
||||
|
||||
if not server_filename:
|
||||
raise RuntimeError(
|
||||
f"ComfyUI upload returned unexpected response: {result}"
|
||||
)
|
||||
|
||||
print(f" ✓ Uploaded '{path.name}' → server filename: '{server_filename}'")
|
||||
return server_filename
|
||||
|
||||
finally:
|
||||
# Clean up temp file
|
||||
try:
|
||||
os.unlink(tmp_path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
# FUNCTION 4: Execute Workflow
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
|
||||
def execute_workflow(
|
||||
workflow_json: dict,
|
||||
prompt_text: str,
|
||||
ground_truth_filename: str,
|
||||
style_ref_filename: str
|
||||
) -> str:
|
||||
"""Inject dynamic values into the workflow JSON and queue it on ComfyUI.
|
||||
|
||||
Updates the following nodes in the workflow:
|
||||
- Node 1 (LoadImage): Sets Ground Truth filename
|
||||
- Node 2 (LoadImage): Sets Style Reference filename
|
||||
- Node 9 (CLIPTextEncode): Sets the expanded positive prompt
|
||||
|
||||
Args:
|
||||
workflow_json: The loaded workflow JSON dict (API format).
|
||||
prompt_text: The expanded prompt string from expand_prompt().
|
||||
ground_truth_filename: Server-side filename of the ground truth image.
|
||||
style_ref_filename: Server-side filename of the style reference image.
|
||||
|
||||
Returns:
|
||||
The prompt_id string from ComfyUI's queue response.
|
||||
|
||||
Raises:
|
||||
requests.exceptions.ConnectionError: If the server is unreachable.
|
||||
requests.exceptions.Timeout: If the request times out.
|
||||
KeyError: If expected node IDs are missing from the workflow JSON.
|
||||
"""
|
||||
# Deep copy to avoid mutating the original
|
||||
import copy
|
||||
wf = copy.deepcopy(workflow_json)
|
||||
|
||||
# Inject Ground Truth image filename
|
||||
wf[NODE_ID_GROUND_TRUTH]["inputs"]["image"] = ground_truth_filename
|
||||
|
||||
# Inject Style Reference image filename
|
||||
wf[NODE_ID_STYLE_REF]["inputs"]["image"] = style_ref_filename
|
||||
|
||||
# Inject expanded positive prompt
|
||||
wf[NODE_ID_POSITIVE_PROMPT]["inputs"]["text"] = prompt_text
|
||||
|
||||
# Build the API payload
|
||||
payload = {
|
||||
"prompt": wf,
|
||||
"client_id": f"catalyst_{int(time.time())}"
|
||||
}
|
||||
|
||||
print(f" → Queuing workflow on {COMFYUI_SERVER_URL}/prompt ...")
|
||||
|
||||
response = requests.post(
|
||||
f"{COMFYUI_SERVER_URL}/prompt",
|
||||
json=payload,
|
||||
timeout=30
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
result = response.json()
|
||||
prompt_id: str = result.get("prompt_id", "")
|
||||
|
||||
if not prompt_id:
|
||||
raise RuntimeError(
|
||||
f"ComfyUI /prompt returned unexpected response: {result}"
|
||||
)
|
||||
|
||||
print(f" ✓ Queued successfully. prompt_id: {prompt_id}")
|
||||
return prompt_id
|
||||
|
||||
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
# FUNCTION 5: Poll for Completion
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
|
||||
def wait_for_completion(
|
||||
prompt_id: str,
|
||||
timeout: int = 300,
|
||||
poll_interval: int = 2
|
||||
) -> dict:
|
||||
"""Poll the ComfyUI history endpoint until the workflow completes.
|
||||
|
||||
Repeatedly checks /history/{prompt_id} for output images. The Qwen-Image-2512
|
||||
model with 50 inference steps on 4x L4 GPUs typically takes 30-90 seconds.
|
||||
|
||||
Args:
|
||||
prompt_id: The prompt ID returned by execute_workflow().
|
||||
timeout: Maximum seconds to wait before raising TimeoutError.
|
||||
poll_interval: Seconds between poll requests.
|
||||
|
||||
Returns:
|
||||
The history dict for this prompt_id (contains output image metadata).
|
||||
|
||||
Raises:
|
||||
TimeoutError: If the workflow doesn't complete within timeout seconds.
|
||||
requests.exceptions.ConnectionError: If the server is unreachable.
|
||||
RuntimeError: If the workflow reports an error status.
|
||||
"""
|
||||
history_url = f"{COMFYUI_SERVER_URL}/history/{prompt_id}"
|
||||
start_time = time.time()
|
||||
poll_count = 0
|
||||
|
||||
print(f" ⏳ Polling for completion (timeout: {timeout}s) ...")
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
time.sleep(poll_interval)
|
||||
poll_count += 1
|
||||
|
||||
try:
|
||||
response = requests.get(history_url, timeout=10)
|
||||
|
||||
if response.status_code == 200:
|
||||
history = response.json()
|
||||
prompt_history = history.get(prompt_id, {})
|
||||
|
||||
# Check for error status
|
||||
status_info = prompt_history.get("status", {})
|
||||
if status_info.get("status_str") == "error":
|
||||
error_msgs = status_info.get("messages", ["Unknown error"])
|
||||
raise RuntimeError(
|
||||
f"Workflow execution failed: {error_msgs}"
|
||||
)
|
||||
|
||||
# Check for output images
|
||||
outputs = prompt_history.get("outputs", {})
|
||||
for node_id, node_output in outputs.items():
|
||||
if "images" in node_output and node_output["images"]:
|
||||
elapsed = time.time() - start_time
|
||||
print(
|
||||
f" ✓ Completed in {elapsed:.1f}s "
|
||||
f"({poll_count} polls)"
|
||||
)
|
||||
return prompt_history
|
||||
|
||||
except requests.exceptions.ConnectionError:
|
||||
# Server might be busy with GPU inference, retry
|
||||
print(f" Poll #{poll_count}: Connection interrupted, retrying...")
|
||||
except requests.exceptions.Timeout:
|
||||
print(f" Poll #{poll_count}: Timeout, retrying...")
|
||||
|
||||
raise TimeoutError(
|
||||
f"Workflow did not complete within {timeout} seconds "
|
||||
f"(prompt_id: {prompt_id})"
|
||||
)
|
||||
|
||||
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
# FUNCTION 6: Download Output
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
|
||||
def download_output(history: dict, output_dir: str) -> str:
|
||||
"""Extract and download the generated poster from ComfyUI history.
|
||||
|
||||
Reads the output image metadata from the history response, constructs
|
||||
the /view URL, downloads the image, and saves it with a timestamped
|
||||
filename.
|
||||
|
||||
Args:
|
||||
history: The prompt history dict returned by wait_for_completion().
|
||||
output_dir: Local directory to save the output image.
|
||||
|
||||
Returns:
|
||||
The absolute path to the saved output image.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If no output images are found in the history.
|
||||
requests.exceptions.ConnectionError: If the server is unreachable.
|
||||
"""
|
||||
# Find the output image in the history
|
||||
output_image: Optional[dict] = None
|
||||
|
||||
for node_id, node_output in history.get("outputs", {}).items():
|
||||
images = node_output.get("images", [])
|
||||
if images:
|
||||
output_image = images[0]
|
||||
break
|
||||
|
||||
if not output_image:
|
||||
raise RuntimeError("No output images found in workflow history")
|
||||
|
||||
# Construct the ComfyUI /view URL
|
||||
filename = output_image["filename"]
|
||||
subfolder = output_image.get("subfolder", "")
|
||||
img_type = output_image.get("type", "output")
|
||||
|
||||
view_url = (
|
||||
f"{COMFYUI_SERVER_URL}/view"
|
||||
f"?filename={filename}"
|
||||
f"&subfolder={subfolder}"
|
||||
f"&type={img_type}"
|
||||
)
|
||||
|
||||
print(f" ⬇ Downloading: {filename} ...")
|
||||
|
||||
response = requests.get(view_url, stream=True, timeout=60)
|
||||
response.raise_for_status()
|
||||
|
||||
# Save with timestamp
|
||||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||||
output_filename = f"catalyst_poster_{timestamp}.png"
|
||||
output_path = os.path.join(output_dir, output_filename)
|
||||
|
||||
with open(output_path, "wb") as f:
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
|
||||
file_size_mb = os.path.getsize(output_path) / (1024 * 1024)
|
||||
print(f" ✓ Saved: {output_path} ({file_size_mb:.1f} MB)")
|
||||
|
||||
return output_path
|
||||
|
||||
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
# MAIN EXECUTION
|
||||
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("=" * 72)
|
||||
print(" CATALYST POSTER GENERATION — Qwen-Image-2512 Workflow Test")
|
||||
print("=" * 72)
|
||||
|
||||
# ── Ensure output directory exists ──
|
||||
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
||||
print(f"\n📂 Output dir: {OUTPUT_DIR}")
|
||||
|
||||
# ── Load workflow JSON ──
|
||||
workflow_path = os.path.normpath(WORKFLOW_JSON_PATH)
|
||||
print(f"📄 Workflow: {workflow_path}")
|
||||
|
||||
try:
|
||||
with open(workflow_path, "r", encoding="utf-8") as f:
|
||||
workflow = json.load(f)
|
||||
print(f" ✓ Loaded workflow ({len(workflow)} nodes)")
|
||||
except FileNotFoundError:
|
||||
print(f" ✗ ERROR: Workflow file not found: {workflow_path}")
|
||||
print(" Ensure catalyst_poster_qwen.json is in ../workflows/")
|
||||
exit(1)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f" ✗ ERROR: Invalid JSON in workflow file: {e}")
|
||||
exit(1)
|
||||
|
||||
# ── Define test inputs ──
|
||||
# Update these paths to your actual test images
|
||||
ground_truth_image = os.path.join(INPUT_DIR, "ground_truth.jpg")
|
||||
style_reference_image = os.path.join(INPUT_DIR, "style_reference.jpg")
|
||||
|
||||
raw_prompt = (
|
||||
'modern luxury warm ambient lighting premium materials '
|
||||
'golden hour cinematic architectural photography '
|
||||
'"Your Dream Home Awaits"'
|
||||
)
|
||||
|
||||
print(f"\n🖼️ Ground Truth: {ground_truth_image}")
|
||||
print(f"🎨 Style Reference: {style_reference_image}")
|
||||
print(f"📝 Raw Prompt: {raw_prompt}")
|
||||
|
||||
# ── Step 1: Parse the prompt ──
|
||||
print("\n── Step 1: Parsing prompt ──")
|
||||
try:
|
||||
aesthetic_keywords, marketing_copy = process_prompt(raw_prompt)
|
||||
print(f" Keywords: {aesthetic_keywords}")
|
||||
print(f" Copy: \"{marketing_copy}\"")
|
||||
except ValueError as e:
|
||||
print(f" ✗ ERROR: {e}")
|
||||
exit(1)
|
||||
|
||||
# ── Step 2: Expand the prompt ──
|
||||
print("\n── Step 2: Expanding prompt ──")
|
||||
expanded = expand_prompt(aesthetic_keywords, marketing_copy)
|
||||
print(f" Expanded ({len(expanded)} chars):")
|
||||
print(f" {expanded[:120]}...")
|
||||
|
||||
# ── Step 3: Upload images ──
|
||||
print("\n── Step 3: Uploading images ──")
|
||||
try:
|
||||
gt_filename = upload_image(ground_truth_image)
|
||||
sr_filename = upload_image(style_reference_image)
|
||||
except FileNotFoundError as e:
|
||||
print(f" ✗ ERROR: {e}")
|
||||
print(" Place test images in the INPUT_DIR directory.")
|
||||
exit(1)
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
print(f" ✗ CONNECTION ERROR: Cannot reach {COMFYUI_SERVER_URL}")
|
||||
print(f" Details: {e}")
|
||||
print(" Ensure ComfyUI is running and the URL is correct.")
|
||||
print(" If using SSH tunnel: ssh -L 8188:127.0.0.1:8118 ...")
|
||||
exit(1)
|
||||
except requests.exceptions.Timeout:
|
||||
print(f" ✗ TIMEOUT: Upload timed out to {COMFYUI_SERVER_URL}")
|
||||
exit(1)
|
||||
except Exception as e:
|
||||
print(f" ✗ UNEXPECTED ERROR during upload: {e}")
|
||||
exit(1)
|
||||
|
||||
# ── Step 4: Execute workflow ──
|
||||
print("\n── Step 4: Executing workflow ──")
|
||||
try:
|
||||
prompt_id = execute_workflow(
|
||||
workflow_json=workflow,
|
||||
prompt_text=expanded,
|
||||
ground_truth_filename=gt_filename,
|
||||
style_ref_filename=sr_filename
|
||||
)
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
print(f" ✗ CONNECTION ERROR: {e}")
|
||||
exit(1)
|
||||
except requests.exceptions.Timeout:
|
||||
print(f" ✗ TIMEOUT: Could not queue workflow")
|
||||
exit(1)
|
||||
except KeyError as e:
|
||||
print(f" ✗ WORKFLOW ERROR: Missing node ID {e} in workflow JSON")
|
||||
exit(1)
|
||||
except Exception as e:
|
||||
print(f" ✗ UNEXPECTED ERROR: {e}")
|
||||
exit(1)
|
||||
|
||||
# ── Step 5: Poll for completion ──
|
||||
print("\n── Step 5: Waiting for completion ──")
|
||||
try:
|
||||
history = wait_for_completion(
|
||||
prompt_id=prompt_id,
|
||||
timeout=300,
|
||||
poll_interval=2
|
||||
)
|
||||
except TimeoutError as e:
|
||||
print(f" ✗ TIMEOUT: {e}")
|
||||
exit(1)
|
||||
except RuntimeError as e:
|
||||
print(f" ✗ EXECUTION ERROR: {e}")
|
||||
exit(1)
|
||||
except Exception as e:
|
||||
print(f" ✗ UNEXPECTED ERROR: {e}")
|
||||
exit(1)
|
||||
|
||||
# ── Step 6: Download output ──
|
||||
print("\n── Step 6: Downloading output ──")
|
||||
try:
|
||||
output_path = download_output(
|
||||
history=history,
|
||||
output_dir=OUTPUT_DIR
|
||||
)
|
||||
except RuntimeError as e:
|
||||
print(f" ✗ ERROR: {e}")
|
||||
exit(1)
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
print(f" ✗ DOWNLOAD ERROR: {e}")
|
||||
exit(1)
|
||||
except Exception as e:
|
||||
print(f" ✗ UNEXPECTED ERROR: {e}")
|
||||
exit(1)
|
||||
|
||||
# ── Success ──
|
||||
print("\n" + "=" * 72)
|
||||
print(f" ✅ SUCCESS — Poster saved to:")
|
||||
print(f" {output_path}")
|
||||
print("=" * 72)
|
||||
|
After Width: | Height: | Size: 107 KiB |
|
After Width: | Height: | Size: 76 KiB |
|
After Width: | Height: | Size: 67 KiB |
|
After Width: | Height: | Size: 70 KiB |
|
After Width: | Height: | Size: 88 KiB |
|
After Width: | Height: | Size: 58 KiB |
|
After Width: | Height: | Size: 48 KiB |
|
After Width: | Height: | Size: 88 KiB |
|
After Width: | Height: | Size: 3.0 MiB |
|
After Width: | Height: | Size: 2.9 MiB |
|
After Width: | Height: | Size: 3.4 MiB |
|
After Width: | Height: | Size: 2.5 MiB |
|
After Width: | Height: | Size: 243 KiB |
|
After Width: | Height: | Size: 273 KiB |
|
After Width: | Height: | Size: 234 KiB |
|
After Width: | Height: | Size: 194 KiB |
|
After Width: | Height: | Size: 386 KiB |
|
After Width: | Height: | Size: 320 KiB |
|
After Width: | Height: | Size: 3.6 MiB |
|
After Width: | Height: | Size: 271 KiB |
|
After Width: | Height: | Size: 173 KiB |
|
After Width: | Height: | Size: 214 KiB |
|
After Width: | Height: | Size: 282 KiB |
|
After Width: | Height: | Size: 209 KiB |
|
After Width: | Height: | Size: 274 KiB |
|
After Width: | Height: | Size: 280 KiB |
|
After Width: | Height: | Size: 101 KiB |
|
After Width: | Height: | Size: 157 KiB |
|
After Width: | Height: | Size: 280 KiB |
|
After Width: | Height: | Size: 345 KiB |
|
After Width: | Height: | Size: 210 KiB |
|
After Width: | Height: | Size: 3.0 MiB |
|
After Width: | Height: | Size: 174 KiB |
|
After Width: | Height: | Size: 213 KiB |
|
After Width: | Height: | Size: 264 KiB |
|
After Width: | Height: | Size: 267 KiB |
|
After Width: | Height: | Size: 243 KiB |
|
After Width: | Height: | Size: 273 KiB |
|
After Width: | Height: | Size: 234 KiB |
|
After Width: | Height: | Size: 194 KiB |
|
After Width: | Height: | Size: 386 KiB |
|
After Width: | Height: | Size: 320 KiB |
|
After Width: | Height: | Size: 20 KiB |
|
After Width: | Height: | Size: 271 KiB |
|
After Width: | Height: | Size: 173 KiB |
|
After Width: | Height: | Size: 214 KiB |
|
After Width: | Height: | Size: 282 KiB |
|
After Width: | Height: | Size: 209 KiB |
|
After Width: | Height: | Size: 274 KiB |
|
After Width: | Height: | Size: 280 KiB |
@@ -0,0 +1,155 @@
|
||||
{
|
||||
"1": {
|
||||
"class_type": "LoadImage",
|
||||
"_meta": {"title": "Ground Truth (Architectural/Floorplan)"},
|
||||
"inputs": {
|
||||
"image": "ground_truth_input.png",
|
||||
"upload": "image"
|
||||
}
|
||||
},
|
||||
"2": {
|
||||
"class_type": "LoadImage",
|
||||
"_meta": {"title": "Style Reference (Google/Pinterest)"},
|
||||
"inputs": {
|
||||
"image": "style_reference_input.png",
|
||||
"upload": "image"
|
||||
}
|
||||
},
|
||||
"3": {
|
||||
"class_type": "DiffusersLoader",
|
||||
"_meta": {"title": "Qwen-Image-2512 Model Loader"},
|
||||
"inputs": {
|
||||
"model_path": "/home/ubuntu/models/Qwen-Image-2512"
|
||||
}
|
||||
},
|
||||
"4": {
|
||||
"class_type": "ImageScale",
|
||||
"_meta": {"title": "Scale Ground Truth"},
|
||||
"inputs": {
|
||||
"image": ["1", 0],
|
||||
"upscale_method": "lanczos",
|
||||
"width": 1104,
|
||||
"height": 1472,
|
||||
"crop": "center"
|
||||
}
|
||||
},
|
||||
"5": {
|
||||
"class_type": "CannyEdgePreprocessor",
|
||||
"_meta": {"title": "Canny Edge (Spatial Geometry)"},
|
||||
"inputs": {
|
||||
"image": ["4", 0],
|
||||
"low_threshold": 80,
|
||||
"high_threshold": 160,
|
||||
"resolution": 1024
|
||||
}
|
||||
},
|
||||
"6": {
|
||||
"class_type": "ControlNetLoader",
|
||||
"_meta": {"title": "ControlNet Canny Loader"},
|
||||
"inputs": {
|
||||
"control_net_name": "control_v11p_sd15_canny.pth"
|
||||
}
|
||||
},
|
||||
"7": {
|
||||
"class_type": "CLIPVisionLoader",
|
||||
"_meta": {"title": "CLIP Vision for IP-Adapter"},
|
||||
"inputs": {
|
||||
"clip_name": "CLIP-ViT-H-14-laion2B-s32B-b79K.safetensors"
|
||||
}
|
||||
},
|
||||
"8": {
|
||||
"class_type": "IPAdapterModelLoader",
|
||||
"_meta": {"title": "IP-Adapter Model"},
|
||||
"inputs": {
|
||||
"ipadapter_file": "ip-adapter_sd15.bin"
|
||||
}
|
||||
},
|
||||
"9": {
|
||||
"class_type": "CLIPTextEncode",
|
||||
"_meta": {"title": "Positive Prompt (Typography + Aesthetic)"},
|
||||
"inputs": {
|
||||
"text": "A highly realistic, cinematic real estate marketing poster. Interior style: modern luxury, warm ambient lighting, premium materials. The image must prominently feature the exact text 'YOUR DREAM HOME AWAITS' written in elegant, modern, highly legible typography positioned at the lower third of the image, clean sans-serif font, crisp rendering, high contrast text. Professional cinematic lighting, 8k resolution, photorealistic quality, detailed textures, architectural photography, ultra-sharp focus, golden hour warmth, premium real estate aesthetic.",
|
||||
"clip": ["3", 1]
|
||||
}
|
||||
},
|
||||
"10": {
|
||||
"class_type": "CLIPTextEncode",
|
||||
"_meta": {"title": "Negative Prompt"},
|
||||
"inputs": {
|
||||
"text": "deformed, blurry, bad anatomy, watermark, logo, extra text, distorted text, blurry text, misaligned text, low quality, worst quality, illustration, 3d render, painting, cartoon, sketch, artifacts, noise, oversaturated, unrealistic lighting, structural changes, extra windows, extra doors, warped perspective, low resolution, pixelated",
|
||||
"clip": ["3", 1]
|
||||
}
|
||||
},
|
||||
"11": {
|
||||
"class_type": "EmptyLatentImage",
|
||||
"_meta": {"title": "Poster Canvas (3:4 Portrait)"},
|
||||
"inputs": {
|
||||
"width": 1104,
|
||||
"height": 1472,
|
||||
"batch_size": 1
|
||||
}
|
||||
},
|
||||
"12": {
|
||||
"class_type": "IPAdapterAdvanced",
|
||||
"_meta": {"title": "Style Transfer from Reference"},
|
||||
"inputs": {
|
||||
"model": ["3", 0],
|
||||
"ipadapter": ["8", 0],
|
||||
"image": ["2", 0],
|
||||
"clip_vision": ["7", 0],
|
||||
"weight": 0.55,
|
||||
"weight_type": "linear",
|
||||
"combine_embeds": "concat",
|
||||
"start_at": 0.0,
|
||||
"end_at": 0.5,
|
||||
"embeds_scaling": "V only",
|
||||
"noise": 0.0
|
||||
}
|
||||
},
|
||||
"13": {
|
||||
"class_type": "ControlNetApplyAdvanced",
|
||||
"_meta": {"title": "ControlNet Apply (Canny Geometry)"},
|
||||
"inputs": {
|
||||
"positive": ["9", 0],
|
||||
"negative": ["10", 0],
|
||||
"control_net": ["6", 0],
|
||||
"image": ["5", 0],
|
||||
"strength": 0.75,
|
||||
"start_percent": 0.0,
|
||||
"end_percent": 0.65
|
||||
}
|
||||
},
|
||||
"14": {
|
||||
"class_type": "KSampler",
|
||||
"_meta": {"title": "Qwen Sampler (true_cfg compatible)"},
|
||||
"inputs": {
|
||||
"model": ["12", 0],
|
||||
"positive": ["13", 0],
|
||||
"negative": ["13", 1],
|
||||
"latent_image": ["11", 0],
|
||||
"seed": 42,
|
||||
"control_after_generate": "randomize",
|
||||
"steps": 50,
|
||||
"cfg": 4.0,
|
||||
"sampler_name": "euler",
|
||||
"scheduler": "normal",
|
||||
"denoise": 1.0
|
||||
}
|
||||
},
|
||||
"15": {
|
||||
"class_type": "VAEDecode",
|
||||
"_meta": {"title": "Decode Latent to Image"},
|
||||
"inputs": {
|
||||
"samples": ["14", 0],
|
||||
"vae": ["3", 2]
|
||||
}
|
||||
},
|
||||
"16": {
|
||||
"class_type": "SaveImage",
|
||||
"_meta": {"title": "Save Catalyst Poster Output"},
|
||||
"inputs": {
|
||||
"images": ["15", 0],
|
||||
"filename_prefix": "catalyst_poster_qwen"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||