from __future__ import annotations import os from contextlib import asynccontextmanager from datetime import datetime, timezone from typing import Any from fastapi import FastAPI from fastapi.testclient import TestClient os.environ.setdefault("VELOCITY_JWT_SECRET", "test-secret") from backend.api.routes_crm import analytics_router, crm_router from backend.auth.dependencies import UserPrincipal, get_current_user def _now() -> datetime: return datetime.now(timezone.utc) class FakeConn: def __init__(self) -> None: self.leads: dict[str, dict[str, Any]] = {} self.chat_logs: dict[str, dict[str, Any]] = {} async def execute(self, query: str, *args): normalized = query.strip() if "CREATE TABLE IF NOT EXISTS leads" in normalized or "CREATE TABLE IF NOT EXISTS chat_logs" in normalized: return "CREATE" if normalized.startswith("ALTER TABLE leads ADD COLUMN IF NOT EXISTS tenant_id"): for lead in self.leads.values(): lead.setdefault("tenant_id", "tenant_velocity") return "ALTER TABLE" if normalized.startswith("ALTER TABLE chat_logs ADD COLUMN IF NOT EXISTS tenant_id"): for log in self.chat_logs.values(): log.setdefault("tenant_id", "tenant_velocity") return "ALTER TABLE" if normalized.startswith("UPDATE leads") and "SET tenant_id = $1" in normalized: for lead in self.leads.values(): if not lead.get("tenant_id"): lead["tenant_id"] = args[0] return "UPDATE" if normalized.startswith("UPDATE chat_logs") and "SET tenant_id = $1" in normalized: for log in self.chat_logs.values(): if not log.get("tenant_id"): log["tenant_id"] = args[0] return "UPDATE" if normalized.startswith("ALTER TABLE leads ALTER COLUMN tenant_id SET DEFAULT"): return "ALTER TABLE" if normalized.startswith("ALTER TABLE chat_logs ALTER COLUMN tenant_id SET DEFAULT"): return "ALTER TABLE" if "CREATE INDEX IF NOT EXISTS" in normalized: return "CREATE INDEX" if normalized.startswith("DELETE FROM leads WHERE id = $1 AND tenant_id = $2"): existed = self.leads.get(args[0]) if existed and existed["tenant_id"] == args[1]: self.leads.pop(args[0], None) return "DELETE 1" return "DELETE 0" if normalized.startswith("DELETE FROM leads WHERE id = $1"): existed = self.leads.pop(args[0], None) return "DELETE 1" if existed else "DELETE 0" raise AssertionError(f"Unexpected execute query: {query}") async def fetchrow(self, query: str, *args): normalized = query.strip() if "INSERT INTO leads" in normalized: has_tenant = "tenant_id" in normalized.split("(", 1)[1].split(")", 1)[0] tenant_id = args[1] if has_tenant else "tenant_velocity" base = 2 if has_tenant else 1 row = { "id": args[0], "tenant_id": tenant_id, "name": args[base], "email": args[base + 1], "phone": args[base + 2], "source": args[base + 3], "notes": args[base + 4], "qualification": args[base + 5], "score": args[base + 6], "kanban_status": args[base + 7], "budget": args[base + 8], "unit_interest": args[base + 9], "metadata": {}, "created_at": _now(), "updated_at": _now(), } self.leads[row["id"]] = row return row if normalized.startswith("UPDATE leads") and "SET kanban_status" in normalized: lead = self.leads.get(args[0]) if not lead or lead["tenant_id"] != args[2]: return None lead["kanban_status"] = args[1] lead["updated_at"] = _now() if lead["score"] >= 90: lead["qualification"] = "WHALE" elif lead["score"] >= 70: lead["qualification"] = "POTENTIAL" elif lead["score"] >= 45: lead["qualification"] = "HOT" return lead if normalized.startswith("UPDATE leads") and "RETURNING" in normalized: lead = self.leads.get(args[0]) if not lead or lead["tenant_id"] != args[12]: return None lead.update( { "name": args[1], "email": args[2], "phone": args[3], "source": args[4], "notes": args[5], "qualification": args[6], "score": args[7], "kanban_status": args[8], "budget": args[9], "unit_interest": args[10], "updated_at": _now(), } ) return lead if normalized.startswith("SELECT id FROM leads WHERE id = $1 AND tenant_id = $2"): lead = self.leads.get(args[0]) return {"id": lead["id"]} if lead and lead["tenant_id"] == args[1] else None if "INSERT INTO chat_logs" in normalized: has_tenant = "tenant_id" in normalized.split("(", 1)[1].split(")", 1)[0] tenant_id = args[1] if has_tenant else "tenant_velocity" base = 2 if has_tenant else 1 row = { "id": args[0], "tenant_id": tenant_id, "lead_id": args[base], "sender": args[base + 1], "channel": args[base + 2], "content": args[base + 3], "metadata": {}, "created_at": _now(), } self.chat_logs[row["id"]] = row return row if "FROM leads" in normalized and "WHERE id = $1 AND tenant_id = $2" in normalized: lead = self.leads.get(args[0]) return lead if lead and lead["tenant_id"] == args[1] else None raise AssertionError(f"Unexpected fetchrow query: {query}") async def fetch(self, query: str, *args): normalized = query.strip() if "FROM leads" in normalized and "GROUP BY source" not in normalized and "GROUP BY qualification" not in normalized: rows = [row for row in self.leads.values() if row["tenant_id"] == args[0]] if "WHERE tenant_id = $1 AND kanban_status = $2" in normalized: rows = [row for row in rows if row["kanban_status"] == args[1]] return rows if "FROM chat_logs" in normalized: rows = [row for row in self.chat_logs.values() if row["tenant_id"] == args[0]] if "WHERE tenant_id = $1 AND lead_id = $2" in normalized: rows = [row for row in rows if row["lead_id"] == args[1]] return rows if "GROUP BY source" in normalized: grouped: dict[str, dict[str, Any]] = {} for lead in self.leads.values(): if lead["tenant_id"] != args[0]: continue slot = grouped.setdefault(lead["source"], {"source": lead["source"], "lead_count": 0, "avg_score": 0.0}) slot["lead_count"] += 1 slot["avg_score"] += float(lead["score"]) for slot in grouped.values(): slot["avg_score"] = slot["avg_score"] / slot["lead_count"] return list(grouped.values()) if "GROUP BY qualification" in normalized: grouped: dict[str, dict[str, Any]] = {} for lead in self.leads.values(): if lead["tenant_id"] != args[0]: continue slot = grouped.setdefault(lead["qualification"], {"qualification": lead["qualification"], "lead_count": 0}) slot["lead_count"] += 1 return list(grouped.values()) raise AssertionError(f"Unexpected fetch query: {query}") class FakePool: def __init__(self) -> None: self.conn = FakeConn() @asynccontextmanager async def acquire(self): yield self.conn def _build_client( *, authenticated: bool = True, tenant_id: str = "tenant_velocity", ) -> tuple[TestClient, FakePool]: app = FastAPI() pool = FakePool() app.state.db_pool = pool app.include_router(crm_router, prefix="/api") app.include_router(analytics_router, prefix="/api/analytics") if authenticated: app.dependency_overrides[get_current_user] = lambda: UserPrincipal("user-1", "ADMIN", tenant_id) return TestClient(app), pool def _build_shared_app(pool: FakePool, current_user: dict[str, str]) -> TestClient: app = FastAPI() app.state.db_pool = pool app.include_router(crm_router, prefix="/api") app.include_router(analytics_router, prefix="/api/analytics") app.dependency_overrides[get_current_user] = lambda: UserPrincipal( "user-1", current_user["role"], current_user["tenant_id"], ) return TestClient(app) def test_crm_crud_and_analytics_flow() -> None: client, _pool = _build_client() create_response = client.post( "/api/leads", json={ "name": "Amina Rahman", "email": "amina@example.com", "phone": "+971500000001", "source": "website", "notes": "Cash buyer interested in marina penthouse", "score": 92, "kanban_status": "qualified", "budget": "AED 12M", "unit_interest": "Penthouse", "metadata": {"campaign": "meta-velocity-marina"}, }, ) assert create_response.status_code == 201 lead_id = create_response.json()["data"]["id"] list_response = client.get("/api/leads") assert list_response.status_code == 200 assert list_response.json()["meta"]["count"] == 1 chat_response = client.post( "/api/chat-logs", json={ "lead_id": lead_id, "sender": "oracle", "channel": "whatsapp", "content": "Lead requested a private marina walkthrough.", "metadata": {"sentiment": "positive"}, }, ) assert chat_response.status_code == 201 board_response = client.get("/api/kanban/board") assert board_response.status_code == 200 board = board_response.json()["data"] qualifying_column = next(column for column in board if column["status"] == "Qualifying") assert qualifying_column["count"] == 1 move_response = client.put("/api/kanban/move", json={"lead_id": lead_id, "target_status": "negotiation"}) assert move_response.status_code == 200 assert move_response.json()["data"]["kanban_status"] == "Negotiation" scatter_response = client.get("/api/analytics/sentiment-scatter") assert scatter_response.status_code == 200 scatter = scatter_response.json() assert scatter[0]["qualification"] == "WHALE" assert scatter[0]["kanban_status"] == "Negotiation" def test_lead_demographics_groups_by_source_and_qualification() -> None: client, _pool = _build_client() client.post("/api/leads", json={"name": "Lead One", "source": "website", "score": 80}) client.post("/api/leads", json={"name": "Lead Two", "source": "walkin", "score": 45}) response = client.get("/api/leads/demographics") assert response.status_code == 200 payload = response.json()["data"] assert len(payload["by_source"]) == 2 assert any(row["qualification"] == "POTENTIAL" for row in payload["by_qualification"]) def test_crm_routes_require_authentication() -> None: client, _pool = _build_client(authenticated=False) response = client.get("/api/leads") assert response.status_code == 401 assert response.json()["detail"] == "Missing or malformed Authorization header." def test_crm_routes_are_scoped_to_authenticated_tenant() -> None: pool = FakePool() tenant_a = {"role": "ADMIN", "tenant_id": "tenant_alpha"} client_a = _build_shared_app(pool, tenant_a) create_response = client_a.post( "/api/leads", json={"name": "Tenant Alpha Lead", "source": "website", "score": 88}, ) assert create_response.status_code == 201 lead_id = create_response.json()["data"]["id"] tenant_b = {"role": "ADMIN", "tenant_id": "tenant_beta"} client_b = _build_shared_app(pool, tenant_b) list_response = client_b.get("/api/leads") assert list_response.status_code == 200 assert list_response.json()["meta"]["count"] == 0 get_response = client_b.get(f"/api/leads/{lead_id}") assert get_response.status_code == 404 delete_response = client_b.delete(f"/api/leads/{lead_id}") assert delete_response.status_code == 404 own_list_response = client_a.get("/api/leads") assert own_list_response.status_code == 200 assert own_list_response.json()["meta"]["count"] == 1