"""
oracle/execution_profiler.py

Post-execution quality checks for Oracle natural DB queries.
"""
from __future__ import annotations

import re
from dataclasses import dataclass, field
from datetime import date, datetime, timezone
from typing import Any

# A timestamp older than this many days counts as stale.
_STALE_THRESHOLD_DAYS = 365

# How many leading rows the per-column checks sample.
_SAMPLE_ROWS = 20
# How many leading rows must be entirely NULL for the all-null check to fire.
_NULL_ROW_SAMPLE = 5

# Substrings that mark a column name as timestamp-like (matched case-insensitively).
_TIMESTAMP_NAME_TOKENS = ("_at", "date", "timestamp", "when", "time")

# Spelled-out counts recognised in prompts.
_NUMBER_WORDS = {
    "one": 1,
    "two": 2,
    "three": 3,
    "four": 4,
    "five": 5,
    "six": 6,
    "seven": 7,
    "eight": 8,
    "nine": 9,
    "ten": 10,
    "eleven": 11,
    "twelve": 12,
    "fifteen": 15,
    "twenty": 20,
}

# Group 1 is the leading keyword, group 2 the count token.
_NUMERIC_COUNT_RE = re.compile(
    r"\b(top|last|latest|recent|first|show|which)\s+(\d{1,4})\b"
)
_WORD_COUNT_RE = re.compile(
    r"\b(top|last|latest|recent|first|show|which)\s+"
    r"(one|two|three|four|five|six|seven|eight|nine|ten|eleven|twelve|fifteen|twenty)\b"
)

# "last 30 days" describes a time window, not a row count.  Only these
# keywords are treated that way; "top 5 days" is still a request for 5 rows.
_RECENCY_KEYWORDS = frozenset({"last", "latest", "recent"})
_TIME_UNIT_RE = re.compile(
    r"[\s-]+(?:seconds?|minutes?|hours?|days?|weeks?|months?|quarters?|years?)\b"
)


@dataclass
class QualityIssue:
    """One problem detected in a query result."""

    code: str  # machine-readable identifier, e.g. "zero_rows"
    description: str  # human-readable summary
    severity: str  # this module emits "blocking" or "warning"
    replan_hint: str  # guidance for a retry; empty string when there is none


@dataclass
class ProfileResult:
    """Outcome of profiling one query execution."""

    passed: bool  # False when at least one issue is "blocking"
    row_count: int
    issues: list[QualityIssue] = field(default_factory=list)
    replan_hints: list[str] = field(default_factory=list)  # non-empty hints only
    suggested_component_type: str | None = None


def _first_count_token(pattern: re.Pattern[str], text: str) -> str | None:
    """Return the count token of the first match that is not a time window.

    A match whose keyword is a recency word ("last", "latest", "recent") and
    which is immediately followed by a time unit ("last 30 days") is skipped.
    """
    for match in pattern.finditer(text):
        keyword, token = match.group(1), match.group(2)
        if keyword in _RECENCY_KEYWORDS and _TIME_UNIT_RE.match(text, match.end()):
            continue
        return token
    return None


def _extract_cardinality_from_prompt(prompt: str) -> int | None:
    """Guess how many rows the prompt asks for, or None if it does not say.

    Recognises a keyword (top/last/latest/recent/first/show/which) followed
    by either 1-4 digits or a spelled-out number.  A digit match anywhere in
    the prompt takes precedence over a spelled-out match.
    """
    lowered = prompt.lower()

    digits = _first_count_token(_NUMERIC_COUNT_RE, lowered)
    if digits is not None:
        return int(digits)

    word = _first_count_token(_WORD_COUNT_RE, lowered)
    if word is not None:
        return _NUMBER_WORDS[word]

    return None


def _all_null_measures(rows: list[dict[str, Any]], columns: list[str]) -> bool:
    """Return True when the result carries no data at all in its leading rows.

    A column can only be recognised as numeric from a non-null sample value,
    so the check reduces to two steps:

    * if any listed column holds an int/float (bools included) in the first
      ``_SAMPLE_ROWS`` rows, measures are present and the answer is False;
    * otherwise the answer is True only when every value in the first
      ``_NULL_ROW_SAMPLE`` rows is None.

    Empty ``rows`` or ``columns`` yield False.
    """
    if not rows or not columns:
        return False

    sample = rows[:_SAMPLE_ROWS]
    for column in columns:
        if any(isinstance(row.get(column), (int, float)) for row in sample):
            return False

    return all(
        value is None for row in rows[:_NULL_ROW_SAMPLE] for value in row.values()
    )


def _coerce_timestamp(value: Any) -> datetime | None:
    """Convert *value* to an aware datetime, or None if it is not a timestamp.

    Accepts ``datetime`` and ``date`` objects as well as ISO-8601 strings
    (a trailing "Z" is read as UTC).  Naive values are assumed to be UTC.
    """
    if isinstance(value, datetime):  # must precede the date check: datetime is a date
        parsed = value
    elif isinstance(value, date):
        parsed = datetime(value.year, value.month, value.day)
    elif isinstance(value, str):
        try:
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return None
    else:
        return None

    if parsed.utcoffset() is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _timestamps_are_stale(rows: list[dict[str, Any]], columns: list[str]) -> bool:
    """Return True when every readable timestamp in the sample is stale.

    Only columns whose name contains one of ``_TIMESTAMP_NAME_TOKENS``
    (case-insensitively) are inspected, over the first ``_SAMPLE_ROWS`` rows.
    Values that cannot be read as timestamps are ignored; if none can be
    read the answer is False.
    """
    timestamp_columns = [
        column
        for column in columns
        if any(token in column.lower() for token in _TIMESTAMP_NAME_TOKENS)
    ]
    if not timestamp_columns or not rows:
        return False

    now = datetime.now(timezone.utc)
    checked = 0
    stale = 0
    for row in rows[:_SAMPLE_ROWS]:
        for column in timestamp_columns:
            parsed = _coerce_timestamp(row.get(column))
            if parsed is None:
                continue
            checked += 1
            if (now - parsed).days > _STALE_THRESHOLD_DAYS:
                stale += 1

    return checked > 0 and stale == checked


class ExecutionProfiler:
    """Runs the post-execution quality checks over a query result."""

    def profile(
        self,
        *,
        rows: list[dict[str, Any]],
        columns: list[str],
        sql: str,
        prompt: str,
        source_tables: list[str],
        row_limit: int,
    ) -> ProfileResult:
        """Inspect a query result and report quality issues.

        Checks, in order: zero rows (blocking), all-null measures (blocking),
        far more rows than the prompt asked for (warning), stale timestamps
        (warning), and a single all-numeric row that suits a KPI tile
        (warning, also sets ``suggested_component_type``).

        Args:
            rows: Result rows as column-name -> value mappings.
            columns: Result column names.
            sql: The executed SQL; accepted but not consulted.
            prompt: The user's request, used to infer a requested row count.
            source_tables: Accepted but not consulted.
            row_limit: Accepted but not consulted.

        Returns:
            A ProfileResult whose ``passed`` is False when any issue is
            blocking and whose ``replan_hints`` holds the non-empty hints.
        """
        # Part of the call interface, but no current check reads them.
        del sql, source_tables, row_limit

        issues: list[QualityIssue] = []

        if not rows:
            issues.append(
                QualityIssue(
                    code="zero_rows",
                    description="Query returned zero rows.",
                    severity="blocking",
                    replan_hint=(
                        "The query returned zero rows. Use authoritative recency and business-semantic columns "
                        "from the semantic catalog. Avoid sparse or deprecated timestamp fields."
                    ),
                )
            )
        elif _all_null_measures(rows, columns):
            issues.append(
                QualityIssue(
                    code="all_null_measures",
                    description="Rows returned but numeric measure columns are null.",
                    severity="blocking",
                    replan_hint=(
                        "The query returned rows but numeric measures are null. "
                        "Check join keys and metric source columns."
                    ),
                )
            )

        # Tolerate up to three times the requested count before warning.
        requested_n = _extract_cardinality_from_prompt(prompt)
        if requested_n is not None and len(rows) > requested_n * 3:
            issues.append(
                QualityIssue(
                    code="cardinality_mismatch",
                    description=f"Prompt asked for about {requested_n} rows but query returned {len(rows)}.",
                    severity="warning",
                    replan_hint=f"Respect the requested result count and add LIMIT {requested_n}.",
                )
            )

        if rows and _timestamps_are_stale(rows, columns):
            issues.append(
                QualityIssue(
                    code="stale_timestamps",
                    description="Returned timestamps appear stale.",
                    severity="warning",
                    replan_hint="The result timestamps are stale. Use authoritative recency fields.",
                )
            )

        suggested_type: str | None = None
        if len(rows) == 1 and len(columns) <= 4:
            non_null_values = [value for value in rows[0].values() if value is not None]
            if non_null_values and all(
                isinstance(value, (int, float)) for value in non_null_values
            ):
                suggested_type = "kpiTile"
                issues.append(
                    QualityIssue(
                        code="single_row_scalar",
                        description="Single scalar row is better rendered as KPI tile.",
                        severity="warning",
                        replan_hint="",
                    )
                )

        has_blocking = any(issue.severity == "blocking" for issue in issues)
        return ProfileResult(
            passed=not has_blocking,
            row_count=len(rows),
            issues=issues,
            replan_hints=[issue.replan_hint for issue in issues if issue.replan_hint],
            suggested_component_type=suggested_type,
        )


execution_profiler = ExecutionProfiler()