# Files
# Project_Velocity/backend/tests/test_vault_notification_flow.py
# 2026-04-12 02:02:58 +05:30
#
# 91 lines
# 2.9 KiB
# Python

import os
from contextlib import asynccontextmanager
os.environ.setdefault('VELOCITY_JWT_SECRET', 'test-secret')
from fastapi import FastAPI
from fastapi.testclient import TestClient
from backend.auth.dependencies import UserPrincipal, get_current_user
from backend.db.pool import get_pool
from backend.routers.vault import router as vault_router
class FakeConn:
    """In-memory stand-in for a database connection.

    No SQL is parsed: each method recognises a statement by looking for a
    fixed substring in the query text and reads its inputs from fixed
    positions in ``args``. Inserted vault assets are stored in ``assets``
    keyed by tracking hash; omnichannel log inserts are appended to ``logs``.
    """

    def __init__(self) -> None:
        # tracking_hash -> stored asset row (id, lead_id, asset_name, storage_path)
        self.assets: dict[str, dict] = {}
        # One {'lead_id', 'payload'} entry per omnichannel log insert.
        self.logs: list[dict] = []

    async def fetchrow(self, query: str, *args):
        """Return a single fake row for the statements this fake recognises.

        * ``INSERT INTO velocity_vault_assets`` - stores the asset and returns
          ``{'id': 'asset-1'}``.
        * ``UPDATE velocity_vault_assets`` - looks the asset up by
          ``args[0]`` (the tracking hash); returns a copy of its fields, or
          ``None`` when the hash is unknown.
        * ``SELECT name FROM leads_intelligence`` - returns a fixed lead name.

        Any other query returns ``None``.
        """
        if 'INSERT INTO velocity_vault_assets' in query:
            # Positional layout relied on here: 0=asset_name, 2=storage_path,
            # 3=tracking_hash, 4=lead_id. args[1] is not read
            # (presumably the asset type - confirm against the router's SQL).
            tracking_hash = args[3]
            self.assets[tracking_hash] = {
                'id': 'asset-1',
                'lead_id': args[4],
                'asset_name': args[0],
                'storage_path': args[2],
            }
            return {'id': 'asset-1'}

        if 'UPDATE velocity_vault_assets' in query:
            asset = self.assets.get(args[0])
            if not asset:
                return None
            # Hand back a fresh dict so callers cannot mutate the stored row.
            return {
                'id': asset['id'],
                'lead_id': asset['lead_id'],
                'asset_name': asset['asset_name'],
                'storage_path': asset['storage_path'],
            }

        if 'SELECT name FROM leads_intelligence' in query:
            return {'name': 'Mohammed Al-Rashid'}

        return None

    async def execute(self, query: str, *args):
        """Record omnichannel log inserts and return the status string ``'OK'``.

        For ``INSERT INTO omnichannel_logs`` the first two positional
        arguments are captured as ``lead_id`` and ``payload``. Other
        statements are accepted and ignored.
        """
        if 'INSERT INTO omnichannel_logs' in query:
            self.logs.append({'lead_id': args[0], 'payload': args[1]})
        return 'OK'
class FakePool:
    """Minimal connection-pool fake that always hands out one shared ``FakeConn``.

    Because every ``acquire()`` yields the same connection object, state
    written during one request is visible to the next.
    """

    def __init__(self) -> None:
        self.conn = FakeConn()

    @asynccontextmanager
    async def acquire(self):
        """Async context manager yielding the shared fake connection.

        Nothing is released on exit; the connection lives as long as the pool.
        """
        yield self.conn
def test_vault_open_broadcasts_notification() -> None:
    """Opening a generated vault link redirects and broadcasts one event.

    Flow exercised:
      1. POST ``/api/vault/generate-link`` creates a tracked asset link.
      2. GET ``/vault/{tracking_hash}`` opens it without following redirects.
      3. The response is a 302 to the asset path and exactly one
         ``WS_ASSET_OPENED`` event reaches the broadcast hook.
    """
    app = FastAPI()
    pool = FakePool()
    sent_events: list[dict] = []

    async def capture_event(event: dict) -> None:
        # Stands in for the real broadcaster; records what would be sent.
        sent_events.append(event)

    # The router is mounted twice because the test calls it under both
    # prefixes: '/api/vault' for link generation and '/vault' for opening.
    app.include_router(vault_router, prefix='/api/vault')
    app.include_router(vault_router, prefix='/vault')
    app.state.broadcast_sentinel_event = capture_event

    # Replace the database pool and the authenticated user with fakes.
    app.dependency_overrides[get_pool] = lambda: pool
    app.dependency_overrides[get_current_user] = lambda: UserPrincipal('user-1', 'ADMIN')

    client = TestClient(app)

    response = client.post(
        '/api/vault/generate-link',
        json={
            'lead_id': '00000000-0000-4000-8000-000000000001',
            'asset_name': 'PH-01 Deck',
            'asset_type': 'pdf',
            'storage_path': 'brochures/ph-01.pdf',
        },
    )
    assert response.status_code == 201
    tracking_hash = response.json()['tracking_hash']

    # Do not follow the redirect: the 302 and its Location header are the
    # behaviour under test.
    open_response = client.get(f'/vault/{tracking_hash}', follow_redirects=False)
    assert open_response.status_code == 302
    assert open_response.headers['location'] == '/assets/brochures/ph-01.pdf'

    assert len(sent_events) == 1
    assert sent_events[0]['type'] == 'WS_ASSET_OPENED'
    assert sent_events[0]['data']['asset_name'] == 'PH-01 Deck'