Files
Project_Velocity/backend/services/nemoclaw_client.py
2026-04-12 02:02:58 +05:30

414 lines
14 KiB
Python

"""
backend/services/nemoclaw_client.py - NemoClaw inference client.
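Endpoints are tried in this order until one returns a parseable response:
1. NVIDIA-hosted OpenAI-compatible chat completions (NVIDIA_MODEL, then
   NVIDIA_FALLBACK_MODEL) when NVIDIA_API_KEY is set.
2. Optional compatible endpoint via NEMOCLAW_BASE_URL.
3. Optional local Ollama fallback only when ALLOW_LOCAL_FALLBACK=true.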
"""
from __future__ import annotations
import json
import logging
import os
import re
import time
from dataclasses import dataclass, field
from typing import Optional
import httpx
logger = logging.getLogger("velocity.nemoclaw")

# --- Request tuning (read once at import time) ------------------------------
NEMOCLAW_TIMEOUT = float(os.getenv("NEMOCLAW_TIMEOUT_S", "45.0"))
NEMOCLAW_TEMPERATURE = float(os.getenv("NEMOCLAW_TEMPERATURE", "0.2"))

# --- NVIDIA-hosted endpoint --------------------------------------------------
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "")
NVIDIA_BASE_URL = os.getenv("NVIDIA_BASE_URL", "https://integrate.api.nvidia.com/v1")
# Trailing slashes on the base URL are trimmed so that ".../v1/" does not
# produce ".../v1//chat/completions".
NVIDIA_CHAT_URL = os.getenv(
    "NVIDIA_CHAT_URL",
    f"{NVIDIA_BASE_URL.rstrip('/')}/chat/completions",
)
NVIDIA_MODEL = os.getenv("NVIDIA_MODEL", "nvidia/nemotron-3-super-120b-a12b")
NVIDIA_FALLBACK_MODEL = os.getenv(
    "NVIDIA_FALLBACK_MODEL",
    "nvidia/llama-3.3-nemotron-super-49b-v1",
)

# --- Optional OpenAI-compatible endpoint -------------------------------------
NEMOCLAW_BASE_URL = os.getenv("NEMOCLAW_BASE_URL", "")
# The endpoint is enabled only when NEMOCLAW_BASE_URL is set; in that case an
# explicit NEMOCLAW_CHAT_URL wins over the URL derived from the base.
# The parentheses spell out how the original `A or B if C else ""` was parsed.
NEMOCLAW_CHAT_URL = (
    (
        os.getenv("NEMOCLAW_CHAT_URL")
        or f"{NEMOCLAW_BASE_URL.rstrip('/')}/v1/chat/completions"
    )
    if NEMOCLAW_BASE_URL
    else ""
)
NEMOCLAW_MODEL = os.getenv("NEMOCLAW_MODEL", NVIDIA_MODEL)
NEMOCLAW_API_TOKEN = os.getenv("NEMOCLAW_API_TOKEN", "")

# --- Optional local Ollama fallback ------------------------------------------
ALLOW_LOCAL_FALLBACK = os.getenv("ALLOW_LOCAL_FALLBACK", "false").lower() == "true"
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://127.0.0.1:11434")
OLLAMA_CHAT_URL = f"{OLLAMA_BASE_URL.rstrip('/')}/v1/chat/completions"
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:27b")

# Directory searched first for "<name>.md" system prompts.
_PROMPT_DIR = os.getenv("NEMOCLAW_PROMPT_DIR", "/opt/dlami/nvme/nemoclaw/prompts")
def _load_system_prompt(name: str) -> str:
    """Return the system prompt called *name*, preferring on-disk files.

    Lookup order:

    1. ``<_PROMPT_DIR>/<name>.md``
    2. ``../nemoclaw_prompts/<name>.md`` relative to this module
    3. the inline ``_PROMPTS`` entry (empty string when there is none)

    Lines of a prompt file that start with ``#`` are dropped and the result is
    stripped of surrounding whitespace. A candidate file that exists but
    cannot be read or decoded is skipped with a warning, so a bad file never
    prevents the remaining fallbacks from being used.
    """
    filename = f"{name}.md"
    candidates = (
        os.path.join(_PROMPT_DIR, filename),
        os.path.join(os.path.dirname(__file__), "..", "nemoclaw_prompts", filename),
    )
    for path in candidates:
        try:
            with open(path, encoding="utf-8") as handle:
                lines = handle.read().splitlines()
        except FileNotFoundError:
            # A missing file is the normal case; stay quiet and try the next one.
            continue
        except (OSError, UnicodeDecodeError) as exc:
            # Permission problems, a directory in place of the file, or bytes
            # that are not UTF-8: fall through to the next candidate.
            logger.warning("Prompt file '%s' is unreadable (%s), skipping.", path, exc)
            continue
        return "\n".join(line for line in lines if not line.startswith("#")).strip()
    logger.warning("Prompt '%s' not found, using inline fallback.", name)
    return _PROMPTS.get(name, "")
_PROMPTS = {
"qd_calculator": (
"You are a behavioral intelligence analyst for a luxury real estate sales platform.\n"
"Compute a Quantum Dynamics score between 1 and 100 using blend shapes, CRM context, "
"and the active scene label when present.\n"
'Respond with JSON only: {"qd_score": <int>, "reasoning": "<one sentence>", "confidence": <float>}'
),
"lead_tagger": (
"You are a lead intelligence analyst. Classify a real estate lead as HNI or NRI.\n"
'Respond with JSON only: {"tags_to_add": [...], "tags_to_remove": []}'
),
"cctv_profiler": (
"You are a visitor profiling analyst for a luxury real estate development CCTV system.\n"
'Respond with JSON only: {"wealth_indicator": "HNI"|"standard"|"unknown", '
'"vehicle_class": "luxury"|"standard"|"unknown", "tags_to_add": [...], "notes": "<string>"}'
),
}
@dataclass
class QDResult:
    """Parsed answer of a ``qd_calculator`` inference call."""

    # Score taken from the model's "qd_score" field.
    qd_score: int
    # Free-text justification from the model's "reasoning" field.
    reasoning: str
    # Value of the model's "confidence" field.
    confidence: float
@dataclass
class TagResult:
    """Tag changes proposed by a ``lead_tagger`` inference call.

    Both lists default to a fresh empty list per instance.
    """

    # Tags from the model's "tags_to_add" field.
    tags_to_add: list[str] = field(default_factory=list)
    # Tags from the model's "tags_to_remove" field.
    tags_to_remove: list[str] = field(default_factory=list)
@dataclass
class CCTVProfileResult:
    """Parsed answer of a ``cctv_profiler`` inference call."""

    # Value of the model's "wealth_indicator" field.
    wealth_indicator: str
    # Value of the model's "vehicle_class" field.
    vehicle_class: str
    # Tags from the model's "tags_to_add" field; fresh empty list by default.
    tags_to_add: list[str] = field(default_factory=list)
    # Value of the model's "notes" field; empty by default.
    notes: str = ""
async def _attempt_chat(
    *,
    label: str,
    url: str,
    model: str,
    system_content: str,
    user_content: str,
    timeout: float,
    headers: dict[str, str],
) -> dict:
    """Send one chat-completion request to *url* and return the parsed reply.

    The request carries a system and a user message, asks for a JSON-object
    response and caps the reply at 1024 tokens. *label* is used only for
    logging.

    Errors are not handled here: transport failures, non-2xx statuses
    (``raise_for_status``), an unexpected response shape and unparseable
    model output all propagate to the caller.
    """
    conversation = [
        {"role": "system", "content": system_content},
        {"role": "user", "content": user_content},
    ]
    request_body = {
        "model": model,
        "messages": conversation,
        "temperature": NEMOCLAW_TEMPERATURE,
        "response_format": {"type": "json_object"},
        "max_tokens": 1024,
    }
    # A short-lived client per attempt; it is closed when the block exits.
    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.post(url, json=request_body, headers=headers)
        response.raise_for_status()
        completion = response.json()
        first_message = completion["choices"][0]["message"]
        raw_content = first_message["content"]
        logger.debug("NemoClaw response via %s: %s", label, raw_content[:200])
        return _parse_model_response(raw_content)
def _extract_text(raw_content: object) -> str:
if isinstance(raw_content, str):
return raw_content
if isinstance(raw_content, list):
parts: list[str] = []
for item in raw_content:
if isinstance(item, dict):
text = item.get("text")
if isinstance(text, str):
parts.append(text)
return "\n".join(parts).strip()
return str(raw_content)
def _parse_model_response(raw_content: object) -> dict:
    """Decode a model reply into a dict, salvaging what it can from bad JSON.

    Steps, in order:

    1. Empty text yields ``{}``.
    2. The whole text is parsed as JSON.
    3. The span from the first ``{`` to the last ``}`` is parsed as JSON, which
       copes with prose or code fences wrapped around the object.
    4. Known fields (``qd_score``, ``confidence``, ``reasoning``,
       ``wealth_indicator``, ``vehicle_class``, ``notes``, ``tags_to_add``,
       ``tags_to_remove``) are pulled out individually with regexes.

    Only JSON *objects* are accepted in steps 2 and 3. A reply that decodes to
    a list, string or number moves on to the next step, because callers use
    ``.get`` on the result.

    Raises:
        json.JSONDecodeError: when no step recovers anything.
    """
    text = _extract_text(raw_content).strip()
    if not text:
        return {}

    candidates = [text]
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        braced = text[start : end + 1]
        if braced != text:
            candidates.append(braced)
    for candidate in candidates:
        try:
            decoded = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(decoded, dict):
            return decoded

    # Last resort: recover individual fields from malformed or truncated output.
    parsed: dict[str, object] = {}
    int_match = re.search(r'"qd_score"\s*:\s*(\d+)', text)
    if int_match:
        parsed["qd_score"] = int(int_match.group(1))
    conf_match = re.search(r'"confidence"\s*:\s*([0-9]*\.?[0-9]+)', text)
    if conf_match:
        parsed["confidence"] = float(conf_match.group(1))
    for key in ("reasoning", "wealth_indicator", "vehicle_class", "notes"):
        string_match = re.search(rf'"{key}"\s*:\s*"([^"]*)"', text)
        if string_match:
            parsed[key] = string_match.group(1)
    for key in ("tags_to_add", "tags_to_remove"):
        list_match = re.search(rf'"{key}"\s*:\s*\[(.*?)\]', text, flags=re.S)
        if list_match:
            parsed[key] = re.findall(r'"([^"]+)"', list_match.group(1))

    if parsed:
        logger.warning("Recovered partial NemoClaw JSON payload from malformed model output.")
        return parsed
    raise json.JSONDecodeError("Unable to parse model JSON", text, 0)
async def _nemoclaw_chat(
    system_content: str,
    user_content: str,
    timeout: float = NEMOCLAW_TIMEOUT,
) -> dict:
    """Run one inference, trying each configured endpoint until one succeeds.

    Endpoint order:

    1. NVIDIA-hosted API with ``NVIDIA_MODEL`` (when ``NVIDIA_API_KEY`` is set)
    2. NVIDIA-hosted API with ``NVIDIA_FALLBACK_MODEL`` (when set and different)
    3. the compatible endpoint at ``NEMOCLAW_CHAT_URL`` (when configured)
    4. local Ollama (when ``ALLOW_LOCAL_FALLBACK`` is true)

    Args:
        system_content: System prompt sent with the request.
        user_content: User message sent with the request.
        timeout: Per-attempt HTTP timeout in seconds.

    Returns:
        The parsed model reply from the first endpoint that answers.

    Raises:
        RuntimeError: when no endpoint is configured, or when every endpoint
            fails. In the latter case the last failure is attached as
            ``__cause__``.
    """
    json_header = {"Content-Type": "application/json"}
    endpoints: list[tuple[str, str, str, dict[str, str]]] = []

    if NVIDIA_API_KEY:
        nvidia_headers = {"Authorization": f"Bearer {NVIDIA_API_KEY}", **json_header}
        endpoints.append(("nvidia_primary", NVIDIA_CHAT_URL, NVIDIA_MODEL, nvidia_headers))
        if NVIDIA_FALLBACK_MODEL and NVIDIA_FALLBACK_MODEL != NVIDIA_MODEL:
            endpoints.append(
                ("nvidia_fallback", NVIDIA_CHAT_URL, NVIDIA_FALLBACK_MODEL, dict(nvidia_headers))
            )
    if NEMOCLAW_CHAT_URL:
        headers = dict(json_header)
        if NEMOCLAW_API_TOKEN:
            headers["Authorization"] = f"Bearer {NEMOCLAW_API_TOKEN}"
        endpoints.append(("compatible_endpoint", NEMOCLAW_CHAT_URL, NEMOCLAW_MODEL, headers))
    if ALLOW_LOCAL_FALLBACK:
        endpoints.append(("ollama_fallback", OLLAMA_CHAT_URL, OLLAMA_MODEL, dict(json_header)))

    if not endpoints:
        raise RuntimeError(
            "No NemoClaw inference endpoint is configured. "
            "Set NVIDIA_API_KEY or NEMOCLAW_BASE_URL."
        )

    # Elapsed time is measured across all attempts, not per endpoint.
    t_start = time.monotonic()
    last_error: Exception | None = None
    for label, url, model, headers in endpoints:
        try:
            result = await _attempt_chat(
                label=label,
                url=url,
                model=model,
                system_content=system_content,
                user_content=user_content,
                timeout=timeout,
                headers=headers,
            )
        except httpx.TransportError as exc:
            # TransportError covers connect errors and timeouts as well as
            # read/write/protocol failures, so a connection dropped
            # mid-response also falls through to the next endpoint.
            logger.warning("NemoClaw %s unreachable (%s), trying next endpoint", label, exc)
            last_error = exc
        except httpx.HTTPStatusError as exc:
            logger.error(
                "NemoClaw %s HTTP %s: %s",
                label,
                exc.response.status_code,
                exc.response.text[:300],
            )
            last_error = exc
        except (KeyError, IndexError, TypeError, json.JSONDecodeError) as exc:
            # Unexpected response shape or model output that cannot be parsed.
            logger.error("NemoClaw %s returned invalid JSON: %s", label, exc)
            last_error = exc
        else:
            logger.info(
                "NemoClaw inference via %s model=%s elapsed=%.2fs",
                label,
                model,
                time.monotonic() - t_start,
            )
            return result
    raise RuntimeError(
        f"All NemoClaw endpoints failed. Last error: {last_error}"
    ) from last_error
async def score_qd(
    *,
    lead_id: str,
    batch_id: str,
    blend_shapes: dict[str, float],
    video_ts_ms: int,
    scene_label: Optional[str] = None,
    crm_context: dict,
    current_qd_score: Optional[int] = None,
) -> QDResult:
    """Ask the model for a Quantum Dynamics score and return it as a QDResult.

    All arguments are forwarded verbatim to the model as one JSON document.
    The returned score is clamped to the range 1..100.

    A missing, null or non-numeric ``qd_score`` falls back to
    ``current_qd_score`` (or 50 when that is unset). A missing, null or
    non-numeric ``confidence`` falls back to 0.7. A null ``reasoning`` becomes
    an empty string.

    Raises:
        RuntimeError: propagated from the inference call when no endpoint is
            configured or all endpoints fail; this function does not catch it.
    """
    system_prompt = _load_system_prompt("qd_calculator")
    user_content = json.dumps(
        {
            "lead_id": lead_id,
            "batch_id": batch_id,
            "video_ts_ms": video_ts_ms,
            "scene_label": scene_label,
            "current_qd_score": current_qd_score,
            "crm_context": crm_context,
            "blend_shapes": blend_shapes,
        },
        indent=2,
    )
    data = await _nemoclaw_chat(system_prompt, user_content)
    if not isinstance(data, dict):
        data = {}

    fallback_score = current_qd_score or 50
    default_confidence = 0.7

    try:
        # Going through float() also accepts numeric strings such as "85.5".
        raw_score = int(float(data.get("qd_score", fallback_score)))
    except (TypeError, ValueError, OverflowError):
        logger.warning(
            "Unusable qd_score %r for lead %s, using %s",
            data.get("qd_score"),
            lead_id,
            fallback_score,
        )
        raw_score = fallback_score

    try:
        confidence = float(data.get("confidence", default_confidence))
    except (TypeError, ValueError):
        confidence = default_confidence

    reasoning = data.get("reasoning")
    return QDResult(
        qd_score=max(1, min(100, raw_score)),
        reasoning="" if reasoning is None else str(reasoning),
        confidence=confidence,
    )
async def tag_lead(
    *,
    lead_id: str,
    phone: str,
    budget: Optional[str],
    message_text: str,
) -> TagResult:
    """Ask the model which tags to add to or remove from a lead.

    This is best-effort: any failure of the inference call is logged and an
    empty ``TagResult`` is returned instead of raising.

    The model's ``tags_to_add`` / ``tags_to_remove`` values are normalised to
    lists of strings. A single string becomes a one-element list, non-string
    items are dropped, and any other value (including null) becomes ``[]``.
    """
    system_prompt = _load_system_prompt("lead_tagger")
    user_content = (
        f"Lead ID: {lead_id}\n"
        f"Phone: {phone}\n"
        f"Budget indicator: {budget or 'unknown'}\n"
        f"First message: {message_text}"
    )
    try:
        data = await _nemoclaw_chat(system_prompt, user_content)
    except Exception as exc:
        logger.error("Lead tagging failed for %s: %s", lead_id, exc)
        return TagResult()

    def _as_tag_list(value: object) -> list[str]:
        # Model output is untrusted: coerce it to the declared list[str].
        if isinstance(value, str):
            return [value] if value else []
        if isinstance(value, list):
            return [tag for tag in value if isinstance(tag, str)]
        return []

    payload = data if isinstance(data, dict) else {}
    return TagResult(
        tags_to_add=_as_tag_list(payload.get("tags_to_add")),
        tags_to_remove=_as_tag_list(payload.get("tags_to_remove")),
    )
async def profile_cctv_visitor(
    *,
    license_plate: Optional[str],
    zone: str,
    face_description: Optional[str] = None,
    vehicle_description: Optional[str] = None,
) -> CCTVProfileResult:
    """Ask the model to profile a CCTV visitor from the given observations.

    The arguments are sent to the model as one JSON document, with a fixed
    20-second timeout per attempt. This is best-effort: any failure of the
    inference call is logged and an "unknown"/"unknown" profile is returned
    instead of raising.

    Model output is normalised to the declared field types.
    ``wealth_indicator`` and ``vehicle_class`` fall back to ``"unknown"`` and
    ``notes`` to ``""`` when missing or not a string. ``tags_to_add`` becomes
    a list of strings: a single string is wrapped, non-string items are
    dropped, and anything else becomes ``[]``.
    """
    system_prompt = _load_system_prompt("cctv_profiler")
    user_content = json.dumps(
        {
            "license_plate": license_plate,
            "zone": zone,
            "face_description": face_description,
            "vehicle_description": vehicle_description,
        },
        indent=2,
    )
    try:
        data = await _nemoclaw_chat(system_prompt, user_content, timeout=20.0)
    except Exception as exc:
        logger.error("CCTV profiling failed (zone=%s): %s", zone, exc)
        return CCTVProfileResult(wealth_indicator="unknown", vehicle_class="unknown")

    payload = data if isinstance(data, dict) else {}

    def _as_text(key: str, default: str) -> str:
        # Null or non-string values from the model fall back to the default.
        value = payload.get(key, default)
        return value if isinstance(value, str) else default

    raw_tags = payload.get("tags_to_add")
    if isinstance(raw_tags, str):
        tags = [raw_tags] if raw_tags else []
    elif isinstance(raw_tags, list):
        tags = [tag for tag in raw_tags if isinstance(tag, str)]
    else:
        tags = []

    return CCTVProfileResult(
        wealth_indicator=_as_text("wealth_indicator", "unknown"),
        vehicle_class=_as_text("vehicle_class", "unknown"),
        tags_to_add=tags,
        notes=_as_text("notes", ""),
    )
async def health_check() -> dict:
    """Probe every configured endpoint and report its reachability.

    Each probe is a real chat-completion request (a "ping" message with
    ``max_tokens`` 5) sent with a 5-second timeout. The NVIDIA fallback model
    is not probed separately.

    Returns:
        A dict with one entry per probed endpoint: ``"ok"`` for any HTTP
        status below 500 (4xx responses therefore also count as "ok"),
        ``"http_<status>"`` for 5xx, or ``"error: <exc>"`` when the request
        itself fails. It also carries ``"model"`` and ``"primary_url"``,
        which describe the same endpoint: NVIDIA when an API key is set, else
        the compatible endpoint when configured, else Ollama.
    """
    results: dict[str, str] = {}
    json_header = {"Content-Type": "application/json"}
    endpoints: list[tuple[str, str, str, dict[str, str]]] = []

    if NVIDIA_API_KEY:
        endpoints.append(
            (
                "nvidia_primary",
                NVIDIA_CHAT_URL,
                NVIDIA_MODEL,
                {"Authorization": f"Bearer {NVIDIA_API_KEY}", **json_header},
            )
        )
    if NEMOCLAW_CHAT_URL:
        headers = dict(json_header)
        if NEMOCLAW_API_TOKEN:
            headers["Authorization"] = f"Bearer {NEMOCLAW_API_TOKEN}"
        endpoints.append(("compatible_endpoint", NEMOCLAW_CHAT_URL, NEMOCLAW_MODEL, headers))
    if ALLOW_LOCAL_FALLBACK:
        endpoints.append(("ollama_fallback", OLLAMA_CHAT_URL, OLLAMA_MODEL, dict(json_header)))

    for name, url, model, headers in endpoints:
        probe = {
            "model": model,
            "messages": [{"role": "user", "content": "ping"}],
            "max_tokens": 5,
        }
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.post(url, json=probe, headers=headers)
            status = response.status_code
            results[name] = "ok" if status < 500 else f"http_{status}"
        except Exception as exc:
            # A failed probe is reported in the result; it never raises.
            results[name] = f"error: {exc}"

    # Report model and URL from the same endpoint so the pair is consistent
    # even when only the local Ollama fallback is available.
    if NVIDIA_API_KEY:
        primary_model, primary_url = NVIDIA_MODEL, NVIDIA_CHAT_URL
    elif NEMOCLAW_CHAT_URL:
        primary_model, primary_url = NEMOCLAW_MODEL, NEMOCLAW_CHAT_URL
    else:
        primary_model, primary_url = OLLAMA_MODEL, OLLAMA_CHAT_URL
    results["model"] = primary_model
    results["primary_url"] = primary_url
    return results