From cf602822b0c4c2baf5b1fa1efefcc8a36ffee531 Mon Sep 17 00:00:00 2001 From: Sagnik Date: Fri, 24 Apr 2026 05:14:11 +0530 Subject: [PATCH] fix: Oracle Canvas JSON Component Generation planning and orchestration logic --- backend/oracle/execution_profiler.py | 202 +++++++++++ backend/oracle/natural_db_agent.py | 425 ++++++++++++++++++------ backend/oracle/plan_verifier.py | 235 +++++++++++++ backend/oracle/prompt_orchestrator.py | 66 +++- backend/oracle/semantic_catalog.py | 360 ++++++++++++++++++++ backend/oracle/visualization_planner.py | 382 +++++++++++++++++++++ 6 files changed, 1555 insertions(+), 115 deletions(-) create mode 100644 backend/oracle/execution_profiler.py create mode 100644 backend/oracle/plan_verifier.py create mode 100644 backend/oracle/semantic_catalog.py create mode 100644 backend/oracle/visualization_planner.py diff --git a/backend/oracle/execution_profiler.py b/backend/oracle/execution_profiler.py new file mode 100644 index 00000000..f7cb291a --- /dev/null +++ b/backend/oracle/execution_profiler.py @@ -0,0 +1,202 @@ +""" +oracle/execution_profiler.py + +Post-execution quality checks for Oracle natural DB queries. 
+""" +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any + +_STALE_THRESHOLD_DAYS = 365 + + +@dataclass +class QualityIssue: + code: str + description: str + severity: str + replan_hint: str + + +@dataclass +class ProfileResult: + passed: bool + row_count: int + issues: list[QualityIssue] = field(default_factory=list) + replan_hints: list[str] = field(default_factory=list) + suggested_component_type: str | None = None + + +def _extract_cardinality_from_prompt(prompt: str) -> int | None: + lowered = prompt.lower() + numeric_match = re.search(r"\b(?:top|last|latest|recent|first|show|which)\s+(\d{1,4})\b", lowered) + if numeric_match: + return int(numeric_match.group(1)) + + words = { + "one": 1, + "two": 2, + "three": 3, + "four": 4, + "five": 5, + "six": 6, + "seven": 7, + "eight": 8, + "nine": 9, + "ten": 10, + "eleven": 11, + "twelve": 12, + "fifteen": 15, + "twenty": 20, + } + word_match = re.search( + r"\b(?:top|last|latest|recent|first|show|which)\s+" + r"(one|two|three|four|five|six|seven|eight|nine|ten|eleven|twelve|fifteen|twenty)\b", + lowered, + ) + if word_match: + return words.get(word_match.group(1)) + return None + + +def _all_null_measures(rows: list[dict[str, Any]], columns: list[str]) -> bool: + if not rows or not columns: + return False + + numeric_columns: list[str] = [] + for column in columns: + saw_numeric = False + all_null = True + for row in rows[:20]: + value = row.get(column) + if value is not None: + all_null = False + if isinstance(value, (int, float)): + saw_numeric = True + if saw_numeric: + numeric_columns.append(column) + if not all_null: + return False + + if numeric_columns: + return True + + return all(all(value is None for value in row.values()) for row in rows[:5]) + + +def _timestamps_are_stale(rows: list[dict[str, Any]], columns: list[str]) -> bool: + timestamp_columns = [ + column for column in columns if any(token in 
column for token in ("_at", "date", "timestamp", "when", "time")) + ] + if not timestamp_columns or not rows: + return False + + now = datetime.now(timezone.utc) + checked = 0 + stale = 0 + for row in rows[:20]: + for column in timestamp_columns: + value = row.get(column) + if value is None or not isinstance(value, str): + continue + try: + parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) + except ValueError: + continue + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + checked += 1 + if (now - parsed).days > _STALE_THRESHOLD_DAYS: + stale += 1 + return checked > 0 and stale == checked + + +class ExecutionProfiler: + def profile( + self, + *, + rows: list[dict[str, Any]], + columns: list[str], + sql: str, + prompt: str, + source_tables: list[str], + row_limit: int, + ) -> ProfileResult: + del source_tables, row_limit + issues: list[QualityIssue] = [] + sql_lower = sql.lower() + + if len(rows) == 0: + issues.append( + QualityIssue( + code="zero_rows", + description="Query returned zero rows.", + severity="blocking", + replan_hint=( + "The query returned zero rows. Use authoritative recency and business-semantic columns " + "from the semantic catalog. Avoid sparse or deprecated timestamp fields." + ), + ) + ) + elif _all_null_measures(rows, columns): + issues.append( + QualityIssue( + code="all_null_measures", + description="Rows returned but numeric measure columns are null.", + severity="blocking", + replan_hint=( + "The query returned rows but numeric measures are null. " + "Check join keys and metric source columns." 
+ ), + ) + ) + + requested_n = _extract_cardinality_from_prompt(prompt) + if requested_n is not None and len(rows) > requested_n * 3: + issues.append( + QualityIssue( + code="cardinality_mismatch", + description=f"Prompt asked for about {requested_n} rows but query returned {len(rows)}.", + severity="warning", + replan_hint=f"Respect the requested result count and add LIMIT {requested_n}.", + ) + ) + + if rows and _timestamps_are_stale(rows, columns): + issues.append( + QualityIssue( + code="stale_timestamps", + description="Returned timestamps appear stale.", + severity="warning", + replan_hint="The result timestamps are stale. Use authoritative recency fields.", + ) + ) + + suggested_type: str | None = None + if len(rows) == 1 and len(columns) <= 4: + non_null_values = [value for value in rows[0].values() if value is not None] + if non_null_values and all(isinstance(value, (int, float)) for value in non_null_values): + suggested_type = "kpiTile" + issues.append( + QualityIssue( + code="single_row_scalar", + description="Single scalar row is better rendered as KPI tile.", + severity="warning", + replan_hint="", + ) + ) + + blocking = [issue for issue in issues if issue.severity == "blocking"] + return ProfileResult( + passed=len(blocking) == 0, + row_count=len(rows), + issues=issues, + replan_hints=[issue.replan_hint for issue in issues if issue.replan_hint], + suggested_component_type=suggested_type, + ) + + +execution_profiler = ExecutionProfiler() diff --git a/backend/oracle/natural_db_agent.py b/backend/oracle/natural_db_agent.py index fb86995c..1d0bef15 100644 --- a/backend/oracle/natural_db_agent.py +++ b/backend/oracle/natural_db_agent.py @@ -1,9 +1,13 @@ """ Natural DB-first Oracle agent. -The LLM can plan arbitrary analytical SELECT statements over the full public -Velocity app schema. The executor enforces only a read-only SQL contract and a -UI row cap; write paths stay behind typed API endpoints. +Pipeline: +1. schema introspection +2. 
semantic SQL planning +3. plan verification and optional repair +4. SQL execution +5. execution quality profiling and auto-replan +6. visualization planning from actual result shape """ from __future__ import annotations @@ -17,6 +21,10 @@ from decimal import Decimal from typing import Any from backend.services.runtime_llm_service import runtime_llm_service +from .execution_profiler import execution_profiler +from .plan_verifier import plan_verifier +from .semantic_catalog import CATALOG_VERSION, build_semantic_context_for_planner +from .visualization_planner import VisualizationDecision, visualization_planner try: import asyncpg # type: ignore @@ -30,7 +38,7 @@ DESTRUCTIVE_SQL = re.compile( re.IGNORECASE, ) TABLE_REF_RE = re.compile(r"\b(?:from|join)\s+([a-zA-Z_][\w.]*)(?:\s|$)", re.IGNORECASE) -CTE_NAME_RE = re.compile(r"\b(?:with|,)\s*([a-zA-Z_][\w]*)\s+as\s*\(", re.IGNORECASE) +_MAX_REPLAN_ATTEMPTS = 2 def _json_safe(value: Any) -> Any: @@ -39,9 +47,9 @@ def _json_safe(value: Any) -> Any: if isinstance(value, Decimal): return float(value) if isinstance(value, (list, tuple)): - return [_json_safe(v) for v in value] + return [_json_safe(item) for item in value] if isinstance(value, dict): - return {str(k): _json_safe(v) for k, v in value.items()} + return {str(key): _json_safe(item) for key, item in value.items()} return value @@ -60,9 +68,11 @@ def db_ready() -> bool: async def connect_db() -> Any: if asyncpg is None: raise RuntimeError("asyncpg is not installed.") + read_database_url = os.getenv("ORACLE_READ_DATABASE_URL", "") if read_database_url and not read_database_url.startswith("PLACEHOLDER"): return await asyncpg.connect(read_database_url) + if all(os.getenv(name) for name in ("VELOCITY_DB_READ_NAME", "VELOCITY_DB_READ_USER", "VELOCITY_DB_READ_PASSWORD")): return await asyncpg.connect( host=os.getenv("VELOCITY_DB_READ_HOST", os.getenv("VELOCITY_DB_HOST", "127.0.0.1")), @@ -71,9 +81,11 @@ async def connect_db() -> Any: 
user=os.environ["VELOCITY_DB_READ_USER"], password=os.environ["VELOCITY_DB_READ_PASSWORD"], ) + database_url = os.getenv("DATABASE_URL", "") if database_url and not database_url.startswith("PLACEHOLDER"): return await asyncpg.connect(database_url) + return await asyncpg.connect( host=os.getenv("VELOCITY_DB_HOST", "127.0.0.1"), port=int(os.getenv("VELOCITY_DB_PORT", "5432")), @@ -95,8 +107,12 @@ class NaturalQueryResult: source_tables: list[str] component_type: str warnings: list[str] + visualization_decision: VisualizationDecision | None = None + replan_count: int = 0 + semantic_catalog_version: str = CATALOG_VERSION def as_dict(self) -> dict[str, Any]: + decision = self.visualization_decision return { "prompt": self.prompt, "sql": self.sql, @@ -108,6 +124,23 @@ class NaturalQueryResult: "sourceTables": self.source_tables, "componentType": self.component_type, "warnings": self.warnings, + "semanticCatalogVersion": self.semantic_catalog_version, + "replanCount": self.replan_count, + "visualizationDecision": { + "xAxis": decision.x_axis, + "yAxis": decision.y_axis, + "dimensionCols": decision.dimension_cols, + "measureCols": decision.measure_cols, + "widthMode": decision.width_mode, + "minHeightPx": decision.min_height_px, + "skeletonVariant": decision.skeleton_variant, + "vizParams": decision.viz_params, + "dataBindings": decision.data_bindings, + "confidence": decision.confidence, + "reasoning": decision.reasoning, + } + if decision + else {}, } @@ -118,48 +151,74 @@ def sanitize_sql(sql: str, row_limit: int) -> tuple[str, list[str], list[str]]: raise ValueError("Oracle SQL agent only accepts SELECT or WITH queries.") if DESTRUCTIVE_SQL.search(clean): raise ValueError("Oracle SQL agent blocked non-read SQL.") - tables = [] + + tables: list[str] = [] for match in TABLE_REF_RE.finditer(clean): table = match.group(1).split(".")[-1].strip('"').lower() if table in {"lateral", "select"}: continue if table and table not in tables: tables.append(table) + + if "limit" not 
in clean.lower(): + clean += f" LIMIT {row_limit}" + warnings.append(f"Row cap {row_limit} auto-applied (query had no LIMIT).") return clean, tables, warnings -def infer_component_type(prompt: str, columns: list[str], rows: list[dict[str, Any]]) -> str: - lower = prompt.lower() - if any(term in lower for term in ("timeline", "conversation", "whatsapp", "message", "call", "email", "history")): - return "activity_stream" - if len(rows) == 1 and len(columns) <= 5 and any(isinstance(rows[0].get(c), (int, float)) for c in columns): - return "kpi_tile" - if any(c.endswith("_at") or c in {"date", "when", "timestamp", "happened_at"} for c in columns): - if len(rows) > 1 and any(term in lower for term in ("trend", "over time", "timeseries")): - return "line_chart" - if any(term in lower for term in ("timeline", "activity", "last", "recent")): - return "activity_stream" - numeric_cols = [c for c in columns if rows and isinstance(rows[0].get(c), (int, float))] - if numeric_cols and any(term in lower for term in ("count", "compare", "distribution", "most", "top", "by ")): - return "bar_chart" - return "table" +def _detect_intents(prompt: str) -> list[str]: + lowered = prompt.lower() + intents: list[str] = [] + if any(token in lowered for token in ( + "last contact", "last contacted", "recently contacted", "last call", + "last message", "last whatsapp", "contacted us", "follow-up", "follow up", + "days since", "no contact", + )): + intents.append("last_contacted") -def _looks_like_property_rollup_prompt(prompt: str) -> bool: - lower = prompt.lower() - property_terms = ("property", "properties", "project", "projects") - aggregate_terms = ("top", "most", "majority", "highest", "popular", "common") - interest_terms = ("interest", "interested", "liked", "preference", "preferences") - return ( - any(term in lower for term in property_terms) - and any(term in lower for term in aggregate_terms) - and any(term in lower for term in interest_terms) - ) + if any(token in lowered for token 
in ( + "interested in", "shown interest", "interest in", "interested clients", + "project interest", "property interest", + )): + intents.append("interested_clients") + + if any(token in lowered for token in ("qd score", "qualification score", "desire score", "intent score", "qd")): + intents.append("qd_score") + + if any(token in lowered for token in ("pipeline", "stage", "funnel", "kanban", "deal")): + intents.append("pipeline") + + if any(token in lowered for token in ("site visit", "visited", "visit")): + intents.append("site_visits") + + if any(token in lowered for token in ("call", "transcript", "whatsapp", "email", "message", "conversation", "interaction", "timeline", "activity")): + intents.append("timeline") + + if any(token in lowered for token in ("objection", "concern", "complaint", "pushback")): + intents.append("objections") + + if any(token in lowered for token in ("broker", "agent performance", "referral")): + intents.append("broker_performance") + + if any(token in lowered for token in ("next action", "next step", "what should i do", "follow-up priority", "action queue")): + intents.append("next_action") + + if any(token in lowered for token in ("project", "unit", "inventory", "available", "price", "configuration")): + intents.append("inventory") + + if any(token in lowered for token in ("client 360", "dossier", "profile")): + intents.append("client_360") + + if any(token in lowered for token in ("fact", "memory", "promise", "commitment", "budget", "preference")): + intents.append("extracted_facts") + + return intents or ["last_contacted"] def title_from_prompt(prompt: str) -> str: words = re.sub(r"\s+", " ", prompt.strip()).strip(" ?.!") - return words[:1].upper() + words[1:80] if words else "Oracle Query Result" + return (words[:1].upper() + words[1:80]) if words else "Oracle Query Result" class NaturalDbAgent: @@ -187,19 +246,22 @@ class NaturalDbAgent: ORDER BY c.table_name, c.ordinal_position """ ) - counts = {} + counts: dict[str, int | None] 
= {} for table in public_tables: exists = await conn.fetchval("SELECT to_regclass($1)", f"public.{table}") counts[table] = None if not exists else int(await conn.fetchval(f'SELECT COUNT(*) FROM "{table}"')) + tables: dict[str, dict[str, Any]] = {} for row in rows: entry = tables.setdefault(row["table_name"], {"columns": [], "rowCount": counts.get(row["table_name"])}) - entry["columns"].append({ - "name": row["column_name"], - "dataType": row["data_type"], - "udtName": row["udt_name"], - "nullable": row["is_nullable"] == "YES", - }) + entry["columns"].append( + { + "name": row["column_name"], + "dataType": row["data_type"], + "udtName": row["udt_name"], + "nullable": row["is_nullable"] == "YES", + } + ) return {"available": True, "tables": tables, "allowedTables": public_tables} finally: if own_conn: @@ -228,14 +290,19 @@ class NaturalDbAgent: return { "counts": counts, "expectedSyntheticV2Counts": expected, - "missingTables": [t for t, count in counts.items() if count is None], - "emptyTables": [t for t, count in counts.items() if count == 0], - "belowExpected": {t: {"expected": e, "actual": counts.get(t)} for t, e in expected.items() if (counts.get(t) or 0) < e}, + "missingTables": [table for table, count in counts.items() if count is None], + "emptyTables": [table for table, count in counts.items() if count == 0], + "belowExpected": { + table: {"expected": expected_count, "actual": counts.get(table)} + for table, expected_count in expected.items() + if (counts.get(table) or 0) < expected_count + }, } async def execute_prompt(self, prompt: str, *, row_limit: int = 100, conn: Any | None = None) -> NaturalQueryResult: if not prompt.strip(): raise ValueError("Prompt is required.") + own_conn = conn is None if conn is None: if not db_ready(): @@ -243,91 +310,249 @@ class NaturalDbAgent: conn = await connect_db() try: catalog = await self.schema_catalog(conn) - plan = await self._plan_sql(prompt, catalog, row_limit) - return await self._run_plan(conn, prompt, plan, 
row_limit) + detected_intents = _detect_intents(prompt) + return await self._pipeline( + conn=conn, + prompt=prompt, + catalog=catalog, + detected_intents=detected_intents, + row_limit=row_limit, + attempt=0, + prior_feedback=None, + ) finally: if own_conn: await conn.close() - async def _run_plan(self, conn: Any, prompt: str, plan: dict[str, Any], row_limit: int) -> NaturalQueryResult: + async def _pipeline( + self, + *, + conn: Any, + prompt: str, + catalog: dict[str, Any], + detected_intents: list[str], + row_limit: int, + attempt: int, + prior_feedback: str | None, + ) -> NaturalQueryResult: + warnings: list[str] = [] + + plan = await self._plan_sql( + prompt=prompt, + catalog=catalog, + detected_intents=detected_intents, + row_limit=row_limit, + prior_feedback=prior_feedback, + ) raw_sql = str(plan.get("sql") or "").strip() if not raw_sql: raise RuntimeError("Natural SQL planner returned no SQL.") - sql, tables, warnings = sanitize_sql(raw_sql, row_limit) + + verification = await plan_verifier.verify_and_repair( + sql=raw_sql, + prompt=prompt, + detected_intents=detected_intents, + row_limit=row_limit, + llm_service=runtime_llm_service, + ) + if verification.was_repaired: + warnings.append( + "Plan verifier repaired violations: " + + ", ".join(violation.rule for violation in verification.violations if violation.severity == "blocking") + ) + if not verification.passed and verification.repair_failed: + warnings.append("Plan verifier found violations but repair failed. 
Proceeding with original SQL.") + if verification.notes: + warnings.extend(verification.notes) + + effective_sql, source_tables, sanitize_warnings = sanitize_sql(verification.sql, row_limit) + warnings.extend(sanitize_warnings) + try: - records = await conn.fetch(sql) + records = await conn.fetch(effective_sql) except Exception as exc: raise RuntimeError(f"Natural SQL execution failed: {exc}") from exc + rows = [_json_safe(dict(record)) for record in records] columns = list(rows[0].keys()) if rows else [] - component_type = infer_component_type(prompt, columns, rows) + + profile = execution_profiler.profile( + rows=rows, + columns=columns, + sql=effective_sql, + prompt=prompt, + source_tables=source_tables, + row_limit=row_limit, + ) + + if not profile.passed and attempt < _MAX_REPLAN_ATTEMPTS: + feedback = " | ".join(profile.replan_hints) + warnings.append(f"Auto-replan triggered (attempt {attempt + 1}): {feedback[:160]}") + return await self._pipeline( + conn=conn, + prompt=prompt, + catalog=catalog, + detected_intents=detected_intents, + row_limit=row_limit, + attempt=attempt + 1, + prior_feedback=feedback, + ) + + if not profile.passed: + for issue in profile.issues: + if issue.severity == "blocking": + warnings.append(f"Quality issue after {attempt} replans: [{issue.code}] {issue.description}") + + visualization_decision = visualization_planner.plan( + rows=rows, + columns=columns, + prompt=prompt, + source_tables=source_tables, + profile_suggested_type=profile.suggested_component_type, + title_from_planner=str(plan.get("title") or ""), + ) + + title = visualization_decision.title or str(plan.get("title") or title_from_prompt(prompt)) + summary = str(plan.get("rationale") or f"SQL-backed Oracle result from {', '.join(source_tables) or 'Velocity CRM'}.") + return NaturalQueryResult( prompt=prompt, - sql=sql, - title=str(plan.get("title") or title_from_prompt(prompt)), - summary=str(plan.get("rationale") or f"SQL-backed Oracle result from {', '.join(tables) or 
'Velocity CRM'}."), + sql=effective_sql, + title=title, + summary=summary, columns=columns, rows=rows, row_count=len(rows), - source_tables=tables, - component_type=component_type, + source_tables=source_tables, + component_type=visualization_decision.component_type, warnings=warnings, + visualization_decision=visualization_decision, + replan_count=attempt, + semantic_catalog_version=CATALOG_VERSION, ) - async def _plan_sql(self, prompt: str, catalog: dict[str, Any], row_limit: int) -> dict[str, Any]: + async def _plan_sql( + self, + *, + prompt: str, + catalog: dict[str, Any], + detected_intents: list[str], + row_limit: int, + prior_feedback: str | None = None, + ) -> dict[str, Any]: try: providers = runtime_llm_service._provider_catalog() except Exception: providers = {} if not providers: - raise RuntimeError("No runtime LLM providers are configured for Oracle natural planning.") - schema_brief = json.dumps(catalog.get("tables", {}), default=str)[:16000] - semantic_rules = """ -Velocity SQL semantics: -- QD score means intel_qd_scores.current_value. Do not use crm_people.engagement_score, crm_leads.engagement_score, or intel_interactions.engagement_score as QD. -- For project/property scoped prompts such as "in Atri Surya Toron", "interested in", "for project", or "for property", use crm_property_interests as the primary scoping table. -- Prefer crm_property_interests.project_name for textual project matching. inventory_projects is optional for enrichment, not the primary client-to-project relationship. -- For client lists scoped to a project, join crm_people to crm_property_interests on person_id and filter project_name case-insensitively. -- For lowest/highest/best/worst QD prompts, sort on intel_qd_scores.current_value ASC/DESC as requested. -- Respect the user-requested cardinality exactly when possible. If the prompt says five/top 5/lowest 5, return LIMIT 5. 
-- When listing clients, include person identity fields from crm_people such as person_id, full_name, primary_phone, and primary_email. -- When aggregating top properties/projects, group by crm_property_interests.project_name and count DISTINCT person_id. -- You may use any table in the public schema that is relevant to the question. -- Use only read-only PostgreSQL SELECT/CTE queries. -""" - system = ( - "You are Oracle's read-only PostgreSQL planner. Generate one useful SELECT or WITH query " - "for the user's CRM question. You have access to the full public schema. Return JSON with sql, title, rationale. " - "Never generate INSERT, UPDATE, DELETE, DDL, COPY, or permission statements." - ) - try: - response = await runtime_llm_service.chat( - provider_id="sglang", - model=None, - system_prompt=system, - messages=[{ + raise RuntimeError("No runtime LLM providers configured for Oracle natural planning.") + + schema_full = catalog.get("tables", {}) + relevant_tables = self._relevant_tables_for_intents(detected_intents) + schema_brief_dict = { + table: meta + for table, meta in schema_full.items() + if table in relevant_tables or table in {"crm_people", "crm_leads", "inventory_projects", "inventory_units"} + } + schema_brief = json.dumps(schema_brief_dict, default=str)[:14000] + semantic_context = build_semantic_context_for_planner(detected_intents, max_concepts=5) + + replan_section = "" + if prior_feedback: + replan_section = ( + f"\n\nPREVIOUS ATTEMPT FAILED - EXECUTION FEEDBACK:\n{prior_feedback}\n" + "You must address the feedback and change the query accordingly." + ) + + response = await runtime_llm_service.chat( + provider_id="sglang", + model=None, + system_prompt=( + "You are Oracle's read-only PostgreSQL planner for Project Velocity CRM. " + "Use the semantic catalog as the business source of truth, not raw column guessing. " + "Generate exactly one SELECT or WITH query. " + "Return strict JSON with keys: sql, title, rationale. 
" + "Never generate INSERT, UPDATE, DELETE, DDL, COPY, or permission statements." + ), + messages=[ + { "role": "user", "content": ( - f"Schema:\n{schema_brief}\n\n" - f"Semantic rules:\n{semantic_rules}\n\n" - f"Question:\n{prompt}\n\n" - f"Row cap: {row_limit}\n\n" - "Return strict JSON with keys: sql, title, rationale." + f"SEMANTIC CATALOG:\n{semantic_context}\n\n" + f"RAW SCHEMA:\n{schema_brief}\n\n" + f"DETECTED INTENTS: {', '.join(detected_intents)}\n\n" + f"USER QUESTION:\n{prompt}\n\n" + f"ROW CAP: {row_limit}\n" + f"{replan_section}\n\n" + "Return strict JSON: {\"sql\": \"...\", \"title\": \"...\", \"rationale\": \"...\"}" ), - }], - temperature=0.05, - response_format="json", - metadata={"agent": "oracle_natural_db_agent"}, - ) - message = response.get("message") or {} - parsed = message.get("parsedJson") - content = message.get("content") or "{}" - if not isinstance(parsed, dict): - parsed = json.loads(content) if isinstance(content, str) else content - if isinstance(parsed, dict) and parsed.get("sql"): - return parsed - except Exception as exc: - raise RuntimeError(f"Natural DB planner LLM failed: {exc}") from exc + } + ], + temperature=0.05, + response_format="json", + metadata={ + "agent": "oracle_natural_db_agent_v2", + "intents": detected_intents, + "catalog_version": CATALOG_VERSION, + }, + ) + message = response.get("message") or {} + parsed = message.get("parsedJson") + content = message.get("content") or "{}" + if not isinstance(parsed, dict): + parsed = json.loads(content) if isinstance(content, str) else content + if isinstance(parsed, dict) and parsed.get("sql"): + return parsed raise RuntimeError("Natural DB planner returned no valid SQL.") + @staticmethod + def _relevant_tables_for_intents(intents: list[str]) -> set[str]: + intent_tables: dict[str, set[str]] = { + "last_contacted": { + "intel_interactions", + "crm_people", + "crm_leads", + "read_last_contacted", + "crm_last_contact_read_model", + }, + "interested_clients": { + 
"crm_property_interests", + "crm_people", + "inventory_projects", + "intel_qd_scores", + }, + "qd_score": {"intel_qd_scores", "crm_people"}, + "pipeline": {"crm_opportunities", "crm_leads", "crm_people", "inventory_projects"}, + "site_visits": {"intel_visits", "crm_people", "inventory_projects"}, + "timeline": { + "intel_interactions", + "intel_calls", + "intel_whatsapp_threads", + "intel_messages", + "intel_emails", + "intel_visits", + "crm_people", + }, + "objections": {"intel_call_objections", "crm_people", "inventory_projects"}, + "broker_performance": {"crm_leads", "crm_opportunities", "crm_people"}, + "next_action": {"read_next_best_action", "crm_people"}, + "inventory": {"inventory_projects", "inventory_units", "crm_property_interests"}, + "client_360": { + "crm_people", + "crm_leads", + "intel_qd_scores", + "crm_property_interests", + "crm_opportunities", + "intel_interactions", + "read_last_contacted", + "read_next_best_action", + }, + "extracted_facts": {"intel_extracted_facts", "crm_people"}, + } + tables: set[str] = set() + for intent in intents: + tables.update(intent_tables.get(intent, set())) + return tables + + natural_db_agent = NaturalDbAgent() diff --git a/backend/oracle/plan_verifier.py b/backend/oracle/plan_verifier.py new file mode 100644 index 00000000..ec1bc542 --- /dev/null +++ b/backend/oracle/plan_verifier.py @@ -0,0 +1,235 @@ +""" +oracle/plan_verifier.py + +Verify planned SQL before execution and optionally repair common semantic errors. 
+""" +from __future__ import annotations + +import json +import logging +import re +from dataclasses import dataclass, field +from typing import Any + +from .semantic_catalog import build_semantic_context_for_planner + +logger = logging.getLogger(__name__) + +_DESTRUCTIVE = re.compile( + r"\b(insert|update|delete|drop|alter|truncate|copy|create|grant|revoke|call|execute|do|merge)\b", + re.IGNORECASE, +) + +_BAD_TIMESTAMP_PATTERNS: list[tuple[str, str]] = [ + ("edge_communication_events", "timestamp"), + ("crm_property_interests", "last_discussed_at"), + ("crm_property_interests", "last_interaction"), +] + +_BAD_SCORE_PATTERNS: list[tuple[str, str]] = [ + ("crm_people", "engagement_score"), + ("crm_leads", "engagement_score"), + ("intel_interactions", "engagement_score"), + ("crm_people", "qd_score"), + ("crm_leads", "qd_score"), +] + +_HALLUCINATED_COLUMNS: list[tuple[str, str]] = [ + ("intel_interactions", "broker_id"), + ("intel_interactions", "sentiment"), + ("crm_leads", "last_contacted_at"), + ("crm_people", "last_contact"), +] + + +@dataclass +class VerificationViolation: + rule: str + detail: str + severity: str + + +@dataclass +class VerificationResult: + passed: bool + sql: str + original_sql: str + violations: list[VerificationViolation] = field(default_factory=list) + was_repaired: bool = False + repair_attempted: bool = False + repair_failed: bool = False + notes: list[str] = field(default_factory=list) + + +class PlanVerifier: + def verify(self, sql: str, prompt: str, detected_intents: list[str], row_limit: int) -> VerificationResult: + del prompt, detected_intents + violations: list[VerificationViolation] = [] + sql_lower = sql.lower() + + if _DESTRUCTIVE.search(sql): + violations.append( + VerificationViolation( + rule="destructive_dml", + detail="SQL contains a write or DDL statement.", + severity="blocking", + ) + ) + + for table, column in _BAD_TIMESTAMP_PATTERNS: + if table in sql_lower and column in sql_lower: + violations.append( + 
VerificationViolation( + rule="deprecated_timestamp", + detail=( + f"SQL references {table}.{column}, which is sparse or deprecated. " + "Use intel_interactions.happened_at or read_last_contacted.last_contacted_at." + ), + severity="blocking", + ) + ) + + for table, column in _BAD_SCORE_PATTERNS: + if table in sql_lower and column in sql_lower: + violations.append( + VerificationViolation( + rule="wrong_score_column", + detail=( + f"SQL references {table}.{column}, which is not the QD source of truth. " + "Use intel_qd_scores.current_value." + ), + severity="blocking", + ) + ) + + for table, column in _HALLUCINATED_COLUMNS: + if table in sql_lower and column in sql_lower: + violations.append( + VerificationViolation( + rule="hallucinated_column", + detail=f"SQL references {table}.{column}, which does not exist in the live schema.", + severity="blocking", + ) + ) + + if "limit" not in sql_lower: + violations.append( + VerificationViolation( + rule="missing_limit", + detail=f"SQL has no LIMIT clause; executor will enforce row cap {row_limit}.", + severity="warning", + ) + ) + + if re.search(r"\bselect\s+\*\b", sql_lower) and sql_lower.count("join") > 1: + violations.append( + VerificationViolation( + rule="select_star_join", + detail="SELECT * with multiple JOINs may create noisy wide rows.", + severity="warning", + ) + ) + + blocking = [violation for violation in violations if violation.severity == "blocking"] + return VerificationResult( + passed=len(blocking) == 0, + sql=sql, + original_sql=sql, + violations=violations, + ) + + async def verify_and_repair( + self, + sql: str, + prompt: str, + detected_intents: list[str], + row_limit: int, + llm_service: Any | None = None, + ) -> VerificationResult: + result = self.verify(sql, prompt, detected_intents, row_limit) + if result.passed: + return result + + blocking = [violation for violation in result.violations if violation.severity == "blocking"] + if not blocking: + return result + + result.repair_attempted = True + 
if llm_service is None: + result.repair_failed = True + result.notes.append("No LLM service available for SQL repair.") + return result + + try: + repaired_sql = await self._repair_sql( + sql=sql, + prompt=prompt, + violations=blocking, + detected_intents=detected_intents, + row_limit=row_limit, + llm_service=llm_service, + ) + except Exception as exc: + logger.warning("plan_verifier repair failed: %s", exc) + result.repair_failed = True + result.notes.append(f"Repair failed: {exc}") + return result + + recheck = self.verify(repaired_sql, prompt, detected_intents, row_limit) + recheck.original_sql = sql + recheck.was_repaired = True + recheck.repair_attempted = True + recheck.notes.append( + "Repaired violations: " + ", ".join(violation.rule for violation in blocking) + ) + return recheck + + async def _repair_sql( + self, + *, + sql: str, + prompt: str, + violations: list[VerificationViolation], + detected_intents: list[str], + row_limit: int, + llm_service: Any, + ) -> str: + semantic_ctx = build_semantic_context_for_planner(detected_intents, max_concepts=4) + violation_text = "\n".join(f"- [{violation.rule}] {violation.detail}" for violation in violations) + + response = await llm_service.chat( + provider_id="sglang", + model=None, + system_prompt=( + "You are Oracle's SQL repair agent. " + "Fix only the listed violations. Return strict JSON with key 'sql'." 
+ ), + messages=[ + { + "role": "user", + "content": ( + f"Original prompt: {prompt}\n\n" + f"Semantic catalog:\n{semantic_ctx}\n\n" + f"Violations:\n{violation_text}\n\n" + f"Broken SQL:\n{sql}\n\n" + f"Row cap: {row_limit}\n\n" + "Return JSON: {\"sql\": \"\"}" + ), + } + ], + temperature=0.0, + response_format="json", + metadata={"agent": "oracle_plan_verifier_repair"}, + ) + message = response.get("message") or {} + parsed = message.get("parsedJson") + if not isinstance(parsed, dict): + content = message.get("content") or "{}" + parsed = json.loads(content) if isinstance(content, str) else {} + repaired = str(parsed.get("sql") or "").strip() + if not repaired: + raise ValueError("Repair LLM returned empty SQL.") + return repaired + + +plan_verifier = PlanVerifier() diff --git a/backend/oracle/prompt_orchestrator.py b/backend/oracle/prompt_orchestrator.py index 1ebe29dd..83a40e43 100644 --- a/backend/oracle/prompt_orchestrator.py +++ b/backend/oracle/prompt_orchestrator.py @@ -175,6 +175,27 @@ def _infer_chart_axes(rows: list[dict[str, Any]], columns: list[str]) -> tuple[s return x_axis, y_axis +def _canonical_plan_type(plan_type: str) -> str: + normalized = str(plan_type or "").strip() + mapping = { + "pipeline_board": "pipeline_board", + "pipelineBoard": "pipeline_board", + "bar_chart": "bar_chart", + "barChart": "bar_chart", + "geo_map": "geo_map", + "geoMap": "geo_map", + "table": "table", + "line_chart": "line_chart", + "lineChart": "line_chart", + "kpi_tile": "kpi_tile", + "kpiTile": "kpi_tile", + "activity_stream": "activity_stream", + "activityStream": "activity_stream", + "timeline": "activity_stream", + } + return mapping.get(normalized, normalized or "table") + + _DATASET_MAP: dict[str, str] = { "pipeline_board": "crm_opportunity_pipeline", "bar_chart": "oracle_property_interest_rollup", @@ -794,35 +815,35 @@ class PromptOrchestrator: ) -> dict[str, Any]: rows = result.get("rows") or [] columns = result.get("columns") or (list(rows[0].keys()) if rows 
else []) - ctype = str(result.get("componentType") or "table") - mapped_type = self._map_type(ctype) + ctype_raw = str(result.get("componentType") or "table") + ctype = _canonical_plan_type(ctype_raw) + mapped_type = self._map_type(ctype_raw) dataset = "oracle_natural_sql" component_id = str(uuid.uuid4()) + viz_decision = result.get("visualizationDecision") or {} x_axis, y_axis = _infer_chart_axes(rows, columns) - bindings = self._default_bindings(ctype) + bindings = dict(viz_decision.get("dataBindings") or self._default_bindings(ctype)) viz_params = { **self._default_viz_params(ctype, dataset, rows), + **dict(viz_decision.get("vizParams") or {}), "columns": columns, "sqlSummary": result.get("summary"), "sourceTables": result.get("sourceTables", []), "rowCount": result.get("rowCount", len(rows)), } - if ctype == "bar_chart": - if x_axis: + if mapped_type in {"barChart", "lineChart"}: + if not viz_params.get("xAxis") and x_axis: viz_params["xAxis"] = x_axis - bindings["dimensions"] = [x_axis] - if y_axis: + if not viz_params.get("yAxis") and y_axis: viz_params["yAxis"] = y_axis - bindings["measures"] = [y_axis] - elif ctype == "line_chart": - if x_axis: + if not bindings.get("dimensions") and x_axis: bindings["dimensions"] = [x_axis] - if y_axis: + if not bindings.get("measures") and y_axis: bindings["measures"] = [y_axis] comp: dict[str, Any] = { "componentId": component_id, "type": mapped_type, - "title": result.get("title") or self._generate_title(prompt, ctype), + "title": result.get("title") or self._generate_title(prompt, ctype_raw), "description": f"SQL-backed Oracle result from: \"{prompt[:96]}\"", "dataSourceDescriptor": { "descriptorId": str(uuid.uuid4()), @@ -849,12 +870,22 @@ class PromptOrchestrator: "sourceTables": result.get("sourceTables", []), "sqlSummary": result.get("summary"), }, - "renderingHints": self._rendering_hints(ctype), + "renderingHints": { + **self._rendering_hints(ctype), + **( + { + "estimatedHeightPx": 
int(viz_decision.get("minHeightPx", 0) or 0), + "skeletonVariant": str(viz_decision.get("skeletonVariant") or ""), + } + if viz_decision + else {} + ), + }, "layout": { "orderIndex": base_order + 100, "sectionId": section_id, - "widthMode": "full" if mapped_type in ("table", "pipelineBoard", "timeline", "activityStream") else "half", - "minHeightPx": 320, + "widthMode": str(viz_decision.get("widthMode") or ("full" if mapped_type in ("table", "pipelineBoard", "timeline", "activityStream") else "half")), + "minHeightPx": int(viz_decision.get("minHeightPx") or 320), "stickyHeader": False, }, "accessControls": { @@ -975,6 +1006,7 @@ class PromptOrchestrator: @staticmethod def _map_type(plan_type: str) -> str: + plan_type = _canonical_plan_type(plan_type) mapping = { "pipeline_board": "pipelineBoard", "bar_chart": "barChart", @@ -988,6 +1020,7 @@ class PromptOrchestrator: @staticmethod def _generate_title(prompt: str, comp_type: str) -> str: + comp_type = _canonical_plan_type(comp_type) labels = { "pipeline_board": "Pipeline View", "bar_chart": "Comparative Analysis", @@ -1001,6 +1034,7 @@ class PromptOrchestrator: @staticmethod def _default_viz_params(comp_type: str, dataset: str, rows: list[dict[str, Any]]) -> dict[str, Any]: + comp_type = _canonical_plan_type(comp_type) first_row = rows[0] if rows else {} inferred_columns = [key for key in first_row.keys() if key not in {"avatar"}] or ["name", "status"] table_columns_by_dataset: dict[str, list[str]] = { @@ -1038,10 +1072,12 @@ class PromptOrchestrator: @staticmethod def _default_bindings(comp_type: str) -> dict[str, Any]: + del comp_type return {"dimensions": [], "measures": [], "series": [], "filters": []} @staticmethod def _rendering_hints(comp_type: str) -> dict[str, Any]: + comp_type = _canonical_plan_type(comp_type) priority_map = { "pipeline_board": ("pipeline", 9), "bar_chart": ("chart", 8), "geo_map": ("map", 9), "table": ("table", 7), diff --git a/backend/oracle/semantic_catalog.py 
b/backend/oracle/semantic_catalog.py new file mode 100644 index 00000000..e5154dd1 --- /dev/null +++ b/backend/oracle/semantic_catalog.py @@ -0,0 +1,360 @@ +""" +oracle/semantic_catalog.py + +Business-semantic layer for Oracle's natural DB planner. + +This sits between raw schema introspection and SQL generation. It defines: +- authoritative tables and columns for business concepts +- deprecated or sparse fields the planner should avoid +- preferred join paths +- compact semantic context for the planner prompt +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +class Confidence: + RELIABLE = "reliable" + PARTIAL = "partial" + SPARSE = "sparse" + DEPRECATED = "deprecated" + + +@dataclass(frozen=True) +class FieldDescriptor: + table: str + column: str + confidence: str + description: str + notes: str = "" + + +@dataclass(frozen=True) +class JoinPath: + from_table: str + from_col: str + to_table: str + to_col: str + join_type: str = "INNER" + notes: str = "" + + +@dataclass +class ConceptDescriptor: + concept_id: str + label: str + description: str + authoritative_fields: list[FieldDescriptor] + deprecated_fields: list[FieldDescriptor] = field(default_factory=list) + preferred_join_paths: list[JoinPath] = field(default_factory=list) + usage_notes: str = "" + + +CATALOG_VERSION = "velocity_semantic_v2026_04_25_01" + +CONCEPTS: list[ConceptDescriptor] = [ + ConceptDescriptor( + concept_id="person_identity", + label="Client Identity", + description="Canonical identity record for a person in CRM.", + authoritative_fields=[ + FieldDescriptor("crm_people", "person_id", Confidence.RELIABLE, "Primary key"), + FieldDescriptor("crm_people", "full_name", Confidence.RELIABLE, "Display name"), + FieldDescriptor("crm_people", "primary_email", Confidence.RELIABLE, "Email"), + FieldDescriptor("crm_people", "primary_phone", Confidence.RELIABLE, "Phone"), + FieldDescriptor("crm_people", "persona_labels", Confidence.PARTIAL, 
"Buyer persona labels"), + ], + usage_notes=( + "Anchor client-level queries on crm_people.person_id. " + "Treat crm_people as the identity source of truth." + ), + ), + ConceptDescriptor( + concept_id="lead_funnel", + label="Lead Funnel", + description="Lead ownership, stage, status, and urgency.", + authoritative_fields=[ + FieldDescriptor("crm_leads", "lead_id", Confidence.RELIABLE, "Primary key"), + FieldDescriptor("crm_leads", "person_id", Confidence.RELIABLE, "FK to crm_people"), + FieldDescriptor("crm_leads", "stage", Confidence.RELIABLE, "Current funnel stage"), + FieldDescriptor("crm_leads", "status", Confidence.RELIABLE, "Lead status"), + FieldDescriptor("crm_leads", "assigned_user_id", Confidence.RELIABLE, "Owning user"), + FieldDescriptor("crm_leads", "budget_band", Confidence.PARTIAL, "Budget band"), + FieldDescriptor("crm_leads", "urgency", Confidence.PARTIAL, "Urgency tag"), + ], + preferred_join_paths=[ + JoinPath("crm_people", "person_id", "crm_leads", "person_id"), + ], + ), + ConceptDescriptor( + concept_id="qd_score", + label="QD Score", + description="Qualification / Desire score source of truth.", + authoritative_fields=[ + FieldDescriptor("intel_qd_scores", "person_id", Confidence.RELIABLE, "FK to crm_people"), + FieldDescriptor("intel_qd_scores", "current_value", Confidence.RELIABLE, "Authoritative QD score"), + FieldDescriptor("intel_qd_scores", "score_type", Confidence.RELIABLE, "Score family"), + FieldDescriptor("intel_qd_scores", "computed_at", Confidence.RELIABLE, "Score timestamp"), + ], + deprecated_fields=[ + FieldDescriptor("crm_people", "engagement_score", Confidence.DEPRECATED, "Not QD"), + FieldDescriptor("crm_leads", "engagement_score", Confidence.DEPRECATED, "Not QD"), + FieldDescriptor("intel_interactions", "engagement_score", Confidence.DEPRECATED, "Not QD"), + ], + usage_notes=( + "When a prompt mentions QD, qualification, desire, or intent score, " + "use intel_qd_scores.current_value. Do not substitute engagement_score." 
+ ), + ), + ConceptDescriptor( + concept_id="communication_events", + label="Communication Events", + description="Authoritative recent-contact and interaction history source.", + authoritative_fields=[ + FieldDescriptor("intel_interactions", "interaction_id", Confidence.RELIABLE, "Primary key"), + FieldDescriptor("intel_interactions", "person_id", Confidence.RELIABLE, "FK to crm_people"), + FieldDescriptor("intel_interactions", "channel", Confidence.RELIABLE, "Interaction channel"), + FieldDescriptor("intel_interactions", "interaction_type", Confidence.RELIABLE, "Interaction type"), + FieldDescriptor("intel_interactions", "happened_at", Confidence.RELIABLE, "Primary recency timestamp"), + FieldDescriptor("intel_interactions", "summary", Confidence.RELIABLE, "Interaction summary"), + ], + deprecated_fields=[ + FieldDescriptor("edge_communication_events", "timestamp", Confidence.SPARSE, "Do not use for recency"), + FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.SPARSE, "Do not use for recency"), + ], + preferred_join_paths=[ + JoinPath("crm_people", "person_id", "intel_interactions", "person_id", "LEFT"), + JoinPath("intel_interactions", "interaction_id", "intel_calls", "interaction_id", "LEFT"), + JoinPath("intel_interactions", "interaction_id", "intel_messages", "interaction_id", "LEFT"), + JoinPath("intel_interactions", "interaction_id", "intel_emails", "interaction_id", "LEFT"), + ], + usage_notes=( + "For recent contact, last contact, or contacted us, prefer intel_interactions.happened_at. " + "Use read_last_contacted if available for precomputed summaries." 
+ ), + ), + ConceptDescriptor( + concept_id="last_contact_read_model", + label="Last Contact Read Model", + description="Per-person last-contact summary materialization.", + authoritative_fields=[ + FieldDescriptor("read_last_contacted", "person_id", Confidence.RELIABLE, "FK to crm_people"), + FieldDescriptor("read_last_contacted", "last_contacted_at", Confidence.RELIABLE, "Last contact time"), + FieldDescriptor("read_last_contacted", "last_channel", Confidence.RELIABLE, "Last contact channel"), + FieldDescriptor("read_last_contacted", "days_since_last_contact", Confidence.RELIABLE, "Recency in days"), + FieldDescriptor("read_last_contacted", "staleness_label", Confidence.RELIABLE, "Hot/warm/cold bucket"), + ], + deprecated_fields=[ + FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.DEPRECATED, "Stale field"), + ], + usage_notes=( + "If this table exists, prefer it for last-contact prompts over rebuilding recency from raw interactions." + ), + ), + ConceptDescriptor( + concept_id="next_best_action", + label="Next Best Action", + description="Precomputed follow-up action recommendations.", + authoritative_fields=[ + FieldDescriptor("read_next_best_action", "person_id", Confidence.RELIABLE, "FK to crm_people"), + FieldDescriptor("read_next_best_action", "action_label", Confidence.RELIABLE, "Human-readable action"), + FieldDescriptor("read_next_best_action", "urgency", Confidence.RELIABLE, "Urgency"), + FieldDescriptor("read_next_best_action", "recommended_channel", Confidence.RELIABLE, "Suggested channel"), + FieldDescriptor("read_next_best_action", "execute_within_hours", Confidence.RELIABLE, "Action SLA"), + ], + ), + ConceptDescriptor( + concept_id="property_interest", + label="Property Interest", + description="Client-level project or unit interest records.", + authoritative_fields=[ + FieldDescriptor("crm_property_interests", "interest_id", Confidence.RELIABLE, "Primary key"), + FieldDescriptor("crm_property_interests", "person_id", 
Confidence.RELIABLE, "FK to crm_people"), + FieldDescriptor("crm_property_interests", "project_id", Confidence.PARTIAL, "FK to inventory_projects"), + FieldDescriptor("crm_property_interests", "project_name", Confidence.RELIABLE, "Primary text project scope"), + FieldDescriptor("crm_property_interests", "unit_id", Confidence.PARTIAL, "FK to inventory_units"), + FieldDescriptor("crm_property_interests", "interest_level", Confidence.RELIABLE, "Interest strength"), + FieldDescriptor("crm_property_interests", "configuration_preference", Confidence.PARTIAL, "Configuration"), + FieldDescriptor("crm_property_interests", "budget_min", Confidence.PARTIAL, "Minimum budget"), + FieldDescriptor("crm_property_interests", "budget_max", Confidence.PARTIAL, "Maximum budget"), + FieldDescriptor("crm_property_interests", "financing_plan", Confidence.PARTIAL, "Financing plan"), + FieldDescriptor("crm_property_interests", "notes", Confidence.PARTIAL, "Free-text notes"), + ], + deprecated_fields=[ + FieldDescriptor("crm_property_interests", "last_discussed_at", Confidence.DEPRECATED, "Do not use for recency"), + ], + preferred_join_paths=[ + JoinPath("crm_people", "person_id", "crm_property_interests", "person_id", "LEFT"), + JoinPath("crm_property_interests", "project_id", "inventory_projects", "project_id", "LEFT"), + ], + usage_notes=( + "For prompts scoped to a specific property or project, filter on crm_property_interests.project_name " + "case-insensitively. For top properties, group by project_name and count distinct person_id." 
+ ), + ), + ConceptDescriptor( + concept_id="opportunities", + label="Opportunities", + description="Deal pipeline records.", + authoritative_fields=[ + FieldDescriptor("crm_opportunities", "opportunity_id", Confidence.RELIABLE, "Primary key"), + FieldDescriptor("crm_opportunities", "lead_id", Confidence.RELIABLE, "FK to crm_leads"), + FieldDescriptor("crm_opportunities", "project_id", Confidence.RELIABLE, "FK to inventory_projects"), + FieldDescriptor("crm_opportunities", "stage", Confidence.RELIABLE, "Opportunity stage"), + FieldDescriptor("crm_opportunities", "value", Confidence.RELIABLE, "Deal value"), + FieldDescriptor("crm_opportunities", "probability", Confidence.PARTIAL, "Probability"), + FieldDescriptor("crm_opportunities", "next_action", Confidence.RELIABLE, "Next action"), + ], + preferred_join_paths=[ + JoinPath("crm_people", "person_id", "crm_leads", "person_id"), + JoinPath("crm_leads", "lead_id", "crm_opportunities", "lead_id", "LEFT"), + JoinPath("crm_opportunities", "project_id", "inventory_projects", "project_id", "LEFT"), + ], + ), + ConceptDescriptor( + concept_id="site_visits", + label="Site Visits", + description="Physical visit records and outcomes.", + authoritative_fields=[ + FieldDescriptor("intel_visits", "visit_id", Confidence.RELIABLE, "Primary key"), + FieldDescriptor("intel_visits", "person_id", Confidence.RELIABLE, "FK to crm_people"), + FieldDescriptor("intel_visits", "project_id", Confidence.PARTIAL, "FK to inventory_projects"), + FieldDescriptor("intel_visits", "project_name", Confidence.PARTIAL, "Project name"), + FieldDescriptor("intel_visits", "visited_at", Confidence.RELIABLE, "Visit timestamp"), + FieldDescriptor("intel_visits", "visit_notes", Confidence.RELIABLE, "Visit notes"), + ], + ), + ConceptDescriptor( + concept_id="inventory", + label="Inventory", + description="Project and unit master data.", + authoritative_fields=[ + FieldDescriptor("inventory_projects", "project_id", Confidence.RELIABLE, "Primary key"), + 
FieldDescriptor("inventory_projects", "project_name", Confidence.RELIABLE, "Project name"), + FieldDescriptor("inventory_projects", "developer_name", Confidence.RELIABLE, "Developer"), + FieldDescriptor("inventory_projects", "micro_market", Confidence.RELIABLE, "Micro market"), + FieldDescriptor("inventory_units", "unit_id", Confidence.RELIABLE, "Primary key"), + FieldDescriptor("inventory_units", "project_id", Confidence.RELIABLE, "FK to inventory_projects"), + FieldDescriptor("inventory_units", "configuration", Confidence.RELIABLE, "Configuration"), + FieldDescriptor("inventory_units", "price_current", Confidence.RELIABLE, "Current price"), + FieldDescriptor("inventory_units", "status", Confidence.RELIABLE, "Unit status"), + ], + ), + ConceptDescriptor( + concept_id="extracted_facts", + label="Extracted Facts", + description="AI-extracted CRM memory facts.", + authoritative_fields=[ + FieldDescriptor("intel_extracted_facts", "fact_id", Confidence.RELIABLE, "Primary key"), + FieldDescriptor("intel_extracted_facts", "person_id", Confidence.RELIABLE, "FK to crm_people"), + FieldDescriptor("intel_extracted_facts", "fact_type", Confidence.RELIABLE, "Fact type"), + FieldDescriptor("intel_extracted_facts", "fact_text", Confidence.RELIABLE, "Fact text"), + FieldDescriptor("intel_extracted_facts", "confidence", Confidence.RELIABLE, "Extraction confidence"), + FieldDescriptor("intel_extracted_facts", "effective_date", Confidence.PARTIAL, "Fact date"), + ], + ), + ConceptDescriptor( + concept_id="call_objections", + label="Call Objections", + description="Structured objections extracted from calls.", + authoritative_fields=[ + FieldDescriptor("intel_call_objections", "objection_id", Confidence.RELIABLE, "Primary key"), + FieldDescriptor("intel_call_objections", "person_id", Confidence.RELIABLE, "FK to crm_people"), + FieldDescriptor("intel_call_objections", "objection_type", Confidence.RELIABLE, "Objection type"), + FieldDescriptor("intel_call_objections", "objection_text", 
Confidence.RELIABLE, "Objection text"), + FieldDescriptor("intel_call_objections", "intensity", Confidence.RELIABLE, "Intensity"), + FieldDescriptor("intel_call_objections", "was_resolved", Confidence.RELIABLE, "Resolution flag"), + FieldDescriptor("intel_call_objections", "raised_at", Confidence.RELIABLE, "Raised timestamp"), + ], + ), +] + +_CONCEPT_INDEX: dict[str, ConceptDescriptor] = {concept.concept_id: concept for concept in CONCEPTS} + + +def get_concept(concept_id: str) -> ConceptDescriptor | None: + return _CONCEPT_INDEX.get(concept_id) + + +def all_concepts() -> list[ConceptDescriptor]: + return CONCEPTS + + +INTENT_CONCEPT_MAP: dict[str, list[str]] = { + "last_contacted": ["last_contact_read_model", "communication_events", "person_identity"], + "interested_clients": ["property_interest", "person_identity", "lead_funnel"], + "qd_score": ["qd_score", "person_identity"], + "pipeline": ["opportunities", "lead_funnel", "person_identity"], + "site_visits": ["site_visits", "person_identity", "property_interest"], + "timeline": ["communication_events", "person_identity"], + "objections": ["call_objections", "communication_events", "person_identity"], + "broker_performance": ["lead_funnel", "opportunities"], + "next_action": ["next_best_action", "person_identity", "lead_funnel"], + "inventory": ["inventory", "property_interest"], + "extracted_facts": ["extracted_facts", "person_identity"], + "client_360": [ + "person_identity", + "lead_funnel", + "qd_score", + "communication_events", + "property_interest", + "opportunities", + "next_best_action", + ], +} + + +def concepts_for_intent(intent: str) -> list[ConceptDescriptor]: + ids = INTENT_CONCEPT_MAP.get(intent, ["person_identity", "lead_funnel"]) + return [_CONCEPT_INDEX[concept_id] for concept_id in ids if concept_id in _CONCEPT_INDEX] + + +def _field_to_dict(field: FieldDescriptor) -> dict[str, Any]: + return { + "table": field.table, + "column": field.column, + "confidence": field.confidence, + "description": 
field.description, + **({"notes": field.notes} if field.notes else {}), + } + + +def concept_to_dict(concept: ConceptDescriptor) -> dict[str, Any]: + return { + "concept_id": concept.concept_id, + "label": concept.label, + "description": concept.description, + "authoritative_fields": [_field_to_dict(field) for field in concept.authoritative_fields], + "deprecated_fields": [_field_to_dict(field) for field in concept.deprecated_fields], + "preferred_join_paths": [ + { + "from": f"{join.from_table}.{join.from_col}", + "to": f"{join.to_table}.{join.to_col}", + "join_type": join.join_type, + **({"notes": join.notes} if join.notes else {}), + } + for join in concept.preferred_join_paths + ], + **({"usage_notes": concept.usage_notes} if concept.usage_notes else {}), + } + + +def build_semantic_context_for_planner(detected_intents: list[str], *, max_concepts: int = 5) -> str: + import json + + seen: set[str] = set() + ordered: list[ConceptDescriptor] = [] + for intent in detected_intents: + for concept in concepts_for_intent(intent): + if concept.concept_id not in seen: + seen.add(concept.concept_id) + ordered.append(concept) + return json.dumps( + { + "catalog_version": CATALOG_VERSION, + "concepts": [concept_to_dict(concept) for concept in ordered[:max_concepts]], + }, + separators=(",", ":"), + ) diff --git a/backend/oracle/visualization_planner.py b/backend/oracle/visualization_planner.py new file mode 100644 index 00000000..daf8f2f5 --- /dev/null +++ b/backend/oracle/visualization_planner.py @@ -0,0 +1,382 @@ +""" +oracle/visualization_planner.py + +Pick Oracle canvas renderer types from actual result shape. 
+""" +from __future__ import annotations + +import re +from dataclasses import dataclass +from typing import Any + + +@dataclass +class ColumnProfile: + name: str + is_numeric: bool + is_string: bool + is_datetime: bool + is_boolean: bool + null_rate: float + sample_values: list[Any] + + +@dataclass +class VisualizationDecision: + component_type: str + x_axis: str | None + y_axis: str | None + series_cols: list[str] + dimension_cols: list[str] + measure_cols: list[str] + title: str + width_mode: str + min_height_px: int + skeleton_variant: str + viz_params: dict[str, Any] + data_bindings: dict[str, Any] + confidence: float + reasoning: str + + +def _looks_like_timestamp(value: str) -> bool: + return bool(re.match(r"\d{4}-\d{2}-\d{2}", value)) + + +def _profile_columns(rows: list[dict[str, Any]], columns: list[str]) -> list[ColumnProfile]: + if not rows: + return [ColumnProfile(column, False, False, False, False, 1.0, []) for column in columns] + + sample_size = min(len(rows), 20) + profiles: list[ColumnProfile] = [] + for column in columns: + values = [rows[index].get(column) for index in range(sample_size)] + non_null = [value for value in values if value is not None] + null_rate = 1.0 - len(non_null) / sample_size if sample_size else 1.0 + profiles.append( + ColumnProfile( + name=column, + is_numeric=any(isinstance(value, (int, float)) for value in non_null), + is_string=any(isinstance(value, str) and not _looks_like_timestamp(value) for value in non_null[:5]), + is_datetime=any(isinstance(value, str) and _looks_like_timestamp(value) for value in non_null[:5]), + is_boolean=any(isinstance(value, bool) for value in non_null), + null_rate=null_rate, + sample_values=non_null[:3], + ) + ) + return profiles + + +_DIMENSION_HINTS = { + "name", "full_name", "project_name", "developer_name", "agent_name", + "broker_company", "category", "label", "stage", "channel", "type", + "micro_market", "district", "status", "persona", "nationality", +} +_MEASURE_HINTS = { + "count", 
"total", "sum", "avg", "average", "value", "score", "rate", + "current_value", "qd_score", "probability", "interest_count", "visit_count", + "interaction_count", "days", "amount", "revenue", +} +_TIMESTAMP_HINTS = {"at", "date", "time", "when", "timestamp"} + +_PREFERRED_X = [ + "project_name", "developer_name", "category", "stage", "channel", + "micro_market", "broker_company", "agent_name", "name", "full_name", + "label", "status", "type", +] +_PREFERRED_Y = [ + "count", "total", "interested_clients", "interest_count", "client_count", + "current_value", "qd_score", "value", "probability", "interaction_count", + "visit_count", "days_since_last_contact", +] + +_TABLE_COLUMN_PRESETS: dict[str, list[str]] = { + "crm_people": ["full_name", "primary_phone", "primary_email", "persona_labels"], + "intel_qd_scores": ["full_name", "current_value", "score_type", "computed_at"], + "crm_leads": ["full_name", "stage", "status", "budget_band", "urgency"], + "intel_interactions": ["full_name", "channel", "interaction_type", "happened_at", "summary"], + "read_last_contacted": ["full_name", "last_contacted_at", "last_channel", "days_since_last_contact", "staleness_label"], + "crm_property_interests": ["full_name", "project_name", "interest_level", "configuration_preference"], + "intel_call_objections": ["full_name", "objection_type", "intensity", "was_resolved", "raised_at"], + "intel_extracted_facts": ["full_name", "fact_type", "fact_text", "confidence", "effective_date"], + "read_next_best_action": ["full_name", "action_label", "urgency", "recommended_channel", "execute_within_hours"], +} + + +def _pick_axis(candidates: list[str], preferred: list[str]) -> str | None: + for candidate in preferred: + if candidate in candidates: + return candidate + return candidates[0] if candidates else None + + +def _title_from_prompt(prompt: str) -> str: + words = re.sub(r"\s+", " ", prompt.strip()).strip(" ?.!")[:72] + return (words[:1].upper() + words[1:]) if words else "Oracle Query Result" 
class VisualizationPlanner:
    """Choose a canvas component and its rendering parameters for a query result.

    Each result column is classified from its name and sampled values, a
    component type is picked by a fixed, ordered ladder of rules, and the
    chosen axes, bindings and layout hints are returned as a
    ``VisualizationDecision``.
    """

    # Prompt vocabulary; every term is matched as a substring of the
    # lower-cased prompt.
    _TIMELINE_TERMS = (
        "timeline", "history", "activity", "message", "call log", "whatsapp",
        "email", "conversation", "transcript", "interaction",
    )
    _TREND_TERMS = ("trend", "over time", "monthly", "weekly", "growth", "timeseries")
    _PIPELINE_TERMS = ("pipeline", "funnel", "stage", "kanban", "deal")

    def plan(
        self,
        *,
        rows: list[dict[str, Any]],
        columns: list[str],
        prompt: str,
        source_tables: list[str],
        profile_suggested_type: str | None = None,
        title_from_planner: str | None = None,
    ) -> VisualizationDecision:
        """Return the visualization decision for one executed query.

        Args:
            rows: Result rows, one dict per row keyed by column name.
            columns: Column names of the result.
            prompt: The user's natural-language prompt.
            source_tables: Tables the query read from; used to pick a table
                column preset.
            profile_suggested_type: Component type proposed upstream. When
                truthy it is used as-is and the rule ladder is skipped.
            title_from_planner: Title to use; when falsy a title is derived
                from the prompt.
        """
        profiles = _profile_columns(rows, columns)
        classifications = {profile.name: self._classify_column(profile) for profile in profiles}

        dimensions = [column for column, kind in classifications.items() if kind == "dimension"]
        measures = [column for column, kind in classifications.items() if kind == "measure"]
        timestamps = [column for column, kind in classifications.items() if kind == "timestamp"]
        row_count = len(rows)

        component_type, reasoning, confidence = self._select_component(
            prompt_lower=prompt.lower(),
            columns=columns,
            dimensions=dimensions,
            measures=measures,
            timestamps=timestamps,
            row_count=row_count,
            profile_suggested_type=profile_suggested_type,
        )
        return self._build_decision(
            component_type=component_type,
            dimensions=dimensions,
            measures=measures,
            timestamps=timestamps,
            columns=columns,
            rows=rows,
            row_count=row_count,
            prompt=prompt,
            source_tables=source_tables,
            title=title_from_planner,
            reasoning=reasoning,
            confidence=confidence,
        )

    @classmethod
    def _select_component(
        cls,
        *,
        prompt_lower: str,
        columns: list[str],
        dimensions: list[str],
        measures: list[str],
        timestamps: list[str],
        row_count: int,
        profile_suggested_type: str | None,
    ) -> tuple[str, str, float]:
        """Return ``(component_type, reasoning, confidence)`` for a result shape.

        Rules are evaluated top to bottom and the first match wins, so their
        order is part of the behaviour.
        """

        def mentions(terms: tuple[str, ...]) -> bool:
            return any(term in prompt_lower for term in terms)

        if profile_suggested_type:
            return (
                profile_suggested_type,
                f"Execution profiler suggested {profile_suggested_type}",
                0.9,
            )
        if mentions(cls._TIMELINE_TERMS) and timestamps:
            return "activityStream", "Activity-like prompt plus timestamped result.", 0.88
        if row_count == 1 and measures and not dimensions:
            return "kpiTile", "Single numeric row.", 0.92
        if timestamps and measures and mentions(cls._TREND_TERMS):
            return "lineChart", "Temporal series plus measure.", 0.87
        if ("stage" in columns or "pipeline" in prompt_lower) and mentions(cls._PIPELINE_TERMS):
            return "pipelineBoard", "Pipeline-like prompt and stage-like data.", 0.85
        if dimensions and measures and row_count <= 30 and not timestamps:
            return "barChart", "Categorical dimension plus measure.", 0.8
        return "table", "Default structured table.", 0.7

    @staticmethod
    def _classify_column(profile: ColumnProfile) -> str:
        """Classify a column as identity, timestamp, dimension, measure or other.

        Checks run in that order and the first match wins. Timestamp hints are
        compared against whole words of the column name rather than as raw
        substrings; a substring test lets the two-letter hint ``"at"`` match
        inside names such as ``status``, ``category`` or ``rate`` and
        mislabels those columns as timestamps.
        """
        lower = profile.name.lower()
        if lower.endswith("_id"):
            return "identity"

        # Split on snake_case, camelCase, spaces and punctuation alike, so
        # "last_contacted_at", "createdAt" and "Visit Date" all expose their
        # trailing time word as a separate token.
        tokens = {
            word.lower()
            for word in re.findall(r"[A-Z]?[a-z]+|[A-Z]+(?![a-z])|\d+", profile.name)
        }
        if profile.is_datetime or not tokens.isdisjoint(_TIMESTAMP_HINTS):
            return "timestamp"
        if lower in _DIMENSION_HINTS or (profile.is_string and not profile.is_numeric):
            return "dimension"
        # Measure hints keep their substring semantics ("avg_score" matches
        # "avg"); only the timestamp test above is token-based.
        if profile.is_numeric or any(token in lower for token in _MEASURE_HINTS):
            return "measure"
        return "other"

    def _build_decision(
        self,
        *,
        component_type: str,
        dimensions: list[str],
        measures: list[str],
        timestamps: list[str],
        columns: list[str],
        rows: list[dict[str, Any]],
        row_count: int,
        prompt: str,
        source_tables: list[str],
        title: str | None,
        reasoning: str,
        confidence: float,
    ) -> VisualizationDecision:
        """Assemble the ``VisualizationDecision`` for an already-chosen type.

        ``rows`` is accepted for signature symmetry with ``plan`` but is not
        read here.
        """
        # Named preferences are listed before the timestamp columns, so a
        # preferred categorical column is chosen over a timestamp when both
        # are present.
        x_axis = _pick_axis(dimensions + timestamps, _PREFERRED_X + list(timestamps))
        y_axis = _pick_axis(measures, _PREFERRED_Y)

        # Only tables get a curated column list; other components see every column.
        if component_type == "table":
            display_columns = self._table_columns(columns, source_tables)
        else:
            display_columns = columns

        viz_params = self._build_viz_params(
            component_type=component_type,
            x_axis=x_axis,
            y_axis=y_axis,
            display_columns=display_columns,
            row_count=row_count,
        )
        data_bindings = {
            # Fall back to the first timestamp when there is no categorical dimension.
            "dimensions": dimensions[:2] if dimensions else (timestamps[:1] if timestamps else []),
            "measures": measures[:3],
            "series": [],
            "filters": [],
        }
        width_mode = "full" if component_type in {"table", "activityStream", "pipelineBoard"} else "half"
        # NOTE(review): these tables are keyed by camelCase type names. A
        # component type spelled any other way (for example one passed through
        # from `profile_suggested_type`) gets the fallback height/skeleton and
        # half width - confirm which spelling the profiler emits.
        height_map = {
            "kpiTile": 140,
            "barChart": 320,
            "lineChart": 320,
            "activityStream": 380,
            "table": 300,
            "pipelineBoard": 400,
        }
        skeleton_map = {
            "kpiTile": "kpi",
            "barChart": "chart",
            "lineChart": "chart",
            "activityStream": "table",
            "table": "table",
            "pipelineBoard": "pipeline",
        }

        return VisualizationDecision(
            component_type=component_type,
            x_axis=x_axis,
            y_axis=y_axis,
            series_cols=[],
            dimension_cols=dimensions,
            measure_cols=measures,
            title=title or _title_from_prompt(prompt),
            width_mode=width_mode,
            min_height_px=height_map.get(component_type, 300),
            skeleton_variant=skeleton_map.get(component_type, "generic"),
            viz_params=viz_params,
            data_bindings=data_bindings,
            confidence=confidence,
            reasoning=reasoning,
        )

    @staticmethod
    def _table_columns(all_columns: list[str], source_tables: list[str]) -> list[str]:
        """Return the columns a table component should display.

        The first source table whose preset shares at least one column with
        the result decides the list (in preset order). Otherwise up to eight
        result columns are kept, dropping ``*_id`` columns except
        ``person_id``.
        """
        for table in source_tables:
            preset = _TABLE_COLUMN_PRESETS.get(table)
            if preset:
                matched = [column for column in preset if column in all_columns]
                if matched:
                    return matched
        return [column for column in all_columns if not column.endswith("_id") or column == "person_id"][:8]

    @staticmethod
    def _build_viz_params(
        *,
        component_type: str,
        x_axis: str | None,
        y_axis: str | None,
        display_columns: list[str],
        row_count: int,
    ) -> dict[str, Any]:
        """Return the renderer parameters for ``component_type``.

        Unknown component types yield an empty dict. ``row_count`` is part of
        the signature but is not used.
        """
        del row_count
        if component_type == "barChart":
            return {
                "xAxis": x_axis or "category",
                "yAxis": y_axis or "value",
                "sort": "desc",
                "showLabels": True,
                "legend": False,
            }
        if component_type == "lineChart":
            return {"showPoints": True, "smooth": True}
        if component_type == "kpiTile":
            return {"label": "Result", "trend": "", "comparisonLabel": ""}
        if component_type == "table":
            return {
                "columns": display_columns,
                "emptyStateTitle": "No matching records found",
                "emptyStateDescription": "The query ran successfully but returned no rows for this prompt.",
                "rankBy": y_axis,
                "showTopBadge": False,
            }
        if component_type == "activityStream":
            return {"showUrgencyIndicator": True}
        if component_type == "pipelineBoard":
            return {"showValue": True, "colorByStage": True}
        return {}


# Shared module-level planner instance.
visualization_planner = VisualizationPlanner()