"""Inbound communications ingestion for Velocity CRM.""" from __future__ import annotations import json import os import re from datetime import UTC, datetime from typing import Any from uuid import UUID PHONEUTILS_AVAILABLE = False try: import phonenumbers from phonenumbers import NumberParseException PHONEUTILS_AVAILABLE = True except ImportError: phonenumbers = None # type: ignore[assignment] NumberParseException = Exception # type: ignore[assignment] DEFAULT_COUNTRY = os.getenv("COMMS_DEFAULT_COUNTRY_CODE", "91") def normalize_phone(phone: str, default_region: str = DEFAULT_COUNTRY) -> str | None: """Return an E.164-like phone number suitable for provider and CRM matching.""" if not phone: return None cleaned = re.sub(r"[^\d+]", "", phone.strip()) if cleaned.startswith("00"): cleaned = "+" + cleaned[2:] if not cleaned.startswith("+"): cleaned = f"+{default_region}{cleaned}" if PHONEUTILS_AVAILABLE and phonenumbers is not None: try: parsed = phonenumbers.parse(cleaned, None) if phonenumbers.is_valid_number(parsed): return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164) except NumberParseException: pass return cleaned if re.match(r"^\+\d{7,15}$", cleaned) else None def _phone_digits(phone: str) -> str: return re.sub(r"\D+", "", phone or "") def _crm_channel(channel: str) -> str: allowed = {"whatsapp", "sms", "call", "email", "website", "walk_in", "other"} return channel if channel in allowed else "other" async def get_or_create_thread( pool, phone_e164: str, provider: str, external_thread_id: str | None = None, display_name: str | None = None, channel: str = "whatsapp", ) -> dict[str, Any]: async with pool.acquire() as conn: row = await conn.fetchrow( """ SELECT thread_id, person_id, status, unread_count FROM comms_threads WHERE phone_e164 = $1 AND provider = $2 LIMIT 1 """, phone_e164, provider, ) if row: return dict(row) person_id = None try: person_row = await conn.fetchrow( """ SELECT person_id FROM crm_people WHERE primary_phone = $1 OR regexp_replace(COALESCE(primary_phone, ''), '[^0-9]', '', 'g') = $2 LIMIT 1 """, phone_e164, _phone_digits(phone_e164), ) person_id = person_row["person_id"] if person_row else None except Exception: person_id = None new_id = await conn.fetchval( """ INSERT INTO comms_threads (provider, external_thread_id, person_id, phone_e164, display_name, channel, status, unread_count) VALUES ($1, $2, $3, $4, $5, $6, 'open', 1) RETURNING thread_id """, provider, external_thread_id, person_id, phone_e164, display_name or phone_e164, channel, ) return { "thread_id": new_id, "person_id": person_id, "status": "open", "unread_count": 1, "is_new": True, } async def store_message( pool, thread_id: UUID, provider: str, external_message_id: str | None, direction: str, message_type: str, body: str, media_url: str | None = None, raw_payload: dict[str, Any] | None = None, sent_at: datetime | None = None, ) -> UUID: async with pool.acquire() as conn: msg_id = await conn.fetchval( """ INSERT INTO comms_messages (thread_id, provider, external_message_id, direction, message_type, body, media_url, delivery_status, sent_at, raw_payload) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::jsonb) RETURNING message_id """, thread_id, provider, external_message_id, direction, message_type, body, media_url, "delivered" if direction == "inbound" else "sent", sent_at or datetime.now(UTC), json.dumps(raw_payload or {}), ) unread_delta = 1 if direction == "inbound" else 0 await conn.execute( """ UPDATE comms_threads SET last_message_at = NOW(), unread_count = unread_count + $2, updated_at = NOW() WHERE thread_id = $1 """, thread_id, unread_delta, ) return msg_id async def maybe_create_crm_interaction(pool, person_id: UUID, body: str, channel: str = "whatsapp") -> None: """Mirror inbound comms into canonical CRM intelligence tables when present.""" if not person_id: return try: async with pool.acquire() as conn: exists = await conn.fetchval("SELECT to_regclass('public.intel_interactions') IS NOT NULL") if not exists: return interaction_id = await conn.fetchval( """ INSERT INTO intel_interactions (person_id, channel, interaction_type, happened_at, summary, source_ref, metadata_json) VALUES ($1, $2::intel_channel, 'message', NOW(), $3, 'comms_ingest', $4::jsonb) RETURNING interaction_id """, person_id, _crm_channel(channel), body[:500], json.dumps({"source": "comms", "direction": "inbound"}), ) if await conn.fetchval("SELECT to_regclass('public.intel_messages') IS NOT NULL"): await conn.execute( """ INSERT INTO intel_messages (interaction_id, sender_role, sender_name, message_text, delivered_at, metadata_json) VALUES ($1, 'lead', NULL, $2, NOW(), $3::jsonb) """, interaction_id, body, json.dumps({"source": "comms"}), ) except Exception: return async def ingest_inbound_message(pool, normalized_payload: dict[str, Any]) -> dict[str, Any]: phone = normalize_phone(normalized_payload.get("phone_e164") or normalized_payload.get("phone") or "") if not phone: raise ValueError("Missing phone_e164 in payload") provider = normalized_payload.get("provider", "unknown") channel = normalized_payload.get("channel", "whatsapp") thread = await get_or_create_thread( pool, phone_e164=phone, provider=provider, external_thread_id=normalized_payload.get("external_thread_id"), display_name=normalized_payload.get("display_name") or phone, channel=channel, ) timestamp = normalized_payload.get("timestamp") sent_at = datetime.fromtimestamp(timestamp, UTC) if timestamp else None msg_id = await store_message( pool, thread_id=thread["thread_id"], provider=provider, external_message_id=normalized_payload.get("external_message_id"), direction=normalized_payload.get("direction", "inbound"), message_type=normalized_payload.get("message_type", "text"), body=normalized_payload.get("body", ""), media_url=normalized_payload.get("media_url"), raw_payload=normalized_payload.get("raw"), sent_at=sent_at, ) if thread.get("person_id") and normalized_payload.get("direction", "inbound") == "inbound": await maybe_create_crm_interaction(pool, thread["person_id"], normalized_payload.get("body", ""), channel) return { "thread_id": str(thread["thread_id"]), "message_id": str(msg_id), "person_id": str(thread["person_id"]) if thread.get("person_id") else None, "is_new_thread": thread.get("is_new", False), }