89 lines
3.4 KiB
Python
89 lines
3.4 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from fastapi import HTTPException
|
|
|
|
from backend.auth.dependencies import default_tenant_id
|
|
|
|
_CANONICAL_CRM_SCHEMA_CACHE_KEY = "_canonical_crm_schema_ready"
|
|
|
|
|
|
def _sql_text_literal(value: str) -> str:
|
|
return "'" + value.replace("'", "''") + "'"
|
|
|
|
|
|
async def ensure_canonical_crm_schema(app: Any) -> None:
|
|
if getattr(app.state, _CANONICAL_CRM_SCHEMA_CACHE_KEY, False):
|
|
return
|
|
|
|
pool = getattr(app.state, "db_pool", None)
|
|
if pool is None:
|
|
raise HTTPException(status_code=503, detail="Database unavailable.")
|
|
|
|
tenant_fallback = default_tenant_id()
|
|
tenant_default_literal = _sql_text_literal(tenant_fallback)
|
|
tenant_tables = (
|
|
"crm_people",
|
|
"crm_accounts",
|
|
"crm_leads",
|
|
"crm_opportunities",
|
|
"crm_property_interests",
|
|
"intel_interactions",
|
|
"intel_reminders",
|
|
"intel_qd_scores",
|
|
"intel_qd_timeseries",
|
|
"workflow_actions",
|
|
"workflow_approvals",
|
|
"workflow_import_batches",
|
|
)
|
|
|
|
async with pool.acquire() as conn:
|
|
for table_name in tenant_tables:
|
|
await conn.execute(f"ALTER TABLE {table_name} ADD COLUMN IF NOT EXISTS tenant_id TEXT")
|
|
await conn.execute(
|
|
f"""
|
|
UPDATE {table_name}
|
|
SET tenant_id = $1
|
|
WHERE tenant_id IS NULL OR tenant_id = ''
|
|
""",
|
|
tenant_fallback,
|
|
)
|
|
await conn.execute(
|
|
f"ALTER TABLE {table_name} ALTER COLUMN tenant_id SET DEFAULT {tenant_default_literal}"
|
|
)
|
|
await conn.execute(f"ALTER TABLE {table_name} ALTER COLUMN tenant_id SET NOT NULL")
|
|
|
|
await conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_crm_people_tenant_created ON crm_people (tenant_id, created_at DESC)"
|
|
)
|
|
await conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_crm_leads_tenant_status ON crm_leads (tenant_id, status, updated_at DESC)"
|
|
)
|
|
await conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_crm_opportunities_tenant_stage ON crm_opportunities (tenant_id, stage, updated_at DESC)"
|
|
)
|
|
await conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_crm_property_interests_tenant_person ON crm_property_interests (tenant_id, person_id, created_at DESC)"
|
|
)
|
|
await conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_intel_interactions_tenant_person ON intel_interactions (tenant_id, person_id, happened_at DESC)"
|
|
)
|
|
await conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_intel_reminders_tenant_status ON intel_reminders (tenant_id, status, due_at)"
|
|
)
|
|
await conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_intel_qd_scores_tenant_person ON intel_qd_scores (tenant_id, person_id, score_type)"
|
|
)
|
|
await conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_intel_qd_timeseries_tenant_person ON intel_qd_timeseries (tenant_id, person_id, timestamp DESC)"
|
|
)
|
|
await conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_workflow_actions_tenant_status ON workflow_actions (tenant_id, status, created_at DESC)"
|
|
)
|
|
await conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_workflow_import_batches_tenant_lifecycle ON workflow_import_batches (tenant_id, lifecycle, created_at DESC)"
|
|
)
|
|
|
|
setattr(app.state, _CANONICAL_CRM_SCHEMA_CACHE_KEY, True)
|