feat: Oracle CRM Page, Synthetic Client Data and Live Snapshot when hitting emotion hotpoint

This commit is contained in:
Sagnik
2026-04-19 00:43:01 +05:30
parent f616a33ab0
commit 4b21c2cad6
197 changed files with 105054 additions and 89 deletions

View File

@@ -153,7 +153,26 @@ async def login(body: LoginRequest):
@app.get("/api/auth/me", tags=["Auth"])
async def me(user: UserPrincipal = Depends(get_current_user)):
return {"user_id": user.user_id, "role": user.role}
pool = app.state.db_pool
if pool is None:
raise HTTPException(status_code=503, detail="Database unavailable.")
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT full_name, email
FROM users_and_roles
WHERE id = $1::uuid
""",
user.user_id,
)
return {
"user_id": user.user_id,
"role": user.role,
"full_name": row["full_name"] if row else None,
"email": row["email"] if row else None,
}
# ── Catalyst WebSocket (preserved from v1) ────────────────────────────────────

View File

@@ -114,6 +114,263 @@ async def _ensure_session_row(
)
async def _resolve_canonical_context(
conn: asyncpg.Connection,
*,
person_id: str | None,
canonical_lead_id: str | None,
legacy_lead_id: str | None,
) -> dict[str, Any] | None:
if _is_uuid(person_id):
row = await conn.fetchrow(
"""
SELECT
p.person_id::text AS person_id,
p.full_name,
p.primary_phone,
p.buyer_type,
p.legacy_li_id::text AS legacy_li_id,
cl.lead_id::text AS lead_id,
cl.status AS lead_status,
cl.budget_band,
cl.urgency,
COALESCE((
SELECT current_value
FROM intel_qd_scores
WHERE person_id = p.person_id AND score_type = 'engagement_score'
ORDER BY computed_at DESC
LIMIT 1
), 0.50) AS engagement_score
FROM crm_people p
LEFT JOIN crm_leads cl ON cl.person_id = p.person_id
WHERE p.person_id = $1::uuid
ORDER BY cl.created_at DESC NULLS LAST
LIMIT 1
""",
person_id,
)
if row:
return dict(row)
if _is_uuid(canonical_lead_id):
row = await conn.fetchrow(
"""
SELECT
p.person_id::text AS person_id,
p.full_name,
p.primary_phone,
p.buyer_type,
p.legacy_li_id::text AS legacy_li_id,
cl.lead_id::text AS lead_id,
cl.status AS lead_status,
cl.budget_band,
cl.urgency,
COALESCE((
SELECT current_value
FROM intel_qd_scores
WHERE person_id = p.person_id AND score_type = 'engagement_score'
ORDER BY computed_at DESC
LIMIT 1
), 0.50) AS engagement_score
FROM crm_leads cl
INNER JOIN crm_people p ON p.person_id = cl.person_id
WHERE cl.lead_id = $1::uuid
LIMIT 1
""",
canonical_lead_id,
)
if row:
return dict(row)
if _is_uuid(legacy_lead_id):
row = await conn.fetchrow(
"""
SELECT
p.person_id::text AS person_id,
p.full_name,
p.primary_phone,
p.buyer_type,
p.legacy_li_id::text AS legacy_li_id,
cl.lead_id::text AS lead_id,
cl.status AS lead_status,
cl.budget_band,
cl.urgency,
COALESCE((
SELECT current_value
FROM intel_qd_scores
WHERE person_id = p.person_id AND score_type = 'engagement_score'
ORDER BY computed_at DESC
LIMIT 1
), 0.50) AS engagement_score
FROM crm_people p
LEFT JOIN crm_leads cl ON cl.person_id = p.person_id
WHERE p.legacy_li_id = $1::uuid
ORDER BY cl.created_at DESC NULLS LAST
LIMIT 1
""",
legacy_lead_id,
)
if row:
return dict(row)
return None
async def _ensure_canonical_interaction(
conn: asyncpg.Connection,
*,
person_id: str,
canonical_lead_id: str | None,
session_id: str,
session_mode: str,
video_asset_id: str | None,
) -> str:
existing = await conn.fetchrow(
"""
SELECT interaction_id::text
FROM intel_interactions
WHERE source_ref = $1
AND channel = 'perception_session'::intel_channel
ORDER BY created_at DESC
LIMIT 1
""",
session_id,
)
if existing:
return existing["interaction_id"]
row = await conn.fetchrow(
"""
INSERT INTO intel_interactions (
person_id, lead_id, channel, interaction_type, source_ref, summary, metadata_json
)
VALUES (
$1::uuid,
$2::uuid,
'perception_session'::intel_channel,
'sentinel_live_session',
$3,
$4,
$5::jsonb
)
ON CONFLICT DO NOTHING
RETURNING interaction_id::text
""",
person_id,
canonical_lead_id if _is_uuid(canonical_lead_id) else None,
session_id,
f"Sentinel live session ({session_mode})",
json.dumps(
{
"session_id": session_id,
"session_mode": session_mode,
"video_asset_id": video_asset_id,
}
),
)
return row["interaction_id"]
async def _persist_canonical_qd(
conn: asyncpg.Connection,
*,
person_id: str,
interaction_id: str,
session_id: str,
scene_label: str | None,
video_ts_ms: int,
result_qd_score: int,
baseline_score: int,
blend_shapes: dict[str, float],
session_mode: str,
) -> None:
normalized_score = max(0.0, min(1.0, result_qd_score / 100.0))
normalized_delta = max(-1.0, min(1.0, (result_qd_score - baseline_score) / 100.0))
event_type = "engagement_sample"
if result_qd_score - baseline_score >= 8:
event_type = "engagement_spike"
elif result_qd_score - baseline_score <= -8:
event_type = "negative_shift"
metadata = {
"interaction_id": interaction_id,
"scene_label": scene_label,
"video_ts_ms": video_ts_ms,
"qd_before": baseline_score,
"qd_after": result_qd_score,
"delta": result_qd_score - baseline_score,
"session_mode": session_mode,
"blend_shapes": blend_shapes,
}
await conn.execute(
"""
INSERT INTO intel_perception_events (
person_id, session_ref, event_type, engagement_score, media_ref, happened_at, metadata_json
)
VALUES ($1::uuid, $2, $3, $4, $5, NOW(), $6::jsonb)
""",
person_id,
session_id,
event_type,
normalized_score,
scene_label or f"video_ts:{video_ts_ms}",
json.dumps(metadata),
)
await conn.execute(
"""
INSERT INTO intel_qd_scores (
person_id, score_type, current_value, computed_at, evidence_refs_json, reasoning, metadata_json
)
VALUES (
$1::uuid,
'engagement_score',
$2,
NOW(),
$3::jsonb,
$4,
$5::jsonb
)
ON CONFLICT (person_id, score_type)
DO UPDATE SET
current_value = EXCLUDED.current_value,
computed_at = EXCLUDED.computed_at,
evidence_refs_json = EXCLUDED.evidence_refs_json,
reasoning = EXCLUDED.reasoning,
metadata_json = EXCLUDED.metadata_json
""",
person_id,
normalized_score,
json.dumps([session_id, interaction_id]),
f"Sentinel session updated engagement to {result_qd_score}/100",
json.dumps(metadata),
)
await conn.execute(
"""
INSERT INTO intel_qd_timeseries (
person_id, score_type, signal_source, timestamp, value, delta, evidence_ref, metadata_json
)
VALUES (
$1::uuid,
'engagement_score',
'sentinel_live_session',
NOW(),
$2,
$3,
$4,
$5::jsonb
)
""",
person_id,
normalized_score,
normalized_delta,
session_id,
json.dumps(metadata),
)
@router.websocket("/ws/notifications")
async def notifications_ws(ws: WebSocket) -> None:
await manager.connect(ws, "notifications")
@@ -145,6 +402,8 @@ async def perception_ws(ws: WebSocket) -> None:
if packet.get("event") != "BIOMETRIC_PACKET":
continue
person_id = packet.get("person_id")
canonical_lead_id = packet.get("canonical_lead_id")
lead_id = packet.get("lead_id")
session_id = packet.get("session_id")
session_mode = packet.get("session_mode", "assigned")
@@ -172,23 +431,31 @@ async def perception_ws(ws: WebSocket) -> None:
try:
async with pool.acquire() as conn:
async with conn.transaction():
canonical = await _resolve_canonical_context(
conn,
person_id=person_id,
canonical_lead_id=canonical_lead_id,
legacy_lead_id=lid,
)
await _ensure_session_row(
conn,
session_id=sid,
session_mode=mode,
lead_id=lid,
lead_id=(canonical["legacy_li_id"] if canonical and canonical.get("legacy_li_id") else lid),
video_asset_id=asset_id,
)
lead_row = None
if _is_uuid(lid):
legacy_lead_id = canonical["legacy_li_id"] if canonical and canonical.get("legacy_li_id") else lid
if _is_uuid(legacy_lead_id):
lead_row = await conn.fetchrow(
"""
SELECT quantum_dynamics_score, budget, interest, tags
FROM leads_intelligence
WHERE id = $1::uuid
""",
lid,
legacy_lead_id,
)
session_row = await conn.fetchrow(
@@ -202,7 +469,9 @@ async def perception_ws(ws: WebSocket) -> None:
scene_label = await _resolve_scene_label(conn, asset_id, bts)
crm = {
"budget": (lead_row["budget"] if lead_row else None) or "unknown",
"budget": (canonical["budget_band"] if canonical and canonical.get("budget_band") else None)
or (lead_row["budget"] if lead_row else None)
or "unknown",
"interest": (lead_row["interest"] if lead_row else None) or "unknown",
"prior_interaction_count": await conn.fetchval(
"""
@@ -210,26 +479,31 @@ async def perception_ws(ws: WebSocket) -> None:
FROM omnichannel_logs
WHERE lead_id = $1::uuid
""",
lid,
legacy_lead_id,
)
if _is_uuid(lid)
if _is_uuid(legacy_lead_id)
else 0,
"tags": list((lead_row["tags"] if lead_row else None) or []),
"session_mode": mode,
}
baseline_score = (
lead_row["quantum_dynamics_score"]
if lead_row and lead_row["quantum_dynamics_score"] is not None
else (
int(round(float(canonical["engagement_score"]) * 100))
if canonical and canonical.get("engagement_score") is not None
else ((session_row["final_qd_score"] if session_row else None) or 50)
)
)
result = await score_qd(
lead_id=lid or sid,
lead_id=(canonical["person_id"] if canonical else None) or lid or sid,
batch_id=sid,
blend_shapes=bs,
video_ts_ms=bts,
scene_label=scene_label,
crm_context=crm,
current_qd_score=(
lead_row["quantum_dynamics_score"]
if lead_row
else (session_row["final_qd_score"] if session_row else 50)
),
current_qd_score=baseline_score,
)
evidence = dict((session_row["auto_mode_evidence"] if session_row else {}) or {})
@@ -237,6 +511,7 @@ async def perception_ws(ws: WebSocket) -> None:
{
"last_scene_label": scene_label,
"last_video_ts_ms": bts,
"person_id": canonical["person_id"] if canonical else None,
}
)
await conn.execute(
@@ -251,21 +526,44 @@ async def perception_ws(ws: WebSocket) -> None:
sid,
)
if lead_row and _is_uuid(lid):
if canonical and _is_uuid(canonical["person_id"]):
interaction_id = await _ensure_canonical_interaction(
conn,
person_id=canonical["person_id"],
canonical_lead_id=canonical["lead_id"],
session_id=sid,
session_mode=mode,
video_asset_id=asset_id,
)
await _persist_canonical_qd(
conn,
person_id=canonical["person_id"],
interaction_id=interaction_id,
session_id=sid,
scene_label=scene_label,
video_ts_ms=bts,
result_qd_score=result.qd_score,
baseline_score=baseline_score,
blend_shapes=bs,
session_mode=mode,
)
if lead_row and _is_uuid(legacy_lead_id):
await conn.execute(
"""
INSERT INTO omnichannel_logs (event_type, lead_id, payload, video_timestamp_ms)
VALUES ('SENTIMENT_SPIKE', $1::uuid, $2::jsonb, $3)
""",
lid,
legacy_lead_id,
json.dumps(
{
"blend_shapes": bs,
"scene_label": scene_label,
"qd_before": lead_row["quantum_dynamics_score"],
"qd_before": baseline_score,
"qd_after": result.qd_score,
"confidence": result.confidence,
"session_id": sid,
"person_id": canonical["person_id"] if canonical else None,
}
),
bts,
@@ -277,21 +575,17 @@ async def perception_ws(ws: WebSocket) -> None:
WHERE id = $2::uuid
""",
result.qd_score,
lid,
legacy_lead_id,
)
baseline = (
lead_row["quantum_dynamics_score"]
if lead_row and lead_row["quantum_dynamics_score"] is not None
else ((session_row["final_qd_score"] if session_row else None) or 50)
)
event = {
"type": "QD_UPDATED",
"data": {
"lead_id": lid,
"person_id": canonical["person_id"] if canonical else None,
"lead_id": legacy_lead_id,
"session_id": sid,
"qd_score": result.qd_score,
"delta": result.qd_score - baseline,
"delta": result.qd_score - baseline_score,
"reasoning": result.reasoning,
"scene_label": scene_label,
"timestamp": datetime.now(timezone.utc).isoformat(),
@@ -322,7 +616,10 @@ class TagLeadRequest(BaseModel):
class SessionCompleteRequest(BaseModel):
session_id: str
session_mode: str
person_id: str | None = None
canonical_lead_id: str | None = None
lead_id: str | None = None
lead_name: str | None = None
final_qd_score: int | None = None
@@ -363,6 +660,13 @@ async def complete_session(
lead_id=body.lead_id,
video_asset_id=None,
)
canonical = await _resolve_canonical_context(
conn,
person_id=body.person_id,
canonical_lead_id=body.canonical_lead_id,
legacy_lead_id=body.lead_id,
)
await conn.execute(
"""
UPDATE perception_sessions
@@ -374,6 +678,28 @@ async def complete_session(
body.session_id,
)
if canonical and body.final_qd_score is not None:
interaction_id = await _ensure_canonical_interaction(
conn,
person_id=canonical["person_id"],
canonical_lead_id=canonical["lead_id"],
session_id=body.session_id,
session_mode=body.session_mode,
video_asset_id=None,
)
await _persist_canonical_qd(
conn,
person_id=canonical["person_id"],
interaction_id=interaction_id,
session_id=body.session_id,
scene_label="session_complete",
video_ts_ms=0,
result_qd_score=body.final_qd_score,
baseline_score=body.final_qd_score,
blend_shapes={},
session_mode=body.session_mode,
)
if body.session_mode == "auto":
result = await auto_mode_match_session(conn, session_id=body.session_id)
event = {

View File

@@ -300,12 +300,15 @@ async def get_contact_list(
p.primary_email,
p.primary_phone,
p.buyer_type,
p.legacy_li_id,
p.created_at,
cl.lead_id,
cl.status AS lead_status,
cl.budget_band,
cl.urgency,
pi.project_name AS primary_interest,
COALESCE(qs.intent_value, 0.0) AS intent_score,
COALESCE(qs.engagement_value, qs.intent_value, 0.0) AS engagement_score,
COALESCE(qs.urgency_value, 0.0) AS urgency_score,
(SELECT COUNT(*) FROM intel_interactions ii WHERE ii.person_id = p.person_id) AS interaction_count,
(SELECT MAX(happened_at) FROM intel_interactions ii WHERE ii.person_id = p.person_id) AS last_interaction_at,
@@ -318,9 +321,17 @@ async def get_contact_list(
ORDER BY created_at DESC
LIMIT 1
) cl ON TRUE
LEFT JOIN LATERAL (
SELECT project_name
FROM crm_property_interests
WHERE person_id = p.person_id
ORDER BY priority ASC, created_at DESC
LIMIT 1
) pi ON TRUE
LEFT JOIN LATERAL (
SELECT
MAX(CASE WHEN score_type = 'intent_score' THEN current_value END) AS intent_value,
MAX(CASE WHEN score_type = 'engagement_score' THEN current_value END) AS engagement_value,
MAX(CASE WHEN score_type = 'urgency_score' THEN current_value END) AS urgency_value
FROM intel_qd_scores
WHERE person_id = p.person_id
@@ -350,10 +361,13 @@ async def get_contact_list(
"primary_phone": r["primary_phone"],
"buyer_type": r["buyer_type"],
"lead_id": str(r["lead_id"]) if r["lead_id"] else None,
"legacy_li_id": str(r["legacy_li_id"]) if r["legacy_li_id"] else None,
"lead_status": r["lead_status"],
"budget_band": r["budget_band"],
"urgency": r["urgency"],
"primary_interest": r["primary_interest"],
"intent_score": float(r["intent_score"]),
"engagement_score": float(r["engagement_score"]),
"urgency_score": float(r["urgency_score"]),
"interaction_count": int(r["interaction_count"]),
"last_interaction_at": r["last_interaction_at"].isoformat() if r["last_interaction_at"] else None,