""" backend/api/routes_crm_imports.py CRM Import Route Family + Client 360 Routes Implements the canonical CRM API surface as specified in Doc 09 (Root API Spec): POST /api/crm/imports — upload CSV batch GET /api/crm/imports — list import batches GET /api/crm/imports/{id} — get batch detail + proposals PUT /api/crm/imports/{id}/approve-proposal — approve a single proposal POST /api/crm/imports/{id}/commit — commit approved proposals to canonical GET /api/crm/contacts — canonical contact list (with QD summary) GET /api/crm/contacts/{id} — canonical contact detail GET /api/crm/client-360/{id} — Client 360 aggregated snapshot GET /api/crm/opportunities — opportunity pipeline list GET /api/crm/tasks — reminder/task list GET /api/crm/kanban — kanban board (canonical leads) Uses canonical crm_*, intel_*, workflow_* tables. """ from __future__ import annotations import json import logging import uuid from datetime import datetime, timezone from typing import Any from fastapi import APIRouter, HTTPException, Query, Request, UploadFile, File, status from pydantic import BaseModel, Field from backend.services.imports.ingest_service import ( parse_csv_content, infer_column_mapping, build_normalized_proposals, create_import_batch_record, persist_import_batch, persist_proposals_as_workflow_actions, ) from backend.services.client_graph.aggregation_service import ( get_client_360, get_contact_list, ) logger = logging.getLogger("velocity.api.crm_imports") router = APIRouter() def _now() -> str: return datetime.now(timezone.utc).isoformat() async def _get_pool(request: Request): pool = getattr(request.app.state, "db_pool", None) if pool is None: raise HTTPException(status_code=503, detail="Database unavailable.") return pool # ── Models ──────────────────────────────────────────────────────────────────── class ProposalApprovalRequest(BaseModel): proposal_id: str decision: str = Field(..., pattern="^(approved|rejected|needs_more_info)$") notes: str = Field(default="", 
max_length=2000) class CreatePersonRequest(BaseModel): full_name: str = Field(..., min_length=1, max_length=200) primary_email: str | None = None primary_phone: str | None = None buyer_type: str | None = None budget_band: str | None = None project_name: str | None = None source_system: str = "manual" notes: str | None = None metadata_json: dict[str, Any] = Field(default_factory=dict) class CreateLeadRequest(BaseModel): person_id: str status: str = "new" budget_band: str | None = None urgency: str | None = None financing_posture: str | None = None assigned_user_id: str | None = None metadata_json: dict[str, Any] = Field(default_factory=dict) class CreateReminderRequest(BaseModel): person_id: str lead_id: str | None = None reminder_type: str = "follow_up" title: str = Field(..., min_length=1, max_length=500) notes: str | None = None due_at: str | None = None priority: str = "normal" class ClientDataPatchRequest(BaseModel): full_name: str | None = Field(default=None, max_length=256) primary_email: str | None = Field(default=None, max_length=256) primary_phone: str | None = Field(default=None, max_length=64) buyer_type: str | None = Field(default=None, max_length=128) communication_preference: str | None = Field(default=None, max_length=64) best_contact_time: str | None = Field(default=None, max_length=128) lead_status: str | None = Field(default=None, max_length=64) budget_band: str | None = Field(default=None, max_length=128) urgency: str | None = Field(default=None, max_length=64) # ── Import Endpoints ────────────────────────────────────────────────────────── @router.post("/crm/imports", status_code=201, tags=["CRM Imports"]) async def upload_crm_import( request: Request, file: UploadFile = File(...), source_system: str = Query(default="csv_upload"), ) -> dict[str, Any]: """ Upload a CSV file to start a CRM import batch. Parses headers, infers column mapping, and creates workflow_actions proposals. 
""" pool = await _get_pool(request) content_bytes = await file.read() try: content = content_bytes.decode("utf-8") except UnicodeDecodeError: content = content_bytes.decode("latin-1") parsed = parse_csv_content(content) mapping = infer_column_mapping(parsed["headers"]) batch = create_import_batch_record( filename=file.filename or "upload.csv", row_count=parsed["row_count"], mapping_manifest=mapping, source_system=source_system, ) proposals = build_normalized_proposals( rows=parsed["rows"], mapping=mapping["mapped"], batch_id=batch["batch_id"], source_system=source_system, ) async with pool.acquire() as conn: await persist_import_batch(conn, batch) inserted = await persist_proposals_as_workflow_actions(conn, proposals) logger.info("Import batch %s: %d rows, %d proposals", batch["batch_id"], parsed["row_count"], inserted) return { "status": "ok", "data": { "batch_id": batch["batch_id"], "row_count": parsed["row_count"], "mapped_columns": mapping["mapped_count"], "unmapped_columns": mapping["unmapped_count"], "mapping_confidence": mapping["confidence"], "proposals_created": inserted, "parse_errors": parsed["parse_errors"], "lifecycle": "parsed", "message": f"Import batch created. 
{inserted} proposals queued for review.", }, } @router.get("/crm/imports", tags=["CRM Imports"]) async def list_import_batches( request: Request, lifecycle: str | None = None, limit: int = Query(default=20, ge=1, le=100), offset: int = Query(default=0, ge=0), ) -> dict[str, Any]: """List all CRM import batches with lifecycle status.""" pool = await _get_pool(request) clauses = ["1=1"] params: list[Any] = [] if lifecycle: params.append(lifecycle) clauses.append(f"lifecycle = ${len(params)}::import_lifecycle") params.extend([limit, offset]) where = " AND ".join(clauses) async with pool.acquire() as conn: rows = await conn.fetch( f""" SELECT batch_id, source_system, uploaded_filename, row_count, mapped_count, unresolved_count, lifecycle, created_at, updated_at FROM workflow_import_batches WHERE {where} ORDER BY created_at DESC LIMIT ${len(params) - 1} OFFSET ${len(params)} """, *params, ) total = await conn.fetchval( f"SELECT COUNT(*) FROM workflow_import_batches WHERE {where}", *params[:-2], ) batches = [ { "batch_id": str(r["batch_id"]), "source_system": r["source_system"], "filename": r["uploaded_filename"], "row_count": r["row_count"], "mapped_count": r["mapped_count"], "unresolved_count": r["unresolved_count"], "lifecycle": r["lifecycle"], "created_at": r["created_at"].isoformat() if r["created_at"] else None, } for r in rows ] return {"status": "ok", "data": batches, "meta": {"total": total, "limit": limit, "offset": offset}} @router.get("/crm/imports/{batch_id}", tags=["CRM Imports"]) async def get_import_batch(request: Request, batch_id: str) -> dict[str, Any]: """Get import batch detail including pending proposals.""" pool = await _get_pool(request) async with pool.acquire() as conn: batch_row = await conn.fetchrow( "SELECT * FROM workflow_import_batches WHERE batch_id = $1::uuid", batch_id, ) if not batch_row: raise HTTPException(status_code=404, detail=f"Import batch '{batch_id}' not found.") proposal_rows = await conn.fetch( """ SELECT action_id, 
proposal_payload, confidence, status, approval_required, created_at FROM workflow_actions WHERE action_type = 'import_proposal' AND proposal_payload->>'batch_id' = $1 ORDER BY (proposal_payload->>'row_number')::int ASC LIMIT 200 """, batch_id, ) proposals = [ { "proposal_id": str(r["action_id"]), "payload": r["proposal_payload"], "confidence": float(r["confidence"]) if r["confidence"] else 0.0, "status": r["status"], "review_required": r["approval_required"], } for r in proposal_rows ] return { "status": "ok", "data": { "batch_id": str(batch_row["batch_id"]), "source_system": batch_row["source_system"], "filename": batch_row["uploaded_filename"], "row_count": batch_row["row_count"], "mapping_manifest": batch_row["mapping_manifest"], "lifecycle": batch_row["lifecycle"], "proposals": proposals, "proposal_count": len(proposals), }, } @router.put("/crm/imports/{batch_id}/review-proposal", tags=["CRM Imports"]) async def review_proposal( request: Request, batch_id: str, body: ProposalApprovalRequest ) -> dict[str, Any]: """ Human review of a single import proposal. Creates a workflow_approvals record and updates the action status. Approved actions with high confidence may be auto-staged for commit. 
""" pool = await _get_pool(request) async with pool.acquire() as conn: action = await conn.fetchrow( "SELECT action_id, confidence, approval_required FROM workflow_actions WHERE action_id = $1::uuid", body.proposal_id, ) if not action: raise HTTPException(status_code=404, detail="Proposal not found.") decision_id = str(uuid.uuid4()) new_status = "approved" if body.decision == "approved" else "rejected" await conn.execute( """ INSERT INTO workflow_approvals (decision_id, action_id, decision, decision_notes, decided_at) VALUES ($1::uuid, $2::uuid, $3, $4, NOW()) """, decision_id, body.proposal_id, body.decision, body.notes, ) await conn.execute( "UPDATE workflow_actions SET status = $1::wf_status, updated_at = NOW() WHERE action_id = $2::uuid", new_status, body.proposal_id, ) return { "status": "ok", "data": { "decision_id": decision_id, "proposal_id": body.proposal_id, "decision": body.decision, "message": f"Proposal {body.decision}.", }, } @router.post("/crm/imports/{batch_id}/commit", tags=["CRM Imports"]) async def commit_approved_proposals(request: Request, batch_id: str) -> dict[str, Any]: """ Commit all approved proposals for a batch into canonical crm_people + crm_leads tables. Only approved proposals are committed. Rejected/pending are skipped. This implements the writeback flow from Doc 07 and Doc 09. 
""" pool = await _get_pool(request) committed = 0 skipped = 0 errors: list[str] = [] async with pool.acquire() as conn: approved_rows = await conn.fetch( """ SELECT action_id, proposal_payload FROM workflow_actions WHERE action_type = 'import_proposal' AND proposal_payload->>'batch_id' = $1 AND status = 'approved' """, batch_id, ) for row in approved_rows: try: payload = row["proposal_payload"] canonical = payload.get("canonical_payload", {}) if not canonical.get("full_name"): skipped += 1 continue person_id = str(uuid.uuid4()) await conn.execute( """ INSERT INTO crm_people ( person_id, full_name, primary_email, primary_phone, buyer_type, source_confidence, metadata_json, created_at, updated_at ) VALUES ( $1::uuid, $2, $3, $4, $5, $6, $7::jsonb, NOW(), NOW() ) ON CONFLICT DO NOTHING """, person_id, canonical.get("full_name"), canonical.get("primary_email"), canonical.get("primary_phone"), canonical.get("buyer_type"), payload.get("confidence", 0.5), json.dumps({"source_batch": batch_id, "import_row": payload.get("row_number")}), ) if canonical.get("status") or canonical.get("budget_band"): lead_id = str(uuid.uuid4()) await conn.execute( """ INSERT INTO crm_leads ( lead_id, person_id, source_system, status, budget_band, metadata_json, created_at, updated_at ) VALUES ( $1::uuid, $2::uuid, $3, 'new'::crm_lead_status, $4, $5::jsonb, NOW(), NOW() ) ON CONFLICT DO NOTHING """, lead_id, person_id, payload.get("source_system", "csv_upload"), canonical.get("budget_band"), json.dumps({"import_batch": batch_id}), ) # Stage property interest if project_name present if canonical.get("project_name"): await conn.execute( """ INSERT INTO crm_property_interests ( interest_id, person_id, lead_id, project_name, created_at ) VALUES ( $1::uuid, $2::uuid, $3::uuid, $4, NOW() ) ON CONFLICT DO NOTHING """, str(uuid.uuid4()), person_id, lead_id, canonical.get("project_name"), ) # Mark action as executed await conn.execute( "UPDATE workflow_actions SET status = 'executed'::wf_status, 
updated_at = NOW() WHERE action_id = $1::uuid", row["action_id"], ) committed += 1 except Exception as e: errors.append(f"Proposal {row['action_id']}: {str(e)}") skipped += 1 # Update batch lifecycle await conn.execute( "UPDATE workflow_import_batches SET lifecycle = 'committed'::import_lifecycle, canonical_count = $1, updated_at = NOW() WHERE batch_id = $2", committed, batch_id, ) return { "status": "ok", "data": { "batch_id": batch_id, "committed": committed, "skipped": skipped, "errors": errors, "lifecycle": "committed", }, } # ── Contact / Person Endpoints ────────────────────────────────────────────── @router.get("/crm/contacts", tags=["CRM Contacts"]) async def list_contacts( request: Request, search: str | None = Query(default=None), buyer_type: str | None = Query(default=None), status: str | None = Query(default=None), limit: int = Query(default=50, ge=1, le=200), offset: int = Query(default=0, ge=0), ) -> dict[str, Any]: """Canonical contact list with QD summary, interaction count, and lead status.""" pool = await _get_pool(request) async with pool.acquire() as conn: result = await get_contact_list(conn, search, buyer_type, status, limit, offset) return {"status": "ok", "data": result} @router.post("/crm/contacts", status_code=201, tags=["CRM Contacts"]) async def create_contact(request: Request, body: CreatePersonRequest) -> dict[str, Any]: """Create a new canonical person record manually.""" pool = await _get_pool(request) person_id = str(uuid.uuid4()) async with pool.acquire() as conn: await conn.execute( """ INSERT INTO crm_people ( person_id, full_name, primary_email, primary_phone, buyer_type, source_confidence, metadata_json, created_at, updated_at ) VALUES ($1::uuid, $2, $3, $4, $5, 1.0, $6::jsonb, NOW(), NOW()) """, person_id, body.full_name, body.primary_email, body.primary_phone, body.buyer_type, json.dumps({**body.metadata_json, "manual_entry": True}), ) if body.project_name or body.budget_band: lead_id = str(uuid.uuid4()) await conn.execute( 
""" INSERT INTO crm_leads ( lead_id, person_id, source_system, status, budget_band, metadata_json, created_at, updated_at ) VALUES ($1::uuid, $2::uuid, $3, 'new'::crm_lead_status, $4, '{}'::jsonb, NOW(), NOW()) """, lead_id, person_id, body.source_system, body.budget_band, ) if body.project_name: await conn.execute( """ INSERT INTO crm_property_interests (interest_id, person_id, lead_id, project_name, created_at) VALUES ($1::uuid, $2::uuid, $3::uuid, $4, NOW()) """, str(uuid.uuid4()), person_id, lead_id, body.project_name, ) return {"status": "ok", "data": {"person_id": person_id, "full_name": body.full_name}} @router.get("/crm/contacts/{person_id}", tags=["CRM Contacts"]) async def get_contact(request: Request, person_id: str) -> dict[str, Any]: """Get a single canonical contact record.""" pool = await _get_pool(request) async with pool.acquire() as conn: row = await conn.fetchrow( """ SELECT person_id, full_name, primary_email, primary_phone, secondary_phone, buyer_type, persona_labels, source_confidence, created_at, updated_at FROM crm_people WHERE person_id = $1::uuid """, person_id, ) if not row: raise HTTPException(status_code=404, detail=f"Contact '{person_id}' not found.") return { "status": "ok", "data": { "person_id": str(row["person_id"]), "full_name": row["full_name"], "primary_email": row["primary_email"], "primary_phone": row["primary_phone"], "buyer_type": row["buyer_type"], "persona_labels": row["persona_labels"] or [], "source_confidence": float(row["source_confidence"] or 0.0), }, } # ── Client 360 Endpoint ──────────────────────────────────────────────────── @router.get("/crm/client-360/{person_id}", tags=["CRM Client 360"]) async def client_360(request: Request, person_id: str) -> dict[str, Any]: """ Aggregated Client360 dossier — identity, opportunities, interactions, property interests, tasks, QD overview, risk flags, and next actions. Derived read model — not primary truth. 
""" pool = await _get_pool(request) async with pool.acquire() as conn: snapshot = await get_client_360(conn, person_id) if not snapshot: raise HTTPException(status_code=404, detail=f"Client '{person_id}' not found.") return {"status": "ok", "data": snapshot} # ── Opportunities Endpoint ───────────────────────────────────────────────── @router.get("/crm/opportunities", tags=["CRM Opportunities"]) async def list_opportunities( request: Request, stage: str | None = None, limit: int = Query(default=50, ge=1, le=200), offset: int = Query(default=0, ge=0), ) -> dict[str, Any]: """Canonical opportunity pipeline list.""" pool = await _get_pool(request) clauses = ["1=1"] params: list[Any] = [] if stage: params.append(stage) clauses.append(f"co.stage = ${len(params)}::crm_opportunity_stage") params.extend([limit, offset]) where = " AND ".join(clauses) async with pool.acquire() as conn: rows = await conn.fetch( f""" SELECT co.opportunity_id, co.stage, co.value, co.probability, co.expected_close_date, co.next_action, p.person_id, p.full_name, p.primary_phone, ip.project_name, co.created_at, co.updated_at FROM crm_opportunities co INNER JOIN crm_leads cl ON cl.lead_id = co.lead_id INNER JOIN crm_people p ON p.person_id = cl.person_id LEFT JOIN inventory_projects ip ON ip.project_id = co.project_id WHERE {where} ORDER BY co.updated_at DESC LIMIT ${len(params) - 1} OFFSET ${len(params)} """, *params, ) opportunities = [ { "opportunity_id": str(r["opportunity_id"]), "stage": r["stage"], "value": float(r["value"]) if r["value"] else None, "probability": r["probability"], "expected_close_date": r["expected_close_date"].isoformat() if r["expected_close_date"] else None, "next_action": r["next_action"], "person_id": str(r["person_id"]), "client_name": r["full_name"], "client_phone": r["primary_phone"], "project_name": r["project_name"], } for r in rows ] return {"status": "ok", "data": opportunities, "meta": {"count": len(opportunities)}} # ── Tasks / Reminders Endpoint 
# ─────────────────────────────────────────────

@router.get("/crm/tasks", tags=["CRM Tasks"])
async def list_tasks(
    request: Request,
    status_filter: str | None = Query(default="pending", alias="status"),
    assigned_to: str | None = None,
    limit: int = Query(default=50, ge=1, le=200),
) -> dict[str, Any]:
    """Reminder / task inbox for the CRM operator.

    Filters by reminder status (query parameter ``status``, default
    ``pending``) and optionally by assignee. Rows are ordered by due date,
    with undated reminders last.
    """
    pool = await _get_pool(request)

    clauses = ["1=1"]
    params: list[Any] = []
    if status_filter:
        params.append(status_filter)
        clauses.append(f"ir.status = ${len(params)}")
    if assigned_to:
        params.append(assigned_to)
        clauses.append(f"ir.assigned_to = ${len(params)}::uuid")
    # limit is always the last placeholder.
    params.append(limit)
    where = " AND ".join(clauses)

    async with pool.acquire() as conn:
        rows = await conn.fetch(
            f"""
            SELECT
                ir.reminder_id, ir.reminder_type, ir.title, ir.notes,
                ir.due_at, ir.status, ir.priority,
                p.person_id, p.full_name, p.primary_phone
            FROM intel_reminders ir
            INNER JOIN crm_people p ON p.person_id = ir.person_id
            WHERE {where}
            ORDER BY ir.due_at ASC NULLS LAST, ir.created_at DESC
            LIMIT ${len(params)}
            """,
            *params,
        )

    tasks = [
        {
            "reminder_id": str(r["reminder_id"]),
            "reminder_type": r["reminder_type"],
            "title": r["title"],
            "notes": r["notes"],
            "due_at": r["due_at"].isoformat() if r["due_at"] else None,
            "status": r["status"],
            "priority": r["priority"],
            "person_id": str(r["person_id"]),
            "client_name": r["full_name"],
            "client_phone": r["primary_phone"],
        }
        for r in rows
    ]
    return {"status": "ok", "data": tasks, "meta": {"count": len(tasks)}}


@router.post("/crm/tasks", status_code=201, tags=["CRM Tasks"])
async def create_task(request: Request, body: CreateReminderRequest) -> dict[str, Any]:
    """Create a reminder / follow-up task.

    ``body.due_at`` is optional ISO-8601 text; a trailing ``Z`` is accepted as
    UTC. The reminder is stored with status ``pending`` and creator type
    ``human``.

    Raises:
        HTTPException: 422 when ``due_at`` is present but not a valid ISO-8601
            timestamp. Previously such a value was silently dropped and the
            reminder was stored without a due date.
    """
    pool = await _get_pool(request)
    reminder_id = str(uuid.uuid4())

    due_dt = None
    if body.due_at:
        raw_due = body.due_at.strip()
        # datetime.fromisoformat only understands a trailing "Z" from
        # Python 3.11 on; normalise it so older interpreters accept it too.
        if raw_due.endswith(("Z", "z")):
            raw_due = raw_due[:-1] + "+00:00"
        try:
            due_dt = datetime.fromisoformat(raw_due)
        except ValueError as exc:
            raise HTTPException(
                status_code=422,
                detail=f"Invalid due_at '{body.due_at}': expected an ISO-8601 timestamp.",
            ) from exc

    async with pool.acquire() as conn:
        await conn.execute(
            """
            INSERT INTO intel_reminders (
                reminder_id, person_id, lead_id, reminder_type, title, notes,
                due_at, status, priority, created_by_type, created_at
            ) VALUES (
                $1::uuid, $2::uuid, $3::uuid, $4, $5, $6,
                $7, 'pending', $8, 'human', NOW()
            )
            """,
            reminder_id,
            body.person_id,
            body.lead_id,
            body.reminder_type,
            body.title,
            body.notes,
            due_dt,
            body.priority,
        )

    return {"status": "ok", "data": {"reminder_id": reminder_id, "title": body.title}}


# ── Canonical Kanban (from crm_leads) ─────────────────────────────────────

@router.get("/crm/kanban", tags=["CRM Kanban"])
async def get_canonical_kanban(request: Request) -> dict[str, Any]:
    """
    Canonical Kanban board from crm_leads table.
    Groups clients by lead status with QD summary.

    Returns one column per entry of ``STAGES`` (in that order), each with its
    lead count and items; items are ordered by intent score, highest first.
    """
    pool = await _get_pool(request)

    STAGES = [
        "new", "contacted", "qualified", "site_visit_scheduled", "site_visited",
        "negotiation", "booking_initiated", "booked", "lost", "dormant",
    ]

    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT
                cl.lead_id, cl.status, cl.budget_band, cl.urgency,
                p.person_id, p.full_name, p.primary_phone, p.buyer_type,
                COALESCE(qs.intent_value, 0.0) AS intent_score
            FROM crm_leads cl
            INNER JOIN crm_people p ON p.person_id = cl.person_id
            LEFT JOIN LATERAL (
                SELECT MAX(CASE WHEN score_type = 'intent_score' THEN current_value END) AS intent_value
                FROM intel_qd_scores
                WHERE person_id = p.person_id
            ) qs ON TRUE
            ORDER BY qs.intent_value DESC NULLS LAST, cl.updated_at DESC
            """
        )

    grouped: dict[str, list[dict]] = {s: [] for s in STAGES}
    for r in rows:
        # Leads without a status are shown in the "new" column.
        s = r["status"] or "new"
        grouped.setdefault(s, []).append({
            "lead_id": str(r["lead_id"]),
            "person_id": str(r["person_id"]),
            "client_name": r["full_name"],
            "client_phone": r["primary_phone"],
            "buyer_type": r["buyer_type"],
            "budget_band": r["budget_band"],
            "urgency": r["urgency"],
            "intent_score": float(r["intent_score"]),
        })

    # NOTE(review): only the columns named in STAGES are emitted, so leads
    # whose status is not listed there are grouped above but never returned.
    board = [
        {
            "status": s,
            "label": s.replace("_", " ").title(),
            "count": len(grouped.get(s, [])),
            "items": grouped.get(s, []),
        }
        for s in STAGES
    ]
    return {"status": "ok", "data": board}


# ── QD Score Access
# ────────────────────────────────────────────────────────

@router.get("/crm/qd/{person_id}", tags=["CRM QD"])
async def get_qd_score(request: Request, person_id: str) -> dict[str, Any]:
    """QD score summary and recent timeseries for a client.

    Returns the current scores keyed by score type plus the 50 most recent
    timeseries points.

    Raises:
        HTTPException: 404 when the client has no QD scores.
    """
    pool = await _get_pool(request)
    async with pool.acquire() as conn:
        scores = await conn.fetch(
            "SELECT score_type, current_value, computed_at, reasoning FROM intel_qd_scores WHERE person_id = $1::uuid",
            person_id,
        )
        timeseries = await conn.fetch(
            """
            SELECT score_type, value, timestamp, signal_source, delta
            FROM intel_qd_timeseries
            WHERE person_id = $1::uuid
            ORDER BY timestamp DESC
            LIMIT 50
            """,
            person_id,
        )

    if not scores:
        raise HTTPException(status_code=404, detail=f"No QD scores for client '{person_id}'.")

    return {
        "status": "ok",
        "data": {
            "person_id": person_id,
            "scores": {
                r["score_type"]: {
                    "current_value": float(r["current_value"]),
                    "computed_at": r["computed_at"].isoformat() if r["computed_at"] else None,
                    "reasoning": r["reasoning"],
                }
                for r in scores
            },
            "timeseries": [
                {
                    "score_type": r["score_type"],
                    "value": float(r["value"]),
                    "timestamp": r["timestamp"].isoformat() if r["timestamp"] else None,
                    "signal_source": r["signal_source"],
                    # Explicit None test: a delta of 0 means "no change" and
                    # must not be reported as missing.
                    "delta": float(r["delta"]) if r["delta"] is not None else None,
                }
                for r in timeseries
            ],
        },
    }


# ── Oracle Client Data Lens ───────────────────────────────────────────────────

@router.get("/crm/client-data", tags=["CRM Client Data"])
async def list_client_data(
    request: Request,
    search: str | None = Query(default=None),
    limit: int = Query(default=50, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
) -> dict[str, Any]:
    """List people with their most recent lead, QD score, last contact,
    next best action and aggregated project interests.

    ``search`` is a case-insensitive substring match against name, phone,
    email and the aggregated project names. ``meta.count`` is the size of the
    returned page.
    """
    pool = await _get_pool(request)

    params: list[Any] = []
    where = "1=1"
    if search:
        params.append(f"%{search.lower()}%")
        ph = f"${len(params)}"  # the same placeholder is reused for every column
        where = (
            f"(lower(p.full_name) LIKE {ph}"
            f" OR lower(COALESCE(p.primary_phone,'')) LIKE {ph}"
            f" OR lower(COALESCE(p.primary_email,'')) LIKE {ph}"
            f" OR lower(COALESCE(pi.projects,'')) LIKE {ph})"
        )
    params.extend([limit, offset])

    async with pool.acquire() as conn:
        rows = await conn.fetch(
            f"""
            WITH interests AS (
                SELECT person_id,
                       string_agg(DISTINCT project_name, ', ') AS projects,
                       COUNT(*)::int AS interest_count
                FROM crm_property_interests
                GROUP BY person_id
            ),
            qd AS (
                SELECT DISTINCT ON (person_id) person_id, current_value
                FROM intel_qd_scores
                ORDER BY person_id, current_value DESC, computed_at DESC
            )
            SELECT
                p.person_id::text, p.full_name, p.primary_email, p.primary_phone,
                p.buyer_type, p.broker_name, p.communication_preference, p.best_contact_time,
                l.lead_id::text, l.status::text AS lead_status, l.budget_band, l.urgency,
                COALESCE(qd.current_value, p.engagement_score, 0)::float AS qd_score,
                lc.last_contact_at, lc.last_channel, lc.days_since_contact,
                nba.recommended_action AS next_best_action,
                nba.priority AS next_action_priority,
                COALESCE(pi.projects, '') AS projects,
                COALESCE(pi.interest_count, 0)::int AS interest_count
            FROM crm_people p
            LEFT JOIN LATERAL (
                SELECT * FROM crm_leads l
                WHERE l.person_id = p.person_id
                ORDER BY l.updated_at DESC
                LIMIT 1
            ) l ON TRUE
            LEFT JOIN interests pi ON pi.person_id = p.person_id
            LEFT JOIN qd ON qd.person_id = p.person_id
            LEFT JOIN read_last_contacted lc ON lc.person_id = p.person_id
            LEFT JOIN read_next_best_action nba ON nba.person_id = p.person_id
            WHERE {where}
            ORDER BY lc.last_contact_at DESC NULLS LAST, qd_score DESC, p.full_name ASC
            LIMIT ${len(params)-1} OFFSET ${len(params)}
            """,
            *params,
        )

    return {
        "status": "ok",
        "data": [dict(r) for r in rows],
        "meta": {"count": len(rows), "limit": limit, "offset": offset},
    }


@router.get("/crm/client-data/{person_id}", tags=["CRM Client Data"])
async def get_client_data(request: Request, person_id: str) -> dict[str, Any]:
    """Return the full data lens for one client: profile (person + most recent
    lead + last contact + next best action), property interests,
    opportunities, the 50 latest extracted facts, QD scores and a 60-event
    timeline.

    Raises:
        HTTPException: 404 when no person has this id.
    """
    pool = await _get_pool(request)
    async with pool.acquire() as conn:
        base = await conn.fetchrow(
            """
            SELECT
                p.*,
                l.lead_id::text, l.status::text AS lead_status, l.budget_band, l.urgency,
                l.financing_posture, l.timeline_to_decision, l.broker_team,
                lc.last_contact_at, lc.last_channel, lc.days_since_contact,
                nba.recommended_action, nba.priority AS next_action_priority,
                nba.rationale, nba.suggested_channel, nba.due_within_days
            FROM crm_people p
            LEFT JOIN LATERAL (
                SELECT * FROM crm_leads l
                WHERE l.person_id = p.person_id
                ORDER BY l.updated_at DESC
                LIMIT 1
            ) l ON TRUE
            LEFT JOIN read_last_contacted lc ON lc.person_id = p.person_id
            LEFT JOIN read_next_best_action nba ON nba.person_id = p.person_id
            WHERE p.person_id = $1::uuid
            """,
            person_id,
        )
        if not base:
            raise HTTPException(status_code=404, detail=f"Client '{person_id}' not found.")

        interests = await conn.fetch(
            "SELECT * FROM crm_property_interests WHERE person_id = $1::uuid ORDER BY priority ASC, created_at DESC",
            person_id,
        )
        opportunities = await conn.fetch(
            """
            SELECT o.*, ip.project_name
            FROM crm_opportunities o
            JOIN crm_leads l ON l.lead_id = o.lead_id
            LEFT JOIN inventory_projects ip ON ip.project_id = o.project_id
            WHERE l.person_id = $1::uuid
            ORDER BY o.updated_at DESC
            """,
            person_id,
        )
        facts = await conn.fetch(
            "SELECT * FROM intel_extracted_facts WHERE person_id = $1::uuid ORDER BY extracted_at DESC LIMIT 50",
            person_id,
        )
        qd = await conn.fetch(
            "SELECT * FROM intel_qd_scores WHERE person_id = $1::uuid ORDER BY current_value DESC",
            person_id,
        )
        timeline = await _client_timeline(conn, person_id, 60)

    return {
        "status": "ok",
        "data": {
            "profile": dict(base),
            "property_interests": [dict(r) for r in interests],
            "opportunities": [dict(r) for r in opportunities],
            "extracted_facts": [dict(r) for r in facts],
            "qd_scores": [dict(r) for r in qd],
            "timeline": timeline,
        },
    }


@router.patch("/crm/client-data/{person_id}", tags=["CRM Client Data"])
async def patch_client_data(
    request: Request, person_id: str, body: ClientDataPatchRequest
) -> dict[str, Any]:
    """Apply a partial update to a person and to that person's most recent lead.

    Only fields present in the request body are written. Person fields go to
    ``crm_people``; ``budget_band``, ``urgency`` and ``lead_status`` go to the
    lead with the latest ``updated_at`` — the same lead the client-data read
    endpoints display. (``lead_status`` used to be written to every lead of
    the person, overwriting the status of older leads.) Lead fields are
    ignored when the person has no lead. All writes share one transaction.

    Returns the person id and the sorted list of field names that were sent.
    """
    pool = await _get_pool(request)
    payload = body.model_dump(exclude_unset=True)

    # Column names below come from these fixed tuples, never from the request,
    # so interpolating them into the SQL text is safe.
    person_fields = {
        k: payload[k]
        for k in (
            "full_name", "primary_email", "primary_phone",
            "buyer_type", "communication_preference", "best_contact_time",
        )
        if k in payload
    }
    lead_fields = {k: payload[k] for k in ("budget_band", "urgency") if k in payload}
    has_lead_status = "lead_status" in payload

    async with pool.acquire() as conn:
        async with conn.transaction():
            if person_fields:
                sets = ", ".join(
                    f"{key} = ${idx}" for idx, key in enumerate(person_fields, start=1)
                )
                await conn.execute(
                    f"UPDATE crm_people SET {sets}, updated_at = NOW() WHERE person_id = ${len(person_fields)+1}::uuid",
                    *person_fields.values(),
                    person_id,
                )

            if lead_fields or has_lead_status:
                lead_id = await conn.fetchval(
                    "SELECT lead_id FROM crm_leads WHERE person_id = $1::uuid ORDER BY updated_at DESC LIMIT 1",
                    person_id,
                )
                if lead_id:
                    if lead_fields:
                        sets = ", ".join(
                            f"{key} = ${idx}" for idx, key in enumerate(lead_fields, start=1)
                        )
                        await conn.execute(
                            f"UPDATE crm_leads SET {sets}, updated_at = NOW() WHERE lead_id = ${len(lead_fields)+1}::uuid",
                            *lead_fields.values(),
                            str(lead_id),
                        )
                    if has_lead_status:
                        await conn.execute(
                            "UPDATE crm_leads SET status = $1::crm_lead_status, updated_at = NOW() WHERE lead_id = $2::uuid",
                            payload["lead_status"],
                            str(lead_id),
                        )

    return {"status": "ok", "data": {"person_id": person_id, "updated": sorted(payload.keys())}}


@router.get("/crm/client-data/{person_id}/timeline", tags=["CRM Client Data"])
async def get_client_data_timeline(
    request: Request,
    person_id: str,
    limit: int = Query(default=80, ge=1, le=200),
) -> dict[str, Any]:
    """Return up to ``limit`` timeline events for a client, newest first."""
    pool = await _get_pool(request)
    async with pool.acquire() as conn:
        rows = await _client_timeline(conn, person_id, limit)
    return {"status": "ok", "data": rows}


@router.post("/crm/client-data/{person_id}/tasks", status_code=201, tags=["CRM Client Data"])
async def create_client_data_task(
    request: Request, person_id: str, body: CreateReminderRequest
) -> dict[str, Any]:
    """Create a reminder for the client named in the URL.

    The path ``person_id`` overrides the one in the body; the rest is
    delegated to ``create_task``.
    """
    patched = body.model_copy(update={"person_id": person_id})
    return await create_task(request, patched)


async def _client_timeline(conn: Any, person_id: str, limit: int) -> list[dict[str, Any]]:
    """Build a merged activity timeline for one person.

    Unions interactions, messages, reminders, visits and QD timeseries points
    into rows with the columns ``id``, ``type``, ``title``, ``summary``,
    ``date`` and ``actor`` (names taken from the first SELECT), ordered by
    date descending with undated rows last, capped at ``limit``.
    """
    rows = await conn.fetch(
        """
        SELECT * FROM (
            SELECT i.interaction_id::text AS id, i.channel::text AS type,
                   i.interaction_type AS title, i.summary,
                   i.happened_at AS date, i.broker_name AS actor
            FROM intel_interactions i
            WHERE i.person_id = $1::uuid
            UNION ALL
            SELECT m.message_id::text, 'message', m.sender_role, m.message_text,
                   m.delivered_at, m.sender_name
            FROM intel_messages m
            JOIN intel_interactions i ON i.interaction_id = m.interaction_id
            WHERE i.person_id = $1::uuid
            UNION ALL
            SELECT r.reminder_id::text, 'reminder', r.title, r.notes, r.due_at, r.priority
            FROM intel_reminders r
            WHERE r.person_id = $1::uuid
            UNION ALL
            SELECT v.visit_id::text, 'visit', COALESCE(v.outcome_type, 'site_visit'),
                   COALESCE(v.visit_notes, v.broker_notes), v.visited_at, v.broker_name
            FROM intel_visits v
            WHERE v.person_id = $1::uuid
            UNION ALL
            SELECT q.timeseries_id::text, 'qd', q.score_type, q.signal_source,
                   q.timestamp, q.value::text
            FROM intel_qd_timeseries q
            WHERE q.person_id = $1::uuid
        ) events
        ORDER BY date DESC NULLS LAST
        LIMIT $2
        """,
        person_id,
        limit,
    )
    return [dict(r) for r in rows]