Files
Project_Velocity/backend/services/comms_ingest.py
Sayan Datta 6c93e31741
All checks were successful
Production Readiness / backend-contracts (pull_request) Successful in 3m19s
Production Readiness / webos-typecheck (pull_request) Successful in 2m38s
Production Readiness / ipad-parse (pull_request) Successful in 1m44s
feat: Ipad app production readiness, Colony orchestration, Social posting
2026-05-03 18:28:04 +05:30

367 lines
13 KiB
Python

"""Inbound communications ingestion for Velocity CRM."""
from __future__ import annotations

import asyncio
import json
import logging
import os
import re
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit
from uuid import UUID

import httpx
# ``phonenumbers`` is an optional dependency. When it cannot be imported the
# module still loads: the names below are bound to placeholders and
# PHONEUTILS_AVAILABLE records that library-backed validation is unavailable.
try:
    import phonenumbers
    from phonenumbers import NumberParseException
except ImportError:
    PHONEUTILS_AVAILABLE = False
    phonenumbers = None  # type: ignore[assignment]
    NumberParseException = Exception  # type: ignore[assignment]
else:
    PHONEUTILS_AVAILABLE = True

# Value prepended (after a "+") to numbers that arrive without an
# international prefix; overridable through COMMS_DEFAULT_COUNTRY_CODE.
DEFAULT_COUNTRY = os.environ.get("COMMS_DEFAULT_COUNTRY_CODE", "91")
class TranscriptionError(RuntimeError):
    """Signal that a recording could not be turned into transcript text.

    Raised by the transcription helpers in this module for missing
    configuration, failed provider requests, and responses without text.
    """
def _recording_filename(recording_url: str) -> str:
    """Return the last path segment of *recording_url*, ignoring query and fragment."""
    url_path = urlsplit(recording_url).path
    return url_path.rstrip("/").rsplit("/", 1)[-1] or "recording.audio"


async def _read_recording_bytes(recording_url: str) -> tuple[bytes, str, str]:
    """Load a recording from a ``file://`` URL, a local path, or over HTTP(S).

    Args:
        recording_url: ``file://`` URL, filesystem path, or remote URL.

    Returns:
        A ``(content, filename, content_type)`` tuple. Local files are always
        reported as ``application/octet-stream``; remote files use the
        response's ``content-type`` header when present.

    Raises:
        TranscriptionError: If *recording_url* is empty.
        OSError: If a ``file://`` or local path cannot be read.
        httpx.HTTPError: If the download fails or returns an error status.
    """
    if not recording_url:
        raise TranscriptionError("recording_url is required.")

    # NOTE(review): local paths are read exactly as given. If recording_url can
    # come from an untrusted webhook, this allows arbitrary local file reads;
    # confirm callers only pass trusted values.
    if recording_url.startswith("file://"):
        path = Path(recording_url[7:]).expanduser()
        # Read in a worker thread so a large file does not stall the event loop.
        audio = await asyncio.to_thread(path.read_bytes)
        return audio, path.name or "recording.audio", "application/octet-stream"

    # Only probe the filesystem for values that are not HTTP(S) URLs: a long
    # URL (e.g. presigned, with its query string) can make Path.exists() raise
    # "File name too long" instead of returning False.
    is_remote = recording_url.lower().startswith(("http://", "https://"))
    if not is_remote:
        local_path = Path(recording_url).expanduser()
        try:
            is_local = local_path.exists()
        except OSError:
            is_local = False
        if is_local:
            audio = await asyncio.to_thread(local_path.read_bytes)
            return audio, local_path.name or "recording.audio", "application/octet-stream"

    async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
        response = await client.get(recording_url)
        response.raise_for_status()
    content_type = response.headers.get("content-type", "application/octet-stream")
    # Use the URL path only, so "rec.mp3?sig=..." is uploaded as "rec.mp3".
    return response.content, _recording_filename(recording_url), content_type
async def _transcribe_openai(recording_url: str) -> dict[str, Any]:
    """Transcribe a recording through the OpenAI audio transcription endpoint.

    Reads OPENAI_API_KEY (required) and COMMS_OPENAI_TRANSCRIPTION_MODEL
    (default ``whisper-1``) from the environment, uploads the recording as a
    multipart file and requests ``verbose_json`` output.

    Returns:
        Dict with ``text``, ``provider``, ``language``, ``segments`` and the
        untouched provider response under ``raw``.

    Raises:
        TranscriptionError: If the API key is missing or the response has no text.
    """
    key = os.getenv("OPENAI_API_KEY", "").strip()
    if not key:
        raise TranscriptionError("OPENAI_API_KEY is required for COMMS_TRANSCRIPTION_PROVIDER=openai.")

    audio_bytes, upload_name, mime_type = await _read_recording_bytes(recording_url)
    form_fields = {
        "model": os.getenv("COMMS_OPENAI_TRANSCRIPTION_MODEL", "whisper-1"),
        "response_format": "verbose_json",
    }
    upload = {"file": (upload_name, audio_bytes, mime_type)}
    auth_header = {"Authorization": f"Bearer {key}"}

    async with httpx.AsyncClient(timeout=120.0) as client:
        response = await client.post(
            "https://api.openai.com/v1/audio/transcriptions",
            headers=auth_header,
            data=form_fields,
            files=upload,
        )
        response.raise_for_status()
        payload = response.json()

    transcript = (payload.get("text") or "").strip()
    if transcript:
        return {
            "text": transcript,
            "provider": "openai",
            "language": payload.get("language") or "unknown",
            "segments": payload.get("segments") or [],
            "raw": payload,
        }
    raise TranscriptionError("OpenAI transcription response did not include text.")
def _first_deepgram_alternative(payload: Any) -> dict[str, Any]:
    """Return ``results.channels[0].alternatives[0]`` or ``{}`` if any level is missing or empty."""
    results = payload.get("results") if isinstance(payload, dict) else None
    channels = results.get("channels") if isinstance(results, dict) else None
    first_channel = channels[0] if isinstance(channels, list) and channels else None
    alternatives = first_channel.get("alternatives") if isinstance(first_channel, dict) else None
    first_alternative = alternatives[0] if isinstance(alternatives, list) and alternatives else None
    return first_alternative if isinstance(first_alternative, dict) else {}


async def _transcribe_deepgram(recording_url: str) -> dict[str, Any]:
    """Transcribe a recording through the Deepgram ``/v1/listen`` endpoint.

    Reads DEEPGRAM_API_KEY (required), COMMS_DEEPGRAM_MODEL (default
    ``nova-2``) and COMMS_TRANSCRIPTION_LANGUAGE (default ``en``) from the
    environment and posts the raw audio bytes with diarization and smart
    formatting enabled.

    Returns:
        Dict with ``text``, ``provider``, ``language`` (the configured
        language), ``segments`` (the alternative's ``words`` list) and the
        untouched provider response under ``raw``.

    Raises:
        TranscriptionError: If the API key is missing or no transcript text
            is present, including responses with empty ``channels`` or
            ``alternatives`` lists.
    """
    api_key = os.getenv("DEEPGRAM_API_KEY", "").strip()
    if not api_key:
        raise TranscriptionError("DEEPGRAM_API_KEY is required for COMMS_TRANSCRIPTION_PROVIDER=deepgram.")
    audio, _, content_type = await _read_recording_bytes(recording_url)
    model = os.getenv("COMMS_DEEPGRAM_MODEL", "nova-2")
    language = os.getenv("COMMS_TRANSCRIPTION_LANGUAGE", "en")
    async with httpx.AsyncClient(timeout=120.0) as client:
        response = await client.post(
            "https://api.deepgram.com/v1/listen",
            # Let httpx encode the query so unusual env values cannot corrupt the URL.
            params={
                "model": model,
                "language": language,
                "diarize": "true",
                "smart_format": "true",
            },
            headers={"Authorization": f"Token {api_key}", "Content-Type": content_type},
            content=audio,
        )
        response.raise_for_status()
        payload = response.json()
    # An empty list or null at any level yields {} here, so the missing-text
    # error below is raised instead of an IndexError/AttributeError.
    alternative = _first_deepgram_alternative(payload)
    text = (alternative.get("transcript") or "").strip()
    if not text:
        raise TranscriptionError("Deepgram transcription response did not include text.")
    words = alternative.get("words") or []
    return {
        "text": text,
        "provider": "deepgram",
        "language": language,
        "segments": words,
        "raw": payload,
    }
async def _transcribe_http_endpoint(recording_url: str) -> dict[str, Any]:
    """Delegate transcription to a custom HTTP endpoint.

    Posts ``{"recording_url": ...}`` as JSON to COMMS_TRANSCRIPTION_ENDPOINT,
    adding a bearer token when COMMS_TRANSCRIPTION_ENDPOINT_TOKEN is set. The
    transcript is read from the response's ``text`` key, falling back to
    ``transcript``.

    Returns:
        Dict with ``text``, ``provider``, ``language``, ``segments`` and the
        untouched endpoint response under ``raw``.

    Raises:
        TranscriptionError: If the endpoint is not configured or the response
            carries no text.
    """
    target = os.getenv("COMMS_TRANSCRIPTION_ENDPOINT", "").strip()
    if not target:
        raise TranscriptionError("COMMS_TRANSCRIPTION_ENDPOINT is required for COMMS_TRANSCRIPTION_PROVIDER=http.")

    bearer = os.getenv("COMMS_TRANSCRIPTION_ENDPOINT_TOKEN", "").strip()
    request_headers = {}
    if bearer:
        request_headers["Authorization"] = f"Bearer {bearer}"

    async with httpx.AsyncClient(timeout=120.0) as client:
        response = await client.post(
            target,
            json={"recording_url": recording_url},
            headers=request_headers,
        )
        response.raise_for_status()
        payload = response.json()

    transcript = (payload.get("text") or payload.get("transcript") or "").strip()
    if transcript:
        return {
            "text": transcript,
            "provider": "http",
            "language": payload.get("language") or "unknown",
            "segments": payload.get("segments") or [],
            "raw": payload,
        }
    raise TranscriptionError("HTTP transcription endpoint response did not include text.")
async def transcribe_recording(recording_url: str, provider: str | None = None) -> dict[str, Any]:
    """Transcribe a recording with the selected or configured provider.

    Args:
        recording_url: Location of the recording, passed to the provider helper.
        provider: Provider name; when falsy, COMMS_TRANSCRIPTION_PROVIDER is
            used (default ``none``). Accepted values are ``openai``/``whisper``,
            ``deepgram`` and ``http``/``endpoint``/``custom``.

    Returns:
        The provider helper's result dict.

    Raises:
        TranscriptionError: If no provider is configured, the provider is
            unsupported, the HTTP request fails, or the provider answers with
            a body that is not valid JSON.
    """
    selected = (provider or os.getenv("COMMS_TRANSCRIPTION_PROVIDER", "none")).strip().lower()
    if selected in {"", "none", "disabled"}:
        raise TranscriptionError("COMMS_TRANSCRIPTION_PROVIDER is not configured.")
    if selected in {"openai", "whisper"}:
        backend = _transcribe_openai
    elif selected == "deepgram":
        backend = _transcribe_deepgram
    elif selected in {"http", "endpoint", "custom"}:
        backend = _transcribe_http_endpoint
    else:
        raise TranscriptionError(f"Unsupported COMMS_TRANSCRIPTION_PROVIDER '{selected}'.")

    try:
        return await backend(recording_url)
    except httpx.HTTPError as exc:
        raise TranscriptionError(f"{selected} transcription request failed: {exc}") from exc
    except json.JSONDecodeError as exc:
        # A 2xx response with a non-JSON body (e.g. an HTML error page) would
        # otherwise escape as a raw decoding error.
        raise TranscriptionError(f"{selected} transcription response was not valid JSON: {exc}") from exc
def normalize_phone(phone: str, default_region: str = DEFAULT_COUNTRY) -> str | None:
    """Return an E.164-like phone number suitable for provider and CRM matching.

    Args:
        phone: Raw number in any notation; everything except digits and "+"
            is discarded, and a leading "00" is treated as "+".
        default_region: Country calling code assumed for numbers that carry no
            international prefix. Surrounding whitespace and a leading "+"
            are tolerated (``"91"`` and ``"+91"`` are equivalent).

    Returns:
        The number as ``+<digits>``, or ``None`` when *phone* is empty or the
        result does not look like a 7-15 digit international number.
    """
    if not phone:
        return None
    cleaned = re.sub(r"[^\d+]", "", str(phone).strip())
    if cleaned.startswith("00"):
        cleaned = "+" + cleaned[2:]

    # A calling code configured as "+91" used to produce "++91..." and reject
    # every national number.
    calling_code = str(default_region or "").strip().lstrip("+")
    national_input = None
    if not cleaned.startswith("+"):
        national_input = cleaned
        cleaned = f"+{calling_code}{cleaned}"

    if PHONEUTILS_AVAILABLE and phonenumbers is not None:
        # First try the prefixed form (the historical behaviour). If that is
        # not a valid number, parse the national input against the region that
        # owns the calling code, so a trunk "0" or an already-present country
        # code is handled by the library rather than blindly prefixed.
        attempts = [(cleaned, None)]
        if national_input and calling_code.isascii() and calling_code.isdigit():
            region = phonenumbers.region_code_for_country_code(int(calling_code))
            attempts.append((national_input, region))
        for candidate, region in attempts:
            try:
                parsed = phonenumbers.parse(candidate, region)
            except NumberParseException:
                continue
            if phonenumbers.is_valid_number(parsed):
                return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)

    # Lenient fallback when the library is missing or rejects the number.
    return cleaned if re.match(r"^\+\d{7,15}$", cleaned) else None
def _phone_digits(phone: str) -> str:
    """Return only the digit characters of *phone*; a falsy value yields ``""``."""
    source = phone or ""
    return "".join(re.findall(r"\d", source))
def _crm_channel(channel: str) -> str:
    """Return *channel* if it is a known CRM channel, otherwise ``"other"``.

    The comparison is exact (case-sensitive).
    """
    if channel in {"whatsapp", "sms", "call", "email", "website", "walk_in", "other"}:
        return channel
    return "other"
async def get_or_create_thread(
    pool,
    phone_e164: str,
    provider: str,
    external_thread_id: str | None = None,
    display_name: str | None = None,
    channel: str = "whatsapp",
) -> dict[str, Any]:
    """Return the thread for a phone/provider pair, creating it when absent.

    Args:
        pool: Connection pool whose ``acquire()`` yields a connection offering
            ``fetchrow`` and ``fetchval``.
        phone_e164: Normalized phone number identifying the thread.
        provider: Messaging provider name.
        external_thread_id: Provider-side thread identifier, if any.
        display_name: Human-readable name; defaults to *phone_e164*.
        channel: Channel stored on a newly created thread.

    Returns:
        For an existing thread, its ``thread_id``, ``person_id``, ``status``
        and ``unread_count`` columns. For a new thread, the same keys plus
        ``is_new: True``.
    """
    # NOTE(review): the lookup and the insert are separate statements, so two
    # concurrent calls for the same phone/provider can both insert. Closing
    # that race needs a unique constraint, which is not visible from here.
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT thread_id, person_id, status, unread_count
            FROM comms_threads
            WHERE phone_e164 = $1 AND provider = $2
            LIMIT 1
            """,
            phone_e164,
            provider,
        )
        if row:
            return dict(row)

        # Linking to a CRM person is best-effort: a failed lookup must not
        # block thread creation, but it is logged instead of vanishing.
        person_id = None
        try:
            person_row = await conn.fetchrow(
                """
                SELECT person_id
                FROM crm_people
                WHERE primary_phone = $1
                   OR regexp_replace(COALESCE(primary_phone, ''), '[^0-9]', '', 'g') = $2
                LIMIT 1
                """,
                phone_e164,
                _phone_digits(phone_e164),
            )
            person_id = person_row["person_id"] if person_row else None
        except Exception:
            logging.getLogger(__name__).warning(
                "CRM person lookup failed while creating a comms thread; continuing unlinked.",
                exc_info=True,
            )
            person_id = None

        # Start at zero unread: store_message() increments the counter for each
        # inbound message, so seeding it with 1 counted the first message twice.
        new_id = await conn.fetchval(
            """
            INSERT INTO comms_threads
                (provider, external_thread_id, person_id, phone_e164, display_name, channel, status, unread_count)
            VALUES ($1, $2, $3, $4, $5, $6, 'open', 0)
            RETURNING thread_id
            """,
            provider,
            external_thread_id,
            person_id,
            phone_e164,
            display_name or phone_e164,
            channel,
        )
        return {
            "thread_id": new_id,
            "person_id": person_id,
            "status": "open",
            "unread_count": 0,
            "is_new": True,
        }
async def store_message(
    pool,
    thread_id: UUID,
    provider: str,
    external_message_id: str | None,
    direction: str,
    message_type: str,
    body: str,
    media_url: str | None = None,
    raw_payload: dict[str, Any] | None = None,
    sent_at: datetime | None = None,
) -> UUID:
    """Insert a message and update its thread's activity counters.

    Args:
        pool: Connection pool whose ``acquire()`` yields a connection offering
            ``fetchval``, ``execute`` and ``transaction``.
        thread_id: Thread the message belongs to.
        provider: Messaging provider name.
        external_message_id: Provider-side message identifier, if any.
        direction: ``"inbound"`` marks the message delivered and unread; any
            other value is stored as sent and leaves the unread count alone.
        message_type: Message kind stored as-is.
        body: Message text.
        media_url: Optional media attachment URL.
        raw_payload: Original provider payload, stored as JSON (``{}`` if omitted).
        sent_at: Message time; defaults to the current UTC time.

    Returns:
        The ``message_id`` of the inserted row.
    """
    is_inbound = direction == "inbound"
    async with pool.acquire() as conn:
        # Both writes succeed or fail together, so a thread's counters never
        # drift from the messages actually stored.
        async with conn.transaction():
            msg_id = await conn.fetchval(
                """
                INSERT INTO comms_messages
                    (thread_id, provider, external_message_id, direction, message_type, body, media_url,
                     delivery_status, sent_at, raw_payload)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::jsonb)
                RETURNING message_id
                """,
                thread_id,
                provider,
                external_message_id,
                direction,
                message_type,
                body,
                media_url,
                "delivered" if is_inbound else "sent",
                sent_at or datetime.now(UTC),
                json.dumps(raw_payload or {}),
            )
            unread_delta = 1 if is_inbound else 0
            await conn.execute(
                """
                UPDATE comms_threads
                SET last_message_at = NOW(), unread_count = unread_count + $2, updated_at = NOW()
                WHERE thread_id = $1
                """,
                thread_id,
                unread_delta,
            )
    return msg_id
async def maybe_create_crm_interaction(pool, person_id: UUID, body: str, channel: str = "whatsapp") -> None:
    """Mirror inbound comms into canonical CRM intelligence tables when present.

    Best-effort: nothing is written when *person_id* is falsy or the
    ``intel_interactions`` table does not exist, and any failure is logged
    rather than raised so ingestion is never blocked by the mirror.

    Args:
        pool: Connection pool whose ``acquire()`` yields a connection offering
            ``fetchval`` and ``execute``.
        person_id: CRM person the interaction belongs to.
        body: Message text; the interaction summary keeps the first 500 characters.
        channel: Source channel; values outside the CRM's known set are stored as ``other``.
    """
    if not person_id:
        return
    # A missing body must not abort the mirror with a TypeError on slicing.
    text = body or ""
    try:
        async with pool.acquire() as conn:
            exists = await conn.fetchval("SELECT to_regclass('public.intel_interactions') IS NOT NULL")
            if not exists:
                return
            interaction_id = await conn.fetchval(
                """
                INSERT INTO intel_interactions
                    (person_id, channel, interaction_type, happened_at, summary, source_ref, metadata_json)
                VALUES ($1, $2::intel_channel, 'message', NOW(), $3, 'comms_ingest', $4::jsonb)
                RETURNING interaction_id
                """,
                person_id,
                _crm_channel(channel),
                text[:500],
                json.dumps({"source": "comms", "direction": "inbound"}),
            )
            if await conn.fetchval("SELECT to_regclass('public.intel_messages') IS NOT NULL"):
                await conn.execute(
                    """
                    INSERT INTO intel_messages
                        (interaction_id, sender_role, sender_name, message_text, delivered_at, metadata_json)
                    VALUES ($1, 'lead', NULL, $2, NOW(), $3::jsonb)
                    """,
                    interaction_id,
                    text,
                    json.dumps({"source": "comms"}),
                )
    except Exception:
        # Still best-effort, but leave a trace so schema or permission
        # problems are diagnosable.
        logging.getLogger(__name__).warning(
            "Mirroring inbound message into CRM intelligence tables failed.",
            exc_info=True,
        )
        return
async def ingest_inbound_message(pool, normalized_payload: dict[str, Any]) -> dict[str, Any]:
phone = normalize_phone(normalized_payload.get("phone_e164") or normalized_payload.get("phone") or "")
if not phone:
raise ValueError("Missing phone_e164 in payload")
provider = normalized_payload.get("provider", "unknown")
channel = normalized_payload.get("channel", "whatsapp")
thread = await get_or_create_thread(
pool,
phone_e164=phone,
provider=provider,
external_thread_id=normalized_payload.get("external_thread_id"),
display_name=normalized_payload.get("display_name") or phone,
channel=channel,
)
timestamp = normalized_payload.get("timestamp")
sent_at = datetime.fromtimestamp(timestamp, UTC) if timestamp else None
msg_id = await store_message(
pool,
thread_id=thread["thread_id"],
provider=provider,
external_message_id=normalized_payload.get("external_message_id"),
direction=normalized_payload.get("direction", "inbound"),
message_type=normalized_payload.get("message_type", "text"),
body=normalized_payload.get("body", ""),
media_url=normalized_payload.get("media_url"),
raw_payload=normalized_payload.get("raw"),
sent_at=sent_at,
)
if thread.get("person_id") and normalized_payload.get("direction", "inbound") == "inbound":
await maybe_create_crm_interaction(pool, thread["person_id"], normalized_payload.get("body", ""), channel)
return {
"thread_id": str(thread["thread_id"]),
"message_id": str(msg_id),
"person_id": str(thread["person_id"]) if thread.get("person_id") else None,
"is_new_thread": thread.get("is_new", False),
}