Files
Project_Velocity/backend/tests/test_colony_routes.py
sayan eeb684b46c feat: iPad app production readiness, Colony orchestration, Social posting (#44)
#38 iPad app production readiness, Colony orchestration, Social posting

Co-authored-by: Sayan Datta <sayan@Sayans-MacBook-Air.local>
Reviewed-on: sagnik/Project_Velocity#44
2026-05-03 18:30:38 +05:30

100 lines
3.3 KiB
Python

from __future__ import annotations
import os
from datetime import datetime, timezone
from typing import Any
# Provide a fallback JWT secret before the backend imports below run.
# NOTE(review): presumably a backend module reads VELOCITY_JWT_SECRET at import
# time, which is why this sits between the import groups — confirm.
os.environ.setdefault("VELOCITY_JWT_SECRET", "test-secret")
from fastapi import FastAPI
from fastapi.testclient import TestClient
from backend.api import routes_colony
from backend.auth.dependencies import UserPrincipal, get_current_user
def _mission_row(mission: dict[str, Any], *, status: str = "pending") -> dict[str, Any]:
    """Return a copy of ``mission`` extended with stored-row bookkeeping fields.

    The input mapping is not mutated. Any ``status``, ``review_status``,
    ``created_at``, ``updated_at`` or ``completed_at`` key already present in
    ``mission`` is overridden by the values set here.

    Args:
        mission: Mission payload whose items are copied into the row.
        status: Value stored under the ``"status"`` key.

    Returns:
        A new dict holding the mission items plus ``status``, a ``None``
        ``review_status``/``completed_at`` and fixed creation/update times.
    """
    # A fixed, timezone-aware timestamp keeps the fake rows deterministic.
    now = datetime(2026, 5, 3, tzinfo=timezone.utc)
    return {
        **mission,
        "status": status,
        "review_status": None,
        "created_at": now,
        "updated_at": now,
        "completed_at": None,
    }
def _build_client() -> TestClient:
    """Build a ``TestClient`` for a minimal app that serves only the colony routes.

    The app is created fresh on every call. Its ``state.db_pool`` is a bare
    placeholder object, ``get_current_user`` is overridden to return a fixed
    ADMIN principal, and the colony router is mounted under ``/api/colony``.

    Returns:
        A ``TestClient`` bound to the freshly built app.
    """
    app = FastAPI()
    # Placeholder only: no real pool is created here.
    app.state.db_pool = object()
    # Bypass authentication by always resolving to the same principal.
    app.dependency_overrides[get_current_user] = lambda: UserPrincipal(
        user_id="sayan",
        role="ADMIN",
        tenant_id="tenant-root",
    )
    app.include_router(routes_colony.router, prefix="/api/colony")
    return TestClient(app)
def test_colony_create_mission_persists_and_dispatches(monkeypatch) -> None:
    """Creating a mission stores it, dispatches it, and reports both events.

    ``ColonyRepository`` and ``ColonyGateway`` on ``routes_colony`` are
    replaced with in-memory fakes. The test then POSTs a mission and checks
    the 201 response body and the order of the logged event types.
    """
    # Shared state the fakes write into so the assertions below can read it.
    stored: dict[str, Any] = {}
    events: list[str] = []

    class FakeRepo:
        """In-memory stand-in for ``routes_colony.ColonyRepository``."""

        def __init__(self, _pool) -> None:
            pass

        async def create_mission(self, mission: dict[str, Any]) -> dict[str, Any]:
            # Remember the payload so later calls can be checked against it.
            stored.update(mission)
            return _mission_row(mission)

        async def update_status(
            self, mission_id: str, tenant_id: str, status: str, **_kwargs
        ) -> dict[str, Any]:
            assert mission_id == stored["mission_id"]
            assert tenant_id == "tenant-root"
            return _mission_row(stored, status=status)

        async def log_event(self, **kwargs) -> None:
            events.append(kwargs["event_type"])

    class FakeGateway:
        """Stand-in for ``routes_colony.ColonyGateway`` that accepts every mission."""

        async def dispatch_mission(self, mission: dict[str, Any]) -> dict[str, Any]:
            # The dispatched mission must carry the overridden principal's identity.
            assert mission["tenant_id"] == "tenant-root"
            assert mission["actor_id"] == "sayan"
            return {"accepted": True, "remote_mission_id": mission["mission_id"]}

    monkeypatch.setattr(routes_colony, "ColonyRepository", FakeRepo)
    monkeypatch.setattr(routes_colony, "ColonyGateway", FakeGateway)

    response = _build_client().post(
        "/api/colony/missions",
        json={
            "mission_type": "oracle_advisory",
            "user_goal": "Compare Palm Jumeirah leads and recommend the next broker action.",
            "context_refs": {"lead_id": "lead-1"},
            "requested_outputs": ["summary", "writeback_proposals"],
        },
    )

    assert response.status_code == 201
    body = response.json()["data"]
    assert body["tenant_id"] == "tenant-root"
    assert body["actor_id"] == "sayan"
    assert body["status"] == "queued"
    assert body["dispatch"]["accepted"] is True
    # Creation must be logged before dispatch.
    assert events == ["mission_created", "mission_dispatched"]
def test_colony_create_mission_requires_configured_orchestrator(monkeypatch) -> None:
    """Without ``COLONY_SERVICE_URL`` set, creating a mission yields a 503.

    The response ``detail`` is expected to mention the missing variable.
    """
    # raising=False: the variable may already be absent in the test environment.
    monkeypatch.delenv("COLONY_SERVICE_URL", raising=False)

    response = _build_client().post(
        "/api/colony/missions",
        json={
            "mission_type": "catalyst_strategy_brief",
            "user_goal": "Prepare a launch brief.",
        },
    )

    assert response.status_code == 503
    assert "COLONY_SERVICE_URL" in response.json()["detail"]