# File: Project_Velocity/backend/oracle/semantic_catalog.py
# Repository-viewer listing header: 513 lines, 23 KiB, Python.

"""
oracle/semantic_catalog.py
Business-semantic layer for Oracle's natural DB planner.
This sits between raw schema introspection and SQL generation. It defines:
- authoritative tables and columns for business concepts
- deprecated or sparse fields the planner should avoid
- preferred join paths
- compact semantic context for the planner prompt
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
class Confidence:
    """String labels describing how far a catalog field can be trusted.

    These are plain class-level string constants (not an Enum), so the values
    can be stored in descriptors and serialized as ordinary strings.
    """

    # Label used in this module for fields listed as authoritative.
    RELIABLE = "reliable"
    # Presumably populated or trustworthy only for some rows -- TODO confirm.
    PARTIAL = "partial"
    # Sparse fields; the module docstring says the planner should avoid these.
    SPARSE = "sparse"
    # Deprecated fields; the module docstring says the planner should avoid these.
    DEPRECATED = "deprecated"
@dataclass(frozen=True)
class FieldDescriptor:
    """Immutable description of one table column used by a business concept.

    Attributes:
        table: Name of the table that owns the column.
        column: Column name.
        confidence: Reliability label (one of the ``Confidence`` constants).
        description: Short human-readable meaning of the column.
        notes: Optional extra guidance; empty string when there is none.
        valid_values: Optional closed set of allowed values for the column.
        examples: Optional sample values.
    """

    table: str
    column: str
    confidence: str
    description: str
    notes: str = ""
    valid_values: tuple[str, ...] = ()
    examples: tuple[str, ...] = ()

    def __post_init__(self) -> None:
        """Coerce a bare string given for a tuple field into a tuple.

        ``("value")`` without a trailing comma is just a ``str``. Left as is,
        converting the field with ``list(...)`` would yield one entry per
        character instead of one entry per value.
        """
        for name in ("valid_values", "examples"):
            value = getattr(self, name)
            if isinstance(value, str):
                # frozen=True blocks normal attribute assignment, so write
                # through object.__setattr__ during initialisation.
                object.__setattr__(self, name, (value,) if value else ())
@dataclass(frozen=True)
class JoinPath:
    """Immutable description of one join edge between two tables.

    The edge runs from ``from_table.from_col`` to ``to_table.to_col``.
    """

    from_table: str  # table on the left-hand side of the join
    from_col: str  # join column on from_table
    to_table: str  # table being joined in
    to_col: str  # join column on to_table
    join_type: str = "INNER"  # join keyword; this file uses "INNER" and "LEFT"
    notes: str = ""  # optional free-text guidance; empty when there is none
@dataclass
class ConceptDescriptor:
    """A business concept together with the schema elements that back it.

    Unlike the other descriptors in this module, this dataclass is not frozen,
    so instances and their list fields are mutable.
    """

    concept_id: str  # identifier used as the lookup key for the concept
    label: str  # short human-readable name
    description: str  # one-sentence explanation of the concept
    authoritative_fields: list[FieldDescriptor]  # columns to use for this concept
    # Columns listed so they can be flagged instead of used; defaults to empty.
    deprecated_fields: list[FieldDescriptor] = field(default_factory=list)
    # Suggested joins for reaching the concept's tables; defaults to empty.
    preferred_join_paths: list[JoinPath] = field(default_factory=list)
    usage_notes: str = ""  # optional free-text guidance; empty when there is none
# Version tag for this catalog; it is emitted as "catalog_version" in the
# planner context JSON built at the bottom of this module.
CATALOG_VERSION = "velocity_semantic_v2026_04_25_01"
@dataclass(frozen=True)
class ColumnMetadata:
    """Immutable, topic-tagged guidance for a single table column.

    Attributes:
        table: Name of the table that owns the column.
        column: Column name.
        topic: Topic key used to select entries relevant to a prompt.
        meaning: Human-readable meaning of the column.
        reliability: Reliability label (one of the ``Confidence`` constants).
        valid_values: Optional closed set of allowed values for the column.
        examples: Optional sample values.
        usage: Optional guidance on how to use (or not use) the column.
        avoid: True when the column should not drive the main business filter.
    """

    table: str
    column: str
    topic: str
    meaning: str
    reliability: str
    valid_values: tuple[str, ...] = ()
    examples: tuple[str, ...] = ()
    usage: str = ""
    avoid: bool = False

    def __post_init__(self) -> None:
        """Coerce a bare string given for a tuple field into a tuple.

        ``("value")`` without a trailing comma is just a ``str``. Left as is,
        converting the field with ``list(...)`` would yield one entry per
        character instead of one entry per value.
        """
        for name in ("valid_values", "examples"):
            value = getattr(self, name)
            if isinstance(value, str):
                # frozen=True blocks normal attribute assignment, so write
                # through object.__setattr__ during initialisation.
                object.__setattr__(self, name, (value,) if value else ())
# Closed set of values accepted for intel_qd_scores.score_type. Note that "QD"
# itself is not one of them; generic QD prompts map to "overall".
VALID_QD_SCORE_TYPES: tuple[str, ...] = (
    "overall",
    "intent",
    "engagement",
    "urgency",
    "financial_qualification",
)
# Column-level guidance for the planner, tagged by topic. Positional arguments
# are (table, column, topic, meaning, reliability). Entries with avoid=True
# describe columns that must not drive the main business filter.
COLUMN_METADATA: list[ColumnMetadata] = [
    # --- topic: qd_score ---------------------------------------------------
    ColumnMetadata(
        "intel_qd_scores",
        "score_type",
        "qd_score",
        "Score family/category. There is no score_type value named QD.",
        Confidence.RELIABLE,
        valid_values=VALID_QD_SCORE_TYPES,
        examples=("overall", "intent", "engagement"),
        usage=(
            "For generic QD score prompts, prefer score_type = 'overall'. "
            "For specific intent/engagement/urgency/financial prompts, use the matching valid value. "
            "Never filter score_type = 'QD'."
        ),
    ),
    ColumnMetadata(
        "intel_qd_scores",
        "current_value",
        "qd_score",
        "Authoritative numeric score value for the selected score_type.",
        Confidence.RELIABLE,
        examples=("98.0", "72.4"),
        usage="Rank, sort, average, or threshold QD-style scores with this column.",
    ),
    ColumnMetadata(
        "intel_qd_scores",
        "computed_at",
        "qd_score",
        "Timestamp when the score was computed.",
        Confidence.RELIABLE,
        # The trailing comma matters: without it this is a plain string and
        # would be serialized as a list of single characters.
        examples=("2026-04-18T00:00:00",),
        usage="Use for score freshness, not client contact recency.",
    ),
    # --- topic: contact_recency --------------------------------------------
    ColumnMetadata(
        "intel_interactions",
        "happened_at",
        "contact_recency",
        "Primary timestamp for client contact and interaction recency.",
        Confidence.RELIABLE,
        usage="Use for contacted, last contacted, recent contact, activity, and timeline prompts.",
    ),
    ColumnMetadata(
        "read_last_contacted",
        "last_contact_at",
        "contact_recency",
        "Precomputed per-client last contact timestamp.",
        Confidence.RELIABLE,
        usage="Prefer for client-level last-contact summaries when this read model is available.",
    ),
    ColumnMetadata(
        "edge_communication_events",
        "timestamp",
        "contact_recency",
        "Legacy/sparse event timestamp that is not reliable for Oracle CRM recency.",
        Confidence.SPARSE,
        usage="Do not use for contact prompts.",
        avoid=True,
    ),
    ColumnMetadata(
        "crm_property_interests",
        "last_discussed_at",
        "contact_recency",
        "Sparse legacy field; property interest does not prove recent contact.",
        Confidence.SPARSE,
        usage="Do not use as the primary recency filter.",
        avoid=True,
    ),
    # --- topic: property_interest ------------------------------------------
    ColumnMetadata(
        "crm_property_interests",
        "project_name",
        "property_interest",
        "Human-readable project/property name attached to a client's interest.",
        Confidence.RELIABLE,
        examples=("Atri Surya Toron", "Godrej Elevate"),
        usage="Use ILIKE filters for property/project scoped prompts.",
    ),
    ColumnMetadata(
        "crm_property_interests",
        "interest_level",
        "property_interest",
        "Interest strength label or score imported from CRM enrichment.",
        Confidence.RELIABLE,
        usage="Use with project_name and person_id to rank interested clients or properties.",
    ),
]
# Catalog of business concepts. Each entry names the authoritative columns for
# a concept, any columns that should not be used for it, and suggested joins.
# FieldDescriptor positional arguments are (table, column, confidence,
# description); JoinPath positional arguments are (from_table, from_col,
# to_table, to_col[, join_type]).
#
# NOTE(review): crm_property_interests.last_discussed_at is labelled
# Confidence.SPARSE under "communication_events" but Confidence.DEPRECATED
# under "last_contact_read_model" and "property_interest" -- confirm which
# label is intended.
CONCEPTS: list[ConceptDescriptor] = [
    # Client identity: crm_people is the identity table.
    ConceptDescriptor(
        concept_id="person_identity",
        label="Client Identity",
        description="Canonical identity record for a person in CRM.",
        authoritative_fields=[
            FieldDescriptor("crm_people", "person_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("crm_people", "full_name", Confidence.RELIABLE, "Display name"),
            FieldDescriptor("crm_people", "primary_email", Confidence.RELIABLE, "Email"),
            FieldDescriptor("crm_people", "primary_phone", Confidence.RELIABLE, "Phone"),
            FieldDescriptor("crm_people", "persona_labels", Confidence.PARTIAL, "Buyer persona labels"),
        ],
        usage_notes=(
            "Anchor client-level queries on crm_people.person_id. "
            "Treat crm_people as the identity source of truth."
        ),
    ),
    # Lead funnel: ownership, stage and status from crm_leads.
    ConceptDescriptor(
        concept_id="lead_funnel",
        label="Lead Funnel",
        description="Lead ownership, stage, status, and urgency.",
        authoritative_fields=[
            FieldDescriptor("crm_leads", "lead_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("crm_leads", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("crm_leads", "stage", Confidence.RELIABLE, "Current funnel stage"),
            FieldDescriptor("crm_leads", "status", Confidence.RELIABLE, "Lead status"),
            FieldDescriptor("crm_leads", "assigned_user_id", Confidence.RELIABLE, "Owning user"),
            FieldDescriptor("crm_leads", "budget_band", Confidence.PARTIAL, "Budget band"),
            FieldDescriptor("crm_leads", "urgency", Confidence.PARTIAL, "Urgency tag"),
        ],
        preferred_join_paths=[
            JoinPath("crm_people", "person_id", "crm_leads", "person_id"),
        ],
    ),
    # QD score: intel_qd_scores is the source; engagement_score columns are
    # listed as deprecated so they are not substituted for it.
    ConceptDescriptor(
        concept_id="qd_score",
        label="QD Score",
        description="Qualification / Desire score source of truth.",
        authoritative_fields=[
            FieldDescriptor("intel_qd_scores", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("intel_qd_scores", "current_value", Confidence.RELIABLE, "Authoritative QD score"),
            FieldDescriptor(
                "intel_qd_scores",
                "score_type",
                Confidence.RELIABLE,
                "Score family",
                notes="Valid values are overall, intent, engagement, urgency, financial_qualification. There is no value named QD.",
                valid_values=VALID_QD_SCORE_TYPES,
            ),
            FieldDescriptor("intel_qd_scores", "computed_at", Confidence.RELIABLE, "Score timestamp"),
        ],
        deprecated_fields=[
            FieldDescriptor("crm_people", "engagement_score", Confidence.DEPRECATED, "Not QD"),
            FieldDescriptor("crm_leads", "engagement_score", Confidence.DEPRECATED, "Not QD"),
            FieldDescriptor("intel_interactions", "engagement_score", Confidence.DEPRECATED, "Not QD"),
        ],
        usage_notes=(
            "When a prompt mentions QD, qualification, desire, or intent score, "
            "use intel_qd_scores.current_value. Do not substitute engagement_score. "
            "Do not filter score_type = 'QD'. For generic QD prompts, use score_type = 'overall'. "
            "Use intent, engagement, urgency, or financial_qualification only when the prompt asks for that specific family."
        ),
    ),
    # Communication events: intel_interactions, with per-channel detail tables
    # reachable through interaction_id.
    ConceptDescriptor(
        concept_id="communication_events",
        label="Communication Events",
        description="Authoritative recent-contact and interaction history source.",
        authoritative_fields=[
            FieldDescriptor("intel_interactions", "interaction_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("intel_interactions", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("intel_interactions", "channel", Confidence.RELIABLE, "Interaction channel"),
            FieldDescriptor("intel_interactions", "interaction_type", Confidence.RELIABLE, "Interaction type"),
            FieldDescriptor("intel_interactions", "happened_at", Confidence.RELIABLE, "Primary recency timestamp"),
            FieldDescriptor("intel_interactions", "summary", Confidence.RELIABLE, "Interaction summary"),
        ],
        deprecated_fields=[
            FieldDescriptor("edge_communication_events", "timestamp", Confidence.SPARSE, "Do not use for recency"),
            FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.SPARSE, "Do not use for recency"),
        ],
        preferred_join_paths=[
            JoinPath("crm_people", "person_id", "intel_interactions", "person_id", "LEFT"),
            JoinPath("intel_interactions", "interaction_id", "intel_calls", "interaction_id", "LEFT"),
            JoinPath("intel_interactions", "interaction_id", "intel_messages", "interaction_id", "LEFT"),
            JoinPath("intel_interactions", "interaction_id", "intel_emails", "interaction_id", "LEFT"),
        ],
        usage_notes=(
            "For recent contact, last contact, or contacted us, prefer intel_interactions.happened_at. "
            "Use read_last_contacted if available for precomputed summaries."
        ),
    ),
    # Last-contact read model: precomputed per-person summary table.
    ConceptDescriptor(
        concept_id="last_contact_read_model",
        label="Last Contact Read Model",
        description="Per-person last-contact summary materialization.",
        authoritative_fields=[
            FieldDescriptor("read_last_contacted", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("read_last_contacted", "last_contact_at", Confidence.RELIABLE, "Last contact time"),
            FieldDescriptor("read_last_contacted", "last_channel", Confidence.RELIABLE, "Last contact channel"),
            FieldDescriptor("read_last_contacted", "days_since_contact", Confidence.RELIABLE, "Recency in days"),
            FieldDescriptor("read_last_contacted", "interactions_last_90d", Confidence.RELIABLE, "Recent interaction volume"),
        ],
        deprecated_fields=[
            FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.DEPRECATED, "Stale field"),
        ],
        usage_notes=(
            "If this table exists, prefer it for last-contact prompts over rebuilding recency from raw interactions."
        ),
    ),
    # Next best action: precomputed follow-up recommendations.
    ConceptDescriptor(
        concept_id="next_best_action",
        label="Next Best Action",
        description="Precomputed follow-up action recommendations.",
        authoritative_fields=[
            FieldDescriptor("read_next_best_action", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("read_next_best_action", "action_label", Confidence.RELIABLE, "Human-readable action"),
            FieldDescriptor("read_next_best_action", "urgency", Confidence.RELIABLE, "Urgency"),
            FieldDescriptor("read_next_best_action", "recommended_channel", Confidence.RELIABLE, "Suggested channel"),
            FieldDescriptor("read_next_best_action", "execute_within_hours", Confidence.RELIABLE, "Action SLA"),
        ],
    ),
    # Property interest: per-client project/unit interest records.
    ConceptDescriptor(
        concept_id="property_interest",
        label="Property Interest",
        description="Client-level project or unit interest records.",
        authoritative_fields=[
            FieldDescriptor("crm_property_interests", "interest_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("crm_property_interests", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("crm_property_interests", "project_id", Confidence.PARTIAL, "FK to inventory_projects"),
            FieldDescriptor("crm_property_interests", "project_name", Confidence.RELIABLE, "Primary text project scope"),
            FieldDescriptor("crm_property_interests", "unit_id", Confidence.PARTIAL, "FK to inventory_units"),
            FieldDescriptor("crm_property_interests", "interest_level", Confidence.RELIABLE, "Interest strength"),
            FieldDescriptor("crm_property_interests", "configuration_preference", Confidence.PARTIAL, "Configuration"),
            FieldDescriptor("crm_property_interests", "budget_min", Confidence.PARTIAL, "Minimum budget"),
            FieldDescriptor("crm_property_interests", "budget_max", Confidence.PARTIAL, "Maximum budget"),
            FieldDescriptor("crm_property_interests", "financing_plan", Confidence.PARTIAL, "Financing plan"),
            FieldDescriptor("crm_property_interests", "notes", Confidence.PARTIAL, "Free-text notes"),
        ],
        deprecated_fields=[
            FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.DEPRECATED, "Do not use for recency"),
        ],
        preferred_join_paths=[
            JoinPath("crm_people", "person_id", "crm_property_interests", "person_id", "LEFT"),
            JoinPath("crm_property_interests", "project_id", "inventory_projects", "project_id", "LEFT"),
        ],
        usage_notes=(
            "For prompts scoped to a specific property or project, filter on crm_property_interests.project_name "
            "case-insensitively. For top properties, group by project_name and count distinct person_id."
        ),
    ),
    # Opportunities: deal pipeline, reached from people via leads.
    ConceptDescriptor(
        concept_id="opportunities",
        label="Opportunities",
        description="Deal pipeline records.",
        authoritative_fields=[
            FieldDescriptor("crm_opportunities", "opportunity_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("crm_opportunities", "lead_id", Confidence.RELIABLE, "FK to crm_leads"),
            FieldDescriptor("crm_opportunities", "project_id", Confidence.RELIABLE, "FK to inventory_projects"),
            FieldDescriptor("crm_opportunities", "stage", Confidence.RELIABLE, "Opportunity stage"),
            FieldDescriptor("crm_opportunities", "value", Confidence.RELIABLE, "Deal value"),
            FieldDescriptor("crm_opportunities", "probability", Confidence.PARTIAL, "Probability"),
            FieldDescriptor("crm_opportunities", "next_action", Confidence.RELIABLE, "Next action"),
        ],
        preferred_join_paths=[
            JoinPath("crm_people", "person_id", "crm_leads", "person_id"),
            JoinPath("crm_leads", "lead_id", "crm_opportunities", "lead_id", "LEFT"),
            JoinPath("crm_opportunities", "project_id", "inventory_projects", "project_id", "LEFT"),
        ],
    ),
    # Site visits: physical visit records.
    ConceptDescriptor(
        concept_id="site_visits",
        label="Site Visits",
        description="Physical visit records and outcomes.",
        authoritative_fields=[
            FieldDescriptor("intel_visits", "visit_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("intel_visits", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("intel_visits", "project_id", Confidence.PARTIAL, "FK to inventory_projects"),
            FieldDescriptor("intel_visits", "project_name", Confidence.PARTIAL, "Project name"),
            FieldDescriptor("intel_visits", "visited_at", Confidence.RELIABLE, "Visit timestamp"),
            FieldDescriptor("intel_visits", "visit_notes", Confidence.RELIABLE, "Visit notes"),
        ],
    ),
    # Inventory: project and unit master data across two tables.
    ConceptDescriptor(
        concept_id="inventory",
        label="Inventory",
        description="Project and unit master data.",
        authoritative_fields=[
            FieldDescriptor("inventory_projects", "project_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("inventory_projects", "project_name", Confidence.RELIABLE, "Project name"),
            FieldDescriptor("inventory_projects", "developer_name", Confidence.RELIABLE, "Developer"),
            FieldDescriptor("inventory_projects", "micro_market", Confidence.RELIABLE, "Micro market"),
            FieldDescriptor("inventory_units", "unit_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("inventory_units", "project_id", Confidence.RELIABLE, "FK to inventory_projects"),
            FieldDescriptor("inventory_units", "configuration", Confidence.RELIABLE, "Configuration"),
            FieldDescriptor("inventory_units", "price_current", Confidence.RELIABLE, "Current price"),
            FieldDescriptor("inventory_units", "status", Confidence.RELIABLE, "Unit status"),
        ],
    ),
    # Extracted facts: AI-extracted CRM memory facts.
    ConceptDescriptor(
        concept_id="extracted_facts",
        label="Extracted Facts",
        description="AI-extracted CRM memory facts.",
        authoritative_fields=[
            FieldDescriptor("intel_extracted_facts", "fact_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("intel_extracted_facts", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("intel_extracted_facts", "fact_type", Confidence.RELIABLE, "Fact type"),
            FieldDescriptor("intel_extracted_facts", "fact_text", Confidence.RELIABLE, "Fact text"),
            FieldDescriptor("intel_extracted_facts", "confidence", Confidence.RELIABLE, "Extraction confidence"),
            FieldDescriptor("intel_extracted_facts", "effective_date", Confidence.PARTIAL, "Fact date"),
        ],
    ),
    # Call objections: structured objections extracted from calls.
    ConceptDescriptor(
        concept_id="call_objections",
        label="Call Objections",
        description="Structured objections extracted from calls.",
        authoritative_fields=[
            FieldDescriptor("intel_call_objections", "objection_id", Confidence.RELIABLE, "Primary key"),
            FieldDescriptor("intel_call_objections", "person_id", Confidence.RELIABLE, "FK to crm_people"),
            FieldDescriptor("intel_call_objections", "objection_type", Confidence.RELIABLE, "Objection type"),
            FieldDescriptor("intel_call_objections", "objection_text", Confidence.RELIABLE, "Objection text"),
            FieldDescriptor("intel_call_objections", "intensity", Confidence.RELIABLE, "Intensity"),
            FieldDescriptor("intel_call_objections", "was_resolved", Confidence.RELIABLE, "Resolution flag"),
            FieldDescriptor("intel_call_objections", "raised_at", Confidence.RELIABLE, "Raised timestamp"),
        ],
    ),
]
# concept_id -> descriptor lookup, built once at import time from CONCEPTS.
# If two concepts ever shared an id, the later one in CONCEPTS would win.
_CONCEPT_INDEX: dict[str, ConceptDescriptor] = {concept.concept_id: concept for concept in CONCEPTS}
def get_concept(concept_id: str) -> ConceptDescriptor | None:
    """Return the concept registered under ``concept_id``.

    Returns:
        The matching ``ConceptDescriptor``, or ``None`` when the id is unknown.
    """
    try:
        return _CONCEPT_INDEX[concept_id]
    except KeyError:
        return None
def all_concepts() -> list[ConceptDescriptor]:
    """Return every registered concept, in catalog order.

    Returns:
        A new list holding the same ``ConceptDescriptor`` objects as the
        catalog. A shallow copy is returned so that a caller sorting,
        filtering or appending to the result cannot put the module-level
        catalog out of step with the id index built from it.
    """
    return list(CONCEPTS)
# Detected intent -> ordered list of concept ids relevant to that intent.
# Order matters: the planner-context builder keeps first-seen order and then
# truncates to max_concepts, so with the default limit of 5 the tail of a long
# list (e.g. "client_360", which has 7 entries) is dropped.
INTENT_CONCEPT_MAP: dict[str, list[str]] = {
    "last_contacted": ["last_contact_read_model", "communication_events", "person_identity"],
    "interested_clients": ["property_interest", "person_identity", "lead_funnel"],
    "qd_score": ["qd_score", "person_identity"],
    "pipeline": ["opportunities", "lead_funnel", "person_identity"],
    "site_visits": ["site_visits", "person_identity", "property_interest"],
    "timeline": ["communication_events", "person_identity"],
    "objections": ["call_objections", "communication_events", "person_identity"],
    "broker_performance": ["lead_funnel", "opportunities"],
    "next_action": ["next_best_action", "person_identity", "lead_funnel"],
    "inventory": ["inventory", "property_interest"],
    "extracted_facts": ["extracted_facts", "person_identity"],
    "client_360": [
        "person_identity",
        "lead_funnel",
        "qd_score",
        "communication_events",
        "property_interest",
        "opportunities",
        "next_best_action",
    ],
}
def concepts_for_intent(intent: str) -> list[ConceptDescriptor]:
    """Resolve an intent name to its concept descriptors.

    Args:
        intent: Intent key to look up in ``INTENT_CONCEPT_MAP``.

    Returns:
        Descriptors in the order the map lists them. An intent that is not in
        the map falls back to the identity and lead-funnel concepts. Ids that
        have no registered descriptor are skipped.
    """
    fallback_ids = ["person_identity", "lead_funnel"]
    wanted_ids = INTENT_CONCEPT_MAP.get(intent, fallback_ids)

    resolved: list[ConceptDescriptor] = []
    for wanted_id in wanted_ids:
        if wanted_id not in _CONCEPT_INDEX:
            continue
        resolved.append(_CONCEPT_INDEX[wanted_id])
    return resolved
def _field_to_dict(field: FieldDescriptor) -> dict[str, Any]:
return {
"table": field.table,
"column": field.column,
"confidence": field.confidence,
"description": field.description,
**({"notes": field.notes} if field.notes else {}),
**({"valid_values": list(field.valid_values)} if field.valid_values else {}),
**({"examples": list(field.examples)} if field.examples else {}),
}
def concept_to_dict(concept: ConceptDescriptor) -> dict[str, Any]:
    """Serialize a concept descriptor into a plain dict.

    Args:
        concept: Descriptor to serialize.

    Returns:
        A dict with the concept's id, label and description, its authoritative
        and deprecated fields (each serialized with ``_field_to_dict``), and
        its join paths rendered as ``"table.column"`` endpoints. Join ``notes``
        and the concept's ``usage_notes`` are included only when non-empty.
    """
    serialized: dict[str, Any] = {
        "concept_id": concept.concept_id,
        "label": concept.label,
        "description": concept.description,
    }
    serialized["authoritative_fields"] = [
        _field_to_dict(descriptor) for descriptor in concept.authoritative_fields
    ]
    serialized["deprecated_fields"] = [
        _field_to_dict(descriptor) for descriptor in concept.deprecated_fields
    ]

    join_entries: list[dict[str, Any]] = []
    for path in concept.preferred_join_paths:
        entry: dict[str, Any] = {
            "from": f"{path.from_table}.{path.from_col}",
            "to": f"{path.to_table}.{path.to_col}",
            "join_type": path.join_type,
        }
        if path.notes:
            entry["notes"] = path.notes
        join_entries.append(entry)
    serialized["preferred_join_paths"] = join_entries

    if concept.usage_notes:
        serialized["usage_notes"] = concept.usage_notes
    return serialized
def build_semantic_context_for_planner(detected_intents: list[str], *, max_concepts: int = 5) -> str:
    """Build the compact JSON semantic context handed to the planner prompt.

    Args:
        detected_intents: Intent names detected for the prompt, in priority
            order. Intents without a mapping fall back to the default concepts
            chosen by ``concepts_for_intent``.
        max_concepts: Upper bound on how many concepts are serialized; concepts
            are kept in first-seen order and the remainder is dropped.

    Returns:
        A JSON string (no whitespace between tokens) with the keys
        ``catalog_version``, ``concepts``, ``column_metadata`` and
        ``global_rules``.
    """
    import json

    # Intents that pull in a column-metadata topic whose name differs from the
    # intent itself. Intents are also used as topics verbatim (see below), so
    # an intent such as "qd_score" needs no entry here.
    intent_topic_aliases = {
        "last_contacted": "contact_recency",
        "timeline": "contact_recency",
        "interested_clients": "property_interest",
        "inventory": "property_interest",
    }

    # Collect concepts across all intents, de-duplicated, first occurrence wins.
    seen: set[str] = set()
    ordered: list[ConceptDescriptor] = []
    for intent in detected_intents:
        for concept in concepts_for_intent(intent):
            if concept.concept_id not in seen:
                seen.add(concept.concept_id)
                ordered.append(concept)

    relevant_topics = set(detected_intents)
    relevant_topics.update(
        intent_topic_aliases[intent]
        for intent in detected_intents
        if intent in intent_topic_aliases
    )

    # Columns flagged avoid=True are always emitted, whatever the topic, so the
    # planner is warned off them even for unrelated prompts.
    column_metadata = [
        {
            "table": item.table,
            "column": item.column,
            "topic": item.topic,
            "meaning": item.meaning,
            "reliability": item.reliability,
            **({"valid_values": list(item.valid_values)} if item.valid_values else {}),
            **({"examples": list(item.examples)} if item.examples else {}),
            **({"usage": item.usage} if item.usage else {}),
            **({"avoid": item.avoid} if item.avoid else {}),
        }
        for item in COLUMN_METADATA
        if item.topic in relevant_topics or item.avoid
    ]

    return json.dumps(
        {
            "catalog_version": CATALOG_VERSION,
            "concepts": [concept_to_dict(concept) for concept in ordered[:max_concepts]],
            "column_metadata": column_metadata,
            "global_rules": [
                "Do not invent enum values. Use only valid_values from column_metadata when filtering enum-like columns.",
                "Queries that return zero rows because of impossible enum filters are invalid plans.",
                "For contact recency, use read_last_contacted.last_contact_at or intel_interactions.happened_at.",
                "Do not use fields marked avoid=true for the main business filter.",
            ],
        },
        separators=(",", ":"),
    )