fix: stabilize oracle and bundled pillar routes

This commit is contained in:
2026-05-03 01:06:50 +05:30
parent 600716a69d
commit f495b513ff
3 changed files with 245 additions and 84 deletions

View File

@@ -1,5 +1,7 @@
from __future__ import annotations from __future__ import annotations
import logging
from fastapi import APIRouter, HTTPException, Request from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
@@ -11,6 +13,7 @@ from backend.services.nemoclaw_runtime import nemoclaw_runtime
from backend.services.runtime_llm_service import runtime_llm_service from backend.services.runtime_llm_service import runtime_llm_service
router = APIRouter() router = APIRouter()
logger = logging.getLogger(__name__)
class WorkflowPreviewRequest(BaseModel): class WorkflowPreviewRequest(BaseModel):
@@ -79,7 +82,11 @@ async def oracle_query(request: Request, payload: OracleQueryRequest) -> dict:
if pool is None: if pool is None:
raise HTTPException(status_code=503, detail="Database unavailable.") raise HTTPException(status_code=503, detail="Database unavailable.")
async with pool.acquire() as conn: async with pool.acquire() as conn:
try:
result = await natural_db_agent.execute_prompt(payload.prompt, row_limit=payload.row_limit, conn=conn) result = await natural_db_agent.execute_prompt(payload.prompt, row_limit=payload.row_limit, conn=conn)
except Exception as exc:
logger.exception("oracle_query_failed")
raise HTTPException(status_code=502, detail=f"Oracle query failed: {exc}") from exc
return {"status": "ok", "data": result.as_dict()} return {"status": "ok", "data": result.as_dict()}

View File

@@ -221,6 +221,58 @@ def title_from_prompt(prompt: str) -> str:
return (words[:1].upper() + words[1:80]) if words else "Oracle Query Result" return (words[:1].upper() + words[1:80]) if words else "Oracle Query Result"
def _extract_requested_limit(prompt: str, default: int) -> int:
lowered = prompt.lower()
numeric_match = re.search(
r"\b(?:top|last|latest|recent|first|lowest|highest|best|worst|three|five|ten|eight)\s+(\d{1,3})\b"
r"|\b(\d{1,3})\s+(?:clients|leads|properties|projects|records|rows)\b",
lowered,
)
numeric_limit = int(numeric_match.group(1) or numeric_match.group(2)) if numeric_match else 0
if numeric_limit > 0:
return max(1, min(default, numeric_limit))
words = {
"one": 1,
"two": 2,
"three": 3,
"four": 4,
"five": 5,
"six": 6,
"seven": 7,
"eight": 8,
"nine": 9,
"ten": 10,
"eleven": 11,
"twelve": 12,
"fifteen": 15,
"twenty": 20,
}
for word, value in words.items():
if re.search(
rf"\b(?:top|last|latest|recent|first|lowest|highest|best|worst)\s+{word}\b"
rf"|\b{word}\s+(?:clients|leads|properties|projects|records|rows)\b",
lowered,
):
return max(1, min(default, value))
return default
def _extract_project_hint(prompt: str) -> str | None:
quoted = re.search(r"['\"]([^'\"]{3,80})['\"]", prompt)
if quoted:
return quoted.group(1).strip()
lowered = prompt.lower()
match = re.search(r"\b(?:in|for|at)\s+([a-z0-9][a-z0-9\s&.-]{2,80})(?:\?|$)", lowered, re.IGNORECASE)
if match:
candidate = match.group(1).strip(" .?")
stop_words = {"last three months", "last month", "last week", "the database", "crm"}
if candidate and candidate not in stop_words:
return candidate
return None
class NaturalDbAgent: class NaturalDbAgent:
async def schema_catalog(self, conn: Any | None = None) -> dict[str, Any]: async def schema_catalog(self, conn: Any | None = None) -> dict[str, Any]:
own_conn = conn is None own_conn = conn is None
@@ -311,6 +363,16 @@ class NaturalDbAgent:
try: try:
catalog = await self.schema_catalog(conn) catalog = await self.schema_catalog(conn)
detected_intents = _detect_intents(prompt) detected_intents = _detect_intents(prompt)
local_plan = self._deterministic_plan(prompt=prompt, row_limit=row_limit)
if local_plan:
return await self._execute_verified_plan(
conn=conn,
prompt=prompt,
plan=local_plan,
row_limit=row_limit,
detected_intents=detected_intents,
replan_count=0,
)
return await self._pipeline( return await self._pipeline(
conn=conn, conn=conn,
prompt=prompt, prompt=prompt,
@@ -324,29 +386,20 @@ class NaturalDbAgent:
if own_conn: if own_conn:
await conn.close() await conn.close()
async def _pipeline( async def _execute_verified_plan(
self, self,
*, *,
conn: Any, conn: Any,
prompt: str, prompt: str,
catalog: dict[str, Any], plan: dict[str, Any],
detected_intents: list[str],
row_limit: int, row_limit: int,
attempt: int, detected_intents: list[str],
prior_feedback: str | None, replan_count: int,
) -> NaturalQueryResult: ) -> NaturalQueryResult:
warnings: list[str] = [] warnings: list[str] = []
plan = await self._plan_sql(
prompt=prompt,
catalog=catalog,
detected_intents=detected_intents,
row_limit=row_limit,
prior_feedback=prior_feedback,
)
raw_sql = str(plan.get("sql") or "").strip() raw_sql = str(plan.get("sql") or "").strip()
if not raw_sql: if not raw_sql:
raise RuntimeError("Natural SQL planner returned no SQL.") raise RuntimeError("Oracle planner returned no SQL.")
verification = await plan_verifier.verify_and_repair( verification = await plan_verifier.verify_and_repair(
sql=raw_sql, sql=raw_sql,
@@ -380,6 +433,158 @@ class NaturalDbAgent:
rows = [_json_safe(dict(record)) for record in records] rows = [_json_safe(dict(record)) for record in records]
columns = list(rows[0].keys()) if rows else [] columns = list(rows[0].keys()) if rows else []
profile = execution_profiler.profile(
rows=rows,
columns=columns,
sql=effective_sql,
prompt=prompt,
source_tables=source_tables,
row_limit=row_limit,
)
if not profile.passed:
warnings.extend(f"[{issue.code}] {issue.description}" for issue in profile.issues)
visualization_decision = visualization_planner.plan(
rows=rows,
columns=columns,
prompt=prompt,
source_tables=source_tables,
profile_suggested_type=profile.suggested_component_type,
title_from_planner=str(plan.get("title") or ""),
)
title = visualization_decision.title or str(plan.get("title") or title_from_prompt(prompt))
summary = str(plan.get("rationale") or f"SQL-backed Oracle result from {', '.join(source_tables) or 'Velocity CRM'}.")
return NaturalQueryResult(
prompt=prompt,
sql=effective_sql,
title=title,
summary=summary,
columns=columns,
rows=rows,
row_count=len(rows),
source_tables=source_tables,
component_type=visualization_decision.component_type,
warnings=warnings,
visualization_decision=visualization_decision,
replan_count=replan_count,
semantic_catalog_version=CATALOG_VERSION,
)
def _deterministic_plan(self, *, prompt: str, row_limit: int) -> dict[str, Any] | None:
lowered = prompt.lower()
limit = _extract_requested_limit(prompt, row_limit)
if "qd" in lowered and any(token in lowered for token in ("highest", "top", "best")):
return {
"title": f"Top {limit} Clients by QD Score",
"rationale": "Uses intel_qd_scores.overall joined to crm_people so the score is sourced from the canonical QD table.",
"sql": (
"SELECT p.person_id, p.full_name, p.primary_phone, p.primary_email, "
"q.current_value AS qd_score, q.score_type, q.computed_at "
"FROM intel_qd_scores q "
"JOIN crm_people p ON p.person_id = q.person_id "
"WHERE q.score_type = 'overall' "
"ORDER BY q.current_value DESC NULLS LAST "
f"LIMIT {limit}"
),
}
project_hint = _extract_project_hint(prompt)
if "qd" in lowered and any(token in lowered for token in ("lowest", "low", "worst")):
where = "q.score_type = 'overall'"
if project_hint:
safe_project = project_hint.replace("'", "''")
where += f" AND pi.project_name ILIKE '%{safe_project}%'"
return {
"title": f"Lowest {limit} QD Clients" + (f" in {project_hint}" if project_hint else ""),
"rationale": "Ranks clients by canonical overall QD score, scoped to property interest when a project is named.",
"sql": (
"SELECT p.person_id, p.full_name, p.primary_phone, p.primary_email, "
"pi.project_name, q.current_value AS qd_score, q.score_type, q.computed_at "
"FROM intel_qd_scores q "
"JOIN crm_people p ON p.person_id = q.person_id "
"LEFT JOIN crm_property_interests pi ON pi.person_id = p.person_id "
f"WHERE {where} "
"ORDER BY q.current_value ASC NULLS LAST "
f"LIMIT {limit}"
),
}
if any(token in lowered for token in ("majority", "most clients", "most interested", "top")) and any(
token in lowered for token in ("property", "properties", "project", "projects", "interested")
):
return {
"title": f"Top {limit} Properties by Client Interest",
"rationale": "Counts distinct clients in crm_property_interests by project to show where demand is concentrated.",
"sql": (
"SELECT COALESCE(pi.project_name, pr.project_name, 'Unknown project') AS project_name, "
"COUNT(DISTINCT pi.person_id) AS interested_clients, "
"COUNT(*) AS interest_records, "
"ROUND(AVG(q.current_value)::numeric, 2) AS average_qd_score "
"FROM crm_property_interests pi "
"LEFT JOIN inventory_projects pr ON pr.project_id = pi.project_id "
"LEFT JOIN intel_qd_scores q ON q.person_id = pi.person_id AND q.score_type = 'overall' "
"GROUP BY COALESCE(pi.project_name, pr.project_name, 'Unknown project') "
"ORDER BY interested_clients DESC, average_qd_score DESC NULLS LAST "
f"LIMIT {limit}"
),
}
if any(token in lowered for token in ("last contacted", "contacted us", "recently contacted", "last contact")):
return {
"title": f"Last {limit} Contacted Clients",
"rationale": "Uses the read_last_contacted model as the canonical contact recency source.",
"sql": (
"SELECT p.person_id, p.full_name, p.primary_phone, p.primary_email, "
"lc.last_contact_at, lc.last_channel, lc.days_since_contact AS days_since_last_contact, "
"q.current_value AS qd_score "
"FROM read_last_contacted lc "
"JOIN crm_people p ON p.person_id = lc.person_id "
"LEFT JOIN intel_qd_scores q ON q.person_id = p.person_id AND q.score_type = 'overall' "
"ORDER BY lc.last_contact_at DESC NULLS LAST "
f"LIMIT {limit}"
),
}
return None
async def _pipeline(
self,
*,
conn: Any,
prompt: str,
catalog: dict[str, Any],
detected_intents: list[str],
row_limit: int,
attempt: int,
prior_feedback: str | None,
) -> NaturalQueryResult:
warnings: list[str] = []
plan = await self._plan_sql(
prompt=prompt,
catalog=catalog,
detected_intents=detected_intents,
row_limit=row_limit,
prior_feedback=prior_feedback,
)
raw_sql = str(plan.get("sql") or "").strip()
if not raw_sql:
raise RuntimeError("Natural SQL planner returned no SQL.")
result = await self._execute_verified_plan(
conn=conn,
prompt=prompt,
plan=plan,
row_limit=row_limit,
detected_intents=detected_intents,
replan_count=attempt,
)
effective_sql = result.sql
source_tables = result.source_tables
rows = result.rows
columns = result.columns
warnings.extend(result.warnings)
profile = execution_profiler.profile( profile = execution_profiler.profile(
rows=rows, rows=rows,

View File

@@ -3,35 +3,21 @@ import { AuthGuard } from './shared/layout/AuthGuard';
import { AdminGuard } from './shared/layout/AdminGuard'; import { AdminGuard } from './shared/layout/AdminGuard';
import { AuthenticatedShell } from './shared/layout/AuthenticatedShell'; import { AuthenticatedShell } from './shared/layout/AuthenticatedShell';
import { LoginPage } from './shared/layout/LoginPage'; import { LoginPage } from './shared/layout/LoginPage';
import CommandPillar from './pillars/command/CommandPillar';
import OracleWorkspacePage from './pillars/command/OracleWorkspacePage';
import PipelinePillar from './pillars/pipeline/PipelinePillar';
import Client360 from './pillars/pipeline/client360/Client360';
import ShowroomMode from './pillars/pipeline/ShowroomMode';
import StudioPillar from './pillars/studio/StudioPillar';
import PropertyEntity from './pillars/studio/PropertyEntity';
import ControlRoom from './control-room/ControlRoom';
import VaultPublicPage from './shared/layout/VaultPublicPage';
// ── Pillar pages (lazy loaded for performance) ────────────────
import { lazy, Suspense } from 'react';
import { PillarSkeleton } from './shared/layout/PillarSkeleton';
const CommandPillar = lazy(() => import('./pillars/command/CommandPillar'));
const OracleWorkspacePage = lazy(() => import('./pillars/command/OracleWorkspacePage'));
const PipelinePillar = lazy(() => import('./pillars/pipeline/PipelinePillar'));
const Client360 = lazy(() => import('./pillars/pipeline/client360/Client360'));
const ShowroomMode = lazy(() => import('./pillars/pipeline/ShowroomMode'));
const StudioPillar = lazy(() => import('./pillars/studio/StudioPillar'));
const PropertyEntity = lazy(() => import('./pillars/studio/PropertyEntity'));
const ControlRoom = lazy(() => import('./control-room/ControlRoom'));
const VaultPublicPage = lazy(() => import('./shared/layout/VaultPublicPage'));
// ── Lazy wrapper with branded skeleton ───────────────────────
const Lazy = ({ children }: { children: React.ReactNode }) => (
<Suspense fallback={<PillarSkeleton />}>{children}</Suspense>
);
// ── Router definition ─────────────────────────────────────────
const router = createBrowserRouter([ const router = createBrowserRouter([
// ── Unauthenticated ──
{ {
path: '/login', path: '/login',
element: <LoginPage />, element: <LoginPage />,
}, },
// ── Authenticated WebOS Shell ──
{ {
path: '/', path: '/',
element: ( element: (
@@ -40,67 +26,30 @@ const router = createBrowserRouter([
</AuthGuard> </AuthGuard>
), ),
children: [ children: [
// Default redirect
{ index: true, element: <Navigate to="/command" replace /> }, { index: true, element: <Navigate to="/command" replace /> },
{ path: 'command', element: <CommandPillar /> },
// Pillar 1: COMMAND — Morning Briefing (Dashboard + Oracle) { path: 'oracle', element: <OracleWorkspacePage /> },
{ { path: 'pipeline', element: <PipelinePillar /> },
path: 'command', { path: 'pipeline/:personId', element: <Client360 /> },
element: <Lazy><CommandPillar /></Lazy>, { path: 'showroom', element: <ShowroomMode /> },
}, { path: 'studio', element: <StudioPillar /> },
{ { path: 'studio/:propertyId', element: <PropertyEntity /> },
path: 'oracle',
element: <Lazy><OracleWorkspacePage /></Lazy>,
},
// Pillar 2: PIPELINE — Deal Intelligence (CRM + Comms + Sentinel)
{
path: 'pipeline',
element: <Lazy><PipelinePillar /></Lazy>,
},
// Client 360 entity — drills in over Pipeline
{
path: 'pipeline/:personId',
element: <Lazy><Client360 /></Lazy>,
},
// Showroom Mode — contextual full-screen overlay
{
path: 'showroom',
element: <Lazy><ShowroomMode /></Lazy>,
},
// Pillar 3: STUDIO — Asset + Marketing Hub (Inventory + Catalyst)
{
path: 'studio',
element: <Lazy><StudioPillar /></Lazy>,
},
// Property Entity — drills in over Studio
{
path: 'studio/:propertyId',
element: <Lazy><PropertyEntity /></Lazy>,
},
], ],
}, },
// ── Admin-Only Control Room (RBAC gated at component + API level) ──
{ {
path: '/control-room/:panel?', path: '/control-room/:panel?',
element: ( element: (
<AuthGuard> <AuthGuard>
<AdminGuard> <AdminGuard>
<Lazy><ControlRoom /></Lazy> <ControlRoom />
</AdminGuard> </AdminGuard>
</AuthGuard> </AuthGuard>
), ),
}, },
// ── Public vault links (no auth) ──
{ {
path: '/vault/:trackingHash', path: '/vault/:trackingHash',
element: <Lazy><VaultPublicPage /></Lazy>, element: <VaultPublicPage />,
}, },
// ── 404 fallback ──
{ {
path: '*', path: '*',
element: <Navigate to="/command" replace />, element: <Navigate to="/command" replace />,