forked from sagnik/Project_Velocity
Co-authored-by: Sayan Datta <sayan@Sayans-MacBook-Air.local> Reviewed-on: sagnik/Project_Velocity#29
659 lines
25 KiB
Python
659 lines
25 KiB
Python
"""
routes_mobile_edge.py
─────────────────────
Mobile Edge API — serves iPhone Edge and Android Phone Edge apps.

Surfaces:
  GET    /mobile-edge/events              — communication events for a lead
  POST   /mobile-edge/events              — log a new communication event
  GET    /mobile-edge/memory              — memory facts for a lead
  POST   /mobile-edge/imports             — operator-assisted import of a recording/note
  POST   /mobile-edge/notes               — quick note attached to a lead
  GET    /mobile-edge/calendar            — calendar events for the authed user
  POST   /mobile-edge/calendar            — create a calendar event
  PATCH  /mobile-edge/calendar/{id}       — update a calendar event
  DELETE /mobile-edge/calendar/{id}       — cancel a calendar event
  GET    /mobile-edge/transcripts/{id}    — transcript segments for an event
  GET    /mobile-edge/insights/{lead_id}  — insight recommendations for a lead
  POST   /mobile-edge/insights/{id}/act   — act on or dismiss an insight
  GET    /mobile-edge/alerts              — active alerts for the authed user
  POST   /mobile-edge/session             — register a surface session heartbeat
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
from datetime import UTC, datetime
|
|
from typing import Any, Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
|
|
from pydantic import BaseModel, Field
|
|
|
|
from backend.auth.dependencies import get_current_user
|
|
|
|
logger = logging.getLogger("velocity.mobile_edge")
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _pool(request: Request):
|
|
pool = request.app.state.db_pool
|
|
if pool is None:
|
|
raise HTTPException(status_code=503, detail="Database unavailable.")
|
|
return pool
|
|
|
|
|
|
def _now() -> str:
|
|
return datetime.now(UTC).isoformat()
|
|
|
|
|
|
# ── Pydantic models ───────────────────────────────────────────────────────────
|
|
|
|
# Channel values accepted on event and import payloads.
VALID_CHANNELS = {
    "pstn",
    "whatsapp_message",
    "whatsapp_voice",
    "whatsapp_video",
    "email",
    "facebook_message",
    "instagram_message",
    "in_app_voip",
    "manual_note",
}

# Accepted values for the capture_mode field of an event.
VALID_CAPTURE_MODES = {
    "direct_api",
    "provider_routed",
    "operator_import",
    "operator_note",
}

# Accepted values for the direction field of an event.
VALID_DIRECTIONS = {
    "inbound",
    "outbound",
}

# Accepted values for the consent_state field of an event.
VALID_CONSENT = {
    "unknown",
    "granted",
    "denied",
    "not_required",
}
|
|
|
|
|
|
class CommunicationEventCreate(BaseModel):
    """Request body for ``POST /events``: one communication event to log."""

    lead_id: str
    # channel, direction, capture_mode and consent_state are plain strings
    # here; the route handler checks them against the VALID_* sets.
    channel: str
    direction: str = "inbound"
    provider: str | None = None
    capture_mode: str
    consent_state: str = "unknown"
    duration_seconds: int | None = None
    summary: str | None = None
    raw_reference: str | None = None
    recording_ref: str | None = None
    # Free-form mapping; the handler serializes it with json.dumps for a
    # jsonb parameter.
    provider_metadata: dict = Field(default_factory=dict)
|
|
|
|
|
|
class ImportCreate(BaseModel):
    """Request body for ``POST /imports`` (operator-assisted import)."""

    lead_id: str
    channel: str
    capture_mode: str = "operator_import"
    # When set, the import handler also creates a transcription job.
    recording_ref: str | None = None
    summary: str | None = None
    consent_state: str = "granted"
|
|
|
|
|
|
class NoteCreate(BaseModel):
    """Request body for ``POST /notes``: a quick operator note on a lead."""

    lead_id: str
    # Stored as the fact_text of the inserted memory fact.
    note_text: str
    fact_type: str = "custom"
    # Optional; carried as a string in the payload.
    effective_date: str | None = None
|
|
|
|
|
|
class CalendarEventCreate(BaseModel):
    """Request body for ``POST /calendar``: a new calendar event."""

    lead_id: str | None = None
    source_event_id: str | None = None
    title: str
    description: str | None = None
    start_at: str  # ISO8601
    end_at: str  # ISO8601
    all_day: bool = False
    # A fresh ``[15]`` list is built for every instance.
    reminder_minutes: list[int] = Field(default_factory=lambda: [15])
    location: str | None = None
    metadata: dict = Field(default_factory=dict)
|
|
|
|
|
|
class CalendarEventUpdate(BaseModel):
    """Request body for ``PATCH /calendar/{calendar_event_id}``.

    Every field is optional; the update handler skips fields left as ``None``.
    """

    title: str | None = None
    description: str | None = None
    start_at: str | None = None
    end_at: str | None = None
    status: str | None = None
    reminder_minutes: list[int] | None = None
    location: str | None = None
|
|
|
|
|
|
class InsightActionRequest(BaseModel):
    """Request body for ``POST /insights/{recommendation_id}/act``."""

    # Required; must be exactly one of: accepted, dismissed, acted_upon.
    action: str = Field(..., pattern="^(accepted|dismissed|acted_upon)$")
|
|
|
|
|
|
class SessionHeartbeat(BaseModel):
    """Request body for ``POST /session``: a surface session heartbeat."""

    # Checked against the allowed surface names inside the route handler.
    surface_type: str
    app_version: str
    # When present, the handler appends it to the session's screen_sequence.
    screen: str | None = None
    metadata: dict = Field(default_factory=dict)
|
|
|
|
|
|
# ── Communication Events ───────────────────────────────────────────────────────
|
|
|
|
@router.get("/events", summary="List communication events for a lead")
async def list_events(
    request: Request,
    lead_id: str = Query(..., description="Lead ID to fetch events for"),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    user=Depends(get_current_user),
):
    """Return one page of a lead's communication events, newest first.

    The response carries the page (``events``), the echoed ``limit`` and
    ``offset``, and ``total``, the count of all matching events.
    """
    # tenant_id derived from role scope; production uses dedicated tenant field
    tenant_id = user.role

    page_sql = """
        SELECT event_id, lead_id, channel, direction, provider, capture_mode,
               consent_state, timestamp, duration_seconds, summary, raw_reference,
               recording_ref, provider_metadata, created_at
        FROM edge_communication_events
        WHERE tenant_id = $1 AND lead_id = $2
        ORDER BY timestamp DESC
        LIMIT $3 OFFSET $4
    """
    count_sql = (
        "SELECT COUNT(*) FROM edge_communication_events WHERE tenant_id = $1 AND lead_id = $2"
    )

    db = _pool(request)
    async with db.acquire() as conn:
        records = await conn.fetch(page_sql, tenant_id, lead_id, limit, offset)
        total = await conn.fetchval(count_sql, tenant_id, lead_id)

    events = [dict(record) for record in records]
    return {
        "total": total,
        "limit": limit,
        "offset": offset,
        "events": events,
    }
|
|
|
|
|
|
@router.post("/events", status_code=status.HTTP_201_CREATED, summary="Log a communication event")
async def create_event(
    request: Request,
    body: CommunicationEventCreate,
    user=Depends(get_current_user),
):
    """
    Create a new communication event record.

    ``channel``, ``capture_mode``, ``direction`` and ``consent_state`` must
    each belong to the matching ``VALID_*`` set; the first one that does not
    produces a 400 response. Returns the new ``event_id`` and ``created_at``.
    """
    import json

    # Checked in this order; the first failing field decides the error detail.
    checks = (
        (body.channel, VALID_CHANNELS, f"Invalid channel. Valid: {sorted(VALID_CHANNELS)}"),
        (
            body.capture_mode,
            VALID_CAPTURE_MODES,
            f"Invalid capture_mode. Valid: {sorted(VALID_CAPTURE_MODES)}",
        ),
        (body.direction, VALID_DIRECTIONS, "direction must be 'inbound' or 'outbound'"),
        (body.consent_state, VALID_CONSENT, f"Invalid consent_state. Valid: {sorted(VALID_CONSENT)}"),
    )
    for value, allowed, detail in checks:
        if value not in allowed:
            raise HTTPException(400, detail)

    insert_sql = """
        INSERT INTO edge_communication_events (
            tenant_id, lead_id, channel, direction, provider, capture_mode,
            consent_state, duration_seconds, summary, raw_reference,
            recording_ref, provider_metadata
        ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12::jsonb)
        RETURNING event_id, created_at
    """
    params = (
        user.role,
        body.lead_id,
        body.channel,
        body.direction,
        body.provider,
        body.capture_mode,
        body.consent_state,
        body.duration_seconds,
        body.summary,
        body.raw_reference,
        body.recording_ref,
        # The jsonb parameter is sent as serialized JSON text.
        json.dumps(body.provider_metadata),
    )

    db = _pool(request)
    async with db.acquire() as conn:
        created = await conn.fetchrow(insert_sql, *params)

    logger.info("Created communication event %s for lead %s", created["event_id"], body.lead_id)
    return {"event_id": str(created["event_id"]), "created_at": str(created["created_at"])}
|
|
|
|
|
|
# ── Communication Memory Facts ────────────────────────────────────────────────
|
|
|
|
@router.get("/memory", summary="List memory facts for a lead")
async def list_memory_facts(
    request: Request,
    lead_id: str = Query(...),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    user=Depends(get_current_user),
):
    """Return one page of a lead's memory facts, most recently created first.

    The response carries ``facts`` plus ``total`` (count of all matching
    facts) and the echoed ``limit`` and ``offset``.
    """
    tenant_id = user.role  # the role value is what gets bound to tenant_id

    page_sql = """
        SELECT fact_id, lead_id, event_id, fact_type, fact_text,
               effective_date, confidence, extracted_from, is_confirmed,
               confirmed_by, confirmed_at, created_at
        FROM edge_communication_memory_facts
        WHERE tenant_id = $1 AND lead_id = $2
        ORDER BY created_at DESC
        LIMIT $3 OFFSET $4
    """
    count_sql = (
        "SELECT COUNT(*) FROM edge_communication_memory_facts WHERE tenant_id=$1 AND lead_id=$2"
    )

    db = _pool(request)
    async with db.acquire() as conn:
        records = await conn.fetch(page_sql, tenant_id, lead_id, limit, offset)
        total = await conn.fetchval(count_sql, tenant_id, lead_id)

    facts = [dict(record) for record in records]
    return {"total": total, "limit": limit, "offset": offset, "facts": facts}
|
|
|
|
|
|
# ── Operator-Assisted Import ──────────────────────────────────────────────────
|
|
|
|
@router.post("/imports", status_code=status.HTTP_201_CREATED, summary="Operator-assisted import")
async def create_import(
    request: Request,
    body: ImportCreate,
    user=Depends(get_current_user),
):
    """
    Mode C import: user uploads recording ref or confirms a note manually.

    Inserts an inbound communication event (``capture_mode`` defaults to
    'operator_import') and, when ``recording_ref`` is supplied, an audio
    transcription job for it. Both inserts run in one transaction.

    Responds 400 when ``channel``, ``capture_mode`` or ``consent_state`` is
    not in the corresponding ``VALID_*`` set. Returns ``event_id``,
    ``transcription_job_id`` (``None`` when no job was created) and
    ``created_at``.
    """
    if body.channel not in VALID_CHANNELS:
        raise HTTPException(400, f"Invalid channel. Valid: {sorted(VALID_CHANNELS)}")
    # capture_mode and consent_state are client-overridable, so they get the
    # same checks that POST /events applies before anything is stored.
    if body.capture_mode not in VALID_CAPTURE_MODES:
        raise HTTPException(400, f"Invalid capture_mode. Valid: {sorted(VALID_CAPTURE_MODES)}")
    if body.consent_state not in VALID_CONSENT:
        raise HTTPException(400, f"Invalid consent_state. Valid: {sorted(VALID_CONSENT)}")

    pool = _pool(request)
    async with pool.acquire() as conn:
        # One transaction: an event is never left without its transcription job.
        async with conn.transaction():
            event_row = await conn.fetchrow(
                """
                INSERT INTO edge_communication_events (
                    tenant_id, lead_id, channel, direction, capture_mode,
                    consent_state, recording_ref, summary
                ) VALUES ($1,$2,$3,'inbound',$4,$5,$6,$7)
                RETURNING event_id, created_at
                """,
                user.role, body.lead_id, body.channel, body.capture_mode,
                body.consent_state, body.recording_ref, body.summary,
            )
            event_id = event_row["event_id"]

            job_id = None
            if body.recording_ref:
                job_row = await conn.fetchrow(
                    """
                    INSERT INTO edge_transcription_jobs (
                        tenant_id, event_id, media_type, consent_state
                    ) VALUES ($1,$2,'audio',$3)
                    RETURNING transcription_job_id
                    """,
                    user.role, event_id, body.consent_state,
                )
                job_id = str(job_row["transcription_job_id"])

    return {
        "event_id": str(event_id),
        "transcription_job_id": job_id,
        "created_at": str(event_row["created_at"]),
    }
|
|
|
|
|
|
# ── Quick Notes ───────────────────────────────────────────────────────────────
|
|
|
|
@router.post("/notes", status_code=status.HTTP_201_CREATED, summary="Create a quick note for a lead")
async def create_note(
    request: Request,
    body: NoteCreate,
    user=Depends(get_current_user),
):
    """
    Create a manual memory fact from an operator note.

    No event is created — this is a direct fact insertion. The fact is stored
    with ``extracted_from='operator_note'``, confidence 1.0 and
    ``is_confirmed=TRUE``.

    Responds 400 when ``effective_date`` is supplied but is not an ISO 8601
    date. Returns the new ``fact_id`` and ``created_at``.
    """
    from datetime import date

    # NOTE(review): assumes effective_date is a DATE column, for which asyncpg
    # expects a datetime.date rather than a string — confirm against the schema.
    effective_date = None
    if body.effective_date is not None:
        try:
            effective_date = date.fromisoformat(body.effective_date)
        except ValueError as exc:
            raise HTTPException(
                400, "effective_date must be an ISO 8601 date (YYYY-MM-DD)"
            ) from exc

    pool = _pool(request)
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            INSERT INTO edge_communication_memory_facts (
                tenant_id, lead_id, fact_type, fact_text, effective_date,
                extracted_from, confidence, is_confirmed
            ) VALUES ($1,$2,$3,$4,$5,'operator_note',1.0, TRUE)
            RETURNING fact_id, created_at
            """,
            user.role, body.lead_id, body.fact_type, body.note_text,
            effective_date,
        )
    return {"fact_id": str(row["fact_id"]), "created_at": str(row["created_at"])}
|
|
|
|
|
|
# ── Calendar ──────────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/calendar", summary="Get calendar events for the authed user")
async def list_calendar_events(
    request: Request,
    from_date: Optional[str] = Query(None),
    to_date: Optional[str] = Query(None),
    limit: int = Query(50, ge=1, le=200),
    user=Depends(get_current_user),
):
    """
    List the authenticated user's calendar events, earliest start first.

    ``from_date`` keeps events with ``start_at >= from_date`` and ``to_date``
    keeps events with ``end_at <= to_date``. Each bound is optional and is
    applied independently of the other. Both are ISO 8601 timestamps; a value
    that cannot be parsed produces a 400 response.
    """

    def _as_timestamp(field: str, raw: str) -> datetime:
        # The parameter is cast to timestamptz in SQL, so a datetime object is
        # bound rather than the raw string.
        try:
            parsed = datetime.fromisoformat(raw)
        except ValueError as exc:
            raise HTTPException(400, f"{field} must be an ISO 8601 timestamp") from exc
        # Naive values are taken as UTC so the result does not depend on the
        # host's local timezone.
        return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=UTC)

    pool = _pool(request)

    conditions = ["tenant_id=$1", "owner_user_id=$2"]
    params: list[Any] = [user.role, user.user_id]
    # Placeholder numbers follow each value's 1-based position in `params`.
    if from_date:
        params.append(_as_timestamp("from_date", from_date))
        conditions.append(f"start_at >= ${len(params)}::timestamptz")
    if to_date:
        params.append(_as_timestamp("to_date", to_date))
        conditions.append(f"end_at <= ${len(params)}::timestamptz")
    params.append(limit)

    # Only fixed SQL fragments and placeholder numbers are interpolated; all
    # caller-supplied values travel as bound parameters.
    query = f"""
        SELECT calendar_event_id, lead_id, title, description, start_at, end_at,
               all_day, status, reminder_minutes, created_by, location, metadata, created_at
        FROM user_calendar_events
        WHERE {' AND '.join(conditions)}
        ORDER BY start_at ASC LIMIT ${len(params)}
    """

    async with pool.acquire() as conn:
        rows = await conn.fetch(query, *params)
    return {"events": [dict(r) for r in rows]}
|
|
|
|
|
|
@router.post("/calendar", status_code=status.HTTP_201_CREATED, summary="Create a calendar event")
async def create_calendar_event(
    request: Request,
    body: CalendarEventCreate,
    user=Depends(get_current_user),
):
    """
    Create a calendar event owned by the authenticated user.

    ``start_at`` and ``end_at`` must be ISO 8601 timestamps; a value that
    cannot be parsed produces a 400 response. The row is stored with
    ``created_by='user'``. Returns the new ``calendar_event_id`` and
    ``created_at``.
    """
    import json

    def _as_timestamp(field: str, raw: str) -> datetime:
        # The parameter is cast to timestamptz in SQL, so a datetime object is
        # bound rather than the raw string.
        try:
            parsed = datetime.fromisoformat(raw)
        except ValueError as exc:
            raise HTTPException(400, f"{field} must be an ISO 8601 timestamp") from exc
        # Naive values are taken as UTC so the result does not depend on the
        # host's local timezone.
        return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=UTC)

    start_at = _as_timestamp("start_at", body.start_at)
    end_at = _as_timestamp("end_at", body.end_at)

    pool = _pool(request)
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            INSERT INTO user_calendar_events (
                tenant_id, owner_user_id, lead_id, source_event_id, title, description,
                start_at, end_at, all_day, reminder_minutes, created_by, location, metadata
            ) VALUES ($1,$2,$3,$4,$5,$6,$7::timestamptz,$8::timestamptz,$9,$10,$11,$12,$13::jsonb)
            RETURNING calendar_event_id, created_at
            """,
            user.role, user.user_id, body.lead_id, body.source_event_id,
            body.title, body.description, start_at, end_at,
            body.all_day, body.reminder_minutes, "user",
            body.location, json.dumps(body.metadata),
        )
    return {"calendar_event_id": str(row["calendar_event_id"]), "created_at": str(row["created_at"])}
|
|
|
|
|
|
@router.patch("/calendar/{calendar_event_id}", summary="Update a calendar event")
async def update_calendar_event(
    calendar_event_id: str,
    request: Request,
    body: CalendarEventUpdate,
    user=Depends(get_current_user),
):
    """
    Partially update a calendar event owned by the authenticated user.

    Only body fields that are not ``None`` are written; ``updated_at`` is
    always refreshed. Responds 400 when the body carries no updatable field
    or a timestamp is not valid ISO 8601, and 404 when no row matches the
    tenant, owner and event id.
    """
    pool = _pool(request)

    def _as_timestamp(field: str, raw: str) -> datetime:
        # A datetime object is bound for the timestamp columns rather than
        # the raw string.
        try:
            parsed = datetime.fromisoformat(raw)
        except ValueError as exc:
            raise HTTPException(400, f"{field} must be an ISO 8601 timestamp") from exc
        # Naive values are taken as UTC so the result does not depend on the
        # host's local timezone.
        return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=UTC)

    # Build partial update
    assignments: list[str] = []
    params: list[Any] = []

    def _set(column: str, value: Any) -> None:
        # The placeholder number is the value's 1-based position in `params`.
        params.append(value)
        assignments.append(f"{column} = ${len(params)}")

    if body.title is not None:
        _set("title", body.title)
    if body.description is not None:
        _set("description", body.description)
    if body.start_at is not None:
        _set("start_at", _as_timestamp("start_at", body.start_at))
    if body.end_at is not None:
        _set("end_at", _as_timestamp("end_at", body.end_at))
    if body.status is not None:
        _set("status", body.status)
    if body.reminder_minutes is not None:
        _set("reminder_minutes", body.reminder_minutes)
    if body.location is not None:
        _set("location", body.location)

    if not assignments:
        raise HTTPException(400, "No fields to update")

    _set("updated_at", datetime.now(UTC))

    # Row-selection values are bound as parameters only. Sending them through
    # _set() would also write them into the SET clause and push the WHERE
    # placeholders past the end of the argument list.
    params.extend((user.role, user.user_id, calendar_event_id))
    event_pos = len(params)
    owner_pos = event_pos - 1
    tenant_pos = event_pos - 2

    async with pool.acquire() as conn:
        # Column names above are fixed literals; only placeholder numbers are
        # interpolated into the statement.
        result = await conn.execute(
            f"""
            UPDATE user_calendar_events
            SET {', '.join(assignments)}
            WHERE tenant_id=${tenant_pos} AND owner_user_id=${owner_pos} AND calendar_event_id=${event_pos}
            """,
            *params,
        )
    if result == "UPDATE 0":
        raise HTTPException(404, "Calendar event not found or not owned by you")
    return {"status": "updated"}
|
|
|
|
|
|
@router.delete("/calendar/{calendar_event_id}", summary="Cancel a calendar event")
async def delete_calendar_event(
    calendar_event_id: str,
    request: Request,
    user=Depends(get_current_user),
):
    """Cancel a calendar event owned by the authenticated user.

    The row is kept and its status is set to 'cancelled'. Responds 404 when
    no row matches the tenant, owner and event id.
    """
    cancel_sql = """
        UPDATE user_calendar_events
        SET status='cancelled', updated_at=NOW()
        WHERE tenant_id=$1 AND owner_user_id=$2 AND calendar_event_id=$3
    """
    db = _pool(request)
    async with db.acquire() as conn:
        outcome = await conn.execute(cancel_sql, user.role, user.user_id, calendar_event_id)

    # execute() returns the command status; "UPDATE 0" means nothing matched.
    if outcome != "UPDATE 0":
        return {"status": "cancelled"}
    raise HTTPException(404, "Calendar event not found or not owned by you")
|
|
|
|
|
|
# ── Transcripts ───────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/transcripts/{event_id}", summary="Get transcript segments for an event")
async def get_transcript(
    event_id: str,
    request: Request,
    user=Depends(get_current_user),
):
    """Return the newest transcription job for an event plus its segments.

    The job lookup joins the owning event so the tenant can be checked.
    Responds 404 when the event has no transcription job in this tenant.
    Segments come back ordered by ``start_ms``.
    """
    latest_job_sql = """
        SELECT j.transcription_job_id, j.status, j.provider, j.speaker_count,
               j.word_count, j.language, j.completed_at
        FROM edge_transcription_jobs j
        JOIN edge_communication_events e ON e.event_id = j.event_id
        WHERE j.event_id = $1 AND e.tenant_id = $2
        ORDER BY j.created_at DESC LIMIT 1
    """
    segments_sql = """
        SELECT segment_id, speaker_label, start_ms, end_ms, text, confidence, is_agent_turn
        FROM edge_transcript_segments
        WHERE transcription_job_id = $1
        ORDER BY start_ms ASC
    """

    db = _pool(request)
    async with db.acquire() as conn:
        job_record = await conn.fetchrow(latest_job_sql, event_id, user.role)
        if not job_record:
            raise HTTPException(404, "No transcription job found for this event")
        segment_records = await conn.fetch(segments_sql, job_record["transcription_job_id"])

    return {
        "job": dict(job_record),
        "segments": [dict(segment) for segment in segment_records],
    }
|
|
|
|
|
|
# ── Insights ──────────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/insights/{lead_id}", summary="Get insight recommendations for a lead")
async def get_insights(
    lead_id: str,
    request: Request,
    status_filter: Optional[str] = Query(None, alias="status"),
    limit: int = Query(20, ge=1, le=100),
    user=Depends(get_current_user),
):
    """Return a lead's insight recommendations, most recently created first.

    The optional ``status`` query parameter narrows the result to one status;
    an absent or empty value returns recommendations in every status.
    """
    db = _pool(request)

    # Choose the statement and its arguments up front; a single fetch follows.
    if status_filter:
        sql = """
            SELECT recommendation_id, lead_id, source_event_id, recommendation_type,
                   summary, suggested_action, target_system, status, confidence, created_at
            FROM insight_recommendations
            WHERE tenant_id=$1 AND lead_id=$2 AND status=$3
            ORDER BY created_at DESC LIMIT $4
        """
        args = (user.role, lead_id, status_filter, limit)
    else:
        sql = """
            SELECT recommendation_id, lead_id, source_event_id, recommendation_type,
                   summary, suggested_action, target_system, status, confidence, created_at
            FROM insight_recommendations
            WHERE tenant_id=$1 AND lead_id=$2
            ORDER BY created_at DESC LIMIT $3
        """
        args = (user.role, lead_id, limit)

    async with db.acquire() as conn:
        records = await conn.fetch(sql, *args)
    return {"insights": [dict(record) for record in records]}
|
|
|
|
|
|
@router.post("/insights/{recommendation_id}/act", summary="Act on or dismiss an insight")
async def act_on_insight(
    recommendation_id: str,
    request: Request,
    body: InsightActionRequest,
    user=Depends(get_current_user),
):
    """Record the user's action on an insight recommendation.

    The recommendation's status becomes ``body.action`` and the acting user
    and time are stamped on the row. Responds 404 when no recommendation with
    this id exists in the tenant.
    """
    update_sql = """
        UPDATE insight_recommendations
        SET status=$1, acted_by=$2, acted_at=NOW(), updated_at=NOW()
        WHERE recommendation_id=$3 AND tenant_id=$4
    """
    db = _pool(request)
    async with db.acquire() as conn:
        outcome = await conn.execute(
            update_sql, body.action, user.user_id, recommendation_id, user.role
        )

    # "UPDATE 0" is the command status when no row matched.
    if outcome != "UPDATE 0":
        return {"status": body.action}
    raise HTTPException(404, "Insight not found")
|
|
|
|
|
|
# ── Alerts ────────────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/alerts", summary="Get active alerts for the authed user")
async def get_alerts(
    request: Request,
    user=Depends(get_current_user),
):
    """
    Return alert counters for the authenticated user:
    - insight recommendations in the tenant with status 'pending'
    - the user's confirmed calendar events starting within the next 24 hours
    - transcription jobs in the tenant with status 'pending'

    The response also carries ``generated_at``, the UTC time it was built.
    """
    tenant_id = user.role

    insights_sql = (
        "SELECT COUNT(*) FROM insight_recommendations WHERE tenant_id=$1 AND status='pending'"
    )
    calendar_sql = """
        SELECT COUNT(*) FROM user_calendar_events
        WHERE tenant_id=$1 AND owner_user_id=$2
          AND status='confirmed'
          AND start_at BETWEEN NOW() AND NOW() + INTERVAL '24 hours'
    """
    transcriptions_sql = (
        "SELECT COUNT(*) FROM edge_transcription_jobs WHERE tenant_id=$1 AND status='pending'"
    )

    db = _pool(request)
    async with db.acquire() as conn:
        insight_count = await conn.fetchval(insights_sql, tenant_id)
        event_count = await conn.fetchval(calendar_sql, tenant_id, user.user_id)
        transcription_count = await conn.fetchval(transcriptions_sql, tenant_id)

    return {
        "pending_insights": insight_count,
        "upcoming_calendar_events_24h": event_count,
        "pending_transcriptions": transcription_count,
        "generated_at": _now(),
    }
|
|
|
|
|
|
# ── Session Heartbeat ─────────────────────────────────────────────────────────
|
|
|
|
@router.post("/session", status_code=status.HTTP_200_OK, summary="Register surface session heartbeat")
async def session_heartbeat(
    request: Request,
    body: SessionHeartbeat,
    user=Depends(get_current_user),
):
    """Record a heartbeat for the user's session on one surface.

    An open session for the same tenant, user and surface that was active in
    the last 30 minutes is refreshed; otherwise a new session row is
    inserted. Responds 400 for an unknown ``surface_type``.
    """
    import json

    valid_surfaces = {
        "webos",
        "ipad",
        "android_tablet",
        "iphone_edge",
        "android_phone_edge",
    }
    if body.surface_type not in valid_surfaces:
        raise HTTPException(400, f"Invalid surface_type. Valid: {sorted(valid_surfaces)}")

    find_sql = """
        SELECT session_id
        FROM surface_sessions
        WHERE tenant_id=$1 AND user_id=$2 AND surface_type=$3
          AND ended_at IS NULL
          AND last_active_at > NOW() - INTERVAL '30 minutes'
        ORDER BY last_active_at DESC
        LIMIT 1
    """
    # A NULL screen leaves screen_sequence untouched; otherwise it is appended.
    refresh_sql = """
        UPDATE surface_sessions
        SET last_active_at=NOW(),
            app_version=$1,
            metadata=$2::jsonb,
            screen_sequence = CASE
                WHEN $3::text IS NULL THEN screen_sequence
                ELSE array_append(screen_sequence, $3::text)
            END
        WHERE session_id=$4
    """
    # A new session starts with an empty sequence or the single current screen.
    insert_sql = """
        INSERT INTO surface_sessions (
            tenant_id, user_id, surface_type, app_version, metadata, screen_sequence
        )
        VALUES (
            $1, $2, $3, $4, $5::jsonb,
            CASE
                WHEN $6::text IS NULL THEN '{}'::text[]
                ELSE ARRAY[$6::text]
            END
        )
    """

    db = _pool(request)
    metadata_json = json.dumps(body.metadata)
    async with db.acquire() as conn:
        open_session_id = await conn.fetchval(
            find_sql, user.role, user.user_id, body.surface_type
        )
        if not open_session_id:
            await conn.execute(
                insert_sql,
                user.role,
                user.user_id,
                body.surface_type,
                body.app_version,
                metadata_json,
                body.screen,
            )
        else:
            await conn.execute(
                refresh_sql,
                body.app_version,
                metadata_json,
                body.screen,
                open_session_id,
            )

    return {"status": "ok", "timestamp": _now()}
|