# Source listing metadata: 471 lines, 18 KiB, Python
from __future__ import annotations
|
|
|
|
import os
|
|
import uuid
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
# Provide a fallback JWT secret for the test run; setdefault leaves any value
# already present in the environment untouched.
# NOTE(review): this sits before the backend imports below, presumably because
# the backend reads VELOCITY_JWT_SECRET at import time -- confirm.
os.environ.setdefault("VELOCITY_JWT_SECRET", "test-secret")
|
|
|
|
from backend.api.routes_inventory import router as inventory_router
|
|
from backend.api.routes_mobile_edge import router as mobile_edge_router
|
|
from backend.auth.dependencies import UserPrincipal, get_current_user
|
|
|
|
|
|
def _now() -> datetime:
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
class FakeSurfaceConn:
    """In-memory stand-in for a database connection used by the route tests.

    Every method recognises the SQL it is given by substring or prefix (after
    whitespace normalisation) and emulates it against plain dictionaries, one
    per table, keyed by the generated row id. Any statement that is not
    recognised raises ``AssertionError`` so unexpected queries fail loudly.

    NOTE(review): the method names, ``$n`` placeholders and ``"UPDATE n"``
    status strings look modelled on asyncpg's connection API -- confirm.
    """

    def __init__(self) -> None:
        # One dictionary per emulated table, keyed by the row's generated id.
        self.events: dict[str, dict[str, Any]] = {}
        self.calendar_events: dict[str, dict[str, Any]] = {}
        self.properties: dict[str, dict[str, Any]] = {}
        self.import_batches: dict[str, dict[str, Any]] = {}

    @staticmethod
    def _normalize(query: str) -> str:
        """Collapse every run of whitespace in *query* into a single space."""
        return " ".join(query.strip().split())

    @staticmethod
    def _placeholder_position(expression: str) -> "int | None":
        """Return the zero-based argument index of the first ``$n`` in *expression*.

        Returns ``None`` when *expression* holds no positional placeholder
        (for example ``NOW()``) or when the number is not a valid 1-based index.
        """
        _, marker, tail = expression.partition("$")
        end = 0
        while end < len(tail) and tail[end] in "0123456789":
            end += 1
        if not marker or end == 0:
            return None
        number = int(tail[:end])
        return number - 1 if number >= 1 else None

    async def fetchrow(self, query: str, *args):
        """Emulate single-row statements: the four INSERTs and two SELECT-by-id.

        INSERTs store a new row under a fresh UUID and return either the
        stored row or a small subset of it; SELECTs return the stored row
        only when its tenant matches, otherwise ``None``.

        Raises:
            AssertionError: if *query* is not one of the recognised statements.
        """
        normalized = self._normalize(query)

        if "INSERT INTO edge_communication_events" in normalized:
            event_id = str(uuid.uuid4())
            row = {
                "event_id": event_id,
                "tenant_id": args[0],
                "lead_id": args[1],
                "channel": args[2],
                "direction": args[3],
                "provider": args[4],
                "capture_mode": args[5],
                "consent_state": args[6],
                "duration_seconds": args[7],
                "summary": args[8],
                "raw_reference": args[9],
                "recording_ref": args[10],
                "provider_metadata": args[11],
                "timestamp": _now(),
                "created_at": _now(),
            }
            self.events[event_id] = row
            return {"event_id": event_id, "created_at": row["created_at"]}

        if "INSERT INTO user_calendar_events" in normalized:
            calendar_event_id = str(uuid.uuid4())
            row = {
                "calendar_event_id": calendar_event_id,
                "tenant_id": args[0],
                "owner_user_id": args[1],
                "lead_id": args[2],
                "source_event_id": args[3],
                "title": args[4],
                "description": args[5],
                "start_at": args[6],
                "end_at": args[7],
                "all_day": args[8],
                "status": args[9],
                "reminder_minutes": args[10],
                "created_by": args[11],
                "location": args[12],
                "metadata": args[13],
                "created_at": _now(),
            }
            self.calendar_events[calendar_event_id] = row
            # The stored dict itself is returned (not a copy).
            return row

        if "INSERT INTO inventory_import_batches" in normalized:
            batch_id = str(uuid.uuid4())
            row = {
                "batch_id": batch_id,
                "tenant_id": args[0],
                "source_type": args[1],
                "submitted_by": args[2],
                "total_rows": args[3],
                "source_file_ref": args[4],
                "accepted_rows": 0,
                "rejected_rows": 0,
                "status": "pending",
                "created_at": _now(),
                "completed_at": None,
            }
            self.import_batches[batch_id] = row
            return {
                "batch_id": batch_id,
                "status": row["status"],
                "created_at": row["created_at"],
            }

        if "INSERT INTO inventory_properties" in normalized:
            property_id = str(uuid.uuid4())
            row = {
                "property_id": property_id,
                "tenant_id": args[0],
                "batch_id": args[1],
                "source_id": args[2],
                "project_name": args[3],
                "developer_name": args[4],
                "location": args[5],
                "property_type": args[6],
                "price_bands": args[7],
                "unit_mix": args[8],
                "amenities": args[9],
                "status": args[10],
                "validation_state": args[11],
                "ingested_at": None,
                "created_at": _now(),
                "updated_at": _now(),
            }
            self.properties[property_id] = row
            return {"property_id": property_id, "created_at": row["created_at"]}

        if normalized.startswith("SELECT * FROM inventory_import_batches WHERE batch_id=$1 AND tenant_id=$2"):
            row = self.import_batches.get(args[0])
            return row if row and row["tenant_id"] == args[1] else None

        if normalized.startswith("SELECT * FROM inventory_properties WHERE property_id=$1 AND tenant_id=$2"):
            row = self.properties.get(args[0])
            return row if row and row["tenant_id"] == args[1] else None

        raise AssertionError(f"Unexpected fetchrow query: {query}")

    async def fetch(self, query: str, *args):
        """Emulate the multi-row listing queries, always filtered by tenant.

        Returns a list of freshly built dictionaries (a projection of the
        stored rows), sorted and sliced the way each branch below describes.

        Raises:
            AssertionError: if *query* targets none of the known tables.
        """
        normalized = self._normalize(query)

        if "FROM edge_communication_events" in normalized:
            tenant_id, lead_id, limit, offset = args
            rows = [
                {
                    "event_id": row["event_id"],
                    "lead_id": row["lead_id"],
                    "channel": row["channel"],
                    "direction": row["direction"],
                    "provider": row["provider"],
                    "capture_mode": row["capture_mode"],
                    "consent_state": row["consent_state"],
                    "timestamp": row["timestamp"].isoformat(),
                    "duration_seconds": row["duration_seconds"],
                    "summary": row["summary"],
                    "raw_reference": row["raw_reference"],
                    "recording_ref": row["recording_ref"],
                    "provider_metadata": row["provider_metadata"],
                    "created_at": row["created_at"].isoformat(),
                }
                for row in self.events.values()
                if row["tenant_id"] == tenant_id and row["lead_id"] == lead_id
            ]
            # Newest first; the key is the ISO-8601 string built above.
            rows.sort(key=lambda item: item["timestamp"], reverse=True)
            return rows[offset : offset + limit]

        if "FROM user_calendar_events" in normalized:
            tenant_id = args[0]
            owner_user_id = args[1]
            # Only the first two and the last positional arguments are used;
            # anything in between is ignored by this fake.
            limit = args[-1]
            rows = [
                {
                    "calendar_event_id": row["calendar_event_id"],
                    "lead_id": row["lead_id"],
                    "title": row["title"],
                    "description": row["description"],
                    "start_at": row["start_at"],
                    "end_at": row["end_at"],
                    "all_day": row["all_day"],
                    "status": row["status"],
                    "reminder_minutes": row["reminder_minutes"],
                    "created_by": row["created_by"],
                    "location": row["location"],
                    "metadata": row["metadata"],
                    "created_at": row["created_at"].isoformat(),
                }
                for row in self.calendar_events.values()
                if row["tenant_id"] == tenant_id
                and row["owner_user_id"] == owner_user_id
                and row["status"] != "cancelled"
            ]
            rows.sort(key=lambda item: item["start_at"])
            return rows[:limit]

        if "FROM inventory_import_batches" in normalized:
            tenant_id, limit, offset = args
            rows = [
                {
                    "batch_id": row["batch_id"],
                    "source_type": row["source_type"],
                    "submitted_by": row["submitted_by"],
                    "status": row["status"],
                    "total_rows": row["total_rows"],
                    "accepted_rows": row["accepted_rows"],
                    "rejected_rows": row["rejected_rows"],
                    "created_at": row["created_at"].isoformat(),
                    "completed_at": row["completed_at"],
                }
                for row in self.import_batches.values()
                if row["tenant_id"] == tenant_id
            ]
            rows.sort(key=lambda item: item["created_at"], reverse=True)
            return rows[offset : offset + limit]

        if "FROM inventory_properties" in normalized:
            tenant_id = args[0]
            limit = args[-2]
            offset = args[-1]
            # Whatever sits between the tenant id and limit/offset is the
            # optional filter list: (), (one filter,) or (status, type).
            filters = args[1:-2]
            status_filter = None
            property_type = None
            if len(filters) == 2:
                status_filter, property_type = filters
            elif len(filters) == 1:
                # A single filter is ambiguous by position alone, so look at
                # the WHERE clause: it is a property-type filter only when the
                # clause names property_type and never mentions status.
                where_clause = normalized.partition(" WHERE ")[2]
                if "property_type" in where_clause and "status" not in where_clause:
                    property_type = filters[0]
                else:
                    status_filter = filters[0]
            rows = [
                {
                    "property_id": row["property_id"],
                    "project_name": row["project_name"],
                    "developer_name": row["developer_name"],
                    "property_type": row["property_type"],
                    "location": row["location"],
                    "price_bands": row["price_bands"],
                    "unit_mix": row["unit_mix"],
                    "status": row["status"],
                    "ingested_at": row["ingested_at"],
                    "created_at": row["created_at"].isoformat(),
                }
                for row in self.properties.values()
                if row["tenant_id"] == tenant_id
                and (status_filter is None or row["status"] == status_filter)
                and (property_type is None or row["property_type"] == property_type)
            ]
            rows.sort(key=lambda item: item["created_at"], reverse=True)
            return rows[offset : offset + limit]

        raise AssertionError(f"Unexpected fetch query: {query}")

    async def fetchval(self, query: str, *args):
        """Emulate the ``SELECT COUNT(*)`` queries and return an integer count.

        Raises:
            AssertionError: if *query* is not one of the recognised counts.
        """
        normalized = self._normalize(query)

        if normalized.startswith("SELECT COUNT(*) FROM edge_communication_events WHERE tenant_id = $1 AND lead_id = $2"):
            tenant_id, lead_id = args
            return sum(
                1
                for row in self.events.values()
                if row["tenant_id"] == tenant_id and row["lead_id"] == lead_id
            )

        if normalized.startswith("SELECT COUNT(*) FROM inventory_import_batches WHERE tenant_id=$1"):
            tenant_id = args[0]
            return sum(1 for row in self.import_batches.values() if row["tenant_id"] == tenant_id)

        if normalized.startswith("SELECT COUNT(*) FROM inventory_properties WHERE tenant_id = $1"):
            # Only the tenant id is consulted; any further arguments (extra
            # filters) are ignored by this count.
            tenant_id = args[0]
            return sum(1 for row in self.properties.values() if row["tenant_id"] == tenant_id)

        raise AssertionError(f"Unexpected fetchval query: {query}")

    async def execute(self, query: str, *args):
        """Emulate calendar-event UPDATE statements.

        Returns ``"UPDATE 1"`` when the addressed row exists and belongs to
        the given tenant and owner, otherwise ``"UPDATE 0"``.

        Raises:
            AssertionError: if *query* is not a recognised UPDATE.
        """
        normalized = self._normalize(query)

        # The cancel statement must be checked first: it also starts with
        # "UPDATE user_calendar_events SET" and would match the generic branch.
        if "UPDATE user_calendar_events SET status='cancelled'" in normalized:
            tenant_id, owner_user_id, calendar_event_id = args
            row = self.calendar_events.get(calendar_event_id)
            if not row or row["tenant_id"] != tenant_id or row["owner_user_id"] != owner_user_id:
                return "UPDATE 0"
            row["status"] = "cancelled"
            return "UPDATE 1"

        if normalized.startswith("UPDATE user_calendar_events SET"):
            # The three trailing arguments address the row.
            tenant_id = args[-3]
            owner_user_id = args[-2]
            calendar_event_id = args[-1]
            row = self.calendar_events.get(calendar_event_id)
            if not row or row["tenant_id"] != tenant_id or row["owner_user_id"] != owner_user_id:
                return "UPDATE 0"
            set_clause = normalized.split(" SET ", 1)[1].split(" WHERE ", 1)[0]
            for assignment in set_clause.split(", "):
                column, separator, expression = assignment.partition("=")
                # Resolve the value through the assignment's own $n placeholder
                # rather than by list position, so assignments without a
                # placeholder (e.g. "updated_at = NOW()") neither consume an
                # argument nor shift the ones that follow.
                position = self._placeholder_position(expression)
                if not separator or position is None:
                    continue
                column = column.strip()
                if column not in {"tenant_id", "owner_user_id"}:
                    row[column] = args[position]
            return "UPDATE 1"

        raise AssertionError(f"Unexpected execute query: {query}")
|
|
|
|
|
|
class FakeSurfacePool:
    """Pool double that hands out one shared ``FakeSurfaceConn`` on every acquire."""

    def __init__(self) -> None:
        # A single connection object backs the whole pool, so state written
        # through one acquire is visible through the next.
        self.conn = FakeSurfaceConn()

    @asynccontextmanager
    async def acquire(self):
        """Async context manager yielding the pool's single connection."""
        shared_connection = self.conn
        yield shared_connection
|
|
|
|
|
|
def _build_shared_app(pool: FakeSurfacePool, current_user: dict[str, str]) -> TestClient:
    """Build a FastAPI app over *pool* and return a ``TestClient`` for it.

    The mobile-edge and inventory routers are mounted, and the
    ``get_current_user`` dependency is overridden so every request is made as
    the principal described by *current_user* (keys ``user_id``, ``role`` and
    ``tenant_id``).
    """

    def _resolve_principal():
        # Reads current_user at request time, not at app-construction time.
        return UserPrincipal(
            current_user["user_id"],
            current_user["role"],
            current_user["tenant_id"],
        )

    application = FastAPI()
    application.state.db_pool = pool
    mounts = (
        (mobile_edge_router, "/api/mobile-edge"),
        (inventory_router, "/api/inventory"),
    )
    for router, prefix in mounts:
        application.include_router(router, prefix=prefix)
    application.dependency_overrides[get_current_user] = _resolve_principal
    return TestClient(application)
|
|
|
|
|
|
def test_mobile_edge_event_routes_scope_by_tenant_id_instead_of_role() -> None:
    """An event logged by one tenant is not listed for another tenant with the same role."""
    shared_pool = FakeSurfacePool()
    alpha_identity = {"user_id": "user-a", "role": "ADMIN", "tenant_id": "tenant_alpha"}
    beta_identity = {"user_id": "user-b", "role": "ADMIN", "tenant_id": "tenant_beta"}
    alpha_client = _build_shared_app(shared_pool, alpha_identity)
    beta_client = _build_shared_app(shared_pool, beta_identity)

    events_url = "/api/mobile-edge/events"
    lead_query = {"lead_id": "lead-123", "limit": 10}
    event_payload = {
        "lead_id": "lead-123",
        "channel": "whatsapp_message",
        "direction": "inbound",
        "capture_mode": "operator_note",
        "consent_state": "granted",
        "summary": "Client asked for a marina brochure.",
    }

    # Tenant alpha records the event.
    created = alpha_client.post(events_url, json=event_payload)
    assert created.status_code == 201

    # Alpha sees its own event ...
    alpha_listing = alpha_client.get(events_url, params=lead_query)
    assert alpha_listing.status_code == 200
    assert alpha_listing.json()["total"] == 1

    # ... while beta, querying the same lead id, sees nothing.
    beta_listing = beta_client.get(events_url, params=lead_query)
    assert beta_listing.status_code == 200
    beta_body = beta_listing.json()
    assert beta_body["total"] == 0
    assert beta_body["events"] == []
|
|
|
|
|
|
def test_mobile_edge_calendar_routes_scope_by_tenant_id_instead_of_role() -> None:
    """Calendar create/update/cancel works for the owning tenant and stays hidden from another."""
    shared_pool = FakeSurfacePool()
    alpha_identity = {"user_id": "user-a", "role": "ADMIN", "tenant_id": "tenant_alpha"}
    beta_identity = {"user_id": "user-b", "role": "ADMIN", "tenant_id": "tenant_beta"}
    alpha_client = _build_shared_app(shared_pool, alpha_identity)
    beta_client = _build_shared_app(shared_pool, beta_identity)

    calendar_url = "/api/mobile-edge/calendar"
    visit_payload = {
        "lead_id": "lead-123",
        "title": "Private site visit",
        "description": "Walkthrough with the lead",
        "start_at": "2026-04-23T10:00:00Z",
        "end_at": "2026-04-23T11:00:00Z",
        "all_day": False,
        "status": "tentative",
        "reminder_minutes": [15],
        "location": "Dubai Marina",
        "metadata": {"source": "ipad"},
    }

    # Create: the response echoes the stored event.
    created = alpha_client.post(calendar_url, json=visit_payload)
    assert created.status_code == 201
    created_payload = created.json()
    assert created_payload["status"] == "ok"
    created_event = created_payload["event"]
    assert created_event["title"] == "Private site visit"
    assert created_event["location"] == "Dubai Marina"
    assert created_event["status"] == "tentative"
    assert created_event["reminder_minutes"] == [15]

    alpha_listing = alpha_client.get(calendar_url)
    assert alpha_listing.status_code == 200
    assert len(alpha_listing.json()["events"]) == 1

    # Update: the new status shows up in the owner's listing.
    event_id = created_payload["event"]["calendar_event_id"]
    event_url = f"/api/mobile-edge/calendar/{event_id}"
    updated = alpha_client.patch(event_url, json={"status": "confirmed"})
    assert updated.status_code == 200
    alpha_listing = alpha_client.get(calendar_url)
    assert alpha_listing.json()["events"][0]["status"] == "confirmed"

    # Cancel: the event disappears from the owner's listing.
    cancelled = alpha_client.delete(event_url)
    assert cancelled.status_code == 200
    alpha_listing = alpha_client.get(calendar_url)
    assert alpha_listing.json()["events"] == []

    # The other tenant's calendar is empty.
    beta_listing = beta_client.get(calendar_url)
    assert beta_listing.status_code == 200
    assert beta_listing.json()["events"] == []
|
|
|
|
|
|
def test_inventory_property_routes_scope_by_tenant_id_instead_of_role() -> None:
    """A property created by one tenant is not listed for another tenant with the same role."""
    shared_pool = FakeSurfacePool()
    alpha_identity = {"user_id": "user-a", "role": "ADMIN", "tenant_id": "tenant_alpha"}
    beta_identity = {"user_id": "user-b", "role": "ADMIN", "tenant_id": "tenant_beta"}
    alpha_client = _build_shared_app(shared_pool, alpha_identity)
    beta_client = _build_shared_app(shared_pool, beta_identity)

    properties_url = "/api/inventory/properties"
    page_query = {"limit": 20}
    property_payload = {
        "project_name": "Marina One",
        "developer_name": "Desi Neuron Estates",
        "location": {"city": "Dubai", "district": "Marina"},
        "property_type": "apartment",
        "price_bands": [{"label": "from", "amount": 2400000}],
        "unit_mix": [{"type": "2BR", "count": 18}],
        "amenities": ["pool", "gym"],
        "status": "active",
        "validation_state": {"validated": True},
    }

    # Tenant alpha creates the property.
    created = alpha_client.post(properties_url, json=property_payload)
    assert created.status_code == 201

    # Alpha's listing contains it ...
    alpha_listing = alpha_client.get(properties_url, params=page_query)
    assert alpha_listing.status_code == 200
    assert alpha_listing.json()["total"] == 1

    # ... beta's listing does not.
    beta_listing = beta_client.get(properties_url, params=page_query)
    assert beta_listing.status_code == 200
    beta_body = beta_listing.json()
    assert beta_body["total"] == 0
    assert beta_body["properties"] == []
|
|
|
|
|
|
def test_inventory_import_batch_routes_scope_by_tenant_id_instead_of_role() -> None:
    """An import batch created by one tenant is not listed for another tenant with the same role."""
    shared_pool = FakeSurfacePool()
    alpha_identity = {"user_id": "user-a", "role": "ADMIN", "tenant_id": "tenant_alpha"}
    beta_identity = {"user_id": "user-b", "role": "ADMIN", "tenant_id": "tenant_beta"}
    alpha_client = _build_shared_app(shared_pool, alpha_identity)
    beta_client = _build_shared_app(shared_pool, beta_identity)

    batches_url = "/api/inventory/import-batches"
    page_query = {"limit": 20}
    batch_payload = {
        "source_type": "csv",
        "source_file_ref": "s3://velocity/imports/marina.csv",
        "total_rows": 24,
    }

    # Tenant alpha registers the batch.
    created = alpha_client.post(batches_url, json=batch_payload)
    assert created.status_code == 201

    # Alpha's listing contains it ...
    alpha_listing = alpha_client.get(batches_url, params=page_query)
    assert alpha_listing.status_code == 200
    assert alpha_listing.json()["total"] == 1

    # ... beta's listing does not.
    beta_listing = beta_client.get(batches_url, params=page_query)
    assert beta_listing.status_code == 200
    beta_body = beta_listing.json()
    assert beta_body["total"] == 0
    assert beta_body["batches"] == []
|