Files
Project_Velocity/backend/oracle/semantic_catalog.py

361 lines
18 KiB
Python

"""
oracle/semantic_catalog.py
Business-semantic layer for Oracle's natural DB planner.
This sits between raw schema introspection and SQL generation. It defines:
- authoritative tables and columns for business concepts
- deprecated or sparse fields the planner should avoid
- preferred join paths
- compact semantic context for the planner prompt
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
class Confidence:
RELIABLE = "reliable"
PARTIAL = "partial"
SPARSE = "sparse"
DEPRECATED = "deprecated"
@dataclass(frozen=True)
class FieldDescriptor:
table: str
column: str
confidence: str
description: str
notes: str = ""
@dataclass(frozen=True)
class JoinPath:
from_table: str
from_col: str
to_table: str
to_col: str
join_type: str = "INNER"
notes: str = ""
@dataclass
class ConceptDescriptor:
concept_id: str
label: str
description: str
authoritative_fields: list[FieldDescriptor]
deprecated_fields: list[FieldDescriptor] = field(default_factory=list)
preferred_join_paths: list[JoinPath] = field(default_factory=list)
usage_notes: str = ""
CATALOG_VERSION = "velocity_semantic_v2026_04_25_01"
CONCEPTS: list[ConceptDescriptor] = [
ConceptDescriptor(
concept_id="person_identity",
label="Client Identity",
description="Canonical identity record for a person in CRM.",
authoritative_fields=[
FieldDescriptor("crm_people", "person_id", Confidence.RELIABLE, "Primary key"),
FieldDescriptor("crm_people", "full_name", Confidence.RELIABLE, "Display name"),
FieldDescriptor("crm_people", "primary_email", Confidence.RELIABLE, "Email"),
FieldDescriptor("crm_people", "primary_phone", Confidence.RELIABLE, "Phone"),
FieldDescriptor("crm_people", "persona_labels", Confidence.PARTIAL, "Buyer persona labels"),
],
usage_notes=(
"Anchor client-level queries on crm_people.person_id. "
"Treat crm_people as the identity source of truth."
),
),
ConceptDescriptor(
concept_id="lead_funnel",
label="Lead Funnel",
description="Lead ownership, stage, status, and urgency.",
authoritative_fields=[
FieldDescriptor("crm_leads", "lead_id", Confidence.RELIABLE, "Primary key"),
FieldDescriptor("crm_leads", "person_id", Confidence.RELIABLE, "FK to crm_people"),
FieldDescriptor("crm_leads", "stage", Confidence.RELIABLE, "Current funnel stage"),
FieldDescriptor("crm_leads", "status", Confidence.RELIABLE, "Lead status"),
FieldDescriptor("crm_leads", "assigned_user_id", Confidence.RELIABLE, "Owning user"),
FieldDescriptor("crm_leads", "budget_band", Confidence.PARTIAL, "Budget band"),
FieldDescriptor("crm_leads", "urgency", Confidence.PARTIAL, "Urgency tag"),
],
preferred_join_paths=[
JoinPath("crm_people", "person_id", "crm_leads", "person_id"),
],
),
ConceptDescriptor(
concept_id="qd_score",
label="QD Score",
description="Qualification / Desire score source of truth.",
authoritative_fields=[
FieldDescriptor("intel_qd_scores", "person_id", Confidence.RELIABLE, "FK to crm_people"),
FieldDescriptor("intel_qd_scores", "current_value", Confidence.RELIABLE, "Authoritative QD score"),
FieldDescriptor("intel_qd_scores", "score_type", Confidence.RELIABLE, "Score family"),
FieldDescriptor("intel_qd_scores", "computed_at", Confidence.RELIABLE, "Score timestamp"),
],
deprecated_fields=[
FieldDescriptor("crm_people", "engagement_score", Confidence.DEPRECATED, "Not QD"),
FieldDescriptor("crm_leads", "engagement_score", Confidence.DEPRECATED, "Not QD"),
FieldDescriptor("intel_interactions", "engagement_score", Confidence.DEPRECATED, "Not QD"),
],
usage_notes=(
"When a prompt mentions QD, qualification, desire, or intent score, "
"use intel_qd_scores.current_value. Do not substitute engagement_score."
),
),
ConceptDescriptor(
concept_id="communication_events",
label="Communication Events",
description="Authoritative recent-contact and interaction history source.",
authoritative_fields=[
FieldDescriptor("intel_interactions", "interaction_id", Confidence.RELIABLE, "Primary key"),
FieldDescriptor("intel_interactions", "person_id", Confidence.RELIABLE, "FK to crm_people"),
FieldDescriptor("intel_interactions", "channel", Confidence.RELIABLE, "Interaction channel"),
FieldDescriptor("intel_interactions", "interaction_type", Confidence.RELIABLE, "Interaction type"),
FieldDescriptor("intel_interactions", "happened_at", Confidence.RELIABLE, "Primary recency timestamp"),
FieldDescriptor("intel_interactions", "summary", Confidence.RELIABLE, "Interaction summary"),
],
deprecated_fields=[
FieldDescriptor("edge_communication_events", "timestamp", Confidence.SPARSE, "Do not use for recency"),
FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.SPARSE, "Do not use for recency"),
],
preferred_join_paths=[
JoinPath("crm_people", "person_id", "intel_interactions", "person_id", "LEFT"),
JoinPath("intel_interactions", "interaction_id", "intel_calls", "interaction_id", "LEFT"),
JoinPath("intel_interactions", "interaction_id", "intel_messages", "interaction_id", "LEFT"),
JoinPath("intel_interactions", "interaction_id", "intel_emails", "interaction_id", "LEFT"),
],
usage_notes=(
"For recent contact, last contact, or contacted us, prefer intel_interactions.happened_at. "
"Use read_last_contacted if available for precomputed summaries."
),
),
ConceptDescriptor(
concept_id="last_contact_read_model",
label="Last Contact Read Model",
description="Per-person last-contact summary materialization.",
authoritative_fields=[
FieldDescriptor("read_last_contacted", "person_id", Confidence.RELIABLE, "FK to crm_people"),
FieldDescriptor("read_last_contacted", "last_contacted_at", Confidence.RELIABLE, "Last contact time"),
FieldDescriptor("read_last_contacted", "last_channel", Confidence.RELIABLE, "Last contact channel"),
FieldDescriptor("read_last_contacted", "days_since_last_contact", Confidence.RELIABLE, "Recency in days"),
FieldDescriptor("read_last_contacted", "staleness_label", Confidence.RELIABLE, "Hot/warm/cold bucket"),
],
deprecated_fields=[
FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.DEPRECATED, "Stale field"),
],
usage_notes=(
"If this table exists, prefer it for last-contact prompts over rebuilding recency from raw interactions."
),
),
ConceptDescriptor(
concept_id="next_best_action",
label="Next Best Action",
description="Precomputed follow-up action recommendations.",
authoritative_fields=[
FieldDescriptor("read_next_best_action", "person_id", Confidence.RELIABLE, "FK to crm_people"),
FieldDescriptor("read_next_best_action", "action_label", Confidence.RELIABLE, "Human-readable action"),
FieldDescriptor("read_next_best_action", "urgency", Confidence.RELIABLE, "Urgency"),
FieldDescriptor("read_next_best_action", "recommended_channel", Confidence.RELIABLE, "Suggested channel"),
FieldDescriptor("read_next_best_action", "execute_within_hours", Confidence.RELIABLE, "Action SLA"),
],
),
ConceptDescriptor(
concept_id="property_interest",
label="Property Interest",
description="Client-level project or unit interest records.",
authoritative_fields=[
FieldDescriptor("crm_property_interests", "interest_id", Confidence.RELIABLE, "Primary key"),
FieldDescriptor("crm_property_interests", "person_id", Confidence.RELIABLE, "FK to crm_people"),
FieldDescriptor("crm_property_interests", "project_id", Confidence.PARTIAL, "FK to inventory_projects"),
FieldDescriptor("crm_property_interests", "project_name", Confidence.RELIABLE, "Primary text project scope"),
FieldDescriptor("crm_property_interests", "unit_id", Confidence.PARTIAL, "FK to inventory_units"),
FieldDescriptor("crm_property_interests", "interest_level", Confidence.RELIABLE, "Interest strength"),
FieldDescriptor("crm_property_interests", "configuration_preference", Confidence.PARTIAL, "Configuration"),
FieldDescriptor("crm_property_interests", "budget_min", Confidence.PARTIAL, "Minimum budget"),
FieldDescriptor("crm_property_interests", "budget_max", Confidence.PARTIAL, "Maximum budget"),
FieldDescriptor("crm_property_interests", "financing_plan", Confidence.PARTIAL, "Financing plan"),
FieldDescriptor("crm_property_interests", "notes", Confidence.PARTIAL, "Free-text notes"),
],
deprecated_fields=[
FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.DEPRECATED, "Do not use for recency"),
],
preferred_join_paths=[
JoinPath("crm_people", "person_id", "crm_property_interests", "person_id", "LEFT"),
JoinPath("crm_property_interests", "project_id", "inventory_projects", "project_id", "LEFT"),
],
usage_notes=(
"For prompts scoped to a specific property or project, filter on crm_property_interests.project_name "
"case-insensitively. For top properties, group by project_name and count distinct person_id."
),
),
ConceptDescriptor(
concept_id="opportunities",
label="Opportunities",
description="Deal pipeline records.",
authoritative_fields=[
FieldDescriptor("crm_opportunities", "opportunity_id", Confidence.RELIABLE, "Primary key"),
FieldDescriptor("crm_opportunities", "lead_id", Confidence.RELIABLE, "FK to crm_leads"),
FieldDescriptor("crm_opportunities", "project_id", Confidence.RELIABLE, "FK to inventory_projects"),
FieldDescriptor("crm_opportunities", "stage", Confidence.RELIABLE, "Opportunity stage"),
FieldDescriptor("crm_opportunities", "value", Confidence.RELIABLE, "Deal value"),
FieldDescriptor("crm_opportunities", "probability", Confidence.PARTIAL, "Probability"),
FieldDescriptor("crm_opportunities", "next_action", Confidence.RELIABLE, "Next action"),
],
preferred_join_paths=[
JoinPath("crm_people", "person_id", "crm_leads", "person_id"),
JoinPath("crm_leads", "lead_id", "crm_opportunities", "lead_id", "LEFT"),
JoinPath("crm_opportunities", "project_id", "inventory_projects", "project_id", "LEFT"),
],
),
ConceptDescriptor(
concept_id="site_visits",
label="Site Visits",
description="Physical visit records and outcomes.",
authoritative_fields=[
FieldDescriptor("intel_visits", "visit_id", Confidence.RELIABLE, "Primary key"),
FieldDescriptor("intel_visits", "person_id", Confidence.RELIABLE, "FK to crm_people"),
FieldDescriptor("intel_visits", "project_id", Confidence.PARTIAL, "FK to inventory_projects"),
FieldDescriptor("intel_visits", "project_name", Confidence.PARTIAL, "Project name"),
FieldDescriptor("intel_visits", "visited_at", Confidence.RELIABLE, "Visit timestamp"),
FieldDescriptor("intel_visits", "visit_notes", Confidence.RELIABLE, "Visit notes"),
],
),
ConceptDescriptor(
concept_id="inventory",
label="Inventory",
description="Project and unit master data.",
authoritative_fields=[
FieldDescriptor("inventory_projects", "project_id", Confidence.RELIABLE, "Primary key"),
FieldDescriptor("inventory_projects", "project_name", Confidence.RELIABLE, "Project name"),
FieldDescriptor("inventory_projects", "developer_name", Confidence.RELIABLE, "Developer"),
FieldDescriptor("inventory_projects", "micro_market", Confidence.RELIABLE, "Micro market"),
FieldDescriptor("inventory_units", "unit_id", Confidence.RELIABLE, "Primary key"),
FieldDescriptor("inventory_units", "project_id", Confidence.RELIABLE, "FK to inventory_projects"),
FieldDescriptor("inventory_units", "configuration", Confidence.RELIABLE, "Configuration"),
FieldDescriptor("inventory_units", "price_current", Confidence.RELIABLE, "Current price"),
FieldDescriptor("inventory_units", "status", Confidence.RELIABLE, "Unit status"),
],
),
ConceptDescriptor(
concept_id="extracted_facts",
label="Extracted Facts",
description="AI-extracted CRM memory facts.",
authoritative_fields=[
FieldDescriptor("intel_extracted_facts", "fact_id", Confidence.RELIABLE, "Primary key"),
FieldDescriptor("intel_extracted_facts", "person_id", Confidence.RELIABLE, "FK to crm_people"),
FieldDescriptor("intel_extracted_facts", "fact_type", Confidence.RELIABLE, "Fact type"),
FieldDescriptor("intel_extracted_facts", "fact_text", Confidence.RELIABLE, "Fact text"),
FieldDescriptor("intel_extracted_facts", "confidence", Confidence.RELIABLE, "Extraction confidence"),
FieldDescriptor("intel_extracted_facts", "effective_date", Confidence.PARTIAL, "Fact date"),
],
),
ConceptDescriptor(
concept_id="call_objections",
label="Call Objections",
description="Structured objections extracted from calls.",
authoritative_fields=[
FieldDescriptor("intel_call_objections", "objection_id", Confidence.RELIABLE, "Primary key"),
FieldDescriptor("intel_call_objections", "person_id", Confidence.RELIABLE, "FK to crm_people"),
FieldDescriptor("intel_call_objections", "objection_type", Confidence.RELIABLE, "Objection type"),
FieldDescriptor("intel_call_objections", "objection_text", Confidence.RELIABLE, "Objection text"),
FieldDescriptor("intel_call_objections", "intensity", Confidence.RELIABLE, "Intensity"),
FieldDescriptor("intel_call_objections", "was_resolved", Confidence.RELIABLE, "Resolution flag"),
FieldDescriptor("intel_call_objections", "raised_at", Confidence.RELIABLE, "Raised timestamp"),
],
),
]
_CONCEPT_INDEX: dict[str, ConceptDescriptor] = {concept.concept_id: concept for concept in CONCEPTS}
def get_concept(concept_id: str) -> ConceptDescriptor | None:
return _CONCEPT_INDEX.get(concept_id)
def all_concepts() -> list[ConceptDescriptor]:
return CONCEPTS
INTENT_CONCEPT_MAP: dict[str, list[str]] = {
"last_contacted": ["last_contact_read_model", "communication_events", "person_identity"],
"interested_clients": ["property_interest", "person_identity", "lead_funnel"],
"qd_score": ["qd_score", "person_identity"],
"pipeline": ["opportunities", "lead_funnel", "person_identity"],
"site_visits": ["site_visits", "person_identity", "property_interest"],
"timeline": ["communication_events", "person_identity"],
"objections": ["call_objections", "communication_events", "person_identity"],
"broker_performance": ["lead_funnel", "opportunities"],
"next_action": ["next_best_action", "person_identity", "lead_funnel"],
"inventory": ["inventory", "property_interest"],
"extracted_facts": ["extracted_facts", "person_identity"],
"client_360": [
"person_identity",
"lead_funnel",
"qd_score",
"communication_events",
"property_interest",
"opportunities",
"next_best_action",
],
}
def concepts_for_intent(intent: str) -> list[ConceptDescriptor]:
ids = INTENT_CONCEPT_MAP.get(intent, ["person_identity", "lead_funnel"])
return [_CONCEPT_INDEX[concept_id] for concept_id in ids if concept_id in _CONCEPT_INDEX]
def _field_to_dict(field: FieldDescriptor) -> dict[str, Any]:
return {
"table": field.table,
"column": field.column,
"confidence": field.confidence,
"description": field.description,
**({"notes": field.notes} if field.notes else {}),
}
def concept_to_dict(concept: ConceptDescriptor) -> dict[str, Any]:
return {
"concept_id": concept.concept_id,
"label": concept.label,
"description": concept.description,
"authoritative_fields": [_field_to_dict(field) for field in concept.authoritative_fields],
"deprecated_fields": [_field_to_dict(field) for field in concept.deprecated_fields],
"preferred_join_paths": [
{
"from": f"{join.from_table}.{join.from_col}",
"to": f"{join.to_table}.{join.to_col}",
"join_type": join.join_type,
**({"notes": join.notes} if join.notes else {}),
}
for join in concept.preferred_join_paths
],
**({"usage_notes": concept.usage_notes} if concept.usage_notes else {}),
}
def build_semantic_context_for_planner(detected_intents: list[str], *, max_concepts: int = 5) -> str:
import json
seen: set[str] = set()
ordered: list[ConceptDescriptor] = []
for intent in detected_intents:
for concept in concepts_for_intent(intent):
if concept.concept_id not in seen:
seen.add(concept.concept_id)
ordered.append(concept)
return json.dumps(
{
"catalog_version": CATALOG_VERSION,
"concepts": [concept_to_dict(concept) for concept in ordered[:max_concepts]],
},
separators=(",", ":"),
)