Files
Project_Velocity/backend/tests/test_crm_routes.py
sagnik 4645ff737b feat: Complete code integration of modules (#18)
The complete code integration is done.

Co-authored-by: Sagnik <sagnik7896@gmail.com>
Reviewed-on: #18
2026-04-12 19:20:14 +05:30

216 lines
8.1 KiB
Python

from __future__ import annotations
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any
from fastapi import FastAPI
from fastapi.testclient import TestClient
from backend.api.routes_crm import analytics_router, crm_router
def _now() -> datetime:
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now
class FakeConn:
    """In-memory stand-in for a database connection used by the CRM route tests.

    ``execute``, ``fetchrow`` and ``fetch`` recognise queries by matching
    fragments of the SQL text and act on two dictionaries keyed by row id.
    A query that no branch recognises raises ``AssertionError`` so that an
    unexpected statement fails the test loudly instead of silently passing.
    """

    def __init__(self) -> None:
        # Both stores map a row id to the row itself (a plain dict).
        self.leads: dict[str, dict[str, Any]] = {}
        self.chat_logs: dict[str, dict[str, Any]] = {}

    async def execute(self, query: str, *args):
        """Handle statements that only produce a status string.

        ``CREATE TABLE IF NOT EXISTS`` (for ``leads`` / ``chat_logs``) and
        ``CREATE INDEX IF NOT EXISTS`` are acknowledged without any effect.
        ``DELETE FROM leads WHERE id = $1`` removes the lead whose id is
        ``args[0]``.

        Returns:
            ``"CREATE"``, ``"CREATE INDEX"``, ``"DELETE 1"`` or ``"DELETE 0"``.

        Raises:
            AssertionError: If the query matches none of the statements above.
        """
        normalized = query.strip()
        if "CREATE TABLE IF NOT EXISTS leads" in normalized or "CREATE TABLE IF NOT EXISTS chat_logs" in normalized:
            return "CREATE"
        if "CREATE INDEX IF NOT EXISTS" in normalized:
            return "CREATE INDEX"
        if normalized.startswith("DELETE FROM leads WHERE id = $1"):
            existed = self.leads.pop(args[0], None)
            return "DELETE 1" if existed else "DELETE 0"
        raise AssertionError(f"Unexpected execute query: {query}")

    async def fetchrow(self, query: str, *args):
        """Handle statements that return at most one row.

        Recognised statements:

        * ``INSERT INTO leads`` - stores a lead built from ``args[0:11]`` in
          the order id, name, email, phone, source, notes, qualification,
          score, kanban_status, budget, unit_interest. ``metadata`` is always
          stored as an empty dict.
        * ``UPDATE leads ... SET kanban_status`` - sets the status of lead
          ``args[0]`` to ``args[1]`` and re-derives ``qualification`` from the
          stored score (>= 90 ``WHALE``, >= 70 ``POTENTIAL``, >= 45 ``HOT``;
          lower scores keep the existing value).
        * ``UPDATE leads ... RETURNING`` - overwrites the editable columns of
          lead ``args[0]`` from ``args[1:11]``.
        * ``SELECT id FROM leads WHERE id = $1`` - existence check.
        * ``INSERT INTO chat_logs`` - stores a chat log built from
          ``args[0:5]`` (id, lead_id, sender, channel, content).

        Returns:
            The affected row as a dict (the same object kept in the store),
            ``{"id": ...}`` for the existence check, or ``None`` when the
            targeted lead does not exist.

        Raises:
            AssertionError: If the query matches none of the statements above.
        """
        normalized = query.strip()
        if "INSERT INTO leads" in normalized:
            # Use one timestamp for both columns so a freshly inserted row has
            # created_at == updated_at rather than two slightly different
            # clock readings.
            inserted_at = _now()
            row = {
                "id": args[0],
                "name": args[1],
                "email": args[2],
                "phone": args[3],
                "source": args[4],
                "notes": args[5],
                "qualification": args[6],
                "score": args[7],
                "kanban_status": args[8],
                "budget": args[9],
                "unit_interest": args[10],
                "metadata": {},
                "created_at": inserted_at,
                "updated_at": inserted_at,
            }
            self.leads[row["id"]] = row
            return row
        if normalized.startswith("UPDATE leads") and "SET kanban_status" in normalized:
            lead = self.leads.get(args[0])
            if not lead:
                return None
            lead["kanban_status"] = args[1]
            lead["updated_at"] = _now()
            # Scores below 45 intentionally fall through and keep the current
            # qualification.
            if lead["score"] >= 90:
                lead["qualification"] = "WHALE"
            elif lead["score"] >= 70:
                lead["qualification"] = "POTENTIAL"
            elif lead["score"] >= 45:
                lead["qualification"] = "HOT"
            return lead
        if normalized.startswith("UPDATE leads") and "RETURNING" in normalized:
            lead = self.leads.get(args[0])
            if not lead:
                return None
            lead.update(
                {
                    "name": args[1],
                    "email": args[2],
                    "phone": args[3],
                    "source": args[4],
                    "notes": args[5],
                    "qualification": args[6],
                    "score": args[7],
                    "kanban_status": args[8],
                    "budget": args[9],
                    "unit_interest": args[10],
                    "updated_at": _now(),
                }
            )
            return lead
        if normalized.startswith("SELECT id FROM leads WHERE id = $1"):
            lead = self.leads.get(args[0])
            return {"id": lead["id"]} if lead else None
        if "INSERT INTO chat_logs" in normalized:
            row = {
                "id": args[0],
                "lead_id": args[1],
                "sender": args[2],
                "channel": args[3],
                "content": args[4],
                "metadata": {},
                "created_at": _now(),
            }
            self.chat_logs[row["id"]] = row
            return row
        raise AssertionError(f"Unexpected fetchrow query: {query}")

    async def fetch(self, query: str, *args):
        """Handle statements that return a list of rows.

        Recognised statements:

        * Plain ``FROM leads`` selects (no ``GROUP BY source`` /
          ``GROUP BY qualification``) - all leads, optionally filtered by
          ``WHERE kanban_status = $1`` using ``args[0]``.
        * ``FROM chat_logs`` - all chat logs, optionally filtered by
          ``WHERE lead_id = $1`` using ``args[0]``.
        * ``GROUP BY source`` - one row per source with ``lead_count`` and the
          mean ``avg_score``.
        * ``GROUP BY qualification`` - one row per qualification with
          ``lead_count``.

        Returns:
            A list of row dicts in insertion order of the underlying store.

        Raises:
            AssertionError: If the query matches none of the statements above.
        """
        normalized = query.strip()
        if "FROM leads" in normalized and "GROUP BY source" not in normalized and "GROUP BY qualification" not in normalized:
            rows = list(self.leads.values())
            if "WHERE kanban_status = $1" in normalized:
                rows = [row for row in rows if row["kanban_status"] == args[0]]
            return rows
        if "FROM chat_logs" in normalized:
            rows = list(self.chat_logs.values())
            if "WHERE lead_id = $1" in normalized:
                rows = [row for row in rows if row["lead_id"] == args[0]]
            return rows
        if "GROUP BY source" in normalized:
            by_source: dict[str, dict[str, Any]] = {}
            for lead in self.leads.values():
                slot = by_source.setdefault(lead["source"], {"source": lead["source"], "lead_count": 0, "avg_score": 0.0})
                slot["lead_count"] += 1
                # "avg_score" holds the running total until the division below.
                slot["avg_score"] += float(lead["score"])
            for slot in by_source.values():
                slot["avg_score"] = slot["avg_score"] / slot["lead_count"]
            return list(by_source.values())
        if "GROUP BY qualification" in normalized:
            by_qualification: dict[str, dict[str, Any]] = {}
            for lead in self.leads.values():
                slot = by_qualification.setdefault(lead["qualification"], {"qualification": lead["qualification"], "lead_count": 0})
                slot["lead_count"] += 1
            return list(by_qualification.values())
        raise AssertionError(f"Unexpected fetch query: {query}")
class FakePool:
    """Connection-pool double that always hands out the same ``FakeConn``."""

    def __init__(self) -> None:
        # One connection for the pool's lifetime, so state written through one
        # ``acquire()`` is visible through the next.
        self.conn = FakeConn()

    @asynccontextmanager
    async def acquire(self):
        """Yield the pool's single connection for ``async with pool.acquire()``."""
        shared_connection = self.conn
        yield shared_connection
def _build_client() -> tuple[TestClient, FakePool]:
    """Assemble a FastAPI app backed by a fresh ``FakePool`` and wrap it in a client.

    The pool is exposed on ``app.state.db_pool``; the CRM router is mounted
    under ``/api`` and the analytics router under ``/api/analytics``.

    Returns:
        The ``TestClient`` for the app and the ``FakePool`` it uses.
    """
    fake_pool = FakePool()
    application = FastAPI()
    application.state.db_pool = fake_pool
    mounts = ((crm_router, "/api"), (analytics_router, "/api/analytics"))
    for router, mount_prefix in mounts:
        application.include_router(router, prefix=mount_prefix)
    return TestClient(application), fake_pool
def test_crm_crud_and_analytics_flow() -> None:
    """Walk one lead through the create, list, chat-log, kanban and analytics endpoints."""
    client, _pool = _build_client()

    create_response = client.post(
        "/api/leads",
        json={
            "name": "Amina Rahman",
            "email": "amina@example.com",
            "phone": "+971500000001",
            "source": "website",
            "notes": "Cash buyer interested in marina penthouse",
            "score": 92,
            "kanban_status": "qualified",
            "budget": "AED 12M",
            "unit_interest": "Penthouse",
            "metadata": {"campaign": "meta-velocity-marina"},
        },
    )
    assert create_response.status_code == 201
    lead_id = create_response.json()["data"]["id"]

    list_response = client.get("/api/leads")
    assert list_response.status_code == 200
    assert list_response.json()["meta"]["count"] == 1

    chat_response = client.post(
        "/api/chat-logs",
        json={
            "lead_id": lead_id,
            "sender": "oracle",
            "channel": "whatsapp",
            "content": "Lead requested a private marina walkthrough.",
            "metadata": {"sentiment": "positive"},
        },
    )
    assert chat_response.status_code == 201

    board_response = client.get("/api/kanban/board")
    assert board_response.status_code == 200
    board = board_response.json()["data"]
    # Collect matches explicitly so a missing column fails with a readable
    # assertion message instead of a bare StopIteration from next().
    qualifying_columns = [column for column in board if column["status"] == "Qualifying"]
    assert qualifying_columns, "kanban board has no 'Qualifying' column"
    qualifying_column = qualifying_columns[0]
    assert qualifying_column["count"] == 1

    move_response = client.put("/api/kanban/move", json={"lead_id": lead_id, "target_status": "negotiation"})
    assert move_response.status_code == 200
    assert move_response.json()["data"]["kanban_status"] == "Negotiation"

    scatter_response = client.get("/api/analytics/sentiment-scatter")
    assert scatter_response.status_code == 200
    scatter = scatter_response.json()
    # Guard the index below so an empty payload fails as an assertion rather
    # than an IndexError.
    assert scatter, "sentiment-scatter returned no points"
    assert scatter[0]["qualification"] == "WHALE"
    assert scatter[0]["kanban_status"] == "Negotiation"
def test_lead_demographics_groups_by_source_and_qualification() -> None:
    """Two leads from different sources yield two source groups and a POTENTIAL bucket."""
    client, _pool = _build_client()

    # Check the setup requests: if a lead fails to be created, the test should
    # stop here rather than fail later on a misleading group-count mismatch.
    first_created = client.post("/api/leads", json={"name": "Lead One", "source": "website", "score": 80})
    assert first_created.status_code == 201
    second_created = client.post("/api/leads", json={"name": "Lead Two", "source": "walkin", "score": 45})
    assert second_created.status_code == 201

    response = client.get("/api/leads/demographics")
    assert response.status_code == 200
    payload = response.json()["data"]
    assert len(payload["by_source"]) == 2
    assert any(row["qualification"] == "POTENTIAL" for row in payload["by_qualification"])