feat: Whatsapp Integration
Some checks failed
Production Readiness / backend-contracts (push) Failing after 1m58s
Production Readiness / webos-typecheck (push) Successful in 1m37s
Production Readiness / ipad-parse (push) Successful in 2m17s

This commit is contained in:
Sagnik
2026-04-28 13:41:14 +05:30
parent 7ee51543d9
commit 3623bacbac
15 changed files with 2549 additions and 3 deletions

588
backend/api/routes_comms.py Normal file
View File

@@ -0,0 +1,588 @@
"""
Velocity Conversations API.
Native WhatsApp-first communications surface for Velocity WebOS. The routes are
provider-abstracted and CRM-aware, while remaining safe to run in mock mode.
"""
from __future__ import annotations
import hashlib
import hmac
import json
import os
from datetime import datetime
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
from backend.auth.dependencies import UserPrincipal, get_current_user
from backend.services.comms_evolution_provider import EvolutionProvider
from backend.services.comms_ingest import ingest_inbound_message
from backend.services.comms_provider import MockProvider
from backend.services.comms_waha_provider import WahaProvider
router = APIRouter()
_SCHEMA_READY = False
class SendMessageBody(BaseModel):
messageType: str = "text"
body: str
mediaUrl: str | None = None
templateName: str | None = None
templateLanguage: str | None = None
class LinkPersonBody(BaseModel):
personId: str
class NoteBody(BaseModel):
content: str
class TaskBody(BaseModel):
title: str
dueAt: str | None = None
class SettingsPatch(BaseModel):
provider: str | None = None
providerBaseUrl: str | None = None
providerApiKey: str | None = None
instanceId: str | None = None
phoneNumberId: str | None = None
webhookCallbackUrl: str | None = None
webhookSecret: str | None = None
defaultAssignmentUserId: str | None = None
autoLinkByPhone: bool | None = None
createCrmInteractionOnInbound: bool | None = None
defaultCountryCode: str | None = None
transcriptionProvider: str | None = None
class TranscribeBody(BaseModel):
callId: str | None = None
recordingUrl: str | None = None
def _get_provider():
return _provider_from_config({})
def _provider_from_config(config: dict[str, Any], provider_override: str | None = None):
provider = (provider_override or config.get("provider") or os.getenv("COMMS_PROVIDER", "mock")).strip().lower()
base_url = (config.get("provider_base_url") or os.getenv("COMMS_PROVIDER_BASE_URL", "")).strip()
api_key = (config.get("provider_api_key") or os.getenv("COMMS_PROVIDER_API_KEY", "")).strip()
instance_id = (config.get("instance_id") or os.getenv("COMMS_INSTANCE_ID", "")).strip() or None
if provider == "waha":
return WahaProvider(base_url, api_key, instance_id)
if provider == "evolution":
return EvolutionProvider(base_url, api_key, instance_id)
return MockProvider("", "", "mock")
async def _load_config(pool) -> dict[str, Any]:
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT value_json FROM comms_settings WHERE key = 'config'")
return _json_obj(row["value_json"]) if row else {}
async def _get_provider_for_pool(pool, provider_override: str | None = None):
return _provider_from_config(await _load_config(pool), provider_override)
def _camel_settings(config: dict[str, Any], updated_at: datetime | None = None) -> dict[str, Any]:
return {
"provider": config.get("provider", os.getenv("COMMS_PROVIDER", "mock")),
"providerBaseUrl": config.get("provider_base_url", os.getenv("COMMS_PROVIDER_BASE_URL", "")),
"providerApiKey": config.get("provider_api_key", ""),
"instanceId": config.get("instance_id", os.getenv("COMMS_INSTANCE_ID", "")),
"phoneNumberId": config.get("phone_number_id", ""),
"webhookCallbackUrl": config.get("webhook_callback_url", "/api/comms/webhooks/{provider}"),
"webhookSecretSet": bool(config.get("webhook_secret_hash") or config.get("webhook_secret_set")),
"defaultAssignmentUserId": config.get("default_assignment_user_id"),
"autoLinkByPhone": bool(config.get("auto_link_by_phone", True)),
"createCrmInteractionOnInbound": bool(config.get("create_crm_interaction_on_inbound", True)),
"defaultCountryCode": str(config.get("default_country_code", os.getenv("COMMS_DEFAULT_COUNTRY_CODE", "91"))),
"mediaStorageDir": config.get("media_storage_dir", os.getenv("COMMS_MEDIA_STORAGE_DIR", "/opt/dlami/nvme/assets/comms")),
"transcriptionProvider": config.get("transcription_provider", os.getenv("COMMS_TRANSCRIPTION_PROVIDER", "none")),
**({"updatedAt": updated_at.isoformat()} if updated_at else {}),
}
def _snake_settings(body: SettingsPatch) -> dict[str, Any]:
mapping = {
"provider": "provider",
"providerBaseUrl": "provider_base_url",
"providerApiKey": "provider_api_key",
"instanceId": "instance_id",
"phoneNumberId": "phone_number_id",
"webhookCallbackUrl": "webhook_callback_url",
"defaultAssignmentUserId": "default_assignment_user_id",
"autoLinkByPhone": "auto_link_by_phone",
"createCrmInteractionOnInbound": "create_crm_interaction_on_inbound",
"defaultCountryCode": "default_country_code",
"transcriptionProvider": "transcription_provider",
}
raw = body.model_dump(exclude_unset=True)
updates: dict[str, Any] = {}
for src, dst in mapping.items():
if src in raw:
updates[dst] = raw[src]
if body.webhookSecret is not None:
updates["webhook_secret_hash"] = hashlib.sha256(body.webhookSecret.encode()).hexdigest() if body.webhookSecret else ""
updates["webhook_secret_set"] = bool(body.webhookSecret)
return updates
def _json_obj(value: Any) -> dict[str, Any]:
if isinstance(value, dict):
return value
if isinstance(value, str) and value.strip():
try:
parsed = json.loads(value)
return parsed if isinstance(parsed, dict) else {}
except json.JSONDecodeError:
return {}
return {}
def _record_value(row: Any, key: str, default: Any = None) -> Any:
try:
return row[key]
except (KeyError, IndexError, TypeError):
return default
async def _ensure_schema(pool) -> None:
global _SCHEMA_READY
if _SCHEMA_READY:
return
async with pool.acquire() as conn:
await conn.execute(
"""
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE TABLE IF NOT EXISTS comms_threads (
thread_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
provider TEXT NOT NULL DEFAULT 'mock',
external_thread_id TEXT,
person_id UUID NULL REFERENCES crm_people(person_id) ON DELETE SET NULL,
phone_e164 TEXT NOT NULL,
display_name TEXT,
channel TEXT NOT NULL DEFAULT 'whatsapp',
status TEXT NOT NULL DEFAULT 'open',
assigned_user_id UUID NULL,
last_message_at TIMESTAMPTZ,
unread_count INT NOT NULL DEFAULT 0,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_comms_threads_phone_provider ON comms_threads(provider, phone_e164);
CREATE INDEX IF NOT EXISTS idx_comms_threads_person ON comms_threads(person_id) WHERE person_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_comms_threads_status ON comms_threads(status);
CREATE INDEX IF NOT EXISTS idx_comms_threads_last_message ON comms_threads(last_message_at DESC NULLS LAST);
CREATE TABLE IF NOT EXISTS comms_messages (
message_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
thread_id UUID NOT NULL REFERENCES comms_threads(thread_id) ON DELETE CASCADE,
provider TEXT NOT NULL DEFAULT 'mock',
external_message_id TEXT,
direction TEXT NOT NULL CHECK (direction IN ('inbound', 'outbound', 'system')),
message_type TEXT NOT NULL DEFAULT 'text',
body TEXT NOT NULL DEFAULT '',
media_url TEXT,
media_mime_type TEXT,
delivery_status TEXT NOT NULL DEFAULT 'pending',
sent_at TIMESTAMPTZ,
delivered_at TIMESTAMPTZ,
read_at TIMESTAMPTZ,
raw_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_comms_messages_thread ON comms_messages(thread_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_comms_messages_external ON comms_messages(external_message_id) WHERE external_message_id IS NOT NULL;
CREATE TABLE IF NOT EXISTS comms_call_logs (
call_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
thread_id UUID NULL REFERENCES comms_threads(thread_id) ON DELETE SET NULL,
person_id UUID NULL REFERENCES crm_people(person_id) ON DELETE SET NULL,
provider TEXT NOT NULL DEFAULT 'mock',
external_call_id TEXT,
phone_e164 TEXT NOT NULL,
direction TEXT NOT NULL CHECK (direction IN ('inbound', 'outbound')),
status TEXT NOT NULL DEFAULT 'completed',
started_at TIMESTAMPTZ NOT NULL,
ended_at TIMESTAMPTZ,
duration_seconds INT,
recording_url TEXT,
transcript_id UUID,
transcript_text TEXT,
raw_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_comms_call_logs_phone ON comms_call_logs(phone_e164);
CREATE INDEX IF NOT EXISTS idx_comms_call_logs_thread ON comms_call_logs(thread_id) WHERE thread_id IS NOT NULL;
CREATE TABLE IF NOT EXISTS comms_settings (
key TEXT PRIMARY KEY,
value_json JSONB NOT NULL DEFAULT '{}'::jsonb,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
INSERT INTO comms_settings (key, value_json)
VALUES ('config', '{"provider":"mock","auto_link_by_phone":true,"create_crm_interaction_on_inbound":true,"default_country_code":"91","transcription_provider":"none"}'::jsonb)
ON CONFLICT (key) DO NOTHING;
"""
)
_SCHEMA_READY = True
async def _pool(request: Request):
pool = request.app.state.db_pool
if pool is None:
raise HTTPException(status_code=503, detail="Database unavailable")
await _ensure_schema(pool)
return pool
def _row_thread(row) -> dict[str, Any]:
return {
"threadId": str(row["thread_id"]),
"provider": row["provider"],
"externalThreadId": row["external_thread_id"],
"personId": str(row["person_id"]) if row["person_id"] else None,
"phoneE164": row["phone_e164"],
"displayName": row["display_name"],
"channel": row["channel"],
"status": row["status"],
"assignedUserId": str(row["assigned_user_id"]) if row["assigned_user_id"] else None,
"lastMessageAt": row["last_message_at"].isoformat() if row["last_message_at"] else None,
"unreadCount": row["unread_count"],
"metadataJson": _json_obj(row["metadata_json"]),
"createdAt": row["created_at"].isoformat(),
"updatedAt": row["updated_at"].isoformat(),
"lastMessagePreview": _record_value(row, "last_message_preview"),
"crmPerson": {
"id": str(row["person_id"]),
"fullName": row["crm_full_name"],
"primaryPhone": row["crm_primary_phone"],
"primaryEmail": row["crm_primary_email"],
"buyerType": row["crm_buyer_type"],
"leadStatus": row["crm_lead_status"],
"projectName": row["crm_project_name"],
} if row["person_id"] else None,
}
@router.get("/threads")
async def list_threads(
request: Request,
status: str | None = None,
search: str | None = None,
limit: int = 50,
offset: int = 0,
_: UserPrincipal = Depends(get_current_user),
):
pool = await _pool(request)
limit = max(1, min(limit, 100))
offset = max(0, offset)
conditions = ["1=1"]
values: list[Any] = []
if status:
values.append(status)
conditions.append(f"t.status = ${len(values)}")
if search:
values.append(f"%{search}%")
conditions.append(f"(t.phone_e164 ILIKE ${len(values)} OR t.display_name ILIKE ${len(values)} OR p.full_name ILIKE ${len(values)} OR p.primary_email ILIKE ${len(values)})")
where_clause = " AND ".join(conditions)
async with pool.acquire() as conn:
rows = await conn.fetch(
f"""
SELECT t.*,
p.full_name AS crm_full_name,
p.primary_email AS crm_primary_email,
p.primary_phone AS crm_primary_phone,
p.buyer_type AS crm_buyer_type,
COALESCE(l.status, '') AS crm_lead_status,
(
SELECT pi.project_name FROM crm_property_interests pi
WHERE pi.person_id = p.person_id
ORDER BY pi.priority ASC NULLS LAST, pi.created_at DESC NULLS LAST
LIMIT 1
) AS crm_project_name,
(
SELECT m.body FROM comms_messages m
WHERE m.thread_id = t.thread_id
ORDER BY m.created_at DESC
LIMIT 1
) AS last_message_preview
FROM comms_threads t
LEFT JOIN crm_people p ON t.person_id = p.person_id
LEFT JOIN LATERAL (
SELECT status FROM crm_leads l
WHERE l.person_id = p.person_id
ORDER BY l.updated_at DESC NULLS LAST
LIMIT 1
) l ON TRUE
WHERE {where_clause}
ORDER BY t.last_message_at DESC NULLS LAST, t.updated_at DESC
LIMIT ${len(values)+1} OFFSET ${len(values)+2}
""",
*values,
limit,
offset,
)
total = await conn.fetchval(
f"SELECT COUNT(*) FROM comms_threads t LEFT JOIN crm_people p ON t.person_id = p.person_id WHERE {where_clause}",
*values,
)
unread = await conn.fetchval("SELECT COALESCE(SUM(unread_count),0)::int FROM comms_threads WHERE status = 'open'")
return {"threads": [_row_thread(row) for row in rows], "total": total or 0, "unreadTotal": unread or 0}
@router.get("/threads/{thread_id}")
async def get_thread(thread_id: str, request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT t.*, p.full_name AS crm_full_name, p.primary_email AS crm_primary_email,
p.primary_phone AS crm_primary_phone, p.buyer_type AS crm_buyer_type,
COALESCE(l.status, '') AS crm_lead_status,
(
SELECT pi.project_name FROM crm_property_interests pi
WHERE pi.person_id = p.person_id
ORDER BY pi.priority ASC NULLS LAST, pi.created_at DESC NULLS LAST
LIMIT 1
) AS crm_project_name,
NULL::text AS last_message_preview
FROM comms_threads t
LEFT JOIN crm_people p ON t.person_id = p.person_id
LEFT JOIN LATERAL (
SELECT status FROM crm_leads l
WHERE l.person_id = p.person_id
ORDER BY l.updated_at DESC NULLS LAST
LIMIT 1
) l ON TRUE
WHERE t.thread_id = $1::uuid
""",
thread_id,
)
if not row:
raise HTTPException(status_code=404, detail="Thread not found")
return _row_thread(row)
@router.get("/threads/{thread_id}/messages")
async def list_messages(
thread_id: str,
request: Request,
limit: int = 100,
offset: int = 0,
_: UserPrincipal = Depends(get_current_user),
):
pool = await _pool(request)
limit = max(1, min(limit, 200))
offset = max(0, offset)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT * FROM comms_messages
WHERE thread_id = $1::uuid
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
""",
thread_id,
limit,
offset,
)
messages = [
{
"messageId": str(row["message_id"]),
"threadId": str(row["thread_id"]),
"provider": row["provider"],
"externalMessageId": row["external_message_id"],
"direction": row["direction"],
"messageType": row["message_type"],
"body": row["body"],
"mediaUrl": row["media_url"],
"mediaMimeType": row["media_mime_type"],
"deliveryStatus": row["delivery_status"],
"sentAt": row["sent_at"].isoformat() if row["sent_at"] else None,
"deliveredAt": row["delivered_at"].isoformat() if row["delivered_at"] else None,
"readAt": row["read_at"].isoformat() if row["read_at"] else None,
"rawPayload": _json_obj(row["raw_payload"]),
"createdAt": row["created_at"].isoformat(),
}
for row in reversed(rows)
]
return {"messages": messages, "thread": await get_thread(thread_id, request)}
@router.post("/threads/{thread_id}/messages")
async def send_message(
thread_id: str,
body: SendMessageBody,
request: Request,
_: UserPrincipal = Depends(get_current_user),
):
pool = await _pool(request)
async with pool.acquire() as conn:
thread = await conn.fetchrow("SELECT * FROM comms_threads WHERE thread_id = $1::uuid", thread_id)
if not thread:
raise HTTPException(status_code=404, detail="Thread not found")
provider = await _get_provider_for_pool(pool)
result = await provider.send_message(
phone=thread["phone_e164"],
message=body.body,
message_type=body.messageType,
media_url=body.mediaUrl,
)
async with pool.acquire() as conn:
msg_id = await conn.fetchval(
"""
INSERT INTO comms_messages
(thread_id, provider, external_message_id, direction, message_type, body, media_url, delivery_status, sent_at)
VALUES ($1::uuid, $2, $3, 'outbound', $4, $5, $6, 'sent', NOW())
RETURNING message_id
""",
thread_id,
os.getenv("COMMS_PROVIDER", "mock").lower(),
result.get("external_message_id"),
body.messageType,
body.body,
body.mediaUrl,
)
await conn.execute("UPDATE comms_threads SET last_message_at = NOW(), updated_at = NOW() WHERE thread_id = $1::uuid", thread_id)
return {"messageId": str(msg_id), "providerResult": result}
@router.post("/threads/{thread_id}/link-person")
async def link_person(
thread_id: str,
body: LinkPersonBody,
request: Request,
_: UserPrincipal = Depends(get_current_user),
):
pool = await _pool(request)
async with pool.acquire() as conn:
exists = await conn.fetchval("SELECT EXISTS (SELECT 1 FROM crm_people WHERE person_id = $1::uuid)", body.personId)
if not exists:
raise HTTPException(status_code=404, detail="CRM person not found")
updated = await conn.execute(
"UPDATE comms_threads SET person_id = $1::uuid, updated_at = NOW() WHERE thread_id = $2::uuid",
body.personId,
thread_id,
)
return {"success": updated.endswith("1"), "threadId": thread_id, "personId": body.personId}
@router.post("/threads/{thread_id}/notes")
async def add_note(thread_id: str, body: NoteBody, request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
async with pool.acquire() as conn:
msg_id = await conn.fetchval(
"""
INSERT INTO comms_messages (thread_id, provider, direction, message_type, body, delivery_status)
VALUES ($1::uuid, 'system', 'system', 'text', $2, 'delivered')
RETURNING message_id
""",
thread_id,
f"Note: {body.content}",
)
return {"messageId": str(msg_id)}
@router.post("/threads/{thread_id}/tasks")
async def add_task(thread_id: str, body: TaskBody, request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
text = f"Task: {body.title}" + (f" (Due: {body.dueAt})" if body.dueAt else "")
async with pool.acquire() as conn:
msg_id = await conn.fetchval(
"""
INSERT INTO comms_messages (thread_id, provider, direction, message_type, body, delivery_status)
VALUES ($1::uuid, 'system', 'system', 'text', $2, 'delivered')
RETURNING message_id
""",
thread_id,
text,
)
return {"messageId": str(msg_id)}
@router.post("/webhooks/{provider}")
async def receive_webhook(provider: str, request: Request):
pool = await _pool(request)
raw_body = await request.body()
secret = os.getenv("COMMS_WEBHOOK_SECRET", "").strip()
if secret:
signature = request.headers.get("x-velocity-signature", "")
expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
if not hmac.compare_digest(signature, expected):
raise HTTPException(status_code=401, detail="Invalid comms webhook signature")
payload = await request.json()
provider_impl = await _get_provider_for_pool(pool, provider)
normalized = await provider_impl.normalize_webhook(payload)
normalized["provider"] = provider
return {"received": True, "ingest": await ingest_inbound_message(pool, normalized)}
@router.get("/settings")
async def get_settings(request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT value_json, updated_at FROM comms_settings WHERE key = 'config'")
config = _json_obj(row["value_json"]) if row else {}
result = _camel_settings(config, row["updated_at"] if row else None)
if result.get("providerApiKey"):
result["providerApiKey"] = "********" + str(result["providerApiKey"])[-4:]
return result
@router.patch("/settings")
async def patch_settings(body: SettingsPatch, request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
updates = _snake_settings(body)
if updates.get("provider_api_key", "").startswith("*"):
updates.pop("provider_api_key", None)
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT value_json FROM comms_settings WHERE key = 'config'")
config = _json_obj(row["value_json"]) if row else {}
config.update(updates)
await conn.execute(
"""
INSERT INTO comms_settings (key, value_json, updated_at)
VALUES ('config', $1::jsonb, NOW())
ON CONFLICT (key) DO UPDATE SET value_json = EXCLUDED.value_json, updated_at = NOW()
""",
json.dumps(config),
)
return {"success": True}
@router.post("/provider/test")
async def test_provider(request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
return await (await _get_provider_for_pool(pool)).test_connection()
@router.post("/recordings/transcribe")
async def transcribe_recording(body: TranscribeBody, request: Request, _: UserPrincipal = Depends(get_current_user)):
pool = await _pool(request)
if body.callId:
async with pool.acquire() as conn:
await conn.execute(
"UPDATE comms_call_logs SET transcript_text = $1 WHERE call_id = $2::uuid",
"Transcription pending. Configure COMMS_TRANSCRIPTION_PROVIDER to enable processing.",
body.callId,
)
return {
"success": True,
"status": "pending",
"message": "Transcription intake recorded. A real transcription worker/provider is still required.",
"callId": body.callId,
"recordingUrl": body.recordingUrl,
}

100
backend/db/schema_comms.sql Normal file
View File

@@ -0,0 +1,100 @@
-- Velocity Comms Schema
-- Run this migration against your asyncpg pool database.
BEGIN;
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- Threads (conversations)
CREATE TABLE IF NOT EXISTS comms_threads (
thread_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
provider TEXT NOT NULL DEFAULT 'mock',
external_thread_id TEXT,
person_id UUID NULL REFERENCES crm_people(person_id) ON DELETE SET NULL,
phone_e164 TEXT NOT NULL,
display_name TEXT,
channel TEXT NOT NULL DEFAULT 'whatsapp',
status TEXT NOT NULL DEFAULT 'open',
assigned_user_id UUID NULL,
last_message_at TIMESTAMPTZ,
unread_count INT NOT NULL DEFAULT 0,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_comms_threads_phone ON comms_threads(phone_e164);
CREATE INDEX IF NOT EXISTS idx_comms_threads_person ON comms_threads(person_id) WHERE person_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_comms_threads_status ON comms_threads(status);
CREATE INDEX IF NOT EXISTS idx_comms_threads_last_message ON comms_threads(last_message_at DESC NULLS LAST);
-- Messages
CREATE TABLE IF NOT EXISTS comms_messages (
message_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
thread_id UUID NOT NULL REFERENCES comms_threads(thread_id) ON DELETE CASCADE,
provider TEXT NOT NULL DEFAULT 'mock',
external_message_id TEXT,
direction TEXT NOT NULL CHECK (direction IN ('inbound', 'outbound', 'system')),
message_type TEXT NOT NULL DEFAULT 'text',
body TEXT NOT NULL DEFAULT '',
media_url TEXT,
media_mime_type TEXT,
delivery_status TEXT NOT NULL DEFAULT 'pending',
sent_at TIMESTAMPTZ,
delivered_at TIMESTAMPTZ,
read_at TIMESTAMPTZ,
raw_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_comms_messages_thread ON comms_messages(thread_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_comms_messages_external ON comms_messages(external_message_id) WHERE external_message_id IS NOT NULL;
-- Call logs
CREATE TABLE IF NOT EXISTS comms_call_logs (
call_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
thread_id UUID NULL REFERENCES comms_threads(thread_id) ON DELETE SET NULL,
person_id UUID NULL REFERENCES crm_people(person_id) ON DELETE SET NULL,
provider TEXT NOT NULL DEFAULT 'mock',
external_call_id TEXT,
phone_e164 TEXT NOT NULL,
direction TEXT NOT NULL CHECK (direction IN ('inbound', 'outbound')),
status TEXT NOT NULL DEFAULT 'completed',
started_at TIMESTAMPTZ NOT NULL,
ended_at TIMESTAMPTZ,
duration_seconds INT,
recording_url TEXT,
transcript_id UUID,
transcript_text TEXT,
raw_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_comms_call_logs_phone ON comms_call_logs(phone_e164);
CREATE INDEX IF NOT EXISTS idx_comms_call_logs_thread ON comms_call_logs(thread_id) WHERE thread_id IS NOT NULL;
-- Settings (key-value JSON)
CREATE TABLE IF NOT EXISTS comms_settings (
key TEXT PRIMARY KEY,
value_json JSONB NOT NULL DEFAULT '{}'::jsonb,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Insert default settings
INSERT INTO comms_settings (key, value_json) VALUES ('config', '{
"provider": "mock",
"provider_base_url": "",
"provider_api_key": "",
"instance_id": "",
"phone_number_id": "",
"webhook_callback_url": "",
"webhook_secret_set": false,
"default_assignment_user_id": null,
"auto_link_by_phone": true,
"create_crm_interaction_on_inbound": true,
"default_country_code": "91",
"media_storage_dir": "/opt/dlami/nvme/assets/comms",
"transcription_provider": "none"
}'::jsonb) ON CONFLICT (key) DO NOTHING;
COMMIT;

View File

@@ -63,6 +63,7 @@ from backend.api.routes_admin_surface import router as admin_surface_router
from backend.api.routes_oracle_templates import router as oracle_templates_router
from backend.api.routes_observability import router as observability_router
from backend.api.routes_crm_imports import router as crm_imports_router
from backend.api.routes_comms import router as comms_router
from backend.api.routes_runtime_llm import router as runtime_llm_router
from backend.auth.routes import router as auth_router
from backend.auth.user_directory import ensure_user_directory_schema
@@ -150,6 +151,7 @@ app.include_router(inventory_router, prefix="/api/inventory", tags=["Inventory"]
app.include_router(admin_surface_router, prefix="/api/admin-surface", tags=["Admin Surface"])
app.include_router(observability_router, prefix="/api", tags=["Observability"])
app.include_router(crm_imports_router, prefix="/api", tags=["CRM Canonical"])
app.include_router(comms_router, prefix="/api/comms", tags=["Comms"])
app.include_router(runtime_llm_router, prefix="/api/runtime/llm", tags=["Runtime LLM"])
# Public vault link (no /api prefix — shared externally with prospects)

View File

@@ -0,0 +1,90 @@
"""
Evolution API (https://github.com/EvolutionAPI/evolution-api) adapter.
"""
import httpx
from typing import Any, Dict, List, Optional
from .comms_provider import CommsProvider
class EvolutionProvider(CommsProvider):
def _headers(self) -> Dict[str, str]:
return {"Content-Type": "application/json", "apikey": self.api_key}
async def _request(self, method: str, path: str, json_data: Optional[Dict] = None) -> Dict[str, Any]:
url = f"{self.base_url}{path}"
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.request(method, url, headers=self._headers(), json=json_data)
resp.raise_for_status()
return resp.json()
async def send_message(self, phone: str, message: str, message_type: str = "text", **kwargs) -> Dict[str, Any]:
instance = self.instance_id or "default"
payload = {
"number": phone,
"text": message,
"options": {"delay": 1200, "presence": "composing"},
}
result = await self._request("POST", f"/message/sendText/{instance}", payload)
ext_id = result.get("key", {}).get("id") if isinstance(result, dict) else None
return {
"success": True,
"provider": "evolution",
"external_message_id": ext_id,
"status": "sent",
"raw": result,
}
async def normalize_webhook(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""
Evolution webhook v2 shape:
{
"event": "messages.upsert",
"instance": "default",
"data": {
"key": {"remoteJid": "123@s.whatsapp.net", "fromMe": false, "id": "..."},
"message": {"conversation": "Hello"},
"messageTimestamp": 1710000000, ...
}
}
"""
event = payload.get("event", "")
data = payload.get("data", {})
key = data.get("key", {})
remote_jid = key.get("remoteJid", "")
phone = remote_jid.replace("@s.whatsapp.net", "").replace("@g.us", "")
msg_content = data.get("message", {})
body = msg_content.get("conversation", "") or msg_content.get("extendedTextMessage", {}).get("text", "")
direction = "outbound" if key.get("fromMe") else "inbound"
return {
"provider": "evolution",
"external_message_id": key.get("id"),
"phone_e164": phone,
"direction": direction,
"message_type": "text",
"body": body,
"media_url": None,
"raw": payload,
"timestamp": data.get("messageTimestamp"),
}
async def test_connection(self) -> Dict[str, Any]:
try:
instance = self.instance_id or "default"
info = await self._request("GET", f"/instance/connectionState/{instance}")
return {
"success": True,
"message": f"Evolution instance '{instance}' state retrieved.",
"account_info": info,
}
except Exception as exc:
return {
"success": False,
"message": f"Evolution connection failed: {exc}",
}
async def fetch_templates(self) -> List[Dict[str, Any]]:
return []

View File

@@ -0,0 +1,239 @@
"""Inbound communications ingestion for Velocity CRM."""
from __future__ import annotations
import json
import os
import re
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
PHONEUTILS_AVAILABLE = False
try:
import phonenumbers
from phonenumbers import NumberParseException
PHONEUTILS_AVAILABLE = True
except ImportError:
phonenumbers = None # type: ignore[assignment]
NumberParseException = Exception # type: ignore[assignment]
DEFAULT_COUNTRY = os.getenv("COMMS_DEFAULT_COUNTRY_CODE", "91")
def normalize_phone(phone: str, default_region: str = DEFAULT_COUNTRY) -> str | None:
"""Return an E.164-like phone number suitable for provider and CRM matching."""
if not phone:
return None
cleaned = re.sub(r"[^\d+]", "", phone.strip())
if cleaned.startswith("00"):
cleaned = "+" + cleaned[2:]
if not cleaned.startswith("+"):
cleaned = f"+{default_region}{cleaned}"
if PHONEUTILS_AVAILABLE and phonenumbers is not None:
try:
parsed = phonenumbers.parse(cleaned, None)
if phonenumbers.is_valid_number(parsed):
return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)
except NumberParseException:
pass
return cleaned if re.match(r"^\+\d{7,15}$", cleaned) else None
def _phone_digits(phone: str) -> str:
return re.sub(r"\D+", "", phone or "")
def _crm_channel(channel: str) -> str:
allowed = {"whatsapp", "sms", "call", "email", "website", "walk_in", "other"}
return channel if channel in allowed else "other"
async def get_or_create_thread(
pool,
phone_e164: str,
provider: str,
external_thread_id: str | None = None,
display_name: str | None = None,
channel: str = "whatsapp",
) -> dict[str, Any]:
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT thread_id, person_id, status, unread_count
FROM comms_threads
WHERE phone_e164 = $1 AND provider = $2
LIMIT 1
""",
phone_e164,
provider,
)
if row:
return dict(row)
person_id = None
try:
person_row = await conn.fetchrow(
"""
SELECT person_id
FROM crm_people
WHERE primary_phone = $1
OR regexp_replace(COALESCE(primary_phone, ''), '[^0-9]', '', 'g') = $2
LIMIT 1
""",
phone_e164,
_phone_digits(phone_e164),
)
person_id = person_row["person_id"] if person_row else None
except Exception:
person_id = None
new_id = await conn.fetchval(
"""
INSERT INTO comms_threads
(provider, external_thread_id, person_id, phone_e164, display_name, channel, status, unread_count)
VALUES ($1, $2, $3, $4, $5, $6, 'open', 1)
RETURNING thread_id
""",
provider,
external_thread_id,
person_id,
phone_e164,
display_name or phone_e164,
channel,
)
return {
"thread_id": new_id,
"person_id": person_id,
"status": "open",
"unread_count": 1,
"is_new": True,
}
async def store_message(
pool,
thread_id: UUID,
provider: str,
external_message_id: str | None,
direction: str,
message_type: str,
body: str,
media_url: str | None = None,
raw_payload: dict[str, Any] | None = None,
sent_at: datetime | None = None,
) -> UUID:
async with pool.acquire() as conn:
msg_id = await conn.fetchval(
"""
INSERT INTO comms_messages
(thread_id, provider, external_message_id, direction, message_type, body, media_url,
delivery_status, sent_at, raw_payload)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::jsonb)
RETURNING message_id
""",
thread_id,
provider,
external_message_id,
direction,
message_type,
body,
media_url,
"delivered" if direction == "inbound" else "sent",
sent_at or datetime.now(UTC),
json.dumps(raw_payload or {}),
)
unread_delta = 1 if direction == "inbound" else 0
await conn.execute(
"""
UPDATE comms_threads
SET last_message_at = NOW(), unread_count = unread_count + $2, updated_at = NOW()
WHERE thread_id = $1
""",
thread_id,
unread_delta,
)
return msg_id
async def maybe_create_crm_interaction(pool, person_id: UUID, body: str, channel: str = "whatsapp") -> None:
"""Mirror inbound comms into canonical CRM intelligence tables when present."""
if not person_id:
return
try:
async with pool.acquire() as conn:
exists = await conn.fetchval("SELECT to_regclass('public.intel_interactions') IS NOT NULL")
if not exists:
return
interaction_id = await conn.fetchval(
"""
INSERT INTO intel_interactions
(person_id, channel, interaction_type, happened_at, summary, source_ref, metadata_json)
VALUES ($1, $2::intel_channel, 'message', NOW(), $3, 'comms_ingest', $4::jsonb)
RETURNING interaction_id
""",
person_id,
_crm_channel(channel),
body[:500],
json.dumps({"source": "comms", "direction": "inbound"}),
)
if await conn.fetchval("SELECT to_regclass('public.intel_messages') IS NOT NULL"):
await conn.execute(
"""
INSERT INTO intel_messages
(interaction_id, sender_role, sender_name, message_text, delivered_at, metadata_json)
VALUES ($1, 'lead', NULL, $2, NOW(), $3::jsonb)
""",
interaction_id,
body,
json.dumps({"source": "comms"}),
)
except Exception:
return
async def ingest_inbound_message(pool, normalized_payload: dict[str, Any]) -> dict[str, Any]:
phone = normalize_phone(normalized_payload.get("phone_e164") or normalized_payload.get("phone") or "")
if not phone:
raise ValueError("Missing phone_e164 in payload")
provider = normalized_payload.get("provider", "unknown")
channel = normalized_payload.get("channel", "whatsapp")
thread = await get_or_create_thread(
pool,
phone_e164=phone,
provider=provider,
external_thread_id=normalized_payload.get("external_thread_id"),
display_name=normalized_payload.get("display_name") or phone,
channel=channel,
)
timestamp = normalized_payload.get("timestamp")
sent_at = datetime.fromtimestamp(timestamp, UTC) if timestamp else None
msg_id = await store_message(
pool,
thread_id=thread["thread_id"],
provider=provider,
external_message_id=normalized_payload.get("external_message_id"),
direction=normalized_payload.get("direction", "inbound"),
message_type=normalized_payload.get("message_type", "text"),
body=normalized_payload.get("body", ""),
media_url=normalized_payload.get("media_url"),
raw_payload=normalized_payload.get("raw"),
sent_at=sent_at,
)
if thread.get("person_id") and normalized_payload.get("direction", "inbound") == "inbound":
await maybe_create_crm_interaction(pool, thread["person_id"], normalized_payload.get("body", ""), channel)
return {
"thread_id": str(thread["thread_id"]),
"message_id": str(msg_id),
"person_id": str(thread["person_id"]) if thread.get("person_id") else None,
"is_new_thread": thread.get("is_new", False),
}

View File

@@ -0,0 +1,63 @@
"""
Abstract provider interface for Velocity Comms.
"""
import os
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
class CommsProvider(ABC):
def __init__(self, base_url: str, api_key: str, instance_id: Optional[str] = None):
self.base_url = base_url.rstrip("/")
self.api_key = api_key
self.instance_id = instance_id
@abstractmethod
async def send_message(self, phone: str, message: str, message_type: str = "text", **kwargs) -> Dict[str, Any]:
"""Send a message. Return provider response dict."""
...
@abstractmethod
async def normalize_webhook(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""Convert provider webhook payload to Velocity canonical format."""
...
@abstractmethod
async def test_connection(self) -> Dict[str, Any]:
"""Test provider connectivity. Return {success, message, account_info}."""
...
async def fetch_templates(self) -> List[Dict[str, Any]]:
"""Optional: fetch message templates."""
return []
async def get_media(self, media_id: str) -> Optional[bytes]:
"""Optional: download media bytes."""
return None
async def send_template(self, phone: str, template_name: str, language: str, components: Optional[List] = None) -> Dict[str, Any]:
"""Optional: send a template message."""
raise NotImplementedError("Templates not supported by this provider.")
class MockProvider(CommsProvider):
"""Mock provider for local development and UI previews."""
async def send_message(self, phone: str, message: str, message_type: str = "text", **kwargs) -> Dict[str, Any]:
return {
"success": True,
"provider": "mock",
"external_message_id": f"mock-{os.urandom(4).hex()}",
"status": "sent",
}
async def normalize_webhook(self, payload: Dict[str, Any]) -> Dict[str, Any]:
return payload
async def test_connection(self) -> Dict[str, Any]:
return {
"success": True,
"message": "Mock provider is always healthy.",
"account_info": {"mode": "mock"},
}

View File

@@ -0,0 +1,95 @@
"""
WAHA (https://github.com/devlikeapro/waha) adapter.
WAHA exposes a simple HTTP API for WhatsApp Web.
"""
import httpx
from typing import Any, Dict, Optional
from .comms_provider import CommsProvider
class WahaProvider(CommsProvider):
async def _request(self, method: str, path: str, json_data: Optional[Dict] = None) -> Dict[str, Any]:
url = f"{self.base_url}/api{path}"
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["X-Api-Key"] = self.api_key
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.request(method, url, headers=headers, json=json_data)
resp.raise_for_status()
return resp.json()
async def send_message(self, phone: str, message: str, message_type: str = "text", **kwargs) -> Dict[str, Any]:
chat_id = f"{phone}@c.us"
payload = {
"chatId": chat_id,
"text": message,
"session": self.instance_id or "default",
}
if message_type == "image" and kwargs.get("media_url"):
payload["caption"] = message
payload["media"] = kwargs["media_url"]
result = await self._request("POST", "/sendImage", payload)
else:
result = await self._request("POST", "/sendText", payload)
return {
"success": True,
"provider": "waha",
"external_message_id": result.get("id"),
"status": "sent",
"raw": result,
}
async def normalize_webhook(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""
WAHA webhook payload shape (v2024):
{
"event": "message",
"session": "default",
"payload": {
"id": "true_123@c.us_3EB0...",
"timestamp": 1710000000,
"from": "123@c.us",
"to": "456@c.us",
"body": "Hello",
"hasMedia": false, ...
}
}
"""
event = payload.get("event", "")
pl = payload.get("payload", {})
from_jid = pl.get("from", "")
phone = from_jid.replace("@c.us", "").replace("@g.us", "")
direction = "inbound" if event == "message" and not pl.get("fromMe") else "outbound"
return {
"provider": "waha",
"external_message_id": pl.get("id"),
"phone_e164": phone,
"direction": direction,
"message_type": "image" if pl.get("hasMedia") else "text",
"body": pl.get("body", ""),
"media_url": pl.get("mediaUrl"),
"raw": payload,
"timestamp": pl.get("timestamp"),
}
async def test_connection(self) -> Dict[str, Any]:
try:
sessions = await self._request("GET", "/sessions?all=true")
return {
"success": True,
"message": f"Connected to WAHA. Sessions: {len(sessions)}",
"account_info": {"sessions": sessions},
}
except Exception as exc:
return {
"success": False,
"message": f"WAHA connection failed: {exc}",
}
async def get_media(self, media_id: str) -> Optional[bytes]:
return None