# File-viewer header (not part of the module):
# 2026-04-12 02:02:58 +05:30, 143 lines, 4.7 KiB, Python

"""
backend/routers/cctv.py - CCTV ingestion and auto-mode session linkage.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
import asyncpg
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from backend.auth.dependencies import UserPrincipal, require_role
from backend.db.pool import get_pool
from backend.services.auto_mode_matcher import auto_mode_match_session
from backend.services.nemoclaw_client import profile_cctv_visitor
# Router that the CCTV endpoints in this module register on via @router.post.
router = APIRouter()
class CCTVEventRequest(BaseModel):
    """Request body for one CCTV event posted to the ``/event`` endpoint.

    Only ``zone`` is required; every other field is optional.
    """

    # Zone label: passed to the visitor profiler and stored on the event row.
    zone: str
    # Optional session to link the event to; bound to SQL with a ::uuid cast.
    session_id: str | None = None
    # Passed to the visitor profiler and stored on the event row.
    license_plate: str | None = None
    # Passed to the visitor profiler and merged into the session evidence;
    # not written to the cctv_events row itself.
    face_description: str | None = None
    # Passed to the visitor profiler and merged into the session evidence;
    # not written to the cctv_events row itself.
    vehicle_description: str | None = None
    # Arbitrary payload stored as jsonb on the event row; defaults to {}.
    raw_payload: dict[str, Any] = Field(default_factory=dict)
    # Capture time; the handler substitutes the current UTC time when omitted.
    captured_at: datetime | None = None
class FinalizeAutoModeRequest(BaseModel):
    """Request body for the ``/finalize-auto-mode`` endpoint."""

    # Session to finalize; forwarded unchanged to auto_mode_match_session.
    session_id: str
async def _ensure_session(
    conn: asyncpg.Connection,
    *,
    session_id: str | None,
) -> None:
    """Insert a placeholder auto-mode session row unless one already exists.

    Nothing is executed when ``session_id`` is ``None`` or empty. Otherwise a
    ``perception_sessions`` row is inserted with mode ``'auto'``, video asset
    ``'unknown'`` and empty evidence; a row that already has this id is left
    untouched (``ON CONFLICT (id) DO NOTHING``).

    Args:
        conn: Connection the INSERT is executed on.
        session_id: Session id as text (cast to ``uuid`` in SQL), or ``None``.
    """
    if session_id:
        insert_placeholder_sql = """
            INSERT INTO perception_sessions (id, session_mode, video_asset_id, auto_mode_evidence)
            VALUES ($1::uuid, 'auto', 'unknown', '{}'::jsonb)
            ON CONFLICT (id) DO NOTHING
            """
        await conn.execute(insert_placeholder_sql, session_id)
@router.post("/event", summary="Receive a CCTV frame event from the ONVIF/RTSP bridge")
async def ingest_cctv_event(
    body: CCTVEventRequest,
    pool: asyncpg.Pool = Depends(get_pool),
    user: UserPrincipal = Depends(require_role("SENIOR_BROKER")),
) -> dict[str, Any]:
    """Profile a CCTV event, store it, and merge it into its linked session.

    The visitor is profiled first. Then, inside one transaction, the linked
    session row is created if missing, the event is inserted into
    ``cctv_events``, and (when a session id was supplied) the event details
    are merged into ``perception_sessions.auto_mode_evidence``.

    Args:
        body: The event as sent by the caller.
        pool: Connection pool supplied by the ``get_pool`` dependency.
        user: Principal from ``require_role("SENIOR_BROKER")``; presumably the
            dependency enforces the role. The value itself is not used.

    Returns:
        A dict with ``status`` (always ``"ingested"``), ``event_id``,
        ``session_id``, ``wealth_indicator``, ``vehicle_class`` and
        ``tags_to_add``.
    """
    del user  # Only needed so the dependency runs; the principal is unused.

    # Profiling is awaited before a pool connection is acquired, so no
    # connection is held while it runs.
    profile = await profile_cctv_visitor(
        license_plate=body.license_plate,
        zone=body.zone,
        face_description=body.face_description,
        vehicle_description=body.vehicle_description,
    )
    captured_at = body.captured_at or datetime.now(timezone.utc)

    # NOTE(review): dicts are bound directly to ::jsonb parameters below; this
    # presumably relies on a JSON codec registered on the pool's connections.
    # Confirm in backend.db.pool.
    async with pool.acquire() as conn:
        async with conn.transaction():
            await _ensure_session(conn, session_id=body.session_id)
            row = await conn.fetchrow(
                """
                INSERT INTO cctv_events
                (zone, license_plate, vehicle_class, wealth_indicator, nemoclaw_tags,
                nemoclaw_notes, linked_session_id, captured_at, raw_payload)
                VALUES
                ($1, $2, $3, $4, $5::text[], $6, $7::uuid, $8, $9::jsonb)
                RETURNING id::text
                """,
                body.zone,
                body.license_plate,
                profile.vehicle_class,
                profile.wealth_indicator,
                profile.tags_to_add,
                profile.notes,
                body.session_id,
                captured_at,
                body.raw_payload,
            )
            event_id = row["id"]

            if body.session_id:
                evidence = {
                    "license_plate": body.license_plate,
                    "vehicle_description": body.vehicle_description,
                    "face_description": body.face_description,
                    "vehicle_class": profile.vehicle_class,
                    "wealth_indicator": profile.wealth_indicator,
                    "nemoclaw_tags": profile.tags_to_add,
                    "latest_cctv_event_id": event_id,
                }
                # COALESCE guards against a NULL evidence column: `NULL || x`
                # is NULL in PostgreSQL, which would discard the merged data.
                # Keys in the new evidence replace same-named existing keys.
                await conn.execute(
                    """
                    UPDATE perception_sessions
                    SET auto_mode_evidence = COALESCE(auto_mode_evidence, '{}'::jsonb) || $1::jsonb
                    WHERE id = $2::uuid
                    """,
                    evidence,
                    body.session_id,
                )

    return {
        "status": "ingested",
        "event_id": event_id,
        "session_id": body.session_id,
        "wealth_indicator": profile.wealth_indicator,
        "vehicle_class": profile.vehicle_class,
        "tags_to_add": profile.tags_to_add,
    }
@router.post("/finalize-auto-mode", summary="Match or create a lead after an auto mode session")
async def finalize_auto_mode(
    body: FinalizeAutoModeRequest,
    pool: asyncpg.Pool = Depends(get_pool),
    user: UserPrincipal = Depends(require_role("SENIOR_BROKER")),
) -> dict[str, Any]:
    """Run the auto-mode matcher for a session and report its outcome.

    ``auto_mode_match_session`` is awaited inside a transaction on a pooled
    connection. A ``ValueError`` from it is turned into an HTTP 404 whose
    detail is the exception text.

    Args:
        body: Carries the id of the session to finalize.
        pool: Connection pool supplied by the ``get_pool`` dependency.
        user: Principal from ``require_role("SENIOR_BROKER")``; presumably the
            dependency enforces the role. The value itself is not used.

    Returns:
        A dict with ``status`` (always ``"matched"``), ``session_id``, and the
        matcher's ``lead_id``, ``action``, ``confidence``, ``rationale`` and
        ``tags_applied``.

    Raises:
        HTTPException: 404 when the matcher raises ``ValueError``.
    """
    del user  # Only needed so the dependency runs; the principal is unused.

    async with pool.acquire() as db, db.transaction():
        try:
            outcome = await auto_mode_match_session(db, session_id=body.session_id)
        except ValueError as exc:
            raise HTTPException(status_code=404, detail=str(exc)) from exc

    response: dict[str, Any] = {"status": "matched", "session_id": body.session_id}
    response.update(
        lead_id=outcome.lead_id,
        action=outcome.action,
        confidence=outcome.confidence,
        rationale=outcome.rationale,
        tags_applied=outcome.tags_applied,
    )
    return response