feat(crm): canonical crm and imported routes implementation

This commit is contained in:
Sagnik
2026-04-18 21:32:54 +05:30
parent 37c06de749
commit 954618c3ef
52 changed files with 80656 additions and 1 deletions

View File

@@ -0,0 +1,798 @@
"""
backend/api/routes_crm_imports.py
CRM Import Route Family + Client 360 Routes
Implements the canonical CRM API surface as specified in Doc 09 (Root API Spec):
POST /api/crm/imports — upload CSV batch
GET /api/crm/imports — list import batches
GET /api/crm/imports/{id} — get batch detail + proposals
PUT /api/crm/imports/{id}/review-proposal — approve, reject, or defer a single proposal
POST /api/crm/imports/{id}/commit — commit approved proposals to canonical
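GET /api/crm/contacts — canonical contact list (with QD summary)
POST /api/crm/contacts — create a canonical contact manually
GET /api/crm/contacts/{id} — canonical contact detail
GET /api/crm/client-360/{id} — Client 360 aggregated snapshot
GET /api/crm/opportunities — opportunity pipeline list
GET /api/crm/tasks — reminder/task list
POST /api/crm/tasks — create a reminder/task
GET /api/crm/kanban — kanban board (canonical leads)
GET /api/crm/qd/{id} — QD score summary and recent timeseries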
Uses canonical crm_*, intel_*, workflow_* tables.
"""
from __future__ import annotations
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any
from fastapi import APIRouter, HTTPException, Query, Request, UploadFile, File, status
from pydantic import BaseModel, Field
from backend.services.imports.ingest_service import (
parse_csv_content,
infer_column_mapping,
build_normalized_proposals,
create_import_batch_record,
persist_import_batch,
persist_proposals_as_workflow_actions,
)
from backend.services.client_graph.aggregation_service import (
get_client_360,
get_contact_list,
)
logger = logging.getLogger("velocity.api.crm_imports")
router = APIRouter()
def _now() -> str:
    """Return the current UTC instant as a timezone-aware ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
async def _get_pool(request: Request):
    """Return the DB pool stored on ``request.app.state.db_pool``.

    Raises:
        HTTPException: 503 when the application state has no ``db_pool``
            attribute or it is ``None``.
    """
    app_state = request.app.state
    pool = getattr(app_state, "db_pool", None)
    if pool is not None:
        return pool
    raise HTTPException(status_code=503, detail="Database unavailable.")
# ── Models ────────────────────────────────────────────────────────────────────
class ProposalApprovalRequest(BaseModel):
    """Request body carrying a reviewer's verdict on one import proposal."""

    # Identifier of the workflow action under review.
    proposal_id: str
    # Required; the regex limits it to the three accepted verdicts.
    decision: str = Field(pattern="^(approved|rejected|needs_more_info)$")
    # Optional reviewer notes, capped at 2000 characters.
    notes: str = Field("", max_length=2000)
class CreatePersonRequest(BaseModel):
    """Request body for creating a canonical person record by hand."""

    # Required display name, 1-200 characters.
    full_name: str = Field(min_length=1, max_length=200)
    primary_email: str | None = Field(default=None)
    primary_phone: str | None = Field(default=None)
    buyer_type: str | None = Field(default=None)
    # budget_band / project_name decide whether a lead (and a property
    # interest) is created alongside the person in create_contact.
    budget_band: str | None = Field(default=None)
    project_name: str | None = Field(default=None)
    source_system: str = Field(default="manual")
    notes: str | None = Field(default=None)
    # Free-form metadata; a fresh dict per instance.
    metadata_json: dict[str, Any] = Field(default_factory=dict)
class CreateLeadRequest(BaseModel):
    """Request body describing a new lead attached to an existing person."""

    # NOTE(review): no route in the visible part of this module consumes this
    # model — confirm whether a POST lead endpoint lives elsewhere.
    person_id: str
    status: str = Field(default="new")
    budget_band: str | None = Field(default=None)
    urgency: str | None = Field(default=None)
    financing_posture: str | None = Field(default=None)
    assigned_user_id: str | None = Field(default=None)
    # Free-form metadata; a fresh dict per instance.
    metadata_json: dict[str, Any] = Field(default_factory=dict)
class CreateReminderRequest(BaseModel):
    """Request body for creating a reminder / follow-up task."""

    person_id: str
    lead_id: str | None = Field(default=None)
    reminder_type: str = Field(default="follow_up")
    # Required title, 1-500 characters.
    title: str = Field(min_length=1, max_length=500)
    notes: str | None = Field(default=None)
    # ISO-8601 text; parsed into a datetime by the create_task route.
    due_at: str | None = Field(default=None)
    priority: str = Field(default="normal")
# ── Import Endpoints ──────────────────────────────────────────────────────────
@router.post("/crm/imports", status_code=201, tags=["CRM Imports"])
async def upload_crm_import(
    request: Request,
    file: UploadFile = File(...),
    source_system: str = Query(default="csv_upload"),
) -> dict[str, Any]:
    """
    Upload a CSV file to start a CRM import batch.

    Decodes the upload, parses it, infers a column mapping from the headers,
    then stores the batch record and the proposals built from its rows as
    workflow_actions.

    Args:
        request: Incoming request; used to reach the application's DB pool.
        file: The uploaded CSV file.
        source_system: Label recorded on the batch and passed to the proposal builder.

    Returns:
        Envelope with the batch id, row and mapping statistics, parse errors,
        and the number of proposals queued for review.

    Raises:
        HTTPException: 400 when the upload is empty; 503 when no DB pool is available.
    """
    pool = await _get_pool(request)
    content_bytes = await file.read()
    if not content_bytes.strip():
        raise HTTPException(status_code=400, detail="Uploaded file is empty.")
    try:
        # utf-8-sig drops a leading byte-order mark (common in spreadsheet
        # exports) so it does not end up glued to the first header name.
        content = content_bytes.decode("utf-8-sig")
    except UnicodeDecodeError:
        # latin-1 maps every byte, so this fallback cannot fail.
        content = content_bytes.decode("latin-1")
    parsed = parse_csv_content(content)
    mapping = infer_column_mapping(parsed["headers"])
    batch = create_import_batch_record(
        filename=file.filename or "upload.csv",
        row_count=parsed["row_count"],
        mapping_manifest=mapping,
        source_system=source_system,
    )
    proposals = build_normalized_proposals(
        rows=parsed["rows"],
        mapping=mapping["mapped"],
        batch_id=batch["batch_id"],
        source_system=source_system,
    )
    async with pool.acquire() as conn:
        # One transaction: a failure while inserting proposals must not leave
        # a batch row behind with only part of its proposals.
        async with conn.transaction():
            await persist_import_batch(conn, batch)
            inserted = await persist_proposals_as_workflow_actions(conn, proposals)
    logger.info("Import batch %s: %d rows, %d proposals", batch["batch_id"], parsed["row_count"], inserted)
    return {
        "status": "ok",
        "data": {
            "batch_id": batch["batch_id"],
            "row_count": parsed["row_count"],
            "mapped_columns": mapping["mapped_count"],
            "unmapped_columns": mapping["unmapped_count"],
            "mapping_confidence": mapping["confidence"],
            "proposals_created": inserted,
            "parse_errors": parsed["parse_errors"],
            "lifecycle": "parsed",
            "message": f"Import batch created. {inserted} proposals queued for review.",
        },
    }
@router.get("/crm/imports", tags=["CRM Imports"])
async def list_import_batches(
    request: Request,
    lifecycle: str | None = None,
    limit: int = Query(default=20, ge=1, le=100),
    offset: int = Query(default=0, ge=0),
) -> dict[str, Any]:
    """Return one page of CRM import batches, newest first.

    An optional ``lifecycle`` value narrows the listing; ``meta.total`` is the
    number of batches matching the filter regardless of paging.
    """
    pool = await _get_pool(request)

    # Filter placeholders come first; LIMIT/OFFSET take the next two slots.
    filter_args: list[Any] = []
    conditions = ["1=1"]
    if lifecycle:
        filter_args.append(lifecycle)
        conditions.append(f"lifecycle = ${len(filter_args)}::import_lifecycle")
    where = " AND ".join(conditions)
    limit_slot = len(filter_args) + 1
    offset_slot = limit_slot + 1

    page_sql = f"""
        SELECT batch_id, source_system, uploaded_filename, row_count,
        mapped_count, unresolved_count, lifecycle, created_at, updated_at
        FROM workflow_import_batches
        WHERE {where}
        ORDER BY created_at DESC
        LIMIT ${limit_slot} OFFSET ${offset_slot}
        """
    count_sql = f"SELECT COUNT(*) FROM workflow_import_batches WHERE {where}"

    async with pool.acquire() as conn:
        rows = await conn.fetch(page_sql, *filter_args, limit, offset)
        total = await conn.fetchval(count_sql, *filter_args)

    batches = []
    for rec in rows:
        created = rec["created_at"]
        batches.append(
            {
                "batch_id": str(rec["batch_id"]),
                "source_system": rec["source_system"],
                "filename": rec["uploaded_filename"],
                "row_count": rec["row_count"],
                "mapped_count": rec["mapped_count"],
                "unresolved_count": rec["unresolved_count"],
                "lifecycle": rec["lifecycle"],
                "created_at": created.isoformat() if created else None,
            }
        )
    paging = {"total": total, "limit": limit, "offset": offset}
    return {"status": "ok", "data": batches, "meta": paging}
@router.get("/crm/imports/{batch_id}", tags=["CRM Imports"])
async def get_import_batch(request: Request, batch_id: str) -> dict[str, Any]:
    """Get import batch detail together with its import proposals.

    Args:
        request: Incoming request; used to reach the application's DB pool.
        batch_id: UUID of the batch (any spelling accepted by ``uuid.UUID``).

    Returns:
        Envelope with the batch record and up to 200 proposals ordered by the
        row number stored in each proposal payload. ``proposal_count`` counts
        the proposals returned, so it is capped at 200 as well.

    Raises:
        HTTPException: 404 when ``batch_id`` is malformed or unknown; 503 when
            no DB pool is available.
    """
    pool = await _get_pool(request)
    try:
        # Validate up front so a malformed id is a clean 404 rather than a
        # driver error, and normalise the spelling: the proposal lookup below
        # is a *text* comparison against the payload's batch_id.
        # NOTE(review): assumes payloads store the canonical lowercase
        # hyphenated form — confirm against the ingest service.
        batch_key = str(uuid.UUID(batch_id))
    except ValueError:
        raise HTTPException(status_code=404, detail=f"Import batch '{batch_id}' not found.") from None
    async with pool.acquire() as conn:
        batch_row = await conn.fetchrow(
            "SELECT * FROM workflow_import_batches WHERE batch_id = $1::uuid",
            batch_key,
        )
        if not batch_row:
            raise HTTPException(status_code=404, detail=f"Import batch '{batch_id}' not found.")
        proposal_rows = await conn.fetch(
            """
            SELECT action_id, proposal_payload, confidence, status, approval_required, created_at
            FROM workflow_actions
            WHERE action_type = 'import_proposal'
            AND proposal_payload->>'batch_id' = $1
            ORDER BY (proposal_payload->>'row_number')::int ASC
            LIMIT 200
            """,
            batch_key,
        )
    proposals = [
        {
            "proposal_id": str(r["action_id"]),
            "payload": r["proposal_payload"],
            "confidence": float(r["confidence"]) if r["confidence"] is not None else 0.0,
            "status": r["status"],
            "review_required": r["approval_required"],
        }
        for r in proposal_rows
    ]
    return {
        "status": "ok",
        "data": {
            "batch_id": str(batch_row["batch_id"]),
            "source_system": batch_row["source_system"],
            "filename": batch_row["uploaded_filename"],
            "row_count": batch_row["row_count"],
            "mapping_manifest": batch_row["mapping_manifest"],
            "lifecycle": batch_row["lifecycle"],
            "proposals": proposals,
            "proposal_count": len(proposals),
        },
    }
@router.put("/crm/imports/{batch_id}/review-proposal", tags=["CRM Imports"])
async def review_proposal(
    request: Request, batch_id: str, body: ProposalApprovalRequest
) -> dict[str, Any]:
    """
    Human review of a single import proposal.

    Records the verdict in workflow_approvals and updates the status of the
    matching workflow action, both inside one transaction.

    Args:
        request: Incoming request; used to reach the application's DB pool.
        batch_id: Batch the proposal must belong to.
        body: Proposal id, decision and optional notes.

    Returns:
        Envelope with the new decision id and the echoed proposal id / decision.

    Raises:
        HTTPException: 404 when either id is malformed or the proposal is not
            an import proposal of this batch; 409 when the proposal has already
            been executed; 503 when no DB pool is available.
    """
    pool = await _get_pool(request)
    try:
        batch_key = str(uuid.UUID(batch_id))
        proposal_key = str(uuid.UUID(body.proposal_id))
    except ValueError:
        raise HTTPException(status_code=404, detail="Proposal not found.") from None

    # "needs_more_info" is not a rejection: send the action back for review
    # instead of closing it as rejected.
    status_by_decision = {
        "approved": "approved",
        "rejected": "rejected",
        "needs_more_info": "review_required",
    }
    new_status = status_by_decision[body.decision]
    decision_id = str(uuid.uuid4())

    async with pool.acquire() as conn:
        async with conn.transaction():
            # Scope the lookup to import proposals of *this* batch so the
            # endpoint cannot be used to flip arbitrary workflow actions.
            action = await conn.fetchrow(
                """
                SELECT action_id, status
                FROM workflow_actions
                WHERE action_id = $1::uuid
                AND action_type = 'import_proposal'
                AND proposal_payload->>'batch_id' = $2
                """,
                proposal_key,
                batch_key,
            )
            if not action:
                raise HTTPException(status_code=404, detail="Proposal not found.")
            if action["status"] == "executed":
                # Re-approving an executed proposal would make the next commit
                # insert the same person a second time.
                raise HTTPException(status_code=409, detail="Proposal has already been committed.")
            await conn.execute(
                """
                INSERT INTO workflow_approvals (decision_id, action_id, decision, decision_notes, decided_at)
                VALUES ($1::uuid, $2::uuid, $3, $4, NOW())
                """,
                decision_id,
                proposal_key,
                body.decision,
                body.notes,
            )
            await conn.execute(
                "UPDATE workflow_actions SET status = $1::wf_status, updated_at = NOW() WHERE action_id = $2::uuid",
                new_status,
                proposal_key,
            )
    return {
        "status": "ok",
        "data": {
            "decision_id": decision_id,
            "proposal_id": body.proposal_id,
            "decision": body.decision,
            "message": f"Proposal {body.decision}.",
        },
    }
@router.post("/crm/imports/{batch_id}/commit", tags=["CRM Imports"])
async def commit_approved_proposals(request: Request, batch_id: str) -> dict[str, Any]:
    """
    Commit all approved proposals for a batch into the canonical CRM tables.

    For every approved import proposal of the batch a crm_people row is
    inserted; a crm_leads row is added when the payload carries a status or a
    budget band, and a crm_property_interests row when it names a project.
    Each proposal is written in its own transaction, so a failing proposal
    leaves no partial rows and stays 'approved' for a later retry.
    Rejected/pending proposals are not selected. Proposals without a
    full_name are counted as skipped.
    This implements the writeback flow from Doc 07 and Doc 09.

    Args:
        request: Incoming request; used to reach the application's DB pool.
        batch_id: UUID of the batch whose approved proposals are committed.

    Returns:
        Envelope with committed / skipped counts and per-proposal error strings.

    Raises:
        HTTPException: 404 when ``batch_id`` is malformed; 503 when no DB pool
            is available.
    """
    pool = await _get_pool(request)
    try:
        # The proposal lookup compares batch ids as text, so use one
        # canonical spelling everywhere.
        batch_key = str(uuid.UUID(batch_id))
    except ValueError:
        raise HTTPException(status_code=404, detail=f"Import batch '{batch_id}' not found.") from None
    committed = 0
    skipped = 0
    errors: list[str] = []
    async with pool.acquire() as conn:
        approved_rows = await conn.fetch(
            """
            SELECT action_id, proposal_payload
            FROM workflow_actions
            WHERE action_type = 'import_proposal'
            AND proposal_payload->>'batch_id' = $1
            AND status = 'approved'
            """,
            batch_key,
        )
        for row in approved_rows:
            try:
                payload = row["proposal_payload"]
                if isinstance(payload, str):
                    # JSONB arrives as text unless the pool registers a JSON codec.
                    payload = json.loads(payload)
                canonical = payload.get("canonical_payload") or {}
                if not canonical.get("full_name"):
                    skipped += 1
                    continue
                # All-or-nothing per proposal: without this, a failure after
                # the person insert would leave an orphan row and a retry
                # would create the same person again.
                async with conn.transaction():
                    person_id = str(uuid.uuid4())
                    await conn.execute(
                        """
                        INSERT INTO crm_people (
                        person_id, full_name, primary_email, primary_phone,
                        buyer_type, source_confidence, metadata_json, created_at, updated_at
                        ) VALUES (
                        $1::uuid, $2, $3, $4, $5, $6, $7::jsonb, NOW(), NOW()
                        )
                        ON CONFLICT DO NOTHING
                        """,
                        person_id,
                        canonical.get("full_name"),
                        canonical.get("primary_email"),
                        canonical.get("primary_phone"),
                        canonical.get("buyer_type"),
                        payload.get("confidence", 0.5),
                        json.dumps({"source_batch": batch_key, "import_row": payload.get("row_number")}),
                    )
                    # Reset per proposal so a property interest can never pick
                    # up the lead id of a previously processed row.
                    lead_id: str | None = None
                    if canonical.get("status") or canonical.get("budget_band"):
                        lead_id = str(uuid.uuid4())
                        await conn.execute(
                            """
                            INSERT INTO crm_leads (
                            lead_id, person_id, source_system, status, budget_band,
                            metadata_json, created_at, updated_at
                            ) VALUES (
                            $1::uuid, $2::uuid, $3, 'new'::crm_lead_status, $4, $5::jsonb, NOW(), NOW()
                            )
                            ON CONFLICT DO NOTHING
                            """,
                            lead_id,
                            person_id,
                            payload.get("source_system", "csv_upload"),
                            canonical.get("budget_band"),
                            json.dumps({"import_batch": batch_key}),
                        )
                    # Stage property interest if project_name present; the
                    # lead reference is optional and stays NULL without a lead.
                    if canonical.get("project_name"):
                        await conn.execute(
                            """
                            INSERT INTO crm_property_interests (
                            interest_id, person_id, lead_id, project_name, created_at
                            ) VALUES (
                            $1::uuid, $2::uuid, $3::uuid, $4, NOW()
                            )
                            ON CONFLICT DO NOTHING
                            """,
                            str(uuid.uuid4()),
                            person_id,
                            lead_id,
                            canonical.get("project_name"),
                        )
                    # Mark action as executed
                    await conn.execute(
                        "UPDATE workflow_actions SET status = 'executed'::wf_status, updated_at = NOW() WHERE action_id = $1::uuid",
                        row["action_id"],
                    )
                committed += 1
            except Exception as e:
                # Best-effort per proposal: record the failure and keep going.
                logger.exception("Commit failed for proposal %s", row["action_id"])
                errors.append(f"Proposal {row['action_id']}: {str(e)}")
                skipped += 1
        # Update batch lifecycle. The count is accumulated rather than
        # overwritten so re-running commit does not reset it to zero.
        await conn.execute(
            "UPDATE workflow_import_batches SET lifecycle = 'committed'::import_lifecycle, canonical_count = COALESCE(canonical_count, 0) + $1, updated_at = NOW() WHERE batch_id = $2::uuid",
            committed,
            batch_key,
        )
    return {
        "status": "ok",
        "data": {
            "batch_id": batch_id,
            "committed": committed,
            "skipped": skipped,
            "errors": errors,
            "lifecycle": "committed",
        },
    }
# ── Contact / Person Endpoints ──────────────────────────────────────────────
@router.get("/crm/contacts", tags=["CRM Contacts"])
async def list_contacts(
    request: Request,
    search: str | None = Query(default=None),
    buyer_type: str | None = Query(default=None),
    status: str | None = Query(default=None),
    limit: int = Query(default=50, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
) -> dict[str, Any]:
    """Return the canonical contact list produced by ``get_contact_list``.

    All filters and paging values are forwarded unchanged to the aggregation
    service; its result is returned as ``data``.
    """
    db_pool = await _get_pool(request)
    async with db_pool.acquire() as conn:
        contacts = await get_contact_list(conn, search, buyer_type, status, limit, offset)
    envelope: dict[str, Any] = {"status": "ok"}
    envelope["data"] = contacts
    return envelope
@router.post("/crm/contacts", status_code=201, tags=["CRM Contacts"])
async def create_contact(request: Request, body: CreatePersonRequest) -> dict[str, Any]:
    """Create a new canonical person record manually.

    Inserts a crm_people row with source confidence 1.0. When the body names
    a project or a budget band a crm_leads row is added, and when it names a
    project a crm_property_interests row linked to that lead is added too.
    All inserts run in one transaction.

    Args:
        request: Incoming request; used to reach the application's DB pool.
        body: Person details plus optional lead / project hints.

    Returns:
        Envelope with the new ``person_id`` and the submitted ``full_name``.

    Raises:
        HTTPException: 503 when no DB pool is available.
    """
    # NOTE(review): body.notes is accepted by the model but not written
    # anywhere by this route — confirm whether it should be persisted.
    pool = await _get_pool(request)
    person_id = str(uuid.uuid4())
    async with pool.acquire() as conn:
        # Atomic: a failed lead or interest insert must not leave a person
        # row behind while the client is told the request failed.
        async with conn.transaction():
            await conn.execute(
                """
                INSERT INTO crm_people (
                person_id, full_name, primary_email, primary_phone,
                buyer_type, source_confidence, metadata_json, created_at, updated_at
                ) VALUES ($1::uuid, $2, $3, $4, $5, 1.0, $6::jsonb, NOW(), NOW())
                """,
                person_id,
                body.full_name,
                body.primary_email,
                body.primary_phone,
                body.buyer_type,
                json.dumps({**body.metadata_json, "manual_entry": True}),
            )
            if body.project_name or body.budget_band:
                lead_id = str(uuid.uuid4())
                await conn.execute(
                    """
                    INSERT INTO crm_leads (
                    lead_id, person_id, source_system, status, budget_band,
                    metadata_json, created_at, updated_at
                    ) VALUES ($1::uuid, $2::uuid, $3, 'new'::crm_lead_status, $4, '{}'::jsonb, NOW(), NOW())
                    """,
                    lead_id,
                    person_id,
                    body.source_system,
                    body.budget_band,
                )
                # A project name always implies the lead above was created.
                if body.project_name:
                    await conn.execute(
                        """
                        INSERT INTO crm_property_interests (interest_id, person_id, lead_id, project_name, created_at)
                        VALUES ($1::uuid, $2::uuid, $3::uuid, $4, NOW())
                        """,
                        str(uuid.uuid4()),
                        person_id,
                        lead_id,
                        body.project_name,
                    )
    return {"status": "ok", "data": {"person_id": person_id, "full_name": body.full_name}}
@router.get("/crm/contacts/{person_id}", tags=["CRM Contacts"])
async def get_contact(request: Request, person_id: str) -> dict[str, Any]:
    """Get a single canonical contact record.

    Args:
        request: Incoming request; used to reach the application's DB pool.
        person_id: UUID of the person.

    Returns:
        Envelope with the person's identity fields, persona labels, source
        confidence and timestamps.

    Raises:
        HTTPException: 404 when ``person_id`` is malformed or unknown; 503 when
            no DB pool is available.
    """
    pool = await _get_pool(request)
    try:
        # A malformed id cannot match any row; answer 404 instead of letting
        # the uuid cast fail inside the driver.
        uuid.UUID(person_id)
    except ValueError:
        raise HTTPException(status_code=404, detail=f"Contact '{person_id}' not found.") from None
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT person_id, full_name, primary_email, primary_phone, secondary_phone,
            buyer_type, persona_labels, source_confidence, created_at, updated_at
            FROM crm_people WHERE person_id = $1::uuid
            """,
            person_id,
        )
    if not row:
        raise HTTPException(status_code=404, detail=f"Contact '{person_id}' not found.")
    return {
        "status": "ok",
        "data": {
            "person_id": str(row["person_id"]),
            "full_name": row["full_name"],
            "primary_email": row["primary_email"],
            "primary_phone": row["primary_phone"],
            # The three fields below were already selected but never returned.
            "secondary_phone": row["secondary_phone"],
            "buyer_type": row["buyer_type"],
            "persona_labels": row["persona_labels"] or [],
            "source_confidence": float(row["source_confidence"] or 0.0),
            "created_at": row["created_at"].isoformat() if row["created_at"] else None,
            "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None,
        },
    }
# ── Client 360 Endpoint ────────────────────────────────────────────────────
@router.get("/crm/client-360/{person_id}", tags=["CRM Client 360"])
async def client_360(request: Request, person_id: str) -> dict[str, Any]:
    """
    Return the aggregated Client360 dossier built by ``get_client_360``.

    The snapshot is a derived read model, not primary truth. A falsy snapshot
    is reported as 404.
    """
    db_pool = await _get_pool(request)
    async with db_pool.acquire() as conn:
        dossier = await get_client_360(conn, person_id)
    if dossier:
        return {"status": "ok", "data": dossier}
    raise HTTPException(status_code=404, detail=f"Client '{person_id}' not found.")
# ── Opportunities Endpoint ─────────────────────────────────────────────────
@router.get("/crm/opportunities", tags=["CRM Opportunities"])
async def list_opportunities(
    request: Request,
    stage: str | None = None,
    limit: int = Query(default=50, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
) -> dict[str, Any]:
    """Canonical opportunity pipeline list.

    Joins each opportunity to its lead and person (and, when present, its
    inventory project) and returns one page ordered by most recent update.

    Args:
        request: Incoming request; used to reach the application's DB pool.
        stage: Optional crm_opportunity_stage value to filter on.
        limit: Page size (1-200).
        offset: Number of rows to skip.

    Returns:
        Envelope with the opportunity list; ``meta`` carries the number of
        rows returned plus the paging values that were applied.

    Raises:
        HTTPException: 503 when no DB pool is available.
    """
    pool = await _get_pool(request)
    clauses = ["1=1"]
    params: list[Any] = []
    if stage:
        params.append(stage)
        clauses.append(f"co.stage = ${len(params)}::crm_opportunity_stage")
    params.extend([limit, offset])
    where = " AND ".join(clauses)
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            f"""
            SELECT co.opportunity_id, co.stage, co.value, co.probability,
            co.expected_close_date, co.next_action,
            p.person_id, p.full_name, p.primary_phone,
            ip.project_name,
            co.created_at, co.updated_at
            FROM crm_opportunities co
            INNER JOIN crm_leads cl ON cl.lead_id = co.lead_id
            INNER JOIN crm_people p ON p.person_id = cl.person_id
            LEFT JOIN inventory_projects ip ON ip.project_id = co.project_id
            WHERE {where}
            ORDER BY co.updated_at DESC
            LIMIT ${len(params) - 1} OFFSET ${len(params)}
            """,
            *params,
        )
    opportunities = [
        {
            "opportunity_id": str(r["opportunity_id"]),
            "stage": r["stage"],
            # Compare against None: a deal valued at 0 is falsy but is still
            # a real value and must not be reported as missing.
            "value": float(r["value"]) if r["value"] is not None else None,
            "probability": r["probability"],
            "expected_close_date": r["expected_close_date"].isoformat() if r["expected_close_date"] else None,
            "next_action": r["next_action"],
            "person_id": str(r["person_id"]),
            "client_name": r["full_name"],
            "client_phone": r["primary_phone"],
            "project_name": r["project_name"],
        }
        for r in rows
    ]
    return {
        "status": "ok",
        "data": opportunities,
        "meta": {"count": len(opportunities), "limit": limit, "offset": offset},
    }
# ── Tasks / Reminders Endpoint ─────────────────────────────────────────────
@router.get("/crm/tasks", tags=["CRM Tasks"])
async def list_tasks(
    request: Request,
    status_filter: str | None = Query(default="pending", alias="status"),
    assigned_to: str | None = None,
    limit: int = Query(default=50, ge=1, le=200),
) -> dict[str, Any]:
    """Return the reminder / task inbox, earliest due date first.

    Filters by reminder status (query parameter ``status``, default
    ``pending``) and optionally by assignee; reminders without a due date
    sort last.
    """
    pool = await _get_pool(request)

    # Collect (value, SQL template) pairs for the active filters, then number
    # the placeholders in order; LIMIT takes the slot after the last filter.
    active_filters: list[tuple[Any, str]] = []
    if status_filter:
        active_filters.append((status_filter, "ir.status = ${n}"))
    if assigned_to:
        active_filters.append((assigned_to, "ir.assigned_to = ${n}::uuid"))

    conditions = ["1=1"]
    args: list[Any] = []
    for value, template in active_filters:
        args.append(value)
        conditions.append(template.replace("{n}", str(len(args))))
    args.append(limit)
    where = " AND ".join(conditions)

    query = f"""
        SELECT ir.reminder_id, ir.reminder_type, ir.title, ir.notes,
        ir.due_at, ir.status, ir.priority,
        p.person_id, p.full_name, p.primary_phone
        FROM intel_reminders ir
        INNER JOIN crm_people p ON p.person_id = ir.person_id
        WHERE {where}
        ORDER BY ir.due_at ASC NULLS LAST, ir.created_at DESC
        LIMIT ${len(args)}
        """
    async with pool.acquire() as conn:
        rows = await conn.fetch(query, *args)

    tasks = []
    for rec in rows:
        due = rec["due_at"]
        tasks.append(
            {
                "reminder_id": str(rec["reminder_id"]),
                "reminder_type": rec["reminder_type"],
                "title": rec["title"],
                "notes": rec["notes"],
                "due_at": due.isoformat() if due else None,
                "status": rec["status"],
                "priority": rec["priority"],
                "person_id": str(rec["person_id"]),
                "client_name": rec["full_name"],
                "client_phone": rec["primary_phone"],
            }
        )
    return {"status": "ok", "data": tasks, "meta": {"count": len(tasks)}}
def _parse_due_at(raw: str | None) -> datetime | None:
    """Turn the request's ISO-8601 ``due_at`` text into a datetime, or None when blank."""
    if raw is None:
        return None
    text = raw.strip()
    if not text:
        return None
    if text[-1] in "Zz":
        # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11
        # on; spell the UTC offset out so older runtimes parse it too.
        text = f"{text[:-1]}+00:00"
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        raise HTTPException(
            status_code=422,
            detail=f"due_at '{raw}' is not a valid ISO-8601 datetime.",
        ) from None


@router.post("/crm/tasks", status_code=201, tags=["CRM Tasks"])
async def create_task(request: Request, body: CreateReminderRequest) -> dict[str, Any]:
    """Create a reminder / follow-up task.

    Inserts an intel_reminders row with status 'pending' and
    created_by_type 'human'.

    Args:
        request: Incoming request; used to reach the application's DB pool.
        body: Reminder details; ``due_at`` is optional ISO-8601 text.

    Returns:
        Envelope with the new ``reminder_id`` and the submitted ``title``.

    Raises:
        HTTPException: 422 when ``due_at`` is present but unparseable (it was
            previously dropped silently, creating a reminder with no due
            date); 503 when no DB pool is available.
    """
    pool = await _get_pool(request)
    # Parse before taking a connection so bad input never reaches the database.
    due_dt = _parse_due_at(body.due_at)
    reminder_id = str(uuid.uuid4())
    async with pool.acquire() as conn:
        await conn.execute(
            """
            INSERT INTO intel_reminders (
            reminder_id, person_id, lead_id, reminder_type, title,
            notes, due_at, status, priority, created_by_type, created_at
            ) VALUES (
            $1::uuid, $2::uuid, $3::uuid, $4, $5, $6, $7, 'pending', $8, 'human', NOW()
            )
            """,
            reminder_id,
            body.person_id,
            body.lead_id,
            body.reminder_type,
            body.title,
            body.notes,
            due_dt,
            body.priority,
        )
    return {"status": "ok", "data": {"reminder_id": reminder_id, "title": body.title}}
# ── Canonical Kanban (from crm_leads) ─────────────────────────────────────
@router.get("/crm/kanban", tags=["CRM Kanban"])
async def get_canonical_kanban(request: Request) -> dict[str, Any]:
    """
    Build the Kanban board from crm_leads.

    Every lead becomes a card in the column of its status; columns follow the
    fixed stage order below and cards arrive ordered by intent score
    (highest first), then by most recent update.
    """
    pool = await _get_pool(request)
    STAGES = [
        "new",
        "contacted",
        "qualified",
        "site_visit_scheduled",
        "site_visited",
        "negotiation",
        "booking_initiated",
        "booked",
        "lost",
        "dormant",
    ]
    kanban_sql = """
        SELECT
        cl.lead_id, cl.status, cl.budget_band, cl.urgency,
        p.person_id, p.full_name, p.primary_phone, p.buyer_type,
        COALESCE(qs.intent_value, 0.0) AS intent_score
        FROM crm_leads cl
        INNER JOIN crm_people p ON p.person_id = cl.person_id
        LEFT JOIN LATERAL (
        SELECT MAX(CASE WHEN score_type = 'intent_score' THEN current_value END) AS intent_value
        FROM intel_qd_scores WHERE person_id = p.person_id
        ) qs ON TRUE
        ORDER BY qs.intent_value DESC NULLS LAST, cl.updated_at DESC
        """
    async with pool.acquire() as conn:
        lead_rows = await conn.fetch(kanban_sql)

    columns: dict[str, list[dict]] = {}
    for stage in STAGES:
        columns[stage] = []
    for rec in lead_rows:
        lane = rec["status"] or "new"
        card = {
            "lead_id": str(rec["lead_id"]),
            "person_id": str(rec["person_id"]),
            "client_name": rec["full_name"],
            "client_phone": rec["primary_phone"],
            "buyer_type": rec["buyer_type"],
            "budget_band": rec["budget_band"],
            "urgency": rec["urgency"],
            "intent_score": float(rec["intent_score"]),
        }
        # A status outside STAGES still gets a bucket, but only STAGES
        # columns are emitted on the board below.
        if lane not in columns:
            columns[lane] = []
        columns[lane].append(card)

    board = []
    for stage in STAGES:
        cards = columns[stage]
        board.append(
            {
                "status": stage,
                "label": stage.replace("_", " ").title(),
                "count": len(cards),
                "items": cards,
            }
        )
    return {"status": "ok", "data": board}
# ── QD Score Access ────────────────────────────────────────────────────────
@router.get("/crm/qd/{person_id}", tags=["CRM QD"])
async def get_qd_score(request: Request, person_id: str) -> dict[str, Any]:
    """QD score summary and recent timeseries for a client.

    Args:
        request: Incoming request; used to reach the application's DB pool.
        person_id: UUID of the person.

    Returns:
        Envelope with the current score per score type and the 50 most recent
        timeseries points, newest first.

    Raises:
        HTTPException: 404 when ``person_id`` is malformed or has no scores;
            503 when no DB pool is available.
    """

    def _opt_float(value: Any) -> float | None:
        # None-check rather than truthiness: 0.0 is a legitimate score/delta.
        return None if value is None else float(value)

    pool = await _get_pool(request)
    try:
        # A malformed id cannot have scores; answer 404 instead of letting
        # the uuid cast fail inside the driver.
        uuid.UUID(person_id)
    except ValueError:
        raise HTTPException(status_code=404, detail=f"No QD scores for client '{person_id}'.") from None
    async with pool.acquire() as conn:
        scores = await conn.fetch(
            "SELECT score_type, current_value, computed_at, reasoning FROM intel_qd_scores WHERE person_id = $1::uuid",
            person_id,
        )
        if not scores:
            # Decide before running the timeseries query that would be discarded.
            raise HTTPException(status_code=404, detail=f"No QD scores for client '{person_id}'.")
        timeseries = await conn.fetch(
            """
            SELECT score_type, value, timestamp, signal_source, delta
            FROM intel_qd_timeseries
            WHERE person_id = $1::uuid
            ORDER BY timestamp DESC
            LIMIT 50
            """,
            person_id,
        )
    return {
        "status": "ok",
        "data": {
            "person_id": person_id,
            "scores": {
                r["score_type"]: {
                    "current_value": _opt_float(r["current_value"]),
                    "computed_at": r["computed_at"].isoformat() if r["computed_at"] else None,
                    "reasoning": r["reasoning"],
                }
                for r in scores
            },
            "timeseries": [
                {
                    "score_type": r["score_type"],
                    "value": _opt_float(r["value"]),
                    "timestamp": r["timestamp"].isoformat() if r["timestamp"] else None,
                    "signal_source": r["signal_source"],
                    "delta": _opt_float(r["delta"]),
                }
                for r in timeseries
            ],
        },
    }

View File

@@ -0,0 +1,708 @@
-- =============================================================================
-- schema_crm_canonical.sql
-- Project Velocity — Canonical CRM and Platform Schema
-- =============================================================================
-- Covers: crm_*, intel_*, inventory_*, workflow_* canonical domains
-- as specified in Doc 09: Database Schema and Root API Spec
-- and Doc 07: Contracts and Schema Blueprint
--
-- Run AFTER schema.sql and schema_addendum.sql
-- psql -U velocity_user -d velocity_db -f schema_crm_canonical.sql
--
-- Existing tables: users_and_roles, leads_intelligence, velocity_vault_assets,
-- omnichannel_logs, consent_log, video_scene_maps,
-- perception_sessions, cctv_events, leads, chat_logs
-- These are treated as legacy feeders per the reconciliation matrix.
-- =============================================================================
-- pgcrypto: supplies gen_random_uuid(), the default of every UUID primary key
-- in this file (the function is built in from PostgreSQL 13 onwards).
-- pg_trgm: supplies gin_trgm_ops, used by the trigram index on crm_people.full_name.
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
CREATE EXTENSION IF NOT EXISTS "pg_trgm";
-- ─────────────────────────────────────────────────────────────────────────────
-- ENUM TYPES — Canonical Domain
-- ─────────────────────────────────────────────────────────────────────────────
-- Each enum is created inside a DO block that ignores duplicate_object, so
-- re-running this script against a database that already has the type is a
-- no-op instead of an error. Note that an existing type is left untouched:
-- values added to a list below are NOT applied to an already-created enum.

-- Lead funnel status; type of crm_leads.status.
DO $$ BEGIN
CREATE TYPE crm_lead_status AS ENUM (
'new', 'contacted', 'qualified', 'site_visit_scheduled', 'site_visited',
'negotiation', 'booking_initiated', 'booked', 'lost', 'dormant'
);
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
-- Deal pipeline stage; type of crm_opportunities.stage.
DO $$ BEGIN
CREATE TYPE crm_opportunity_stage AS ENUM (
'prospect', 'qualified', 'proposal', 'site_visit', 'negotiation',
'booking', 'agreement', 'closed_won', 'closed_lost'
);
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
-- Kind of organization; type of crm_accounts.account_type.
DO $$ BEGIN
CREATE TYPE crm_account_type AS ENUM (
'individual', 'company', 'broker', 'developer', 'referral_partner', 'nri_family'
);
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
-- Person-to-person link kind; type of crm_relationships.relationship_type.
DO $$ BEGIN
CREATE TYPE crm_relationship_type AS ENUM (
'spouse', 'parent', 'sibling', 'business_partner', 'broker_referral',
'co_buyer', 'family_member', 'advisor'
);
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
-- Interaction channel; type of intel_interactions.channel.
DO $$ BEGIN
CREATE TYPE intel_channel AS ENUM (
'whatsapp', 'phone', 'email', 'site_visit', 'office_meeting',
'video_call', 'cctv', 'perception_session', 'system'
);
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
-- Call direction.
DO $$ BEGIN
CREATE TYPE intel_call_direction AS ENUM ('inbound', 'outbound');
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
-- Workflow action status.
DO $$ BEGIN
CREATE TYPE wf_status AS ENUM (
'pending', 'review_required', 'approved', 'rejected', 'executed', 'failed', 'cancelled'
);
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
-- Import batch lifecycle.
DO $$ BEGIN
CREATE TYPE import_lifecycle AS ENUM (
'uploaded', 'parsed', 'mapped', 'proposed', 'approved', 'committed', 'failed'
);
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
-- ─────────────────────────────────────────────────────────────────────────────
-- SECTION 1: CRM CORE DOMAIN (crm_*)
-- ─────────────────────────────────────────────────────────────────────────────
-- TABLE: crm_people
-- Purpose: canonical person-level contact identity
-- Only person_id is unique: there is no uniqueness constraint on email or
-- phone, so the same real-world person can be stored more than once and an
-- INSERT ... ON CONFLICT DO NOTHING can only ever trigger on the primary key.
CREATE TABLE IF NOT EXISTS crm_people (
person_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
full_name TEXT NOT NULL,
primary_email TEXT,
primary_phone TEXT,
secondary_phone TEXT,
linkedin_url TEXT,
city TEXT,
nationality TEXT,
buyer_type TEXT, -- high_intent, slow_burn_investor, nri, etc.
persona_labels JSONB NOT NULL DEFAULT '[]'::jsonb,
-- NULL is allowed; when present the value must lie in [0.0, 1.0].
source_confidence FLOAT CHECK (source_confidence BETWEEN 0.0 AND 1.0),
-- Legacy feeder references (migration linkage)
-- Plain columns without foreign keys to the legacy tables.
legacy_lead_id TEXT, -- links to old leads.id
legacy_li_id UUID, -- links to leads_intelligence.id
-- Metadata
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_crm_people_email ON crm_people (primary_email);
CREATE INDEX IF NOT EXISTS idx_crm_people_phone ON crm_people (primary_phone);
-- Trigram GIN index (needs the pg_trgm extension) for name matching.
CREATE INDEX IF NOT EXISTS idx_crm_people_name_trgm ON crm_people USING GIN (full_name gin_trgm_ops);
CREATE INDEX IF NOT EXISTS idx_crm_people_buyer_type ON crm_people (buyer_type);
-- TABLE: crm_accounts
-- Purpose: company, employer, brokerage, or client organization
CREATE TABLE IF NOT EXISTS crm_accounts (
account_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
account_name TEXT NOT NULL,
-- Self-reference for account hierarchies; deleting a parent detaches its
-- children (SET NULL) rather than deleting them.
parent_account_id UUID REFERENCES crm_accounts(account_id) ON DELETE SET NULL,
account_type crm_account_type NOT NULL DEFAULT 'company',
industry TEXT,
location_ref TEXT,
website TEXT,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_crm_accounts_name ON crm_accounts (account_name);
CREATE INDEX IF NOT EXISTS idx_crm_accounts_type ON crm_accounts (account_type);
-- TABLE: crm_households
-- Purpose: family or co-buyer unit grouping
-- Unlike crm_people / crm_accounts this table has no updated_at column.
CREATE TABLE IF NOT EXISTS crm_households (
household_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
household_name TEXT NOT NULL,
-- Optional; cleared (SET NULL) when the referenced person is deleted.
primary_person_id UUID REFERENCES crm_people(person_id) ON DELETE SET NULL,
notes TEXT,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- TABLE: crm_relationships
-- Purpose: person-to-person relationship graph
-- Rows are deleted with either person (CASCADE on both sides).
CREATE TABLE IF NOT EXISTS crm_relationships (
relationship_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
person_a_id UUID NOT NULL REFERENCES crm_people(person_id) ON DELETE CASCADE,
person_b_id UUID NOT NULL REFERENCES crm_people(person_id) ON DELETE CASCADE,
relationship_type crm_relationship_type NOT NULL,
household_id UUID REFERENCES crm_households(household_id) ON DELETE SET NULL,
notes TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Uniqueness is on the ordered triple: (a, b, type) and (b, a, type) are
-- distinct rows, and nothing here prevents person_a_id = person_b_id.
UNIQUE (person_a_id, person_b_id, relationship_type)
);
CREATE INDEX IF NOT EXISTS idx_crm_rel_a ON crm_relationships (person_a_id);
CREATE INDEX IF NOT EXISTS idx_crm_rel_b ON crm_relationships (person_b_id);
-- TABLE: crm_leads
-- Purpose: funnel-stage commercial qualification layer
-- A person may have any number of leads (person_id is indexed, not unique).
CREATE TABLE IF NOT EXISTS crm_leads (
lead_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- Leads are deleted together with their person.
person_id UUID NOT NULL REFERENCES crm_people(person_id) ON DELETE CASCADE,
account_id UUID REFERENCES crm_accounts(account_id) ON DELETE SET NULL,
source_system TEXT DEFAULT 'velocity',
status crm_lead_status NOT NULL DEFAULT 'new',
budget_band TEXT,
urgency TEXT, -- low, medium, high, critical
financing_posture TEXT, -- cash, loan, nri_remittance, emi
timeline_to_decision TEXT,
objections JSONB NOT NULL DEFAULT '[]'::jsonb,
motivations JSONB NOT NULL DEFAULT '[]'::jsonb,
-- users_and_roles is one of the pre-existing tables listed in the file
-- header, which is why this script must run after schema.sql.
assigned_user_id UUID REFERENCES users_and_roles(id) ON DELETE SET NULL,
-- Legacy feeder
legacy_lead_id TEXT,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_crm_leads_person ON crm_leads (person_id);
CREATE INDEX IF NOT EXISTS idx_crm_leads_status ON crm_leads (status);
CREATE INDEX IF NOT EXISTS idx_crm_leads_assigned ON crm_leads (assigned_user_id);
CREATE INDEX IF NOT EXISTS idx_crm_leads_source ON crm_leads (source_system);
-- TABLE: crm_opportunities
-- Purpose: deal pipeline objects
-- Opportunities hang off a lead and are deleted with it.
CREATE TABLE IF NOT EXISTS crm_opportunities (
opportunity_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
lead_id UUID NOT NULL REFERENCES crm_leads(lead_id) ON DELETE CASCADE,
-- project_id / unit_id are plain UUID columns: no foreign key is declared
-- here, so the database does not enforce that the referenced rows exist.
project_id UUID, -- references inventory_projects
unit_id UUID, -- references inventory_units
stage crm_opportunity_stage NOT NULL DEFAULT 'prospect',
value DECIMAL(15, 2),
-- Whole-number percentage; NULL allowed.
probability INTEGER CHECK (probability BETWEEN 0 AND 100),
expected_close_date DATE,
next_action TEXT,
notes TEXT,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_crm_opp_lead ON crm_opportunities (lead_id);
CREATE INDEX IF NOT EXISTS idx_crm_opp_stage ON crm_opportunities (stage);
CREATE INDEX IF NOT EXISTS idx_crm_opp_project ON crm_opportunities (project_id);
-- TABLE: crm_property_interests
-- Purpose: project and unit interest linking per client
CREATE TABLE IF NOT EXISTS crm_property_interests (
interest_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
person_id UUID NOT NULL REFERENCES crm_people(person_id) ON DELETE CASCADE,
-- Optional: an interest can exist without a lead and survives the lead's
-- deletion (SET NULL).
lead_id UUID REFERENCES crm_leads(lead_id) ON DELETE SET NULL,
-- The project is always named by text; project_id is optional and has no
-- foreign key declared here.
project_id UUID,
project_name TEXT NOT NULL,
unit_preference TEXT,
configuration TEXT, -- 2BHK, 3BHK, Penthouse, etc.
budget_min DECIMAL(15, 2),
budget_max DECIMAL(15, 2),
priority INTEGER DEFAULT 1, -- 1 = primary, 2 = secondary
notes TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_crm_pi_person ON crm_property_interests (person_id);
CREATE INDEX IF NOT EXISTS idx_crm_pi_project ON crm_property_interests (project_id);
-- TABLE: crm_stage_history
-- Purpose: canonical audit trail of lead stage transitions
-- History rows are deleted together with their lead (CASCADE).
CREATE TABLE IF NOT EXISTS crm_stage_history (
history_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
lead_id UUID NOT NULL REFERENCES crm_leads(lead_id) ON DELETE CASCADE,
-- Stored as free TEXT, not as crm_lead_status, so values are not validated
-- against the enum. from_status may be NULL.
from_status TEXT,
to_status TEXT NOT NULL,
changed_by UUID REFERENCES users_and_roles(id) ON DELETE SET NULL,
changed_by_type TEXT DEFAULT 'human', -- human, ai, system
notes TEXT,
happened_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Serves "history of one lead, newest first" lookups.
CREATE INDEX IF NOT EXISTS idx_crm_stage_lead ON crm_stage_history (lead_id, happened_at DESC);
-- ─────────────────────────────────────────────────────────────────────────────
-- SECTION 2: INTERACTION AND EVIDENCE GRAPH (intel_*)
-- ─────────────────────────────────────────────────────────────────────────────
-- TABLE: intel_interactions
-- Purpose: umbrella interaction event record
-- Lifecycle: rows are deleted with their crm_people row (ON DELETE CASCADE);
-- deleting the referenced lead only clears lead_id (ON DELETE SET NULL).
-- The channel column uses the intel_channel type, which must already exist
-- when this statement runs (it is not created in this part of the script).
CREATE TABLE IF NOT EXISTS intel_interactions (
interaction_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
person_id UUID NOT NULL REFERENCES crm_people(person_id) ON DELETE CASCADE,
lead_id UUID REFERENCES crm_leads(lead_id) ON DELETE SET NULL,
channel intel_channel NOT NULL,
interaction_type TEXT NOT NULL, -- message, call, visit, email, note
happened_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
summary TEXT,
source_ref TEXT,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Per-person and per-lead indexes are ordered newest-first; channel is indexed on its own.
CREATE INDEX IF NOT EXISTS idx_intel_int_person ON intel_interactions (person_id, happened_at DESC);
CREATE INDEX IF NOT EXISTS idx_intel_int_lead ON intel_interactions (lead_id, happened_at DESC);
CREATE INDEX IF NOT EXISTS idx_intel_int_channel ON intel_interactions (channel);
-- TABLE: intel_messages
-- Purpose: text-level message records (WhatsApp, chat)
-- Lifecycle: messages are deleted with their parent interaction (ON DELETE CASCADE).
-- NOTE(review): thread_id has no foreign key; presumably it refers to
-- intel_whatsapp_threads.thread_id — confirm against the writers of this table.
CREATE TABLE IF NOT EXISTS intel_messages (
message_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
interaction_id UUID NOT NULL REFERENCES intel_interactions(interaction_id) ON DELETE CASCADE,
thread_id UUID,
sender_role TEXT NOT NULL, -- lead, broker, system, oracle
sender_name TEXT,
message_text TEXT NOT NULL,
delivered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb
);
-- Composite index ordered newest-first within each interaction.
CREATE INDEX IF NOT EXISTS idx_intel_msg_interaction ON intel_messages (interaction_id, delivered_at DESC);
-- TABLE: intel_whatsapp_threads
-- Purpose: WhatsApp thread-level summaries
-- Lifecycle: threads are deleted with their crm_people row (ON DELETE CASCADE);
-- deleting the referenced lead only clears lead_id (ON DELETE SET NULL).
CREATE TABLE IF NOT EXISTS intel_whatsapp_threads (
thread_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
person_id UUID NOT NULL REFERENCES crm_people(person_id) ON DELETE CASCADE,
lead_id UUID REFERENCES crm_leads(lead_id) ON DELETE SET NULL,
phone_number TEXT,
thread_summary TEXT,
message_count INTEGER DEFAULT 0, -- plain column: nothing in this statement keeps it in sync with intel_messages
last_message_at TIMESTAMPTZ,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_intel_wa_person ON intel_whatsapp_threads (person_id);
-- TABLE: intel_calls
-- Purpose: voice call records
-- Lifecycle: call rows are deleted with their parent interaction (ON DELETE CASCADE).
-- call_direction uses the intel_call_direction type, which must already exist
-- when this statement runs; new rows default to 'outbound'.
CREATE TABLE IF NOT EXISTS intel_calls (
call_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
interaction_id UUID NOT NULL REFERENCES intel_interactions(interaction_id) ON DELETE CASCADE,
call_direction intel_call_direction NOT NULL DEFAULT 'outbound',
duration_seconds INTEGER,
recording_ref TEXT, -- storage path or URL to recording
transcript_ref TEXT, -- path to transcript JSON
call_outcome TEXT, -- connected, no_answer, voicemail, dropped
called_number TEXT,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb
);
CREATE INDEX IF NOT EXISTS idx_intel_call_interaction ON intel_calls (interaction_id);
-- TABLE: intel_transcripts
-- Purpose: transcript and speaker segmentation storage
-- Lifecycle: both parent links are optional and use ON DELETE SET NULL, so a
-- transcript row outlives the call or interaction it was attached to.
CREATE TABLE IF NOT EXISTS intel_transcripts (
transcript_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
call_id UUID REFERENCES intel_calls(call_id) ON DELETE SET NULL,
interaction_id UUID REFERENCES intel_interactions(interaction_id) ON DELETE SET NULL,
language TEXT DEFAULT 'en',
full_text TEXT,
speaker_segments_json JSONB NOT NULL DEFAULT '[]'::jsonb,
confidence FLOAT CHECK (confidence BETWEEN 0.0 AND 1.0), -- NULL is allowed; non-NULL values must lie in [0, 1]
word_count INTEGER,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_intel_transcript_call ON intel_transcripts (call_id);
CREATE INDEX IF NOT EXISTS idx_intel_transcript_interaction ON intel_transcripts (interaction_id);
-- TABLE: intel_emails
-- Purpose: email thread records
-- Lifecycle: email rows are deleted with their parent interaction (ON DELETE CASCADE).
CREATE TABLE IF NOT EXISTS intel_emails (
email_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
interaction_id UUID NOT NULL REFERENCES intel_interactions(interaction_id) ON DELETE CASCADE,
from_address TEXT,
to_addresses JSONB NOT NULL DEFAULT '[]'::jsonb, -- defaults to an empty JSON array
subject TEXT,
body_text TEXT,
has_attachments BOOLEAN DEFAULT FALSE,
sent_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb
);
CREATE INDEX IF NOT EXISTS idx_intel_email_interaction ON intel_emails (interaction_id);
-- TABLE: intel_visits
-- Purpose: site visit and meeting records
-- Lifecycle: visits are deleted with their crm_people row (ON DELETE CASCADE);
-- deleting the lead or the host user only clears the respective column.
-- NOTE(review): project_id and unit_id carry no foreign keys in this statement;
-- presumably they point at inventory_projects / inventory_units — confirm.
CREATE TABLE IF NOT EXISTS intel_visits (
visit_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
person_id UUID NOT NULL REFERENCES crm_people(person_id) ON DELETE CASCADE,
lead_id UUID REFERENCES crm_leads(lead_id) ON DELETE SET NULL,
project_id UUID,
project_name TEXT,
unit_id UUID,
visited_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
visit_notes TEXT,
host_user_id UUID REFERENCES users_and_roles(id) ON DELETE SET NULL,
revisit_intent TEXT, -- very_likely, likely, uncertain, unlikely
cctv_session_ref TEXT,
perception_session_ref TEXT,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb
);
-- Visits per person are indexed newest-first; project_id is indexed on its own.
CREATE INDEX IF NOT EXISTS idx_intel_visits_person ON intel_visits (person_id, visited_at DESC);
CREATE INDEX IF NOT EXISTS idx_intel_visits_project ON intel_visits (project_id);
-- TABLE: intel_reminders
-- Purpose: reminders and follow-up task chains
-- Lifecycle: reminders are deleted with their crm_people row (ON DELETE CASCADE);
-- deleting the lead, opportunity or assignee only clears the respective column.
-- The status, priority and reminder_type vocabularies listed inline are
-- documentation only: the columns are plain TEXT with no CHECK constraint.
CREATE TABLE IF NOT EXISTS intel_reminders (
reminder_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
person_id UUID NOT NULL REFERENCES crm_people(person_id) ON DELETE CASCADE,
lead_id UUID REFERENCES crm_leads(lead_id) ON DELETE SET NULL,
opportunity_id UUID REFERENCES crm_opportunities(opportunity_id) ON DELETE SET NULL,
reminder_type TEXT NOT NULL, -- call_back, follow_up, site_visit, document, negotiation
title TEXT NOT NULL,
notes TEXT,
due_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
status TEXT NOT NULL DEFAULT 'pending', -- pending, done, snoozed, cancelled
assigned_to UUID REFERENCES users_and_roles(id) ON DELETE SET NULL,
created_by_type TEXT DEFAULT 'human', -- human, ai, system
priority TEXT DEFAULT 'normal', -- low, normal, high, urgent
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Each index pairs a filter column (person, status, assignee) with due_at.
CREATE INDEX IF NOT EXISTS idx_intel_reminder_person ON intel_reminders (person_id, due_at);
CREATE INDEX IF NOT EXISTS idx_intel_reminder_status ON intel_reminders (status, due_at);
CREATE INDEX IF NOT EXISTS idx_intel_reminder_assigned ON intel_reminders (assigned_to, due_at);
-- TABLE: intel_qd_scores
-- Purpose: latest meaningful QD summary by client
-- Lifecycle: scores are deleted with their crm_people row (ON DELETE CASCADE).
-- UNIQUE (person_id, score_type) allows at most one row per person and score type.
CREATE TABLE IF NOT EXISTS intel_qd_scores (
qd_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
person_id UUID NOT NULL REFERENCES crm_people(person_id) ON DELETE CASCADE,
score_type TEXT NOT NULL, -- intent_score, urgency_score, engagement_score
current_value FLOAT NOT NULL CHECK (current_value BETWEEN 0.0 AND 1.0),
computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
evidence_refs_json JSONB NOT NULL DEFAULT '[]'::jsonb,
reasoning TEXT,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
UNIQUE (person_id, score_type)
);
-- NOTE(review): the UNIQUE constraint above already provides an index whose
-- leading column is person_id, so this single-column index looks redundant —
-- confirm with query plans before dropping it.
CREATE INDEX IF NOT EXISTS idx_intel_qd_person ON intel_qd_scores (person_id);
-- TABLE: intel_qd_timeseries
-- Purpose: time-series QD propagation and shifts
-- Lifecycle: samples are deleted with their crm_people row (ON DELETE CASCADE).
-- Unlike intel_qd_scores there is no uniqueness constraint, so any number of
-- samples may exist per person and score type.
CREATE TABLE IF NOT EXISTS intel_qd_timeseries (
timeseries_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
person_id UUID NOT NULL REFERENCES crm_people(person_id) ON DELETE CASCADE,
score_type TEXT NOT NULL,
signal_source TEXT,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
value FLOAT NOT NULL CHECK (value BETWEEN 0.0 AND 1.0),
delta FLOAT, -- unconstrained; presumably the change versus the previous sample (TODO confirm)
evidence_ref TEXT,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb
);
-- Both indexes are ordered newest-first within their leading column.
CREATE INDEX IF NOT EXISTS idx_intel_qd_ts_person ON intel_qd_timeseries (person_id, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_intel_qd_ts_type ON intel_qd_timeseries (score_type, timestamp DESC);
-- TABLE: intel_vehicle_events
-- Purpose: number-plate and vehicle detection events
-- Lifecycle: person_id and visit_id are both optional and use ON DELETE SET
-- NULL, so an event row survives deletion of the person or visit it was linked to.
CREATE TABLE IF NOT EXISTS intel_vehicle_events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
person_id UUID REFERENCES crm_people(person_id) ON DELETE SET NULL,
visit_id UUID REFERENCES intel_visits(visit_id) ON DELETE SET NULL,
zone TEXT,
license_plate_hash TEXT, -- hashed for privacy
vehicle_class TEXT, -- luxury, standard, unknown
wealth_indicator TEXT, -- HNI, standard, unknown
cctv_ref TEXT,
captured_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb
);
CREATE INDEX IF NOT EXISTS idx_intel_vehicle_person ON intel_vehicle_events (person_id);
-- TABLE: intel_perception_events
-- Purpose: behavioral and dwell-time intelligence from perception sessions
-- Lifecycle: person_id and visit_id are both optional and use ON DELETE SET
-- NULL, so an event row survives deletion of the person or visit it was linked to.
CREATE TABLE IF NOT EXISTS intel_perception_events (
perception_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
person_id UUID REFERENCES crm_people(person_id) ON DELETE SET NULL,
visit_id UUID REFERENCES intel_visits(visit_id) ON DELETE SET NULL,
session_ref TEXT, -- perception_sessions.id linkage
event_type TEXT NOT NULL, -- room_dwell, engagement_spike, exit
rooms_visited JSONB NOT NULL DEFAULT '[]'::jsonb,
dwell_time_seconds INTEGER,
engagement_score FLOAT CHECK (engagement_score BETWEEN 0.0 AND 1.0), -- NULL allowed; otherwise within [0, 1]
camera_id TEXT,
media_ref TEXT,
happened_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb
);
CREATE INDEX IF NOT EXISTS idx_intel_perception_person ON intel_perception_events (person_id);
-- TABLE: intel_cctv_links
-- Purpose: CCTV evidence references linked to client/visit contexts
-- Lifecycle: all three references (person, visit, cctv event) are optional and
-- use ON DELETE SET NULL, so a link row is never removed by a parent delete.
-- Requires the cctv_events table to exist before this statement runs.
CREATE TABLE IF NOT EXISTS intel_cctv_links (
link_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
person_id UUID REFERENCES crm_people(person_id) ON DELETE SET NULL,
visit_id UUID REFERENCES intel_visits(visit_id) ON DELETE SET NULL,
cctv_event_id UUID REFERENCES cctv_events(id) ON DELETE SET NULL,
clip_ref TEXT,
camera_zone TEXT,
confidence FLOAT CHECK (confidence BETWEEN 0.0 AND 1.0), -- NULL allowed; otherwise within [0, 1]
linked_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb
);
CREATE INDEX IF NOT EXISTS idx_intel_cctv_person ON intel_cctv_links (person_id);
-- ─────────────────────────────────────────────────────────────────────────────
-- SECTION 3: INVENTORY DOMAIN (inventory_*)
-- ─────────────────────────────────────────────────────────────────────────────
-- TABLE: inventory_projects
-- Purpose: project-level inventory master
-- project_name is UNIQUE; the seed INSERT later in this script relies on that
-- constraint for its ON CONFLICT (project_name) clause.
CREATE TABLE IF NOT EXISTS inventory_projects (
project_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
project_name TEXT NOT NULL UNIQUE,
developer_name TEXT NOT NULL,
city TEXT NOT NULL DEFAULT 'Kolkata',
micro_market TEXT,
address TEXT,
total_units INTEGER,
rera_number TEXT,
project_status TEXT DEFAULT 'active', -- active, sold_out, upcoming
launch_date DATE,
possession_date DATE,
location_json JSONB NOT NULL DEFAULT '{}'::jsonb,
amenities_json JSONB NOT NULL DEFAULT '[]'::jsonb,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- NOTE(review): the UNIQUE constraint on project_name already creates an index
-- on that column, so idx_inv_projects_name looks redundant — confirm before dropping.
CREATE INDEX IF NOT EXISTS idx_inv_projects_name ON inventory_projects (project_name);
CREATE INDEX IF NOT EXISTS idx_inv_projects_market ON inventory_projects (micro_market);
-- TABLE: inventory_units
-- Purpose: unit-level availability and attributes
-- Lifecycle: units are deleted with their project (ON DELETE CASCADE).
-- UNIQUE (project_id, unit_label) makes a unit label unique within a project.
CREATE TABLE IF NOT EXISTS inventory_units (
unit_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
project_id UUID NOT NULL REFERENCES inventory_projects(project_id) ON DELETE CASCADE,
unit_label TEXT NOT NULL,
configuration TEXT NOT NULL, -- 2BHK, 3BHK, Penthouse, etc.
area_sqft DECIMAL(10, 2),
price_current DECIMAL(15, 2),
price_psf DECIMAL(10, 2),
status TEXT NOT NULL DEFAULT 'available', -- available, reserved, sold, hold
floor INTEGER,
tower TEXT,
facing TEXT,
has_attached_amenities JSONB NOT NULL DEFAULT '[]'::jsonb, -- despite the has_ prefix this is a JSON array, not a boolean
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (project_id, unit_label)
);
-- NOTE(review): the UNIQUE constraint above already provides an index whose
-- leading column is project_id, so idx_inv_units_project looks redundant —
-- confirm with query plans before dropping it.
CREATE INDEX IF NOT EXISTS idx_inv_units_project ON inventory_units (project_id);
CREATE INDEX IF NOT EXISTS idx_inv_units_status ON inventory_units (status);
CREATE INDEX IF NOT EXISTS idx_inv_units_config ON inventory_units (configuration);
-- TABLE: inventory_import_jobs
-- Purpose: track inventory CSV import operations
-- Lifecycle: deleting the project or the importing user only clears the
-- respective column (ON DELETE SET NULL); the job row itself is kept.
-- No secondary indexes are defined for this table.
CREATE TABLE IF NOT EXISTS inventory_import_jobs (
job_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
project_id UUID REFERENCES inventory_projects(project_id) ON DELETE SET NULL,
filename TEXT NOT NULL,
row_count INTEGER,
imported_by UUID REFERENCES users_and_roles(id) ON DELETE SET NULL,
status TEXT NOT NULL DEFAULT 'pending', -- free-form TEXT; no CHECK constraint restricts the values
errors_json JSONB NOT NULL DEFAULT '[]'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ
);
-- ─────────────────────────────────────────────────────────────────────────────
-- SECTION 4: AI WORKFLOW AND GOVERNANCE (workflow_*)
-- ─────────────────────────────────────────────────────────────────────────────
-- TABLE: workflow_actions
-- Purpose: track proposed AI/human actions before approval
-- status uses the wf_status type, which must already exist when this statement
-- runs; new rows default to 'pending' and to approval_required = TRUE.
CREATE TABLE IF NOT EXISTS workflow_actions (
action_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
action_type TEXT NOT NULL, -- import_review, merge_proposal, writeback, enrichment
target_domain TEXT NOT NULL, -- crm, intel, inventory
target_entity_ref TEXT,
proposal_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
reasoning_summary TEXT,
evidence_refs JSONB NOT NULL DEFAULT '[]'::jsonb,
confidence FLOAT CHECK (confidence BETWEEN 0.0 AND 1.0), -- NULL allowed; otherwise within [0, 1]
status wf_status NOT NULL DEFAULT 'pending',
approval_required BOOLEAN NOT NULL DEFAULT TRUE,
created_by_agent TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- The status index is ordered newest-first within each status value.
CREATE INDEX IF NOT EXISTS idx_wf_actions_status ON workflow_actions (status, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_wf_actions_domain ON workflow_actions (target_domain);
-- TABLE: workflow_approvals
-- Purpose: explicit human review decisions
-- Lifecycle: decisions are deleted with their action (ON DELETE CASCADE);
-- deleting the reviewer only clears reviewer_id (ON DELETE SET NULL).
-- Nothing here limits an action to a single decision row.
CREATE TABLE IF NOT EXISTS workflow_approvals (
decision_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
action_id UUID NOT NULL REFERENCES workflow_actions(action_id) ON DELETE CASCADE,
reviewer_id UUID REFERENCES users_and_roles(id) ON DELETE SET NULL,
decision TEXT NOT NULL, -- approved, rejected, needs_more_info
decision_notes TEXT,
decided_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_wf_approvals_action ON workflow_approvals (action_id);
-- TABLE: workflow_writebacks
-- Purpose: track AI-suggested and approved canonical mutations
-- Lifecycle: the links to the action, the approval decision and the approving
-- user are all optional and use ON DELETE SET NULL, so a writeback row is kept
-- when any of them is deleted.
CREATE TABLE IF NOT EXISTS workflow_writebacks (
writeback_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
action_id UUID REFERENCES workflow_actions(action_id) ON DELETE SET NULL,
approval_id UUID REFERENCES workflow_approvals(decision_id) ON DELETE SET NULL,
target_domain TEXT NOT NULL,
target_entity_ref TEXT NOT NULL,
change_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
status wf_status NOT NULL DEFAULT 'pending',
approved_by UUID REFERENCES users_and_roles(id) ON DELETE SET NULL,
executed_at TIMESTAMPTZ,
error_detail TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- The status index is ordered newest-first within each status value.
CREATE INDEX IF NOT EXISTS idx_wf_wb_status ON workflow_writebacks (status, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_wf_wb_domain ON workflow_writebacks (target_domain);
-- TABLE: workflow_import_batches
-- Purpose: CRM import batch lifecycle tracking (RawImportBatch contract)
-- lifecycle uses the import_lifecycle type, which must already exist when this
-- statement runs; new batches start as 'uploaded'.
-- Deleting the uploading user only clears uploaded_by (ON DELETE SET NULL).
CREATE TABLE IF NOT EXISTS workflow_import_batches (
batch_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_system TEXT NOT NULL, -- csv_upload, salesforce, hubspot, manual
uploaded_filename TEXT,
mime_type TEXT DEFAULT 'text/csv',
storage_ref TEXT,
row_count INTEGER,
mapped_count INTEGER DEFAULT 0,
unresolved_count INTEGER DEFAULT 0,
canonical_count INTEGER DEFAULT 0,
uploaded_by UUID REFERENCES users_and_roles(id) ON DELETE SET NULL,
lifecycle import_lifecycle NOT NULL DEFAULT 'uploaded',
mapping_manifest JSONB NOT NULL DEFAULT '{}'::jsonb,
errors_json JSONB NOT NULL DEFAULT '[]'::jsonb,
metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- The lifecycle index is ordered newest-first within each lifecycle value.
CREATE INDEX IF NOT EXISTS idx_wf_import_lifecycle ON workflow_import_batches (lifecycle, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_wf_import_user ON workflow_import_batches (uploaded_by);
-- TABLE: workflow_agent_runs
-- Purpose: track NemoClaw and AI agent invocation logs
-- This table has no foreign keys: trigger_ref is free text rather than an
-- enforced reference. New rows default to status 'running'.
CREATE TABLE IF NOT EXISTS workflow_agent_runs (
run_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
agent_name TEXT NOT NULL, -- nemoclaw, import_mapper, enrichment_engine
trigger_type TEXT NOT NULL, -- import, enrichment, qd_update, writeback
trigger_ref TEXT,
input_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
output_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
status TEXT NOT NULL DEFAULT 'running', -- running, completed, failed
duration_ms INTEGER,
error_detail TEXT,
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ
);
-- The agent index is ordered newest-first within each agent name.
CREATE INDEX IF NOT EXISTS idx_wf_agent_runs_agent ON workflow_agent_runs (agent_name, started_at DESC);
CREATE INDEX IF NOT EXISTS idx_wf_agent_runs_status ON workflow_agent_runs (status);
-- ─────────────────────────────────────────────────────────────────────────────
-- TRIGGERS: auto-update updated_at
-- ─────────────────────────────────────────────────────────────────────────────
-- Trigger function: stamp the row being written with the current transaction
-- time in its updated_at column, then let the UPDATE proceed with that row.
CREATE OR REPLACE FUNCTION set_canonical_updated_at()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $fn$
BEGIN
    -- NEW is the row version about to be stored by the firing statement.
    NEW.updated_at := NOW();
    RETURN NEW;
END;
$fn$;
-- Install a BEFORE UPDATE row trigger that calls set_canonical_updated_at()
-- on each table listed below. The trigger is dropped first so the script can
-- be re-run without failing on an existing trigger.
DO $$ DECLARE
    t TEXT;
BEGIN
    FOREACH t IN ARRAY ARRAY[
        'crm_people', 'crm_accounts', 'crm_leads', 'crm_opportunities',
        'inventory_projects', 'inventory_units',
        'workflow_actions', 'workflow_import_batches'
    ] LOOP
        -- %I formats each argument as a SQL identifier (quoting it only when
        -- required), which is the correct specifier for trigger and table
        -- names in dynamic SQL. For the lowercase names above the generated
        -- statements are identical to plain string substitution.
        EXECUTE format(
            'DROP TRIGGER IF EXISTS %I ON %I;
             CREATE TRIGGER %I
             BEFORE UPDATE ON %I
             FOR EACH ROW EXECUTE FUNCTION set_canonical_updated_at();',
            'trg_' || t || '_updated_at', t,
            'trg_' || t || '_updated_at', t
        );
    END LOOP;
END $$;
-- ─────────────────────────────────────────────────────────────────────────────
-- INVENTORY SEED: 14 Canonical Kolkata Projects
-- ─────────────────────────────────────────────────────────────────────────────
-- Seed the 14 canonical projects. ON CONFLICT (project_name) DO NOTHING makes
-- the statement re-runnable: a project whose name already exists is left
-- untouched (its existing project_id and other columns are not updated).
INSERT INTO inventory_projects (project_id, project_name, developer_name, city, micro_market)
VALUES
(gen_random_uuid(), 'Eden Devprayag', 'Eden Group', 'Kolkata', 'Rajarhat'),
(gen_random_uuid(), 'Sugam Prakriti', 'Sugam Homes', 'Kolkata', 'Barasat'),
(gen_random_uuid(), 'Atri Aqua', 'Atri Developers', 'Kolkata', 'New Town'),
(gen_random_uuid(), 'Atri Surya Toron', 'Atri Developers', 'Kolkata', 'Rajarhat'),
(gen_random_uuid(), 'Siddha Suburbia Bungalow', 'Siddha Group', 'Kolkata', 'Madanpur'),
(gen_random_uuid(), 'Merlin Avana', 'Merlin Group', 'Kolkata', 'Tangra'),
(gen_random_uuid(), 'DTC Good Earth', 'DTC Projects', 'Kolkata', 'New Town'),
(gen_random_uuid(), 'Siddha Serena', 'Siddha Group', 'Kolkata', 'New Town'),
(gen_random_uuid(), 'Siddha Sky Waterfront', 'Siddha Group', 'Kolkata', 'Beliaghata'),
(gen_random_uuid(), 'Godrej Blue', 'Godrej Properties', 'Kolkata', 'New Town'),
(gen_random_uuid(), 'DTC Sojon', 'DTC Projects', 'Kolkata', 'Rajarhat'),
(gen_random_uuid(), 'Shriram Grand City', 'Shriram Properties', 'Kolkata', 'Howrah'),
(gen_random_uuid(), 'Godrej Elevate', 'Godrej Properties', 'Kolkata', 'Dum Dum'),
(gen_random_uuid(), 'Ambuja Utpaala', 'Ambuja Neotia', 'Kolkata', 'Tollygunge')
ON CONFLICT (project_name) DO NOTHING;
-- ─────────────────────────────────────────────────────────────────────────────
-- COMMENTS
-- ─────────────────────────────────────────────────────────────────────────────
-- Catalog comments stored on the tables themselves (visible via \d+ in psql).
COMMENT ON TABLE crm_people IS 'Canonical person-level contact identity. Primary join key across all CRM tables.';
COMMENT ON TABLE crm_leads IS 'Funnel-stage commercial qualification. One person may have multiple lead contexts.';
COMMENT ON TABLE crm_opportunities IS 'Deal pipeline objects linked to leads and inventory.';
COMMENT ON TABLE intel_interactions IS 'Umbrella interaction event. All channels (WhatsApp, call, email, visit) link here.';
COMMENT ON TABLE intel_transcripts IS 'Speaker-segmented call transcripts. speaker_segments_json is first-class data.';
COMMENT ON TABLE intel_qd_scores IS 'Latest QD summary by score_type per client. UNIQUE constraint enforces one row per type.';
COMMENT ON TABLE inventory_projects IS 'Master project catalog. 14 canonical Kolkata projects seeded.';
-- NOTE(review): "Immutable after upload" sits oddly with this table's lifecycle
-- and *_count columns and its updated_at trigger — presumably only the raw
-- uploaded payload is meant to be immutable; confirm and reword if so.
COMMENT ON TABLE workflow_import_batches IS 'RawImportBatch contract. Immutable after upload.';
COMMENT ON TABLE workflow_writebacks IS 'AI-proposed canonical mutations. Never auto-execute without approval.';

View File

@@ -27,6 +27,7 @@ from backend.api.routes_mobile_edge import router as mobile_edge_router
from backend.api.routes_inventory import router as inventory_router
from backend.api.routes_admin_surface import router as admin_surface_router
from backend.api.routes_oracle_templates import router as oracle_templates_router
from backend.api.routes_crm_imports import router as crm_imports_router
from backend.auth.dependencies import (
create_access_token, verify_password, get_current_user
)
@@ -106,6 +107,7 @@ app.include_router(vault_router, prefix="/api/vault", tags=["Vault"])
app.include_router(mobile_edge_router, prefix="/api/mobile-edge", tags=["Mobile Edge"])
app.include_router(inventory_router, prefix="/api/inventory", tags=["Inventory"])
app.include_router(admin_surface_router, prefix="/api/admin-surface", tags=["Admin Surface"])
app.include_router(crm_imports_router, prefix="/api", tags=["CRM Canonical"])
# Public vault link (no /api prefix — shared externally with prospects)
from backend.routers.vault import router as public_vault_router

View File

@@ -0,0 +1,458 @@
#!/usr/bin/env python3
"""
backend/scripts/seed_synthetic_crm.py
Seed the canonical CRM tables from the synthetic dataset CSVs.
Usage:
python -m backend.scripts.seed_synthetic_crm [--dry-run] [--limit N]
Reads from: db assets/synthetic_crm_v1/csv/
Writes to: canonical crm_*, intel_*, inventory_* tables
This script implements the import → canonical commit flow without going through
the HTTP import review UI — for initial database seeding only.
"""
from __future__ import annotations
import argparse
import asyncio
import csv
import json
import logging
import os
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
# Configure root logging at import time so the script's progress messages are
# printed with a timestamp and level when it is run from the command line.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger("velocity.seed")
# ── Data directory ────────────────────────────────────────────────────────────
# REPO_ROOT is three directory levels above this file; the synthetic CSVs are
# read from "<REPO_ROOT>/db assets/synthetic_crm_v1/csv" (note the space in
# the "db assets" directory name).
REPO_ROOT = Path(__file__).parent.parent.parent
CSV_DIR = REPO_ROOT / "db assets" / "synthetic_crm_v1" / "csv"
def read_csv(filename: str) -> list[dict]:
    """Load one CSV from ``CSV_DIR`` as a list of row dictionaries.

    Args:
        filename: File name relative to ``CSV_DIR``.

    Returns:
        One dict per data row, keyed by the header row. An empty list is
        returned (and a warning is logged) when the file does not exist.
    """
    path = CSV_DIR / filename
    if not path.exists():
        logger.warning("CSV not found: %s", path)
        return []
    # "utf-8-sig" reads plain UTF-8 unchanged but also strips a leading byte
    # order mark. With plain "utf-8" a BOM stays glued to the first header
    # name (e.g. "\ufeffperson_id"), so every lookup of that column would miss.
    with open(path, encoding="utf-8-sig", newline="") as f:
        return list(csv.DictReader(f))
def safe_float(val: str | None, default: float | None = None) -> float | None:
    """Parse *val* as a finite float, falling back to *default*.

    Args:
        val: Raw CSV cell text, or ``None``.
        default: Value returned when *val* is missing, blank, not numeric, or
            parses to NaN / infinity.

    Returns:
        The parsed finite float, or *default*.
    """
    import math  # local import keeps this helper self-contained

    if val is None:
        return default
    try:
        # float() strips surrounding whitespace itself and rejects "", "null"
        # and "None" with ValueError, so no separate sentinel list is needed.
        number = float(val)
    except (ValueError, TypeError):
        return default
    # "nan", "NaN", "inf", ... parse successfully but are not usable values:
    # NaN silently survives min()/max() clamping and non-finite numbers cannot
    # be stored in the DECIMAL budget columns.
    return number if math.isfinite(number) else default
def safe_int(val: str | None, default: int | None = None) -> int | None:
    """Parse *val* as an integer, falling back to *default*.

    Plain integer text is converted exactly. Decimal or exponent notation
    (``"3.9"``, ``"1e3"``) is parsed as a float and truncated toward zero.

    Args:
        val: Raw CSV cell text, or ``None``.
        default: Value returned when *val* is missing, blank, not numeric, or
            not finite.

    Returns:
        The parsed integer, or *default*.
    """
    if not val:
        return default
    text = val.strip()
    if text in ("", "null", "None"):
        return default
    try:
        # Exact path: avoids the float round-trip, which loses precision for
        # integers above 2**53.
        return int(text)
    except ValueError:
        pass  # not a plain integer literal; try float notation below
    try:
        return int(float(text))
    except (ValueError, OverflowError):
        # ValueError: non-numeric text or NaN; OverflowError: +/- infinity.
        return default
def safe_dt(val: str | None) -> datetime | None:
    """Parse a timestamp string into a timezone-aware UTC ``datetime``.

    ISO-8601 text is tried first, including a trailing ``Z`` or an explicit
    UTC offset; a small set of ``strptime`` formats is kept as a fallback.
    Values without timezone information are taken to be UTC.

    Args:
        val: Raw CSV cell text, or ``None``.

    Returns:
        An aware ``datetime`` in UTC, or ``None`` when *val* is missing, blank
        or not in a recognised format.
    """
    if not val:
        return None
    text = val.strip()
    if text in ("", "null", "None"):
        return None

    # fromisoformat() only accepts a literal "Z" suffix on newer Python
    # versions, so normalise it to an explicit offset first.
    iso_text = text[:-1] + "+00:00" if text.endswith(("Z", "z")) else text
    try:
        parsed = datetime.fromisoformat(iso_text)
    except ValueError:
        parsed = None
    if parsed is not None:
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        # An explicit offset is honoured and converted, not overwritten.
        return parsed.astimezone(timezone.utc)

    # Fallback for inputs strptime accepts but fromisoformat may not
    # (e.g. unpadded fields or unusual fractional-second widths).
    for fmt in ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%Y-%m-%dT%H:%M:%S.%f"):
        try:
            return datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    return None
async def seed(dry_run: bool = False, limit: int | None = None) -> None:
    """Seed the canonical CRM tables from the synthetic dataset CSVs.

    Phases run in dependency order (projects, people, leads, property
    interests, interactions, QD scores, reminders, stage history) because the
    later phases translate source CSV ids into the UUIDs generated by the
    earlier ones.

    Args:
        dry_run: When true the CSVs are still read but no INSERT is executed,
            so every count in the summary is zero. The schema existence check
            still connects to the database.
        limit: Maximum number of rows taken from ``crm_people.csv``. A falsy
            value (``None`` or ``0``) means no limit. Rows in the other CSVs
            whose person (or lead, for stage history) was not seeded are
            skipped.

    The connection pool is always closed, including when the schema check
    fails or a phase raises.
    """
    from backend.db.pool import create_pool, close_pool

    logger.info("Connecting to database…")
    pool = await create_pool()
    try:
        async with pool.acquire() as conn:
            # Verify canonical schema exists
            exists = await conn.fetchval(
                "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'crm_people')"
            )
        if not exists:
            logger.error("Canonical schema not found. Run schema_crm_canonical.sql first.")
            return

        # ── Phase 1: Inventory Projects ──────────────────────────────────────
        logger.info("[1/9] Seeding inventory_projects…")
        projects_rows = read_csv("inventory_projects.csv")
        project_name_to_id: dict[str, str] = {}
        if not dry_run:
            async with pool.acquire() as conn:
                for row in projects_rows:
                    # DictReader yields None for cells missing from a short
                    # row, hence the "or ''" before calling str methods.
                    pname = (row.get("project_name") or "").strip()
                    if not pname:
                        continue
                    pid = await conn.fetchval(
                        "SELECT project_id FROM inventory_projects WHERE project_name = $1",
                        pname,
                    )
                    if pid:
                        # Project already present: reuse its id.
                        project_name_to_id[pname] = str(pid)
                        continue
                    pid = str(uuid.uuid4())
                    await conn.execute(
                        """
                        INSERT INTO inventory_projects (project_id, project_name, developer_name, city, micro_market, created_at, updated_at)
                        VALUES ($1::uuid, $2, $3, $4, $5, NOW(), NOW())
                        ON CONFLICT (project_name) DO NOTHING
                        """,
                        pid,
                        pname,
                        row.get("developer_name", ""),
                        row.get("city", "Kolkata"),
                        row.get("micro_market", ""),
                    )
                    project_name_to_id[pname] = pid
        logger.info("%d projects mapped", len(project_name_to_id))

        # ── Phase 2: crm_people ──────────────────────────────────────────────
        logger.info("[2/9] Seeding crm_people…")
        people_rows = read_csv("crm_people.csv")
        if limit:
            people_rows = people_rows[:limit]
        person_id_map: dict[str, str] = {}  # original CSV person_id → DB person_id
        seeded_people = 0
        if not dry_run:
            async with pool.acquire() as conn:
                for row in people_rows:
                    src_id = row.get("person_id", "")
                    full_name = (row.get("full_name") or "").strip()
                    if not full_name:
                        continue
                    new_id = str(uuid.uuid4())
                    persona_labels: list[str] = []
                    raw_labels = row.get("persona_labels") or ""
                    if raw_labels.startswith("["):
                        try:
                            persona_labels = json.loads(raw_labels)
                        except json.JSONDecodeError:
                            # Best effort: keep seeding with no labels, but
                            # leave a trace instead of failing silently.
                            logger.warning("Ignoring malformed persona_labels for person %s", src_id)
                    # NOTE(review): if ON CONFLICT DO NOTHING ever skips this
                    # insert, new_id will not exist in crm_people and child
                    # rows that use it would violate their foreign keys —
                    # verify which unique constraints crm_people defines.
                    await conn.execute(
                        """
                        INSERT INTO crm_people (
                            person_id, full_name, primary_email, primary_phone,
                            buyer_type, persona_labels, source_confidence,
                            legacy_lead_id, metadata_json, created_at, updated_at
                        ) VALUES (
                            $1::uuid, $2, $3, $4, $5, $6::jsonb, $7,
                            $8, $9::jsonb, NOW(), NOW()
                        )
                        ON CONFLICT DO NOTHING
                        """,
                        new_id,
                        full_name,
                        row.get("primary_email") or None,
                        row.get("primary_phone") or None,
                        row.get("buyer_type") or None,
                        json.dumps(persona_labels),
                        safe_float(row.get("source_confidence"), 0.8),
                        src_id or None,
                        json.dumps({"synthetic": True, "source_id": src_id}),
                    )
                    seeded_people += 1
                    # Only non-blank source ids are recorded: a blank key
                    # would let child rows with a blank person_id attach to
                    # whichever id-less person happened to be inserted last.
                    if src_id:
                        person_id_map[src_id] = new_id
        logger.info("%d people seeded", seeded_people)

        # ── Phase 3: crm_leads ───────────────────────────────────────────────
        logger.info("[3/9] Seeding crm_leads…")
        leads_rows = read_csv("crm_leads.csv")
        lead_id_map: dict[str, str] = {}  # original CSV lead_id → DB lead_id
        seeded_leads = 0
        VALID_STATUSES = {
            'new', 'contacted', 'qualified', 'site_visit_scheduled', 'site_visited',
            'negotiation', 'booking_initiated', 'booked', 'lost', 'dormant'
        }
        if not dry_run:
            async with pool.acquire() as conn:
                for row in leads_rows:
                    src_person_id = row.get("person_id", "")
                    db_person_id = person_id_map.get(src_person_id)
                    if not db_person_id:
                        continue
                    src_lead_id = row.get("lead_id", "")
                    raw_status = (row.get("status") or "new").lower().strip()
                    # Unknown statuses fall back to 'new' so the enum cast in
                    # the INSERT cannot fail.
                    status = raw_status if raw_status in VALID_STATUSES else "new"
                    new_lead_id = str(uuid.uuid4())
                    await conn.execute(
                        """
                        INSERT INTO crm_leads (
                            lead_id, person_id, source_system, status,
                            budget_band, urgency, financing_posture,
                            timeline_to_decision, legacy_lead_id,
                            metadata_json, created_at, updated_at
                        ) VALUES (
                            $1::uuid, $2::uuid, $3, $4::crm_lead_status,
                            $5, $6, $7, $8, $9, $10::jsonb, NOW(), NOW()
                        )
                        ON CONFLICT DO NOTHING
                        """,
                        new_lead_id,
                        db_person_id,
                        row.get("source_system", "csv_upload"),
                        status,
                        row.get("budget_band") or None,
                        row.get("urgency") or None,
                        row.get("financing_posture") or None,
                        row.get("timeline_to_decision") or None,
                        src_lead_id or None,
                        json.dumps({"synthetic": True, "source_lead_id": src_lead_id}),
                    )
                    seeded_leads += 1
                    # Same reasoning as for people: never map a blank id.
                    if src_lead_id:
                        lead_id_map[src_lead_id] = new_lead_id
        logger.info("%d leads seeded", seeded_leads)

        # ── Phase 4: crm_property_interests ─────────────────────────────────
        logger.info("[4/9] Seeding crm_property_interests…")
        pi_rows = read_csv("crm_property_interests.csv")
        seeded_pi = 0
        if not dry_run:
            async with pool.acquire() as conn:
                for row in pi_rows:
                    src_person_id = row.get("person_id", "")
                    db_person_id = person_id_map.get(src_person_id)
                    if not db_person_id:
                        continue
                    # None when the lead is blank or unknown; lead_id is nullable.
                    db_lead_id = lead_id_map.get(row.get("lead_id", ""))
                    project_name = (row.get("project_name") or "").strip()
                    if not project_name:
                        continue
                    await conn.execute(
                        """
                        INSERT INTO crm_property_interests (
                            interest_id, person_id, lead_id, project_name,
                            unit_preference, configuration, budget_min, budget_max, priority, created_at
                        ) VALUES (
                            $1::uuid, $2::uuid, $3::uuid, $4, $5, $6, $7, $8, $9, NOW()
                        )
                        ON CONFLICT DO NOTHING
                        """,
                        str(uuid.uuid4()),
                        db_person_id,
                        db_lead_id,
                        project_name,
                        row.get("unit_preference") or None,
                        row.get("configuration") or None,
                        safe_float(row.get("budget_min")),
                        safe_float(row.get("budget_max")),
                        safe_int(row.get("priority"), 1),
                    )
                    seeded_pi += 1
        logger.info("%d property interests seeded", seeded_pi)

        # ── Phase 5: intel_interactions ──────────────────────────────────────
        logger.info("[5/9] Seeding intel_interactions…")
        int_rows = read_csv("intel_interactions.csv")
        interaction_id_map: dict[str, str] = {}  # original CSV interaction_id → DB interaction_id
        seeded_int = 0
        VALID_CHANNELS = {
            'whatsapp', 'phone', 'email', 'site_visit', 'office_meeting',
            'video_call', 'cctv', 'perception_session', 'system'
        }
        if not dry_run:
            async with pool.acquire() as conn:
                for row in int_rows:
                    src_person_id = row.get("person_id", "")
                    db_person_id = person_id_map.get(src_person_id)
                    if not db_person_id:
                        continue
                    raw_channel = (row.get("channel") or "system").lower().strip()
                    # Unknown channels fall back to 'system' so the enum cast
                    # in the INSERT cannot fail.
                    channel = raw_channel if raw_channel in VALID_CHANNELS else "system"
                    src_int_id = row.get("interaction_id", "")
                    new_int_id = str(uuid.uuid4())
                    happened_at = safe_dt(row.get("happened_at")) or datetime.now(timezone.utc)
                    db_lead_id = lead_id_map.get(row.get("lead_id", ""))
                    await conn.execute(
                        """
                        INSERT INTO intel_interactions (
                            interaction_id, person_id, lead_id, channel,
                            interaction_type, happened_at, summary, created_at
                        ) VALUES (
                            $1::uuid, $2::uuid, $3::uuid, $4::intel_channel,
                            $5, $6, $7, NOW()
                        )
                        ON CONFLICT DO NOTHING
                        """,
                        new_int_id,
                        db_person_id,
                        db_lead_id,
                        channel,
                        row.get("interaction_type", "message"),
                        happened_at,
                        row.get("summary") or None,
                    )
                    seeded_int += 1
                    if src_int_id:
                        interaction_id_map[src_int_id] = new_int_id
        logger.info("%d interactions seeded", seeded_int)

        # ── Phase 6: intel_qd_scores ─────────────────────────────────────────
        logger.info("[6/9] Seeding intel_qd_scores…")
        qd_rows = read_csv("intel_qd_scores.csv")
        seeded_qd = 0
        if not dry_run:
            async with pool.acquire() as conn:
                for row in qd_rows:
                    src_person_id = row.get("person_id", "")
                    db_person_id = person_id_map.get(src_person_id)
                    if not db_person_id:
                        continue
                    score_type = row.get("score_type", "intent_score")
                    current_value = safe_float(row.get("current_value"), 0.5)
                    if current_value is None:
                        continue
                    # Clamp into the [0, 1] range that the column's CHECK
                    # constraint requires.
                    current_value = max(0.0, min(1.0, current_value))
                    await conn.execute(
                        """
                        INSERT INTO intel_qd_scores (
                            qd_id, person_id, score_type, current_value,
                            reasoning, computed_at
                        ) VALUES (
                            $1::uuid, $2::uuid, $3, $4, $5, NOW()
                        )
                        ON CONFLICT (person_id, score_type) DO UPDATE
                        SET current_value = EXCLUDED.current_value,
                            computed_at = NOW()
                        """,
                        str(uuid.uuid4()),
                        db_person_id,
                        score_type,
                        current_value,
                        row.get("reasoning") or None,
                    )
                    seeded_qd += 1
        logger.info("%d QD scores seeded", seeded_qd)

        # ── Phase 7: intel_reminders ─────────────────────────────────────────
        logger.info("[7/9] Seeding intel_reminders…")
        rem_rows = read_csv("intel_reminders.csv")
        seeded_rem = 0
        if not dry_run:
            async with pool.acquire() as conn:
                for row in rem_rows:
                    src_person_id = row.get("person_id", "")
                    db_person_id = person_id_map.get(src_person_id)
                    if not db_person_id:
                        continue
                    title = (row.get("title") or "").strip()
                    if not title:
                        continue
                    db_lead_id = lead_id_map.get(row.get("lead_id", ""))
                    await conn.execute(
                        """
                        INSERT INTO intel_reminders (
                            reminder_id, person_id, lead_id, reminder_type, title, notes,
                            due_at, status, priority, created_by_type, created_at
                        ) VALUES (
                            $1::uuid, $2::uuid, $3::uuid, $4, $5, $6,
                            $7, $8, $9, 'system', NOW()
                        )
                        ON CONFLICT DO NOTHING
                        """,
                        str(uuid.uuid4()),
                        db_person_id,
                        db_lead_id,
                        row.get("reminder_type", "follow_up"),
                        title,
                        row.get("notes") or None,
                        safe_dt(row.get("due_at")),
                        row.get("status", "pending"),
                        row.get("priority", "normal"),
                    )
                    seeded_rem += 1
        logger.info("%d reminders seeded", seeded_rem)

        # ── Phase 8: crm_stage_history ───────────────────────────────────────
        logger.info("[8/9] Seeding crm_stage_history…")
        hist_rows = read_csv("crm_stage_history.csv")
        seeded_hist = 0
        if not dry_run:
            async with pool.acquire() as conn:
                for row in hist_rows:
                    src_lead_id = row.get("lead_id", "")
                    db_lead_id = lead_id_map.get(src_lead_id)
                    if not db_lead_id:
                        continue
                    await conn.execute(
                        """
                        INSERT INTO crm_stage_history (
                            history_id, lead_id, from_status, to_status, notes, happened_at
                        ) VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6)
                        ON CONFLICT DO NOTHING
                        """,
                        str(uuid.uuid4()),
                        db_lead_id,
                        row.get("from_status") or None,
                        row.get("to_status", "new"),
                        row.get("notes") or None,
                        safe_dt(row.get("happened_at")) or datetime.now(timezone.utc),
                    )
                    seeded_hist += 1
        logger.info("%d stage history records seeded", seeded_hist)

        # ── Phase 9: Summary ─────────────────────────────────────────────────
        logger.info("[9/9] Seeding complete.")
        logger.info(
            "Summary: people=%d, leads=%d, interactions=%d, qd_scores=%d, reminders=%d, stage_history=%d",
            seeded_people,
            seeded_leads,
            seeded_int,
            seeded_qd,
            seeded_rem,
            seeded_hist,
        )
        if dry_run:
            logger.info("DRY RUN — no data was written to the database.")
    finally:
        # Release the pool on every exit path: normal completion, the early
        # return above, and any exception raised by a phase.
        await close_pool()
def main() -> None:
    """Command-line entry point: parse the flags and run the seeding coroutine.

    Flags:
        --dry-run  parse and validate without writing to the database
        --limit N  cap the number of people seeded (useful for testing)
    """
    cli = argparse.ArgumentParser(
        description="Seed canonical CRM tables from synthetic data CSVs",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Parse and validate without writing to DB",
    )
    cli.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Limit number of people to seed (for testing)",
    )
    options = cli.parse_args()
    # asyncio.run owns the event loop for the lifetime of the seeding run.
    asyncio.run(seed(dry_run=options.dry_run, limit=options.limit))


# Only run the seeder when executed as a script, never on import.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,3 @@
"""
backend/services/client_graph/__init__.py
"""

View File

@@ -0,0 +1,369 @@
"""
backend/services/client_graph/aggregation_service.py
Client 360 Aggregation Service
Produces Client360Snapshot read models by joining across
crm_people, crm_leads, crm_opportunities, intel_interactions,
intel_reminders, intel_qd_scores, crm_property_interests.
This is a derived read model — never the sole source of truth.
As specified in Doc 07 (Client360Snapshot contract) and Doc 08 (Adapter Spec).
"""
from __future__ import annotations
import logging
from typing import Any
logger = logging.getLogger("velocity.client_graph.aggregation")
def _serialize_person(row: Any) -> dict[str, Any]:
    """Turn a crm_people record into the JSON-friendly identity dict.

    ``row`` only needs to support ``row[key]`` lookups. Missing persona
    labels become an empty list, a missing source confidence becomes 0.0,
    and ``created_at`` is rendered as an ISO-8601 string (or None).
    """
    created = row["created_at"]
    labels = row["persona_labels"]
    confidence = row["source_confidence"]

    payload: dict[str, Any] = {"person_id": str(row["person_id"])}
    # Plain pass-through columns, in the order the snapshot exposes them.
    for column in ("full_name", "primary_email", "primary_phone", "buyer_type"):
        payload[column] = row[column]
    payload["persona_labels"] = labels or []
    payload["source_confidence"] = float(confidence or 0.0)
    payload["created_at"] = created.isoformat() if created else None
    return payload
def _serialize_lead(row: Any) -> dict[str, Any]:
    """Turn a crm_leads record into the JSON-friendly lead dict.

    ``row`` only needs to support ``row[key]`` lookups. Empty objections
    and motivations are normalised to ``[]`` and ``created_at`` is rendered
    as an ISO-8601 string (or None).
    """
    created = row["created_at"]

    payload: dict[str, Any] = {"lead_id": str(row["lead_id"])}
    # Plain pass-through columns, in the order the snapshot exposes them.
    for column in (
        "status",
        "budget_band",
        "urgency",
        "financing_posture",
        "timeline_to_decision",
    ):
        payload[column] = row[column]
    for list_column in ("objections", "motivations"):
        payload[list_column] = row[list_column] or []
    payload["created_at"] = created.isoformat() if created else None
    return payload
def _serialize_opportunity(row: Any) -> dict[str, Any]:
    """Turn a crm_opportunities record into the JSON-friendly opportunity dict.

    ``row`` only needs to support ``row[key]`` lookups.

    * ``value`` is converted to ``float``; only a missing (None) value is
      reported as None, so a legitimate value of 0 is preserved as ``0.0``.
    * ``expected_close_date`` is rendered as an ISO-8601 string (or None).
    * ``project_id`` / ``unit_id`` are stringified when present.
    * ``probability`` and the remaining columns are passed through unchanged.
    """
    value = row["value"]
    close_date = row["expected_close_date"]
    project_id = row["project_id"]
    unit_id = row["unit_id"]
    return {
        "opportunity_id": str(row["opportunity_id"]),
        "stage": row["stage"],
        # Compare against None explicitly: a truthiness test would turn a
        # zero-valued opportunity into "no value recorded".
        "value": float(value) if value is not None else None,
        "probability": row["probability"],
        "expected_close_date": close_date.isoformat() if close_date else None,
        "next_action": row["next_action"],
        "project_id": str(project_id) if project_id else None,
        "unit_id": str(unit_id) if unit_id else None,
    }
def _serialize_interaction(row: Any) -> dict[str, Any]:
    """Turn an intel_interactions record into the JSON-friendly dict.

    ``row`` only needs to support ``row[key]`` lookups; ``happened_at`` is
    rendered as an ISO-8601 string (or None when absent).
    """
    occurred = row["happened_at"]
    occurred_iso = occurred.isoformat() if occurred else None

    payload: dict[str, Any] = {}
    payload["interaction_id"] = str(row["interaction_id"])
    payload["channel"] = row["channel"]
    payload["interaction_type"] = row["interaction_type"]
    payload["happened_at"] = occurred_iso
    payload["summary"] = row["summary"]
    return payload
def _serialize_reminder(row: Any) -> dict[str, Any]:
    """Turn an intel_reminders record into the JSON-friendly task dict.

    ``row`` only needs to support ``row[key]`` lookups; ``due_at`` is
    rendered as an ISO-8601 string (or None when absent).
    """
    due = row["due_at"]
    due_iso = due.isoformat() if due else None

    payload: dict[str, Any] = {}
    payload["reminder_id"] = str(row["reminder_id"])
    payload["reminder_type"] = row["reminder_type"]
    payload["title"] = row["title"]
    payload["due_at"] = due_iso
    payload["status"] = row["status"]
    payload["priority"] = row["priority"]
    return payload
def _serialize_qd_score(row: Any) -> dict[str, Any]:
    """Turn an intel_qd_scores record into the JSON-friendly score dict.

    ``row`` only needs to support ``row[key]`` lookups. ``current_value`` is
    converted with ``float()`` unconditionally, and ``computed_at`` is
    rendered as an ISO-8601 string (or None when absent).
    """
    computed = row["computed_at"]

    payload: dict[str, Any] = {}
    payload["score_type"] = row["score_type"]
    payload["current_value"] = float(row["current_value"])
    payload["computed_at"] = computed.isoformat() if computed else None
    payload["reasoning"] = row["reasoning"]
    return payload
def _serialize_property_interest(row: Any) -> dict[str, Any]:
    """Turn a crm_property_interests record into the JSON-friendly dict.

    ``row`` only needs to support ``row[key]`` lookups.

    ``budget_min`` / ``budget_max`` are converted to ``float``; only a
    missing (None) bound is reported as None, so an explicit bound of 0 is
    preserved as ``0.0`` instead of being mistaken for "not recorded".
    All other columns are passed through (``interest_id`` is stringified).
    """

    def _as_float(amount: Any) -> float | None:
        # Explicit None check: a truthiness test would drop a zero bound.
        return float(amount) if amount is not None else None

    return {
        "interest_id": str(row["interest_id"]),
        "project_name": row["project_name"],
        "unit_preference": row["unit_preference"],
        "configuration": row["configuration"],
        "budget_min": _as_float(row["budget_min"]),
        "budget_max": _as_float(row["budget_max"]),
        "priority": row["priority"],
    }
async def get_client_360(conn: Any, person_id: str) -> dict[str, Any] | None:
    """
    Aggregate a full Client360Snapshot for a given person_id.
    This is a read model — derived from canonical tables, never primary truth.

    Args:
        conn: Database connection exposing awaitable ``fetchrow(query, *args)``
            and ``fetch(query, *args)`` methods.
        person_id: Person identifier; it is cast to ``uuid`` inside every query.

    Returns:
        ``None`` when no crm_people row matches ``person_id``; otherwise a dict
        with the keys ``client_ref``, ``snapshot_type``, ``identity``,
        ``account_links``, ``current_lead``, ``active_opportunities``,
        ``recent_interactions``, ``property_interests``, ``tasks``,
        ``qd_overview``, ``risk_flags``, ``recommended_next_actions`` and
        ``note``.
    """
    # 1. Core identity — bail out early when the person does not exist.
    person_row = await conn.fetchrow(
        """
        SELECT person_id, full_name, primary_email, primary_phone,
               buyer_type, persona_labels, source_confidence, created_at
        FROM crm_people
        WHERE person_id = $1::uuid
        """,
        person_id,
    )
    if not person_row:
        return None
    identity = _serialize_person(person_row)

    # 2. Account links.
    # The join goes through crm_leads, so a person with several leads on the
    # same account would otherwise get that account once per lead. DISTINCT
    # collapses the duplicates, and the ORDER BY makes the LIMIT deterministic.
    account_rows = await conn.fetch(
        """
        SELECT DISTINCT ca.account_id, ca.account_name, ca.account_type, ca.industry
        FROM crm_accounts ca
        INNER JOIN crm_leads cl ON cl.account_id = ca.account_id
        WHERE cl.person_id = $1::uuid
        ORDER BY ca.account_name, ca.account_id
        LIMIT 5
        """,
        person_id,
    )
    account_links = [
        {
            "account_id": str(r["account_id"]),
            "account_name": r["account_name"],
            "account_type": r["account_type"],
            "industry": r["industry"],
        }
        for r in account_rows
    ]

    # 3. Current lead — the most recently created lead for this person.
    lead_row = await conn.fetchrow(
        """
        SELECT lead_id, status, budget_band, urgency, financing_posture,
               timeline_to_decision, objections, motivations, created_at
        FROM crm_leads
        WHERE person_id = $1::uuid
        ORDER BY created_at DESC
        LIMIT 1
        """,
        person_id,
    )
    lead = _serialize_lead(lead_row) if lead_row else None

    # 4. Opportunities (5 most recently updated, across all of the person's leads)
    opp_rows = await conn.fetch(
        """
        SELECT co.opportunity_id, co.stage, co.value, co.probability,
               co.expected_close_date, co.next_action, co.project_id, co.unit_id
        FROM crm_opportunities co
        INNER JOIN crm_leads cl ON cl.lead_id = co.lead_id
        WHERE cl.person_id = $1::uuid
        ORDER BY co.updated_at DESC
        LIMIT 5
        """,
        person_id,
    )
    active_opportunities = [_serialize_opportunity(r) for r in opp_rows]

    # 5. Recent interactions (last 10).
    # NULLS LAST: PostgreSQL sorts NULLs first under DESC, which would rank
    # interactions without a timestamp as the most recent ones.
    interaction_rows = await conn.fetch(
        """
        SELECT interaction_id, channel, interaction_type, happened_at, summary
        FROM intel_interactions
        WHERE person_id = $1::uuid
        ORDER BY happened_at DESC NULLS LAST
        LIMIT 10
        """,
        person_id,
    )
    recent_interactions = [_serialize_interaction(r) for r in interaction_rows]

    # 6. Property interests
    interest_rows = await conn.fetch(
        """
        SELECT interest_id, project_name, unit_preference, configuration,
               budget_min, budget_max, priority
        FROM crm_property_interests
        WHERE person_id = $1::uuid
        ORDER BY priority ASC, interest_id ASC
        LIMIT 10
        """,
        person_id,
    )
    property_interests = [_serialize_property_interest(r) for r in interest_rows]

    # 7. Pending / snoozed reminders, earliest due date first
    task_rows = await conn.fetch(
        """
        SELECT reminder_id, reminder_type, title, due_at, status, priority
        FROM intel_reminders
        WHERE person_id = $1::uuid
          AND status IN ('pending', 'snoozed')
        ORDER BY due_at ASC NULLS LAST
        LIMIT 10
        """,
        person_id,
    )
    tasks = [_serialize_reminder(r) for r in task_rows]

    # 8. QD overview (all score types), keyed by score_type
    qd_rows = await conn.fetch(
        """
        SELECT score_type, current_value, computed_at, reasoning
        FROM intel_qd_scores
        WHERE person_id = $1::uuid
        """,
        person_id,
    )
    qd_overview = {r["score_type"]: _serialize_qd_score(r) for r in qd_rows}

    # 9. Risk flags — heuristic derivation
    is_urgent = bool(lead) and lead.get("urgency") in ("high", "critical")
    risk_flags: list[str] = []
    if is_urgent and not active_opportunities:
        risk_flags.append("high_urgency_without_active_opportunity")
    if not recent_interactions:
        risk_flags.append("no_recent_interactions")
    # A person without an intent score defaults to 1.0, i.e. is never flagged.
    if qd_overview.get("intent_score", {}).get("current_value", 1.0) < 0.3:
        risk_flags.append("low_intent_score")
    if not property_interests:
        risk_flags.append("no_property_interests_recorded")

    # 10. Recommended next actions — simple heuristic
    recommended_next_actions: list[str] = []
    # Tasks are already ordered by due date, so the first pending one is the
    # most pressing. Snoozed tasks are deliberately skipped here.
    pending_tasks = [t for t in tasks if t.get("status") == "pending"]
    if pending_tasks:
        recommended_next_actions.append(
            f"Complete pending task: {pending_tasks[0]['title']}"
        )
    if is_urgent:
        recommended_next_actions.append(
            "High-urgency client — prioritize callback within 24h"
        )
    if not recent_interactions and lead:
        recommended_next_actions.append("No recent interactions — schedule follow-up")

    return {
        "client_ref": person_id,
        "snapshot_type": "client_360",
        "identity": identity,
        "account_links": account_links,
        "current_lead": lead,
        "active_opportunities": active_opportunities,
        "recent_interactions": recent_interactions,
        "property_interests": property_interests,
        "tasks": tasks,
        "qd_overview": qd_overview,
        "risk_flags": risk_flags,
        "recommended_next_actions": recommended_next_actions,
        "note": "Derived read model. Not primary truth. Refresh from canonical tables.",
    }
async def get_contact_list(
    conn: Any,
    search: str | None = None,
    buyer_type: str | None = None,
    status: str | None = None,
    limit: int = 50,
    offset: int = 0,
) -> dict[str, Any]:
    """
    Paginated contact list with lead status and QD summary.
    Implements the 'summary query' pattern from Doc 09.

    Args:
        conn: Database connection exposing awaitable ``fetch`` and ``fetchrow``.
        search: Optional substring matched case-insensitively (ILIKE) against
            name, e-mail and phone.
        buyer_type: Optional exact match on ``crm_people.buyer_type``.
        status: Optional status the person's *latest* lead must have.
        limit: Page size passed to ``LIMIT``.
        offset: Rows skipped, passed to ``OFFSET``.

    Returns:
        ``{"contacts": [...], "total": int, "limit": limit, "offset": offset}``
        where ``total`` is the number of people matching the filters.
    """
    clauses: list[str] = ["1=1"]
    params: list[Any] = []
    if search:
        params.append(f"%{search}%")
        # The same placeholder is reused for all three columns.
        idx = len(params)
        clauses.append(
            f"(p.full_name ILIKE ${idx} OR p.primary_email ILIKE ${idx} OR p.primary_phone ILIKE ${idx})"
        )
    if buyer_type:
        params.append(buyer_type)
        clauses.append(f"p.buyer_type = ${len(params)}")
    if status:
        params.append(status)
        clauses.append(f"cl.status = ${len(params)}::crm_lead_status")
    where = "WHERE " + " AND ".join(clauses)

    # The count query takes only the filter parameters, not LIMIT/OFFSET.
    params_for_count = params.copy()
    params.append(limit)
    params.append(offset)

    # Latest lead per person. The page query and the count query share this
    # join so both see exactly one row per person and apply the status filter
    # to the same lead. Joining crm_leads directly in the count would count a
    # person once per lead and match the status against any of their leads.
    latest_lead_join = """
        LEFT JOIN LATERAL (
            SELECT lead_id, status, budget_band, urgency
            FROM crm_leads
            WHERE person_id = p.person_id
            ORDER BY created_at DESC
            LIMIT 1
        ) cl ON TRUE
    """

    query = f"""
        SELECT
            p.person_id,
            p.full_name,
            p.primary_email,
            p.primary_phone,
            p.buyer_type,
            p.created_at,
            cl.lead_id,
            cl.status AS lead_status,
            cl.budget_band,
            cl.urgency,
            COALESCE(qs.intent_value, 0.0) AS intent_score,
            COALESCE(qs.urgency_value, 0.0) AS urgency_score,
            (SELECT COUNT(*) FROM intel_interactions ii WHERE ii.person_id = p.person_id) AS interaction_count,
            (SELECT MAX(happened_at) FROM intel_interactions ii WHERE ii.person_id = p.person_id) AS last_interaction_at,
            (SELECT COUNT(*) FROM intel_reminders ir WHERE ir.person_id = p.person_id AND ir.status = 'pending') AS pending_tasks
        FROM crm_people p
        {latest_lead_join}
        LEFT JOIN LATERAL (
            SELECT
                MAX(CASE WHEN score_type = 'intent_score' THEN current_value END) AS intent_value,
                MAX(CASE WHEN score_type = 'urgency_score' THEN current_value END) AS urgency_value
            FROM intel_qd_scores
            WHERE person_id = p.person_id
        ) qs ON TRUE
        {where}
        ORDER BY last_interaction_at DESC NULLS LAST, p.created_at DESC, p.person_id
        LIMIT ${len(params) - 1} OFFSET ${len(params)}
    """
    # p.person_id is the final tiebreaker so that pages are stable when
    # several people share the same timestamps.

    count_query = f"""
        SELECT COUNT(*)
        FROM crm_people p
        {latest_lead_join}
        {where}
    """

    rows = await conn.fetch(query, *params)
    total_row = await conn.fetchrow(count_query, *params_for_count)
    total = int(total_row[0]) if total_row else 0

    contacts = [
        {
            "person_id": str(r["person_id"]),
            "full_name": r["full_name"],
            "primary_email": r["primary_email"],
            "primary_phone": r["primary_phone"],
            "buyer_type": r["buyer_type"],
            "lead_id": str(r["lead_id"]) if r["lead_id"] else None,
            "lead_status": r["lead_status"],
            "budget_band": r["budget_band"],
            "urgency": r["urgency"],
            "intent_score": float(r["intent_score"]),
            "urgency_score": float(r["urgency_score"]),
            "interaction_count": int(r["interaction_count"]),
            "last_interaction_at": r["last_interaction_at"].isoformat() if r["last_interaction_at"] else None,
            "pending_tasks": int(r["pending_tasks"]),
            "created_at": r["created_at"].isoformat() if r["created_at"] else None,
        }
        for r in rows
    ]
    return {
        "contacts": contacts,
        "total": total,
        "limit": limit,
        "offset": offset,
    }

View File

@@ -0,0 +1,3 @@
"""
backend/services/imports/__init__.py
"""

View File

@@ -0,0 +1,282 @@
"""
backend/services/imports/ingest_service.py
CRM Import Ingestion Service
Implements the RawImportBatch → ImportMappingManifest → NormalizedEntityProposal pipeline
as specified in Doc 08 (Adapter Spec) and Doc 07 (Contracts and Schema Blueprint).
Flow:
1. receive CSV upload, store raw batch record
2. parse headers and infer column mapping
3. validate row structure, detect unresolved columns
4. create NormalizedEntityProposal records for review
5. queue for human approval before canonical commit
"""
from __future__ import annotations
import csv
import io
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any
logger = logging.getLogger("velocity.imports.ingest")
# ── Column mapping heuristics ─────────────────────────────────────────────────
# Maps common source column names → canonical crm_people / crm_leads fields.
#
# Keys are looked up with the *normalized* form of a CSV header (see
# _normalize_header below: trimmed, lower-cased, underscores turned into
# spaces), so every key here must be lower-case and use spaces, not
# underscores. Several keys may point at the same canonical field.
CANONICAL_COLUMN_MAP: dict[str, str] = {
    # Identity
    "name": "full_name",
    "full name": "full_name",
    "client name": "full_name",
    "contact name": "full_name",
    # NOTE(review): "first name" is mapped straight to full_name; no visible
    # code combines it with a last-name column — confirm this is intended.
    "first name": "full_name",
    "customer name": "full_name",
    # Email
    "email": "primary_email",
    "email address": "primary_email",
    "e-mail": "primary_email",
    # Phone
    "phone": "primary_phone",
    "mobile": "primary_phone",
    "contact number": "primary_phone",
    "mobile number": "primary_phone",
    "phone number": "primary_phone",
    # Budget
    "budget": "budget_band",
    "budget range": "budget_band",
    "investment budget": "budget_band",
    # Project interest
    "project": "project_name",
    "project name": "project_name",
    "interested in": "project_name",
    "property interest": "project_name",
    # Source
    "source": "source_system",
    "lead source": "source_system",
    "channel": "source_system",
    # Status / Stage
    "status": "status",
    "lead status": "status",
    "stage": "status",
    "funnel stage": "status",
    # Notes
    "notes": "notes",
    "remarks": "notes",
    "comment": "notes",
    "comments": "notes",
    # Buyer type
    "type": "buyer_type",
    "client type": "buyer_type",
    "category": "buyer_type",
}
# Canonical fields a proposal must carry; build_normalized_proposals flags a
# row for review and lowers its confidence by 0.4 when any of these is missing.
REQUIRED_CANONICAL_FIELDS = {"full_name"}
# Contact fields whose absence lowers a proposal's confidence by 0.1 each in
# build_normalized_proposals, without forcing review on their own.
HIGH_RISK_FIELDS = {"primary_email", "primary_phone"}
def _normalize_header(h: str) -> str:
    """Return the lookup form of a CSV header for CANONICAL_COLUMN_MAP.

    The header is lower-cased, underscores become spaces, and any run of
    whitespace is collapsed to a single space with the ends trimmed, so
    ``" Full_Name "``, ``"full  name"`` and ``"FULL NAME"`` all normalize to
    ``"full name"``. A UTF-8 byte-order mark, which spreadsheet exports often
    leave glued to the first header, is removed as well because
    ``str.strip()`` does not treat it as whitespace.
    """
    cleaned = h.replace("\ufeff", "").replace("_", " ").lower()
    # split()/join collapses repeated internal whitespace and trims both ends.
    return " ".join(cleaned.split())
def infer_column_mapping(headers: list[str]) -> dict[str, Any]:
    """
    Build an ImportMappingManifest-compatible mapping dict from CSV headers.

    Each header is normalized and looked up in CANONICAL_COLUMN_MAP; headers
    without a match are reported as unmapped.

    Returns: {
        mapped: {source_col → canonical_field},
        unmapped: [source_col, ...],
        mapped_count: number of mapped source columns,
        unmapped_count: number of unmapped source columns,
        confidence: mapped_count / number of headers, rounded to 3 places
                    (0.0 when there are no headers)
    }
    """
    recognised: dict[str, str] = {}
    leftover: list[str] = []
    for header in headers:
        target = CANONICAL_COLUMN_MAP.get(_normalize_header(header))
        if not target:
            leftover.append(header)
            continue
        recognised[header] = target

    hits = len(recognised)
    header_total = len(headers)
    if header_total > 0:
        ratio = hits / header_total
    else:
        ratio = 0.0
    return {
        "mapped": recognised,
        "unmapped": leftover,
        "mapped_count": hits,
        "unmapped_count": len(leftover),
        "confidence": round(ratio, 3),
    }
def parse_csv_content(content: str) -> dict[str, Any]:
    """
    Parse CSV content, detect headers, and extract rows.

    Behaviour worth knowing:
      * A leading UTF-8 byte-order mark is dropped so it does not end up
        inside the first header name.
      * Rows shorter than the header are padded with "" (not None), so
        callers can treat every mapped value as a string.
      * Rows longer than the header keep their surplus cells in a list
        under the ``None`` key (csv.DictReader's default ``restkey``).
      * A ``csv.Error`` raised while reading is recorded in ``parse_errors``
        and stops parsing; rows read before the error are kept.

    Returns: {headers, rows, row_count, parse_errors}
    """
    if content and content.startswith("\ufeff"):
        content = content[1:]

    reader = csv.DictReader(io.StringIO(content), restval="")
    headers: list[str] = []
    rows: list[dict[str, Any]] = []
    parse_errors: list[str] = []

    try:
        # Accessing fieldnames reads the header line, which can itself fail.
        headers = list(reader.fieldnames or [])
        # csv.Error is raised by the reader while iterating, not by dict(),
        # so the whole loop has to sit inside the try block.
        for row in reader:
            rows.append(dict(row))
    except csv.Error as exc:
        # The header is line 1, so the n-th data row is reported as row n + 1.
        failed_row = len(rows) + 2 if headers else 1
        parse_errors.append(f"Row {failed_row}: {exc}")

    return {
        "headers": headers,
        "rows": rows,
        "row_count": len(rows),
        "parse_errors": parse_errors,
    }
def build_normalized_proposals(
    rows: list[dict[str, Any]],
    mapping: dict[str, str],
    batch_id: str,
    source_system: str = "csv_upload",
) -> list[dict[str, Any]]:
    """
    Convert raw CSV rows to NormalizedEntityProposal payloads.
    One proposal per row — each must be approved before canonical commit.

    Args:
        rows: Parsed CSV rows keyed by source column name.
        mapping: ``{source_col: canonical_field}``.
            NOTE(review): presumably the ``mapped`` dict produced by
            infer_column_mapping — confirm against the caller.
        batch_id: Identifier of the import batch the proposals belong to.
        source_system: Label copied onto every proposal.

    Returns:
        One proposal dict per input row. ``confidence`` starts at 1.0, loses
        0.4 when a required canonical field is missing (which also sets
        ``review_required``) and 0.1 per missing high-risk field, and is
        floored at 0.0. ``row_number`` is the 1-based line in the source
        file, counting the header as line 1.
    """
    proposals: list[dict[str, Any]] = []
    now = datetime.now(timezone.utc).isoformat()

    # Data rows start on line 2 of the file (line 1 is the header).
    for row_number, row in enumerate(rows, start=2):
        canonical: dict[str, Any] = {}
        unresolved: list[str] = []
        confidence = 1.0

        for src_col, canonical_field in mapping.items():
            # csv.DictReader stores None for cells missing from a short row,
            # so the key can exist with a None value; row.get(col, "") would
            # hand that None to .strip() and crash. Coerce defensively.
            raw = row.get(src_col)
            val = "" if raw is None else str(raw).strip()
            if val:
                canonical[canonical_field] = val
            else:
                unresolved.append(src_col)

        # Validate required fields (sorted for a deterministic payload).
        missing_required = sorted(
            f for f in REQUIRED_CANONICAL_FIELDS if not canonical.get(f)
        )
        review_required = bool(missing_required)
        if review_required:
            confidence = max(0.0, confidence - 0.4)

        # Penalise missing high-risk fields (email/phone); this lowers
        # confidence only and does not force review on its own.
        missing_high_risk = [f for f in HIGH_RISK_FIELDS if not canonical.get(f)]
        if missing_high_risk:
            confidence = max(0.0, confidence - 0.1 * len(missing_high_risk))

        proposals.append(
            {
                "proposal_id": str(uuid.uuid4()),
                "batch_id": batch_id,
                "row_number": row_number,
                "entity_type": "crm_person_with_lead",
                "canonical_payload": canonical,
                "raw_row": row,
                "unresolved_fields": unresolved,
                "missing_required": missing_required,
                "confidence": round(confidence, 3),
                "review_required": review_required,
                "status": "proposed",
                "created_at": now,
                "source_system": source_system,
            }
        )
    return proposals
def create_import_batch_record(
    filename: str,
    row_count: int,
    mapping_manifest: dict[str, Any],
    source_system: str = "csv_upload",
    uploaded_by_id: str | None = None,
) -> dict[str, Any]:
    """
    Assemble the payload for a workflow_import_batches record.

    A fresh UUID is generated for ``batch_id``; ``created_at`` and
    ``updated_at`` share one UTC ISO-8601 timestamp. Mapped and unresolved
    counts are read from the manifest and default to 0 when absent. The
    record always starts in the "parsed" lifecycle with mime type text/csv.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    record: dict[str, Any] = {"batch_id": str(uuid.uuid4())}
    record["source_system"] = source_system
    record["uploaded_filename"] = filename
    record["mime_type"] = "text/csv"
    record["row_count"] = row_count
    record["mapped_count"] = mapping_manifest.get("mapped_count", 0)
    record["unresolved_count"] = mapping_manifest.get("unmapped_count", 0)
    record["uploaded_by"] = uploaded_by_id
    record["lifecycle"] = "parsed"
    record["mapping_manifest"] = mapping_manifest
    record["created_at"] = stamp
    record["updated_at"] = stamp
    return record
async def persist_import_batch(conn: Any, batch: dict[str, Any]) -> str:
    """
    Insert one workflow_import_batches row and return its batch_id.

    ``batch_id`` and ``source_system`` are required keys of ``batch``; the
    other columns fall back to defaults when absent. The row's created_at /
    updated_at come from the database's NOW(), not from the timestamps in
    ``batch``. The mapping manifest is serialized to JSON for the jsonb column.
    """
    insert_sql = """
        INSERT INTO workflow_import_batches (
            batch_id, source_system, uploaded_filename, mime_type, row_count,
            mapped_count, unresolved_count, uploaded_by, lifecycle, mapping_manifest,
            created_at, updated_at
        ) VALUES (
            $1::uuid, $2, $3, $4, $5, $6, $7,
            $8::uuid, $9::import_lifecycle, $10::jsonb, NOW(), NOW()
        )
        """
    batch_id = batch["batch_id"]
    # Positional values, in the same order as placeholders $1..$10.
    values = (
        batch_id,
        batch["source_system"],
        batch.get("uploaded_filename", "unknown.csv"),
        batch.get("mime_type", "text/csv"),
        batch.get("row_count", 0),
        batch.get("mapped_count", 0),
        batch.get("unresolved_count", 0),
        batch.get("uploaded_by"),
        batch.get("lifecycle", "parsed"),
        json.dumps(batch.get("mapping_manifest", {})),
    )
    await conn.execute(insert_sql, *values)
    return batch["batch_id"]
async def persist_proposals_as_workflow_actions(
    conn: Any, proposals: list[dict[str, Any]]
) -> int:
    """
    Store each proposal as a pending 'import_proposal' row in workflow_actions
    so it can be reviewed by a human.

    Rows are inserted one at a time, in order. The full proposal is
    serialized to JSON as the payload, and the reasoning summary names the
    source row and the person's full name ('unknown' when the canonical
    payload has none).

    Returns the number of rows inserted.
    """
    insert_sql = """
            INSERT INTO workflow_actions (
                action_id, action_type, target_domain, proposal_payload,
                reasoning_summary, confidence, status, approval_required,
                created_by_agent, created_at, updated_at
            ) VALUES (
                $1::uuid, 'import_proposal', 'crm', $2::jsonb,
                $3, $4, 'pending'::wf_status, $5, 'ingest_service', NOW(), NOW()
            )
            """
    written = 0
    for proposal in proposals:
        # Positional values, in the same order as placeholders $1..$5.
        values = (
            proposal["proposal_id"],
            json.dumps(proposal),
            f"Import row {proposal['row_number']}: {proposal['canonical_payload'].get('full_name', 'unknown')}",
            proposal["confidence"],
            proposal["review_required"],
        )
        await conn.execute(insert_sql, *values)
        written += 1
    return written