Files
Velocity-OS/core/api/api/routes_comms.py

589 lines
24 KiB
Python

"""
Velocity Conversations API.
Native WhatsApp-first communications surface for Velocity WebOS. The routes are
provider-abstracted and CRM-aware, while remaining safe to run in mock mode.
"""
from __future__ import annotations
import hashlib
import hmac
import json
import os
from datetime import datetime
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
from backend.auth.dependencies import UserPrincipal, get_current_user
from backend.services.comms_evolution_provider import EvolutionProvider
from backend.services.comms_ingest import ingest_inbound_message
from backend.services.comms_provider import MockProvider
from backend.services.comms_waha_provider import WahaProvider
router = APIRouter()
_SCHEMA_READY = False
class SendMessageBody(BaseModel):
messageType: str = "text"
body: str
mediaUrl: str | None = None
templateName: str | None = None
templateLanguage: str | None = None
class LinkPersonBody(BaseModel):
personId: str
class NoteBody(BaseModel):
content: str
class TaskBody(BaseModel):
title: str
dueAt: str | None = None
class SettingsPatch(BaseModel):
provider: str | None = None
providerBaseUrl: str | None = None
providerApiKey: str | None = None
instanceId: str | None = None
phoneNumberId: str | None = None
webhookCallbackUrl: str | None = None
webhookSecret: str | None = None
defaultAssignmentUserId: str | None = None
autoLinkByPhone: bool | None = None
createCrmInteractionOnInbound: bool | None = None
defaultCountryCode: str | None = None
transcriptionProvider: str | None = None
class TranscribeBody(BaseModel):
callId: str | None = None
recordingUrl: str | None = None
def _get_provider():
return _provider_from_config({})
def _provider_from_config(config: dict[str, Any], provider_override: str | None = None):
provider = (provider_override or config.get("provider") or os.getenv("COMMS_PROVIDER", "mock")).strip().lower()
base_url = (config.get("provider_base_url") or os.getenv("COMMS_PROVIDER_BASE_URL", "")).strip()
api_key = (config.get("provider_api_key") or os.getenv("COMMS_PROVIDER_API_KEY", "")).strip()
instance_id = (config.get("instance_id") or os.getenv("COMMS_INSTANCE_ID", "")).strip() or None
if provider == "waha":
return WahaProvider(base_url, api_key, instance_id)
if provider == "evolution":
return EvolutionProvider(base_url, api_key, instance_id)
return MockProvider("", "", "mock")
async def _load_config(pool) -> dict[str, Any]:
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT value_json FROM comms_settings WHERE key = 'config'")
return _json_obj(row["value_json"]) if row else {}
async def _get_provider_for_pool(pool, provider_override: str | None = None):
return _provider_from_config(await _load_config(pool), provider_override)
def _camel_settings(config: dict[str, Any], updated_at: datetime | None = None) -> dict[str, Any]:
return {
"provider": config.get("provider", os.getenv("COMMS_PROVIDER", "mock")),
"providerBaseUrl": config.get("provider_base_url", os.getenv("COMMS_PROVIDER_BASE_URL", "")),
"providerApiKey": config.get("provider_api_key", ""),
"instanceId": config.get("instance_id", os.getenv("COMMS_INSTANCE_ID", "")),
"phoneNumberId": config.get("phone_number_id", ""),
"webhookCallbackUrl": config.get("webhook_callback_url", "/api/comms/webhooks/{provider}"),
"webhookSecretSet": bool(config.get("webhook_secret_hash") or config.get("webhook_secret_set")),
"defaultAssignmentUserId": config.get("default_assignment_user_id"),
"autoLinkByPhone": bool(config.get("auto_link_by_phone", True)),
"createCrmInteractionOnInbound": bool(config.get("create_crm_interaction_on_inbound", True)),
"defaultCountryCode": str(config.get("default_country_code", os.getenv("COMMS_DEFAULT_COUNTRY_CODE", "91"))),
"mediaStorageDir": config.get("media_storage_dir", os.getenv("COMMS_MEDIA_STORAGE_DIR", "/opt/dlami/nvme/assets/comms")),
"transcriptionProvider": config.get("transcription_provider", os.getenv("COMMS_TRANSCRIPTION_PROVIDER", "none")),
**({"updatedAt": updated_at.isoformat()} if updated_at else {}),
}
def _snake_settings(body: SettingsPatch) -> dict[str, Any]:
mapping = {
"provider": "provider",
"providerBaseUrl": "provider_base_url",
"providerApiKey": "provider_api_key",
"instanceId": "instance_id",
"phoneNumberId": "phone_number_id",
"webhookCallbackUrl": "webhook_callback_url",
"defaultAssignmentUserId": "default_assignment_user_id",
"autoLinkByPhone": "auto_link_by_phone",
"createCrmInteractionOnInbound": "create_crm_interaction_on_inbound",
"defaultCountryCode": "default_country_code",
"transcriptionProvider": "transcription_provider",
}
raw = body.model_dump(exclude_unset=True)
updates: dict[str, Any] = {}
for src, dst in mapping.items():
if src in raw:
updates[dst] = raw[src]
if body.webhookSecret is not None:
updates["webhook_secret_hash"] = hashlib.sha256(body.webhookSecret.encode()).hexdigest() if body.webhookSecret else ""
updates["webhook_secret_set"] = bool(body.webhookSecret)
return updates
def _json_obj(value: Any) -> dict[str, Any]:
if isinstance(value, dict):
return value
if isinstance(value, str) and value.strip():
try:
parsed = json.loads(value)
return parsed if isinstance(parsed, dict) else {}
except json.JSONDecodeError:
return {}
return {}
def _record_value(row: Any, key: str, default: Any = None) -> Any:
try:
return row[key]
except (KeyError, IndexError, TypeError):
return default
async def _ensure_schema(pool) -> None:
global _SCHEMA_READY
if _SCHEMA_READY:
return
async with pool.acquire() as conn:
await conn.execute(
"""
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE TABLE IF NOT EXISTS comms_threads (
thread_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
provider TEXT NOT NULL DEFAULT 'mock',
external_thread_id TEXT,
person_id UUID NULL REFERENCES crm_people(person_id) ON DELETE SET NULL,
phone_e164 TEXT NOT NULL,
display_name TEXT,
channel TEXT NOT NULL DEFAULT 'whatsapp',
status TEXT NOT NULL DEFAULT 'open',
assigned_user_id UUID NULL,
last_message_at TIMESTAMPTZ,
unread_count INT NOT NULL DEFAULT 0,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_comms_threads_phone_provider ON comms_threads(provider, phone_e164);
CREATE INDEX IF NOT EXISTS idx_comms_threads_person ON comms_threads(person_id) WHERE person_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_comms_threads_status ON comms_threads(status);
CREATE INDEX IF NOT EXISTS idx_comms_threads_last_message ON comms_threads(last_message_at DESC NULLS LAST);
CREATE TABLE IF NOT EXISTS comms_messages (
message_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
thread_id UUID NOT NULL REFERENCES comms_threads(thread_id) ON DELETE CASCADE,
provider TEXT NOT NULL DEFAULT 'mock',
external_message_id TEXT,
direction TEXT NOT NULL CHECK (direction IN ('inbound', 'outbound', 'system')),
message_type TEXT NOT NULL DEFAULT 'text',
body TEXT NOT NULL DEFAULT '',
media_url TEXT,
media_mime_type TEXT,
delivery_status TEXT NOT NULL DEFAULT 'pending',
sent_at TIMESTAMPTZ,
delivered_at TIMESTAMPTZ,
read_at TIMESTAMPTZ,
raw_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_comms_messages_thread ON comms_messages(thread_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_comms_messages_external ON comms_messages(external_message_id) WHERE external_message_id IS NOT NULL;
CREATE TABLE IF NOT EXISTS comms_call_logs (
call_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
thread_id UUID NULL REFERENCES comms_threads(thread_id) ON DELETE SET NULL,
person_id UUID NULL REFERENCES crm_people(person_id) ON DELETE SET NULL,
provider TEXT NOT NULL DEFAULT 'mock',
external_call_id TEXT,
phone_e164 TEXT NOT NULL,
direction TEXT NOT NULL CHECK (direction IN ('inbound', 'outbound')),
status TEXT NOT NULL DEFAULT 'completed',
started_at TIMESTAMPTZ NOT NULL,
ended_at TIMESTAMPTZ,
duration_seconds INT,
recording_url TEXT,
transcript_id UUID,
transcript_text TEXT,
raw_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_comms_call_logs_phone ON comms_call_logs(phone_e164);
CREATE INDEX IF NOT EXISTS idx_comms_call_logs_thread ON comms_call_logs(thread_id) WHERE thread_id IS NOT NULL;
CREATE TABLE IF NOT EXISTS comms_settings (
key TEXT PRIMARY KEY,
value_json JSONB NOT NULL DEFAULT '{}'::jsonb,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
INSERT INTO comms_settings (key, value_json)
VALUES ('config', '{"provider":"mock","auto_link_by_phone":true,"create_crm_interaction_on_inbound":true,"default_country_code":"91","transcription_provider":"none"}'::jsonb)
ON CONFLICT (key) DO NOTHING;
"""
)
_SCHEMA_READY = True
async def _pool(request: Request):
pool = request.app.state.db_pool
if pool is None:
raise HTTPException(status_code=503, detail="Database unavailable")
await _ensure_schema(pool)
return pool
def _row_thread(row) -> dict[str, Any]:
return {
"threadId": str(row["thread_id"]),
"provider": row["provider"],
"externalThreadId": row["external_thread_id"],
"personId": str(row["person_id"]) if row["person_id"] else None,
"phoneE164": row["phone_e164"],
"displayName": row["display_name"],
"channel": row["channel"],
"status": row["status"],
"assignedUserId": str(row["assigned_user_id"]) if row["assigned_user_id"] else None,
"lastMessageAt": row["last_message_at"].isoformat() if row["last_message_at"] else None,
"unreadCount": row["unread_count"],
"metadataJson": _json_obj(row["metadata_json"]),
"createdAt": row["created_at"].isoformat(),
"updatedAt": row["updated_at"].isoformat(),
"lastMessagePreview": _record_value(row, "last_message_preview"),
"crmPerson": {
"id": str(row["person_id"]),
"fullName": row["crm_full_name"],
"primaryPhone": row["crm_primary_phone"],
"primaryEmail": row["crm_primary_email"],
"buyerType": row["crm_buyer_type"],
"leadStatus": row["crm_lead_status"],
"projectName": row["crm_project_name"],
} if row["person_id"] else None,
}
@router.get("/threads")
async def list_threads(
request: Request,
status: str | None = None,
search: str | None = None,
limit: int = 50,
offset: int = 0,
_: UserPrincipal = Depends(get_current_user),
):
pool = await _pool(request)
limit = max(1, min(limit, 100))
offset = max(0, offset)
conditions = ["1=1"]
values: list[Any] = []
if status:
values.append(status)
conditions.append(f"t.status = ${len(values)}")
if search:
values.append(f"%{search}%")
conditions.append(f"(t.phone_e164 ILIKE ${len(values)} OR t.display_name ILIKE ${len(values)} OR p.full_name ILIKE ${len(values)} OR p.primary_email ILIKE ${len(values)})")
where_clause = " AND ".join(conditions)
async with pool.acquire() as conn:
rows = await conn.fetch(
f"""
SELECT t.*,
p.full_name AS crm_full_name,
p.primary_email AS crm_primary_email,
p.primary_phone AS crm_primary_phone,
p.buyer_type AS crm_buyer_type,
COALESCE(l.status, '') AS crm_lead_status,
(
SELECT pi.project_name FROM crm_property_interests pi
WHERE pi.person_id = p.person_id
ORDER BY pi.priority ASC NULLS LAST, pi.created_at DESC NULLS LAST
LIMIT 1
) AS crm_project_name,
(
SELECT m.body FROM comms_messages m
WHERE m.thread_id = t.thread_id
ORDER BY m.created_at DESC
LIMIT 1
) AS last_message_preview
FROM comms_threads t
LEFT JOIN crm_people p ON t.person_id = p.person_id
LEFT JOIN LATERAL (
SELECT status FROM crm_leads l
WHERE l.person_id = p.person_id
ORDER BY l.updated_at DESC NULLS LAST
LIMIT 1
) l ON TRUE
WHERE {where_clause}
ORDER BY t.last_message_at DESC NULLS LAST, t.updated_at DESC
LIMIT ${len(values)+1} OFFSET ${len(values)+2}
""",
*values,
limit,
offset,
)
total = await conn.fetchval(
f"SELECT COUNT(*) FROM comms_threads t LEFT JOIN crm_people p ON t.person_id = p.person_id WHERE {where_clause}",
*values,
)
unread = await conn.fetchval("SELECT COALESCE(SUM(unread_count),0)::int FROM comms_threads WHERE status = 'open'")
return {"threads": [_row_thread(row) for row in rows], "total": total or 0, "unreadTotal": unread or 0}
@router.get("/threads/{thread_id}")
async def get_thread(thread_id: str, request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT t.*, p.full_name AS crm_full_name, p.primary_email AS crm_primary_email,
p.primary_phone AS crm_primary_phone, p.buyer_type AS crm_buyer_type,
COALESCE(l.status, '') AS crm_lead_status,
(
SELECT pi.project_name FROM crm_property_interests pi
WHERE pi.person_id = p.person_id
ORDER BY pi.priority ASC NULLS LAST, pi.created_at DESC NULLS LAST
LIMIT 1
) AS crm_project_name,
NULL::text AS last_message_preview
FROM comms_threads t
LEFT JOIN crm_people p ON t.person_id = p.person_id
LEFT JOIN LATERAL (
SELECT status FROM crm_leads l
WHERE l.person_id = p.person_id
ORDER BY l.updated_at DESC NULLS LAST
LIMIT 1
) l ON TRUE
WHERE t.thread_id = $1::uuid
""",
thread_id,
)
if not row:
raise HTTPException(status_code=404, detail="Thread not found")
return _row_thread(row)
@router.get("/threads/{thread_id}/messages")
async def list_messages(
thread_id: str,
request: Request,
limit: int = 100,
offset: int = 0,
_: UserPrincipal = Depends(get_current_user),
):
pool = await _pool(request)
limit = max(1, min(limit, 200))
offset = max(0, offset)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT * FROM comms_messages
WHERE thread_id = $1::uuid
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
""",
thread_id,
limit,
offset,
)
messages = [
{
"messageId": str(row["message_id"]),
"threadId": str(row["thread_id"]),
"provider": row["provider"],
"externalMessageId": row["external_message_id"],
"direction": row["direction"],
"messageType": row["message_type"],
"body": row["body"],
"mediaUrl": row["media_url"],
"mediaMimeType": row["media_mime_type"],
"deliveryStatus": row["delivery_status"],
"sentAt": row["sent_at"].isoformat() if row["sent_at"] else None,
"deliveredAt": row["delivered_at"].isoformat() if row["delivered_at"] else None,
"readAt": row["read_at"].isoformat() if row["read_at"] else None,
"rawPayload": _json_obj(row["raw_payload"]),
"createdAt": row["created_at"].isoformat(),
}
for row in reversed(rows)
]
return {"messages": messages, "thread": await get_thread(thread_id, request)}
@router.post("/threads/{thread_id}/messages")
async def send_message(
thread_id: str,
body: SendMessageBody,
request: Request,
_: UserPrincipal = Depends(get_current_user),
):
pool = await _pool(request)
async with pool.acquire() as conn:
thread = await conn.fetchrow("SELECT * FROM comms_threads WHERE thread_id = $1::uuid", thread_id)
if not thread:
raise HTTPException(status_code=404, detail="Thread not found")
provider = await _get_provider_for_pool(pool)
result = await provider.send_message(
phone=thread["phone_e164"],
message=body.body,
message_type=body.messageType,
media_url=body.mediaUrl,
)
async with pool.acquire() as conn:
msg_id = await conn.fetchval(
"""
INSERT INTO comms_messages
(thread_id, provider, external_message_id, direction, message_type, body, media_url, delivery_status, sent_at)
VALUES ($1::uuid, $2, $3, 'outbound', $4, $5, $6, 'sent', NOW())
RETURNING message_id
""",
thread_id,
os.getenv("COMMS_PROVIDER", "mock").lower(),
result.get("external_message_id"),
body.messageType,
body.body,
body.mediaUrl,
)
await conn.execute("UPDATE comms_threads SET last_message_at = NOW(), updated_at = NOW() WHERE thread_id = $1::uuid", thread_id)
return {"messageId": str(msg_id), "providerResult": result}
@router.post("/threads/{thread_id}/link-person")
async def link_person(
thread_id: str,
body: LinkPersonBody,
request: Request,
_: UserPrincipal = Depends(get_current_user),
):
pool = await _pool(request)
async with pool.acquire() as conn:
exists = await conn.fetchval("SELECT EXISTS (SELECT 1 FROM crm_people WHERE person_id = $1::uuid)", body.personId)
if not exists:
raise HTTPException(status_code=404, detail="CRM person not found")
updated = await conn.execute(
"UPDATE comms_threads SET person_id = $1::uuid, updated_at = NOW() WHERE thread_id = $2::uuid",
body.personId,
thread_id,
)
return {"success": updated.endswith("1"), "threadId": thread_id, "personId": body.personId}
@router.post("/threads/{thread_id}/notes")
async def add_note(thread_id: str, body: NoteBody, request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
async with pool.acquire() as conn:
msg_id = await conn.fetchval(
"""
INSERT INTO comms_messages (thread_id, provider, direction, message_type, body, delivery_status)
VALUES ($1::uuid, 'system', 'system', 'text', $2, 'delivered')
RETURNING message_id
""",
thread_id,
f"Note: {body.content}",
)
return {"messageId": str(msg_id)}
@router.post("/threads/{thread_id}/tasks")
async def add_task(thread_id: str, body: TaskBody, request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
text = f"Task: {body.title}" + (f" (Due: {body.dueAt})" if body.dueAt else "")
async with pool.acquire() as conn:
msg_id = await conn.fetchval(
"""
INSERT INTO comms_messages (thread_id, provider, direction, message_type, body, delivery_status)
VALUES ($1::uuid, 'system', 'system', 'text', $2, 'delivered')
RETURNING message_id
""",
thread_id,
text,
)
return {"messageId": str(msg_id)}
@router.post("/webhooks/{provider}")
async def receive_webhook(provider: str, request: Request):
pool = await _pool(request)
raw_body = await request.body()
secret = os.getenv("COMMS_WEBHOOK_SECRET", "").strip()
if secret:
signature = request.headers.get("x-velocity-signature", "")
expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
if not hmac.compare_digest(signature, expected):
raise HTTPException(status_code=401, detail="Invalid comms webhook signature")
payload = await request.json()
provider_impl = await _get_provider_for_pool(pool, provider)
normalized = await provider_impl.normalize_webhook(payload)
normalized["provider"] = provider
return {"received": True, "ingest": await ingest_inbound_message(pool, normalized)}
@router.get("/settings")
async def get_settings(request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT value_json, updated_at FROM comms_settings WHERE key = 'config'")
config = _json_obj(row["value_json"]) if row else {}
result = _camel_settings(config, row["updated_at"] if row else None)
if result.get("providerApiKey"):
result["providerApiKey"] = "********" + str(result["providerApiKey"])[-4:]
return result
@router.patch("/settings")
async def patch_settings(body: SettingsPatch, request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
updates = _snake_settings(body)
if updates.get("provider_api_key", "").startswith("*"):
updates.pop("provider_api_key", None)
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT value_json FROM comms_settings WHERE key = 'config'")
config = _json_obj(row["value_json"]) if row else {}
config.update(updates)
await conn.execute(
"""
INSERT INTO comms_settings (key, value_json, updated_at)
VALUES ('config', $1::jsonb, NOW())
ON CONFLICT (key) DO UPDATE SET value_json = EXCLUDED.value_json, updated_at = NOW()
""",
json.dumps(config),
)
return {"success": True}
@router.post("/provider/test")
async def test_provider(request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
return await (await _get_provider_for_pool(pool)).test_connection()
@router.post("/recordings/transcribe")
async def transcribe_recording(body: TranscribeBody, request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
if body.callId:
async with pool.acquire() as conn:
await conn.execute(
"UPDATE comms_call_logs SET transcript_text = $1 WHERE call_id = $2::uuid",
"Transcription pending. Configure COMMS_TRANSCRIPTION_PROVIDER to enable processing.",
body.callId,
)
return {
"success": True,
"status": "pending",
"message": "Transcription intake recorded. A real transcription worker/provider is still required.",
"callId": body.callId,
"recordingUrl": body.recordingUrl,
}