""" oracle/semantic_catalog.py Business-semantic layer for Oracle's natural DB planner. This sits between raw schema introspection and SQL generation. It defines: - authoritative tables and columns for business concepts - deprecated or sparse fields the planner should avoid - preferred join paths - compact semantic context for the planner prompt """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any class Confidence: RELIABLE = "reliable" PARTIAL = "partial" SPARSE = "sparse" DEPRECATED = "deprecated" @dataclass(frozen=True) class FieldDescriptor: table: str column: str confidence: str description: str notes: str = "" valid_values: tuple[str, ...] = () examples: tuple[str, ...] = () @dataclass(frozen=True) class JoinPath: from_table: str from_col: str to_table: str to_col: str join_type: str = "INNER" notes: str = "" @dataclass class ConceptDescriptor: concept_id: str label: str description: str authoritative_fields: list[FieldDescriptor] deprecated_fields: list[FieldDescriptor] = field(default_factory=list) preferred_join_paths: list[JoinPath] = field(default_factory=list) usage_notes: str = "" CATALOG_VERSION = "velocity_semantic_v2026_04_25_01" @dataclass(frozen=True) class ColumnMetadata: table: str column: str topic: str meaning: str reliability: str valid_values: tuple[str, ...] = () examples: tuple[str, ...] = () usage: str = "" avoid: bool = False VALID_QD_SCORE_TYPES: tuple[str, ...] = ( "overall", "intent", "engagement", "urgency", "financial_qualification", ) COLUMN_METADATA: list[ColumnMetadata] = [ ColumnMetadata( "intel_qd_scores", "score_type", "qd_score", "Score family/category. There is no score_type value named QD.", Confidence.RELIABLE, valid_values=VALID_QD_SCORE_TYPES, examples=("overall", "intent", "engagement"), usage=( "For generic QD score prompts, prefer score_type = 'overall'. " "For specific intent/engagement/urgency/financial prompts, use the matching valid value. " "Never filter score_type = 'QD'." ), ), ColumnMetadata( "intel_qd_scores", "current_value", "qd_score", "Authoritative numeric score value for the selected score_type.", Confidence.RELIABLE, examples=("98.0", "72.4"), usage="Rank, sort, average, or threshold QD-style scores with this column.", ), ColumnMetadata( "intel_qd_scores", "computed_at", "qd_score", "Timestamp when the score was computed.", Confidence.RELIABLE, examples=("2026-04-18T00:00:00"), usage="Use for score freshness, not client contact recency.", ), ColumnMetadata( "intel_interactions", "happened_at", "contact_recency", "Primary timestamp for client contact and interaction recency.", Confidence.RELIABLE, usage="Use for contacted, last contacted, recent contact, activity, and timeline prompts.", ), ColumnMetadata( "read_last_contacted", "last_contact_at", "contact_recency", "Precomputed per-client last contact timestamp.", Confidence.RELIABLE, usage="Prefer for client-level last-contact summaries when this read model is available.", ), ColumnMetadata( "edge_communication_events", "timestamp", "contact_recency", "Legacy/sparse event timestamp that is not reliable for Oracle CRM recency.", Confidence.SPARSE, usage="Do not use for contact prompts.", avoid=True, ), ColumnMetadata( "crm_property_interests", "last_discussed_at", "contact_recency", "Sparse legacy field; property interest does not prove recent contact.", Confidence.SPARSE, usage="Do not use as the primary recency filter.", avoid=True, ), ColumnMetadata( "crm_property_interests", "project_name", "property_interest", "Human-readable project/property name attached to a client's interest.", Confidence.RELIABLE, examples=("Atri Surya Toron", "Godrej Elevate"), usage="Use ILIKE filters for property/project scoped prompts.", ), ColumnMetadata( "crm_property_interests", "interest_level", "property_interest", "Interest strength label or score imported from CRM enrichment.", Confidence.RELIABLE, usage="Use with project_name and person_id to rank interested clients or properties.", ), ] CONCEPTS: list[ConceptDescriptor] = [ ConceptDescriptor( concept_id="person_identity", label="Client Identity", description="Canonical identity record for a person in CRM.", authoritative_fields=[ FieldDescriptor("crm_people", "person_id", Confidence.RELIABLE, "Primary key"), FieldDescriptor("crm_people", "full_name", Confidence.RELIABLE, "Display name"), FieldDescriptor("crm_people", "primary_email", Confidence.RELIABLE, "Email"), FieldDescriptor("crm_people", "primary_phone", Confidence.RELIABLE, "Phone"), FieldDescriptor("crm_people", "persona_labels", Confidence.PARTIAL, "Buyer persona labels"), ], usage_notes=( "Anchor client-level queries on crm_people.person_id. " "Treat crm_people as the identity source of truth." ), ), ConceptDescriptor( concept_id="lead_funnel", label="Lead Funnel", description="Lead ownership, stage, status, and urgency.", authoritative_fields=[ FieldDescriptor("crm_leads", "lead_id", Confidence.RELIABLE, "Primary key"), FieldDescriptor("crm_leads", "person_id", Confidence.RELIABLE, "FK to crm_people"), FieldDescriptor("crm_leads", "stage", Confidence.RELIABLE, "Current funnel stage"), FieldDescriptor("crm_leads", "status", Confidence.RELIABLE, "Lead status"), FieldDescriptor("crm_leads", "assigned_user_id", Confidence.RELIABLE, "Owning user"), FieldDescriptor("crm_leads", "budget_band", Confidence.PARTIAL, "Budget band"), FieldDescriptor("crm_leads", "urgency", Confidence.PARTIAL, "Urgency tag"), ], preferred_join_paths=[ JoinPath("crm_people", "person_id", "crm_leads", "person_id"), ], ), ConceptDescriptor( concept_id="qd_score", label="QD Score", description="Qualification / Desire score source of truth.", authoritative_fields=[ FieldDescriptor("intel_qd_scores", "person_id", Confidence.RELIABLE, "FK to crm_people"), FieldDescriptor("intel_qd_scores", "current_value", Confidence.RELIABLE, "Authoritative QD score"), FieldDescriptor( "intel_qd_scores", "score_type", Confidence.RELIABLE, "Score family", notes="Valid values are overall, intent, engagement, urgency, financial_qualification. There is no value named QD.", valid_values=VALID_QD_SCORE_TYPES, ), FieldDescriptor("intel_qd_scores", "computed_at", Confidence.RELIABLE, "Score timestamp"), ], deprecated_fields=[ FieldDescriptor("crm_people", "engagement_score", Confidence.DEPRECATED, "Not QD"), FieldDescriptor("crm_leads", "engagement_score", Confidence.DEPRECATED, "Not QD"), FieldDescriptor("intel_interactions", "engagement_score", Confidence.DEPRECATED, "Not QD"), ], usage_notes=( "When a prompt mentions QD, qualification, desire, or intent score, " "use intel_qd_scores.current_value. Do not substitute engagement_score. " "Do not filter score_type = 'QD'. For generic QD prompts, use score_type = 'overall'. " "Use intent, engagement, urgency, or financial_qualification only when the prompt asks for that specific family." ), ), ConceptDescriptor( concept_id="communication_events", label="Communication Events", description="Authoritative recent-contact and interaction history source.", authoritative_fields=[ FieldDescriptor("intel_interactions", "interaction_id", Confidence.RELIABLE, "Primary key"), FieldDescriptor("intel_interactions", "person_id", Confidence.RELIABLE, "FK to crm_people"), FieldDescriptor("intel_interactions", "channel", Confidence.RELIABLE, "Interaction channel"), FieldDescriptor("intel_interactions", "interaction_type", Confidence.RELIABLE, "Interaction type"), FieldDescriptor("intel_interactions", "happened_at", Confidence.RELIABLE, "Primary recency timestamp"), FieldDescriptor("intel_interactions", "summary", Confidence.RELIABLE, "Interaction summary"), ], deprecated_fields=[ FieldDescriptor("edge_communication_events", "timestamp", Confidence.SPARSE, "Do not use for recency"), FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.SPARSE, "Do not use for recency"), ], preferred_join_paths=[ JoinPath("crm_people", "person_id", "intel_interactions", "person_id", "LEFT"), JoinPath("intel_interactions", "interaction_id", "intel_calls", "interaction_id", "LEFT"), JoinPath("intel_interactions", "interaction_id", "intel_messages", "interaction_id", "LEFT"), JoinPath("intel_interactions", "interaction_id", "intel_emails", "interaction_id", "LEFT"), ], usage_notes=( "For recent contact, last contact, or contacted us, prefer intel_interactions.happened_at. " "Use read_last_contacted if available for precomputed summaries." ), ), ConceptDescriptor( concept_id="last_contact_read_model", label="Last Contact Read Model", description="Per-person last-contact summary materialization.", authoritative_fields=[ FieldDescriptor("read_last_contacted", "person_id", Confidence.RELIABLE, "FK to crm_people"), FieldDescriptor("read_last_contacted", "last_contact_at", Confidence.RELIABLE, "Last contact time"), FieldDescriptor("read_last_contacted", "last_channel", Confidence.RELIABLE, "Last contact channel"), FieldDescriptor("read_last_contacted", "days_since_contact", Confidence.RELIABLE, "Recency in days"), FieldDescriptor("read_last_contacted", "interactions_last_90d", Confidence.RELIABLE, "Recent interaction volume"), ], deprecated_fields=[ FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.DEPRECATED, "Stale field"), ], usage_notes=( "If this table exists, prefer it for last-contact prompts over rebuilding recency from raw interactions." ), ), ConceptDescriptor( concept_id="next_best_action", label="Next Best Action", description="Precomputed follow-up action recommendations.", authoritative_fields=[ FieldDescriptor("read_next_best_action", "person_id", Confidence.RELIABLE, "FK to crm_people"), FieldDescriptor("read_next_best_action", "action_label", Confidence.RELIABLE, "Human-readable action"), FieldDescriptor("read_next_best_action", "urgency", Confidence.RELIABLE, "Urgency"), FieldDescriptor("read_next_best_action", "recommended_channel", Confidence.RELIABLE, "Suggested channel"), FieldDescriptor("read_next_best_action", "execute_within_hours", Confidence.RELIABLE, "Action SLA"), ], ), ConceptDescriptor( concept_id="property_interest", label="Property Interest", description="Client-level project or unit interest records.", authoritative_fields=[ FieldDescriptor("crm_property_interests", "interest_id", Confidence.RELIABLE, "Primary key"), FieldDescriptor("crm_property_interests", "person_id", Confidence.RELIABLE, "FK to crm_people"), FieldDescriptor("crm_property_interests", "project_id", Confidence.PARTIAL, "FK to inventory_projects"), FieldDescriptor("crm_property_interests", "project_name", Confidence.RELIABLE, "Primary text project scope"), FieldDescriptor("crm_property_interests", "unit_id", Confidence.PARTIAL, "FK to inventory_units"), FieldDescriptor("crm_property_interests", "interest_level", Confidence.RELIABLE, "Interest strength"), FieldDescriptor("crm_property_interests", "configuration_preference", Confidence.PARTIAL, "Configuration"), FieldDescriptor("crm_property_interests", "budget_min", Confidence.PARTIAL, "Minimum budget"), FieldDescriptor("crm_property_interests", "budget_max", Confidence.PARTIAL, "Maximum budget"), FieldDescriptor("crm_property_interests", "financing_plan", Confidence.PARTIAL, "Financing plan"), FieldDescriptor("crm_property_interests", "notes", Confidence.PARTIAL, "Free-text notes"), ], deprecated_fields=[ FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.DEPRECATED, "Do not use for recency"), ], preferred_join_paths=[ JoinPath("crm_people", "person_id", "crm_property_interests", "person_id", "LEFT"), JoinPath("crm_property_interests", "project_id", "inventory_projects", "project_id", "LEFT"), ], usage_notes=( "For prompts scoped to a specific property or project, filter on crm_property_interests.project_name " "case-insensitively. For top properties, group by project_name and count distinct person_id." ), ), ConceptDescriptor( concept_id="opportunities", label="Opportunities", description="Deal pipeline records.", authoritative_fields=[ FieldDescriptor("crm_opportunities", "opportunity_id", Confidence.RELIABLE, "Primary key"), FieldDescriptor("crm_opportunities", "lead_id", Confidence.RELIABLE, "FK to crm_leads"), FieldDescriptor("crm_opportunities", "project_id", Confidence.RELIABLE, "FK to inventory_projects"), FieldDescriptor("crm_opportunities", "stage", Confidence.RELIABLE, "Opportunity stage"), FieldDescriptor("crm_opportunities", "value", Confidence.RELIABLE, "Deal value"), FieldDescriptor("crm_opportunities", "probability", Confidence.PARTIAL, "Probability"), FieldDescriptor("crm_opportunities", "next_action", Confidence.RELIABLE, "Next action"), ], preferred_join_paths=[ JoinPath("crm_people", "person_id", "crm_leads", "person_id"), JoinPath("crm_leads", "lead_id", "crm_opportunities", "lead_id", "LEFT"), JoinPath("crm_opportunities", "project_id", "inventory_projects", "project_id", "LEFT"), ], ), ConceptDescriptor( concept_id="site_visits", label="Site Visits", description="Physical visit records and outcomes.", authoritative_fields=[ FieldDescriptor("intel_visits", "visit_id", Confidence.RELIABLE, "Primary key"), FieldDescriptor("intel_visits", "person_id", Confidence.RELIABLE, "FK to crm_people"), FieldDescriptor("intel_visits", "project_id", Confidence.PARTIAL, "FK to inventory_projects"), FieldDescriptor("intel_visits", "project_name", Confidence.PARTIAL, "Project name"), FieldDescriptor("intel_visits", "visited_at", Confidence.RELIABLE, "Visit timestamp"), FieldDescriptor("intel_visits", "visit_notes", Confidence.RELIABLE, "Visit notes"), ], ), ConceptDescriptor( concept_id="inventory", label="Inventory", description="Project and unit master data.", authoritative_fields=[ FieldDescriptor("inventory_projects", "project_id", Confidence.RELIABLE, "Primary key"), FieldDescriptor("inventory_projects", "project_name", Confidence.RELIABLE, "Project name"), FieldDescriptor("inventory_projects", "developer_name", Confidence.RELIABLE, "Developer"), FieldDescriptor("inventory_projects", "micro_market", Confidence.RELIABLE, "Micro market"), FieldDescriptor("inventory_units", "unit_id", Confidence.RELIABLE, "Primary key"), FieldDescriptor("inventory_units", "project_id", Confidence.RELIABLE, "FK to inventory_projects"), FieldDescriptor("inventory_units", "configuration", Confidence.RELIABLE, "Configuration"), FieldDescriptor("inventory_units", "price_current", Confidence.RELIABLE, "Current price"), FieldDescriptor("inventory_units", "status", Confidence.RELIABLE, "Unit status"), ], ), ConceptDescriptor( concept_id="extracted_facts", label="Extracted Facts", description="AI-extracted CRM memory facts.", authoritative_fields=[ FieldDescriptor("intel_extracted_facts", "fact_id", Confidence.RELIABLE, "Primary key"), FieldDescriptor("intel_extracted_facts", "person_id", Confidence.RELIABLE, "FK to crm_people"), FieldDescriptor("intel_extracted_facts", "fact_type", Confidence.RELIABLE, "Fact type"), FieldDescriptor("intel_extracted_facts", "fact_text", Confidence.RELIABLE, "Fact text"), FieldDescriptor("intel_extracted_facts", "confidence", Confidence.RELIABLE, "Extraction confidence"), FieldDescriptor("intel_extracted_facts", "effective_date", Confidence.PARTIAL, "Fact date"), ], ), ConceptDescriptor( concept_id="call_objections", label="Call Objections", description="Structured objections extracted from calls.", authoritative_fields=[ FieldDescriptor("intel_call_objections", "objection_id", Confidence.RELIABLE, "Primary key"), FieldDescriptor("intel_call_objections", "person_id", Confidence.RELIABLE, "FK to crm_people"), FieldDescriptor("intel_call_objections", "objection_type", Confidence.RELIABLE, "Objection type"), FieldDescriptor("intel_call_objections", "objection_text", Confidence.RELIABLE, "Objection text"), FieldDescriptor("intel_call_objections", "intensity", Confidence.RELIABLE, "Intensity"), FieldDescriptor("intel_call_objections", "was_resolved", Confidence.RELIABLE, "Resolution flag"), FieldDescriptor("intel_call_objections", "raised_at", Confidence.RELIABLE, "Raised timestamp"), ], ), ] _CONCEPT_INDEX: dict[str, ConceptDescriptor] = {concept.concept_id: concept for concept in CONCEPTS} def get_concept(concept_id: str) -> ConceptDescriptor | None: return _CONCEPT_INDEX.get(concept_id) def all_concepts() -> list[ConceptDescriptor]: return CONCEPTS INTENT_CONCEPT_MAP: dict[str, list[str]] = { "last_contacted": ["last_contact_read_model", "communication_events", "person_identity"], "interested_clients": ["property_interest", "person_identity", "lead_funnel"], "qd_score": ["qd_score", "person_identity"], "pipeline": ["opportunities", "lead_funnel", "person_identity"], "site_visits": ["site_visits", "person_identity", "property_interest"], "timeline": ["communication_events", "person_identity"], "objections": ["call_objections", "communication_events", "person_identity"], "broker_performance": ["lead_funnel", "opportunities"], "next_action": ["next_best_action", "person_identity", "lead_funnel"], "inventory": ["inventory", "property_interest"], "extracted_facts": ["extracted_facts", "person_identity"], "client_360": [ "person_identity", "lead_funnel", "qd_score", "communication_events", "property_interest", "opportunities", "next_best_action", ], } def concepts_for_intent(intent: str) -> list[ConceptDescriptor]: ids = INTENT_CONCEPT_MAP.get(intent, ["person_identity", "lead_funnel"]) return [_CONCEPT_INDEX[concept_id] for concept_id in ids if concept_id in _CONCEPT_INDEX] def _field_to_dict(field: FieldDescriptor) -> dict[str, Any]: return { "table": field.table, "column": field.column, "confidence": field.confidence, "description": field.description, **({"notes": field.notes} if field.notes else {}), **({"valid_values": list(field.valid_values)} if field.valid_values else {}), **({"examples": list(field.examples)} if field.examples else {}), } def concept_to_dict(concept: ConceptDescriptor) -> dict[str, Any]: return { "concept_id": concept.concept_id, "label": concept.label, "description": concept.description, "authoritative_fields": [_field_to_dict(field) for field in concept.authoritative_fields], "deprecated_fields": [_field_to_dict(field) for field in concept.deprecated_fields], "preferred_join_paths": [ { "from": f"{join.from_table}.{join.from_col}", "to": f"{join.to_table}.{join.to_col}", "join_type": join.join_type, **({"notes": join.notes} if join.notes else {}), } for join in concept.preferred_join_paths ], **({"usage_notes": concept.usage_notes} if concept.usage_notes else {}), } def build_semantic_context_for_planner(detected_intents: list[str], *, max_concepts: int = 5) -> str: import json seen: set[str] = set() ordered: list[ConceptDescriptor] = [] for intent in detected_intents: for concept in concepts_for_intent(intent): if concept.concept_id not in seen: seen.add(concept.concept_id) ordered.append(concept) relevant_topics = set(detected_intents) if "last_contacted" in relevant_topics or "timeline" in relevant_topics: relevant_topics.add("contact_recency") if "interested_clients" in relevant_topics or "inventory" in relevant_topics: relevant_topics.add("property_interest") if "qd_score" in relevant_topics: relevant_topics.add("qd_score") column_metadata = [ { "table": item.table, "column": item.column, "topic": item.topic, "meaning": item.meaning, "reliability": item.reliability, **({"valid_values": list(item.valid_values)} if item.valid_values else {}), **({"examples": list(item.examples)} if item.examples else {}), **({"usage": item.usage} if item.usage else {}), **({"avoid": item.avoid} if item.avoid else {}), } for item in COLUMN_METADATA if item.topic in relevant_topics or item.avoid ] return json.dumps( { "catalog_version": CATALOG_VERSION, "concepts": [concept_to_dict(concept) for concept in ordered[:max_concepts]], "column_metadata": column_metadata, "global_rules": [ "Do not invent enum values. Use only valid_values from column_metadata when filtering enum-like columns.", "Queries that return zero rows because of impossible enum filters are invalid plans.", "For contact recency, use read_last_contacted.last_contact_at or intel_interactions.happened_at.", "Do not use fields marked avoid=true for the main business filter.", ], }, separators=(",", ":"), )