From 61258978e15ea75a23e09634f6caae3d3c42b7a4 Mon Sep 17 00:00:00 2001 From: Sagnik Date: Fri, 24 Apr 2026 15:44:00 +0530 Subject: [PATCH] fix: Oracle Canvas Metadata and deterministic semantic repair --- backend/oracle/natural_db_agent.py | 39 +++- backend/oracle/plan_verifier.py | 213 +++++++++++++++++- backend/oracle/semantic_catalog.py | 162 ++++++++++++- .../tests/oracle/test_natural_db_semantics.py | 168 ++++++++++++++ 4 files changed, 568 insertions(+), 14 deletions(-) create mode 100644 backend/tests/oracle/test_natural_db_semantics.py diff --git a/backend/oracle/natural_db_agent.py b/backend/oracle/natural_db_agent.py index 1d0bef15..231e185a 100644 --- a/backend/oracle/natural_db_agent.py +++ b/backend/oracle/natural_db_agent.py @@ -360,8 +360,13 @@ class NaturalDbAgent: "Plan verifier repaired violations: " + ", ".join(violation.rule for violation in verification.violations if violation.severity == "blocking") ) - if not verification.passed and verification.repair_failed: - warnings.append("Plan verifier found violations but repair failed. Proceeding with original SQL.") + if not verification.passed: + details = "; ".join( + f"{violation.rule}: {violation.detail}" + for violation in verification.violations + if violation.severity == "blocking" + ) + raise RuntimeError(f"Oracle SQL plan failed verification: {details}") if verification.notes: warnings.extend(verification.notes) @@ -463,6 +468,25 @@ class NaturalDbAgent: f"\n\nPREVIOUS ATTEMPT FAILED - EXECUTION FEEDBACK:\n{prior_feedback}\n" "You must address the feedback and change the query accordingly." 
) + example_section = ( + "CANONICAL SQL PATTERNS:\n" + "Generic top QD clients:\n" + "SELECT p.full_name, p.primary_email, p.primary_phone, q.current_value AS qd_score, q.score_type, q.computed_at " + "FROM intel_qd_scores q JOIN crm_people p ON p.person_id = q.person_id " + "WHERE q.score_type = 'overall' ORDER BY q.current_value DESC LIMIT 8;\n" + "Property-scoped lowest QD clients:\n" + "SELECT p.full_name, p.primary_email, pi.project_name, q.current_value AS qd_score " + "FROM crm_property_interests pi JOIN crm_people p ON p.person_id = pi.person_id " + "JOIN intel_qd_scores q ON q.person_id = p.person_id " + "WHERE q.score_type = 'overall' AND pi.project_name ILIKE '%Atri Surya Toron%' " + "ORDER BY q.current_value ASC LIMIT 5;\n" + "Recently contacted high-interest clients:\n" + "SELECT p.full_name, p.primary_email, lc.last_contact_at, lc.last_channel, q.current_value AS qd_score " + "FROM read_last_contacted lc JOIN crm_people p ON p.person_id = lc.person_id " + "LEFT JOIN intel_qd_scores q ON q.person_id = p.person_id AND q.score_type = 'overall' " + "WHERE lc.last_contact_at >= NOW() - INTERVAL '3 months' " + "ORDER BY q.current_value DESC NULLS LAST LIMIT 10;" + ) response = await runtime_llm_service.chat( provider_id="sglang", @@ -472,7 +496,8 @@ class NaturalDbAgent: "Use the semantic catalog as the business source of truth, not raw column guessing. " "Generate exactly one SELECT or WITH query. " "Return strict JSON with keys: sql, title, rationale. " - "Never generate INSERT, UPDATE, DELETE, DDL, COPY, or permission statements." + "Never generate INSERT, UPDATE, DELETE, DDL, COPY, or permission statements. " + "Never use columns that are not present in the raw schema." 
), messages=[ { @@ -480,6 +505,14 @@ class NaturalDbAgent: "content": ( f"SEMANTIC CATALOG:\n{semantic_context}\n\n" f"RAW SCHEMA:\n{schema_brief}\n\n" + "NON-NEGOTIABLE DATA RULES:\n" + "- crm_people is identity only; it does not own QD scores.\n" + "- For QD score prompts, join intel_qd_scores.person_id to crm_people.person_id and use intel_qd_scores.current_value.\n" + "- Valid intel_qd_scores.score_type values are: overall, intent, engagement, urgency, financial_qualification.\n" + "- Never filter intel_qd_scores.score_type = 'QD'. For generic QD prompts use score_type = 'overall'.\n" + "- For contact recency, use read_last_contacted.last_contact_at or intel_interactions.happened_at.\n" + "- Do not use edge_communication_events.timestamp or crm_property_interests.last_discussed_at for contact recency.\n\n" + f"{example_section}\n\n" f"DETECTED INTENTS: {', '.join(detected_intents)}\n\n" f"USER QUESTION:\n{prompt}\n\n" f"ROW CAP: {row_limit}\n" diff --git a/backend/oracle/plan_verifier.py b/backend/oracle/plan_verifier.py index ec1bc542..46489250 100644 --- a/backend/oracle/plan_verifier.py +++ b/backend/oracle/plan_verifier.py @@ -11,7 +11,7 @@ import re from dataclasses import dataclass, field from typing import Any -from .semantic_catalog import build_semantic_context_for_planner +from .semantic_catalog import VALID_QD_SCORE_TYPES, build_semantic_context_for_planner logger = logging.getLogger(__name__) @@ -39,8 +39,151 @@ _HALLUCINATED_COLUMNS: list[tuple[str, str]] = [ ("intel_interactions", "sentiment"), ("crm_leads", "last_contacted_at"), ("crm_people", "last_contact"), + ("read_last_contacted", "last_contacted_at"), + ("read_last_contacted", "days_since_last_contact"), + ("read_last_contacted", "staleness_label"), ] +_CONTACT_INTENTS = {"last_contacted", "timeline"} + + +def _extract_limit_from_prompt(prompt: str, default: int) -> int: + lowered = prompt.lower() + numeric_match = re.search(r"\b(?:top|last|latest|recent|first|show|which|give 
me)\s+(\d{1,4})\b(?!\s+(?:day|week|month|year)s?\b)", lowered) + if numeric_match: + return max(1, min(int(numeric_match.group(1)), default)) + words = { + "one": 1, + "two": 2, + "three": 3, + "four": 4, + "five": 5, + "six": 6, + "seven": 7, + "eight": 8, + "nine": 9, + "ten": 10, + "eleven": 11, + "twelve": 12, + "fifteen": 15, + "twenty": 20, + } + word_match = re.search( + r"\b(?:top|last|latest|recent|first|show|which|give me)\s+" + r"(one|two|three|four|five|six|seven|eight|nine|ten|eleven|twelve|fifteen|twenty)\b(?!\s+(?:day|week|month|year)s?\b)", + lowered, + ) + if word_match: + return max(1, min(words[word_match.group(1)], default)) + return default + + +def _canonical_qd_sql(prompt: str, row_limit: int) -> str: + limit = _extract_limit_from_prompt(prompt, row_limit) + lowered = prompt.lower() + direction = "ASC" if any(token in lowered for token in ("lowest", "least", "bottom", "weakest")) else "DESC" + project_filter = "" + project_join = "" + project_match = re.search(r"\bin\s+([A-Za-z0-9][A-Za-z0-9 .&'-]{2,80})(?:\?|$)", prompt) + if project_match: + project_name = project_match.group(1).strip() + if not re.search(r"\b(last|month|months|week|weeks|day|days|year|years)\b", project_name, re.IGNORECASE): + project_join = "JOIN crm_property_interests pi ON pi.person_id = p.person_id " + escaped = project_name.replace("'", "''") + project_filter = f"AND pi.project_name ILIKE '%{escaped}%' " + return ( + "SELECT p.full_name, p.primary_email, p.primary_phone, " + "q.current_value AS qd_score, q.score_type, q.computed_at " + "FROM intel_qd_scores q " + "JOIN crm_people p ON p.person_id = q.person_id " + f"{project_join}" + "WHERE q.score_type = 'overall' " + f"{project_filter}" + f"ORDER BY q.current_value {direction} " + f"LIMIT {limit}" + ) + + +def _canonical_recent_contact_sql(prompt: str, row_limit: int) -> str: + limit = _extract_limit_from_prompt(prompt, row_limit) + interval = "3 months" + lowered = prompt.lower() + interval_match =
re.search(r"\b(?:last|past|recent)\s+(\d{1,3})\s+(day|days|week|weeks|month|months|year|years)\b", lowered) + if interval_match: + count, unit = interval_match.groups() + interval = f"{int(count)} {unit}" + return ( + "SELECT p.full_name, p.primary_email, p.primary_phone, " + "lc.last_contact_at, lc.last_channel, lc.days_since_contact, " + "q.current_value AS qd_score " + "FROM read_last_contacted lc " + "JOIN crm_people p ON p.person_id = lc.person_id " + "LEFT JOIN intel_qd_scores q ON q.person_id = p.person_id AND q.score_type = 'overall' " + f"WHERE lc.last_contact_at >= NOW() - INTERVAL '{interval}' " + "ORDER BY q.current_value DESC NULLS LAST, lc.last_contact_at DESC " + f"LIMIT {limit}" + ) + + +def _semantic_rule_repair( + *, + prompt: str, + detected_intents: list[str], + row_limit: int, + violations: list[VerificationViolation], +) -> str | None: + violation_rules = {violation.rule for violation in violations} + if "qd_score" in detected_intents and violation_rules.intersection({"wrong_score_column", "impossible_score_type"}): + return _canonical_qd_sql(prompt, row_limit) + if set(detected_intents).intersection(_CONTACT_INTENTS) and violation_rules.intersection( + {"deprecated_timestamp", "hallucinated_column"} + ): + return _canonical_recent_contact_sql(prompt, row_limit) + return None + + +def _extract_score_type_literals(sql: str) -> list[str]: + literals: list[str] = [] + eq_pattern = re.compile( + r"(?:\b\w+\.)?score_type\s*=\s*'([^']+)'", + re.IGNORECASE, + ) + in_pattern = re.compile( + r"(?:\b\w+\.)?score_type\s+in\s*\(([^)]*)\)", + re.IGNORECASE | re.DOTALL, + ) + literals.extend(match.group(1) for match in eq_pattern.finditer(sql)) + for match in in_pattern.finditer(sql): + literals.extend(re.findall(r"'([^']+)'", match.group(1))) + return literals + + +def _references_table(sql_lower: str, table: str) -> bool: + return bool(re.search(rf"\b(?:from|join)\s+(?:public\.)?{re.escape(table)}\b", sql_lower)) + + +def _aliases_for_table(sql: str, 
table: str) -> set[str]: + aliases = {table} + pattern = re.compile( + rf"\b(?:from|join)\s+(?:public\.)?{re.escape(table)}(?:\s+(?:as\s+)?([a-zA-Z_][a-zA-Z0-9_]*))?", + re.IGNORECASE, + ) + for match in pattern.finditer(sql): + alias = match.group(1) + if alias and alias.lower() not in {"on", "where", "join", "left", "right", "inner", "outer", "full", "cross"}: + aliases.add(alias) + return aliases + + +def _references_column(sql: str, sql_lower: str, table: str, column: str) -> bool: + if not _references_table(sql_lower, table): + return False + for alias in _aliases_for_table(sql, table): + qualified = re.compile(rf"\b{re.escape(alias)}\.{re.escape(column)}\b", re.IGNORECASE) + if qualified.search(sql): + return True + return False + @dataclass class VerificationViolation: @@ -63,9 +206,10 @@ class VerificationResult: class PlanVerifier: def verify(self, sql: str, prompt: str, detected_intents: list[str], row_limit: int) -> VerificationResult: - del prompt, detected_intents + del prompt violations: list[VerificationViolation] = [] sql_lower = sql.lower() + intent_set = set(detected_intents) if _DESTRUCTIVE.search(sql): violations.append( @@ -77,20 +221,35 @@ class PlanVerifier: ) for table, column in _BAD_TIMESTAMP_PATTERNS: - if table in sql_lower and column in sql_lower: + if intent_set.intersection(_CONTACT_INTENTS) and _references_column(sql, sql_lower, table, column): violations.append( VerificationViolation( rule="deprecated_timestamp", detail=( f"SQL references {table}.{column}, which is sparse or deprecated. " - "Use intel_interactions.happened_at or read_last_contacted.last_contacted_at." + "Use intel_interactions.happened_at or read_last_contacted.last_contact_at." 
+ ), + severity="blocking", + ) + ) + + valid_score_types = {value.lower() for value in VALID_QD_SCORE_TYPES} + for literal in _extract_score_type_literals(sql): + if literal.lower() not in valid_score_types: + violations.append( + VerificationViolation( + rule="impossible_score_type", + detail=( + f"SQL filters intel_qd_scores.score_type with impossible value '{literal}'. " + "Valid values are: " + ", ".join(VALID_QD_SCORE_TYPES) + ". " + "For generic QD prompts, use score_type = 'overall'." ), severity="blocking", ) ) for table, column in _BAD_SCORE_PATTERNS: - if table in sql_lower and column in sql_lower: + if _references_column(sql, sql_lower, table, column): violations.append( VerificationViolation( rule="wrong_score_column", @@ -103,7 +262,7 @@ class PlanVerifier: ) for table, column in _HALLUCINATED_COLUMNS: - if table in sql_lower and column in sql_lower: + if _references_column(sql, sql_lower, table, column): violations.append( VerificationViolation( rule="hallucinated_column", @@ -182,6 +341,22 @@ class PlanVerifier: recheck.notes.append( "Repaired violations: " + ", ".join(violation.rule for violation in blocking) ) + if not recheck.passed: + semantic_repair = _semantic_rule_repair( + prompt=prompt, + detected_intents=detected_intents, + row_limit=row_limit, + violations=blocking, + ) + if semantic_repair: + semantic_recheck = self.verify(semantic_repair, prompt, detected_intents, row_limit) + semantic_recheck.original_sql = sql + semantic_recheck.was_repaired = True + semantic_recheck.repair_attempted = True + semantic_recheck.notes.append( + "Semantic rule repair applied: " + ", ".join(violation.rule for violation in blocking) + ) + return semantic_recheck return recheck async def _repair_sql( @@ -196,6 +371,30 @@ class PlanVerifier: ) -> str: semantic_ctx = build_semantic_context_for_planner(detected_intents, max_concepts=4) violation_text = "\n".join(f"- [{violation.rule}] {violation.detail}" for violation in violations) + hard_rules = ( + "Hard 
repair rules:\n" + "- crm_people is identity only. It has no QD score source-of-truth column.\n" + "- For QD score prompts, use intel_qd_scores.current_value and join crm_people on person_id.\n" + "- Valid intel_qd_scores.score_type values are: " + + ", ".join(VALID_QD_SCORE_TYPES) + + ".\n" + "- Never use score_type = 'QD'. For generic QD prompts use score_type = 'overall'.\n" + "- For recent contact prompts, use read_last_contacted.last_contact_at or intel_interactions.happened_at.\n" + "- Never use edge_communication_events.timestamp or crm_property_interests.last_discussed_at for contact recency." + ) + canonical_examples = ( + "Canonical repair examples:\n" + "Generic QD ranking:\n" + "SELECT p.full_name, p.primary_email, p.primary_phone, q.current_value AS qd_score, q.score_type, q.computed_at " + "FROM intel_qd_scores q JOIN crm_people p ON p.person_id = q.person_id " + "WHERE q.score_type = 'overall' ORDER BY q.current_value DESC LIMIT 8;\n" + "Recent contact ranking:\n" + "SELECT p.full_name, p.primary_email, lc.last_contact_at, lc.last_channel, q.current_value AS qd_score " + "FROM read_last_contacted lc JOIN crm_people p ON p.person_id = lc.person_id " + "LEFT JOIN intel_qd_scores q ON q.person_id = p.person_id AND q.score_type = 'overall' " + "WHERE lc.last_contact_at >= NOW() - INTERVAL '3 months' " + "ORDER BY q.current_value DESC NULLS LAST LIMIT 10;" + ) response = await llm_service.chat( provider_id="sglang", @@ -210,6 +409,8 @@ class PlanVerifier: "content": ( f"Original prompt: {prompt}\n\n" f"Semantic catalog:\n{semantic_ctx}\n\n" + f"{hard_rules}\n\n" + f"{canonical_examples}\n\n" f"Violations:\n{violation_text}\n\n" f"Broken SQL:\n{sql}\n\n" f"Row cap: {row_limit}\n\n" diff --git a/backend/oracle/semantic_catalog.py b/backend/oracle/semantic_catalog.py index e5154dd1..10e82bd4 100644 --- a/backend/oracle/semantic_catalog.py +++ b/backend/oracle/semantic_catalog.py @@ -29,6 +29,8 @@ class FieldDescriptor: confidence: str description: str notes: 
str = "" + valid_values: tuple[str, ...] = () + examples: tuple[str, ...] = () @dataclass(frozen=True) @@ -54,6 +56,115 @@ class ConceptDescriptor: CATALOG_VERSION = "velocity_semantic_v2026_04_25_01" + +@dataclass(frozen=True) +class ColumnMetadata: + table: str + column: str + topic: str + meaning: str + reliability: str + valid_values: tuple[str, ...] = () + examples: tuple[str, ...] = () + usage: str = "" + avoid: bool = False + + +VALID_QD_SCORE_TYPES: tuple[str, ...] = ( + "overall", + "intent", + "engagement", + "urgency", + "financial_qualification", +) + + +COLUMN_METADATA: list[ColumnMetadata] = [ + ColumnMetadata( + "intel_qd_scores", + "score_type", + "qd_score", + "Score family/category. There is no score_type value named QD.", + Confidence.RELIABLE, + valid_values=VALID_QD_SCORE_TYPES, + examples=("overall", "intent", "engagement"), + usage=( + "For generic QD score prompts, prefer score_type = 'overall'. " + "For specific intent/engagement/urgency/financial prompts, use the matching valid value. " + "Never filter score_type = 'QD'." 
+ ), + ), + ColumnMetadata( + "intel_qd_scores", + "current_value", + "qd_score", + "Authoritative numeric score value for the selected score_type.", + Confidence.RELIABLE, + examples=("98.0", "72.4"), + usage="Rank, sort, average, or threshold QD-style scores with this column.", + ), + ColumnMetadata( + "intel_qd_scores", + "computed_at", + "qd_score", + "Timestamp when the score was computed.", + Confidence.RELIABLE, + examples=("2026-04-18T00:00:00",), + usage="Use for score freshness, not client contact recency.", + ), + ColumnMetadata( + "intel_interactions", + "happened_at", + "contact_recency", + "Primary timestamp for client contact and interaction recency.", + Confidence.RELIABLE, + usage="Use for contacted, last contacted, recent contact, activity, and timeline prompts.", + ), + ColumnMetadata( + "read_last_contacted", + "last_contact_at", + "contact_recency", + "Precomputed per-client last contact timestamp.", + Confidence.RELIABLE, + usage="Prefer for client-level last-contact summaries when this read model is available.", + ), + ColumnMetadata( + "edge_communication_events", + "timestamp", + "contact_recency", + "Legacy/sparse event timestamp that is not reliable for Oracle CRM recency.", + Confidence.SPARSE, + usage="Do not use for contact prompts.", + avoid=True, + ), + ColumnMetadata( + "crm_property_interests", + "last_discussed_at", + "contact_recency", + "Sparse legacy field; property interest does not prove recent contact.", + Confidence.SPARSE, + usage="Do not use as the primary recency filter.", + avoid=True, + ), + ColumnMetadata( + "crm_property_interests", + "project_name", + "property_interest", + "Human-readable project/property name attached to a client's interest.", + Confidence.RELIABLE, + examples=("Atri Surya Toron", "Godrej Elevate"), + usage="Use ILIKE filters for property/project scoped prompts.", + ), + ColumnMetadata( + "crm_property_interests", + "interest_level", + "property_interest", + "Interest strength label or score
imported from CRM enrichment.", + Confidence.RELIABLE, + usage="Use with project_name and person_id to rank interested clients or properties.", + ), +] + CONCEPTS: list[ConceptDescriptor] = [ ConceptDescriptor( concept_id="person_identity", @@ -95,7 +206,14 @@ CONCEPTS: list[ConceptDescriptor] = [ authoritative_fields=[ FieldDescriptor("intel_qd_scores", "person_id", Confidence.RELIABLE, "FK to crm_people"), FieldDescriptor("intel_qd_scores", "current_value", Confidence.RELIABLE, "Authoritative QD score"), - FieldDescriptor("intel_qd_scores", "score_type", Confidence.RELIABLE, "Score family"), + FieldDescriptor( + "intel_qd_scores", + "score_type", + Confidence.RELIABLE, + "Score family", + notes="Valid values are overall, intent, engagement, urgency, financial_qualification. There is no value named QD.", + valid_values=VALID_QD_SCORE_TYPES, + ), FieldDescriptor("intel_qd_scores", "computed_at", Confidence.RELIABLE, "Score timestamp"), ], deprecated_fields=[ @@ -105,7 +223,9 @@ CONCEPTS: list[ConceptDescriptor] = [ ], usage_notes=( "When a prompt mentions QD, qualification, desire, or intent score, " - "use intel_qd_scores.current_value. Do not substitute engagement_score." + "use intel_qd_scores.current_value. Do not substitute engagement_score. " + "Do not filter score_type = 'QD'. For generic QD prompts, use score_type = 'overall'. " + "Use intent, engagement, urgency, or financial_qualification only when the prompt asks for that specific family." 
), ), ConceptDescriptor( @@ -141,10 +261,10 @@ CONCEPTS: list[ConceptDescriptor] = [ description="Per-person last-contact summary materialization.", authoritative_fields=[ FieldDescriptor("read_last_contacted", "person_id", Confidence.RELIABLE, "FK to crm_people"), - FieldDescriptor("read_last_contacted", "last_contacted_at", Confidence.RELIABLE, "Last contact time"), + FieldDescriptor("read_last_contacted", "last_contact_at", Confidence.RELIABLE, "Last contact time"), FieldDescriptor("read_last_contacted", "last_channel", Confidence.RELIABLE, "Last contact channel"), - FieldDescriptor("read_last_contacted", "days_since_last_contact", Confidence.RELIABLE, "Recency in days"), - FieldDescriptor("read_last_contacted", "staleness_label", Confidence.RELIABLE, "Hot/warm/cold bucket"), + FieldDescriptor("read_last_contacted", "days_since_contact", Confidence.RELIABLE, "Recency in days"), + FieldDescriptor("read_last_contacted", "interactions_last_90d", Confidence.RELIABLE, "Recent interaction volume"), ], deprecated_fields=[ FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.DEPRECATED, "Stale field"), @@ -318,6 +438,8 @@ def _field_to_dict(field: FieldDescriptor) -> dict[str, Any]: "confidence": field.confidence, "description": field.description, **({"notes": field.notes} if field.notes else {}), + **({"valid_values": list(field.valid_values)} if field.valid_values else {}), + **({"examples": list(field.examples)} if field.examples else {}), } @@ -351,10 +473,40 @@ def build_semantic_context_for_planner(detected_intents: list[str], *, max_conce if concept.concept_id not in seen: seen.add(concept.concept_id) ordered.append(concept) + relevant_topics = set(detected_intents) + if "last_contacted" in relevant_topics or "timeline" in relevant_topics: + relevant_topics.add("contact_recency") + if "interested_clients" in relevant_topics or "inventory" in relevant_topics: + relevant_topics.add("property_interest") + if "qd_score" in relevant_topics: + 
relevant_topics.add("qd_score") + + column_metadata = [ + { + "table": item.table, + "column": item.column, + "topic": item.topic, + "meaning": item.meaning, + "reliability": item.reliability, + **({"valid_values": list(item.valid_values)} if item.valid_values else {}), + **({"examples": list(item.examples)} if item.examples else {}), + **({"usage": item.usage} if item.usage else {}), + **({"avoid": item.avoid} if item.avoid else {}), + } + for item in COLUMN_METADATA + if item.topic in relevant_topics or item.avoid + ] return json.dumps( { "catalog_version": CATALOG_VERSION, "concepts": [concept_to_dict(concept) for concept in ordered[:max_concepts]], + "column_metadata": column_metadata, + "global_rules": [ + "Do not invent enum values. Use only valid_values from column_metadata when filtering enum-like columns.", + "Queries that return zero rows because of impossible enum filters are invalid plans.", + "For contact recency, use read_last_contacted.last_contact_at or intel_interactions.happened_at.", + "Do not use fields marked avoid=true for the main business filter.", + ], }, separators=(",", ":"), ) diff --git a/backend/tests/oracle/test_natural_db_semantics.py b/backend/tests/oracle/test_natural_db_semantics.py new file mode 100644 index 00000000..892b89e1 --- /dev/null +++ b/backend/tests/oracle/test_natural_db_semantics.py @@ -0,0 +1,168 @@ +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) + + +def test_semantic_context_exposes_qd_score_type_values(): + from oracle.semantic_catalog import VALID_QD_SCORE_TYPES, build_semantic_context_for_planner + + context = build_semantic_context_for_planner(["qd_score"]) + + assert "score_type" in context + assert "QD" in context + for score_type in VALID_QD_SCORE_TYPES: + assert score_type in context + + +def test_verifier_rejects_impossible_qd_score_type_filter(): + from oracle.plan_verifier import plan_verifier + + sql = """ + SELECT p.full_name, q.current_value + FROM 
intel_qd_scores q + JOIN crm_people p ON p.person_id = q.person_id + WHERE q.score_type = 'QD' + ORDER BY q.current_value DESC + LIMIT 8 + """ + + result = plan_verifier.verify(sql, "Give me top QD clients", ["qd_score"], 8) + + assert not result.passed + assert any(violation.rule == "impossible_score_type" for violation in result.violations) + + +def test_verifier_allows_valid_qd_score_type_filter(): + from oracle.plan_verifier import plan_verifier + + sql = """ + SELECT p.full_name, q.current_value AS qd_score + FROM intel_qd_scores q + JOIN crm_people p ON p.person_id = q.person_id + WHERE q.score_type = 'overall' + ORDER BY q.current_value DESC + LIMIT 8 + """ + + result = plan_verifier.verify(sql, "Give me top QD clients", ["qd_score"], 8) + + assert result.passed + + +def test_verifier_semantically_repairs_bad_qd_column_even_if_llm_repair_repeats_it(): + import asyncio + + from oracle.plan_verifier import plan_verifier + + broken_sql = """ + SELECT p.full_name, p.qd_score + FROM crm_people p + ORDER BY p.qd_score DESC + LIMIT 8 + """ + + class BadRepairService: + async def chat(self, **kwargs): + return {"message": {"parsedJson": {"sql": broken_sql}}} + + result = asyncio.run( + plan_verifier.verify_and_repair( + broken_sql, + "Give me the top eight clients which has the highest QD Score", + ["qd_score"], + 50, + BadRepairService(), + ) + ) + + assert result.passed + assert result.was_repaired + assert "intel_qd_scores" in result.sql + assert "q.score_type = 'overall'" in result.sql + assert "p.qd_score" not in result.sql + assert "LIMIT 8" in result.sql + + +def test_verifier_semantic_qd_repair_preserves_lowest_project_scope(): + import asyncio + + from oracle.plan_verifier import plan_verifier + + broken_sql = """ + SELECT p.full_name, p.qd_score + FROM crm_people p + ORDER BY p.qd_score ASC + LIMIT 50 + """ + + class BadRepairService: + async def chat(self, **kwargs): + return {"message": {"parsedJson": {"sql": broken_sql}}} + + result = asyncio.run( + 
plan_verifier.verify_and_repair( + broken_sql, + "Which five clients have the lowest QD Scores in Atri Surya Toron?", + ["qd_score"], + 50, + BadRepairService(), + ) + ) + + assert result.passed + assert "crm_property_interests" in result.sql + assert "pi.project_name ILIKE '%Atri Surya Toron%'" in result.sql + assert "ORDER BY q.current_value ASC" in result.sql + assert "LIMIT 5" in result.sql + + +def test_verifier_rejects_legacy_recency_columns_for_contact_prompts(): + from oracle.plan_verifier import plan_verifier + + sql = """ + SELECT p.full_name, max(e.timestamp) AS last_contacted_at + FROM edge_communication_events e + JOIN crm_people p ON p.person_id = e.person_id + WHERE e.timestamp >= now() - interval '3 months' + GROUP BY p.full_name + LIMIT 5 + """ + + result = plan_verifier.verify(sql, "Who contacted us recently?", ["last_contacted"], 5) + + assert not result.passed + assert any(violation.rule == "deprecated_timestamp" for violation in result.violations) + + +def test_verifier_repairs_contact_prompt_to_live_last_contact_column(): + import asyncio + + from oracle.plan_verifier import plan_verifier + + broken_sql = """ + SELECT p.full_name, lc.last_contacted_at + FROM read_last_contacted lc + JOIN crm_people p ON p.person_id = lc.person_id + WHERE lc.last_contacted_at >= NOW() - INTERVAL '3 months' + LIMIT 10 + """ + + class BadRepairService: + async def chat(self, **kwargs): + return {"message": {"parsedJson": {"sql": broken_sql}}} + + result = asyncio.run( + plan_verifier.verify_and_repair( + broken_sql, + "Who are the clients who contacted us in last three months which are showing most interest?", + ["last_contacted", "qd_score"], + 50, + BadRepairService(), + ) + ) + + assert result.passed + assert "lc.last_contact_at" in result.sql + assert "lc.last_contacted_at" not in result.sql + assert "INTERVAL '3 months'" in result.sql