Files
Project_Velocity/backend/routers/sentinel.py
2026-04-12 02:02:58 +05:30

480 lines
17 KiB
Python

"""
backend/routers/sentinel.py - Sentinel WebSocket and biometric endpoints.
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
import uuid
from datetime import datetime, timezone
from typing import Any, Set
import asyncpg
from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from backend.auth.dependencies import UserPrincipal, require_role
from backend.db.pool import get_pool
from backend.services.auto_mode_matcher import auto_mode_match_session
from backend.services.nemoclaw_client import score_qd, tag_lead
logger = logging.getLogger("velocity.sentinel")
router = APIRouter()
_UUID_RE = re.compile(
r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$"
)
class SentinelConnectionManager:
def __init__(self) -> None:
self._channels: dict[str, Set[WebSocket]] = {
"notifications": set(),
"perception": set(),
}
async def connect(self, ws: WebSocket, channel: str) -> None:
await ws.accept()
self._channels.setdefault(channel, set()).add(ws)
logger.info("WS connected: channel=%s total=%d", channel, len(self._channels[channel]))
def disconnect(self, ws: WebSocket, channel: str) -> None:
self._channels.get(channel, set()).discard(ws)
async def broadcast(self, payload: dict[str, Any], channel: str = "notifications") -> None:
dead: Set[WebSocket] = set()
for ws in list(self._channels.get(channel, set())):
try:
await ws.send_text(json.dumps(payload))
except Exception:
dead.add(ws)
self._channels[channel] -= dead
async def broadcast_all(self, payload: dict[str, Any]) -> None:
for channel in self._channels:
await self.broadcast(payload, channel)
manager = SentinelConnectionManager()
def _is_uuid(value: str | None) -> bool:
return bool(value and _UUID_RE.match(value))
async def _resolve_scene_label(
conn: asyncpg.Connection,
video_asset_id: str | None,
video_ts_ms: int,
) -> str | None:
if not video_asset_id:
return None
row = await conn.fetchrow(
"""
SELECT room_type, description
FROM video_scene_maps
WHERE video_asset_id = $1
AND start_ms <= $2
AND end_ms >= $2
ORDER BY start_ms DESC
LIMIT 1
""",
video_asset_id,
video_ts_ms,
)
if not row:
return None
description = row["description"]
return f"{row['room_type']} - {description}" if description else str(row["room_type"])
async def _ensure_session_row(
conn: asyncpg.Connection,
*,
session_id: str,
session_mode: str,
lead_id: str | None,
video_asset_id: str | None,
) -> None:
await conn.execute(
"""
INSERT INTO perception_sessions (id, session_mode, lead_id, video_asset_id, auto_mode_evidence)
VALUES ($1::uuid, $2::session_mode_enum, $3::uuid, $4, '{}'::jsonb)
ON CONFLICT (id) DO UPDATE
SET video_asset_id = EXCLUDED.video_asset_id,
lead_id = COALESCE(perception_sessions.lead_id, EXCLUDED.lead_id)
""",
session_id,
session_mode,
lead_id if _is_uuid(lead_id) else None,
video_asset_id or "unknown",
)
@router.websocket("/ws/notifications")
async def notifications_ws(ws: WebSocket) -> None:
await manager.connect(ws, "notifications")
try:
while True:
data = await ws.receive_text()
await ws.send_text(json.dumps({"type": "ack", "data": data}))
except WebSocketDisconnect:
manager.disconnect(ws, "notifications")
@router.websocket("/ws/perception")
async def perception_ws(ws: WebSocket) -> None:
await manager.connect(ws, "perception")
pool: asyncpg.Pool | None = getattr(ws.app.state, "db_pool", None)
if pool is None:
await ws.send_text(json.dumps({"type": "system", "data": {"error": "Database unavailable"}}))
await ws.close(code=1011)
return
try:
while True:
raw = await ws.receive_text()
try:
packet = json.loads(raw)
except json.JSONDecodeError:
continue
if packet.get("event") != "BIOMETRIC_PACKET":
continue
lead_id = packet.get("lead_id")
session_id = packet.get("session_id")
session_mode = packet.get("session_mode", "assigned")
video_ts_ms = int(packet.get("video_ts_ms", 0))
video_asset_id = packet.get("video_asset_id")
blend_shapes = packet.get("blend_shapes", {})
if (
not session_id
or not _is_uuid(session_id)
or session_mode not in {"assigned", "auto"}
or not isinstance(blend_shapes, dict)
or not blend_shapes
):
continue
async def _score(
sid: str = session_id,
lid: str | None = lead_id,
mode: str = session_mode,
bts: int = video_ts_ms,
bs: dict[str, float] = blend_shapes,
asset_id: str | None = video_asset_id,
) -> None:
try:
async with pool.acquire() as conn:
async with conn.transaction():
await _ensure_session_row(
conn,
session_id=sid,
session_mode=mode,
lead_id=lid,
video_asset_id=asset_id,
)
lead_row = None
if _is_uuid(lid):
lead_row = await conn.fetchrow(
"""
SELECT quantum_dynamics_score, budget, interest, tags
FROM leads_intelligence
WHERE id = $1::uuid
""",
lid,
)
session_row = await conn.fetchrow(
"""
SELECT final_qd_score, auto_mode_evidence
FROM perception_sessions
WHERE id = $1::uuid
""",
sid,
)
scene_label = await _resolve_scene_label(conn, asset_id, bts)
crm = {
"budget": (lead_row["budget"] if lead_row else None) or "unknown",
"interest": (lead_row["interest"] if lead_row else None) or "unknown",
"prior_interaction_count": await conn.fetchval(
"""
SELECT COUNT(*)
FROM omnichannel_logs
WHERE lead_id = $1::uuid
""",
lid,
)
if _is_uuid(lid)
else 0,
"tags": list((lead_row["tags"] if lead_row else None) or []),
"session_mode": mode,
}
result = await score_qd(
lead_id=lid or sid,
batch_id=sid,
blend_shapes=bs,
video_ts_ms=bts,
scene_label=scene_label,
crm_context=crm,
current_qd_score=(
lead_row["quantum_dynamics_score"]
if lead_row
else (session_row["final_qd_score"] if session_row else 50)
),
)
evidence = dict((session_row["auto_mode_evidence"] if session_row else {}) or {})
evidence.update(
{
"last_scene_label": scene_label,
"last_video_ts_ms": bts,
}
)
await conn.execute(
"""
UPDATE perception_sessions
SET final_qd_score = $1,
auto_mode_evidence = $2::jsonb
WHERE id = $3::uuid
""",
result.qd_score,
evidence,
sid,
)
if lead_row and _is_uuid(lid):
await conn.execute(
"""
INSERT INTO omnichannel_logs (event_type, lead_id, payload, video_timestamp_ms)
VALUES ('SENTIMENT_SPIKE', $1::uuid, $2::jsonb, $3)
""",
lid,
json.dumps(
{
"blend_shapes": bs,
"scene_label": scene_label,
"qd_before": lead_row["quantum_dynamics_score"],
"qd_after": result.qd_score,
"confidence": result.confidence,
"session_id": sid,
}
),
bts,
)
await conn.execute(
"""
UPDATE leads_intelligence
SET quantum_dynamics_score = $1, updated_at = NOW()
WHERE id = $2::uuid
""",
result.qd_score,
lid,
)
baseline = (
lead_row["quantum_dynamics_score"]
if lead_row and lead_row["quantum_dynamics_score"] is not None
else ((session_row["final_qd_score"] if session_row else None) or 50)
)
event = {
"type": "QD_UPDATED",
"data": {
"lead_id": lid,
"session_id": sid,
"qd_score": result.qd_score,
"delta": result.qd_score - baseline,
"reasoning": result.reasoning,
"scene_label": scene_label,
"timestamp": datetime.now(timezone.utc).isoformat(),
},
}
await manager.broadcast_all(event)
except Exception as exc:
logger.exception("QD scoring failed for session %s: %s", sid, exc)
asyncio.create_task(_score())
except WebSocketDisconnect:
manager.disconnect(ws, "perception")
class ConsentRequest(BaseModel):
lead_id: str
ip_address: str | None = None
user_agent: str | None = None
class TagLeadRequest(BaseModel):
lead_id: str
phone: str
budget: str | None = None
message_text: str
class SessionCompleteRequest(BaseModel):
session_id: str
session_mode: str
lead_id: str | None = None
final_qd_score: int | None = None
@router.post("/consent", status_code=201, summary="Record biometric consent")
async def record_consent(
body: ConsentRequest,
pool: asyncpg.Pool = Depends(get_pool),
) -> dict[str, str]:
async with pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO consent_log (lead_id, ip_address, user_agent, action)
VALUES ($1::uuid, $2, $3, 'granted')
""",
body.lead_id,
body.ip_address,
body.user_agent,
)
return {"status": "consent_recorded"}
@router.post("/session/complete", summary="Close a perception session and finalize auto mode if needed")
async def complete_session(
body: SessionCompleteRequest,
pool: asyncpg.Pool = Depends(get_pool),
) -> dict[str, Any]:
if not _is_uuid(body.session_id):
raise HTTPException(status_code=400, detail="session_id must be a UUID.")
if body.session_mode not in {"assigned", "auto"}:
raise HTTPException(status_code=400, detail="session_mode must be assigned or auto.")
async with pool.acquire() as conn:
async with conn.transaction():
await _ensure_session_row(
conn,
session_id=body.session_id,
session_mode=body.session_mode,
lead_id=body.lead_id,
video_asset_id=None,
)
await conn.execute(
"""
UPDATE perception_sessions
SET ended_at = NOW(),
final_qd_score = COALESCE($1, final_qd_score)
WHERE id = $2::uuid
""",
body.final_qd_score,
body.session_id,
)
if body.session_mode == "auto":
result = await auto_mode_match_session(conn, session_id=body.session_id)
event = {
"type": "LEAD_TAGGED",
"data": {
"lead_id": result.lead_id,
"tags": result.tags_applied,
"lead_name": "Auto-matched lead",
"session_id": body.session_id,
},
}
await manager.broadcast(event, "notifications")
return {
"status": "completed",
"session_id": body.session_id,
"lead_id": result.lead_id,
"match_action": result.action,
"match_confidence": result.confidence,
"tags_applied": result.tags_applied,
}
return {"status": "completed", "session_id": body.session_id}
@router.post("/tag-lead", summary="Apply NemoClaw lead tagging to a CRM lead")
async def tag_lead_route(
body: TagLeadRequest,
pool: asyncpg.Pool = Depends(get_pool),
user: UserPrincipal = Depends(require_role("SENIOR_BROKER")),
) -> dict[str, Any]:
result = await tag_lead(
lead_id=body.lead_id,
phone=body.phone,
budget=body.budget,
message_text=body.message_text,
)
async with pool.acquire() as conn:
await conn.execute(
"""
UPDATE leads_intelligence
SET tags = ARRAY(
SELECT DISTINCT unnest(
COALESCE(tags, ARRAY[]::text[]) || $1::text[]
)
)
WHERE id = $2::uuid
""",
result.tags_to_add,
body.lead_id,
)
await conn.execute(
"""
INSERT INTO omnichannel_logs (event_type, lead_id, payload)
VALUES ('LEAD_TAGGED', $1::uuid, $2::jsonb)
""",
body.lead_id,
json.dumps(
{
"tags_added": result.tags_to_add,
"tags_removed": result.tags_to_remove,
"actor_user_id": user.user_id,
}
),
)
event = {
"type": "LEAD_TAGGED",
"data": {
"lead_id": body.lead_id,
"tags": result.tags_to_add,
},
}
await manager.broadcast(event, "notifications")
return {
"lead_id": body.lead_id,
"tags_to_add": result.tags_to_add,
"tags_to_remove": result.tags_to_remove,
}
@router.get("/qd-score/{lead_id}", summary="Current Quantum Dynamics score for a lead")
async def get_qd_score(
lead_id: str,
pool: asyncpg.Pool = Depends(get_pool),
user: UserPrincipal = Depends(require_role("SENIOR_BROKER")),
) -> dict[str, Any]:
del user
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT quantum_dynamics_score, tags FROM leads_intelligence WHERE id = $1::uuid",
lead_id,
)
if not row:
raise HTTPException(status_code=404, detail="Lead not found.")
return {
"lead_id": lead_id,
"qd_score": row["quantum_dynamics_score"],
"tags": list(row["tags"] or []),
}
async def broadcast_sentinel_event(payload: dict[str, Any]) -> None:
await manager.broadcast(payload, "notifications")