361 lines
18 KiB
Python
361 lines
18 KiB
Python
"""
|
|
oracle/semantic_catalog.py
|
|
|
|
Business-semantic layer for Oracle's natural DB planner.
|
|
|
|
This sits between raw schema introspection and SQL generation. It defines:
|
|
- authoritative tables and columns for business concepts
|
|
- deprecated or sparse fields the planner should avoid
|
|
- preferred join paths
|
|
- compact semantic context for the planner prompt
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
|
|
class Confidence:
    """Reliability labels attached to catalogued columns.

    The members are plain string constants. Descriptors store the string
    value itself, and that same string is what appears in the serialised
    planner context.
    """

    RELIABLE = "reliable"
    PARTIAL = "partial"
    # Per the module docstring, sparse and deprecated fields are the ones the
    # planner should avoid.
    SPARSE = "sparse"
    DEPRECATED = "deprecated"
|
|
|
|
|
|
@dataclass(frozen=True)
class FieldDescriptor:
    """Immutable description of a single table column and how far to trust it."""

    table: str  # table that owns the column
    column: str  # column name within ``table``
    confidence: str  # reliability label; the catalog passes ``Confidence`` constants
    description: str  # short human-readable meaning of the column
    notes: str = ""  # optional extra guidance; left out of serialised output when empty
|
|
|
|
|
|
@dataclass(frozen=True)
class JoinPath:
    """Immutable description of a preferred join between two table columns."""

    from_table: str  # left-hand table of the join
    from_col: str  # join column on ``from_table``
    to_table: str  # right-hand table of the join
    to_col: str  # join column on ``to_table``
    join_type: str = "INNER"  # join keyword; the catalog uses "INNER" (default) and "LEFT"
    notes: str = ""  # optional extra guidance; left out of serialised output when empty
|
|
|
|
|
|
@dataclass
class ConceptDescriptor:
    """A business concept together with the schema knowledge recorded for it."""

    concept_id: str  # unique key; used to index the catalog and referenced by the intent map
    label: str  # human-readable name of the concept
    description: str  # one-line summary of what the concept covers
    authoritative_fields: list[FieldDescriptor]  # columns to rely on for this concept
    # Columns flagged as deprecated or sparse for this concept.
    deprecated_fields: list[FieldDescriptor] = field(default_factory=list)
    # Joins to favour when combining this concept's tables with others.
    preferred_join_paths: list[JoinPath] = field(default_factory=list)
    usage_notes: str = ""  # free-text guidance; left out of serialised output when empty
|
|
|
|
|
|
# Version tag of this catalog; build_semantic_context_for_planner emits it as
# the "catalog_version" key of the planner context JSON.
CATALOG_VERSION = "velocity_semantic_v2026_04_25_01"
|
|
|
|
# The semantic catalog itself: one ConceptDescriptor per business concept.
# The concept_id values are the keys of the concept index and are the names
# referenced from INTENT_CONCEPT_MAP.
CONCEPTS: list[ConceptDescriptor] = [
    # --- Identity and funnel -------------------------------------------------
    ConceptDescriptor(
        concept_id="person_identity",
        label="Client Identity",
        description="Canonical identity record for a person in CRM.",
        authoritative_fields=[
            FieldDescriptor("crm_people", "person_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("crm_people", "full_name", Confidence.RELIABLE, "Display name"),
            FieldDescriptor("crm_people", "primary_email", Confidence.RELIABLE, "Email"),
            FieldDescriptor("crm_people", "primary_phone", Confidence.RELIABLE, "Phone"),
            FieldDescriptor("crm_people", "persona_labels", Confidence.PARTIAL, "Buyer persona labels"),
        ],
        usage_notes=(
            "Anchor client-level queries on crm_people.person_id. "
            "Treat crm_people as the identity source of truth."
        ),
    ),
    ConceptDescriptor(
        concept_id="lead_funnel",
        label="Lead Funnel",
        description="Lead ownership, stage, status, and urgency.",
        authoritative_fields=[
            FieldDescriptor("crm_leads", "lead_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("crm_leads", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("crm_leads", "stage", Confidence.RELIABLE, "Current funnel stage"),
            FieldDescriptor("crm_leads", "status", Confidence.RELIABLE, "Lead status"),
            FieldDescriptor("crm_leads", "assigned_user_id", Confidence.RELIABLE, "Owning user"),
            FieldDescriptor("crm_leads", "budget_band", Confidence.PARTIAL, "Budget band"),
            FieldDescriptor("crm_leads", "urgency", Confidence.PARTIAL, "Urgency tag"),
        ],
        preferred_join_paths=[
            JoinPath("crm_people", "person_id", "crm_leads", "person_id"),
        ],
    ),
    # --- Scores --------------------------------------------------------------
    ConceptDescriptor(
        concept_id="qd_score",
        label="QD Score",
        description="Qualification / Desire score source of truth.",
        authoritative_fields=[
            FieldDescriptor("intel_qd_scores", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("intel_qd_scores", "current_value", Confidence.RELIABLE, "Authoritative QD score"),
            FieldDescriptor("intel_qd_scores", "score_type", Confidence.RELIABLE, "Score family"),
            FieldDescriptor("intel_qd_scores", "computed_at", Confidence.RELIABLE, "Score timestamp"),
        ],
        deprecated_fields=[
            FieldDescriptor("crm_people", "engagement_score", Confidence.DEPRECATED, "Not QD"),
            FieldDescriptor("crm_leads", "engagement_score", Confidence.DEPRECATED, "Not QD"),
            FieldDescriptor("intel_interactions", "engagement_score", Confidence.DEPRECATED, "Not QD"),
        ],
        usage_notes=(
            "When a prompt mentions QD, qualification, desire, or intent score, "
            "use intel_qd_scores.current_value. Do not substitute engagement_score."
        ),
    ),
    # --- Contact history -----------------------------------------------------
    ConceptDescriptor(
        concept_id="communication_events",
        label="Communication Events",
        description="Authoritative recent-contact and interaction history source.",
        authoritative_fields=[
            FieldDescriptor("intel_interactions", "interaction_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("intel_interactions", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("intel_interactions", "channel", Confidence.RELIABLE, "Interaction channel"),
            FieldDescriptor("intel_interactions", "interaction_type", Confidence.RELIABLE, "Interaction type"),
            FieldDescriptor("intel_interactions", "happened_at", Confidence.RELIABLE, "Primary recency timestamp"),
            FieldDescriptor("intel_interactions", "summary", Confidence.RELIABLE, "Interaction summary"),
        ],
        deprecated_fields=[
            FieldDescriptor("edge_communication_events", "timestamp", Confidence.SPARSE, "Do not use for recency"),
            # NOTE(review): last_discussed_at is tagged SPARSE here but DEPRECATED
            # under last_contact_read_model and property_interest - confirm
            # which label is intended.
            FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.SPARSE, "Do not use for recency"),
        ],
        preferred_join_paths=[
            JoinPath("crm_people", "person_id", "intel_interactions", "person_id", "LEFT"),
            JoinPath("intel_interactions", "interaction_id", "intel_calls", "interaction_id", "LEFT"),
            JoinPath("intel_interactions", "interaction_id", "intel_messages", "interaction_id", "LEFT"),
            JoinPath("intel_interactions", "interaction_id", "intel_emails", "interaction_id", "LEFT"),
        ],
        usage_notes=(
            "For recent contact, last contact, or contacted us, prefer intel_interactions.happened_at. "
            "Use read_last_contacted if available for precomputed summaries."
        ),
    ),
    ConceptDescriptor(
        concept_id="last_contact_read_model",
        label="Last Contact Read Model",
        description="Per-person last-contact summary materialization.",
        authoritative_fields=[
            FieldDescriptor("read_last_contacted", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("read_last_contacted", "last_contacted_at", Confidence.RELIABLE, "Last contact time"),
            FieldDescriptor("read_last_contacted", "last_channel", Confidence.RELIABLE, "Last contact channel"),
            FieldDescriptor("read_last_contacted", "days_since_last_contact", Confidence.RELIABLE, "Recency in days"),
            FieldDescriptor("read_last_contacted", "staleness_label", Confidence.RELIABLE, "Hot/warm/cold bucket"),
        ],
        deprecated_fields=[
            FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.DEPRECATED, "Stale field"),
        ],
        usage_notes=(
            "If this table exists, prefer it for last-contact prompts over rebuilding recency from raw interactions."
        ),
    ),
    ConceptDescriptor(
        concept_id="next_best_action",
        label="Next Best Action",
        description="Precomputed follow-up action recommendations.",
        authoritative_fields=[
            FieldDescriptor("read_next_best_action", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("read_next_best_action", "action_label", Confidence.RELIABLE, "Human-readable action"),
            FieldDescriptor("read_next_best_action", "urgency", Confidence.RELIABLE, "Urgency"),
            FieldDescriptor("read_next_best_action", "recommended_channel", Confidence.RELIABLE, "Suggested channel"),
            FieldDescriptor("read_next_best_action", "execute_within_hours", Confidence.RELIABLE, "Action SLA"),
        ],
    ),
    # --- Interest, pipeline and visits ---------------------------------------
    ConceptDescriptor(
        concept_id="property_interest",
        label="Property Interest",
        description="Client-level project or unit interest records.",
        authoritative_fields=[
            FieldDescriptor("crm_property_interests", "interest_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("crm_property_interests", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("crm_property_interests", "project_id", Confidence.PARTIAL, "FK to inventory_projects"),
            FieldDescriptor("crm_property_interests", "project_name", Confidence.RELIABLE, "Primary text project scope"),
            FieldDescriptor("crm_property_interests", "unit_id", Confidence.PARTIAL, "FK to inventory_units"),
            FieldDescriptor("crm_property_interests", "interest_level", Confidence.RELIABLE, "Interest strength"),
            FieldDescriptor("crm_property_interests", "configuration_preference", Confidence.PARTIAL, "Configuration"),
            FieldDescriptor("crm_property_interests", "budget_min", Confidence.PARTIAL, "Minimum budget"),
            FieldDescriptor("crm_property_interests", "budget_max", Confidence.PARTIAL, "Maximum budget"),
            FieldDescriptor("crm_property_interests", "financing_plan", Confidence.PARTIAL, "Financing plan"),
            FieldDescriptor("crm_property_interests", "notes", Confidence.PARTIAL, "Free-text notes"),
        ],
        deprecated_fields=[
            FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.DEPRECATED, "Do not use for recency"),
        ],
        preferred_join_paths=[
            JoinPath("crm_people", "person_id", "crm_property_interests", "person_id", "LEFT"),
            JoinPath("crm_property_interests", "project_id", "inventory_projects", "project_id", "LEFT"),
        ],
        usage_notes=(
            "For prompts scoped to a specific property or project, filter on crm_property_interests.project_name "
            "case-insensitively. For top properties, group by project_name and count distinct person_id."
        ),
    ),
    ConceptDescriptor(
        concept_id="opportunities",
        label="Opportunities",
        description="Deal pipeline records.",
        authoritative_fields=[
            FieldDescriptor("crm_opportunities", "opportunity_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("crm_opportunities", "lead_id", Confidence.RELIABLE, "FK to crm_leads"),
            FieldDescriptor("crm_opportunities", "project_id", Confidence.RELIABLE, "FK to inventory_projects"),
            FieldDescriptor("crm_opportunities", "stage", Confidence.RELIABLE, "Opportunity stage"),
            FieldDescriptor("crm_opportunities", "value", Confidence.RELIABLE, "Deal value"),
            FieldDescriptor("crm_opportunities", "probability", Confidence.PARTIAL, "Probability"),
            FieldDescriptor("crm_opportunities", "next_action", Confidence.RELIABLE, "Next action"),
        ],
        preferred_join_paths=[
            JoinPath("crm_people", "person_id", "crm_leads", "person_id"),
            JoinPath("crm_leads", "lead_id", "crm_opportunities", "lead_id", "LEFT"),
            JoinPath("crm_opportunities", "project_id", "inventory_projects", "project_id", "LEFT"),
        ],
    ),
    ConceptDescriptor(
        concept_id="site_visits",
        label="Site Visits",
        description="Physical visit records and outcomes.",
        authoritative_fields=[
            FieldDescriptor("intel_visits", "visit_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("intel_visits", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("intel_visits", "project_id", Confidence.PARTIAL, "FK to inventory_projects"),
            FieldDescriptor("intel_visits", "project_name", Confidence.PARTIAL, "Project name"),
            FieldDescriptor("intel_visits", "visited_at", Confidence.RELIABLE, "Visit timestamp"),
            FieldDescriptor("intel_visits", "visit_notes", Confidence.RELIABLE, "Visit notes"),
        ],
    ),
    # --- Master data and extracted intelligence ------------------------------
    ConceptDescriptor(
        concept_id="inventory",
        label="Inventory",
        description="Project and unit master data.",
        authoritative_fields=[
            FieldDescriptor("inventory_projects", "project_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("inventory_projects", "project_name", Confidence.RELIABLE, "Project name"),
            FieldDescriptor("inventory_projects", "developer_name", Confidence.RELIABLE, "Developer"),
            FieldDescriptor("inventory_projects", "micro_market", Confidence.RELIABLE, "Micro market"),
            FieldDescriptor("inventory_units", "unit_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("inventory_units", "project_id", Confidence.RELIABLE, "FK to inventory_projects"),
            FieldDescriptor("inventory_units", "configuration", Confidence.RELIABLE, "Configuration"),
            FieldDescriptor("inventory_units", "price_current", Confidence.RELIABLE, "Current price"),
            FieldDescriptor("inventory_units", "status", Confidence.RELIABLE, "Unit status"),
        ],
    ),
    ConceptDescriptor(
        concept_id="extracted_facts",
        label="Extracted Facts",
        description="AI-extracted CRM memory facts.",
        authoritative_fields=[
            FieldDescriptor("intel_extracted_facts", "fact_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("intel_extracted_facts", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("intel_extracted_facts", "fact_type", Confidence.RELIABLE, "Fact type"),
            FieldDescriptor("intel_extracted_facts", "fact_text", Confidence.RELIABLE, "Fact text"),
            FieldDescriptor("intel_extracted_facts", "confidence", Confidence.RELIABLE, "Extraction confidence"),
            FieldDescriptor("intel_extracted_facts", "effective_date", Confidence.PARTIAL, "Fact date"),
        ],
    ),
    ConceptDescriptor(
        concept_id="call_objections",
        label="Call Objections",
        description="Structured objections extracted from calls.",
        authoritative_fields=[
            FieldDescriptor("intel_call_objections", "objection_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("intel_call_objections", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("intel_call_objections", "objection_type", Confidence.RELIABLE, "Objection type"),
            FieldDescriptor("intel_call_objections", "objection_text", Confidence.RELIABLE, "Objection text"),
            FieldDescriptor("intel_call_objections", "intensity", Confidence.RELIABLE, "Intensity"),
            FieldDescriptor("intel_call_objections", "was_resolved", Confidence.RELIABLE, "Resolution flag"),
            FieldDescriptor("intel_call_objections", "raised_at", Confidence.RELIABLE, "Raised timestamp"),
        ],
    ),
]
|
|
|
|
# concept_id -> descriptor lookup, built once from CONCEPTS when the module loads.
_CONCEPT_INDEX: dict[str, ConceptDescriptor] = {entry.concept_id: entry for entry in CONCEPTS}
|
|
|
|
|
|
def get_concept(concept_id: str) -> ConceptDescriptor | None:
    """Return the concept registered under *concept_id*, or ``None`` if there is none."""
    return _CONCEPT_INDEX.get(concept_id)
|
|
|
|
|
|
def all_concepts() -> list[ConceptDescriptor]:
    """Return every catalogued concept in declaration order.

    A shallow copy is returned so that a caller adding, removing or
    reordering entries cannot alter the module-level ``CONCEPTS`` list or
    leave it out of step with the concept-id index built from it at import
    time. The descriptors themselves are shared, not copied.
    """
    return list(CONCEPTS)
|
|
|
|
|
|
# Maps a detected intent name to the concept ids relevant to it.
# List order matters: build_semantic_context_for_planner collects concepts in
# this order and keeps only the first ``max_concepts`` (5 by default), so
# entries near the end of a long list (e.g. "client_360") may be cut.
INTENT_CONCEPT_MAP: dict[str, list[str]] = {
    "last_contacted": ["last_contact_read_model", "communication_events", "person_identity"],
    "interested_clients": ["property_interest", "person_identity", "lead_funnel"],
    "qd_score": ["qd_score", "person_identity"],
    "pipeline": ["opportunities", "lead_funnel", "person_identity"],
    "site_visits": ["site_visits", "person_identity", "property_interest"],
    "timeline": ["communication_events", "person_identity"],
    "objections": ["call_objections", "communication_events", "person_identity"],
    "broker_performance": ["lead_funnel", "opportunities"],
    "next_action": ["next_best_action", "person_identity", "lead_funnel"],
    "inventory": ["inventory", "property_interest"],
    "extracted_facts": ["extracted_facts", "person_identity"],
    "client_360": [
        "person_identity",
        "lead_funnel",
        "qd_score",
        "communication_events",
        "property_interest",
        "opportunities",
        "next_best_action",
    ],
}
|
|
|
|
|
|
def concepts_for_intent(intent: str) -> list[ConceptDescriptor]:
    """Return the concepts mapped to *intent*, in the order the map lists them.

    An intent that is absent from ``INTENT_CONCEPT_MAP`` falls back to the
    ``person_identity`` and ``lead_funnel`` concepts. Concept ids that are not
    present in the concept index are skipped rather than raising.
    """
    ids = INTENT_CONCEPT_MAP.get(intent, ["person_identity", "lead_funnel"])
    return [_CONCEPT_INDEX[concept_id] for concept_id in ids if concept_id in _CONCEPT_INDEX]
|
|
|
|
|
|
def _field_to_dict(field: FieldDescriptor) -> dict[str, Any]:
    """Convert a field descriptor into a plain dict of its attributes.

    The result always carries ``table``, ``column``, ``confidence`` and
    ``description``; ``notes`` is appended only when it is non-empty.
    """
    # The parameter name hides the imported ``dataclasses.field`` inside this
    # function only; it is kept unchanged so keyword callers keep working.
    payload: dict[str, Any] = {
        "table": field.table,
        "column": field.column,
        "confidence": field.confidence,
        "description": field.description,
    }
    if field.notes:
        payload["notes"] = field.notes
    return payload
|
|
|
|
|
|
def concept_to_dict(concept: ConceptDescriptor) -> dict[str, Any]:
    """Convert a concept descriptor into a nested dict of plain values.

    Field descriptors are converted with ``_field_to_dict``. Each join path
    becomes a dict with ``from`` and ``to`` rendered as ``table.column``
    plus ``join_type``. Empty ``notes`` (on joins) and empty ``usage_notes``
    (on the concept) are left out of the result.
    """
    payload: dict[str, Any] = {
        "concept_id": concept.concept_id,
        "label": concept.label,
        "description": concept.description,
        # ``descriptor`` rather than ``field`` so the loop variable does not
        # hide the imported ``dataclasses.field``.
        "authoritative_fields": [_field_to_dict(descriptor) for descriptor in concept.authoritative_fields],
        "deprecated_fields": [_field_to_dict(descriptor) for descriptor in concept.deprecated_fields],
        "preferred_join_paths": [
            {
                "from": f"{join.from_table}.{join.from_col}",
                "to": f"{join.to_table}.{join.to_col}",
                "join_type": join.join_type,
                **({"notes": join.notes} if join.notes else {}),
            }
            for join in concept.preferred_join_paths
        ],
    }
    if concept.usage_notes:
        payload["usage_notes"] = concept.usage_notes
    return payload
|
|
|
|
|
|
def build_semantic_context_for_planner(detected_intents: list[str], *, max_concepts: int = 5) -> str:
    """Build the compact JSON semantic context for the planner prompt.

    Concepts are gathered intent by intent via ``concepts_for_intent``,
    keeping the first occurrence of each concept id, and the list is then cut
    to at most ``max_concepts`` entries.

    Args:
        detected_intents: Intent names, most important first. An empty list
            produces a context with no concepts.
        max_concepts: Upper bound on the number of concepts emitted. Zero or
            a negative value yields an empty concept list.

    Returns:
        A JSON object string, without whitespace between tokens, holding
        ``catalog_version`` and ``concepts``.
    """
    import json

    # A negative bound would make the slice below drop concepts from the end
    # of the list rather than cap it, so treat it as "no concepts".
    limit = max(max_concepts, 0)

    seen: set[str] = set()
    ordered: list[ConceptDescriptor] = []
    for intent in detected_intents:
        for concept in concepts_for_intent(intent):
            if concept.concept_id not in seen:
                seen.add(concept.concept_id)
                ordered.append(concept)

    return json.dumps(
        {
            "catalog_version": CATALOG_VERSION,
            "concepts": [concept_to_dict(concept) for concept in ordered[:limit]],
        },
        separators=(",", ":"),
    )
|