"""Test that opening a vault tracking link broadcasts a sentinel event.

The vault router is exercised through FastAPI's ``TestClient`` against
in-memory stand-ins for the database pool and connection, so no real
database is required.
"""

import os
from contextlib import asynccontextmanager

# Set before the backend imports below; presumably the auth module reads the
# secret at import time -- TODO confirm against backend.auth.
os.environ.setdefault('VELOCITY_JWT_SECRET', 'test-secret')

from fastapi import FastAPI  # noqa: E402
from fastapi.testclient import TestClient  # noqa: E402

from backend.auth.dependencies import UserPrincipal, get_current_user  # noqa: E402
from backend.db.pool import get_pool  # noqa: E402
from backend.routers.vault import router as vault_router  # noqa: E402


class FakeConn:
    """In-memory stand-in for a database connection.

    Queries are recognised by a substring of their SQL text; anything that is
    not recognised is answered with ``None`` (``fetchrow``) or ``'OK'``
    (``execute``) without recording anything.
    """

    def __init__(self) -> None:
        # Inserted vault assets, keyed by tracking hash.
        self.assets: dict[str, dict] = {}
        # One entry per captured ``omnichannel_logs`` insert.
        self.logs: list[dict] = []

    async def fetchrow(self, query: str, *args) -> dict | None:
        """Return a canned row for the queries this fake recognises.

        Positional ``args`` are read by index: for the asset insert,
        ``args[0]`` is stored as the asset name, ``args[2]`` as the storage
        path, ``args[3]`` as the tracking hash and ``args[4]`` as the lead id
        (``args[1]`` is ignored). For the asset update, ``args[0]`` is the
        tracking hash to look up.
        """
        if 'INSERT INTO velocity_vault_assets' in query:
            tracking_hash = args[3]
            self.assets[tracking_hash] = {
                'id': 'asset-1',
                'lead_id': args[4],
                'asset_name': args[0],
                'storage_path': args[2],
            }
            return {'id': 'asset-1'}

        if 'UPDATE velocity_vault_assets' in query:
            asset = self.assets.get(args[0])
            # Hand back a copy so callers cannot mutate the stored row;
            # an unknown tracking hash yields no row.
            return dict(asset) if asset else None

        if 'SELECT name FROM leads_intelligence' in query:
            return {'name': 'Mohammed Al-Rashid'}

        return None

    async def execute(self, query: str, *args) -> str:
        """Record ``omnichannel_logs`` inserts and always report ``'OK'``."""
        if 'INSERT INTO omnichannel_logs' in query:
            self.logs.append({'lead_id': args[0], 'payload': args[1]})
        return 'OK'


class FakePool:
    """Connection-pool stand-in that always yields one shared ``FakeConn``."""

    def __init__(self) -> None:
        self.conn = FakeConn()

    @asynccontextmanager
    async def acquire(self):
        """Yield the shared fake connection as an async context manager."""
        yield self.conn


def test_vault_open_broadcasts_notification() -> None:
    """Generating a link and opening it redirects and emits one event."""
    app = FastAPI()
    pool = FakePool()
    sent_events: list[dict] = []

    async def capture_event(event: dict) -> None:
        sent_events.append(event)

    # The router is mounted twice: link generation is requested under
    # /api/vault, while the tracking link itself is opened under /vault.
    app.include_router(vault_router, prefix='/api/vault')
    app.include_router(vault_router, prefix='/vault')
    app.state.broadcast_sentinel_event = capture_event
    app.dependency_overrides[get_pool] = lambda: pool
    app.dependency_overrides[get_current_user] = lambda: UserPrincipal('user-1', 'ADMIN')

    client = TestClient(app)

    response = client.post(
        '/api/vault/generate-link',
        json={
            'lead_id': '00000000-0000-4000-8000-000000000001',
            'asset_name': 'PH-01 Deck',
            'asset_type': 'pdf',
            'storage_path': 'brochures/ph-01.pdf',
        },
    )
    assert response.status_code == 201
    tracking_hash = response.json()['tracking_hash']

    # Do not follow the redirect: the 302 and its target are what is checked.
    open_response = client.get(f'/vault/{tracking_hash}', follow_redirects=False)
    assert open_response.status_code == 302
    assert open_response.headers['location'] == '/assets/brochures/ph-01.pdf'

    assert len(sent_events) == 1
    assert sent_events[0]['type'] == 'WS_ASSET_OPENED'
    assert sent_events[0]['data']['asset_name'] == 'PH-01 Deck'