Files
Project_Velocity/backend/services/imports/ingest_service.py

287 lines
8.8 KiB
Python

"""
backend/services/imports/ingest_service.py
CRM Import Ingestion Service
Implements the RawImportBatch → ImportMappingManifest → NormalizedEntityProposal pipeline
as specified in Doc 08 (Adapter Spec) and Doc 07 (Contracts and Schema Blueprint).
Flow:
1. receive CSV upload, store raw batch record
2. parse headers and infer column mapping
3. validate row structure, detect unresolved columns
4. create NormalizedEntityProposal records for review
5. queue for human approval before canonical commit
"""
from __future__ import annotations
import csv
import io
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any
logger = logging.getLogger("velocity.imports.ingest")

# ── Column mapping heuristics ─────────────────────────────────────────────────
# Recognised source-header spellings, grouped by the canonical crm_people /
# crm_leads field they feed. Spellings are stored lower-case with single
# spaces, which is the form _normalize_header() produces before the lookup.
_HEADER_ALIASES: dict[str, tuple[str, ...]] = {
    # Identity
    # NOTE(review): "first name" feeds full_name and no last-name spelling is
    # listed, so a split first/last export keeps only the first name — confirm.
    "full_name": (
        "name",
        "full name",
        "client name",
        "contact name",
        "first name",
        "customer name",
    ),
    # Email
    "primary_email": ("email", "email address", "e-mail"),
    # Phone
    "primary_phone": (
        "phone",
        "mobile",
        "contact number",
        "mobile number",
        "phone number",
    ),
    # Budget
    "budget_band": ("budget", "budget range", "investment budget"),
    # Project interest
    "project_name": (
        "project",
        "project name",
        "interested in",
        "property interest",
    ),
    # Source
    "source_system": ("source", "lead source", "channel"),
    # Status / Stage
    "status": ("status", "lead status", "stage", "funnel stage"),
    # Notes
    "notes": ("notes", "remarks", "comment", "comments"),
    # Buyer type
    "buyer_type": ("type", "client type", "category"),
}

# Flat lookup table: normalized source column name → canonical field.
CANONICAL_COLUMN_MAP: dict[str, str] = {
    alias: field
    for field, aliases in _HEADER_ALIASES.items()
    for alias in aliases
}

# A proposal lacking any of these canonical fields is flagged for review.
REQUIRED_CANONICAL_FIELDS = {"full_name"}

# Contact fields; each one missing from a row lowers that proposal's confidence.
HIGH_RISK_FIELDS = {"primary_email", "primary_phone"}
def _normalize_header(h: str) -> str:
    """
    Reduce a raw CSV header to the form used as a CANONICAL_COLUMN_MAP key.

    The header is lower-cased, underscores become spaces, and every run of
    whitespace (including tabs and non-breaking spaces) collapses to a single
    space with none left at either end. A byte-order mark, which spreadsheet
    exports can leave glued to the first header, is removed as well.
    """
    # str.strip() does not treat U+FEFF as whitespace, so remove it explicitly.
    spaced = h.replace("\ufeff", "").replace("_", " ")
    # split()/join trims both ends and squeezes inner whitespace in one pass.
    return " ".join(spaced.lower().split())
def infer_column_mapping(headers: list[str]) -> dict[str, Any]:
    """
    Build an ImportMappingManifest-compatible mapping from CSV headers.

    Each header is normalized and looked up in CANONICAL_COLUMN_MAP; headers
    without a match are collected separately.

    Returns: {
        mapped: {source_col → canonical_field},
        unmapped: [source_col, ...],
        mapped_count: number of entries in mapped,
        unmapped_count: number of entries in unmapped,
        confidence: mapped_count / len(headers), rounded to 3 places
                    (0.0 when there are no headers)
    }
    """
    resolved: dict[str, str] = {}
    leftovers: list[str] = []

    for header in headers:
        target = CANONICAL_COLUMN_MAP.get(_normalize_header(header))
        if not target:
            leftovers.append(header)
            continue
        # Keyed by the original header text so rows can be read with it later.
        resolved[header] = target

    resolved_total = len(resolved)
    header_total = len(headers)
    if header_total > 0:
        ratio = resolved_total / header_total
    else:
        ratio = 0.0

    return {
        "mapped": resolved,
        "unmapped": leftovers,
        "mapped_count": resolved_total,
        "unmapped_count": len(leftovers),
        "confidence": round(ratio, 3),
    }
def parse_csv_content(content: str) -> dict[str, Any]:
    """
    Parse CSV content, detect headers, and extract rows.

    A leading byte-order mark is dropped so it cannot end up glued to the
    first header name, and the text is handed to the csv module through a
    ``newline=""`` buffer so CR-only line endings still split into records.
    A record the csv module rejects is skipped and described in
    ``parse_errors`` rather than aborting the whole parse.

    Rows are returned exactly as csv.DictReader yields them: a short row has
    None for its missing columns, and surplus cells are gathered in a list
    under the key None.

    Returns: {headers, rows, row_count, parse_errors}
    """
    if content.startswith("\ufeff"):
        content = content[1:]

    reader = csv.DictReader(io.StringIO(content, newline=""))
    rows: list[dict[str, Any]] = []
    parse_errors: list[str] = []

    # Reading fieldnames consumes the header line, which can itself be malformed.
    try:
        headers = list(reader.fieldnames or [])
    except csv.Error as exc:
        return {
            "headers": [],
            "rows": rows,
            "row_count": 0,
            "parse_errors": [f"Row 1: {exc}"],
        }

    # csv.Error is raised while advancing the reader, so next() is what needs
    # guarding. Row numbers count records, with the header as row 1.
    row_number = 1
    while True:
        row_number += 1
        try:
            row = next(reader)
        except StopIteration:
            break
        except csv.Error as exc:
            parse_errors.append(f"Row {row_number}: {exc}")
            continue
        rows.append(dict(row))

    return {
        "headers": headers,
        "rows": rows,
        "row_count": len(rows),
        "parse_errors": parse_errors,
    }
def build_normalized_proposals(
    rows: list[dict[str, Any]],
    mapping: dict[str, str],
    batch_id: str,
    source_system: str = "csv_upload",
) -> list[dict[str, Any]]:
    """
    Convert raw CSV rows to NormalizedEntityProposal payloads.

    One proposal is produced per row, with status "proposed".

    Args:
        rows: Parsed rows keyed by source column. A value may be None, which
            is what csv.DictReader supplies for a row with too few cells.
        mapping: {source_col → canonical_field}.
        batch_id: Identifier of the import batch the rows belong to.
        source_system: Label copied onto every proposal.

    Returns:
        A list of proposal dicts. Confidence starts at 1.0, drops by 0.4 when
        a required field is missing (which also sets review_required), and by
        0.1 for each missing high-risk field, never going below 0.0.
    """
    proposals: list[dict[str, Any]] = []
    now = datetime.now(timezone.utc).isoformat()

    for i, row in enumerate(rows):
        canonical: dict[str, Any] = {}
        unresolved: list[str] = []
        confidence = 1.0

        for src_col, canonical_field in mapping.items():
            # A short CSV row stores None under the missing columns, so the
            # key exists and a .get() default would not apply; coerce it here.
            raw_value = row.get(src_col)
            val = "" if raw_value is None else str(raw_value).strip()
            if val:
                # When several source columns share a canonical field, the
                # last non-empty one in mapping order wins.
                canonical[canonical_field] = val
            else:
                unresolved.append(src_col)

        # Validate required fields (sorted so the output order is stable).
        review_required = False
        missing_required = sorted(
            f for f in REQUIRED_CANONICAL_FIELDS if not canonical.get(f)
        )
        if missing_required:
            review_required = True
            confidence = max(0.0, confidence - 0.4)

        # Flag high-risk fields (email/phone) if empty.
        missing_high_risk = [f for f in HIGH_RISK_FIELDS if not canonical.get(f)]
        if missing_high_risk:
            confidence = max(0.0, confidence - 0.1 * len(missing_high_risk))

        proposals.append(
            {
                "proposal_id": str(uuid.uuid4()),
                "batch_id": batch_id,
                # +2: presumably 1-based numbering with the header as row 1
                # and one line per record — confirm against the parser.
                "row_number": i + 2,
                "entity_type": "crm_person_with_lead",
                "canonical_payload": canonical,
                "raw_row": row,
                "unresolved_fields": unresolved,
                "missing_required": missing_required,
                "confidence": round(confidence, 3),
                "review_required": review_required,
                "status": "proposed",
                "created_at": now,
                "source_system": source_system,
            }
        )

    return proposals
def create_import_batch_record(
filename: str,
row_count: int,
mapping_manifest: dict[str, Any],
source_system: str = "csv_upload",
uploaded_by_id: str | None = None,
tenant_id: str | None = None,
) -> dict[str, Any]:
"""
Build the workflow_import_batches record payload.
"""
now = datetime.now(timezone.utc).isoformat()
return {
"batch_id": str(uuid.uuid4()),
"source_system": source_system,
"uploaded_filename": filename,
"mime_type": "text/csv",
"row_count": row_count,
"mapped_count": mapping_manifest.get("mapped_count", 0),
"unresolved_count": mapping_manifest.get("unmapped_count", 0),
"uploaded_by": uploaded_by_id,
"tenant_id": tenant_id,
"lifecycle": "parsed",
"mapping_manifest": mapping_manifest,
"created_at": now,
"updated_at": now,
}
async def persist_import_batch(conn: Any, batch: dict[str, Any]) -> str:
    """
    Write one workflow_import_batches row and hand back its batch_id.

    batch_id, tenant_id and source_system must be present in ``batch``; the
    remaining columns fall back to defaults when missing. The statement sets
    created_at / updated_at with NOW(), so any timestamps in ``batch`` are not
    used. The mapping manifest is sent as a JSON string.

    ``conn`` only needs an awaitable ``execute(sql, *args)``; the ``$n``
    placeholders suggest an asyncpg-style connection — confirm with the caller.
    """
    insert_sql = """
INSERT INTO workflow_import_batches (
batch_id, tenant_id, source_system, uploaded_filename, mime_type, row_count,
mapped_count, unresolved_count, uploaded_by, lifecycle, mapping_manifest,
created_at, updated_at
) VALUES (
$1::uuid, $2, $3, $4, $5, $6, $7, $8,
$9::uuid, $10::import_lifecycle, $11::jsonb, NOW(), NOW()
)
"""
    # Positional order follows the $1..$11 placeholders above.
    params = (
        batch["batch_id"],
        batch["tenant_id"],
        batch["source_system"],
        batch.get("uploaded_filename", "unknown.csv"),
        batch.get("mime_type", "text/csv"),
        batch.get("row_count", 0),
        batch.get("mapped_count", 0),
        batch.get("unresolved_count", 0),
        batch.get("uploaded_by"),
        batch.get("lifecycle", "parsed"),
        json.dumps(batch.get("mapping_manifest", {})),
    )
    await conn.execute(insert_sql, *params)
    return batch["batch_id"]
async def persist_proposals_as_workflow_actions(
    conn: Any, proposals: list[dict[str, Any]], tenant_id: str
) -> int:
    """
    Store each proposal as a workflow_actions row awaiting human review.

    Rows are written one statement at a time with action_type
    'import_proposal', target_domain 'crm' and status 'pending'. The whole
    proposal dict is serialized into proposal_payload, and a proposal's
    review_required flag becomes the row's approval_required.

    Returns the number of rows inserted.
    """
    insert_sql = """
INSERT INTO workflow_actions (
action_id, tenant_id, action_type, target_domain, proposal_payload,
reasoning_summary, confidence, status, approval_required,
created_by_agent, created_at, updated_at
) VALUES (
$1::uuid, $2, 'import_proposal', 'crm', $3::jsonb,
$4, $5, 'pending'::wf_status, $6, 'ingest_service', NOW(), NOW()
)
"""
    inserted = 0
    # enumerate(start=1) leaves the running total in `inserted` after each insert.
    for inserted, p in enumerate(proposals, start=1):
        await conn.execute(
            insert_sql,
            p["proposal_id"],
            tenant_id,
            json.dumps(p),
            f"Import row {p['row_number']}: {p['canonical_payload'].get('full_name', 'unknown')}",
            p["confidence"],
            p["review_required"],
        )
    return inserted