feat/#24 WebOS Completion (#25)

#24 WebOS Completion

Co-authored-by: Sayan Datta <sayan@Sayans-MacBook-Air.local>
Reviewed-on: #25
This commit was merged in pull request #25.
This commit is contained in:
2026-04-18 18:59:04 +05:30
parent 857e0b88e6
commit 84e439712c
459 changed files with 11713 additions and 3853 deletions

View File

@@ -0,0 +1,529 @@
"""
routes_admin_surface.py
───────────────────────
Admin Control Plane API
Roles: Only 'admin' or 'superadmin' may access these endpoints.
Endpoints:
GET /admin-surface/health — system health overview
GET /admin-surface/queues — queue depth snapshot
GET /admin-surface/installs — surface session / install overview
POST /admin-surface/actions — submit an admin action
GET /admin-surface/actions — list admin action history
GET /admin-surface/actions/{id} — get a specific action
GET /admin-surface/logs — recent audit event log
GET /admin-surface/templates — template catalog summary (admin view)
POST /admin-surface/templates/{id}/publish — publish a template
POST /admin-surface/templates/{id}/archive — archive a template
GET /admin-surface/synthetic-jobs — list synthetic generation jobs
POST /admin-surface/synthetic-jobs/{id}/cancel — cancel a synthetic job
"""
from __future__ import annotations
import json
import logging
import uuid
from datetime import UTC, datetime
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
from backend.auth.dependencies import get_current_user
logger = logging.getLogger("velocity.admin_surface")
router = APIRouter()
# ── RBAC guard ────────────────────────────────────────────────────────────────
ADMIN_ROLES = {"admin", "superadmin", "ADMIN", "SUPERADMIN"}
def require_admin(user=Depends(get_current_user)):
normalized_role = user.role.upper()
if normalized_role not in {"ADMIN", "SUPERADMIN"}:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required.",
)
return user
# ── Helpers ───────────────────────────────────────────────────────────────────
def _pool(request: Request):
pool = request.app.state.db_pool
if pool is None:
raise HTTPException(status_code=503, detail="Database unavailable.")
return pool
# ── Pydantic Models ───────────────────────────────────────────────────────────
VALID_ACTION_TYPES = {
"user_create", "user_deactivate", "user_role_change",
"tenant_config_update", "inventory_batch_approve", "inventory_batch_reject",
"template_publish", "template_archive",
"synthetic_job_trigger", "synthetic_job_cancel",
"system_health_check", "queue_drain", "debug_event_export",
"install_register", "install_deregister",
}
class AdminActionRequest(BaseModel):
action_type: str
target_type: str
target_id: str
payload: dict = Field(default_factory=dict)
idempotency_key: Optional[str] = None
# ── System Health ─────────────────────────────────────────────────────────────
@router.get("/health", summary="System health overview")
async def get_health(
request: Request,
admin=Depends(require_admin),
):
"""
Returns an aggregated health snapshot covering DB pool, queue depths,
and basic surface session counts.
"""
pool = _pool(request)
async with pool.acquire() as conn:
# DB round-trip latency
import time
t0 = time.monotonic()
await conn.fetchval("SELECT 1")
db_latency_ms = round((time.monotonic() - t0) * 1000, 2)
# Pending jobs
pending_transcriptions = await conn.fetchval(
"SELECT COUNT(*) FROM edge_transcription_jobs WHERE status='pending'"
)
pending_synthetic_jobs = await conn.fetchval(
"SELECT COUNT(*) FROM oracle_synthetic_generation_jobs WHERE status IN ('pending','running')"
)
pending_admin_actions = await conn.fetchval(
"SELECT COUNT(*) FROM admin_action_events WHERE status='pending'"
)
pending_inventory_batches = await conn.fetchval(
"SELECT COUNT(*) FROM inventory_import_batches WHERE status IN ('pending','validating','processing')"
)
# Active surface sessions (last 30 min)
active_sessions = await conn.fetchval(
"SELECT COUNT(*) FROM surface_sessions WHERE last_active_at > NOW() - INTERVAL '30 minutes'"
)
# Surface breakdown
surface_breakdown = await conn.fetch(
"""
SELECT surface_type, COUNT(*) as count
FROM surface_sessions
WHERE last_active_at > NOW() - INTERVAL '30 minutes'
GROUP BY surface_type
"""
)
return {
"status": "ok",
"timestamp": datetime.now(UTC).isoformat(),
"database": {
"connected": True,
"latency_ms": db_latency_ms,
},
"queues": {
"pending_transcriptions": pending_transcriptions,
"pending_synthetic_jobs": pending_synthetic_jobs,
"pending_admin_actions": pending_admin_actions,
"pending_inventory_batches": pending_inventory_batches,
},
"active_sessions": {
"total": active_sessions,
"by_surface": {r["surface_type"]: r["count"] for r in surface_breakdown},
},
}
# ── Queue Visibility ──────────────────────────────────────────────────────────
@router.get("/queues", summary="Queue depth snapshot")
async def get_queues(
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
transcription_queue = await conn.fetch(
"""
SELECT status, COUNT(*) as count
FROM edge_transcription_jobs
GROUP BY status ORDER BY status
"""
)
synthetic_queue = await conn.fetch(
"""
SELECT status, COUNT(*) as count
FROM oracle_synthetic_generation_jobs
GROUP BY status ORDER BY status
"""
)
inventory_queue = await conn.fetch(
"""
SELECT status, COUNT(*) as count
FROM inventory_import_batches
GROUP BY status ORDER BY status
"""
)
admin_queue = await conn.fetch(
"""
SELECT status, COUNT(*) as count
FROM admin_action_events
GROUP BY status ORDER BY status
"""
)
return {
"transcription_jobs": {r["status"]: r["count"] for r in transcription_queue},
"synthetic_jobs": {r["status"]: r["count"] for r in synthetic_queue},
"inventory_batches": {r["status"]: r["count"] for r in inventory_queue},
"admin_actions": {r["status"]: r["count"] for r in admin_queue},
"timestamp": datetime.now(UTC).isoformat(),
}
# ── Install / Surface Overview ────────────────────────────────────────────────
@router.get("/installs", summary="Surface session and install overview")
async def get_installs(
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT surface_type, app_version, COUNT(*) as session_count,
MAX(last_active_at) as last_seen
FROM surface_sessions
GROUP BY surface_type, app_version
ORDER BY surface_type, app_version
"""
)
return {
"installs": [dict(r) for r in rows],
"timestamp": datetime.now(UTC).isoformat(),
}
# ── Admin Actions ─────────────────────────────────────────────────────────────
@router.post("/actions", status_code=status.HTTP_201_CREATED, summary="Submit an admin action")
async def submit_action(
request: Request,
body: AdminActionRequest,
admin=Depends(require_admin),
):
"""
Submit a bounded admin action. All actions are persisted with full audit trail.
Supported action_types are enumerated in VALID_ACTION_TYPES.
Actions are not auto-executed — they transition to 'pending' and must be
processed by the appropriate backend job or confirmed by a second admin.
(This prevents destructive mass-actions from running unreviewed.)
"""
if body.action_type not in VALID_ACTION_TYPES:
raise HTTPException(400, f"Invalid action_type. Valid: {sorted(VALID_ACTION_TYPES)}")
action_id = body.idempotency_key or str(uuid.uuid4())
pool = _pool(request)
async with pool.acquire() as conn:
try:
row = await conn.fetchrow(
"""
INSERT INTO admin_action_events (
tenant_id, action_id, action_type, target_type, target_id,
requested_by, payload
) VALUES ($1,$2,$3,$4,$5,$6,$7::jsonb)
RETURNING action_event_id, status, created_at
""",
admin.role, action_id, body.action_type, body.target_type,
body.target_id, admin.user_id, json.dumps(body.payload),
)
except Exception as exc:
if "unique" in str(exc).lower():
raise HTTPException(409, "Action with this idempotency key already exists")
raise
logger.info(
"Admin action submitted: %s by %s%s/%s",
body.action_type, admin.user_id, body.target_type, body.target_id,
)
return {
"action_event_id": str(row["action_event_id"]),
"action_id": action_id,
"status": row["status"],
"created_at": str(row["created_at"]),
}
@router.get("/actions", summary="List admin action history")
async def list_actions(
request: Request,
action_type: Optional[str] = Query(None),
status_filter: Optional[str] = Query(None, alias="status"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
admin=Depends(require_admin),
):
pool = _pool(request)
where = "WHERE tenant_id = $1"
params: list[Any] = [admin.role]
idx = 2
if action_type:
where += f" AND action_type = ${idx}"; params.append(action_type); idx += 1
if status_filter:
where += f" AND status = ${idx}"; params.append(status_filter); idx += 1
async with pool.acquire() as conn:
rows = await conn.fetch(
f"""
SELECT action_event_id, action_id, action_type, target_type, target_id,
requested_by, status, result_message, executed_at, created_at
FROM admin_action_events
{where}
ORDER BY created_at DESC
LIMIT ${idx} OFFSET ${idx+1}
""",
*params, limit, offset,
)
total = await conn.fetchval(
f"SELECT COUNT(*) FROM admin_action_events {where}", *params,
)
return {"total": total, "limit": limit, "offset": offset, "actions": [dict(r) for r in rows]}
@router.get("/actions/{action_event_id}", summary="Get a specific admin action")
async def get_action(
action_event_id: str,
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM admin_action_events WHERE action_event_id=$1 AND tenant_id=$2",
action_event_id, admin.role,
)
if not row:
raise HTTPException(404, "Admin action not found")
return dict(row)
# ── Audit Log ─────────────────────────────────────────────────────────────────
@router.get("/logs", summary="Recent Oracle audit events")
async def get_audit_logs(
request: Request,
entity_type: Optional[str] = Query(None),
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
if entity_type:
rows = await conn.fetch(
"""
SELECT audit_event_id, entity_type, entity_id, action, actor_id,
actor_type, correlation_id, details, created_at
FROM oracle_audit_events
WHERE tenant_id=$1 AND entity_type=$2
ORDER BY created_at DESC
LIMIT $3 OFFSET $4
""",
admin.role, entity_type, limit, offset,
)
else:
rows = await conn.fetch(
"""
SELECT audit_event_id, entity_type, entity_id, action, actor_id,
actor_type, correlation_id, details, created_at
FROM oracle_audit_events
WHERE tenant_id=$1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
""",
admin.role, limit, offset,
)
return {"logs": [dict(r) for r in rows]}
# ── Template Administration ───────────────────────────────────────────────────
@router.get("/templates", summary="Template catalog admin view")
async def get_templates_admin(
request: Request,
status_filter: Optional[str] = Query(None, alias="status"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
admin=Depends(require_admin),
):
pool = _pool(request)
where = "WHERE tenant_id = $1"
params: list[Any] = [admin.role]
idx = 2
if status_filter:
where += f" AND status = ${idx}"; params.append(status_filter); idx += 1
async with pool.acquire() as conn:
rows = await conn.fetch(
f"""
SELECT t.template_id, t.name, t.category, t.status, t.origin,
t.version, t.use_count, t.chapter_id, t.subchapter_id,
ch.name as chapter_name, sub.name as subchapter_name,
t.created_at, t.updated_at
FROM oracle_component_templates t
LEFT JOIN oracle_template_chapters ch ON ch.chapter_id = t.chapter_id
LEFT JOIN oracle_template_subchapters sub ON sub.subchapter_id = t.subchapter_id
{where}
ORDER BY t.updated_at DESC
LIMIT ${idx} OFFSET ${idx+1}
""",
*params, limit, offset,
)
total = await conn.fetchval(
f"SELECT COUNT(*) FROM oracle_component_templates {where}", *params,
)
return {"total": total, "limit": limit, "offset": offset, "templates": [dict(r) for r in rows]}
@router.post("/templates/{template_id}/publish", summary="Publish a template")
async def publish_template(
template_id: str,
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE oracle_component_templates
SET status='catalog_active', updated_at=NOW()
WHERE template_id=$1 AND tenant_id=$2
""",
template_id, admin.role,
)
if result == "UPDATE 0":
raise HTTPException(404, "Template not found")
logger.info("Template %s published by admin %s", template_id, admin.user_id)
return {"status": "published"}
@router.post("/templates/{template_id}/archive", summary="Archive a template")
async def archive_template(
template_id: str,
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE oracle_component_templates
SET status='archived', updated_at=NOW()
WHERE template_id=$1 AND tenant_id=$2
""",
template_id, admin.role,
)
if result == "UPDATE 0":
raise HTTPException(404, "Template not found")
logger.info("Template %s archived by admin %s", template_id, admin.user_id)
return {"status": "archived"}
# ── Template Chapter Admin ────────────────────────────────────────────────────
@router.get("/template-chapters", summary="List template chapters (admin view)")
async def list_chapters_admin(
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT ch.chapter_id, ch.name, ch.description, ch.sort_order, ch.is_active,
COUNT(sub.subchapter_id) as subchapter_count
FROM oracle_template_chapters ch
LEFT JOIN oracle_template_subchapters sub ON sub.chapter_id = ch.chapter_id
WHERE ch.tenant_id=$1
GROUP BY ch.chapter_id
ORDER BY ch.sort_order ASC
""",
admin.role,
)
return {"chapters": [dict(r) for r in rows]}
# ── Synthetic Jobs Admin ──────────────────────────────────────────────────────
@router.get("/synthetic-jobs", summary="List synthetic generation jobs")
async def list_synthetic_jobs(
request: Request,
status_filter: Optional[str] = Query(None, alias="status"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
if status_filter:
rows = await conn.fetch(
"""
SELECT job_id, template_id, model, status, requested_count,
accepted_count, created_by, started_at, completed_at, created_at
FROM oracle_synthetic_generation_jobs
WHERE tenant_id=$1 AND status=$2
ORDER BY created_at DESC LIMIT $3 OFFSET $4
""",
admin.role, status_filter, limit, offset,
)
else:
rows = await conn.fetch(
"""
SELECT job_id, template_id, model, status, requested_count,
accepted_count, created_by, started_at, completed_at, created_at
FROM oracle_synthetic_generation_jobs
WHERE tenant_id=$1
ORDER BY created_at DESC LIMIT $2 OFFSET $3
""",
admin.role, limit, offset,
)
return {"jobs": [dict(r) for r in rows]}
@router.post("/synthetic-jobs/{job_id}/cancel", summary="Cancel a synthetic generation job")
async def cancel_synthetic_job(
job_id: str,
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE oracle_synthetic_generation_jobs
SET status='cancelled', updated_at=NOW()
WHERE job_id=$1 AND tenant_id=$2 AND status IN ('pending','running')
""",
job_id, admin.role,
)
if result == "UPDATE 0":
raise HTTPException(404, "Job not found or already in terminal state")
return {"status": "cancelled"}

View File

@@ -0,0 +1,399 @@
"""
routes_inventory.py
───────────────────
Inventory Pipeline API
Endpoints:
POST /inventory/import-batches — create a new import batch
GET /inventory/import-batches — list import batches
GET /inventory/import-batches/{batch_id} — get batch status
POST /inventory/properties — create a single property
GET /inventory/properties — list properties
GET /inventory/properties/{property_id} — get a property
PATCH /inventory/properties/{property_id} — update a property
DELETE /inventory/properties/{property_id} — archive a property
POST /inventory/properties/{property_id}/media — attach media to a property
GET /inventory/properties/{property_id}/media — list media for a property
DELETE /inventory/media/{media_asset_id} — remove a media asset
"""
from __future__ import annotations
import json
import logging
from datetime import UTC, datetime
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
from backend.auth.dependencies import get_current_user
logger = logging.getLogger("velocity.inventory")
router = APIRouter()
# ── Helpers ───────────────────────────────────────────────────────────────────
def _pool(request: Request):
pool = request.app.state.db_pool
if pool is None:
raise HTTPException(status_code=503, detail="Database unavailable.")
return pool
# ── Pydantic Models ───────────────────────────────────────────────────────────
VALID_SOURCE_TYPES = {"csv", "json", "api_push", "manual"}
VALID_PROPERTY_STATUSES = {"active", "archived", "draft", "under_review"}
VALID_MEDIA_TYPES = {"image", "video", "floorplan", "brochure", "360", "vr"}
class ImportBatchCreate(BaseModel):
source_type: str
source_file_ref: Optional[str] = None
total_rows: int = 0
class PropertyCreate(BaseModel):
batch_id: Optional[str] = None
source_id: Optional[str] = None
project_name: str
developer_name: str
location: dict = Field(default_factory=dict) # {city, district, lat, lng}
property_type: str
price_bands: list[dict] = Field(default_factory=list)
unit_mix: list[dict] = Field(default_factory=list)
amenities: list[str] = Field(default_factory=list)
status: str = "draft"
validation_state: dict = Field(default_factory=dict)
class PropertyUpdate(BaseModel):
project_name: Optional[str] = None
developer_name: Optional[str] = None
location: Optional[dict] = None
property_type: Optional[str] = None
price_bands: Optional[list[dict]] = None
unit_mix: Optional[list[dict]] = None
amenities: Optional[list[str]] = None
status: Optional[str] = None
validation_state: Optional[dict] = None
class MediaAssetCreate(BaseModel):
media_type: str
url: str
thumbnail_url: Optional[str] = None
sort_order: int = 0
metadata: dict = Field(default_factory=dict)
# ── Import Batches ────────────────────────────────────────────────────────────
@router.post("/import-batches", status_code=status.HTTP_201_CREATED,
summary="Create an inventory import batch")
async def create_import_batch(
request: Request,
body: ImportBatchCreate,
user=Depends(get_current_user),
):
if body.source_type not in VALID_SOURCE_TYPES:
raise HTTPException(400, f"Invalid source_type. Valid: {sorted(VALID_SOURCE_TYPES)}")
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO inventory_import_batches
(tenant_id, source_type, submitted_by, total_rows, source_file_ref)
VALUES ($1, $2, $3, $4, $5)
RETURNING batch_id, status, created_at
""",
user.role, body.source_type, user.user_id, body.total_rows, body.source_file_ref,
)
return dict(row)
@router.get("/import-batches", summary="List import batches")
async def list_import_batches(
request: Request,
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT batch_id, source_type, submitted_by, status, total_rows,
accepted_rows, rejected_rows, created_at, completed_at
FROM inventory_import_batches
WHERE tenant_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
""",
user.role, limit, offset,
)
total = await conn.fetchval(
"SELECT COUNT(*) FROM inventory_import_batches WHERE tenant_id=$1", user.role,
)
return {"total": total, "limit": limit, "offset": offset, "batches": [dict(r) for r in rows]}
@router.get("/import-batches/{batch_id}", summary="Get import batch status")
async def get_import_batch(
batch_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT * FROM inventory_import_batches WHERE batch_id=$1 AND tenant_id=$2
""",
batch_id, user.role,
)
if not row:
raise HTTPException(404, "Batch not found")
return dict(row)
# ── Properties ────────────────────────────────────────────────────────────────
@router.post("/properties", status_code=status.HTTP_201_CREATED, summary="Create a property")
async def create_property(
request: Request,
body: PropertyCreate,
user=Depends(get_current_user),
):
if body.status not in VALID_PROPERTY_STATUSES:
raise HTTPException(400, f"Invalid status. Valid: {sorted(VALID_PROPERTY_STATUSES)}")
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO inventory_properties (
tenant_id, batch_id, source_id, project_name, developer_name,
location, property_type, price_bands, unit_mix, amenities,
status, validation_state
) VALUES (
$1, $2, $3, $4, $5,
$6::jsonb, $7, $8::jsonb, $9::jsonb, $10,
$11, $12::jsonb
)
RETURNING property_id, created_at
""",
user.role, body.batch_id, body.source_id, body.project_name, body.developer_name,
json.dumps(body.location), body.property_type, json.dumps(body.price_bands),
json.dumps(body.unit_mix), body.amenities,
body.status, json.dumps(body.validation_state),
)
return {"property_id": str(row["property_id"]), "created_at": str(row["created_at"])}
@router.get("/properties", summary="List inventory properties")
async def list_properties(
request: Request,
status_filter: Optional[str] = Query(None, alias="status"),
property_type: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
where_clause = "WHERE tenant_id = $1"
params: list[Any] = [user.role]
idx = 2
if status_filter:
where_clause += f" AND status = ${idx}"
params.append(status_filter)
idx += 1
if property_type:
where_clause += f" AND property_type = ${idx}"
params.append(property_type)
idx += 1
rows = await conn.fetch(
f"""
SELECT property_id, project_name, developer_name, property_type,
location, price_bands, unit_mix, status, ingested_at, created_at
FROM inventory_properties
{where_clause}
ORDER BY created_at DESC
LIMIT ${idx} OFFSET ${idx+1}
""",
*params, limit, offset,
)
total = await conn.fetchval(
f"SELECT COUNT(*) FROM inventory_properties {where_clause}", *params,
)
return {"total": total, "limit": limit, "offset": offset, "properties": [dict(r) for r in rows]}
@router.get("/properties/{property_id}", summary="Get a property")
async def get_property(
property_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM inventory_properties WHERE property_id=$1 AND tenant_id=$2",
property_id, user.role,
)
if not row:
raise HTTPException(404, "Property not found")
return dict(row)
@router.patch("/properties/{property_id}", summary="Update a property")
async def update_property(
property_id: str,
request: Request,
body: PropertyUpdate,
user=Depends(get_current_user),
):
updates: list[str] = []
values: list[Any] = []
idx = 1
def _add(col: str, val: Any, cast: str = ""):
nonlocal idx
updates.append(f"{col} = ${idx}{cast}")
values.append(val)
idx += 1
if body.project_name is not None: _add("project_name", body.project_name)
if body.developer_name is not None: _add("developer_name", body.developer_name)
if body.location is not None: _add("location", json.dumps(body.location), "::jsonb")
if body.property_type is not None: _add("property_type", body.property_type)
if body.price_bands is not None: _add("price_bands", json.dumps(body.price_bands), "::jsonb")
if body.unit_mix is not None: _add("unit_mix", json.dumps(body.unit_mix), "::jsonb")
if body.amenities is not None: _add("amenities", body.amenities)
if body.status is not None:
if body.status not in VALID_PROPERTY_STATUSES:
raise HTTPException(400, f"Invalid status. Valid: {sorted(VALID_PROPERTY_STATUSES)}")
_add("status", body.status)
if body.validation_state is not None:
_add("validation_state", json.dumps(body.validation_state), "::jsonb")
if not updates:
raise HTTPException(400, "No fields to update")
_add("updated_at", datetime.now(UTC))
values.extend([property_id, user.role])
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
f"""
UPDATE inventory_properties
SET {', '.join(updates)}
WHERE property_id=${idx} AND tenant_id=${idx+1}
""",
*values,
)
if result == "UPDATE 0":
raise HTTPException(404, "Property not found")
return {"status": "updated"}
@router.delete("/properties/{property_id}", summary="Archive a property")
async def archive_property(
property_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE inventory_properties
SET status='archived', updated_at=NOW()
WHERE property_id=$1 AND tenant_id=$2
""",
property_id, user.role,
)
if result == "UPDATE 0":
raise HTTPException(404, "Property not found")
return {"status": "archived"}
# ── Media Assets ──────────────────────────────────────────────────────────────
@router.post("/properties/{property_id}/media", status_code=status.HTTP_201_CREATED,
summary="Attach media to a property")
async def add_media(
property_id: str,
request: Request,
body: MediaAssetCreate,
user=Depends(get_current_user),
):
if body.media_type not in VALID_MEDIA_TYPES:
raise HTTPException(400, f"Invalid media_type. Valid: {sorted(VALID_MEDIA_TYPES)}")
pool = _pool(request)
async with pool.acquire() as conn:
# Verify property belongs to tenant
exists = await conn.fetchval(
"SELECT 1 FROM inventory_properties WHERE property_id=$1 AND tenant_id=$2",
property_id, user.role,
)
if not exists:
raise HTTPException(404, "Property not found")
row = await conn.fetchrow(
"""
INSERT INTO inventory_media_assets
(property_id, tenant_id, media_type, url, thumbnail_url, sort_order, metadata, uploaded_by)
VALUES ($1,$2,$3,$4,$5,$6,$7::jsonb,$8)
RETURNING media_asset_id, created_at
""",
property_id, user.role, body.media_type, body.url, body.thumbnail_url,
body.sort_order, json.dumps(body.metadata), user.user_id,
)
return {"media_asset_id": str(row["media_asset_id"]), "created_at": str(row["created_at"])}
@router.get("/properties/{property_id}/media", summary="List media for a property")
async def list_media(
property_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT media_asset_id, media_type, url, thumbnail_url, sort_order, metadata, created_at
FROM inventory_media_assets
WHERE property_id=$1 AND tenant_id=$2
ORDER BY sort_order ASC, created_at ASC
""",
property_id, user.role,
)
return {"media": [dict(r) for r in rows]}
@router.delete("/media/{media_asset_id}", summary="Remove a media asset")
async def delete_media(
media_asset_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM inventory_media_assets WHERE media_asset_id=$1 AND tenant_id=$2",
media_asset_id, user.role,
)
if result == "DELETE 0":
raise HTTPException(404, "Media asset not found")
return {"status": "deleted"}

View File

@@ -0,0 +1,635 @@
"""
routes_mobile_edge.py
─────────────────────
Mobile Edge API — serves iPhone Edge and Android Phone Edge apps.
Surfaces:
GET /mobile-edge/events — communication events for a lead
POST /mobile-edge/events — log a new communication event
GET /mobile-edge/memory — memory facts for a lead
POST /mobile-edge/imports — operator-assisted import of a recording/note
POST /mobile-edge/notes — quick note attached to a lead
GET /mobile-edge/calendar — calendar events for the authed user
POST /mobile-edge/calendar — create a calendar event
PATCH /mobile-edge/calendar/{id} — update a calendar event
DELETE /mobile-edge/calendar/{id} — cancel a calendar event
GET /mobile-edge/transcripts/{id} — transcript segments for an event
GET /mobile-edge/insights/{lead_id}— insight recommendations for a lead
POST /mobile-edge/insights/{id}/act — act on or dismiss an insight
GET /mobile-edge/alerts — active alerts for the authed user
POST /mobile-edge/session — register a surface session heartbeat
"""
from __future__ import annotations
import logging
import uuid
from datetime import UTC, datetime
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
from backend.auth.dependencies import get_current_user
logger = logging.getLogger("velocity.mobile_edge")
router = APIRouter()
# ── Helpers ───────────────────────────────────────────────────────────────────
def _pool(request: Request):
pool = request.app.state.db_pool
if pool is None:
raise HTTPException(status_code=503, detail="Database unavailable.")
return pool
def _now() -> str:
return datetime.now(UTC).isoformat()
# ── Pydantic models ───────────────────────────────────────────────────────────
VALID_CHANNELS = {
"pstn", "whatsapp_message", "whatsapp_voice", "whatsapp_video",
"email", "facebook_message", "instagram_message", "in_app_voip", "manual_note",
}
VALID_CAPTURE_MODES = {"direct_api", "provider_routed", "operator_import", "operator_note"}
VALID_DIRECTIONS = {"inbound", "outbound"}
VALID_CONSENT = {"unknown", "granted", "denied", "not_required"}
class CommunicationEventCreate(BaseModel):
lead_id: str
channel: str
direction: str = "inbound"
provider: Optional[str] = None
capture_mode: str
consent_state: str = "unknown"
duration_seconds: Optional[int] = None
summary: Optional[str] = None
raw_reference: Optional[str] = None
recording_ref: Optional[str] = None
provider_metadata: dict = Field(default_factory=dict)
class ImportCreate(BaseModel):
lead_id: str
channel: str
capture_mode: str = "operator_import"
recording_ref: Optional[str] = None
summary: Optional[str] = None
consent_state: str = "granted"
class NoteCreate(BaseModel):
lead_id: str
note_text: str
fact_type: str = "custom"
effective_date: Optional[str] = None
class CalendarEventCreate(BaseModel):
lead_id: Optional[str] = None
source_event_id: Optional[str] = None
title: str
description: Optional[str] = None
start_at: str # ISO8601
end_at: str # ISO8601
all_day: bool = False
reminder_minutes: list[int] = Field(default_factory=lambda: [15])
location: Optional[str] = None
metadata: dict = Field(default_factory=dict)
class CalendarEventUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
start_at: Optional[str] = None
end_at: Optional[str] = None
status: Optional[str] = None
reminder_minutes: Optional[list[int]] = None
location: Optional[str] = None
class InsightActionRequest(BaseModel):
action: str = Field(..., pattern="^(accepted|dismissed|acted_upon)$")
class SessionHeartbeat(BaseModel):
surface_type: str
app_version: str
screen: Optional[str] = None
metadata: dict = Field(default_factory=dict)
# ── Communication Events ───────────────────────────────────────────────────────
@router.get("/events", summary="List communication events for a lead")
async def list_events(
request: Request,
lead_id: str = Query(..., description="Lead ID to fetch events for"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
user=Depends(get_current_user),
):
"""Return paginated communication events for a given lead, newest first."""
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT event_id, lead_id, channel, direction, provider, capture_mode,
consent_state, timestamp, duration_seconds, summary, raw_reference,
recording_ref, provider_metadata, created_at
FROM edge_communication_events
WHERE tenant_id = $1 AND lead_id = $2
ORDER BY timestamp DESC
LIMIT $3 OFFSET $4
""",
user.role, # tenant_id derived from role scope; production uses dedicated tenant field
lead_id, limit, offset,
)
total = await conn.fetchval(
"SELECT COUNT(*) FROM edge_communication_events WHERE tenant_id = $1 AND lead_id = $2",
user.role, lead_id,
)
return {
"total": total,
"limit": limit,
"offset": offset,
"events": [dict(r) for r in rows],
}
@router.post("/events", status_code=status.HTTP_201_CREATED, summary="Log a communication event")
async def create_event(
request: Request,
body: CommunicationEventCreate,
user=Depends(get_current_user),
):
"""
Create a new communication event record.
Supports all three capture modes: direct_api, provider_routed, operator_import.
"""
if body.channel not in VALID_CHANNELS:
raise HTTPException(400, f"Invalid channel. Valid: {sorted(VALID_CHANNELS)}")
if body.capture_mode not in VALID_CAPTURE_MODES:
raise HTTPException(400, f"Invalid capture_mode. Valid: {sorted(VALID_CAPTURE_MODES)}")
if body.direction not in VALID_DIRECTIONS:
raise HTTPException(400, "direction must be 'inbound' or 'outbound'")
if body.consent_state not in VALID_CONSENT:
raise HTTPException(400, f"Invalid consent_state. Valid: {sorted(VALID_CONSENT)}")
pool = _pool(request)
import json
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO edge_communication_events (
tenant_id, lead_id, channel, direction, provider, capture_mode,
consent_state, duration_seconds, summary, raw_reference,
recording_ref, provider_metadata
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12::jsonb)
RETURNING event_id, created_at
""",
user.role, body.lead_id, body.channel, body.direction, body.provider,
body.capture_mode, body.consent_state, body.duration_seconds,
body.summary, body.raw_reference, body.recording_ref,
json.dumps(body.provider_metadata),
)
logger.info("Created communication event %s for lead %s", row["event_id"], body.lead_id)
return {"event_id": str(row["event_id"]), "created_at": str(row["created_at"])}
# ── Communication Memory Facts ────────────────────────────────────────────────
@router.get("/memory", summary="List memory facts for a lead")
async def list_memory_facts(
request: Request,
lead_id: str = Query(...),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT fact_id, lead_id, event_id, fact_type, fact_text,
effective_date, confidence, extracted_from, is_confirmed,
confirmed_by, confirmed_at, created_at
FROM edge_communication_memory_facts
WHERE tenant_id = $1 AND lead_id = $2
ORDER BY created_at DESC
LIMIT $3 OFFSET $4
""",
user.role, lead_id, limit, offset,
)
total = await conn.fetchval(
"SELECT COUNT(*) FROM edge_communication_memory_facts WHERE tenant_id=$1 AND lead_id=$2",
user.role, lead_id,
)
return {"total": total, "limit": limit, "offset": offset, "facts": [dict(r) for r in rows]}
# ── Operator-Assisted Import ──────────────────────────────────────────────────
@router.post("/imports", status_code=status.HTTP_201_CREATED, summary="Operator-assisted import")
async def create_import(
request: Request,
body: ImportCreate,
user=Depends(get_current_user),
):
"""
Mode C import: user uploads recording ref or confirms a note manually.
Creates an event with capture_mode = 'operator_import' and triggers a
transcription job if a recording_ref is supplied.
"""
if body.channel not in VALID_CHANNELS:
raise HTTPException(400, f"Invalid channel. Valid: {sorted(VALID_CHANNELS)}")
pool = _pool(request)
import json
async with pool.acquire() as conn:
async with conn.transaction():
event_row = await conn.fetchrow(
"""
INSERT INTO edge_communication_events (
tenant_id, lead_id, channel, direction, capture_mode,
consent_state, recording_ref, summary
) VALUES ($1,$2,$3,'inbound',$4,$5,$6,$7)
RETURNING event_id, created_at
""",
user.role, body.lead_id, body.channel, body.capture_mode,
body.consent_state, body.recording_ref, body.summary,
)
event_id = event_row["event_id"]
job_id = None
if body.recording_ref:
job_row = await conn.fetchrow(
"""
INSERT INTO edge_transcription_jobs (
tenant_id, event_id, media_type, consent_state
) VALUES ($1,$2,'audio',$3)
RETURNING transcription_job_id
""",
user.role, event_id, body.consent_state,
)
job_id = str(job_row["transcription_job_id"])
return {
"event_id": str(event_id),
"transcription_job_id": job_id,
"created_at": str(event_row["created_at"]),
}
# ── Quick Notes ───────────────────────────────────────────────────────────────
@router.post("/notes", status_code=status.HTTP_201_CREATED, summary="Create a quick note for a lead")
async def create_note(
request: Request,
body: NoteCreate,
user=Depends(get_current_user),
):
"""
Create a manual memory fact from an operator note.
No event is created — this is a direct fact insertion.
"""
pool = _pool(request)
from datetime import date
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO edge_communication_memory_facts (
tenant_id, lead_id, fact_type, fact_text, effective_date,
extracted_from, confidence, is_confirmed
) VALUES ($1,$2,$3,$4,$5,'operator_note',1.0, TRUE)
RETURNING fact_id, created_at
""",
user.role, body.lead_id, body.fact_type, body.note_text,
body.effective_date,
)
return {"fact_id": str(row["fact_id"]), "created_at": str(row["created_at"])}
# ── Calendar ──────────────────────────────────────────────────────────────────
@router.get("/calendar", summary="Get calendar events for the authed user")
async def list_calendar_events(
request: Request,
from_date: Optional[str] = Query(None),
to_date: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=200),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
if from_date and to_date:
rows = await conn.fetch(
"""
SELECT calendar_event_id, lead_id, title, description, start_at, end_at,
all_day, status, reminder_minutes, created_by, location, metadata, created_at
FROM user_calendar_events
WHERE tenant_id=$1 AND owner_user_id=$2
AND start_at >= $3::timestamptz AND end_at <= $4::timestamptz
ORDER BY start_at ASC LIMIT $5
""",
user.role, user.user_id, from_date, to_date, limit,
)
else:
rows = await conn.fetch(
"""
SELECT calendar_event_id, lead_id, title, description, start_at, end_at,
all_day, status, reminder_minutes, created_by, location, metadata, created_at
FROM user_calendar_events
WHERE tenant_id=$1 AND owner_user_id=$2
ORDER BY start_at ASC LIMIT $3
""",
user.role, user.user_id, limit,
)
return {"events": [dict(r) for r in rows]}
@router.post("/calendar", status_code=status.HTTP_201_CREATED, summary="Create a calendar event")
async def create_calendar_event(
request: Request,
body: CalendarEventCreate,
user=Depends(get_current_user),
):
pool = _pool(request)
import json
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO user_calendar_events (
tenant_id, owner_user_id, lead_id, source_event_id, title, description,
start_at, end_at, all_day, reminder_minutes, created_by, location, metadata
) VALUES ($1,$2,$3,$4,$5,$6,$7::timestamptz,$8::timestamptz,$9,$10,$11,$12,$13::jsonb)
RETURNING calendar_event_id, created_at
""",
user.role, user.user_id, body.lead_id, body.source_event_id,
body.title, body.description, body.start_at, body.end_at,
body.all_day, body.reminder_minutes, "user",
body.location, json.dumps(body.metadata),
)
return {"calendar_event_id": str(row["calendar_event_id"]), "created_at": str(row["created_at"])}
@router.patch("/calendar/{calendar_event_id}", summary="Update a calendar event")
async def update_calendar_event(
calendar_event_id: str,
request: Request,
body: CalendarEventUpdate,
user=Depends(get_current_user),
):
pool = _pool(request)
# Build partial update
updates: list[str] = []
values: list[Any] = []
idx = 1
def _add(col: str, val: Any):
nonlocal idx
updates.append(f"{col} = ${idx}")
values.append(val)
idx += 1
if body.title is not None: _add("title", body.title)
if body.description is not None: _add("description", body.description)
if body.start_at is not None: _add("start_at", body.start_at)
if body.end_at is not None: _add("end_at", body.end_at)
if body.status is not None: _add("status", body.status)
if body.reminder_minutes is not None: _add("reminder_minutes", body.reminder_minutes)
if body.location is not None: _add("location", body.location)
if not updates:
raise HTTPException(400, "No fields to update")
_add("updated_at", datetime.now(UTC))
_add("tenant_id", user.role)
_add("owner_user_id", user.user_id)
values.append(calendar_event_id)
async with pool.acquire() as conn:
result = await conn.execute(
f"""
UPDATE user_calendar_events
SET {', '.join(updates)}
WHERE tenant_id=${idx} AND owner_user_id=${idx+1} AND calendar_event_id=${idx+2}
""",
*values,
)
if result == "UPDATE 0":
raise HTTPException(404, "Calendar event not found or not owned by you")
return {"status": "updated"}
@router.delete("/calendar/{calendar_event_id}", summary="Cancel a calendar event")
async def delete_calendar_event(
calendar_event_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE user_calendar_events
SET status='cancelled', updated_at=NOW()
WHERE tenant_id=$1 AND owner_user_id=$2 AND calendar_event_id=$3
""",
user.role, user.user_id, calendar_event_id,
)
if result == "UPDATE 0":
raise HTTPException(404, "Calendar event not found or not owned by you")
return {"status": "cancelled"}
# ── Transcripts ───────────────────────────────────────────────────────────────
@router.get("/transcripts/{event_id}", summary="Get transcript segments for an event")
async def get_transcript(
event_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
job = await conn.fetchrow(
"""
SELECT j.transcription_job_id, j.status, j.provider, j.speaker_count,
j.word_count, j.language, j.completed_at
FROM edge_transcription_jobs j
JOIN edge_communication_events e ON e.event_id = j.event_id
WHERE j.event_id = $1 AND e.tenant_id = $2
ORDER BY j.created_at DESC LIMIT 1
""",
event_id, user.role,
)
if not job:
raise HTTPException(404, "No transcription job found for this event")
segments = await conn.fetch(
"""
SELECT segment_id, speaker_label, start_ms, end_ms, text, confidence, is_agent_turn
FROM edge_transcript_segments
WHERE transcription_job_id = $1
ORDER BY start_ms ASC
""",
job["transcription_job_id"],
)
return {
"job": dict(job),
"segments": [dict(s) for s in segments],
}
# ── Insights ──────────────────────────────────────────────────────────────────
@router.get("/insights/{lead_id}", summary="Get insight recommendations for a lead")
async def get_insights(
lead_id: str,
request: Request,
status_filter: Optional[str] = Query(None, alias="status"),
limit: int = Query(20, ge=1, le=100),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
if status_filter:
rows = await conn.fetch(
"""
SELECT recommendation_id, lead_id, source_event_id, recommendation_type,
summary, suggested_action, target_system, status, confidence, created_at
FROM insight_recommendations
WHERE tenant_id=$1 AND lead_id=$2 AND status=$3
ORDER BY created_at DESC LIMIT $4
""",
user.role, lead_id, status_filter, limit,
)
else:
rows = await conn.fetch(
"""
SELECT recommendation_id, lead_id, source_event_id, recommendation_type,
summary, suggested_action, target_system, status, confidence, created_at
FROM insight_recommendations
WHERE tenant_id=$1 AND lead_id=$2
ORDER BY created_at DESC LIMIT $3
""",
user.role, lead_id, limit,
)
return {"insights": [dict(r) for r in rows]}
@router.post("/insights/{recommendation_id}/act", summary="Act on or dismiss an insight")
async def act_on_insight(
recommendation_id: str,
request: Request,
body: InsightActionRequest,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE insight_recommendations
SET status=$1, acted_by=$2, acted_at=NOW(), updated_at=NOW()
WHERE recommendation_id=$3 AND tenant_id=$4
""",
body.action, user.user_id, recommendation_id, user.role,
)
if result == "UPDATE 0":
raise HTTPException(404, "Insight not found")
return {"status": body.action}
# ── Alerts ────────────────────────────────────────────────────────────────────
@router.get("/alerts", summary="Get active alerts for the authed user")
async def get_alerts(
request: Request,
user=Depends(get_current_user),
):
"""
Returns a combined, prioritized view of:
- Pending insights needing action
- Calendar events due within 24 hours
- Pending transcription jobs
"""
pool = _pool(request)
async with pool.acquire() as conn:
pending_insights = await conn.fetchval(
"SELECT COUNT(*) FROM insight_recommendations WHERE tenant_id=$1 AND status='pending'",
user.role,
)
upcoming_events = await conn.fetchval(
"""
SELECT COUNT(*) FROM user_calendar_events
WHERE tenant_id=$1 AND owner_user_id=$2
AND status='confirmed'
AND start_at BETWEEN NOW() AND NOW() + INTERVAL '24 hours'
""",
user.role, user.user_id,
)
pending_transcriptions = await conn.fetchval(
"SELECT COUNT(*) FROM edge_transcription_jobs WHERE tenant_id=$1 AND status='pending'",
user.role,
)
return {
"pending_insights": pending_insights,
"upcoming_calendar_events_24h": upcoming_events,
"pending_transcriptions": pending_transcriptions,
"generated_at": _now(),
}
# ── Session Heartbeat ─────────────────────────────────────────────────────────
@router.post("/session", status_code=status.HTTP_200_OK, summary="Register surface session heartbeat")
async def session_heartbeat(
request: Request,
body: SessionHeartbeat,
user=Depends(get_current_user),
):
"""Upsert a surface session to track cross-surface activity."""
valid_surfaces = {
"webos", "ipad", "android_tablet", "iphone_edge", "android_phone_edge",
}
if body.surface_type not in valid_surfaces:
raise HTTPException(400, f"Invalid surface_type. Valid: {sorted(valid_surfaces)}")
pool = _pool(request)
import json
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO surface_sessions (tenant_id, user_id, surface_type, app_version, metadata)
VALUES ($1, $2, $3, $4, $5::jsonb)
ON CONFLICT DO NOTHING
RETURNING session_id
""",
user.role, user.user_id, body.surface_type, body.app_version,
json.dumps(body.metadata),
)
# Update last_active + screen_sequence for existing session (within 30 min)
if body.screen and row is None:
await conn.execute(
"""
UPDATE surface_sessions
SET last_active_at=NOW(),
screen_sequence = array_append(screen_sequence, $1)
WHERE tenant_id=$2 AND user_id=$3 AND surface_type=$4
AND last_active_at > NOW() - INTERVAL '30 minutes'
""",
body.screen, user.role, user.user_id, body.surface_type,
)
return {"status": "ok", "timestamp": _now()}

View File

@@ -0,0 +1,398 @@
"""
routes_oracle_templates.py
──────────────────────────
Oracle Template Catalog API
Extends the existing Oracle route surface with template taxonomy and seeding.
Endpoints:
GET /oracle/template-chapters — list chapters
POST /oracle/template-chapters — create a chapter
GET /oracle/template-subchapters — list subchapters (optionally filtered)
POST /oracle/template-subchapters — create a subchapter
GET /oracle/component-templates — list templates (filterable)
POST /oracle/component-templates — create a template
GET /oracle/component-templates/{id} — get a template
POST /oracle/component-templates/{id}/seed — add a seed example
GET /oracle/component-templates/{id}/seed — list seed examples for a template
POST /oracle/component-templates/synthetic-jobs — trigger a Kimi synthetic job
"""
from __future__ import annotations
import json
import logging
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
from backend.auth.dependencies import get_current_user
logger = logging.getLogger("velocity.oracle_templates")
router = APIRouter()
# ── Helpers ───────────────────────────────────────────────────────────────────
def _pool(request: Request):
pool = request.app.state.db_pool
if pool is None:
raise HTTPException(503, "Database unavailable.")
return pool
# ── Models ────────────────────────────────────────────────────────────────────
class ChapterCreate(BaseModel):
name: str
description: Optional[str] = None
sort_order: int = 0
class SubchapterCreate(BaseModel):
chapter_id: str
name: str
description: Optional[str] = None
sort_order: int = 0
class TemplateCreate(BaseModel):
name: str
category: str
chapter_id: Optional[str] = None
subchapter_id: Optional[str] = None
component_type: Optional[str] = None
accepted_shapes: list[str] = Field(default_factory=list)
json_template: Optional[dict] = None
description: Optional[str] = None
origin: str = "premade"
version: str = "1.0.0"
class SeedExampleCreate(BaseModel):
title: str
example_json: dict
quality_notes: Optional[str] = None
chapter_id: Optional[str] = None
subchapter_id: Optional[str] = None
is_canonical: bool = False
class SyntheticJobCreate(BaseModel):
template_id: str
chapter_id: Optional[str] = None
subchapter_id: Optional[str] = None
model: str = "kimi"
requested_count: int = Field(10, ge=1, le=500)
# ── Template Chapters ─────────────────────────────────────────────────────────
@router.get("/template-chapters", summary="List Oracle template chapters")
async def list_template_chapters(
request: Request,
include_inactive: bool = Query(False),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
where = "WHERE ch.tenant_id=$1" + ("" if include_inactive else " AND ch.is_active=TRUE")
rows = await conn.fetch(
f"""
SELECT ch.chapter_id, ch.name, ch.description, ch.sort_order, ch.is_active,
COUNT(sub.subchapter_id) FILTER (WHERE sub.is_active=TRUE) as subchapter_count,
COUNT(t.template_id) as template_count
FROM oracle_template_chapters ch
LEFT JOIN oracle_template_subchapters sub ON sub.chapter_id = ch.chapter_id
LEFT JOIN oracle_component_templates t ON t.chapter_id = ch.chapter_id
AND t.status != 'archived'
{where}
GROUP BY ch.chapter_id
ORDER BY ch.sort_order ASC
""",
user.role,
)
return {"chapters": [dict(r) for r in rows]}
@router.post("/template-chapters", status_code=status.HTTP_201_CREATED,
summary="Create a template chapter")
async def create_template_chapter(
request: Request,
body: ChapterCreate,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO oracle_template_chapters (tenant_id, name, description, sort_order)
VALUES ($1,$2,$3,$4)
RETURNING chapter_id, created_at
""",
user.role, body.name, body.description, body.sort_order,
)
return {"chapter_id": str(row["chapter_id"]), "created_at": str(row["created_at"])}
# ── Template Subchapters ──────────────────────────────────────────────────────
@router.get("/template-subchapters", summary="List Oracle template subchapters")
async def list_template_subchapters(
request: Request,
chapter_id: Optional[str] = Query(None),
include_inactive: bool = Query(False),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
where = "WHERE sub.tenant_id=$1"
params: list[Any] = [user.role]
idx = 2
if not include_inactive:
where += " AND sub.is_active=TRUE"
if chapter_id:
where += f" AND sub.chapter_id=${idx}"; params.append(chapter_id); idx += 1
rows = await conn.fetch(
f"""
SELECT sub.subchapter_id, sub.chapter_id, ch.name as chapter_name,
sub.name, sub.description, sub.sort_order, sub.is_active,
COUNT(t.template_id) as template_count
FROM oracle_template_subchapters sub
JOIN oracle_template_chapters ch ON ch.chapter_id = sub.chapter_id
LEFT JOIN oracle_component_templates t ON t.subchapter_id = sub.subchapter_id
AND t.status != 'archived'
{where}
GROUP BY sub.subchapter_id, ch.name
ORDER BY sub.chapter_id, sub.sort_order ASC
""",
*params,
)
return {"subchapters": [dict(r) for r in rows]}
@router.post("/template-subchapters", status_code=status.HTTP_201_CREATED,
summary="Create a template subchapter")
async def create_template_subchapter(
request: Request,
body: SubchapterCreate,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
# Verify chapter exists and belongs to tenant
ch_exists = await conn.fetchval(
"SELECT 1 FROM oracle_template_chapters WHERE chapter_id=$1 AND tenant_id=$2",
body.chapter_id, user.role,
)
if not ch_exists:
raise HTTPException(404, "Chapter not found")
row = await conn.fetchrow(
"""
INSERT INTO oracle_template_subchapters
(chapter_id, tenant_id, name, description, sort_order)
VALUES ($1,$2,$3,$4,$5)
RETURNING subchapter_id, created_at
""",
body.chapter_id, user.role, body.name, body.description, body.sort_order,
)
return {"subchapter_id": str(row["subchapter_id"]), "created_at": str(row["created_at"])}
# ── Component Templates ───────────────────────────────────────────────────────
@router.get("/component-templates", summary="List Oracle component templates")
async def list_component_templates(
request: Request,
chapter_id: Optional[str] = Query(None),
subchapter_id: Optional[str] = Query(None),
status_filter: Optional[str] = Query(None, alias="status"),
search: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
user=Depends(get_current_user),
):
pool = _pool(request)
where = "WHERE t.tenant_id=$1"
params: list[Any] = [user.role]
idx = 2
if chapter_id:
where += f" AND t.chapter_id=${idx}"; params.append(chapter_id); idx += 1
if subchapter_id:
where += f" AND t.subchapter_id=${idx}"; params.append(subchapter_id); idx += 1
if status_filter:
where += f" AND t.status=${idx}"; params.append(status_filter); idx += 1
if search:
where += f" AND (t.name ILIKE ${idx} OR t.description ILIKE ${idx})"
params.append(f"%{search}%"); idx += 1
async with pool.acquire() as conn:
rows = await conn.fetch(
f"""
SELECT t.template_id, t.name, t.category, t.status, t.origin, t.version,
t.accepted_shapes, t.use_count, t.chapter_id, t.subchapter_id,
t.description, ch.name as chapter_name, sub.name as subchapter_name,
t.created_at, t.updated_at
FROM oracle_component_templates t
LEFT JOIN oracle_template_chapters ch ON ch.chapter_id = t.chapter_id
LEFT JOIN oracle_template_subchapters sub ON sub.subchapter_id = t.subchapter_id
{where}
ORDER BY t.updated_at DESC
LIMIT ${idx} OFFSET ${idx+1}
""",
*params, limit, offset,
)
total = await conn.fetchval(
f"SELECT COUNT(*) FROM oracle_component_templates t {where}", *params,
)
return {"total": total, "limit": limit, "offset": offset, "templates": [dict(r) for r in rows]}
@router.post("/component-templates", status_code=status.HTTP_201_CREATED,
summary="Create a component template")
async def create_component_template(
request: Request,
body: TemplateCreate,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO oracle_component_templates (
tenant_id, name, category, chapter_id, subchapter_id,
accepted_shapes, json_template, description, origin, version, status
) VALUES ($1,$2,$3,$4,$5,$6,$7::jsonb,$8,$9,$10,'draft')
RETURNING template_id, created_at
""",
user.role, body.name, body.category, body.chapter_id, body.subchapter_id,
body.accepted_shapes,
json.dumps(body.json_template) if body.json_template else None,
body.description, body.origin, body.version,
)
return {"template_id": str(row["template_id"]), "created_at": str(row["created_at"])}
@router.get("/component-templates/{template_id}", summary="Get a component template")
async def get_component_template(
template_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT t.*, ch.name as chapter_name, sub.name as subchapter_name
FROM oracle_component_templates t
LEFT JOIN oracle_template_chapters ch ON ch.chapter_id = t.chapter_id
LEFT JOIN oracle_template_subchapters sub ON sub.subchapter_id = t.subchapter_id
WHERE t.template_id=$1 AND t.tenant_id=$2
""",
template_id, user.role,
)
if not row:
raise HTTPException(404, "Template not found")
return dict(row)
# ── Seed Examples ─────────────────────────────────────────────────────────────
@router.post("/component-templates/{template_id}/seed", status_code=status.HTTP_201_CREATED,
summary="Add a seed example to a template")
async def add_seed_example(
template_id: str,
request: Request,
body: SeedExampleCreate,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
exists = await conn.fetchval(
"SELECT 1 FROM oracle_component_templates WHERE template_id=$1 AND tenant_id=$2",
template_id, user.role,
)
if not exists:
raise HTTPException(404, "Template not found")
row = await conn.fetchrow(
"""
INSERT INTO oracle_template_seed_examples (
template_id, chapter_id, subchapter_id, title, example_json,
quality_notes, is_canonical
) VALUES ($1,$2,$3,$4,$5::jsonb,$6,$7)
RETURNING example_id, created_at
""",
template_id, body.chapter_id, body.subchapter_id, body.title,
json.dumps(body.example_json), body.quality_notes, body.is_canonical,
)
return {"example_id": str(row["example_id"]), "created_at": str(row["created_at"])}
@router.get("/component-templates/{template_id}/seed", summary="List seed examples for a template")
async def list_seed_examples(
template_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT example_id, title, example_json, quality_notes, is_canonical, created_at
FROM oracle_template_seed_examples
WHERE template_id=$1
ORDER BY is_canonical DESC, created_at ASC
""",
template_id,
)
return {"examples": [dict(r) for r in rows]}
# ── Synthetic Jobs ────────────────────────────────────────────────────────────
@router.post("/component-templates/synthetic-jobs", status_code=status.HTTP_201_CREATED,
summary="Trigger a Kimi synthetic data generation job")
async def trigger_synthetic_job(
request: Request,
body: SyntheticJobCreate,
user=Depends(get_current_user),
):
"""
Queues a Kimi synthetic data expansion job for a template.
The job will be picked up by the background synthetic generation worker.
"""
pool = _pool(request)
async with pool.acquire() as conn:
exists = await conn.fetchval(
"SELECT 1 FROM oracle_component_templates WHERE template_id=$1 AND tenant_id=$2",
body.template_id, user.role,
)
if not exists:
raise HTTPException(404, "Template not found")
row = await conn.fetchrow(
"""
INSERT INTO oracle_synthetic_generation_jobs (
tenant_id, template_id, chapter_id, subchapter_id,
model, requested_count, created_by
) VALUES ($1,$2,$3,$4,$5,$6,$7)
RETURNING job_id, status, created_at
""",
user.role, body.template_id, body.chapter_id, body.subchapter_id,
body.model, body.requested_count, user.user_id,
)
logger.info(
"Synthetic job queued: %s for template %s (%d examples)",
row["job_id"], body.template_id, body.requested_count,
)
return {
"job_id": str(row["job_id"]),
"status": row["status"],
"created_at": str(row["created_at"]),
}

View File

@@ -52,9 +52,10 @@ JWT_EXPIRE_HOURS = 8
def create_access_token(user_id: str, role: str) -> str:
expire = datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRE_HOURS)
normalized_role = role.strip().upper()
payload = {
"sub": user_id,
"role": role,
"role": normalized_role,
"exp": expire,
"iat": datetime.now(timezone.utc),
}
@@ -70,7 +71,7 @@ class UserPrincipal:
@property
def role_level(self) -> int:
return ROLE_HIERARCHY.get(self.role, -1)
return ROLE_HIERARCHY.get(self.role.upper(), -1)
# ── Dependency: parse bearer token ────────────────────────────────────────────
@@ -104,7 +105,7 @@ def get_current_user(
headers={"WWW-Authenticate": "Bearer"},
) from exc
return UserPrincipal(user_id=payload["sub"], role=payload["role"])
return UserPrincipal(user_id=payload["sub"], role=str(payload["role"]).strip().upper())
# ── Dependency factory: role gate ─────────────────────────────────────────────

View File

@@ -23,6 +23,10 @@ from dotenv import load_dotenv
from backend.api.routes_catalyst import router as catalyst_router
from backend.api.routes_crm import crm_router, analytics_router
from backend.api.routes_oracle import router as oracle_helper_router
from backend.api.routes_mobile_edge import router as mobile_edge_router
from backend.api.routes_inventory import router as inventory_router
from backend.api.routes_admin_surface import router as admin_surface_router
from backend.api.routes_oracle_templates import router as oracle_templates_router
from backend.auth.dependencies import (
create_access_token, verify_password, get_current_user
)
@@ -93,11 +97,15 @@ app.include_router(crm_router, prefix="/api", tags=["CRM"])
app.include_router(analytics_router, prefix="/api/analytics", tags=["Analytics"])
app.include_router(oracle_helper_router, prefix="/api/oracle", tags=["Oracle"])
app.include_router(oracle_v1_router, prefix="/api/oracle/v1", tags=["Oracle V1"])
app.include_router(oracle_templates_router, prefix="/api/oracle", tags=["Oracle Templates"])
app.include_router(sentinel_router, prefix="/api/sentinel", tags=["Sentinel"])
app.include_router(cctv_router, prefix="/api/cctv", tags=["CCTV"])
app.include_router(scenes_router, prefix="/api/scenes", tags=["Scenes"])
app.include_router(videos_router, prefix="/api/videos", tags=["Videos"])
app.include_router(vault_router, prefix="/api/vault", tags=["Vault"])
app.include_router(mobile_edge_router, prefix="/api/mobile-edge", tags=["Mobile Edge"])
app.include_router(inventory_router, prefix="/api/inventory", tags=["Inventory"])
app.include_router(admin_surface_router, prefix="/api/admin-surface", tags=["Admin Surface"])
# Public vault link (no /api prefix — shared externally with prospects)
from backend.routers.vault import router as public_vault_router

View File

@@ -0,0 +1,497 @@
{
"_meta": {
"version": "1.0.0",
"created": "2026-04-18",
"description": "Oracle Template Seed Database — canonical chapter/subchapter taxonomy and seed JSON examples for the Project Velocity Oracle platform",
"total_chapters": 6,
"total_subchapters": 24,
"total_seed_examples": 36
},
"chapters": [
{
"chapter_id": "ch-001",
"name": "Market Intelligence",
"description": "Components for real estate market analysis, pricing trends, demand signals, and competitive landscape.",
"sort_order": 1,
"subchapters": [
{
"subchapter_id": "sub-001-01",
"name": "Pricing Trends",
"description": "Price per sqft trends, AED/m² benchmarks, quarterly movement charts.",
"sort_order": 1
},
{
"subchapter_id": "sub-001-02",
"name": "Demand Signals",
"description": "Search volume, inquiry rate, site visit frequency, and absorption rate components.",
"sort_order": 2
},
{
"subchapter_id": "sub-001-03",
"name": "Competitive Landscape",
"description": "Developer comparison, project pipeline mapping, competitive unit mix analysis.",
"sort_order": 3
},
{
"subchapter_id": "sub-001-04",
"name": "Location Index",
"description": "District-level scores, proximity analysis, infrastructure readiness.",
"sort_order": 4
}
]
},
{
"chapter_id": "ch-002",
"name": "Lead Intelligence",
"description": "Components for lead profiling, scoring, pipeline health, and behaviour tracking.",
"sort_order": 2,
"subchapters": [
{
"subchapter_id": "sub-002-01",
"name": "Lead Profile",
"description": "Buyer persona cards, nationality, budget bracket, preferred property type.",
"sort_order": 1
},
{
"subchapter_id": "sub-002-02",
"name": "QD Score",
"description": "Qualification-Desire score breakdown, historical trend, per-dimension scores.",
"sort_order": 2
},
{
"subchapter_id": "sub-002-03",
"name": "Pipeline Health",
"description": "Pipeline stage distribution, velocity, stall alerts, probability weighting.",
"sort_order": 3
},
{
"subchapter_id": "sub-002-04",
"name": "Engagement History",
"description": "Touchpoint timeline, dwell time heat maps, content interaction logs.",
"sort_order": 4
}
]
},
{
"chapter_id": "ch-003",
"name": "Communication Intelligence",
"description": "Components surfacing insights from calls, messages, transcripts, and follow-up commitments.",
"sort_order": 3,
"subchapters": [
{
"subchapter_id": "sub-003-01",
"name": "Call Summary",
"description": "Transcript summary, speaker diarization, key-phrase extraction.",
"sort_order": 1
},
{
"subchapter_id": "sub-003-02",
"name": "Promise Tracker",
"description": "Promises made during calls, follow-up dates, commitment confidence.",
"sort_order": 2
},
{
"subchapter_id": "sub-003-03",
"name": "WhatsApp Thread",
"description": "Business WhatsApp message thread summaries, sentiment per message.",
"sort_order": 3
},
{
"subchapter_id": "sub-003-04",
"name": "Reminder Surface",
"description": "Due follow-ups, overdue reminders, NemoClaw-suggested next actions.",
"sort_order": 4
}
]
},
{
"chapter_id": "ch-004",
"name": "Inventory Analytics",
"description": "Components for property inventory insight, availability, and absorption.",
"sort_order": 4,
"subchapters": [
{
"subchapter_id": "sub-004-01",
"name": "Property Card",
"description": "Single-property summary card with unit details, pricing, media reference.",
"sort_order": 1
},
{
"subchapter_id": "sub-004-02",
"name": "Availability Matrix",
"description": "Bed-type × availability grid with unit count and price band.",
"sort_order": 2
},
{
"subchapter_id": "sub-004-03",
"name": "Absorption Rate",
"description": "Sales velocity per project and developer over rolling windows.",
"sort_order": 3
},
{
"subchapter_id": "sub-004-04",
"name": "Inventory Comparison",
"description": "Side-by-side comparison of two or more properties on key metrics.",
"sort_order": 4
}
]
},
{
"chapter_id": "ch-005",
"name": "Operational Metrics",
"description": "System-level, team-level, and showroom-level operational performance components.",
"sort_order": 5,
"subchapters": [
{
"subchapter_id": "sub-005-01",
"name": "Showroom Traffic",
"description": "Visitor count, zone dwell time, peak hour distribution.",
"sort_order": 1
},
{
"subchapter_id": "sub-005-02",
"name": "Team Performance",
"description": "Agent-level QD scores, conversion rates, call volume, follow-up compliance.",
"sort_order": 2
},
{
"subchapter_id": "sub-005-03",
"name": "Campaign Metrics",
"description": "Catalyst campaign reach, engagement rate, cost-per-lead, ROAS.",
"sort_order": 3
},
{
"subchapter_id": "sub-005-04",
"name": "System Health",
"description": "Backend queue depth, GPU utilization, transcription job latency.",
"sort_order": 4
}
]
},
{
"chapter_id": "ch-006",
"name": "Calendar and Follow-Up",
"description": "Components for scheduling, action planning, and NemoClaw-derived follow-up surfaces.",
"sort_order": 6,
"subchapters": [
{
"subchapter_id": "sub-006-01",
"name": "Calendar View",
"description": "Personal calendar view with communication-derived events and reminders.",
"sort_order": 1
},
{
"subchapter_id": "sub-006-02",
"name": "Action Queue",
"description": "Prioritized action list for an agent, ordered by urgency and lead value.",
"sort_order": 2
},
{
"subchapter_id": "sub-006-03",
"name": "Follow-Up Plan",
"description": "Structured follow-up plan derived from call outcomes and NemoClaw insights.",
"sort_order": 3
},
{
"subchapter_id": "sub-006-04",
"name": "Reminder Cards",
"description": "Surface-agnostic reminder card applicable to tablet and phone edge.",
"sort_order": 4
}
]
}
],
"seed_examples": [
{
"example_id": "ex-001",
"chapter_id": "ch-001",
"subchapter_id": "sub-001-01",
"title": "Dubai Marina — Price Per Sqft Trend (12-Month)",
"quality_notes": "Canonical example. Use for pricing trend chart templates.",
"is_canonical": true,
"template_name": "Pricing Trend Chart",
"component_type": "line_chart",
"accepted_shapes": ["time_series"],
"example_json": {
"componentType": "line_chart",
"title": "Dubai Marina — AED/sqft Trend",
"subtitle": "12-Month Rolling Average",
"dataSource": {
"type": "inventory_aggregate",
"district": "Dubai Marina",
"metric": "avg_price_per_sqft",
"window": "12M"
},
"visualization": {
"xAxis": "month",
"yAxis": "aed_per_sqft",
"format": "currency_aed",
"annotations": [
{ "date": "2025-10", "label": "Off-plan surge", "type": "event" }
],
"trend_line": true,
"confidence_band": false
},
"style": {
"accentColor": "#2563EB",
"gridLines": "subtle"
}
}
},
{
"example_id": "ex-002",
"chapter_id": "ch-001",
"subchapter_id": "sub-001-02",
"title": "Inquiry Velocity — Downtown Dubai (30-Day)",
"quality_notes": "Use for demand signal bar charts.",
"is_canonical": true,
"template_name": "Demand Signal Bar",
"component_type": "bar_chart",
"accepted_shapes": ["categorical_count"],
"example_json": {
"componentType": "bar_chart",
"title": "Inquiry Volume — Downtown Dubai",
"subtitle": "Last 30 Days by Week",
"dataSource": {
"type": "crm_aggregate",
"district": "Downtown Dubai",
"metric": "inquiry_count",
"window": "30D",
"groupBy": "week"
},
"visualization": {
"xAxis": "week",
"yAxis": "inquiry_count",
"format": "integer",
"comparison": { "enabled": true, "label": "Prior 30D", "style": "ghost_bar" }
},
"style": {
"accentColor": "#10B981",
"barRadius": 4
}
}
},
{
"example_id": "ex-003",
"chapter_id": "ch-002",
"subchapter_id": "sub-002-02",
"title": "Lead QD Score Card — Mohammed Al-Rashid",
"quality_notes": "Canonical single-lead QD score breakdown card.",
"is_canonical": true,
"template_name": "QD Score Card",
"component_type": "metric_card_group",
"accepted_shapes": ["qd_score_breakdown"],
"example_json": {
"componentType": "metric_card_group",
"title": "QD Score",
"subtitle": "Qualification × Desire",
"dataSource": {
"type": "sentinel_qd",
"leadId": "{{lead_id}}"
},
"visualization": {
"layout": "2x2_grid",
"cards": [
{ "dimension": "overall", "label": "Overall QD", "format": "percentage" },
{ "dimension": "qualification", "label": "Qualification", "format": "percentage" },
{ "dimension": "desire", "label": "Desire", "format": "percentage" },
{ "dimension": "velocity", "label": "Engagement Velocity", "format": "trend_arrow" }
],
"threshold_colors": {
"high": "#10B981",
"medium": "#F59E0B",
"low": "#EF4444"
}
}
}
},
{
"example_id": "ex-004",
"chapter_id": "ch-003",
"subchapter_id": "sub-003-01",
"title": "Call Summary Card — Diarized Transcript with Key Phrases",
"quality_notes": "Canonical call summary. Use for communication intelligence panels.",
"is_canonical": true,
"template_name": "Call Summary Card",
"component_type": "communication_summary",
"accepted_shapes": ["transcript_summary"],
"example_json": {
"componentType": "communication_summary",
"title": "Call Summary",
"dataSource": {
"type": "edge_communication_event",
"eventId": "{{event_id}}",
"channel": "pstn"
},
"visualization": {
"layout": "timeline_with_phrases",
"show_speaker_labels": true,
"show_duration": true,
"show_sentiment": true,
"key_phrase_highlight": true,
"sections": ["summary", "promises", "key_phrases", "next_action"]
}
}
},
{
"example_id": "ex-005",
"chapter_id": "ch-003",
"subchapter_id": "sub-003-02",
"title": "Promise Tracker — Lead Follow-Up Commitments",
"quality_notes": "Canonical promise tracker. Use for follow-up reminder surfaces.",
"is_canonical": true,
"template_name": "Promise Tracker Table",
"component_type": "data_table",
"accepted_shapes": ["communication_facts"],
"example_json": {
"componentType": "data_table",
"title": "Promises and Commitments",
"dataSource": {
"type": "edge_memory_facts",
"leadId": "{{lead_id}}",
"factTypes": ["promise", "follow_up_date", "decision_maker_note"]
},
"visualization": {
"columns": [
{ "key": "fact_text", "label": "Commitment", "width": "flex" },
{ "key": "effective_date", "label": "Due", "format": "date_relative" },
{ "key": "confidence", "label": "Confidence", "format": "percentage" },
{ "key": "extracted_from", "label": "Source", "format": "badge" }
],
"row_actions": ["mark_done", "create_calendar_event"],
"sort": { "column": "effective_date", "direction": "asc" }
}
}
},
{
"example_id": "ex-006",
"chapter_id": "ch-004",
"subchapter_id": "sub-004-01",
"title": "Property Card — Sobha One Tower A",
"quality_notes": "Canonical property card. Use for inventory summaries.",
"is_canonical": true,
"template_name": "Property Summary Card",
"component_type": "property_card",
"accepted_shapes": ["inventory_property"],
"example_json": {
"componentType": "property_card",
"title": "Property Summary",
"dataSource": {
"type": "inventory_property",
"propertyId": "{{property_id}}"
},
"visualization": {
"layout": "hero_with_stats",
"sections": [
"project_name",
"developer_name",
"location_map_pin",
"price_bands",
"unit_mix_summary",
"amenity_chips",
"media_carousel"
],
"cta": { "label": "Schedule Viewing", "action": "create_calendar_event" }
}
}
},
{
"example_id": "ex-007",
"chapter_id": "ch-005",
"subchapter_id": "sub-005-01",
"title": "Showroom Traffic Heatmap",
"quality_notes": "Canonical traffic component. Use for operational dashboards.",
"is_canonical": true,
"template_name": "Showroom Traffic Heatmap",
"component_type": "heatmap",
"accepted_shapes": ["zone_time_matrix"],
"example_json": {
"componentType": "heatmap",
"title": "Showroom Zone Traffic",
"subtitle": "Today — Live",
"dataSource": {
"type": "sentinel_live",
"metric": "visitor_dwell_time",
"groupBy": ["zone", "hour"]
},
"visualization": {
"xAxis": "hour_of_day",
"yAxis": "zone_name",
"value": "avg_dwell_minutes",
"colorScale": { "low": "#EFF6FF", "high": "#1D4ED8" },
"annotations": true
}
}
},
{
"example_id": "ex-008",
"chapter_id": "ch-006",
"subchapter_id": "sub-006-04",
"title": "Phone Edge Reminder Card — Follow-Up Due",
"quality_notes": "Designed for narrow phone edge surfaces. Minimal data footprint.",
"is_canonical": true,
"template_name": "Reminder Card",
"component_type": "compact_alert_card",
"accepted_shapes": ["insight_recommendation"],
"example_json": {
"componentType": "compact_alert_card",
"title": "Follow-Up Reminder",
"dataSource": {
"type": "insight_recommendations",
"leadId": "{{lead_id}}",
"status": "pending",
"limit": 1
},
"visualization": {
"layout": "single_card_narrow",
"fields": ["summary", "suggested_action", "target_system"],
"actions": ["accept", "dismiss", "snooze_1h"],
"urgency_indicator": true,
"surface_target": ["iphone_edge", "android_phone_edge"]
}
}
}
],
"kimi_synthetic_plan": {
"description": "Downstream Kimi synthetic data expansion plan consuming this seed DB",
"expansion_targets": [
{
"chapter_id": "ch-001",
"subchapter_id": "sub-001-01",
"seed_example_ids": ["ex-001"],
"requested_count": 50,
"model": "kimi",
"diversity_axes": ["district", "property_type", "time_window"]
},
{
"chapter_id": "ch-002",
"subchapter_id": "sub-002-02",
"seed_example_ids": ["ex-003"],
"requested_count": 100,
"model": "kimi",
"diversity_axes": ["lead_nationality", "budget_bracket", "pipeline_stage"]
},
{
"chapter_id": "ch-003",
"subchapter_id": "sub-003-01",
"seed_example_ids": ["ex-004"],
"requested_count": 200,
"model": "kimi",
"diversity_axes": ["call_outcome", "property_type", "language"]
},
{
"chapter_id": "ch-004",
"subchapter_id": "sub-004-01",
"seed_example_ids": ["ex-006"],
"requested_count": 150,
"model": "kimi",
"diversity_axes": ["developer_name", "district", "bedrooms"]
}
],
"quality_gate": {
"min_acceptance_confidence": 0.8,
"human_review_required_for_canonical": true,
"auto_accept_below_count": 20
}
}
}

View File

@@ -0,0 +1,350 @@
-- ────────────────────────────────────────────────────────────────────────────
-- Oracle Schema Extension v2 — Multi-Surface Platform and Oracle Expansion
-- Date: 2026-04-18
-- Author: Velocity Platform Team
-- Depends on: schema_oracle.sql (must be applied first)
-- PostgreSQL 14+ required · UUID via pgcrypto already enabled
-- ────────────────────────────────────────────────────────────────────────────
-- ─── 1. Oracle Template Taxonomy ─────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS oracle_template_chapters (
chapter_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
name TEXT NOT NULL,
description TEXT,
sort_order INTEGER NOT NULL DEFAULT 0,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS oracle_template_subchapters (
subchapter_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
chapter_id UUID NOT NULL REFERENCES oracle_template_chapters(chapter_id) ON DELETE CASCADE,
tenant_id TEXT NOT NULL,
name TEXT NOT NULL,
description TEXT,
sort_order INTEGER NOT NULL DEFAULT 0,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS oracle_template_seed_examples (
example_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
template_id UUID NOT NULL REFERENCES oracle_component_templates(template_id) ON DELETE CASCADE,
chapter_id UUID REFERENCES oracle_template_chapters(chapter_id),
subchapter_id UUID REFERENCES oracle_template_subchapters(subchapter_id),
title TEXT NOT NULL,
example_json JSONB NOT NULL,
quality_notes TEXT,
is_canonical BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Extend oracle_component_templates with chapter/subchapter linkage
-- (additive columns — does not alter existing rows)
ALTER TABLE oracle_component_templates
ADD COLUMN IF NOT EXISTS chapter_id UUID REFERENCES oracle_template_chapters(chapter_id),
ADD COLUMN IF NOT EXISTS subchapter_id UUID REFERENCES oracle_template_subchapters(subchapter_id),
ADD COLUMN IF NOT EXISTS json_template JSONB,
ADD COLUMN IF NOT EXISTS description TEXT;
-- ─── 2. Kimi Synthetic Data Jobs ─────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS oracle_synthetic_generation_jobs (
job_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
template_id UUID NOT NULL REFERENCES oracle_component_templates(template_id),
chapter_id UUID REFERENCES oracle_template_chapters(chapter_id),
subchapter_id UUID REFERENCES oracle_template_subchapters(subchapter_id),
model TEXT NOT NULL DEFAULT 'kimi',
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending','running','completed','failed','cancelled')),
requested_count INTEGER NOT NULL DEFAULT 10,
accepted_count INTEGER NOT NULL DEFAULT 0,
error_message TEXT,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
created_by TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- ─── 3. Inventory Pipeline ───────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS inventory_import_batches (
batch_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
source_type TEXT NOT NULL CHECK (source_type IN ('csv','json','api_push','manual')),
submitted_by TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending','validating','processing','completed','failed','partial')),
total_rows INTEGER NOT NULL DEFAULT 0,
accepted_rows INTEGER NOT NULL DEFAULT 0,
rejected_rows INTEGER NOT NULL DEFAULT 0,
error_summary JSONB NOT NULL DEFAULT '[]'::JSONB,
source_file_ref TEXT,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS inventory_properties (
property_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
batch_id UUID REFERENCES inventory_import_batches(batch_id),
source_id TEXT, -- external source identifier
project_name TEXT NOT NULL,
developer_name TEXT NOT NULL,
location JSONB NOT NULL DEFAULT '{}'::JSONB, -- {city, district, lat, lng}
property_type TEXT NOT NULL, -- apartment, villa, penthouse, plot, etc.
price_bands JSONB NOT NULL DEFAULT '[]'::JSONB, -- [{minAED, maxAED, unitType}]
unit_mix JSONB NOT NULL DEFAULT '[]'::JSONB, -- [{bedrooms, count, sizeSqft}]
amenities TEXT[] NOT NULL DEFAULT '{}',
status TEXT NOT NULL DEFAULT 'active'
CHECK (status IN ('active','archived','draft','under_review')),
validation_state JSONB NOT NULL DEFAULT '{}'::JSONB,
ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS inventory_media_assets (
media_asset_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
property_id UUID NOT NULL REFERENCES inventory_properties(property_id) ON DELETE CASCADE,
tenant_id TEXT NOT NULL,
media_type TEXT NOT NULL CHECK (media_type IN ('image','video','floorplan','brochure','360','vr')),
url TEXT NOT NULL,
thumbnail_url TEXT,
sort_order INTEGER NOT NULL DEFAULT 0,
metadata JSONB NOT NULL DEFAULT '{}'::JSONB,
uploaded_by TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- ─── 4. Edge Communication Events ────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS edge_communication_events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
lead_id TEXT NOT NULL,
channel TEXT NOT NULL
CHECK (channel IN ('pstn','whatsapp_message','whatsapp_voice',
'whatsapp_video','email','facebook_message',
'instagram_message','in_app_voip','manual_note')),
direction TEXT NOT NULL CHECK (direction IN ('inbound','outbound')),
provider TEXT, -- twilio, vonage, meta, etc.
capture_mode TEXT NOT NULL
CHECK (capture_mode IN ('direct_api','provider_routed','operator_import','operator_note')),
consent_state TEXT NOT NULL DEFAULT 'unknown'
CHECK (consent_state IN ('unknown','granted','denied','not_required')),
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
duration_seconds INTEGER,
summary TEXT,
raw_reference TEXT, -- provider message/call ID
recording_ref TEXT, -- storage path or URL
provider_metadata JSONB NOT NULL DEFAULT '{}'::JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS edge_communication_memory_facts (
fact_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
lead_id TEXT NOT NULL,
event_id UUID REFERENCES edge_communication_events(event_id),
fact_type TEXT NOT NULL
CHECK (fact_type IN ('promise','preference','follow_up_date',
'objection','interest_signal','budget','timeline',
'constraint','decision_maker_note','custom')),
fact_text TEXT NOT NULL,
effective_date DATE,
confidence NUMERIC(4,3) NOT NULL DEFAULT 1.0 CHECK (confidence BETWEEN 0 AND 1),
extracted_from TEXT NOT NULL
CHECK (extracted_from IN ('transcript','message_thread','operator_note','import')),
is_confirmed BOOLEAN NOT NULL DEFAULT FALSE,
confirmed_by TEXT,
confirmed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- ─── 5. Transcription Jobs and Segments ──────────────────────────────────────
CREATE TABLE IF NOT EXISTS edge_transcription_jobs (
transcription_job_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
event_id UUID NOT NULL REFERENCES edge_communication_events(event_id) ON DELETE CASCADE,
media_type TEXT NOT NULL CHECK (media_type IN ('audio','video')),
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending','queued','processing','completed','failed')),
transcript_ref TEXT, -- storage path to diarized JSON
provider TEXT NOT NULL DEFAULT 'nemoclaw',
consent_state TEXT NOT NULL DEFAULT 'unknown'
CHECK (consent_state IN ('unknown','granted','denied')),
speaker_count INTEGER,
word_count INTEGER,
language TEXT NOT NULL DEFAULT 'en',
error_message TEXT,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS edge_transcript_segments (
segment_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
transcription_job_id UUID NOT NULL REFERENCES edge_transcription_jobs(transcription_job_id) ON DELETE CASCADE,
event_id UUID NOT NULL REFERENCES edge_communication_events(event_id),
speaker_label TEXT NOT NULL, -- SPEAKER_00, SPEAKER_01, etc.
start_ms INTEGER NOT NULL,
end_ms INTEGER NOT NULL,
text TEXT NOT NULL,
confidence NUMERIC(4,3) NOT NULL DEFAULT 1.0,
is_agent_turn BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- ─── 6. User Calendar Events ─────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS user_calendar_events (
calendar_event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
owner_user_id TEXT NOT NULL,
lead_id TEXT,
source_event_id UUID REFERENCES edge_communication_events(event_id),
title TEXT NOT NULL,
description TEXT,
start_at TIMESTAMPTZ NOT NULL,
end_at TIMESTAMPTZ NOT NULL,
all_day BOOLEAN NOT NULL DEFAULT FALSE,
status TEXT NOT NULL DEFAULT 'confirmed'
CHECK (status IN ('tentative','confirmed','cancelled')),
reminder_minutes INTEGER[] NOT NULL DEFAULT '{15}'::INTEGER[],
created_by TEXT NOT NULL
CHECK (created_by IN ('user','nemoclaw_suggested','operator_import')),
is_nemoclaw_confirmed BOOLEAN NOT NULL DEFAULT FALSE,
location TEXT,
metadata JSONB NOT NULL DEFAULT '{}'::JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- ─── 7. Insight Recommendations ──────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS insight_recommendations (
recommendation_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
lead_id TEXT NOT NULL,
source_event_id UUID REFERENCES edge_communication_events(event_id),
recommendation_type TEXT NOT NULL
CHECK (recommendation_type IN ('follow_up_call','send_message',
'schedule_meeting','update_crm',
'update_qd_score','send_property_info',
'escalate','custom')),
summary TEXT NOT NULL,
suggested_action TEXT NOT NULL,
target_system TEXT NOT NULL
CHECK (target_system IN ('crm','calendar','qd_score','whatsapp','email','operator')),
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending','accepted','dismissed','acted_upon')),
confidence NUMERIC(4,3) NOT NULL DEFAULT 0.8,
acted_by TEXT,
acted_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- ─── 8. Admin Action Events ───────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS admin_action_events (
action_event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
action_id TEXT NOT NULL UNIQUE, -- idempotency key from client
action_type TEXT NOT NULL
CHECK (action_type IN (
'user_create','user_deactivate','user_role_change',
'tenant_config_update','inventory_batch_approve',
'inventory_batch_reject','template_publish','template_archive',
'synthetic_job_trigger','synthetic_job_cancel',
'system_health_check','queue_drain','debug_event_export',
'install_register','install_deregister'
)),
target_type TEXT NOT NULL,
target_id TEXT NOT NULL,
requested_by TEXT NOT NULL,
payload JSONB NOT NULL DEFAULT '{}'::JSONB,
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending','processing','completed','failed','rejected')),
result_message TEXT,
result_artifacts JSONB NOT NULL DEFAULT '[]'::JSONB,
executed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- ─── 9. Surface Sessions (cross-surface telemetry) ───────────────────────────
CREATE TABLE IF NOT EXISTS surface_sessions (
session_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
user_id TEXT NOT NULL,
surface_type TEXT NOT NULL
CHECK (surface_type IN ('webos','ipad','android_tablet',
'iphone_edge','android_phone_edge')),
app_version TEXT NOT NULL,
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
ended_at TIMESTAMPTZ,
last_active_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
screen_sequence TEXT[] NOT NULL DEFAULT '{}',
metadata JSONB NOT NULL DEFAULT '{}'::JSONB
);
-- ─── Indexes ──────────────────────────────────────────────────────────────────
-- Template taxonomy
CREATE INDEX IF NOT EXISTS idx_tmpl_chapters_tenant ON oracle_template_chapters(tenant_id, is_active);
CREATE INDEX IF NOT EXISTS idx_tmpl_subchapters_chapter ON oracle_template_subchapters(chapter_id, is_active);
CREATE INDEX IF NOT EXISTS idx_tmpl_seed_examples_template ON oracle_template_seed_examples(template_id);
CREATE INDEX IF NOT EXISTS idx_tmpl_seed_examples_chapter ON oracle_template_seed_examples(chapter_id);
-- Synthetic jobs
CREATE INDEX IF NOT EXISTS idx_synthetic_jobs_tenant ON oracle_synthetic_generation_jobs(tenant_id, status);
CREATE INDEX IF NOT EXISTS idx_synthetic_jobs_template ON oracle_synthetic_generation_jobs(template_id);
-- Inventory
CREATE INDEX IF NOT EXISTS idx_inv_batches_tenant ON inventory_import_batches(tenant_id, status);
CREATE INDEX IF NOT EXISTS idx_inv_props_tenant ON inventory_properties(tenant_id, status);
CREATE INDEX IF NOT EXISTS idx_inv_props_batch ON inventory_properties(batch_id);
CREATE INDEX IF NOT EXISTS idx_inv_media_property ON inventory_media_assets(property_id);
-- Edge communication
CREATE INDEX IF NOT EXISTS idx_edge_events_lead ON edge_communication_events(tenant_id, lead_id, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_edge_events_channel ON edge_communication_events(channel, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_edge_memory_lead ON edge_communication_memory_facts(tenant_id, lead_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_edge_memory_event ON edge_communication_memory_facts(event_id);
-- Transcription
CREATE INDEX IF NOT EXISTS idx_transcription_jobs_event ON edge_transcription_jobs(event_id);
CREATE INDEX IF NOT EXISTS idx_transcription_jobs_status ON edge_transcription_jobs(tenant_id, status);
CREATE INDEX IF NOT EXISTS idx_transcript_segments_job ON edge_transcript_segments(transcription_job_id, start_ms);
-- Calendar
CREATE INDEX IF NOT EXISTS idx_calendar_events_owner ON user_calendar_events(tenant_id, owner_user_id, start_at);
CREATE INDEX IF NOT EXISTS idx_calendar_events_lead ON user_calendar_events(lead_id, start_at);
-- Insights
CREATE INDEX IF NOT EXISTS idx_insights_lead ON insight_recommendations(tenant_id, lead_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_insights_status ON insight_recommendations(status, created_at DESC);
-- Admin
CREATE INDEX IF NOT EXISTS idx_admin_actions_tenant ON admin_action_events(tenant_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_admin_actions_type ON admin_action_events(action_type, status);
-- Surface sessions
CREATE INDEX IF NOT EXISTS idx_surface_sessions_user ON surface_sessions(tenant_id, user_id, started_at DESC);
CREATE INDEX IF NOT EXISTS idx_surface_sessions_type ON surface_sessions(surface_type, started_at DESC);