"""Tests that the legacy CRM routes invoke the canonical write bridges.

Each test mounts ``crm_router`` on a throwaway FastAPI app backed by an
in-memory fake connection pool, replaces the bridge hooks in
``backend.api.routes_crm`` with recording fakes, and asserts that the HTTP
endpoints call those hooks with the expected legacy rows.
"""

from __future__ import annotations

import os
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any

from fastapi import FastAPI
from fastapi.testclient import TestClient

# Seed the secret before the backend imports below. NOTE(review): presumably
# the auth module reads it at import time -- confirm before reordering.
os.environ.setdefault("VELOCITY_JWT_SECRET", "test-secret")

from backend.api.routes_crm import crm_router  # noqa: E402
from backend.auth.dependencies import UserPrincipal, get_current_user  # noqa: E402


def _now() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(timezone.utc)


class FakeConn:
    """In-memory stand-in for the database connection used by the CRM routes.

    Only the SQL statements matched in ``execute`` and ``fetchrow`` are
    understood; any other query raises ``AssertionError`` so that a change in
    the SQL issued by the routes fails the test loudly instead of silently
    doing nothing.
    """

    # Schema-migration statements that are acknowledged without any effect.
    _ALTER_PREFIXES = (
        "ALTER TABLE leads ADD COLUMN IF NOT EXISTS tenant_id",
        "ALTER TABLE chat_logs ADD COLUMN IF NOT EXISTS tenant_id",
        "ALTER TABLE leads ALTER COLUMN tenant_id SET DEFAULT",
        "ALTER TABLE chat_logs ALTER COLUMN tenant_id SET DEFAULT",
    )
    _BACKFILL_PREFIXES = (
        "UPDATE leads SET tenant_id = $1 WHERE tenant_id IS NULL OR tenant_id = ''",
        "UPDATE chat_logs SET tenant_id = $1 WHERE tenant_id IS NULL OR tenant_id = ''",
    )
    _DELETE_LEAD_PREFIX = "DELETE FROM leads WHERE id = $1 AND tenant_id = $2"
    _SELECT_LEAD_PREFIX = (
        "SELECT id, name, email, phone, source, notes, qualification, score, "
        "kanban_status, budget, unit_interest, metadata, created_at, updated_at "
        "FROM leads WHERE id = $1 AND tenant_id = $2"
    )

    # Positional layout of the bind arguments, in order.
    _LEAD_INSERT_FIELDS = (
        "id",
        "tenant_id",
        "name",
        "email",
        "phone",
        "source",
        "notes",
        "qualification",
        "score",
        "kanban_status",
        "budget",
        "unit_interest",
    )
    # For the full UPDATE these map to args[1]..args[10] (args[0] is the id).
    _LEAD_UPDATE_FIELDS = (
        "name",
        "email",
        "phone",
        "source",
        "notes",
        "qualification",
        "score",
        "kanban_status",
        "budget",
        "unit_interest",
    )
    _CHAT_LOG_INSERT_FIELDS = (
        "id",
        "tenant_id",
        "lead_id",
        "sender",
        "channel",
        "content",
    )
    # Argument count that distinguishes the full lead UPDATE from the
    # status-only (kanban move) UPDATE.
    _FULL_UPDATE_ARG_COUNT = 13

    def __init__(self) -> None:
        # Rows are keyed by their "id" value.
        self.leads: dict[str, dict[str, Any]] = {}
        self.chat_logs: dict[str, dict[str, Any]] = {}

    @staticmethod
    def _normalize(query: str) -> str:
        """Collapse all whitespace runs in *query* to single spaces."""
        return " ".join(query.strip().split())

    async def execute(self, query: str, *args) -> str:
        """Handle a statement that returns a command-status string.

        Schema and backfill statements are acknowledged as no-ops; the
        tenant-scoped lead DELETE removes the row from ``self.leads``.

        Raises:
            AssertionError: if *query* is not one of the recognised statements.
        """
        normalized = self._normalize(query)

        if (
            "CREATE TABLE IF NOT EXISTS leads" in normalized
            or "CREATE TABLE IF NOT EXISTS chat_logs" in normalized
        ):
            return "CREATE"
        if normalized.startswith(self._ALTER_PREFIXES):
            return "ALTER TABLE"
        if normalized.startswith(self._BACKFILL_PREFIXES):
            return "UPDATE"
        if "CREATE INDEX IF NOT EXISTS" in normalized:
            return "CREATE INDEX"

        if normalized.startswith(self._DELETE_LEAD_PREFIX):
            lead_id, tenant_id = args[0], args[1]
            existing = self.leads.get(lead_id)
            # Only delete when the row belongs to the requesting tenant.
            if existing and existing["tenant_id"] == tenant_id:
                self.leads.pop(lead_id, None)
                return "DELETE 1"
            return "DELETE 0"

        raise AssertionError(f"Unexpected execute query: {query}")

    async def fetchrow(self, query: str, *args) -> dict[str, Any] | None:
        """Handle a statement that returns a single row (or ``None``).

        Supports inserting a lead, updating a lead (full or status-only),
        selecting a lead by id and tenant, and inserting a chat log.

        Raises:
            AssertionError: if *query* is not one of the recognised statements.
        """
        normalized = self._normalize(query)

        if "INSERT INTO leads" in normalized:
            row: dict[str, Any] = {
                field: args[index]
                for index, field in enumerate(self._LEAD_INSERT_FIELDS)
            }
            # One timestamp for both columns so a fresh row never looks as if
            # it had already been modified after creation.
            created = _now()
            row["metadata"] = {}
            row["created_at"] = created
            row["updated_at"] = created
            self.leads[row["id"]] = row
            return row

        if normalized.startswith("UPDATE leads") and "RETURNING" in normalized:
            row = self.leads.get(args[0])
            # The tenant id is always the last bind argument.
            tenant_id = args[-1]
            if not row or row["tenant_id"] != tenant_id:
                return None
            if len(args) == self._FULL_UPDATE_ARG_COUNT:
                # Full update; args[11] is not used by this fake.
                row.update(
                    {
                        field: args[index]
                        for index, field in enumerate(self._LEAD_UPDATE_FIELDS, start=1)
                    }
                )
            else:
                # Status-only update: the stored qualification becomes "HOT"
                # when the stored score is at least 45, otherwise it is kept.
                row.update(
                    {
                        "kanban_status": args[1],
                        "qualification": "HOT" if row["score"] >= 45 else row["qualification"],
                    }
                )
            row["updated_at"] = _now()
            return row

        if normalized.startswith(self._SELECT_LEAD_PREFIX):
            row = self.leads.get(args[0])
            if row and row["tenant_id"] == args[1]:
                return row
            return None

        if normalized.startswith("INSERT INTO chat_logs"):
            row = {
                field: args[index]
                for index, field in enumerate(self._CHAT_LOG_INSERT_FIELDS)
            }
            row["metadata"] = {}
            row["created_at"] = _now()
            self.chat_logs[row["id"]] = row
            return row

        raise AssertionError(f"Unexpected fetchrow query: {query}")


class FakePool:
    """Connection-pool stand-in that always hands out the same ``FakeConn``."""

    def __init__(self) -> None:
        self.conn = FakeConn()

    @asynccontextmanager
    async def acquire(self):
        """Yield the shared fake connection; nothing is released on exit."""
        yield self.conn


def _build_client() -> tuple[TestClient, FakePool]:
    """Build a test client for ``crm_router`` backed by a fresh ``FakePool``.

    The router is mounted under ``/api`` and ``get_current_user`` is
    overridden so every request is made as ``UserPrincipal("user-1", "ADMIN",
    "tenant_alpha")``.
    """
    app = FastAPI()
    pool = FakePool()
    app.state.db_pool = pool
    app.include_router(crm_router, prefix="/api")
    app.dependency_overrides[get_current_user] = lambda: UserPrincipal("user-1", "ADMIN", "tenant_alpha")
    return TestClient(app), pool


def _create_lead(client: TestClient) -> str:
    """Create the standard fixture lead via the API and return its id.

    Asserts that the create request succeeded so a broken setup step is
    reported as such instead of as a ``KeyError`` on the response body.
    """
    response = client.post(
        "/api/leads",
        json={"name": "Lead One", "source": "website", "score": 60},
    )
    assert response.status_code == 201, response.text
    return response.json()["data"]["id"]


def test_create_lead_triggers_canonical_write_bridge(monkeypatch) -> None:
    """POST /api/leads calls the lead bridge once with the new lead and tenant."""
    client, _pool = _build_client()
    calls: list[dict[str, Any]] = []

    async def fake_sync(request, conn, user, legacy_lead):
        calls.append(
            {
                "lead_id": legacy_lead["id"],
                "name": legacy_lead["name"],
                "tenant_id": user.tenant_id,
            }
        )
        return {"person_id": "person-1", "lead_id": "canon-1"}

    monkeypatch.setattr("backend.api.routes_crm._sync_canonical_lead_bridge", fake_sync)

    response = client.post(
        "/api/leads",
        json={"name": "Amina Rahman", "source": "website", "score": 88},
    )

    assert response.status_code == 201
    assert len(calls) == 1
    assert calls[0]["name"] == "Amina Rahman"
    assert calls[0]["tenant_id"] == "tenant_alpha"


def test_update_lead_triggers_canonical_write_bridge(monkeypatch) -> None:
    """PUT /api/leads/{id} calls the lead bridge once with the updated row."""
    client, _pool = _build_client()
    calls: list[dict[str, Any]] = []

    async def fake_sync(request, conn, user, legacy_lead):
        calls.append({"lead_id": legacy_lead["id"], "status": legacy_lead["kanban_status"]})
        return {"person_id": "person-1", "lead_id": "canon-1"}

    monkeypatch.setattr("backend.api.routes_crm._sync_canonical_lead_bridge", fake_sync)

    lead_id = _create_lead(client)
    # Discard the bridge call made by the create step; only the update counts.
    calls.clear()

    update_response = client.put(
        f"/api/leads/{lead_id}",
        json={
            "name": "Lead One Updated",
            "source": "website",
            "score": 75,
            "kanban_status": "negotiation",
        },
    )

    assert update_response.status_code == 200
    assert len(calls) == 1
    assert calls[0]["lead_id"] == lead_id
    assert calls[0]["status"] == "Negotiation"


def test_create_chat_log_triggers_canonical_chat_bridge(monkeypatch) -> None:
    """POST /api/chat-logs calls the chat-log bridge once with the new log."""
    client, _pool = _build_client()
    calls: list[dict[str, Any]] = []

    async def fake_lead_sync(request, conn, user, legacy_lead):
        return {"person_id": "person-1", "lead_id": "canon-1"}

    async def fake_chat_sync(request, conn, user, legacy_chat_log, legacy_lead):
        calls.append(
            {
                "chat_log_id": legacy_chat_log["id"],
                "lead_id": legacy_chat_log["lead_id"],
                "content": legacy_chat_log["content"],
                "lead_name": legacy_lead["name"],
            }
        )

    monkeypatch.setattr("backend.api.routes_crm._sync_canonical_lead_bridge", fake_lead_sync)
    monkeypatch.setattr("backend.api.routes_crm._sync_canonical_chat_log_bridge", fake_chat_sync)

    lead_id = _create_lead(client)

    response = client.post(
        "/api/chat-logs",
        json={
            "lead_id": lead_id,
            "sender": "oracle",
            "channel": "whatsapp",
            "content": "Follow up tonight",
        },
    )

    assert response.status_code == 201
    assert len(calls) == 1
    assert calls[0]["lead_id"] == lead_id
    assert calls[0]["content"] == "Follow up tonight"


def test_move_and_delete_trigger_canonical_write_bridges(monkeypatch) -> None:
    """A kanban move calls the lead bridge; a delete calls the delete bridge."""
    client, _pool = _build_client()
    move_calls: list[dict[str, Any]] = []
    delete_calls: list[str] = []

    async def fake_sync(request, conn, user, legacy_lead):
        move_calls.append({"lead_id": legacy_lead["id"], "status": legacy_lead["kanban_status"]})
        return {"person_id": "person-1", "lead_id": "canon-1"}

    async def fake_delete(request, conn, user, legacy_lead_id):
        delete_calls.append(legacy_lead_id)

    monkeypatch.setattr("backend.api.routes_crm._sync_canonical_lead_bridge", fake_sync)
    monkeypatch.setattr("backend.api.routes_crm._delete_canonical_lead_bridge", fake_delete)

    lead_id = _create_lead(client)
    # Discard the bridge call made by the create step; only the move counts.
    move_calls.clear()

    move_response = client.put(
        "/api/kanban/move",
        json={"lead_id": lead_id, "target_status": "site_visit"},
    )
    delete_response = client.delete(f"/api/leads/{lead_id}")

    assert move_response.status_code == 200
    assert delete_response.status_code == 200
    assert move_calls == [{"lead_id": lead_id, "status": "Site Visit"}]
    assert delete_calls == [lead_id]