# Project_Velocity/backend/tests/test_surface_route_tenant_scoping.py
# File-listing metadata: Python, 471 lines, 18 KiB.

from __future__ import annotations
import os
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any
from fastapi import FastAPI
from fastapi.testclient import TestClient
os.environ.setdefault("VELOCITY_JWT_SECRET", "test-secret")
from backend.api.routes_inventory import router as inventory_router
from backend.api.routes_mobile_edge import router as mobile_edge_router
from backend.auth.dependencies import UserPrincipal, get_current_user
def _now() -> datetime:
return datetime.now(timezone.utc)
class FakeSurfaceConn:
    """In-memory stand-in for a database connection used by the route tests.

    Each supported SQL statement is recognised by a substring or prefix of
    its whitespace-normalised text and served from plain dictionaries keyed
    by the row's primary-key string. Any statement that is not recognised
    raises ``AssertionError`` so that an unexpected query fails the test
    loudly instead of silently returning nothing.
    """

    def __init__(self) -> None:
        # One dict per fake table, keyed by the row's generated id.
        self.events: dict[str, dict[str, Any]] = {}
        self.calendar_events: dict[str, dict[str, Any]] = {}
        self.properties: dict[str, dict[str, Any]] = {}
        self.import_batches: dict[str, dict[str, Any]] = {}

    @staticmethod
    def _property_filters(normalized: str, filter_args: tuple[Any, ...]) -> tuple[Any, Any]:
        """Map optional property-query arguments to ``(status, property_type)``.

        ``filter_args`` are the positional arguments that sit between the
        tenant id and any paging arguments. Two arguments are read as
        ``(status, property_type)``. A single argument is ambiguous, so the
        WHERE clause is inspected: it is treated as a ``property_type``
        filter only when the clause mentions ``property_type`` and not
        ``status``; otherwise it is treated as a ``status`` filter. Any other
        argument count yields no filtering.
        """
        if len(filter_args) == 2:
            return filter_args[0], filter_args[1]
        if len(filter_args) != 1:
            return None, None
        # Look only at the WHERE clause: the SELECT list may name the same
        # columns without filtering on them.
        where_clause = normalized.partition(" WHERE ")[2].partition(" ORDER BY ")[0]
        if "property_type" in where_clause and "status" not in where_clause:
            return None, filter_args[0]
        return filter_args[0], None

    async def fetchrow(self, query: str, *args: Any) -> Any:
        """Serve single-row statements: the four INSERTs and two id lookups.

        Returns a dict for the affected/located row (or ``None`` when a
        tenant-scoped lookup does not match).

        Raises:
            AssertionError: if the query is not one this fake recognises.
        """
        normalized = " ".join(query.strip().split())
        if "INSERT INTO edge_communication_events" in normalized:
            event_id = str(uuid.uuid4())
            row = {
                "event_id": event_id,
                "tenant_id": args[0],
                "lead_id": args[1],
                "channel": args[2],
                "direction": args[3],
                "provider": args[4],
                "capture_mode": args[5],
                "consent_state": args[6],
                "duration_seconds": args[7],
                "summary": args[8],
                "raw_reference": args[9],
                "recording_ref": args[10],
                "provider_metadata": args[11],
                "timestamp": _now(),
                "created_at": _now(),
            }
            self.events[event_id] = row
            return {"event_id": event_id, "created_at": row["created_at"]}
        if "INSERT INTO user_calendar_events" in normalized:
            calendar_event_id = str(uuid.uuid4())
            row = {
                "calendar_event_id": calendar_event_id,
                "tenant_id": args[0],
                "owner_user_id": args[1],
                "lead_id": args[2],
                "source_event_id": args[3],
                "title": args[4],
                "description": args[5],
                "start_at": args[6],
                "end_at": args[7],
                "all_day": args[8],
                "status": args[9],
                "reminder_minutes": args[10],
                "created_by": args[11],
                "location": args[12],
                "metadata": args[13],
                "created_at": _now(),
            }
            self.calendar_events[calendar_event_id] = row
            # The stored dict itself is returned (the full row, not a subset).
            return row
        if "INSERT INTO inventory_import_batches" in normalized:
            batch_id = str(uuid.uuid4())
            row = {
                "batch_id": batch_id,
                "tenant_id": args[0],
                "source_type": args[1],
                "submitted_by": args[2],
                "total_rows": args[3],
                "source_file_ref": args[4],
                "accepted_rows": 0,
                "rejected_rows": 0,
                "status": "pending",
                "created_at": _now(),
                "completed_at": None,
            }
            self.import_batches[batch_id] = row
            return {
                "batch_id": batch_id,
                "status": row["status"],
                "created_at": row["created_at"],
            }
        if "INSERT INTO inventory_properties" in normalized:
            property_id = str(uuid.uuid4())
            row = {
                "property_id": property_id,
                "tenant_id": args[0],
                "batch_id": args[1],
                "source_id": args[2],
                "project_name": args[3],
                "developer_name": args[4],
                "location": args[5],
                "property_type": args[6],
                "price_bands": args[7],
                "unit_mix": args[8],
                "amenities": args[9],
                "status": args[10],
                "validation_state": args[11],
                "ingested_at": None,
                "created_at": _now(),
                "updated_at": _now(),
            }
            self.properties[property_id] = row
            return {"property_id": property_id, "created_at": row["created_at"]}
        if normalized.startswith("SELECT * FROM inventory_import_batches WHERE batch_id=$1 AND tenant_id=$2"):
            row = self.import_batches.get(args[0])
            # A row owned by another tenant is reported as missing.
            return row if row and row["tenant_id"] == args[1] else None
        if normalized.startswith("SELECT * FROM inventory_properties WHERE property_id=$1 AND tenant_id=$2"):
            row = self.properties.get(args[0])
            return row if row and row["tenant_id"] == args[1] else None
        raise AssertionError(f"Unexpected fetchrow query: {query}")

    async def fetch(self, query: str, *args: Any) -> Any:
        """Serve multi-row listings, always filtered by the tenant id in ``args[0]``.

        Returns a list of dicts, sorted and sliced per listing type.

        Raises:
            AssertionError: if the query is not one this fake recognises.
        """
        normalized = " ".join(query.strip().split())
        if "FROM edge_communication_events" in normalized:
            tenant_id, lead_id, limit, offset = args
            rows = [
                {
                    "event_id": row["event_id"],
                    "lead_id": row["lead_id"],
                    "channel": row["channel"],
                    "direction": row["direction"],
                    "provider": row["provider"],
                    "capture_mode": row["capture_mode"],
                    "consent_state": row["consent_state"],
                    "timestamp": row["timestamp"].isoformat(),
                    "duration_seconds": row["duration_seconds"],
                    "summary": row["summary"],
                    "raw_reference": row["raw_reference"],
                    "recording_ref": row["recording_ref"],
                    "provider_metadata": row["provider_metadata"],
                    "created_at": row["created_at"].isoformat(),
                }
                for row in self.events.values()
                if row["tenant_id"] == tenant_id and row["lead_id"] == lead_id
            ]
            # Newest first.
            rows.sort(key=lambda item: item["timestamp"], reverse=True)
            return rows[offset : offset + limit]
        if "FROM user_calendar_events" in normalized:
            tenant_id = args[0]
            owner_user_id = args[1]
            # Only the first two and the last argument are read here; any
            # arguments in between are ignored by this fake.
            limit = args[-1]
            rows = [
                {
                    "calendar_event_id": row["calendar_event_id"],
                    "lead_id": row["lead_id"],
                    "title": row["title"],
                    "description": row["description"],
                    "start_at": row["start_at"],
                    "end_at": row["end_at"],
                    "all_day": row["all_day"],
                    "status": row["status"],
                    "reminder_minutes": row["reminder_minutes"],
                    "created_by": row["created_by"],
                    "location": row["location"],
                    "metadata": row["metadata"],
                    "created_at": row["created_at"].isoformat(),
                }
                for row in self.calendar_events.values()
                if row["tenant_id"] == tenant_id and row["owner_user_id"] == owner_user_id
                and row["status"] != "cancelled"
            ]
            rows.sort(key=lambda item: item["start_at"])
            return rows[:limit]
        if "FROM inventory_import_batches" in normalized:
            tenant_id, limit, offset = args
            rows = [
                {
                    "batch_id": row["batch_id"],
                    "source_type": row["source_type"],
                    "submitted_by": row["submitted_by"],
                    "status": row["status"],
                    "total_rows": row["total_rows"],
                    "accepted_rows": row["accepted_rows"],
                    "rejected_rows": row["rejected_rows"],
                    "created_at": row["created_at"].isoformat(),
                    "completed_at": row["completed_at"],
                }
                for row in self.import_batches.values()
                if row["tenant_id"] == tenant_id
            ]
            rows.sort(key=lambda item: item["created_at"], reverse=True)
            return rows[offset : offset + limit]
        if "FROM inventory_properties" in normalized:
            params = list(args)
            tenant_id = params[0]
            limit = params[-2]
            offset = params[-1]
            # Everything between the tenant id and the paging pair is a filter.
            status_filter, property_type = self._property_filters(
                normalized, tuple(params[1:-2])
            )
            rows = [
                {
                    "property_id": row["property_id"],
                    "project_name": row["project_name"],
                    "developer_name": row["developer_name"],
                    "property_type": row["property_type"],
                    "location": row["location"],
                    "price_bands": row["price_bands"],
                    "unit_mix": row["unit_mix"],
                    "status": row["status"],
                    "ingested_at": row["ingested_at"],
                    "created_at": row["created_at"].isoformat(),
                }
                for row in self.properties.values()
                if row["tenant_id"] == tenant_id
                and (status_filter is None or row["status"] == status_filter)
                and (property_type is None or row["property_type"] == property_type)
            ]
            rows.sort(key=lambda item: item["created_at"], reverse=True)
            return rows[offset : offset + limit]
        raise AssertionError(f"Unexpected fetch query: {query}")

    async def fetchval(self, query: str, *args: Any) -> Any:
        """Serve ``SELECT COUNT(*)`` statements, scoped by tenant id.

        Raises:
            AssertionError: if the query is not one this fake recognises.
        """
        normalized = " ".join(query.strip().split())
        if normalized.startswith("SELECT COUNT(*) FROM edge_communication_events WHERE tenant_id = $1 AND lead_id = $2"):
            tenant_id, lead_id = args
            return sum(
                1
                for row in self.events.values()
                if row["tenant_id"] == tenant_id and row["lead_id"] == lead_id
            )
        if normalized.startswith("SELECT COUNT(*) FROM inventory_import_batches WHERE tenant_id=$1"):
            tenant_id = args[0]
            return sum(1 for row in self.import_batches.values() if row["tenant_id"] == tenant_id)
        if normalized.startswith("SELECT COUNT(*) FROM inventory_properties WHERE tenant_id = $1"):
            tenant_id = args[0]
            # Apply the same optional filters as the listing so that the
            # reported total agrees with the rows that listing would return.
            status_filter, property_type = self._property_filters(normalized, args[1:])
            return sum(
                1
                for row in self.properties.values()
                if row["tenant_id"] == tenant_id
                and (status_filter is None or row["status"] == status_filter)
                and (property_type is None or row["property_type"] == property_type)
            )
        raise AssertionError(f"Unexpected fetchval query: {query}")

    async def execute(self, query: str, *args: Any) -> Any:
        """Serve calendar UPDATE statements and return a status tag string.

        Returns ``"UPDATE 1"`` when the targeted row exists and belongs to
        the given tenant and owner, ``"UPDATE 0"`` otherwise.

        Raises:
            AssertionError: if the query is not one this fake recognises.
        """
        normalized = " ".join(query.strip().split())
        # The cancel statement must be matched before the generic UPDATE
        # branch below, whose prefix it also satisfies.
        if "UPDATE user_calendar_events SET status='cancelled'" in normalized:
            tenant_id, owner_user_id, calendar_event_id = args
            row = self.calendar_events.get(calendar_event_id)
            if not row or row["tenant_id"] != tenant_id or row["owner_user_id"] != owner_user_id:
                return "UPDATE 0"
            row["status"] = "cancelled"
            return "UPDATE 1"
        if normalized.startswith("UPDATE user_calendar_events SET"):
            # The scoping values are read from the last three arguments.
            tenant_id = args[-3]
            owner_user_id = args[-2]
            calendar_event_id = args[-1]
            row = self.calendar_events.get(calendar_event_id)
            if not row or row["tenant_id"] != tenant_id or row["owner_user_id"] != owner_user_id:
                return "UPDATE 0"
            # Pair each "column = ..." assignment in the SET clause with the
            # argument at the same position.
            assignments = normalized.split(" SET ", 1)[1].split(" WHERE ", 1)[0].split(", ")
            for assignment, value in zip(assignments, args):
                column = assignment.split(" = ", 1)[0]
                # Never let an update rewrite the ownership columns.
                if column not in {"tenant_id", "owner_user_id"}:
                    row[column] = value
            return "UPDATE 1"
        raise AssertionError(f"Unexpected execute query: {query}")
class FakeSurfacePool:
    """Minimal pool double that always hands out the same fake connection."""

    def __init__(self) -> None:
        # A single connection instance, so state written through one client
        # is visible to every other client sharing this pool.
        self.conn = FakeSurfaceConn()

    @asynccontextmanager
    async def acquire(self):
        """Yield the pool's single connection for use in ``async with``."""
        connection = self.conn
        yield connection
def _build_shared_app(pool: FakeSurfacePool, current_user: dict[str, str]) -> TestClient:
    """Return a test client for an app wired to ``pool`` and acting as ``current_user``.

    The app mounts the mobile-edge and inventory routers under their
    ``/api/...`` prefixes, exposes ``pool`` as ``app.state.db_pool``, and
    overrides ``get_current_user`` so that requests are attributed to a
    principal built from the ``user_id``, ``role`` and ``tenant_id`` keys of
    ``current_user``.
    """

    def _principal_override() -> UserPrincipal:
        # Built on each call, from whatever ``current_user`` holds at that time.
        return UserPrincipal(
            current_user["user_id"],
            current_user["role"],
            current_user["tenant_id"],
        )

    application = FastAPI()
    application.state.db_pool = pool
    for router, prefix in (
        (mobile_edge_router, "/api/mobile-edge"),
        (inventory_router, "/api/inventory"),
    ):
        application.include_router(router, prefix=prefix)
    application.dependency_overrides[get_current_user] = _principal_override
    return TestClient(application)
def test_mobile_edge_event_routes_scope_by_tenant_id_instead_of_role() -> None:
    """An event logged by one tenant is not listed for another tenant's admin.

    Both clients share one pool and both users hold the ADMIN role, so only
    the tenant id distinguishes them.
    """
    shared_pool = FakeSurfacePool()
    alpha_client = _build_shared_app(
        shared_pool,
        {"user_id": "user-a", "role": "ADMIN", "tenant_id": "tenant_alpha"},
    )
    beta_client = _build_shared_app(
        shared_pool,
        {"user_id": "user-b", "role": "ADMIN", "tenant_id": "tenant_beta"},
    )
    events_url = "/api/mobile-edge/events"
    event_payload = {
        "lead_id": "lead-123",
        "channel": "whatsapp_message",
        "direction": "inbound",
        "capture_mode": "operator_note",
        "consent_state": "granted",
        "summary": "Client asked for a marina brochure.",
    }

    created = alpha_client.post(events_url, json=event_payload)
    assert created.status_code == 201

    # The owning tenant sees its event.
    alpha_listing = alpha_client.get(events_url, params={"lead_id": "lead-123", "limit": 10})
    assert alpha_listing.status_code == 200
    assert alpha_listing.json()["total"] == 1

    # The other tenant asks about the same lead id and sees nothing.
    beta_listing = beta_client.get(events_url, params={"lead_id": "lead-123", "limit": 10})
    assert beta_listing.status_code == 200
    beta_body = beta_listing.json()
    assert beta_body["total"] == 0
    assert beta_body["events"] == []
def test_mobile_edge_calendar_routes_scope_by_tenant_id_instead_of_role() -> None:
    """Calendar create/list/update/cancel stay confined to the owning tenant.

    Both clients share one pool and both users hold the ADMIN role. The
    other tenant's listing is checked while the event is still live as well
    as after cancellation: a check made only after cancellation would pass
    even if tenant scoping were broken, because the owning tenant's own
    listing is empty by then too.
    """
    pool = FakeSurfacePool()
    tenant_a = _build_shared_app(
        pool,
        {"user_id": "user-a", "role": "ADMIN", "tenant_id": "tenant_alpha"},
    )
    tenant_b = _build_shared_app(
        pool,
        {"user_id": "user-b", "role": "ADMIN", "tenant_id": "tenant_beta"},
    )

    create_response = tenant_a.post(
        "/api/mobile-edge/calendar",
        json={
            "lead_id": "lead-123",
            "title": "Private site visit",
            "description": "Walkthrough with the lead",
            "start_at": "2026-04-23T10:00:00Z",
            "end_at": "2026-04-23T11:00:00Z",
            "all_day": False,
            "status": "tentative",
            "reminder_minutes": [15],
            "location": "Dubai Marina",
            "metadata": {"source": "ipad"},
        },
    )
    assert create_response.status_code == 201
    created_payload = create_response.json()
    assert created_payload["status"] == "ok"
    assert created_payload["event"]["title"] == "Private site visit"
    assert created_payload["event"]["location"] == "Dubai Marina"
    assert created_payload["event"]["status"] == "tentative"
    assert created_payload["event"]["reminder_minutes"] == [15]

    tenant_a_calendar = tenant_a.get("/api/mobile-edge/calendar")
    assert tenant_a_calendar.status_code == 200
    assert len(tenant_a_calendar.json()["events"]) == 1

    # Isolation check while the event is live: this is the assertion that
    # can actually catch a cross-tenant leak.
    tenant_b_live_calendar = tenant_b.get("/api/mobile-edge/calendar")
    assert tenant_b_live_calendar.status_code == 200
    assert tenant_b_live_calendar.json()["events"] == []

    event_id = created_payload["event"]["calendar_event_id"]
    update_response = tenant_a.patch(
        f"/api/mobile-edge/calendar/{event_id}",
        json={"status": "confirmed"},
    )
    assert update_response.status_code == 200
    tenant_a_calendar = tenant_a.get("/api/mobile-edge/calendar")
    # Fail on the status code rather than on a confusing KeyError/IndexError.
    assert tenant_a_calendar.status_code == 200
    assert tenant_a_calendar.json()["events"][0]["status"] == "confirmed"

    cancel_response = tenant_a.delete(f"/api/mobile-edge/calendar/{event_id}")
    assert cancel_response.status_code == 200
    tenant_a_calendar = tenant_a.get("/api/mobile-edge/calendar")
    assert tenant_a_calendar.status_code == 200
    assert tenant_a_calendar.json()["events"] == []

    tenant_b_calendar = tenant_b.get("/api/mobile-edge/calendar")
    assert tenant_b_calendar.status_code == 200
    assert tenant_b_calendar.json()["events"] == []
def test_inventory_property_routes_scope_by_tenant_id_instead_of_role() -> None:
    """A property created by one tenant is not listed for another tenant's admin.

    Both clients share one pool and both users hold the ADMIN role, so only
    the tenant id distinguishes them.
    """
    shared_pool = FakeSurfacePool()
    alpha_client = _build_shared_app(
        shared_pool,
        {"user_id": "user-a", "role": "ADMIN", "tenant_id": "tenant_alpha"},
    )
    beta_client = _build_shared_app(
        shared_pool,
        {"user_id": "user-b", "role": "ADMIN", "tenant_id": "tenant_beta"},
    )
    properties_url = "/api/inventory/properties"
    property_payload = {
        "project_name": "Marina One",
        "developer_name": "Desi Neuron Estates",
        "location": {"city": "Dubai", "district": "Marina"},
        "property_type": "apartment",
        "price_bands": [{"label": "from", "amount": 2400000}],
        "unit_mix": [{"type": "2BR", "count": 18}],
        "amenities": ["pool", "gym"],
        "status": "active",
        "validation_state": {"validated": True},
    }

    created = alpha_client.post(properties_url, json=property_payload)
    assert created.status_code == 201

    # The owning tenant sees its property.
    alpha_listing = alpha_client.get(properties_url, params={"limit": 20})
    assert alpha_listing.status_code == 200
    assert alpha_listing.json()["total"] == 1

    # The other tenant sees an empty inventory.
    beta_listing = beta_client.get(properties_url, params={"limit": 20})
    assert beta_listing.status_code == 200
    beta_body = beta_listing.json()
    assert beta_body["total"] == 0
    assert beta_body["properties"] == []
def test_inventory_import_batch_routes_scope_by_tenant_id_instead_of_role() -> None:
    """An import batch created by one tenant is not listed for another tenant's admin.

    Both clients share one pool and both users hold the ADMIN role, so only
    the tenant id distinguishes them.
    """
    shared_pool = FakeSurfacePool()
    alpha_client = _build_shared_app(
        shared_pool,
        {"user_id": "user-a", "role": "ADMIN", "tenant_id": "tenant_alpha"},
    )
    beta_client = _build_shared_app(
        shared_pool,
        {"user_id": "user-b", "role": "ADMIN", "tenant_id": "tenant_beta"},
    )
    batches_url = "/api/inventory/import-batches"
    batch_payload = {
        "source_type": "csv",
        "source_file_ref": "s3://velocity/imports/marina.csv",
        "total_rows": 24,
    }

    created = alpha_client.post(batches_url, json=batch_payload)
    assert created.status_code == 201

    # The owning tenant sees its batch.
    alpha_listing = alpha_client.get(batches_url, params={"limit": 20})
    assert alpha_listing.status_code == 200
    assert alpha_listing.json()["total"] == 1

    # The other tenant sees no batches.
    beta_listing = beta_client.get(batches_url, params={"limit": 20})
    assert beta_listing.status_code == 200
    beta_body = beta_listing.json()
    assert beta_body["total"] == 0
    assert beta_body["batches"] == []