forked from sagnik/Project_Velocity
#11 Added the complete ComfyUI engine. Co-authored-by: Sayan Datta <sayan@Sayans-MacBook-Air.local> Reviewed-on: sagnik/Project_Velocity#12
507 lines
20 KiB
Python
507 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
test_catalyst_batch.py — 5-Prompt Batch Test for Catalyst Poster Generation
|
|
============================================================================
|
|
|
|
Sends 5 distinct social media marketing poster generation requests to the
|
|
ComfyUI server running Qwen-Image-2512. Each test uses:
|
|
- A different Ground Truth image (room photo from the property)
|
|
- A Style Reference image (professional real estate marketing poster)
|
|
- A unique "Prompt Keyword" set that an end-user would type
|
|
|
|
The script demonstrates the full end-user flow:
|
|
User enters: Keywords + Ground Truth Image + Style Reference → Gets Poster
|
|
|
|
Test Matrix:
|
|
1. "luxury modern kitchen" → Kitchen photo + Orange card reference
|
|
2. "cozy master bedroom" → Bedroom photo + SOLD poster reference
|
|
3. "elegant living space" → Balcony bedroom + Magazine editorial ref
|
|
4. "premium apartment lifestyle" → Room with AC + Bellagio luxury ad ref
|
|
5. "smart home investment" → Corridor/room + Social media grid ref
|
|
|
|
Environment: ComfyUI + Qwen-Image-2512 on AWS EC2 (4x NVIDIA L4)
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import re
|
|
import copy
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Tuple, Optional, Dict, List
|
|
|
|
import requests
|
|
from PIL import Image
|
|
|
|
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
# CONFIGURATION
|
|
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
|
|
# ComfyUI server URL. Set the COMFYUI_SERVER_URL environment variable to
# override the default without editing this file. Options:
#   - Direct (if port open via SG): http://54.91.19.60:8118
#   - SSH tunnel: ssh -L 8118:127.0.0.1:8118 ubuntu@54.91.19.60 -p 443
#     then use:   http://127.0.0.1:8118
# A trailing slash is stripped because every endpoint in this script is
# appended as "/path".
COMFYUI_SERVER_URL: str = os.environ.get(
    "COMFYUI_SERVER_URL", "http://54.91.19.60:8118"
).rstrip("/")

# Root of the comfy_engine directory. Set CATALYST_BASE_DIR to point the
# script at a checkout that lives somewhere other than the default path.
BASE_DIR = Path(
    os.environ.get(
        "CATALYST_BASE_DIR",
        r"F:\Workin In Progress\DESINEURON\GITLAB\Project_Velocity\comfy_engine",
    )
)
INPUT_DIR = BASE_DIR / "test_inputs" / "Sagnik Test Sample New"
REF_DIR = INPUT_DIR / "Sample Reference"
OUTPUT_DIR = BASE_DIR / "test_outputs" / "catalyst_batch_results"
WORKFLOW_PATH = BASE_DIR / "workflows" / "catalyst_poster_qwen.json"

# Node IDs matching catalyst_poster_qwen.json
NODE_GROUND_TRUTH = "1"
NODE_STYLE_REF = "2"
NODE_POS_PROMPT = "9"
NODE_NEG_PROMPT = "10"
|
|
|
|
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
# 5 TEST CASES — Each simulates what an end-user would enter
|
|
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
|
|
# Each entry describes one end-user request:
#   name           - label used in console output and in the saved file name
#   ground_truth   - room photo file name, resolved against INPUT_DIR
#   style_ref      - reference poster file name, resolved against REF_DIR
#   user_keywords  - aesthetic keywords the user would type
#   marketing_copy - headline text to render on the poster
#   description    - one-line summary printed before the test runs
TEST_CASES: List[Dict] = [
    {
        "name": "Test 1: Luxury Modern Kitchen",
        # Kitchen with wooden cabinets
        "ground_truth": "IMG_20210330_154502.jpg",
        # Orange card-style poster
        "style_ref": "6301510fa187aca16f680b2a525ed6de.jpg",
        "user_keywords": "luxury modern kitchen",
        "marketing_copy": "Cook Your Dreams to Life",
        "description": "A modular kitchen showcase — warm tones, premium finishes.",
    },
    {
        "name": "Test 2: Cozy Master Bedroom",
        # Bedroom with accent wall
        "ground_truth": "IMG_20210330_154512.jpg",
        # Bold SOLD poster
        "style_ref": "79c9a52c9af0c1d94df025dd1505db83.jpg",
        "user_keywords": "cozy master bedroom",
        "marketing_copy": "Where Comfort Meets Elegance",
        "description": "Master bedroom with designer wallpaper — aspirational lifestyle.",
    },
    {
        "name": "Test 3: Elegant Living Space",
        # Balcony view bedroom
        "ground_truth": "IMG_20210330_160420.jpg",
        # Magazine editorial
        "style_ref": "7bb67cbc287300b78b4e8da3da7de242.jpg",
        "user_keywords": "elegant living space",
        "marketing_copy": "A New Beginning Starts Here",
        "description": "Bedroom with balcony view — editorial magazine style.",
    },
    {
        "name": "Test 4: Premium Apartment Lifestyle",
        # Another room view
        "ground_truth": "IMG_20210330_154534.jpg",
        # Bellagio luxury ad
        "style_ref": "ee9d7efdf9303342480d5cb57cec8400.jpg",
        "user_keywords": "premium apartment lifestyle",
        "marketing_copy": "Live Above the Ordinary",
        "description": "Premium apartment showcase — Dubai-style luxury marketing.",
    },
    {
        "name": "Test 5: Smart Home Investment",
        # Compact room
        "ground_truth": "IMG_20210330_160212.jpg",
        # Social media grid
        "style_ref": "fd0586727e1b43e9c346a6f851fb50f9.jpg",
        "user_keywords": "smart home investment",
        "marketing_copy": "Your Dream Home Is Waiting",
        "description": "Investment-focused social media post — modern minimalist grid.",
    },
]
|
|
|
|
|
|
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
# CORE FUNCTIONS
|
|
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
|
|
def process_prompt(user_keywords: str, marketing_copy: str) -> Tuple[str, str]:
    """Validate and normalise the two text fields supplied by the user.

    Both values have surrounding whitespace removed. The keywords are
    checked first, so when both fields are blank the keyword error is the
    one reported.

    Args:
        user_keywords: Style/aesthetic descriptors (e.g., 'luxury modern kitchen').
        marketing_copy: Headline text to render on the poster.

    Returns:
        A ``(aesthetic_keywords, marketing_copy)`` tuple of stripped strings.

    Raises:
        ValueError: If either input is empty or whitespace-only.
    """
    cleaned_keywords = user_keywords.strip()
    if not cleaned_keywords:
        raise ValueError("Keyword prompt cannot be empty")

    cleaned_copy = marketing_copy.strip()
    if not cleaned_copy:
        raise ValueError("Marketing copy cannot be empty")

    return cleaned_keywords, cleaned_copy
|
|
|
|
|
|
def expand_prompt(aesthetic_keywords: str, marketing_copy: str) -> str:
    """Build the full text prompt sent to the image model.

    The prompt is four sentences joined by single spaces:
        1. Scene type declaration (marketing poster).
        2. Style sentence containing the user's aesthetic keywords.
        3. Typography instruction quoting the marketing copy in single
           quotes.
        4. Fixed list of technical quality descriptors.

    The inputs are interpolated verbatim; no escaping or validation is
    performed here.

    Args:
        aesthetic_keywords: User's style keywords (e.g., 'luxury modern kitchen').
        marketing_copy: Exact text to appear in the poster.

    Returns:
        The expanded prompt string.

    Example:
        >>> expand_prompt('luxury modern kitchen', 'Cook Your Dreams')[:40]
        'A stunning, high-end real estate social '
    """
    scene = "A stunning, high-end real estate social media marketing poster."
    style = (
        f"Style: {aesthetic_keywords}, warm ambient lighting, premium materials, "
        "cinematic composition, professional interior photography."
    )
    typography = (
        "The poster must prominently display the exact text "
        f"'{marketing_copy}' rendered in elegant, bold, modern sans-serif "
        "typography with high contrast against the background, crisp edges, "
        "perfectly aligned, highly legible."
    )
    quality = (
        "8k resolution, photorealistic quality, detailed textures, "
        "architectural magazine aesthetic, ultra-sharp focus, "
        "golden hour warmth, depth of field bokeh, premium brand feel, "
        "social media optimized layout, clean negative space for text."
    )
    return " ".join([scene, style, typography, quality])
|
|
|
|
|
|
def upload_image(image_path: Path) -> str:
    """Upload an image to the ComfyUI server via ``/upload/image``.

    The image is opened, converted to RGB when it is in any other mode,
    re-encoded as PNG in memory and posted as multipart form data with
    ``overwrite=true``. The upload keeps the original file name (including
    its original extension) even though the payload is PNG data.

    Args:
        image_path: Path to the image file.

    Returns:
        The ``name`` field of the server's JSON response, for use in the
        workflow JSON.

    Raises:
        FileNotFoundError: If ``image_path`` does not exist.
        requests.exceptions.ConnectionError: If the server is unreachable.
        requests.exceptions.HTTPError: If the server answers with an error
            status.
        RuntimeError: If the response carries no ``name``.
    """
    import io  # local import: only this function needs an in-memory buffer

    if not image_path.exists():
        raise FileNotFoundError(f"Image not found: {image_path}")

    # Encode into memory rather than a temporary file: nothing is left on
    # disk if encoding fails, and the source image handle is closed as soon
    # as the PNG bytes exist.
    png_buffer = io.BytesIO()
    with Image.open(image_path) as img:
        rgb_image = img if img.mode == "RGB" else img.convert("RGB")
        rgb_image.save(png_buffer, format="PNG")

    response = requests.post(
        f"{COMFYUI_SERVER_URL}/upload/image",
        files={"image": (image_path.name, png_buffer.getvalue(), "image/png")},
        data={"overwrite": "true"},
        timeout=60,
    )
    response.raise_for_status()

    result = response.json()
    server_name = result.get("name", "")
    if not server_name:
        raise RuntimeError(f"Upload failed: {result}")

    return server_name
|
|
|
|
|
|
def execute_workflow(
    workflow: dict,
    prompt_text: str,
    gt_filename: str,
    sr_filename: str
) -> str:
    """Fill in the per-request inputs and queue the workflow on ComfyUI.

    Works on a deep copy, so the caller's ``workflow`` dict is never
    modified. Three node inputs are overwritten: the ``image`` input of the
    ground-truth and style-reference nodes and the ``text`` input of the
    positive-prompt node. The negative-prompt node is left as defined in
    the workflow. The result is posted to ``/prompt`` with a client id
    derived from the current Unix time in whole seconds.

    Args:
        workflow: The loaded workflow JSON dict.
        prompt_text: Expanded prompt from expand_prompt().
        gt_filename: Server filename of ground truth image.
        sr_filename: Server filename of style reference image.

    Returns:
        The ``prompt_id`` from the queue response.

    Raises:
        KeyError: If one of the expected nodes or its ``inputs`` entry is
            missing from the workflow.
        requests.exceptions.HTTPError: If the server answers with an error
            status.
        RuntimeError: If the response carries no ``prompt_id``.
    """
    graph = copy.deepcopy(workflow)

    # (node id, input name, value) applied in this order.
    injections = (
        (NODE_GROUND_TRUTH, "image", gt_filename),
        (NODE_STYLE_REF, "image", sr_filename),
        (NODE_POS_PROMPT, "text", prompt_text),
    )
    for node_id, input_name, value in injections:
        graph[node_id]["inputs"][input_name] = value

    client_id = f"catalyst_batch_{int(time.time())}"
    response = requests.post(
        f"{COMFYUI_SERVER_URL}/prompt",
        json={"prompt": graph, "client_id": client_id},
        timeout=30,
    )
    response.raise_for_status()

    queued = response.json()
    prompt_id = queued.get("prompt_id", "")
    if prompt_id:
        return prompt_id
    raise RuntimeError(f"Queue failed: {queued}")
|
|
|
|
|
|
def wait_for_completion(prompt_id: str, timeout: int = 600, poll_interval: int = 3) -> dict:
    """Poll ``/history/{prompt_id}`` until the workflow produces an image.

    Each cycle sleeps ``poll_interval`` seconds and then issues one request.
    The cycle ends successfully as soon as any node output contains a
    non-empty ``images`` list. Connection errors, request timeouts, non-200
    responses and bodies that are not valid JSON are treated as "not ready
    yet" and polling continues. A progress line is printed on every tenth
    poll.

    The deadline is measured with a monotonic clock so that system clock
    adjustments cannot shorten or extend the wait. Because the sleep comes
    before the request, the last poll may finish slightly after ``timeout``.

    Args:
        prompt_id: The queued prompt ID.
        timeout: Max wait time in seconds.
        poll_interval: Seconds between polls.

    Returns:
        The history entry for ``prompt_id`` (the dict that holds
        ``outputs`` and ``status``).

    Raises:
        RuntimeError: If the history entry reports ``status_str == "error"``.
        TimeoutError: If no image output appears before the deadline.
    """
    start = time.monotonic()
    deadline = start + timeout
    polls = 0

    while time.monotonic() < deadline:
        time.sleep(poll_interval)
        polls += 1

        # Only the network call and JSON decoding are guarded; workflow
        # errors detected below must propagate to the caller.
        prompt_data = None
        try:
            r = requests.get(
                f"{COMFYUI_SERVER_URL}/history/{prompt_id}",
                timeout=10
            )
            if r.status_code == 200:
                prompt_data = r.json().get(prompt_id, {})
        except (
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
            ValueError,  # body was not valid JSON; retry on the next poll
        ):
            prompt_data = None

        if prompt_data:
            # Check for error
            status = prompt_data.get("status", {})
            if status.get("status_str") == "error":
                msgs = status.get("messages", ["Unknown"])
                raise RuntimeError(f"Workflow error: {msgs}")

            # Check for outputs
            outputs = prompt_data.get("outputs", {})
            if any(node_out.get("images") for node_out in outputs.values()):
                elapsed = time.monotonic() - start
                print(f" ✓ Done in {elapsed:.1f}s ({polls} polls)")
                return prompt_data

        # Heartbeat for every kind of unfinished poll, not only for
        # connection failures.
        if polls % 10 == 0:
            print(f" ⏳ Still waiting... ({polls} polls)")

    raise TimeoutError(f"Timed out after {timeout}s (prompt: {prompt_id})")
|
|
|
|
|
|
def download_output(history: dict, output_dir: Path, test_name: str) -> str:
    """Download the first generated image referenced by a history entry.

    The first node output with a non-empty ``images`` list is used, and
    only its first image is fetched from ``/view``. The file is written to
    ``output_dir`` as ``<sanitised test name>_<YYYYmmdd_HHMMSS>.png``, where
    every character of ``test_name`` outside ``[a-zA-Z0-9_]`` is replaced
    by ``_`` and the result is lower-cased.

    Args:
        history: The prompt history dict (as returned by
            wait_for_completion()).
        output_dir: Local directory to save to; created if missing.
        test_name: Name for the output file.

    Returns:
        Path to the saved image, as a string.

    Raises:
        RuntimeError: If the history holds no output images.
        requests.exceptions.HTTPError: If the server answers with an error
            status.
    """
    img_info = next(
        (
            node_out["images"][0]
            for node_out in history.get("outputs", {}).values()
            if node_out.get("images")
        ),
        None,
    )
    if img_info is None:
        raise RuntimeError("No output images in history")

    output_dir.mkdir(parents=True, exist_ok=True)
    safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', test_name).lower()
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    out_path = output_dir / f"{safe_name}_{timestamp}.png"

    # Pass the query as params so file names or subfolders containing
    # spaces, '&', '#', etc. are URL-encoded instead of corrupting the URL.
    query = {
        "filename": img_info["filename"],
        "subfolder": img_info.get("subfolder", ""),
        "type": img_info.get("type", "output"),
    }
    # The context manager releases the streamed connection even on failure.
    with requests.get(
        f"{COMFYUI_SERVER_URL}/view",
        params=query,
        stream=True,
        timeout=60,
    ) as r:
        r.raise_for_status()
        with open(out_path, "wb") as f:
            for chunk in r.iter_content(8192):
                f.write(chunk)

    size_mb = out_path.stat().st_size / (1024 * 1024)
    print(f" 💾 Saved: {out_path.name} ({size_mb:.1f} MB)")
    return str(out_path)
|
|
|
|
|
|
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
# MAIN EXECUTION
|
|
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
|
|
def _check_server() -> None:
    """Probe ``/system_stats`` and print the reported devices.

    Exits the process with status 1 when the server cannot be reached. Any
    other failure (HTTP error status, unexpected payload, timeout) is only
    reported as a warning and the run continues.
    """
    print(f"\n🔌 Testing connection to {COMFYUI_SERVER_URL} ...")
    try:
        response = requests.get(f"{COMFYUI_SERVER_URL}/system_stats", timeout=10)
        response.raise_for_status()
        stats = response.json()
        gpu_info = stats.get("devices", [])
        print(f" ✓ Connected! GPUs: {len(gpu_info)}")
        for gpu in gpu_info:
            name = gpu.get("name", "unknown")
            vram_total = gpu.get("vram_total", 0) / (1024**3)
            vram_free = gpu.get("vram_free", 0) / (1024**3)
            print(f" • {name}: {vram_free:.1f}/{vram_total:.1f} GB free")
    except requests.exceptions.ConnectionError:
        print(f" ✗ FAILED: Cannot reach {COMFYUI_SERVER_URL}")
        print(" Ensure ComfyUI is running and the port is accessible.")
        print(" Try: ssh -L 8118:127.0.0.1:8118 ubuntu@54.91.19.60 -p 443")
        sys.exit(1)
    except Exception as e:
        # Deliberately best-effort: the stats endpoint is informational.
        print(f" ⚠ Warning: {e}")


def _load_workflow() -> dict:
    """Read the workflow JSON from WORKFLOW_PATH; exit with status 1 on failure."""
    try:
        with open(WORKFLOW_PATH, "r", encoding="utf-8") as f:
            workflow = json.load(f)
        print(f"\n📋 Workflow loaded ({len(workflow)} nodes)")
    except Exception as e:
        print(f"\n✗ Failed to load workflow: {e}")
        sys.exit(1)
    return workflow


def _record_failure(test_name: str, label: str, error: Exception) -> Dict:
    """Print a labelled failure line and build the matching result record."""
    print(f"\n ✗ {label}: {error}")
    return {"test": test_name, "status": f"❌ {label}", "error": str(error)}


def _run_test(index: int, total: int, test: Dict, workflow: dict) -> Dict:
    """Run one test case end to end and return its result record.

    Steps: validate input, expand the prompt, upload both images, queue the
    workflow, wait for completion, download the poster. Every exception is
    converted into a failure record so the remaining tests still run.
    """
    print(f"\n{'━' * 72}")
    print(f" TEST {index}/{total}: {test['name']}")
    print(f" {test['description']}")
    print(f"{'━' * 72}")

    gt_path = INPUT_DIR / test["ground_truth"]
    sr_path = REF_DIR / test["style_ref"]

    print(f" 📸 Ground Truth: {test['ground_truth']}")
    print(f" 🎨 Style Ref: {test['style_ref']}")
    print(f" 🏷️ Keywords: {test['user_keywords']}")
    print(f" ✍️ Copy: \"{test['marketing_copy']}\"")

    try:
        # Step 1: Parse
        keywords, copy_text = process_prompt(
            test["user_keywords"],
            test["marketing_copy"]
        )

        # Step 2: Expand
        expanded = expand_prompt(keywords, copy_text)
        print(f"\n 📝 Expanded prompt ({len(expanded)} chars):")
        print(f" {expanded[:100]}...")

        # Step 3: Upload
        print("\n ⬆️ Uploading images...")
        gt_name = upload_image(gt_path)
        print(f" GT → {gt_name}")
        sr_name = upload_image(sr_path)
        print(f" SR → {sr_name}")

        # Step 4: Execute
        print("\n 🚀 Queuing workflow...")
        prompt_id = execute_workflow(workflow, expanded, gt_name, sr_name)
        print(f" prompt_id: {prompt_id}")

        # Step 5: Wait
        print("\n ⏳ Waiting for generation...")
        history = wait_for_completion(prompt_id, timeout=600)

        # Step 6: Download
        print("\n ⬇️ Downloading result...")
        out_path = download_output(history, OUTPUT_DIR, test["name"])

        return {
            "test": test["name"],
            "status": "✅ SUCCESS",
            "output": out_path,
            "prompt_id": prompt_id
        }

    except FileNotFoundError as e:
        return _record_failure(test["name"], "FILE NOT FOUND", e)
    except requests.exceptions.ConnectionError as e:
        return _record_failure(test["name"], "CONNECTION ERROR", e)
    except TimeoutError as e:
        return _record_failure(test["name"], "TIMEOUT", e)
    except RuntimeError as e:
        return _record_failure(test["name"], "RUNTIME ERROR", e)
    except Exception as e:
        # Top-level boundary of a single test: report and keep the batch going.
        print(f"\n ✗ UNEXPECTED ERROR: {type(e).__name__}: {e}")
        return {"test": test["name"], "status": "❌ ERROR", "error": str(e)}


def _print_summary(results: List[Dict], total_time: float) -> int:
    """Print the per-test summary table and return the number of successes."""
    print(f"\n\n{'=' * 72}")
    print(" BATCH TEST SUMMARY")
    print(f" Total time: {total_time:.1f}s | Tests: {len(TEST_CASES)}")
    print(f"{'=' * 72}")

    successes = 0
    for result in results:
        print(f" {result['status']} {result['test']}")
        if "output" in result:
            print(f" → {result['output']}")
            successes += 1
        elif "error" in result:
            print(f" → {result['error'][:80]}")

    print(f"\n Result: {successes}/{len(TEST_CASES)} passed")
    print(f"{'=' * 72}")
    return successes


def main() -> int:
    """Run every entry of TEST_CASES against the ComfyUI server.

    Returns:
        0 when every test produced an output image, 1 otherwise, so that
        shells and CI jobs can detect a failed batch.
    """
    print("=" * 72)
    print(" CATALYST BATCH TEST — 5 Social Media Poster Prompts")
    print(" Model: Qwen-Image-2512 | Server: " + COMFYUI_SERVER_URL)
    print("=" * 72)

    # ── Setup ──
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    print(f"\n📂 Output: {OUTPUT_DIR}")
    print(f"📄 Workflow: {WORKFLOW_PATH}")
    print(f"🖼️ Inputs: {INPUT_DIR}")
    print(f"🎨 Refs: {REF_DIR}")

    # ── Verify server connectivity ──
    _check_server()

    # ── Load workflow ──
    workflow = _load_workflow()

    # ── Run tests ──
    total = len(TEST_CASES)
    total_start = time.time()
    results = [
        _run_test(i, total, test, workflow)
        for i, test in enumerate(TEST_CASES, 1)
    ]

    # ── Summary ──
    successes = _print_summary(results, time.time() - total_start)

    # Save results to JSON
    results_file = OUTPUT_DIR / "batch_results.json"
    with open(results_file, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=str)
    print(f"\n 📊 Results saved: {results_file}")

    return 0 if successes == total else 1


if __name__ == "__main__":
    sys.exit(main())
|