Files
Project_Velocity/comfy_engine/scripts/test_catalyst_batch.py
sayan 8e1ffe0e43 feat: Added the ComfyUI engine (#12)
#11 Added the complete ComfyUI engine.

Co-authored-by: Sayan Datta <sayan@Sayans-MacBook-Air.local>
Reviewed-on: #12
2026-03-27 22:48:34 +05:30

507 lines
20 KiB
Python

#!/usr/bin/env python3
"""
test_catalyst_batch.py — 5-Prompt Batch Test for Catalyst Poster Generation
============================================================================
Sends 5 distinct social media marketing poster generation requests to the
ComfyUI server running Qwen-Image-2512. Each test uses:
- A different Ground Truth image (room photo from the property)
- A Style Reference image (professional real estate marketing poster)
- A unique "Prompt Keyword" set that an end-user would type
The script demonstrates the full end-user flow:
User enters: Keywords + Ground Truth Image + Style Reference → Gets Poster
Test Matrix:
1. "luxury modern kitchen" → Kitchen photo + Orange card reference
2. "cozy master bedroom" → Bedroom photo + SOLD poster reference
3. "elegant living space" → Balcony bedroom + Magazine editorial ref
4. "premium apartment lifestyle" → Room with AC + Bellagio luxury ad ref
5. "smart home investment" → Corridor/room + Social media grid ref
Environment: ComfyUI + Qwen-Image-2512 on AWS EC2 (4x NVIDIA L4)
"""
import os
import sys
import json
import re
import copy
import time
from pathlib import Path
from typing import Tuple, Optional, Dict, List
import requests
from PIL import Image
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# CONFIGURATION
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# ComfyUI server URL. Set the COMFYUI_SERVER_URL environment variable to
# switch targets without editing this file. Options:
# - Direct (if port open via SG): http://54.91.19.60:8118
# - SSH tunnel: ssh -L 8118:127.0.0.1:8118 ubuntu@54.91.19.60 -p 443
#   then use: http://127.0.0.1:8118
# A trailing slash is stripped so f"{COMFYUI_SERVER_URL}/prompt" never
# yields a double slash. An unset or empty variable falls back to the default.
COMFYUI_SERVER_URL: str = (
    os.environ.get("COMFYUI_SERVER_URL") or "http://54.91.19.60:8118"
).rstrip("/")

# Root of the comfy_engine directory. The default is a machine-specific
# path; set CATALYST_BASE_DIR to point the script at another checkout.
BASE_DIR = Path(
    os.environ.get("CATALYST_BASE_DIR")
    or r"F:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine"
)
# Ground-truth room photos live directly in INPUT_DIR; style references
# live in its "Sample Reference" subfolder.
INPUT_DIR = BASE_DIR / "test_inputs" / "Sagnik Test Sample New"
REF_DIR = INPUT_DIR / "Sample Reference"
OUTPUT_DIR = BASE_DIR / "test_outputs" / "catalyst_batch_results"
WORKFLOW_PATH = BASE_DIR / "workflows" / "catalyst_poster_qwen.json"

# Node IDs matching catalyst_poster_qwen.json
NODE_GROUND_TRUTH = "1"
NODE_STYLE_REF = "2"
NODE_POS_PROMPT = "9"
NODE_NEG_PROMPT = "10"
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 5 TEST CASES — Each simulates what an end-user would enter
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Each entry describes one end-user request: the room photo (ground truth),
# the poster whose look should be imitated (style reference), the keywords
# the user would type, and the headline to render on the poster.
TEST_CASES: List[Dict] = [
    dict(
        name="Test 1: Luxury Modern Kitchen",
        ground_truth="IMG_20210330_154502.jpg",  # Kitchen with wooden cabinets
        style_ref="6301510fa187aca16f680b2a525ed6de.jpg",  # Orange card-style poster
        user_keywords="luxury modern kitchen",
        marketing_copy="Cook Your Dreams to Life",
        description="A modular kitchen showcase — warm tones, premium finishes.",
    ),
    dict(
        name="Test 2: Cozy Master Bedroom",
        ground_truth="IMG_20210330_154512.jpg",  # Bedroom with accent wall
        style_ref="79c9a52c9af0c1d94df025dd1505db83.jpg",  # Bold SOLD poster
        user_keywords="cozy master bedroom",
        marketing_copy="Where Comfort Meets Elegance",
        description="Master bedroom with designer wallpaper — aspirational lifestyle.",
    ),
    dict(
        name="Test 3: Elegant Living Space",
        ground_truth="IMG_20210330_160420.jpg",  # Balcony view bedroom
        style_ref="7bb67cbc287300b78b4e8da3da7de242.jpg",  # Magazine editorial
        user_keywords="elegant living space",
        marketing_copy="A New Beginning Starts Here",
        description="Bedroom with balcony view — editorial magazine style.",
    ),
    dict(
        name="Test 4: Premium Apartment Lifestyle",
        ground_truth="IMG_20210330_154534.jpg",  # Another room view
        style_ref="ee9d7efdf9303342480d5cb57cec8400.jpg",  # Bellagio luxury ad
        user_keywords="premium apartment lifestyle",
        marketing_copy="Live Above the Ordinary",
        description="Premium apartment showcase — Dubai-style luxury marketing.",
    ),
    dict(
        name="Test 5: Smart Home Investment",
        ground_truth="IMG_20210330_160212.jpg",  # Compact room
        style_ref="fd0586727e1b43e9c346a6f851fb50f9.jpg",  # Social media grid
        user_keywords="smart home investment",
        marketing_copy="Your Dream Home Is Waiting",
        description="Investment-focused social media post — modern minimalist grid.",
    ),
]
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# CORE FUNCTIONS
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def process_prompt(user_keywords: str, marketing_copy: str) -> Tuple[str, str]:
    """Validate and normalize the two pieces of user input.

    Surrounding whitespace is removed from both values. A value that is
    ``None``, empty, or whitespace-only is rejected.

    Args:
        user_keywords: Style/aesthetic descriptors (e.g., 'luxury modern kitchen').
        marketing_copy: Headline text to render on the poster.

    Returns:
        The stripped ``(aesthetic_keywords, marketing_copy)`` tuple.

    Raises:
        ValueError: If either input is missing or blank. Keywords are
            checked first, so their message wins when both are blank.
    """
    # `or ""` lets a missing (None) value fail with the documented
    # ValueError instead of an AttributeError on .strip().
    keywords = (user_keywords or "").strip()
    headline = (marketing_copy or "").strip()
    if not keywords:
        raise ValueError("Keyword prompt cannot be empty")
    if not headline:
        raise ValueError("Marketing copy cannot be empty")
    return keywords, headline
def expand_prompt(aesthetic_keywords: str, marketing_copy: str) -> str:
    """Build the full positive prompt from the user's keywords and headline.

    The result is four sentences joined by single spaces:
      1. Scene declaration (a real estate social media marketing poster).
      2. Style sentence, which embeds ``aesthetic_keywords``.
      3. Typography sentence, which embeds ``marketing_copy`` in single quotes.
      4. Fixed quality/layout descriptors.

    Both inputs are inserted verbatim; no escaping or validation is done.

    Args:
        aesthetic_keywords: User's style keywords (e.g., 'luxury modern kitchen').
        marketing_copy: Exact text that should appear on the poster.

    Returns:
        The expanded prompt string.

    Example:
        expand_prompt('luxury modern kitchen', 'Cook Your Dreams') returns a
        string starting with 'A stunning, high-end real estate social media
        marketing poster.'
    """
    scene = "A stunning, high-end real estate social media marketing poster."
    style = (
        "Style: " + aesthetic_keywords + ", warm ambient lighting, "
        "premium materials, cinematic composition, "
        "professional interior photography."
    )
    typography = (
        "The poster must prominently display the exact text "
        "'" + marketing_copy + "' rendered in elegant, bold, modern "
        "sans-serif typography with high contrast against the background, "
        "crisp edges, perfectly aligned, highly legible."
    )
    quality = (
        "8k resolution, photorealistic quality, detailed textures, "
        "architectural magazine aesthetic, ultra-sharp focus, "
        "golden hour warmth, depth of field bokeh, premium brand feel, "
        "social media optimized layout, clean negative space for text."
    )
    return " ".join((scene, style, typography, quality))
def upload_image(image_path: Path) -> str:
    """Upload an image to the ComfyUI server via ``/upload/image``.

    The image is opened with Pillow, converted to RGB when it is in any
    other mode, re-encoded as PNG in memory, and posted with
    ``overwrite=true``. No temporary file is written to disk.

    Args:
        image_path: Path to the local image file.

    Returns:
        The ``name`` field from the server's JSON response, for use in the
        workflow JSON.

    Raises:
        FileNotFoundError: If ``image_path`` does not exist.
        requests.exceptions.RequestException: If the server is unreachable,
            the request times out, or it answers with an HTTP error status.
        RuntimeError: If the response carries no ``name``.
    """
    # Local import: `io` is not in the file's top-level import block.
    import io

    if not image_path.exists():
        raise FileNotFoundError(f"Image not found: {image_path}")

    png_buffer = io.BytesIO()
    # The context manager closes the underlying file handle even if the
    # conversion or encoding fails.
    with Image.open(image_path) as img:
        rgb = img if img.mode == "RGB" else img.convert("RGB")
        rgb.save(png_buffer, format="PNG")

    # NOTE(review): the upload keeps the original filename (often .jpg)
    # although the payload is PNG. Kept as-is so server-side names stay the
    # same; confirm the server does not rely on the extension.
    response = requests.post(
        f"{COMFYUI_SERVER_URL}/upload/image",
        files={"image": (image_path.name, png_buffer.getvalue(), "image/png")},
        data={"overwrite": "true"},
        timeout=60,
    )
    response.raise_for_status()
    result = response.json()
    server_name = result.get("name", "")
    if not server_name:
        raise RuntimeError(f"Upload failed: {result}")
    return server_name
def execute_workflow(
    workflow: dict,
    prompt_text: str,
    gt_filename: str,
    sr_filename: str
) -> str:
    """Fill in the per-test inputs and queue the workflow on ComfyUI.

    Works on a deep copy, so the caller's ``workflow`` dict is not modified.
    Three node inputs are overwritten: the ``image`` input of the
    ground-truth and style-reference nodes, and the ``text`` input of the
    positive-prompt node. The graph is then posted to ``/prompt``.

    Args:
        workflow: The loaded workflow JSON dict, keyed by node ID.
        prompt_text: Expanded prompt from expand_prompt().
        gt_filename: Server filename of the ground truth image.
        sr_filename: Server filename of the style reference image.

    Returns:
        The ``prompt_id`` from the queue response.

    Raises:
        KeyError: If an expected node ID or its ``inputs`` entry is missing.
        RuntimeError: If the response contains no ``prompt_id``.
    """
    graph = copy.deepcopy(workflow)

    # (node id, input field, value) applied in this order
    overrides = (
        (NODE_GROUND_TRUTH, "image", gt_filename),
        (NODE_STYLE_REF, "image", sr_filename),
        (NODE_POS_PROMPT, "text", prompt_text),
    )
    for node_id, field, value in overrides:
        graph[node_id]["inputs"][field] = value

    request_body = {
        "prompt": graph,
        "client_id": f"catalyst_batch_{int(time.time())}",
    }
    queue_response = requests.post(
        f"{COMFYUI_SERVER_URL}/prompt",
        json=request_body,
        timeout=30,
    )
    queue_response.raise_for_status()

    reply = queue_response.json()
    prompt_id = reply.get("prompt_id", "")
    if prompt_id:
        return prompt_id
    raise RuntimeError(f"Queue failed: {reply}")
def wait_for_completion(prompt_id: str, timeout: int = 600, poll_interval: int = 3) -> dict:
    """Poll ``/history/{prompt_id}`` until the workflow produces an image.

    Each iteration sleeps ``poll_interval`` seconds and then queries the
    history endpoint. Connection errors and request timeouts are treated as
    transient and retried. A progress line is printed every 10 polls,
    whether or not the poll reached the server.

    Args:
        prompt_id: The queued prompt ID.
        timeout: Max wait time in seconds.
        poll_interval: Seconds between polls.

    Returns:
        The history entry for ``prompt_id``, once any of its output nodes
        lists at least one image.

    Raises:
        RuntimeError: If the history entry reports ``status_str == "error"``.
        TimeoutError: If no image appears within ``timeout`` seconds.
    """
    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    start = time.monotonic()
    polls = 0
    while time.monotonic() - start < timeout:
        # Sleep first: a freshly queued prompt has no history entry yet.
        time.sleep(poll_interval)
        polls += 1

        try:
            r = requests.get(
                f"{COMFYUI_SERVER_URL}/history/{prompt_id}",
                timeout=10
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            r = None  # transient network failure; retry on the next poll

        if r is not None and r.status_code == 200:
            prompt_data = r.json().get(prompt_id, {})

            # Check for error
            status = prompt_data.get("status", {})
            if status.get("status_str") == "error":
                msgs = status.get("messages", ["Unknown"])
                raise RuntimeError(f"Workflow error: {msgs}")

            # Check for outputs
            outputs = prompt_data.get("outputs", {})
            if any(node_out.get("images") for node_out in outputs.values()):
                elapsed = time.monotonic() - start
                print(f" ✓ Done in {elapsed:.1f}s ({polls} polls)")
                return prompt_data

        if polls % 10 == 0:
            print(f" ⏳ Still waiting... ({polls} polls)")

    raise TimeoutError(f"Timed out after {timeout}s (prompt: {prompt_id})")
def download_output(history: dict, output_dir: Path, test_name: str) -> str:
    """Download the first generated image listed in a prompt history entry.

    The first output node with a non-empty ``images`` list supplies the
    image. It is fetched from ``/view`` and streamed to
    ``<output_dir>/<safe_name>_<timestamp>.png``, where ``safe_name`` is
    ``test_name`` lower-cased with every character outside ``[a-zA-Z0-9_]``
    replaced by ``_``.

    Args:
        history: The prompt history dict (as returned by wait_for_completion).
        output_dir: Local directory to save to; created if it is missing.
        test_name: Name used to derive the output filename.

    Returns:
        Path to the saved image, as a string.

    Raises:
        RuntimeError: If no output node in ``history`` lists an image.
        requests.exceptions.RequestException: If the download fails or the
            server answers with an HTTP error status.
    """
    img_info = next(
        (
            node_out["images"][0]
            for node_out in history.get("outputs", {}).values()
            if node_out.get("images")
        ),
        None,
    )
    if img_info is None:
        raise RuntimeError("No output images in history")

    # Passing the query via `params` URL-encodes filenames and subfolders
    # that contain spaces, '&', '#', and similar characters.
    query = {
        "filename": img_info["filename"],
        "subfolder": img_info.get("subfolder", ""),
        "type": img_info.get("type", "output"),
    }

    safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', test_name).lower()
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    output_dir.mkdir(parents=True, exist_ok=True)
    out_path = output_dir / f"{safe_name}_{timestamp}.png"

    # The context manager releases the streamed connection on every exit path.
    with requests.get(
        f"{COMFYUI_SERVER_URL}/view",
        params=query,
        stream=True,
        timeout=60,
    ) as r:
        # Check the status before opening the file so an HTTP error does
        # not leave an empty output file behind.
        r.raise_for_status()
        with open(out_path, "wb") as f:
            for chunk in r.iter_content(8192):
                f.write(chunk)

    size_mb = out_path.stat().st_size / (1024 * 1024)
    print(f" 💾 Saved: {out_path.name} ({size_mb:.1f} MB)")
    return str(out_path)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# MAIN EXECUTION
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Script entry point: checks the server, loads the workflow, runs every
# entry of TEST_CASES through parse -> expand -> upload -> queue -> wait ->
# download, then prints a summary and writes batch_results.json.
if __name__ == "__main__":
    print("=" * 72)
    print(" CATALYST BATCH TEST — 5 Social Media Poster Prompts")
    print(" Model: Qwen-Image-2512 | Server: " + COMFYUI_SERVER_URL)
    print("=" * 72)

    # ── Setup ──
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    print(f"\n📂 Output: {OUTPUT_DIR}")
    print(f"📄 Workflow: {WORKFLOW_PATH}")
    print(f"🖼️ Inputs: {INPUT_DIR}")
    print(f"🎨 Refs: {REF_DIR}")

    # ── Verify server connectivity ──
    # An unreachable server aborts the run; any other failure here (HTTP
    # error, unexpected JSON) is only reported as a warning.
    print(f"\n🔌 Testing connection to {COMFYUI_SERVER_URL} ...")
    try:
        stats_response = requests.get(f"{COMFYUI_SERVER_URL}/system_stats", timeout=10)
        stats_response.raise_for_status()
        stats = stats_response.json()
        gpu_info = stats.get("devices", [])
        print(f" ✓ Connected! GPUs: {len(gpu_info)}")
        for gpu in gpu_info:
            name = gpu.get("name", "unknown")
            # Values are divided by 1024**3 and labelled GB below.
            vram_total = gpu.get("vram_total", 0) / (1024**3)
            vram_free = gpu.get("vram_free", 0) / (1024**3)
            print(f"{name}: {vram_free:.1f}/{vram_total:.1f} GB free")
    except requests.exceptions.ConnectionError:
        print(f" ✗ FAILED: Cannot reach {COMFYUI_SERVER_URL}")
        print(" Ensure ComfyUI is running and the port is accessible.")
        print(" Try: ssh -L 8118:127.0.0.1:8118 ubuntu@54.91.19.60 -p 443")
        sys.exit(1)
    except Exception as e:
        print(f" ⚠ Warning: {e}")

    # ── Load workflow ──
    try:
        with open(WORKFLOW_PATH, "r", encoding="utf-8") as f:
            workflow = json.load(f)
        print(f"\n📋 Workflow loaded ({len(workflow)} nodes)")
    except Exception as e:
        print(f"\n✗ Failed to load workflow: {e}")
        sys.exit(1)

    # ── Run all tests ──
    results = []
    total_tests = len(TEST_CASES)
    # The original separator was '' * 72, an empty string, so no rule was
    # ever drawn between tests.
    separator = "-" * 72
    total_start = time.time()
    for i, test in enumerate(TEST_CASES, 1):
        print(f"\n{separator}")
        print(f" TEST {i}/{total_tests}: {test['name']}")
        print(f" {test['description']}")
        print(separator)

        gt_path = INPUT_DIR / test["ground_truth"]
        sr_path = REF_DIR / test["style_ref"]
        print(f" 📸 Ground Truth: {test['ground_truth']}")
        print(f" 🎨 Style Ref: {test['style_ref']}")
        print(f" 🏷️ Keywords: {test['user_keywords']}")
        print(f" ✍️ Copy: \"{test['marketing_copy']}\"")

        # A failure in one test is recorded and the batch moves on.
        try:
            # Step 1: Parse
            keywords, copy_text = process_prompt(
                test["user_keywords"],
                test["marketing_copy"]
            )

            # Step 2: Expand
            expanded = expand_prompt(keywords, copy_text)
            print(f"\n 📝 Expanded prompt ({len(expanded)} chars):")
            print(f" {expanded[:100]}...")

            # Step 3: Upload
            print("\n ⬆️ Uploading images...")
            gt_name = upload_image(gt_path)
            print(f" GT → {gt_name}")
            sr_name = upload_image(sr_path)
            print(f" SR → {sr_name}")

            # Step 4: Execute
            print("\n 🚀 Queuing workflow...")
            prompt_id = execute_workflow(workflow, expanded, gt_name, sr_name)
            print(f" prompt_id: {prompt_id}")

            # Step 5: Wait
            print("\n ⏳ Waiting for generation...")
            history = wait_for_completion(prompt_id, timeout=600)

            # Step 6: Download
            print("\n ⬇️ Downloading result...")
            out_path = download_output(history, OUTPUT_DIR, test["name"])

            results.append({
                "test": test["name"],
                "status": "✅ SUCCESS",
                "output": out_path,
                "prompt_id": prompt_id
            })
        except FileNotFoundError as e:
            print(f"\n ✗ FILE NOT FOUND: {e}")
            results.append({"test": test["name"], "status": "❌ FILE NOT FOUND", "error": str(e)})
        except requests.exceptions.ConnectionError as e:
            print(f"\n ✗ CONNECTION ERROR: {e}")
            results.append({"test": test["name"], "status": "❌ CONNECTION ERROR", "error": str(e)})
        except TimeoutError as e:
            print(f"\n ✗ TIMEOUT: {e}")
            results.append({"test": test["name"], "status": "❌ TIMEOUT", "error": str(e)})
        except RuntimeError as e:
            print(f"\n ✗ RUNTIME ERROR: {e}")
            results.append({"test": test["name"], "status": "❌ RUNTIME ERROR", "error": str(e)})
        except Exception as e:
            print(f"\n ✗ UNEXPECTED ERROR: {type(e).__name__}: {e}")
            results.append({"test": test["name"], "status": "❌ ERROR", "error": str(e)})

    # ── Summary ──
    total_time = time.time() - total_start
    print(f"\n\n{'=' * 72}")
    print(" BATCH TEST SUMMARY")
    print(f" Total time: {total_time:.1f}s | Tests: {total_tests}")
    print(f"{'=' * 72}")
    successes = 0
    for entry in results:
        print(f" {entry['status']} {entry['test']}")
        # Only successful entries carry an "output" key.
        if "output" in entry:
            print(f"{entry['output']}")
            successes += 1
        elif "error" in entry:
            print(f"{entry['error'][:80]}")
    print(f"\n Result: {successes}/{total_tests} passed")
    print(f"{'=' * 72}")

    # Save results to JSON
    results_file = OUTPUT_DIR / "batch_results.json"
    # Explicit encoding, matching how the workflow file is read above.
    with open(results_file, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=str)
    print(f"\n 📊 Results saved: {results_file}")