"""
backend/routers/cctv.py - CCTV ingestion and auto-mode session linkage.
"""

from __future__ import annotations

import uuid
from datetime import datetime, timezone
from typing import Any

import asyncpg
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field

from backend.auth.dependencies import UserPrincipal, require_role
from backend.db.pool import get_pool
from backend.services.auto_mode_matcher import auto_mode_match_session
from backend.services.nemoclaw_client import profile_cctv_visitor

router = APIRouter()


class CCTVEventRequest(BaseModel):
    """Request body for ``POST /event``."""

    # Stored in cctv_events.zone and forwarded to the visitor profiler.
    zone: str
    # Optional perception session to link; bound to uuid columns downstream.
    session_id: str | None = None
    license_plate: str | None = None
    face_description: str | None = None
    vehicle_description: str | None = None
    # Stored as-is in cctv_events.raw_payload.
    raw_payload: dict[str, Any] = Field(default_factory=dict)
    # Defaults to the server's current UTC time when omitted.
    captured_at: datetime | None = None


class FinalizeAutoModeRequest(BaseModel):
    """Request body for ``POST /finalize-auto-mode``."""

    session_id: str


def _parse_session_id(raw: str | None) -> str | None:
    """Validate an optional session id and return it as canonical UUID text.

    Args:
        raw: The client-supplied session id, possibly ``None`` or blank.

    Returns:
        ``None`` when *raw* is absent or blank, otherwise the canonical
        hyphenated lowercase UUID string.

    Raises:
        HTTPException: 422 when *raw* is not parseable as a UUID.
    """
    if not raw:
        return None
    try:
        return str(uuid.UUID(raw))
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail="session_id must be a valid UUID",
        ) from exc


async def _ensure_session(
    conn: asyncpg.Connection,
    *,
    session_id: str | None,
) -> None:
    """Insert a placeholder auto-mode perception session if none exists.

    Does nothing when *session_id* is falsy. An existing row with the same
    id is left untouched (``ON CONFLICT DO NOTHING``).
    """
    if not session_id:
        return
    await conn.execute(
        """
        INSERT INTO perception_sessions
            (id, session_mode, video_asset_id, auto_mode_evidence)
        VALUES ($1::uuid, 'auto', 'unknown', '{}'::jsonb)
        ON CONFLICT (id) DO NOTHING
        """,
        session_id,
    )


@router.post(
    "/event",
    summary="Receive a CCTV frame event from the ONVIF/RTSP bridge",
)
async def ingest_cctv_event(
    body: CCTVEventRequest,
    pool: asyncpg.Pool = Depends(get_pool),
    user: UserPrincipal = Depends(require_role("SENIOR_BROKER")),
) -> dict[str, Any]:
    """Profile a CCTV event, persist it, and attach it to a session.

    The visitor profile is fetched before a connection is acquired. The
    event insert, the placeholder-session insert and the evidence merge
    all run in one transaction.

    Args:
        body: The event payload.
        pool: Connection pool supplied by the ``get_pool`` dependency.
        user: Resolved by the ``require_role`` dependency; not otherwise used.

    Returns:
        A dict with the new event id, the session id as sent by the caller,
        and the profile's wealth indicator, vehicle class and tags.

    Raises:
        HTTPException: 422 when ``body.session_id`` is not a valid UUID.
    """
    del user  # the dependency is declared only for its access check

    # Reject a malformed id up front: otherwise asyncpg fails while binding
    # it to a uuid parameter and the client sees a 500. A blank id is
    # treated as "no session", matching the falsy guards used below.
    session_id = _parse_session_id(body.session_id)

    profile = await profile_cctv_visitor(
        license_plate=body.license_plate,
        zone=body.zone,
        face_description=body.face_description,
        vehicle_description=body.vehicle_description,
    )
    captured_at = body.captured_at or datetime.now(timezone.utc)

    async with pool.acquire() as conn:
        async with conn.transaction():
            await _ensure_session(conn, session_id=session_id)

            # NOTE(review): raw_payload (and the evidence dict below) are
            # passed as Python dicts to jsonb parameters. asyncpg's default
            # jsonb codec expects text, so this presumably relies on the
            # pool registering a JSON type codec - confirm in backend.db.pool.
            row = await conn.fetchrow(
                """
                INSERT INTO cctv_events
                    (zone, license_plate, vehicle_class, wealth_indicator,
                     nemoclaw_tags, nemoclaw_notes, linked_session_id,
                     captured_at, raw_payload)
                VALUES ($1, $2, $3, $4, $5::text[], $6, $7::uuid, $8, $9::jsonb)
                RETURNING id::text
                """,
                body.zone,
                body.license_plate,
                profile.vehicle_class,
                profile.wealth_indicator,
                profile.tags_to_add,
                profile.notes,
                session_id,
                captured_at,
                body.raw_payload,
            )

            if session_id:
                evidence = {
                    "license_plate": body.license_plate,
                    "vehicle_description": body.vehicle_description,
                    "face_description": body.face_description,
                    "vehicle_class": profile.vehicle_class,
                    "wealth_indicator": profile.wealth_indicator,
                    "nemoclaw_tags": profile.tags_to_add,
                    "latest_cctv_event_id": row["id"],
                }
                # COALESCE guards against a NULL column: jsonb `||` returns
                # NULL when either operand is NULL, which would silently
                # discard the evidence being merged.
                await conn.execute(
                    """
                    UPDATE perception_sessions
                    SET auto_mode_evidence =
                        COALESCE(auto_mode_evidence, '{}'::jsonb) || $1::jsonb
                    WHERE id = $2::uuid
                    """,
                    evidence,
                    session_id,
                )

    return {
        "status": "ingested",
        "event_id": row["id"],
        # Echo the caller's value unchanged so existing clients see the
        # same response they always have.
        "session_id": body.session_id,
        "wealth_indicator": profile.wealth_indicator,
        "vehicle_class": profile.vehicle_class,
        "tags_to_add": profile.tags_to_add,
    }


@router.post(
    "/finalize-auto-mode",
    summary="Match or create a lead after an auto mode session",
)
async def finalize_auto_mode(
    body: FinalizeAutoModeRequest,
    pool: asyncpg.Pool = Depends(get_pool),
    user: UserPrincipal = Depends(require_role("SENIOR_BROKER")),
) -> dict[str, Any]:
    """Run the auto-mode matcher for a session and report its outcome.

    The matcher runs inside a transaction on a pooled connection.

    Args:
        body: Carries the session id to finalize.
        pool: Connection pool supplied by the ``get_pool`` dependency.
        user: Resolved by the ``require_role`` dependency; not otherwise used.

    Returns:
        A dict with the session id and the matcher result's lead id, action,
        confidence, rationale and applied tags.

    Raises:
        HTTPException: 404 when ``auto_mode_match_session`` raises
            ``ValueError``; the exception text becomes the detail.
    """
    del user  # the dependency is declared only for its access check

    async with pool.acquire() as conn:
        async with conn.transaction():
            try:
                result = await auto_mode_match_session(
                    conn,
                    session_id=body.session_id,
                )
            except ValueError as exc:
                raise HTTPException(status_code=404, detail=str(exc)) from exc

    return {
        "status": "matched",
        "session_id": body.session_id,
        "lead_id": result.lead_id,
        "action": result.action,
        "confidence": result.confidence,
        "rationale": result.rationale,
        "tags_applied": result.tags_applied,
    }