""" backend/routers/sentinel.py - Sentinel WebSocket and biometric endpoints. """ from __future__ import annotations import asyncio import json import logging import re import uuid from datetime import datetime, timezone from typing import Any, Set import asyncpg from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect from pydantic import BaseModel from backend.auth.dependencies import UserPrincipal, require_role from backend.db.pool import get_pool from backend.services.auto_mode_matcher import auto_mode_match_session from backend.services.nemoclaw_client import score_qd, tag_lead logger = logging.getLogger("velocity.sentinel") router = APIRouter() _UUID_RE = re.compile( r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$" ) class SentinelConnectionManager: def __init__(self) -> None: self._channels: dict[str, Set[WebSocket]] = { "notifications": set(), "perception": set(), } async def connect(self, ws: WebSocket, channel: str) -> None: await ws.accept() self._channels.setdefault(channel, set()).add(ws) logger.info("WS connected: channel=%s total=%d", channel, len(self._channels[channel])) def disconnect(self, ws: WebSocket, channel: str) -> None: self._channels.get(channel, set()).discard(ws) async def broadcast(self, payload: dict[str, Any], channel: str = "notifications") -> None: dead: Set[WebSocket] = set() for ws in list(self._channels.get(channel, set())): try: await ws.send_text(json.dumps(payload)) except Exception: dead.add(ws) self._channels[channel] -= dead async def broadcast_all(self, payload: dict[str, Any]) -> None: for channel in self._channels: await self.broadcast(payload, channel) manager = SentinelConnectionManager() def _is_uuid(value: str | None) -> bool: return bool(value and _UUID_RE.match(value)) async def _resolve_scene_label( conn: asyncpg.Connection, video_asset_id: str | None, video_ts_ms: int, ) -> str | None: if not video_asset_id: return None row = await conn.fetchrow( """ SELECT room_type, description FROM video_scene_maps WHERE video_asset_id = $1 AND start_ms <= $2 AND end_ms >= $2 ORDER BY start_ms DESC LIMIT 1 """, video_asset_id, video_ts_ms, ) if not row: return None description = row["description"] return f"{row['room_type']} - {description}" if description else str(row["room_type"]) async def _ensure_session_row( conn: asyncpg.Connection, *, session_id: str, session_mode: str, lead_id: str | None, video_asset_id: str | None, ) -> None: await conn.execute( """ INSERT INTO perception_sessions (id, session_mode, lead_id, video_asset_id, auto_mode_evidence) VALUES ($1::uuid, $2::session_mode_enum, $3::uuid, $4, '{}'::jsonb) ON CONFLICT (id) DO UPDATE SET video_asset_id = EXCLUDED.video_asset_id, lead_id = COALESCE(perception_sessions.lead_id, EXCLUDED.lead_id) """, session_id, session_mode, lead_id if _is_uuid(lead_id) else None, video_asset_id or "unknown", ) @router.websocket("/ws/notifications") async def notifications_ws(ws: WebSocket) -> None: await manager.connect(ws, "notifications") try: while True: data = await ws.receive_text() await ws.send_text(json.dumps({"type": "ack", "data": data})) except WebSocketDisconnect: manager.disconnect(ws, "notifications") @router.websocket("/ws/perception") async def perception_ws(ws: WebSocket) -> None: await manager.connect(ws, "perception") pool: asyncpg.Pool | None = getattr(ws.app.state, "db_pool", None) if pool is None: await ws.send_text(json.dumps({"type": "system", "data": {"error": "Database unavailable"}})) await ws.close(code=1011) return try: while True: raw = await ws.receive_text() try: packet = json.loads(raw) except json.JSONDecodeError: continue if packet.get("event") != "BIOMETRIC_PACKET": continue lead_id = packet.get("lead_id") session_id = packet.get("session_id") session_mode = packet.get("session_mode", "assigned") video_ts_ms = int(packet.get("video_ts_ms", 0)) video_asset_id = packet.get("video_asset_id") blend_shapes = packet.get("blend_shapes", {}) if ( not session_id or not _is_uuid(session_id) or session_mode not in {"assigned", "auto"} or not isinstance(blend_shapes, dict) or not blend_shapes ): continue async def _score( sid: str = session_id, lid: str | None = lead_id, mode: str = session_mode, bts: int = video_ts_ms, bs: dict[str, float] = blend_shapes, asset_id: str | None = video_asset_id, ) -> None: try: async with pool.acquire() as conn: async with conn.transaction(): await _ensure_session_row( conn, session_id=sid, session_mode=mode, lead_id=lid, video_asset_id=asset_id, ) lead_row = None if _is_uuid(lid): lead_row = await conn.fetchrow( """ SELECT quantum_dynamics_score, budget, interest, tags FROM leads_intelligence WHERE id = $1::uuid """, lid, ) session_row = await conn.fetchrow( """ SELECT final_qd_score, auto_mode_evidence FROM perception_sessions WHERE id = $1::uuid """, sid, ) scene_label = await _resolve_scene_label(conn, asset_id, bts) crm = { "budget": (lead_row["budget"] if lead_row else None) or "unknown", "interest": (lead_row["interest"] if lead_row else None) or "unknown", "prior_interaction_count": await conn.fetchval( """ SELECT COUNT(*) FROM omnichannel_logs WHERE lead_id = $1::uuid """, lid, ) if _is_uuid(lid) else 0, "tags": list((lead_row["tags"] if lead_row else None) or []), "session_mode": mode, } result = await score_qd( lead_id=lid or sid, batch_id=sid, blend_shapes=bs, video_ts_ms=bts, scene_label=scene_label, crm_context=crm, current_qd_score=( lead_row["quantum_dynamics_score"] if lead_row else (session_row["final_qd_score"] if session_row else 50) ), ) evidence = dict((session_row["auto_mode_evidence"] if session_row else {}) or {}) evidence.update( { "last_scene_label": scene_label, "last_video_ts_ms": bts, } ) await conn.execute( """ UPDATE perception_sessions SET final_qd_score = $1, auto_mode_evidence = $2::jsonb WHERE id = $3::uuid """, result.qd_score, evidence, sid, ) if lead_row and _is_uuid(lid): await conn.execute( """ INSERT INTO omnichannel_logs (event_type, lead_id, payload, video_timestamp_ms) VALUES ('SENTIMENT_SPIKE', $1::uuid, $2::jsonb, $3) """, lid, json.dumps( { "blend_shapes": bs, "scene_label": scene_label, "qd_before": lead_row["quantum_dynamics_score"], "qd_after": result.qd_score, "confidence": result.confidence, "session_id": sid, } ), bts, ) await conn.execute( """ UPDATE leads_intelligence SET quantum_dynamics_score = $1, updated_at = NOW() WHERE id = $2::uuid """, result.qd_score, lid, ) baseline = ( lead_row["quantum_dynamics_score"] if lead_row and lead_row["quantum_dynamics_score"] is not None else ((session_row["final_qd_score"] if session_row else None) or 50) ) event = { "type": "QD_UPDATED", "data": { "lead_id": lid, "session_id": sid, "qd_score": result.qd_score, "delta": result.qd_score - baseline, "reasoning": result.reasoning, "scene_label": scene_label, "timestamp": datetime.now(timezone.utc).isoformat(), }, } await manager.broadcast_all(event) except Exception as exc: logger.exception("QD scoring failed for session %s: %s", sid, exc) asyncio.create_task(_score()) except WebSocketDisconnect: manager.disconnect(ws, "perception") class ConsentRequest(BaseModel): lead_id: str ip_address: str | None = None user_agent: str | None = None class TagLeadRequest(BaseModel): lead_id: str phone: str budget: str | None = None message_text: str class SessionCompleteRequest(BaseModel): session_id: str session_mode: str lead_id: str | None = None final_qd_score: int | None = None @router.post("/consent", status_code=201, summary="Record biometric consent") async def record_consent( body: ConsentRequest, pool: asyncpg.Pool = Depends(get_pool), ) -> dict[str, str]: async with pool.acquire() as conn: await conn.execute( """ INSERT INTO consent_log (lead_id, ip_address, user_agent, action) VALUES ($1::uuid, $2, $3, 'granted') """, body.lead_id, body.ip_address, body.user_agent, ) return {"status": "consent_recorded"} @router.post("/session/complete", summary="Close a perception session and finalize auto mode if needed") async def complete_session( body: SessionCompleteRequest, pool: asyncpg.Pool = Depends(get_pool), ) -> dict[str, Any]: if not _is_uuid(body.session_id): raise HTTPException(status_code=400, detail="session_id must be a UUID.") if body.session_mode not in {"assigned", "auto"}: raise HTTPException(status_code=400, detail="session_mode must be assigned or auto.") async with pool.acquire() as conn: async with conn.transaction(): await _ensure_session_row( conn, session_id=body.session_id, session_mode=body.session_mode, lead_id=body.lead_id, video_asset_id=None, ) await conn.execute( """ UPDATE perception_sessions SET ended_at = NOW(), final_qd_score = COALESCE($1, final_qd_score) WHERE id = $2::uuid """, body.final_qd_score, body.session_id, ) if body.session_mode == "auto": result = await auto_mode_match_session(conn, session_id=body.session_id) event = { "type": "LEAD_TAGGED", "data": { "lead_id": result.lead_id, "tags": result.tags_applied, "lead_name": "Auto-matched lead", "session_id": body.session_id, }, } await manager.broadcast(event, "notifications") return { "status": "completed", "session_id": body.session_id, "lead_id": result.lead_id, "match_action": result.action, "match_confidence": result.confidence, "tags_applied": result.tags_applied, } return {"status": "completed", "session_id": body.session_id} @router.post("/tag-lead", summary="Apply NemoClaw lead tagging to a CRM lead") async def tag_lead_route( body: TagLeadRequest, pool: asyncpg.Pool = Depends(get_pool), user: UserPrincipal = Depends(require_role("SENIOR_BROKER")), ) -> dict[str, Any]: result = await tag_lead( lead_id=body.lead_id, phone=body.phone, budget=body.budget, message_text=body.message_text, ) async with pool.acquire() as conn: await conn.execute( """ UPDATE leads_intelligence SET tags = ARRAY( SELECT DISTINCT unnest( COALESCE(tags, ARRAY[]::text[]) || $1::text[] ) ) WHERE id = $2::uuid """, result.tags_to_add, body.lead_id, ) await conn.execute( """ INSERT INTO omnichannel_logs (event_type, lead_id, payload) VALUES ('LEAD_TAGGED', $1::uuid, $2::jsonb) """, body.lead_id, json.dumps( { "tags_added": result.tags_to_add, "tags_removed": result.tags_to_remove, "actor_user_id": user.user_id, } ), ) event = { "type": "LEAD_TAGGED", "data": { "lead_id": body.lead_id, "tags": result.tags_to_add, }, } await manager.broadcast(event, "notifications") return { "lead_id": body.lead_id, "tags_to_add": result.tags_to_add, "tags_to_remove": result.tags_to_remove, } @router.get("/qd-score/{lead_id}", summary="Current Quantum Dynamics score for a lead") async def get_qd_score( lead_id: str, pool: asyncpg.Pool = Depends(get_pool), user: UserPrincipal = Depends(require_role("SENIOR_BROKER")), ) -> dict[str, Any]: del user async with pool.acquire() as conn: row = await conn.fetchrow( "SELECT quantum_dynamics_score, tags FROM leads_intelligence WHERE id = $1::uuid", lead_id, ) if not row: raise HTTPException(status_code=404, detail="Lead not found.") return { "lead_id": lead_id, "qd_score": row["quantum_dynamics_score"], "tags": list(row["tags"] or []), } async def broadcast_sentinel_event(payload: dict[str, Any]) -> None: await manager.broadcast(payload, "notifications")