diff --git a/core/api/api/routes_oracle.py b/core/api/api/routes_oracle.py index 2761fa8..8d30647 100644 --- a/core/api/api/routes_oracle.py +++ b/core/api/api/routes_oracle.py @@ -1,5 +1,7 @@ from __future__ import annotations +import logging + from fastapi import APIRouter, HTTPException, Request from pydantic import BaseModel, Field @@ -11,6 +13,7 @@ from backend.services.nemoclaw_runtime import nemoclaw_runtime from backend.services.runtime_llm_service import runtime_llm_service router = APIRouter() +logger = logging.getLogger(__name__) class WorkflowPreviewRequest(BaseModel): @@ -79,7 +82,11 @@ async def oracle_query(request: Request, payload: OracleQueryRequest) -> dict: if pool is None: raise HTTPException(status_code=503, detail="Database unavailable.") async with pool.acquire() as conn: - result = await natural_db_agent.execute_prompt(payload.prompt, row_limit=payload.row_limit, conn=conn) + try: + result = await natural_db_agent.execute_prompt(payload.prompt, row_limit=payload.row_limit, conn=conn) + except Exception as exc: + logger.exception("oracle_query_failed") + raise HTTPException(status_code=502, detail=f"Oracle query failed: {exc}") from exc return {"status": "ok", "data": result.as_dict()} diff --git a/core/oracle/oracle/natural_db_agent.py b/core/oracle/oracle/natural_db_agent.py index 231e185..0790dc7 100644 --- a/core/oracle/oracle/natural_db_agent.py +++ b/core/oracle/oracle/natural_db_agent.py @@ -221,6 +221,58 @@ def title_from_prompt(prompt: str) -> str: return (words[:1].upper() + words[1:80]) if words else "Oracle Query Result" +def _extract_requested_limit(prompt: str, default: int) -> int: + lowered = prompt.lower() + numeric_match = re.search( + r"\b(?:top|last|latest|recent|first|lowest|highest|best|worst|three|five|ten|eight)\s+(\d{1,3})\b" + r"|\b(\d{1,3})\s+(?:clients|leads|properties|projects|records|rows)\b", + lowered, + ) + numeric_limit = int(numeric_match.group(1) or numeric_match.group(2)) if numeric_match else 0 + if numeric_limit > 0: + return max(1, min(default, numeric_limit)) + + words = { + "one": 1, + "two": 2, + "three": 3, + "four": 4, + "five": 5, + "six": 6, + "seven": 7, + "eight": 8, + "nine": 9, + "ten": 10, + "eleven": 11, + "twelve": 12, + "fifteen": 15, + "twenty": 20, + } + for word, value in words.items(): + if re.search( + rf"\b(?:top|last|latest|recent|first|lowest|highest|best|worst)\s+{word}\b" + rf"|\b{word}\s+(?:clients|leads|properties|projects|records|rows)\b", + lowered, + ): + return max(1, min(default, value)) + return default + + +def _extract_project_hint(prompt: str) -> str | None: + quoted = re.search(r"['\"]([^'\"]{3,80})['\"]", prompt) + if quoted: + return quoted.group(1).strip() + + lowered = prompt.lower() + match = re.search(r"\b(?:in|for|at)\s+([a-z0-9][a-z0-9\s&.-]{2,80})(?:\?|$)", lowered, re.IGNORECASE) + if match: + candidate = match.group(1).strip(" .?") + stop_words = {"last three months", "last month", "last week", "the database", "crm"} + if candidate and candidate not in stop_words: + return candidate + return None + + class NaturalDbAgent: async def schema_catalog(self, conn: Any | None = None) -> dict[str, Any]: own_conn = conn is None @@ -311,6 +363,16 @@ class NaturalDbAgent: try: catalog = await self.schema_catalog(conn) detected_intents = _detect_intents(prompt) + local_plan = self._deterministic_plan(prompt=prompt, row_limit=row_limit) + if local_plan: + return await self._execute_verified_plan( + conn=conn, + prompt=prompt, + plan=local_plan, + row_limit=row_limit, + detected_intents=detected_intents, + replan_count=0, + ) return await self._pipeline( conn=conn, prompt=prompt, @@ -324,29 +386,20 @@ class NaturalDbAgent: if own_conn: await conn.close() - async def _pipeline( + async def _execute_verified_plan( self, *, conn: Any, prompt: str, - catalog: dict[str, Any], - detected_intents: list[str], + plan: dict[str, Any], row_limit: int, - attempt: int, - prior_feedback: str | None, + detected_intents: list[str], + replan_count: int, ) -> NaturalQueryResult: warnings: list[str] = [] - - plan = await self._plan_sql( - prompt=prompt, - catalog=catalog, - detected_intents=detected_intents, - row_limit=row_limit, - prior_feedback=prior_feedback, - ) raw_sql = str(plan.get("sql") or "").strip() if not raw_sql: - raise RuntimeError("Natural SQL planner returned no SQL.") + raise RuntimeError("Oracle planner returned no SQL.") verification = await plan_verifier.verify_and_repair( sql=raw_sql, @@ -380,6 +433,158 @@ class NaturalDbAgent: rows = [_json_safe(dict(record)) for record in records] columns = list(rows[0].keys()) if rows else [] + profile = execution_profiler.profile( + rows=rows, + columns=columns, + sql=effective_sql, + prompt=prompt, + source_tables=source_tables, + row_limit=row_limit, + ) + if not profile.passed: + warnings.extend(f"[{issue.code}] {issue.description}" for issue in profile.issues) + + visualization_decision = visualization_planner.plan( + rows=rows, + columns=columns, + prompt=prompt, + source_tables=source_tables, + profile_suggested_type=profile.suggested_component_type, + title_from_planner=str(plan.get("title") or ""), + ) + title = visualization_decision.title or str(plan.get("title") or title_from_prompt(prompt)) + summary = str(plan.get("rationale") or f"SQL-backed Oracle result from {', '.join(source_tables) or 'Velocity CRM'}.") + return NaturalQueryResult( + prompt=prompt, + sql=effective_sql, + title=title, + summary=summary, + columns=columns, + rows=rows, + row_count=len(rows), + source_tables=source_tables, + component_type=visualization_decision.component_type, + warnings=warnings, + visualization_decision=visualization_decision, + replan_count=replan_count, + semantic_catalog_version=CATALOG_VERSION, + ) + + def _deterministic_plan(self, *, prompt: str, row_limit: int) -> dict[str, Any] | None: + lowered = prompt.lower() + limit = _extract_requested_limit(prompt, row_limit) + + if "qd" in lowered and any(token in lowered for token in ("highest", "top", "best")): + return { + "title": f"Top {limit} Clients by QD Score", + "rationale": "Uses intel_qd_scores.overall joined to crm_people so the score is sourced from the canonical QD table.", + "sql": ( + "SELECT p.person_id, p.full_name, p.primary_phone, p.primary_email, " + "q.current_value AS qd_score, q.score_type, q.computed_at " + "FROM intel_qd_scores q " + "JOIN crm_people p ON p.person_id = q.person_id " + "WHERE q.score_type = 'overall' " + "ORDER BY q.current_value DESC NULLS LAST " + f"LIMIT {limit}" + ), + } + + project_hint = _extract_project_hint(prompt) + if "qd" in lowered and any(token in lowered for token in ("lowest", "low", "worst")): + where = "q.score_type = 'overall'" + if project_hint: + safe_project = project_hint.replace("'", "''") + where += f" AND pi.project_name ILIKE '%{safe_project}%'" + return { + "title": f"Lowest {limit} QD Clients" + (f" in {project_hint}" if project_hint else ""), + "rationale": "Ranks clients by canonical overall QD score, scoped to property interest when a project is named.", + "sql": ( + "SELECT p.person_id, p.full_name, p.primary_phone, p.primary_email, " + "pi.project_name, q.current_value AS qd_score, q.score_type, q.computed_at " + "FROM intel_qd_scores q " + "JOIN crm_people p ON p.person_id = q.person_id " + "LEFT JOIN crm_property_interests pi ON pi.person_id = p.person_id " + f"WHERE {where} " + "ORDER BY q.current_value ASC NULLS LAST " + f"LIMIT {limit}" + ), + } + + if any(token in lowered for token in ("majority", "most clients", "most interested", "top")) and any( + token in lowered for token in ("property", "properties", "project", "projects", "interested") + ): + return { + "title": f"Top {limit} Properties by Client Interest", + "rationale": "Counts distinct clients in crm_property_interests by project to show where demand is concentrated.", + "sql": ( + "SELECT COALESCE(pi.project_name, pr.project_name, 'Unknown project') AS project_name, " + "COUNT(DISTINCT pi.person_id) AS interested_clients, " + "COUNT(*) AS interest_records, " + "ROUND(AVG(q.current_value)::numeric, 2) AS average_qd_score " + "FROM crm_property_interests pi " + "LEFT JOIN inventory_projects pr ON pr.project_id = pi.project_id " + "LEFT JOIN intel_qd_scores q ON q.person_id = pi.person_id AND q.score_type = 'overall' " + "GROUP BY COALESCE(pi.project_name, pr.project_name, 'Unknown project') " + "ORDER BY interested_clients DESC, average_qd_score DESC NULLS LAST " + f"LIMIT {limit}" + ), + } + + if any(token in lowered for token in ("last contacted", "contacted us", "recently contacted", "last contact")): + return { + "title": f"Last {limit} Contacted Clients", + "rationale": "Uses the read_last_contacted model as the canonical contact recency source.", + "sql": ( + "SELECT p.person_id, p.full_name, p.primary_phone, p.primary_email, " + "lc.last_contact_at, lc.last_channel, lc.days_since_contact AS days_since_last_contact, " + "q.current_value AS qd_score " + "FROM read_last_contacted lc " + "JOIN crm_people p ON p.person_id = lc.person_id " + "LEFT JOIN intel_qd_scores q ON q.person_id = p.person_id AND q.score_type = 'overall' " + "ORDER BY lc.last_contact_at DESC NULLS LAST " + f"LIMIT {limit}" + ), + } + + return None + + async def _pipeline( + self, + *, + conn: Any, + prompt: str, + catalog: dict[str, Any], + detected_intents: list[str], + row_limit: int, + attempt: int, + prior_feedback: str | None, + ) -> NaturalQueryResult: + warnings: list[str] = [] + + plan = await self._plan_sql( + prompt=prompt, + catalog=catalog, + detected_intents=detected_intents, + row_limit=row_limit, + prior_feedback=prior_feedback, + ) + raw_sql = str(plan.get("sql") or "").strip() + if not raw_sql: + raise RuntimeError("Natural SQL planner returned no SQL.") + + result = await self._execute_verified_plan( + conn=conn, + prompt=prompt, + plan=plan, + row_limit=row_limit, + detected_intents=detected_intents, + replan_count=attempt, + ) + effective_sql = result.sql + source_tables = result.source_tables + rows = result.rows + columns = result.columns + warnings.extend(result.warnings) profile = execution_profiler.profile( rows=rows, diff --git a/webos/src/App.tsx b/webos/src/App.tsx index 8fc6d9f..cd9241a 100644 --- a/webos/src/App.tsx +++ b/webos/src/App.tsx @@ -3,35 +3,21 @@ import { AuthGuard } from './shared/layout/AuthGuard'; import { AdminGuard } from './shared/layout/AdminGuard'; import { AuthenticatedShell } from './shared/layout/AuthenticatedShell'; import { LoginPage } from './shared/layout/LoginPage'; +import CommandPillar from './pillars/command/CommandPillar'; +import OracleWorkspacePage from './pillars/command/OracleWorkspacePage'; +import PipelinePillar from './pillars/pipeline/PipelinePillar'; +import Client360 from './pillars/pipeline/client360/Client360'; +import ShowroomMode from './pillars/pipeline/ShowroomMode'; +import StudioPillar from './pillars/studio/StudioPillar'; +import PropertyEntity from './pillars/studio/PropertyEntity'; +import ControlRoom from './control-room/ControlRoom'; +import VaultPublicPage from './shared/layout/VaultPublicPage'; -// ── Pillar pages (lazy loaded for performance) ──────────────── -import { lazy, Suspense } from 'react'; -import { PillarSkeleton } from './shared/layout/PillarSkeleton'; - -const CommandPillar = lazy(() => import('./pillars/command/CommandPillar')); -const OracleWorkspacePage = lazy(() => import('./pillars/command/OracleWorkspacePage')); -const PipelinePillar = lazy(() => import('./pillars/pipeline/PipelinePillar')); -const Client360 = lazy(() => import('./pillars/pipeline/client360/Client360')); -const ShowroomMode = lazy(() => import('./pillars/pipeline/ShowroomMode')); -const StudioPillar = lazy(() => import('./pillars/studio/StudioPillar')); -const PropertyEntity = lazy(() => import('./pillars/studio/PropertyEntity')); -const ControlRoom = lazy(() => import('./control-room/ControlRoom')); -const VaultPublicPage = lazy(() => import('./shared/layout/VaultPublicPage')); - -// ── Lazy wrapper with branded skeleton ─────────────────────── -const Lazy = ({ children }: { children: React.ReactNode }) => ( - }>{children} -); - -// ── Router definition ───────────────────────────────────────── const router = createBrowserRouter([ - // ── Unauthenticated ── { path: '/login', element: , }, - - // ── Authenticated WebOS Shell ── { path: '/', element: ( @@ -40,67 +26,30 @@ const router = createBrowserRouter([ ), children: [ - // Default redirect { index: true, element: }, - - // Pillar 1: COMMAND — Morning Briefing (Dashboard + Oracle) - { - path: 'command', - element: , - }, - { - path: 'oracle', - element: , - }, - - // Pillar 2: PIPELINE — Deal Intelligence (CRM + Comms + Sentinel) - { - path: 'pipeline', - element: , - }, - // Client 360 entity — drills in over Pipeline - { - path: 'pipeline/:personId', - element: , - }, - // Showroom Mode — contextual full-screen overlay - { - path: 'showroom', - element: , - }, - - // Pillar 3: STUDIO — Asset + Marketing Hub (Inventory + Catalyst) - { - path: 'studio', - element: , - }, - // Property Entity — drills in over Studio - { - path: 'studio/:propertyId', - element: , - }, + { path: 'command', element: }, + { path: 'oracle', element: }, + { path: 'pipeline', element: }, + { path: 'pipeline/:personId', element: }, + { path: 'showroom', element: }, + { path: 'studio', element: }, + { path: 'studio/:propertyId', element: }, ], }, - - // ── Admin-Only Control Room (RBAC gated at component + API level) ── { path: '/control-room/:panel?', element: ( - + ), }, - - // ── Public vault links (no auth) ── { path: '/vault/:trackingHash', - element: , + element: , }, - - // ── 404 fallback ── { path: '*', element: ,