WebOS completion

This commit is contained in:
Sayan Datta
2026-04-18 18:56:05 +05:30
parent 7e3764a8a4
commit cc04e8505f
459 changed files with 11713 additions and 3853 deletions

View File

@@ -0,0 +1,529 @@
"""
routes_admin_surface.py
───────────────────────
Admin Control Plane API
Roles: Only 'admin' or 'superadmin' may access these endpoints.
Endpoints:
GET /admin-surface/health — system health overview
GET /admin-surface/queues — queue depth snapshot
GET /admin-surface/installs — surface session / install overview
POST /admin-surface/actions — submit an admin action
GET /admin-surface/actions — list admin action history
GET /admin-surface/actions/{id} — get a specific action
GET /admin-surface/logs — recent audit event log
GET /admin-surface/templates — template catalog summary (admin view)
POST /admin-surface/templates/{id}/publish — publish a template
POST /admin-surface/templates/{id}/archive — archive a template
GET /admin-surface/synthetic-jobs — list synthetic generation jobs
POST /admin-surface/synthetic-jobs/{id}/cancel — cancel a synthetic job
"""
from __future__ import annotations
import json
import logging
import uuid
from datetime import UTC, datetime
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
from backend.auth.dependencies import get_current_user
logger = logging.getLogger("velocity.admin_surface")
router = APIRouter()
# ── RBAC guard ────────────────────────────────────────────────────────────────
ADMIN_ROLES = {"admin", "superadmin", "ADMIN", "SUPERADMIN"}
def require_admin(user=Depends(get_current_user)):
normalized_role = user.role.upper()
if normalized_role not in {"ADMIN", "SUPERADMIN"}:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required.",
)
return user
# ── Helpers ───────────────────────────────────────────────────────────────────
def _pool(request: Request):
pool = request.app.state.db_pool
if pool is None:
raise HTTPException(status_code=503, detail="Database unavailable.")
return pool
# ── Pydantic Models ───────────────────────────────────────────────────────────
VALID_ACTION_TYPES = {
"user_create", "user_deactivate", "user_role_change",
"tenant_config_update", "inventory_batch_approve", "inventory_batch_reject",
"template_publish", "template_archive",
"synthetic_job_trigger", "synthetic_job_cancel",
"system_health_check", "queue_drain", "debug_event_export",
"install_register", "install_deregister",
}
class AdminActionRequest(BaseModel):
action_type: str
target_type: str
target_id: str
payload: dict = Field(default_factory=dict)
idempotency_key: Optional[str] = None
# ── System Health ─────────────────────────────────────────────────────────────
@router.get("/health", summary="System health overview")
async def get_health(
request: Request,
admin=Depends(require_admin),
):
"""
Returns an aggregated health snapshot covering DB pool, queue depths,
and basic surface session counts.
"""
pool = _pool(request)
async with pool.acquire() as conn:
# DB round-trip latency
import time
t0 = time.monotonic()
await conn.fetchval("SELECT 1")
db_latency_ms = round((time.monotonic() - t0) * 1000, 2)
# Pending jobs
pending_transcriptions = await conn.fetchval(
"SELECT COUNT(*) FROM edge_transcription_jobs WHERE status='pending'"
)
pending_synthetic_jobs = await conn.fetchval(
"SELECT COUNT(*) FROM oracle_synthetic_generation_jobs WHERE status IN ('pending','running')"
)
pending_admin_actions = await conn.fetchval(
"SELECT COUNT(*) FROM admin_action_events WHERE status='pending'"
)
pending_inventory_batches = await conn.fetchval(
"SELECT COUNT(*) FROM inventory_import_batches WHERE status IN ('pending','validating','processing')"
)
# Active surface sessions (last 30 min)
active_sessions = await conn.fetchval(
"SELECT COUNT(*) FROM surface_sessions WHERE last_active_at > NOW() - INTERVAL '30 minutes'"
)
# Surface breakdown
surface_breakdown = await conn.fetch(
"""
SELECT surface_type, COUNT(*) as count
FROM surface_sessions
WHERE last_active_at > NOW() - INTERVAL '30 minutes'
GROUP BY surface_type
"""
)
return {
"status": "ok",
"timestamp": datetime.now(UTC).isoformat(),
"database": {
"connected": True,
"latency_ms": db_latency_ms,
},
"queues": {
"pending_transcriptions": pending_transcriptions,
"pending_synthetic_jobs": pending_synthetic_jobs,
"pending_admin_actions": pending_admin_actions,
"pending_inventory_batches": pending_inventory_batches,
},
"active_sessions": {
"total": active_sessions,
"by_surface": {r["surface_type"]: r["count"] for r in surface_breakdown},
},
}
# ── Queue Visibility ──────────────────────────────────────────────────────────
@router.get("/queues", summary="Queue depth snapshot")
async def get_queues(
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
transcription_queue = await conn.fetch(
"""
SELECT status, COUNT(*) as count
FROM edge_transcription_jobs
GROUP BY status ORDER BY status
"""
)
synthetic_queue = await conn.fetch(
"""
SELECT status, COUNT(*) as count
FROM oracle_synthetic_generation_jobs
GROUP BY status ORDER BY status
"""
)
inventory_queue = await conn.fetch(
"""
SELECT status, COUNT(*) as count
FROM inventory_import_batches
GROUP BY status ORDER BY status
"""
)
admin_queue = await conn.fetch(
"""
SELECT status, COUNT(*) as count
FROM admin_action_events
GROUP BY status ORDER BY status
"""
)
return {
"transcription_jobs": {r["status"]: r["count"] for r in transcription_queue},
"synthetic_jobs": {r["status"]: r["count"] for r in synthetic_queue},
"inventory_batches": {r["status"]: r["count"] for r in inventory_queue},
"admin_actions": {r["status"]: r["count"] for r in admin_queue},
"timestamp": datetime.now(UTC).isoformat(),
}
# ── Install / Surface Overview ────────────────────────────────────────────────
@router.get("/installs", summary="Surface session and install overview")
async def get_installs(
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT surface_type, app_version, COUNT(*) as session_count,
MAX(last_active_at) as last_seen
FROM surface_sessions
GROUP BY surface_type, app_version
ORDER BY surface_type, app_version
"""
)
return {
"installs": [dict(r) for r in rows],
"timestamp": datetime.now(UTC).isoformat(),
}
# ── Admin Actions ─────────────────────────────────────────────────────────────
@router.post("/actions", status_code=status.HTTP_201_CREATED, summary="Submit an admin action")
async def submit_action(
request: Request,
body: AdminActionRequest,
admin=Depends(require_admin),
):
"""
Submit a bounded admin action. All actions are persisted with full audit trail.
Supported action_types are enumerated in VALID_ACTION_TYPES.
Actions are not auto-executed — they transition to 'pending' and must be
processed by the appropriate backend job or confirmed by a second admin.
(This prevents destructive mass-actions from running unreviewed.)
"""
if body.action_type not in VALID_ACTION_TYPES:
raise HTTPException(400, f"Invalid action_type. Valid: {sorted(VALID_ACTION_TYPES)}")
action_id = body.idempotency_key or str(uuid.uuid4())
pool = _pool(request)
async with pool.acquire() as conn:
try:
row = await conn.fetchrow(
"""
INSERT INTO admin_action_events (
tenant_id, action_id, action_type, target_type, target_id,
requested_by, payload
) VALUES ($1,$2,$3,$4,$5,$6,$7::jsonb)
RETURNING action_event_id, status, created_at
""",
admin.role, action_id, body.action_type, body.target_type,
body.target_id, admin.user_id, json.dumps(body.payload),
)
except Exception as exc:
if "unique" in str(exc).lower():
raise HTTPException(409, "Action with this idempotency key already exists")
raise
logger.info(
"Admin action submitted: %s by %s%s/%s",
body.action_type, admin.user_id, body.target_type, body.target_id,
)
return {
"action_event_id": str(row["action_event_id"]),
"action_id": action_id,
"status": row["status"],
"created_at": str(row["created_at"]),
}
@router.get("/actions", summary="List admin action history")
async def list_actions(
request: Request,
action_type: Optional[str] = Query(None),
status_filter: Optional[str] = Query(None, alias="status"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
admin=Depends(require_admin),
):
pool = _pool(request)
where = "WHERE tenant_id = $1"
params: list[Any] = [admin.role]
idx = 2
if action_type:
where += f" AND action_type = ${idx}"; params.append(action_type); idx += 1
if status_filter:
where += f" AND status = ${idx}"; params.append(status_filter); idx += 1
async with pool.acquire() as conn:
rows = await conn.fetch(
f"""
SELECT action_event_id, action_id, action_type, target_type, target_id,
requested_by, status, result_message, executed_at, created_at
FROM admin_action_events
{where}
ORDER BY created_at DESC
LIMIT ${idx} OFFSET ${idx+1}
""",
*params, limit, offset,
)
total = await conn.fetchval(
f"SELECT COUNT(*) FROM admin_action_events {where}", *params,
)
return {"total": total, "limit": limit, "offset": offset, "actions": [dict(r) for r in rows]}
@router.get("/actions/{action_event_id}", summary="Get a specific admin action")
async def get_action(
action_event_id: str,
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM admin_action_events WHERE action_event_id=$1 AND tenant_id=$2",
action_event_id, admin.role,
)
if not row:
raise HTTPException(404, "Admin action not found")
return dict(row)
# ── Audit Log ─────────────────────────────────────────────────────────────────
@router.get("/logs", summary="Recent Oracle audit events")
async def get_audit_logs(
request: Request,
entity_type: Optional[str] = Query(None),
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
if entity_type:
rows = await conn.fetch(
"""
SELECT audit_event_id, entity_type, entity_id, action, actor_id,
actor_type, correlation_id, details, created_at
FROM oracle_audit_events
WHERE tenant_id=$1 AND entity_type=$2
ORDER BY created_at DESC
LIMIT $3 OFFSET $4
""",
admin.role, entity_type, limit, offset,
)
else:
rows = await conn.fetch(
"""
SELECT audit_event_id, entity_type, entity_id, action, actor_id,
actor_type, correlation_id, details, created_at
FROM oracle_audit_events
WHERE tenant_id=$1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
""",
admin.role, limit, offset,
)
return {"logs": [dict(r) for r in rows]}
# ── Template Administration ───────────────────────────────────────────────────
@router.get("/templates", summary="Template catalog admin view")
async def get_templates_admin(
request: Request,
status_filter: Optional[str] = Query(None, alias="status"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
admin=Depends(require_admin),
):
pool = _pool(request)
where = "WHERE tenant_id = $1"
params: list[Any] = [admin.role]
idx = 2
if status_filter:
where += f" AND status = ${idx}"; params.append(status_filter); idx += 1
async with pool.acquire() as conn:
rows = await conn.fetch(
f"""
SELECT t.template_id, t.name, t.category, t.status, t.origin,
t.version, t.use_count, t.chapter_id, t.subchapter_id,
ch.name as chapter_name, sub.name as subchapter_name,
t.created_at, t.updated_at
FROM oracle_component_templates t
LEFT JOIN oracle_template_chapters ch ON ch.chapter_id = t.chapter_id
LEFT JOIN oracle_template_subchapters sub ON sub.subchapter_id = t.subchapter_id
{where}
ORDER BY t.updated_at DESC
LIMIT ${idx} OFFSET ${idx+1}
""",
*params, limit, offset,
)
total = await conn.fetchval(
f"SELECT COUNT(*) FROM oracle_component_templates {where}", *params,
)
return {"total": total, "limit": limit, "offset": offset, "templates": [dict(r) for r in rows]}
@router.post("/templates/{template_id}/publish", summary="Publish a template")
async def publish_template(
template_id: str,
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE oracle_component_templates
SET status='catalog_active', updated_at=NOW()
WHERE template_id=$1 AND tenant_id=$2
""",
template_id, admin.role,
)
if result == "UPDATE 0":
raise HTTPException(404, "Template not found")
logger.info("Template %s published by admin %s", template_id, admin.user_id)
return {"status": "published"}
@router.post("/templates/{template_id}/archive", summary="Archive a template")
async def archive_template(
template_id: str,
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE oracle_component_templates
SET status='archived', updated_at=NOW()
WHERE template_id=$1 AND tenant_id=$2
""",
template_id, admin.role,
)
if result == "UPDATE 0":
raise HTTPException(404, "Template not found")
logger.info("Template %s archived by admin %s", template_id, admin.user_id)
return {"status": "archived"}
# ── Template Chapter Admin ────────────────────────────────────────────────────
@router.get("/template-chapters", summary="List template chapters (admin view)")
async def list_chapters_admin(
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT ch.chapter_id, ch.name, ch.description, ch.sort_order, ch.is_active,
COUNT(sub.subchapter_id) as subchapter_count
FROM oracle_template_chapters ch
LEFT JOIN oracle_template_subchapters sub ON sub.chapter_id = ch.chapter_id
WHERE ch.tenant_id=$1
GROUP BY ch.chapter_id
ORDER BY ch.sort_order ASC
""",
admin.role,
)
return {"chapters": [dict(r) for r in rows]}
# ── Synthetic Jobs Admin ──────────────────────────────────────────────────────
@router.get("/synthetic-jobs", summary="List synthetic generation jobs")
async def list_synthetic_jobs(
request: Request,
status_filter: Optional[str] = Query(None, alias="status"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
if status_filter:
rows = await conn.fetch(
"""
SELECT job_id, template_id, model, status, requested_count,
accepted_count, created_by, started_at, completed_at, created_at
FROM oracle_synthetic_generation_jobs
WHERE tenant_id=$1 AND status=$2
ORDER BY created_at DESC LIMIT $3 OFFSET $4
""",
admin.role, status_filter, limit, offset,
)
else:
rows = await conn.fetch(
"""
SELECT job_id, template_id, model, status, requested_count,
accepted_count, created_by, started_at, completed_at, created_at
FROM oracle_synthetic_generation_jobs
WHERE tenant_id=$1
ORDER BY created_at DESC LIMIT $2 OFFSET $3
""",
admin.role, limit, offset,
)
return {"jobs": [dict(r) for r in rows]}
@router.post("/synthetic-jobs/{job_id}/cancel", summary="Cancel a synthetic generation job")
async def cancel_synthetic_job(
job_id: str,
request: Request,
admin=Depends(require_admin),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE oracle_synthetic_generation_jobs
SET status='cancelled', updated_at=NOW()
WHERE job_id=$1 AND tenant_id=$2 AND status IN ('pending','running')
""",
job_id, admin.role,
)
if result == "UPDATE 0":
raise HTTPException(404, "Job not found or already in terminal state")
return {"status": "cancelled"}

View File

@@ -0,0 +1,399 @@
"""
routes_inventory.py
───────────────────
Inventory Pipeline API
Endpoints:
POST /inventory/import-batches — create a new import batch
GET /inventory/import-batches — list import batches
GET /inventory/import-batches/{batch_id} — get batch status
POST /inventory/properties — create a single property
GET /inventory/properties — list properties
GET /inventory/properties/{property_id} — get a property
PATCH /inventory/properties/{property_id} — update a property
DELETE /inventory/properties/{property_id} — archive a property
POST /inventory/properties/{property_id}/media — attach media to a property
GET /inventory/properties/{property_id}/media — list media for a property
DELETE /inventory/media/{media_asset_id} — remove a media asset
"""
from __future__ import annotations
import json
import logging
from datetime import UTC, datetime
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
from backend.auth.dependencies import get_current_user
logger = logging.getLogger("velocity.inventory")
router = APIRouter()
# ── Helpers ───────────────────────────────────────────────────────────────────
def _pool(request: Request):
pool = request.app.state.db_pool
if pool is None:
raise HTTPException(status_code=503, detail="Database unavailable.")
return pool
# ── Pydantic Models ───────────────────────────────────────────────────────────
VALID_SOURCE_TYPES = {"csv", "json", "api_push", "manual"}
VALID_PROPERTY_STATUSES = {"active", "archived", "draft", "under_review"}
VALID_MEDIA_TYPES = {"image", "video", "floorplan", "brochure", "360", "vr"}
class ImportBatchCreate(BaseModel):
source_type: str
source_file_ref: Optional[str] = None
total_rows: int = 0
class PropertyCreate(BaseModel):
batch_id: Optional[str] = None
source_id: Optional[str] = None
project_name: str
developer_name: str
location: dict = Field(default_factory=dict) # {city, district, lat, lng}
property_type: str
price_bands: list[dict] = Field(default_factory=list)
unit_mix: list[dict] = Field(default_factory=list)
amenities: list[str] = Field(default_factory=list)
status: str = "draft"
validation_state: dict = Field(default_factory=dict)
class PropertyUpdate(BaseModel):
project_name: Optional[str] = None
developer_name: Optional[str] = None
location: Optional[dict] = None
property_type: Optional[str] = None
price_bands: Optional[list[dict]] = None
unit_mix: Optional[list[dict]] = None
amenities: Optional[list[str]] = None
status: Optional[str] = None
validation_state: Optional[dict] = None
class MediaAssetCreate(BaseModel):
media_type: str
url: str
thumbnail_url: Optional[str] = None
sort_order: int = 0
metadata: dict = Field(default_factory=dict)
# ── Import Batches ────────────────────────────────────────────────────────────
@router.post("/import-batches", status_code=status.HTTP_201_CREATED,
summary="Create an inventory import batch")
async def create_import_batch(
request: Request,
body: ImportBatchCreate,
user=Depends(get_current_user),
):
if body.source_type not in VALID_SOURCE_TYPES:
raise HTTPException(400, f"Invalid source_type. Valid: {sorted(VALID_SOURCE_TYPES)}")
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO inventory_import_batches
(tenant_id, source_type, submitted_by, total_rows, source_file_ref)
VALUES ($1, $2, $3, $4, $5)
RETURNING batch_id, status, created_at
""",
user.role, body.source_type, user.user_id, body.total_rows, body.source_file_ref,
)
return dict(row)
@router.get("/import-batches", summary="List import batches")
async def list_import_batches(
request: Request,
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT batch_id, source_type, submitted_by, status, total_rows,
accepted_rows, rejected_rows, created_at, completed_at
FROM inventory_import_batches
WHERE tenant_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
""",
user.role, limit, offset,
)
total = await conn.fetchval(
"SELECT COUNT(*) FROM inventory_import_batches WHERE tenant_id=$1", user.role,
)
return {"total": total, "limit": limit, "offset": offset, "batches": [dict(r) for r in rows]}
@router.get("/import-batches/{batch_id}", summary="Get import batch status")
async def get_import_batch(
batch_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT * FROM inventory_import_batches WHERE batch_id=$1 AND tenant_id=$2
""",
batch_id, user.role,
)
if not row:
raise HTTPException(404, "Batch not found")
return dict(row)
# ── Properties ────────────────────────────────────────────────────────────────
@router.post("/properties", status_code=status.HTTP_201_CREATED, summary="Create a property")
async def create_property(
request: Request,
body: PropertyCreate,
user=Depends(get_current_user),
):
if body.status not in VALID_PROPERTY_STATUSES:
raise HTTPException(400, f"Invalid status. Valid: {sorted(VALID_PROPERTY_STATUSES)}")
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO inventory_properties (
tenant_id, batch_id, source_id, project_name, developer_name,
location, property_type, price_bands, unit_mix, amenities,
status, validation_state
) VALUES (
$1, $2, $3, $4, $5,
$6::jsonb, $7, $8::jsonb, $9::jsonb, $10,
$11, $12::jsonb
)
RETURNING property_id, created_at
""",
user.role, body.batch_id, body.source_id, body.project_name, body.developer_name,
json.dumps(body.location), body.property_type, json.dumps(body.price_bands),
json.dumps(body.unit_mix), body.amenities,
body.status, json.dumps(body.validation_state),
)
return {"property_id": str(row["property_id"]), "created_at": str(row["created_at"])}
@router.get("/properties", summary="List inventory properties")
async def list_properties(
request: Request,
status_filter: Optional[str] = Query(None, alias="status"),
property_type: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
where_clause = "WHERE tenant_id = $1"
params: list[Any] = [user.role]
idx = 2
if status_filter:
where_clause += f" AND status = ${idx}"
params.append(status_filter)
idx += 1
if property_type:
where_clause += f" AND property_type = ${idx}"
params.append(property_type)
idx += 1
rows = await conn.fetch(
f"""
SELECT property_id, project_name, developer_name, property_type,
location, price_bands, unit_mix, status, ingested_at, created_at
FROM inventory_properties
{where_clause}
ORDER BY created_at DESC
LIMIT ${idx} OFFSET ${idx+1}
""",
*params, limit, offset,
)
total = await conn.fetchval(
f"SELECT COUNT(*) FROM inventory_properties {where_clause}", *params,
)
return {"total": total, "limit": limit, "offset": offset, "properties": [dict(r) for r in rows]}
@router.get("/properties/{property_id}", summary="Get a property")
async def get_property(
property_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM inventory_properties WHERE property_id=$1 AND tenant_id=$2",
property_id, user.role,
)
if not row:
raise HTTPException(404, "Property not found")
return dict(row)
@router.patch("/properties/{property_id}", summary="Update a property")
async def update_property(
property_id: str,
request: Request,
body: PropertyUpdate,
user=Depends(get_current_user),
):
updates: list[str] = []
values: list[Any] = []
idx = 1
def _add(col: str, val: Any, cast: str = ""):
nonlocal idx
updates.append(f"{col} = ${idx}{cast}")
values.append(val)
idx += 1
if body.project_name is not None: _add("project_name", body.project_name)
if body.developer_name is not None: _add("developer_name", body.developer_name)
if body.location is not None: _add("location", json.dumps(body.location), "::jsonb")
if body.property_type is not None: _add("property_type", body.property_type)
if body.price_bands is not None: _add("price_bands", json.dumps(body.price_bands), "::jsonb")
if body.unit_mix is not None: _add("unit_mix", json.dumps(body.unit_mix), "::jsonb")
if body.amenities is not None: _add("amenities", body.amenities)
if body.status is not None:
if body.status not in VALID_PROPERTY_STATUSES:
raise HTTPException(400, f"Invalid status. Valid: {sorted(VALID_PROPERTY_STATUSES)}")
_add("status", body.status)
if body.validation_state is not None:
_add("validation_state", json.dumps(body.validation_state), "::jsonb")
if not updates:
raise HTTPException(400, "No fields to update")
_add("updated_at", datetime.now(UTC))
values.extend([property_id, user.role])
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
f"""
UPDATE inventory_properties
SET {', '.join(updates)}
WHERE property_id=${idx} AND tenant_id=${idx+1}
""",
*values,
)
if result == "UPDATE 0":
raise HTTPException(404, "Property not found")
return {"status": "updated"}
@router.delete("/properties/{property_id}", summary="Archive a property")
async def archive_property(
property_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE inventory_properties
SET status='archived', updated_at=NOW()
WHERE property_id=$1 AND tenant_id=$2
""",
property_id, user.role,
)
if result == "UPDATE 0":
raise HTTPException(404, "Property not found")
return {"status": "archived"}
# ── Media Assets ──────────────────────────────────────────────────────────────
@router.post("/properties/{property_id}/media", status_code=status.HTTP_201_CREATED,
summary="Attach media to a property")
async def add_media(
property_id: str,
request: Request,
body: MediaAssetCreate,
user=Depends(get_current_user),
):
if body.media_type not in VALID_MEDIA_TYPES:
raise HTTPException(400, f"Invalid media_type. Valid: {sorted(VALID_MEDIA_TYPES)}")
pool = _pool(request)
async with pool.acquire() as conn:
# Verify property belongs to tenant
exists = await conn.fetchval(
"SELECT 1 FROM inventory_properties WHERE property_id=$1 AND tenant_id=$2",
property_id, user.role,
)
if not exists:
raise HTTPException(404, "Property not found")
row = await conn.fetchrow(
"""
INSERT INTO inventory_media_assets
(property_id, tenant_id, media_type, url, thumbnail_url, sort_order, metadata, uploaded_by)
VALUES ($1,$2,$3,$4,$5,$6,$7::jsonb,$8)
RETURNING media_asset_id, created_at
""",
property_id, user.role, body.media_type, body.url, body.thumbnail_url,
body.sort_order, json.dumps(body.metadata), user.user_id,
)
return {"media_asset_id": str(row["media_asset_id"]), "created_at": str(row["created_at"])}
@router.get("/properties/{property_id}/media", summary="List media for a property")
async def list_media(
property_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT media_asset_id, media_type, url, thumbnail_url, sort_order, metadata, created_at
FROM inventory_media_assets
WHERE property_id=$1 AND tenant_id=$2
ORDER BY sort_order ASC, created_at ASC
""",
property_id, user.role,
)
return {"media": [dict(r) for r in rows]}
@router.delete("/media/{media_asset_id}", summary="Remove a media asset")
async def delete_media(
media_asset_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM inventory_media_assets WHERE media_asset_id=$1 AND tenant_id=$2",
media_asset_id, user.role,
)
if result == "DELETE 0":
raise HTTPException(404, "Media asset not found")
return {"status": "deleted"}

View File

@@ -0,0 +1,635 @@
"""
routes_mobile_edge.py
─────────────────────
Mobile Edge API — serves iPhone Edge and Android Phone Edge apps.
Surfaces:
GET /mobile-edge/events — communication events for a lead
POST /mobile-edge/events — log a new communication event
GET /mobile-edge/memory — memory facts for a lead
POST /mobile-edge/imports — operator-assisted import of a recording/note
POST /mobile-edge/notes — quick note attached to a lead
GET /mobile-edge/calendar — calendar events for the authed user
POST /mobile-edge/calendar — create a calendar event
PATCH /mobile-edge/calendar/{id} — update a calendar event
DELETE /mobile-edge/calendar/{id} — cancel a calendar event
GET /mobile-edge/transcripts/{id} — transcript segments for an event
GET /mobile-edge/insights/{lead_id}— insight recommendations for a lead
POST /mobile-edge/insights/{id}/act — act on or dismiss an insight
GET /mobile-edge/alerts — active alerts for the authed user
POST /mobile-edge/session — register a surface session heartbeat
"""
from __future__ import annotations
import logging
import uuid
from datetime import UTC, datetime
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
from backend.auth.dependencies import get_current_user
logger = logging.getLogger("velocity.mobile_edge")
router = APIRouter()
# ── Helpers ───────────────────────────────────────────────────────────────────
def _pool(request: Request):
pool = request.app.state.db_pool
if pool is None:
raise HTTPException(status_code=503, detail="Database unavailable.")
return pool
def _now() -> str:
return datetime.now(UTC).isoformat()
# ── Pydantic models ───────────────────────────────────────────────────────────
VALID_CHANNELS = {
"pstn", "whatsapp_message", "whatsapp_voice", "whatsapp_video",
"email", "facebook_message", "instagram_message", "in_app_voip", "manual_note",
}
VALID_CAPTURE_MODES = {"direct_api", "provider_routed", "operator_import", "operator_note"}
VALID_DIRECTIONS = {"inbound", "outbound"}
VALID_CONSENT = {"unknown", "granted", "denied", "not_required"}
class CommunicationEventCreate(BaseModel):
lead_id: str
channel: str
direction: str = "inbound"
provider: Optional[str] = None
capture_mode: str
consent_state: str = "unknown"
duration_seconds: Optional[int] = None
summary: Optional[str] = None
raw_reference: Optional[str] = None
recording_ref: Optional[str] = None
provider_metadata: dict = Field(default_factory=dict)
class ImportCreate(BaseModel):
lead_id: str
channel: str
capture_mode: str = "operator_import"
recording_ref: Optional[str] = None
summary: Optional[str] = None
consent_state: str = "granted"
class NoteCreate(BaseModel):
lead_id: str
note_text: str
fact_type: str = "custom"
effective_date: Optional[str] = None
class CalendarEventCreate(BaseModel):
lead_id: Optional[str] = None
source_event_id: Optional[str] = None
title: str
description: Optional[str] = None
start_at: str # ISO8601
end_at: str # ISO8601
all_day: bool = False
reminder_minutes: list[int] = Field(default_factory=lambda: [15])
location: Optional[str] = None
metadata: dict = Field(default_factory=dict)
class CalendarEventUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
start_at: Optional[str] = None
end_at: Optional[str] = None
status: Optional[str] = None
reminder_minutes: Optional[list[int]] = None
location: Optional[str] = None
class InsightActionRequest(BaseModel):
action: str = Field(..., pattern="^(accepted|dismissed|acted_upon)$")
class SessionHeartbeat(BaseModel):
surface_type: str
app_version: str
screen: Optional[str] = None
metadata: dict = Field(default_factory=dict)
# ── Communication Events ───────────────────────────────────────────────────────
@router.get("/events", summary="List communication events for a lead")
async def list_events(
request: Request,
lead_id: str = Query(..., description="Lead ID to fetch events for"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
user=Depends(get_current_user),
):
"""Return paginated communication events for a given lead, newest first."""
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT event_id, lead_id, channel, direction, provider, capture_mode,
consent_state, timestamp, duration_seconds, summary, raw_reference,
recording_ref, provider_metadata, created_at
FROM edge_communication_events
WHERE tenant_id = $1 AND lead_id = $2
ORDER BY timestamp DESC
LIMIT $3 OFFSET $4
""",
user.role, # tenant_id derived from role scope; production uses dedicated tenant field
lead_id, limit, offset,
)
total = await conn.fetchval(
"SELECT COUNT(*) FROM edge_communication_events WHERE tenant_id = $1 AND lead_id = $2",
user.role, lead_id,
)
return {
"total": total,
"limit": limit,
"offset": offset,
"events": [dict(r) for r in rows],
}
@router.post("/events", status_code=status.HTTP_201_CREATED, summary="Log a communication event")
async def create_event(
request: Request,
body: CommunicationEventCreate,
user=Depends(get_current_user),
):
"""
Create a new communication event record.
Supports all three capture modes: direct_api, provider_routed, operator_import.
"""
if body.channel not in VALID_CHANNELS:
raise HTTPException(400, f"Invalid channel. Valid: {sorted(VALID_CHANNELS)}")
if body.capture_mode not in VALID_CAPTURE_MODES:
raise HTTPException(400, f"Invalid capture_mode. Valid: {sorted(VALID_CAPTURE_MODES)}")
if body.direction not in VALID_DIRECTIONS:
raise HTTPException(400, "direction must be 'inbound' or 'outbound'")
if body.consent_state not in VALID_CONSENT:
raise HTTPException(400, f"Invalid consent_state. Valid: {sorted(VALID_CONSENT)}")
pool = _pool(request)
import json
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO edge_communication_events (
tenant_id, lead_id, channel, direction, provider, capture_mode,
consent_state, duration_seconds, summary, raw_reference,
recording_ref, provider_metadata
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12::jsonb)
RETURNING event_id, created_at
""",
user.role, body.lead_id, body.channel, body.direction, body.provider,
body.capture_mode, body.consent_state, body.duration_seconds,
body.summary, body.raw_reference, body.recording_ref,
json.dumps(body.provider_metadata),
)
logger.info("Created communication event %s for lead %s", row["event_id"], body.lead_id)
return {"event_id": str(row["event_id"]), "created_at": str(row["created_at"])}
# ── Communication Memory Facts ────────────────────────────────────────────────
@router.get("/memory", summary="List memory facts for a lead")
async def list_memory_facts(
request: Request,
lead_id: str = Query(...),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT fact_id, lead_id, event_id, fact_type, fact_text,
effective_date, confidence, extracted_from, is_confirmed,
confirmed_by, confirmed_at, created_at
FROM edge_communication_memory_facts
WHERE tenant_id = $1 AND lead_id = $2
ORDER BY created_at DESC
LIMIT $3 OFFSET $4
""",
user.role, lead_id, limit, offset,
)
total = await conn.fetchval(
"SELECT COUNT(*) FROM edge_communication_memory_facts WHERE tenant_id=$1 AND lead_id=$2",
user.role, lead_id,
)
return {"total": total, "limit": limit, "offset": offset, "facts": [dict(r) for r in rows]}
# ── Operator-Assisted Import ──────────────────────────────────────────────────
@router.post("/imports", status_code=status.HTTP_201_CREATED, summary="Operator-assisted import")
async def create_import(
request: Request,
body: ImportCreate,
user=Depends(get_current_user),
):
"""
Mode C import: user uploads recording ref or confirms a note manually.
Creates an event with capture_mode = 'operator_import' and triggers a
transcription job if a recording_ref is supplied.
"""
if body.channel not in VALID_CHANNELS:
raise HTTPException(400, f"Invalid channel. Valid: {sorted(VALID_CHANNELS)}")
pool = _pool(request)
import json
async with pool.acquire() as conn:
async with conn.transaction():
event_row = await conn.fetchrow(
"""
INSERT INTO edge_communication_events (
tenant_id, lead_id, channel, direction, capture_mode,
consent_state, recording_ref, summary
) VALUES ($1,$2,$3,'inbound',$4,$5,$6,$7)
RETURNING event_id, created_at
""",
user.role, body.lead_id, body.channel, body.capture_mode,
body.consent_state, body.recording_ref, body.summary,
)
event_id = event_row["event_id"]
job_id = None
if body.recording_ref:
job_row = await conn.fetchrow(
"""
INSERT INTO edge_transcription_jobs (
tenant_id, event_id, media_type, consent_state
) VALUES ($1,$2,'audio',$3)
RETURNING transcription_job_id
""",
user.role, event_id, body.consent_state,
)
job_id = str(job_row["transcription_job_id"])
return {
"event_id": str(event_id),
"transcription_job_id": job_id,
"created_at": str(event_row["created_at"]),
}
# ── Quick Notes ───────────────────────────────────────────────────────────────
@router.post("/notes", status_code=status.HTTP_201_CREATED, summary="Create a quick note for a lead")
async def create_note(
request: Request,
body: NoteCreate,
user=Depends(get_current_user),
):
"""
Create a manual memory fact from an operator note.
No event is created — this is a direct fact insertion.
"""
pool = _pool(request)
from datetime import date
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO edge_communication_memory_facts (
tenant_id, lead_id, fact_type, fact_text, effective_date,
extracted_from, confidence, is_confirmed
) VALUES ($1,$2,$3,$4,$5,'operator_note',1.0, TRUE)
RETURNING fact_id, created_at
""",
user.role, body.lead_id, body.fact_type, body.note_text,
body.effective_date,
)
return {"fact_id": str(row["fact_id"]), "created_at": str(row["created_at"])}
# ── Calendar ──────────────────────────────────────────────────────────────────
@router.get("/calendar", summary="Get calendar events for the authed user")
async def list_calendar_events(
request: Request,
from_date: Optional[str] = Query(None),
to_date: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=200),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
if from_date and to_date:
rows = await conn.fetch(
"""
SELECT calendar_event_id, lead_id, title, description, start_at, end_at,
all_day, status, reminder_minutes, created_by, location, metadata, created_at
FROM user_calendar_events
WHERE tenant_id=$1 AND owner_user_id=$2
AND start_at >= $3::timestamptz AND end_at <= $4::timestamptz
ORDER BY start_at ASC LIMIT $5
""",
user.role, user.user_id, from_date, to_date, limit,
)
else:
rows = await conn.fetch(
"""
SELECT calendar_event_id, lead_id, title, description, start_at, end_at,
all_day, status, reminder_minutes, created_by, location, metadata, created_at
FROM user_calendar_events
WHERE tenant_id=$1 AND owner_user_id=$2
ORDER BY start_at ASC LIMIT $3
""",
user.role, user.user_id, limit,
)
return {"events": [dict(r) for r in rows]}
@router.post("/calendar", status_code=status.HTTP_201_CREATED, summary="Create a calendar event")
async def create_calendar_event(
request: Request,
body: CalendarEventCreate,
user=Depends(get_current_user),
):
pool = _pool(request)
import json
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO user_calendar_events (
tenant_id, owner_user_id, lead_id, source_event_id, title, description,
start_at, end_at, all_day, reminder_minutes, created_by, location, metadata
) VALUES ($1,$2,$3,$4,$5,$6,$7::timestamptz,$8::timestamptz,$9,$10,$11,$12,$13::jsonb)
RETURNING calendar_event_id, created_at
""",
user.role, user.user_id, body.lead_id, body.source_event_id,
body.title, body.description, body.start_at, body.end_at,
body.all_day, body.reminder_minutes, "user",
body.location, json.dumps(body.metadata),
)
return {"calendar_event_id": str(row["calendar_event_id"]), "created_at": str(row["created_at"])}
@router.patch("/calendar/{calendar_event_id}", summary="Update a calendar event")
async def update_calendar_event(
calendar_event_id: str,
request: Request,
body: CalendarEventUpdate,
user=Depends(get_current_user),
):
pool = _pool(request)
# Build partial update
updates: list[str] = []
values: list[Any] = []
idx = 1
def _add(col: str, val: Any):
nonlocal idx
updates.append(f"{col} = ${idx}")
values.append(val)
idx += 1
if body.title is not None: _add("title", body.title)
if body.description is not None: _add("description", body.description)
if body.start_at is not None: _add("start_at", body.start_at)
if body.end_at is not None: _add("end_at", body.end_at)
if body.status is not None: _add("status", body.status)
if body.reminder_minutes is not None: _add("reminder_minutes", body.reminder_minutes)
if body.location is not None: _add("location", body.location)
if not updates:
raise HTTPException(400, "No fields to update")
_add("updated_at", datetime.now(UTC))
_add("tenant_id", user.role)
_add("owner_user_id", user.user_id)
values.append(calendar_event_id)
async with pool.acquire() as conn:
result = await conn.execute(
f"""
UPDATE user_calendar_events
SET {', '.join(updates)}
WHERE tenant_id=${idx} AND owner_user_id=${idx+1} AND calendar_event_id=${idx+2}
""",
*values,
)
if result == "UPDATE 0":
raise HTTPException(404, "Calendar event not found or not owned by you")
return {"status": "updated"}
@router.delete("/calendar/{calendar_event_id}", summary="Cancel a calendar event")
async def delete_calendar_event(
calendar_event_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE user_calendar_events
SET status='cancelled', updated_at=NOW()
WHERE tenant_id=$1 AND owner_user_id=$2 AND calendar_event_id=$3
""",
user.role, user.user_id, calendar_event_id,
)
if result == "UPDATE 0":
raise HTTPException(404, "Calendar event not found or not owned by you")
return {"status": "cancelled"}
# ── Transcripts ───────────────────────────────────────────────────────────────
@router.get("/transcripts/{event_id}", summary="Get transcript segments for an event")
async def get_transcript(
event_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
job = await conn.fetchrow(
"""
SELECT j.transcription_job_id, j.status, j.provider, j.speaker_count,
j.word_count, j.language, j.completed_at
FROM edge_transcription_jobs j
JOIN edge_communication_events e ON e.event_id = j.event_id
WHERE j.event_id = $1 AND e.tenant_id = $2
ORDER BY j.created_at DESC LIMIT 1
""",
event_id, user.role,
)
if not job:
raise HTTPException(404, "No transcription job found for this event")
segments = await conn.fetch(
"""
SELECT segment_id, speaker_label, start_ms, end_ms, text, confidence, is_agent_turn
FROM edge_transcript_segments
WHERE transcription_job_id = $1
ORDER BY start_ms ASC
""",
job["transcription_job_id"],
)
return {
"job": dict(job),
"segments": [dict(s) for s in segments],
}
# ── Insights ──────────────────────────────────────────────────────────────────
@router.get("/insights/{lead_id}", summary="Get insight recommendations for a lead")
async def get_insights(
lead_id: str,
request: Request,
status_filter: Optional[str] = Query(None, alias="status"),
limit: int = Query(20, ge=1, le=100),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
if status_filter:
rows = await conn.fetch(
"""
SELECT recommendation_id, lead_id, source_event_id, recommendation_type,
summary, suggested_action, target_system, status, confidence, created_at
FROM insight_recommendations
WHERE tenant_id=$1 AND lead_id=$2 AND status=$3
ORDER BY created_at DESC LIMIT $4
""",
user.role, lead_id, status_filter, limit,
)
else:
rows = await conn.fetch(
"""
SELECT recommendation_id, lead_id, source_event_id, recommendation_type,
summary, suggested_action, target_system, status, confidence, created_at
FROM insight_recommendations
WHERE tenant_id=$1 AND lead_id=$2
ORDER BY created_at DESC LIMIT $3
""",
user.role, lead_id, limit,
)
return {"insights": [dict(r) for r in rows]}
@router.post("/insights/{recommendation_id}/act", summary="Act on or dismiss an insight")
async def act_on_insight(
recommendation_id: str,
request: Request,
body: InsightActionRequest,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE insight_recommendations
SET status=$1, acted_by=$2, acted_at=NOW(), updated_at=NOW()
WHERE recommendation_id=$3 AND tenant_id=$4
""",
body.action, user.user_id, recommendation_id, user.role,
)
if result == "UPDATE 0":
raise HTTPException(404, "Insight not found")
return {"status": body.action}
# ── Alerts ────────────────────────────────────────────────────────────────────
@router.get("/alerts", summary="Get active alerts for the authed user")
async def get_alerts(
request: Request,
user=Depends(get_current_user),
):
"""
Returns a combined, prioritized view of:
- Pending insights needing action
- Calendar events due within 24 hours
- Pending transcription jobs
"""
pool = _pool(request)
async with pool.acquire() as conn:
pending_insights = await conn.fetchval(
"SELECT COUNT(*) FROM insight_recommendations WHERE tenant_id=$1 AND status='pending'",
user.role,
)
upcoming_events = await conn.fetchval(
"""
SELECT COUNT(*) FROM user_calendar_events
WHERE tenant_id=$1 AND owner_user_id=$2
AND status='confirmed'
AND start_at BETWEEN NOW() AND NOW() + INTERVAL '24 hours'
""",
user.role, user.user_id,
)
pending_transcriptions = await conn.fetchval(
"SELECT COUNT(*) FROM edge_transcription_jobs WHERE tenant_id=$1 AND status='pending'",
user.role,
)
return {
"pending_insights": pending_insights,
"upcoming_calendar_events_24h": upcoming_events,
"pending_transcriptions": pending_transcriptions,
"generated_at": _now(),
}
# ── Session Heartbeat ─────────────────────────────────────────────────────────
@router.post("/session", status_code=status.HTTP_200_OK, summary="Register surface session heartbeat")
async def session_heartbeat(
request: Request,
body: SessionHeartbeat,
user=Depends(get_current_user),
):
"""Upsert a surface session to track cross-surface activity."""
valid_surfaces = {
"webos", "ipad", "android_tablet", "iphone_edge", "android_phone_edge",
}
if body.surface_type not in valid_surfaces:
raise HTTPException(400, f"Invalid surface_type. Valid: {sorted(valid_surfaces)}")
pool = _pool(request)
import json
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO surface_sessions (tenant_id, user_id, surface_type, app_version, metadata)
VALUES ($1, $2, $3, $4, $5::jsonb)
ON CONFLICT DO NOTHING
RETURNING session_id
""",
user.role, user.user_id, body.surface_type, body.app_version,
json.dumps(body.metadata),
)
# Update last_active + screen_sequence for existing session (within 30 min)
if body.screen and row is None:
await conn.execute(
"""
UPDATE surface_sessions
SET last_active_at=NOW(),
screen_sequence = array_append(screen_sequence, $1)
WHERE tenant_id=$2 AND user_id=$3 AND surface_type=$4
AND last_active_at > NOW() - INTERVAL '30 minutes'
""",
body.screen, user.role, user.user_id, body.surface_type,
)
return {"status": "ok", "timestamp": _now()}

View File

@@ -0,0 +1,398 @@
"""
routes_oracle_templates.py
──────────────────────────
Oracle Template Catalog API
Extends the existing Oracle route surface with template taxonomy and seeding.
Endpoints:
GET /oracle/template-chapters — list chapters
POST /oracle/template-chapters — create a chapter
GET /oracle/template-subchapters — list subchapters (optionally filtered)
POST /oracle/template-subchapters — create a subchapter
GET /oracle/component-templates — list templates (filterable)
POST /oracle/component-templates — create a template
GET /oracle/component-templates/{id} — get a template
POST /oracle/component-templates/{id}/seed — add a seed example
GET /oracle/component-templates/{id}/seed — list seed examples for a template
POST /oracle/component-templates/synthetic-jobs — trigger a Kimi synthetic job
"""
from __future__ import annotations
import json
import logging
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
from backend.auth.dependencies import get_current_user
logger = logging.getLogger("velocity.oracle_templates")
router = APIRouter()
# ── Helpers ───────────────────────────────────────────────────────────────────
def _pool(request: Request):
pool = request.app.state.db_pool
if pool is None:
raise HTTPException(503, "Database unavailable.")
return pool
# ── Models ────────────────────────────────────────────────────────────────────
class ChapterCreate(BaseModel):
name: str
description: Optional[str] = None
sort_order: int = 0
class SubchapterCreate(BaseModel):
chapter_id: str
name: str
description: Optional[str] = None
sort_order: int = 0
class TemplateCreate(BaseModel):
name: str
category: str
chapter_id: Optional[str] = None
subchapter_id: Optional[str] = None
component_type: Optional[str] = None
accepted_shapes: list[str] = Field(default_factory=list)
json_template: Optional[dict] = None
description: Optional[str] = None
origin: str = "premade"
version: str = "1.0.0"
class SeedExampleCreate(BaseModel):
title: str
example_json: dict
quality_notes: Optional[str] = None
chapter_id: Optional[str] = None
subchapter_id: Optional[str] = None
is_canonical: bool = False
class SyntheticJobCreate(BaseModel):
template_id: str
chapter_id: Optional[str] = None
subchapter_id: Optional[str] = None
model: str = "kimi"
requested_count: int = Field(10, ge=1, le=500)
# ── Template Chapters ─────────────────────────────────────────────────────────
@router.get("/template-chapters", summary="List Oracle template chapters")
async def list_template_chapters(
request: Request,
include_inactive: bool = Query(False),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
where = "WHERE ch.tenant_id=$1" + ("" if include_inactive else " AND ch.is_active=TRUE")
rows = await conn.fetch(
f"""
SELECT ch.chapter_id, ch.name, ch.description, ch.sort_order, ch.is_active,
COUNT(sub.subchapter_id) FILTER (WHERE sub.is_active=TRUE) as subchapter_count,
COUNT(t.template_id) as template_count
FROM oracle_template_chapters ch
LEFT JOIN oracle_template_subchapters sub ON sub.chapter_id = ch.chapter_id
LEFT JOIN oracle_component_templates t ON t.chapter_id = ch.chapter_id
AND t.status != 'archived'
{where}
GROUP BY ch.chapter_id
ORDER BY ch.sort_order ASC
""",
user.role,
)
return {"chapters": [dict(r) for r in rows]}
@router.post("/template-chapters", status_code=status.HTTP_201_CREATED,
summary="Create a template chapter")
async def create_template_chapter(
request: Request,
body: ChapterCreate,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO oracle_template_chapters (tenant_id, name, description, sort_order)
VALUES ($1,$2,$3,$4)
RETURNING chapter_id, created_at
""",
user.role, body.name, body.description, body.sort_order,
)
return {"chapter_id": str(row["chapter_id"]), "created_at": str(row["created_at"])}
# ── Template Subchapters ──────────────────────────────────────────────────────
@router.get("/template-subchapters", summary="List Oracle template subchapters")
async def list_template_subchapters(
request: Request,
chapter_id: Optional[str] = Query(None),
include_inactive: bool = Query(False),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
where = "WHERE sub.tenant_id=$1"
params: list[Any] = [user.role]
idx = 2
if not include_inactive:
where += " AND sub.is_active=TRUE"
if chapter_id:
where += f" AND sub.chapter_id=${idx}"; params.append(chapter_id); idx += 1
rows = await conn.fetch(
f"""
SELECT sub.subchapter_id, sub.chapter_id, ch.name as chapter_name,
sub.name, sub.description, sub.sort_order, sub.is_active,
COUNT(t.template_id) as template_count
FROM oracle_template_subchapters sub
JOIN oracle_template_chapters ch ON ch.chapter_id = sub.chapter_id
LEFT JOIN oracle_component_templates t ON t.subchapter_id = sub.subchapter_id
AND t.status != 'archived'
{where}
GROUP BY sub.subchapter_id, ch.name
ORDER BY sub.chapter_id, sub.sort_order ASC
""",
*params,
)
return {"subchapters": [dict(r) for r in rows]}
@router.post("/template-subchapters", status_code=status.HTTP_201_CREATED,
summary="Create a template subchapter")
async def create_template_subchapter(
request: Request,
body: SubchapterCreate,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
# Verify chapter exists and belongs to tenant
ch_exists = await conn.fetchval(
"SELECT 1 FROM oracle_template_chapters WHERE chapter_id=$1 AND tenant_id=$2",
body.chapter_id, user.role,
)
if not ch_exists:
raise HTTPException(404, "Chapter not found")
row = await conn.fetchrow(
"""
INSERT INTO oracle_template_subchapters
(chapter_id, tenant_id, name, description, sort_order)
VALUES ($1,$2,$3,$4,$5)
RETURNING subchapter_id, created_at
""",
body.chapter_id, user.role, body.name, body.description, body.sort_order,
)
return {"subchapter_id": str(row["subchapter_id"]), "created_at": str(row["created_at"])}
# ── Component Templates ───────────────────────────────────────────────────────
@router.get("/component-templates", summary="List Oracle component templates")
async def list_component_templates(
request: Request,
chapter_id: Optional[str] = Query(None),
subchapter_id: Optional[str] = Query(None),
status_filter: Optional[str] = Query(None, alias="status"),
search: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
user=Depends(get_current_user),
):
pool = _pool(request)
where = "WHERE t.tenant_id=$1"
params: list[Any] = [user.role]
idx = 2
if chapter_id:
where += f" AND t.chapter_id=${idx}"; params.append(chapter_id); idx += 1
if subchapter_id:
where += f" AND t.subchapter_id=${idx}"; params.append(subchapter_id); idx += 1
if status_filter:
where += f" AND t.status=${idx}"; params.append(status_filter); idx += 1
if search:
where += f" AND (t.name ILIKE ${idx} OR t.description ILIKE ${idx})"
params.append(f"%{search}%"); idx += 1
async with pool.acquire() as conn:
rows = await conn.fetch(
f"""
SELECT t.template_id, t.name, t.category, t.status, t.origin, t.version,
t.accepted_shapes, t.use_count, t.chapter_id, t.subchapter_id,
t.description, ch.name as chapter_name, sub.name as subchapter_name,
t.created_at, t.updated_at
FROM oracle_component_templates t
LEFT JOIN oracle_template_chapters ch ON ch.chapter_id = t.chapter_id
LEFT JOIN oracle_template_subchapters sub ON sub.subchapter_id = t.subchapter_id
{where}
ORDER BY t.updated_at DESC
LIMIT ${idx} OFFSET ${idx+1}
""",
*params, limit, offset,
)
total = await conn.fetchval(
f"SELECT COUNT(*) FROM oracle_component_templates t {where}", *params,
)
return {"total": total, "limit": limit, "offset": offset, "templates": [dict(r) for r in rows]}
@router.post("/component-templates", status_code=status.HTTP_201_CREATED,
summary="Create a component template")
async def create_component_template(
request: Request,
body: TemplateCreate,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO oracle_component_templates (
tenant_id, name, category, chapter_id, subchapter_id,
accepted_shapes, json_template, description, origin, version, status
) VALUES ($1,$2,$3,$4,$5,$6,$7::jsonb,$8,$9,$10,'draft')
RETURNING template_id, created_at
""",
user.role, body.name, body.category, body.chapter_id, body.subchapter_id,
body.accepted_shapes,
json.dumps(body.json_template) if body.json_template else None,
body.description, body.origin, body.version,
)
return {"template_id": str(row["template_id"]), "created_at": str(row["created_at"])}
@router.get("/component-templates/{template_id}", summary="Get a component template")
async def get_component_template(
template_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT t.*, ch.name as chapter_name, sub.name as subchapter_name
FROM oracle_component_templates t
LEFT JOIN oracle_template_chapters ch ON ch.chapter_id = t.chapter_id
LEFT JOIN oracle_template_subchapters sub ON sub.subchapter_id = t.subchapter_id
WHERE t.template_id=$1 AND t.tenant_id=$2
""",
template_id, user.role,
)
if not row:
raise HTTPException(404, "Template not found")
return dict(row)
# ── Seed Examples ─────────────────────────────────────────────────────────────
@router.post("/component-templates/{template_id}/seed", status_code=status.HTTP_201_CREATED,
summary="Add a seed example to a template")
async def add_seed_example(
template_id: str,
request: Request,
body: SeedExampleCreate,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
exists = await conn.fetchval(
"SELECT 1 FROM oracle_component_templates WHERE template_id=$1 AND tenant_id=$2",
template_id, user.role,
)
if not exists:
raise HTTPException(404, "Template not found")
row = await conn.fetchrow(
"""
INSERT INTO oracle_template_seed_examples (
template_id, chapter_id, subchapter_id, title, example_json,
quality_notes, is_canonical
) VALUES ($1,$2,$3,$4,$5::jsonb,$6,$7)
RETURNING example_id, created_at
""",
template_id, body.chapter_id, body.subchapter_id, body.title,
json.dumps(body.example_json), body.quality_notes, body.is_canonical,
)
return {"example_id": str(row["example_id"]), "created_at": str(row["created_at"])}
@router.get("/component-templates/{template_id}/seed", summary="List seed examples for a template")
async def list_seed_examples(
template_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT example_id, title, example_json, quality_notes, is_canonical, created_at
FROM oracle_template_seed_examples
WHERE template_id=$1
ORDER BY is_canonical DESC, created_at ASC
""",
template_id,
)
return {"examples": [dict(r) for r in rows]}
# ── Synthetic Jobs ────────────────────────────────────────────────────────────
@router.post("/component-templates/synthetic-jobs", status_code=status.HTTP_201_CREATED,
summary="Trigger a Kimi synthetic data generation job")
async def trigger_synthetic_job(
request: Request,
body: SyntheticJobCreate,
user=Depends(get_current_user),
):
"""
Queues a Kimi synthetic data expansion job for a template.
The job will be picked up by the background synthetic generation worker.
"""
pool = _pool(request)
async with pool.acquire() as conn:
exists = await conn.fetchval(
"SELECT 1 FROM oracle_component_templates WHERE template_id=$1 AND tenant_id=$2",
body.template_id, user.role,
)
if not exists:
raise HTTPException(404, "Template not found")
row = await conn.fetchrow(
"""
INSERT INTO oracle_synthetic_generation_jobs (
tenant_id, template_id, chapter_id, subchapter_id,
model, requested_count, created_by
) VALUES ($1,$2,$3,$4,$5,$6,$7)
RETURNING job_id, status, created_at
""",
user.role, body.template_id, body.chapter_id, body.subchapter_id,
body.model, body.requested_count, user.user_id,
)
logger.info(
"Synthetic job queued: %s for template %s (%d examples)",
row["job_id"], body.template_id, body.requested_count,
)
return {
"job_id": str(row["job_id"]),
"status": row["status"],
"created_at": str(row["created_at"]),
}