forked from sagnik/Project_Velocity
1322 lines
55 KiB
Python
1322 lines
55 KiB
Python
"""
|
|
oracle/prompt_orchestrator.py
|
|
Accepts a user prompt, assembles context, calls the Nemoclaw model runtime
|
|
(or uses a deterministic fallback), validates the generated plan via policy,
|
|
triggers the data access gateway, and produces a PromptExecution.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import uuid
|
|
import json
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from .policy_service import PolicyContext, PolicyService
|
|
from .canvas_service import canvas_service
|
|
from .data_access_gateway import data_access_gateway
|
|
from .persona_service import persona_service
|
|
from .codebook_service import codebook_service, CodebookExample
|
|
from .natural_db_agent import natural_db_agent
|
|
from backend.services.runtime_llm_service import runtime_llm_service
|
|
from backend.services.nemoclaw_runtime import nemoclaw_runtime
|
|
|
|
try:
|
|
import asyncpg # type: ignore
|
|
except Exception: # pragma: no cover
|
|
asyncpg = None # type: ignore
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_DB_URL = os.getenv("DATABASE_URL", "")
|
|
|
|
policy_svc = PolicyService()
|
|
|
|
|
|
def _now() -> datetime:
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
def _iso(value: datetime | None) -> str | None:
|
|
if value is None:
|
|
return None
|
|
return value.isoformat()
|
|
|
|
|
|
def _coerce_datetime(value: datetime | str | None) -> datetime | None:
|
|
if value is None or isinstance(value, datetime):
|
|
return value
|
|
if isinstance(value, str) and value.strip():
|
|
try:
|
|
return datetime.fromisoformat(value)
|
|
except ValueError:
|
|
return None
|
|
return None
|
|
|
|
|
|
# ── Execution store ───────────────────────────────────────────────────────────
|
|
|
|
def _json_safe(value: Any) -> Any:
|
|
if isinstance(value, datetime):
|
|
return value.isoformat()
|
|
if isinstance(value, uuid.UUID):
|
|
return str(value)
|
|
if isinstance(value, dict):
|
|
return {str(key): _json_safe(val) for key, val in value.items()}
|
|
if isinstance(value, list):
|
|
return [_json_safe(item) for item in value]
|
|
if isinstance(value, tuple):
|
|
return [_json_safe(item) for item in value]
|
|
return value
|
|
|
|
|
|
_DEMO_EXECUTIONS: dict[str, dict[str, Any]] = {}
|
|
|
|
|
|
def _db_ready() -> bool:
|
|
return bool(_DB_URL and not _DB_URL.startswith("PLACEHOLDER") and asyncpg is not None)
|
|
|
|
|
|
# ── Semantic intent detection (simplified) ────────────────────────────────────
|
|
|
|
_INTENT_KEYWORDS: dict[str, list[str]] = {
|
|
"pipeline_board": ["pipeline", "stage", "kanban", "deal", "funnel"],
|
|
"bar_chart": ["bar", "compare", "source", "channel", "distribution", "ranked", "lead", "whale"],
|
|
"geo_map": ["map", "geographic", "location", "district", "region", "area", "dubai"],
|
|
"table": ["table", "list", "broker", "performance", "leaderboard", "rank", "top", "contact", "client", "account", "crm"],
|
|
"line_chart": ["trend", "time", "monthly", "weekly", "absorption", "forecast"],
|
|
"kpi_tile": ["kpi", "total", "summary", "attainment", "quota", "how many"],
|
|
"activity_stream": ["timeline", "activity", "history", "follow-up", "queue", "contact", "interaction", "message", "call", "email"],
|
|
}
|
|
|
|
|
|
def _detect_component_types(prompt: str) -> list[str]:
|
|
lower = prompt.lower()
|
|
types: list[str] = []
|
|
for comp_type, keywords in _INTENT_KEYWORDS.items():
|
|
if any(k in lower for k in keywords):
|
|
types.append(comp_type)
|
|
return types or ["bar_chart"]
|
|
|
|
|
|
def _build_demo_retrieval_plan(
|
|
prompt: str,
|
|
tenant_id: str,
|
|
actor_role: str,
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Deterministic plan builder for demo mode.
|
|
Produces a valid retrieval plan that passes policy validation.
|
|
"""
|
|
component_types = _detect_component_types(prompt)
|
|
row_limit = _parse_prompt_row_limit(prompt, actor_role)
|
|
|
|
return {
|
|
"planId": str(uuid.uuid4()),
|
|
"components": [
|
|
{
|
|
"suggestedType": ct,
|
|
"dataset": _DATASET_MAP.get(ct, "aggregated_results"),
|
|
"privacyTier": "standard",
|
|
"rowLimit": row_limit,
|
|
"joins": [],
|
|
"queryTemplate": f"SELECT * FROM {_DATASET_MAP.get(ct, 'aggregated_results')} WHERE tenant_id = :tenant_id LIMIT :limit",
|
|
"queryParameters": {"tenant_id": tenant_id, "limit": row_limit},
|
|
}
|
|
for ct in component_types
|
|
],
|
|
"semanticModelVersion": "oracle_semantic_v2026_04_08_01",
|
|
"intentClass": "analytical",
|
|
}
|
|
|
|
|
|
def _infer_chart_axes(rows: list[dict[str, Any]], columns: list[str]) -> tuple[str | None, str | None]:
|
|
if not rows or not columns:
|
|
return None, None
|
|
|
|
sample = rows[0]
|
|
string_columns = [
|
|
column for column in columns
|
|
if isinstance(sample.get(column), str) and sample.get(column) not in (None, "")
|
|
]
|
|
numeric_columns = [
|
|
column for column in columns
|
|
if isinstance(sample.get(column), (int, float))
|
|
]
|
|
|
|
preferred_dimension_keys = (
|
|
"property_name",
|
|
"project_name",
|
|
"projects",
|
|
"name",
|
|
"category",
|
|
"label",
|
|
)
|
|
preferred_measure_keys = (
|
|
"interested_clients",
|
|
"interest_count",
|
|
"total_interest_events",
|
|
"count",
|
|
"value",
|
|
"avg_qd_score",
|
|
"qd_score",
|
|
)
|
|
|
|
x_axis = next((key for key in preferred_dimension_keys if key in string_columns), None)
|
|
if x_axis is None and string_columns:
|
|
x_axis = string_columns[0]
|
|
|
|
y_axis = next((key for key in preferred_measure_keys if key in numeric_columns), None)
|
|
if y_axis is None and numeric_columns:
|
|
y_axis = numeric_columns[0]
|
|
|
|
return x_axis, y_axis
|
|
|
|
|
|
def _canonical_plan_type(plan_type: str) -> str:
|
|
normalized = str(plan_type or "").strip()
|
|
mapping = {
|
|
"pipeline_board": "pipeline_board",
|
|
"pipelineBoard": "pipeline_board",
|
|
"bar_chart": "bar_chart",
|
|
"barChart": "bar_chart",
|
|
"geo_map": "geo_map",
|
|
"geoMap": "geo_map",
|
|
"table": "table",
|
|
"line_chart": "line_chart",
|
|
"lineChart": "line_chart",
|
|
"kpi_tile": "kpi_tile",
|
|
"kpiTile": "kpi_tile",
|
|
"activity_stream": "activity_stream",
|
|
"activityStream": "activity_stream",
|
|
"timeline": "activity_stream",
|
|
}
|
|
return mapping.get(normalized, normalized or "table")
|
|
|
|
|
|
_DATASET_MAP: dict[str, str] = {
|
|
"pipeline_board": "crm_opportunity_pipeline",
|
|
"bar_chart": "oracle_property_interest_rollup",
|
|
"geo_map": "lead_geo_interest_rollup",
|
|
"table": "crm_contacts_overview",
|
|
"line_chart": "oracle_property_interest_rollup",
|
|
"kpi_tile": "oracle_aggregated_metric",
|
|
"activity_stream": "oracle_client_interaction_timeline",
|
|
}
|
|
|
|
_CODEBOOK_COMPONENT_MAP: dict[str, str] = {
|
|
"summary_card": "kpi_tile",
|
|
"summary_strip": "kpi_tile",
|
|
"metric_card_group": "kpi_tile",
|
|
"compact_alert_card": "kpi_tile",
|
|
"gauge_stack": "kpi_tile",
|
|
"lead_profile_card": "table",
|
|
"property_card": "table",
|
|
"data_table": "table",
|
|
"leaderboard_table": "table",
|
|
"matrix_grid": "table",
|
|
"interaction_timeline": "activity_stream",
|
|
"message_thread_summary": "activity_stream",
|
|
"timeline": "activity_stream",
|
|
"bar_chart": "bar_chart",
|
|
"line_chart": "line_chart",
|
|
"heatmap": "geo_map",
|
|
"geo_map": "geo_map",
|
|
"pipeline_board": "pipeline_board",
|
|
}
|
|
|
|
|
|
def _component_plan_type_from_codebook(example: CodebookExample) -> str:
|
|
return _CODEBOOK_COMPONENT_MAP.get(example.component_type, "table")
|
|
|
|
|
|
def _parse_prompt_row_limit(prompt: str, actor_role: str) -> int:
|
|
default_limit = 50 if actor_role in ("senior_broker", "junior_broker") else 200
|
|
lowered = prompt.lower()
|
|
match = re.search(r"\b(?:top|last|latest|recent|first|show|name of the last|which)\s+(\d{1,4})\b", lowered)
|
|
if match:
|
|
requested = max(1, int(match.group(1)))
|
|
return min(requested, default_limit)
|
|
|
|
word_to_number = {
|
|
"one": 1,
|
|
"two": 2,
|
|
"three": 3,
|
|
"four": 4,
|
|
"five": 5,
|
|
"six": 6,
|
|
"seven": 7,
|
|
"eight": 8,
|
|
"nine": 9,
|
|
"ten": 10,
|
|
"eleven": 11,
|
|
"twelve": 12,
|
|
"thirteen": 13,
|
|
"fourteen": 14,
|
|
"fifteen": 15,
|
|
"sixteen": 16,
|
|
"seventeen": 17,
|
|
"eighteen": 18,
|
|
"nineteen": 19,
|
|
"twenty": 20,
|
|
}
|
|
word_match = re.search(
|
|
r"\b(?:top|last|latest|recent|first|show|name of the last|which)\s+"
|
|
r"(one|two|three|four|five|six|seven|eight|nine|ten|eleven|twelve|thirteen|fourteen|fifteen|sixteen|seventeen|eighteen|nineteen|twenty)\b",
|
|
lowered,
|
|
)
|
|
if not word_match:
|
|
return default_limit
|
|
requested = word_to_number[word_match.group(1)]
|
|
return min(requested, default_limit)
|
|
|
|
|
|
def _prompt_data_intent(prompt: str) -> str | None:
|
|
lowered = prompt.lower()
|
|
contact_terms = (
|
|
"last contacted", "last contact", "last contacted us", "recently contacted",
|
|
"recent contacts", "last call", "last called", "last message", "last messaged",
|
|
"last whatsapp", "who contacted us", "contacted us", "contacted clients",
|
|
"client contacted", "clients contacted", "follow-up", "follow up",
|
|
)
|
|
interest_terms = (
|
|
"shown interest", "showed interest", "interested clients", "interested client",
|
|
"property interest", "project interest", "interested in any", "interest in any",
|
|
"interested in our properties", "interested in properties",
|
|
)
|
|
timeline_terms = (
|
|
"conversation", "timeline", "whatsapp", "messages", "message history",
|
|
"call history", "transcript", "email", "visit history", "interaction history",
|
|
)
|
|
client_360_terms = ("client 360", "client dossier", "highest intent buyer", "client profile")
|
|
if any(term in lowered for term in contact_terms) or re.search(r"\blast\s+\d+\s+contacted\b", lowered):
|
|
return "last_contacted"
|
|
if any(term in lowered for term in interest_terms) or (
|
|
any(term in lowered for term in ("interest", "interested", "project", "property", "properties"))
|
|
and any(term in lowered for term in ("client", "clients", "contact", "contacts"))
|
|
):
|
|
return "interested_clients"
|
|
if any(term in lowered for term in client_360_terms):
|
|
return "client_360"
|
|
if any(term in lowered for term in timeline_terms):
|
|
return "timeline"
|
|
return None
|
|
|
|
|
|
def _dataset_for_codebook(example: CodebookExample, prompt: str, component_plan_type: str | None = None) -> str:
|
|
chapter = example.chapter_name.lower()
|
|
subchapter = example.subchapter_name.lower()
|
|
component_plan_type = component_plan_type or _component_plan_type_from_codebook(example)
|
|
lowered_prompt = prompt.lower()
|
|
data_intent = _prompt_data_intent(prompt)
|
|
|
|
if data_intent == "last_contacted":
|
|
return "oracle_last_contacted_clients" if component_plan_type != "activity_stream" else "oracle_client_interaction_timeline"
|
|
if data_intent == "interested_clients":
|
|
return "oracle_top_interested_clients" if component_plan_type == "table" else "oracle_property_interest_rollup"
|
|
if data_intent == "client_360":
|
|
return "oracle_client_360_summary"
|
|
if data_intent == "timeline":
|
|
return "oracle_client_interaction_timeline"
|
|
|
|
if component_plan_type == "activity_stream":
|
|
return "oracle_client_interaction_timeline"
|
|
if component_plan_type == "pipeline_board":
|
|
return "crm_opportunity_pipeline"
|
|
if component_plan_type == "table" and any(term in lowered_prompt for term in ("last interacted", "last interaction", "recently contacted", "recent interaction")):
|
|
return "oracle_last_contacted_clients"
|
|
if component_plan_type == "table" and any(term in lowered_prompt for term in ("interest", "interested", "project", "property", "properties")) and any(term in lowered_prompt for term in ("client", "clients", "contact", "contacts")):
|
|
return "oracle_top_interested_clients"
|
|
if component_plan_type == "line_chart" and any(term in lowered_prompt for term in ("trend", "time", "history", "growth")):
|
|
return "oracle_property_interest_rollup"
|
|
|
|
if any(term in lowered_prompt for term in ("contact", "client 360", "crm", "account", "lead")):
|
|
if "timeline" in lowered_prompt or "message" in lowered_prompt or "call" in lowered_prompt or "email" in lowered_prompt:
|
|
return "oracle_client_interaction_timeline"
|
|
if "pipeline" in lowered_prompt or "opportunit" in lowered_prompt:
|
|
return "crm_opportunity_pipeline"
|
|
if ("interest" in lowered_prompt or "project" in lowered_prompt or "property" in lowered_prompt) and ("client" in lowered_prompt or "contact" in lowered_prompt):
|
|
return "oracle_top_interested_clients"
|
|
if "interest" in lowered_prompt or "project" in lowered_prompt or "property" in lowered_prompt:
|
|
return "oracle_property_interest_rollup"
|
|
if "last interacted" in lowered_prompt or "recently contacted" in lowered_prompt or "recent interaction" in lowered_prompt:
|
|
return "oracle_last_contacted_clients"
|
|
return "crm_contacts_overview"
|
|
|
|
if "client" in chapter or "client" in subchapter or "contact" in subchapter:
|
|
return "crm_contacts_overview"
|
|
if "opportun" in chapter or "pipeline" in subchapter:
|
|
return "crm_opportunity_pipeline"
|
|
if "interaction" in chapter or "communication" in chapter or "timeline" in subchapter:
|
|
return "oracle_client_interaction_timeline"
|
|
if "property" in chapter or "inventory" in chapter or "interest" in subchapter:
|
|
return "oracle_property_interest_rollup"
|
|
return _DATASET_MAP.get(component_plan_type, "oracle_aggregated_metric")
|
|
|
|
|
|
def _build_codebook_retrieval_plan(
|
|
prompt: str,
|
|
tenant_id: str,
|
|
actor_role: str,
|
|
matches: list[CodebookExample],
|
|
) -> dict[str, Any]:
|
|
row_limit = _parse_prompt_row_limit(prompt, actor_role)
|
|
desired_types = _detect_component_types(prompt)
|
|
if not desired_types:
|
|
desired_types = [_component_plan_type_from_codebook(matches[0])] if matches else ["table"]
|
|
|
|
title_hints: dict[str, str] = {}
|
|
for example in matches:
|
|
mapped = _component_plan_type_from_codebook(example)
|
|
title_hints.setdefault(mapped, example.title)
|
|
|
|
components: list[dict[str, Any]] = []
|
|
exemplar = matches[0]
|
|
for component_plan_type in desired_types[:4]:
|
|
dataset = _dataset_for_codebook(exemplar, prompt, component_plan_type)
|
|
title_hint = _title_for_dataset(dataset, component_plan_type, prompt) or title_hints.get(component_plan_type, exemplar.title)
|
|
components.append(
|
|
{
|
|
"suggestedType": component_plan_type,
|
|
"dataset": dataset,
|
|
"privacyTier": "standard",
|
|
"rowLimit": row_limit,
|
|
"joins": [],
|
|
"queryTemplate": f"SELECT * FROM {dataset} WHERE tenant_id = :tenant_id LIMIT :limit",
|
|
"queryParameters": {"tenant_id": tenant_id, "limit": row_limit},
|
|
"templateRef": {
|
|
"exampleId": exemplar.example_id,
|
|
"templateName": exemplar.template_name,
|
|
"componentType": exemplar.component_type,
|
|
"chapterName": exemplar.chapter_name,
|
|
"subchapterName": exemplar.subchapter_name,
|
|
"sourcePack": exemplar.source_pack,
|
|
},
|
|
"titleHint": title_hint,
|
|
}
|
|
)
|
|
|
|
return {
|
|
"planId": str(uuid.uuid4()),
|
|
"components": components,
|
|
"semanticModelVersion": "oracle_codebook_v2026_04_19_01",
|
|
"intentClass": "analytical",
|
|
"planner": "codebook_retrieval",
|
|
}
|
|
|
|
|
|
def _title_for_dataset(dataset: str, component_plan_type: str, prompt: str) -> str | None:
|
|
lowered_prompt = prompt.lower()
|
|
dataset_titles = {
|
|
"crm_contacts_overview": "CRM Contacts Overview",
|
|
"crm_opportunity_pipeline": "Opportunity Pipeline",
|
|
"crm_property_interest_rollup": "Property Interest Rollup",
|
|
"crm_interaction_timeline": "Client Interaction Timeline",
|
|
"crm_last_interacted_clients": "Last Interacted Clients",
|
|
"crm_top_interested_clients": "Top Interested Clients",
|
|
"oracle_property_interest_rollup": "Property Interest Rollup",
|
|
"oracle_client_interaction_timeline": "Client Interaction Timeline",
|
|
"oracle_last_contacted_clients": "Last Contacted Clients",
|
|
"oracle_top_interested_clients": "Top Interested Clients",
|
|
"oracle_client_360_summary": "Client 360 Summary",
|
|
"broker_performance": "Broker Performance",
|
|
}
|
|
if dataset in {"crm_top_interested_clients", "oracle_top_interested_clients"} and "top" in lowered_prompt:
|
|
return "Top Interested Clients"
|
|
if dataset in {"crm_last_interacted_clients", "oracle_last_contacted_clients"} and ("top" in lowered_prompt or "last" in lowered_prompt):
|
|
return "Last Contacted Clients"
|
|
return dataset_titles.get(dataset)
|
|
|
|
|
|
_RUNTIME_ALLOWED_DATASETS = {
|
|
"deals",
|
|
"lead_daily_snapshot",
|
|
"lead_geo_interest_rollup",
|
|
"broker_performance",
|
|
"inventory_absorption",
|
|
"oracle_aggregated_metric",
|
|
"lead_activity_log",
|
|
"crm_contacts_overview",
|
|
"crm_opportunity_pipeline",
|
|
"crm_property_interest_rollup",
|
|
"crm_interaction_timeline",
|
|
"crm_last_interacted_clients",
|
|
"crm_top_interested_clients",
|
|
"oracle_property_interest_rollup",
|
|
"oracle_client_interaction_timeline",
|
|
"oracle_last_contacted_clients",
|
|
"oracle_top_interested_clients",
|
|
"oracle_client_360_summary",
|
|
}
|
|
|
|
|
|
class PromptOrchestrator:
|
|
"""
|
|
Orchestrates the full prompt-to-canvas pipeline:
|
|
1. Intent classification
|
|
2. Retrieval plan construction (Nemoclaw or fallback)
|
|
3. Policy validation
|
|
4. Component plan construction
|
|
5. Execution record persistence
|
|
"""
|
|
|
|
async def execute(
|
|
self,
|
|
*,
|
|
tenant_id: str,
|
|
page_id: str,
|
|
branch_id: str,
|
|
actor_id: str,
|
|
actor_role: str,
|
|
prompt: str,
|
|
conversation_context: list[dict[str, str]] | None = None,
|
|
client_request_id: str,
|
|
placement_mode: str = "append_after_last_visible_component",
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Full orchestration flow. Returns a PromptExecution dict.
|
|
"""
|
|
execution_id = str(uuid.uuid4())
|
|
now = _now()
|
|
warnings: list[str] = []
|
|
|
|
ctx = PolicyContext(
|
|
tenant_id=tenant_id,
|
|
actor_id=actor_id,
|
|
actor_role=actor_role,
|
|
)
|
|
|
|
execution: dict[str, Any] = {
|
|
"executionId": execution_id,
|
|
"tenantId": tenant_id,
|
|
"pageId": page_id,
|
|
"branchId": branch_id,
|
|
"actorId": actor_id,
|
|
"prompt": prompt,
|
|
"intentClass": "analytical",
|
|
"status": "planning",
|
|
"modelRuntime": "runtime_llm" if runtime_llm_service._provider_catalog() else "deterministic_fallback",
|
|
"semanticModelVersion": "oracle_semantic_v2026_04_08_01",
|
|
"warnings": warnings,
|
|
"componentsCreated": [],
|
|
"clientRequestId": client_request_id,
|
|
"createdAt": now,
|
|
"codebookMatches": [],
|
|
}
|
|
_DEMO_EXECUTIONS[execution_id] = execution
|
|
await self._persist_execution(execution)
|
|
|
|
# ── Step 1: Build retrieval plan ──────────────────────────────────────
|
|
page = await canvas_service.get_page(page_id, tenant_id)
|
|
existing_comps = page.get("components", []) if page else []
|
|
next_order_base = self._next_order_base(existing_comps)
|
|
section_id = f"sec_prompt_generated_{execution_id.replace('-', '')[:12]}"
|
|
|
|
try:
|
|
natural_result = await natural_db_agent.execute_prompt(
|
|
prompt,
|
|
row_limit=_parse_prompt_row_limit(prompt, actor_role),
|
|
)
|
|
except Exception as exc:
|
|
logger.warning("ORCH natural DB agent failed with no fallback enabled: %s", exc)
|
|
execution["status"] = "failed"
|
|
execution["summary"] = f"Oracle planner failed: {exc}"
|
|
execution["completedAt"] = _now()
|
|
execution["warnings"] = warnings + [f"No fallback enabled. Natural planner failure: {exc}"]
|
|
await self._persist_execution(execution)
|
|
return execution
|
|
|
|
execution["status"] = "executing"
|
|
execution["retrievalPlan"] = {
|
|
"planId": str(uuid.uuid4()),
|
|
"planner": "oracle_natural_db_agent",
|
|
"sql": natural_result.sql,
|
|
"sourceTables": natural_result.source_tables,
|
|
"rowCount": natural_result.row_count,
|
|
}
|
|
viz_plan = self._build_natural_visualization_plan(
|
|
result=natural_result.as_dict(),
|
|
prompt=prompt,
|
|
execution_id=execution_id,
|
|
actor_id=actor_id,
|
|
branch_id=branch_id,
|
|
base_order=next_order_base,
|
|
section_id=section_id,
|
|
)
|
|
execution["visualizationPlan"] = viz_plan
|
|
execution["componentsCreated"] = [c["componentId"] for c in viz_plan.get("components", [])]
|
|
try:
|
|
if page:
|
|
revision = await canvas_service.commit_revision(
|
|
page_id=page_id,
|
|
tenant_id=tenant_id,
|
|
actor_id=actor_id,
|
|
commit_kind="prompt",
|
|
commit_summary=f"Oracle: {prompt[:80]}",
|
|
components=existing_comps + viz_plan.get("components", []),
|
|
execution_id=execution_id,
|
|
idempotency_key=client_request_id,
|
|
)
|
|
execution["headRevision"] = revision["revisionNumber"]
|
|
except Exception as exc:
|
|
logger.warning("ORCH natural revision_commit failed (non-fatal): %s", exc)
|
|
warnings.append("Revision commit deferred; will retry on next sync.")
|
|
execution["status"] = "completed"
|
|
execution["summary"] = self._generate_summary(prompt, viz_plan)
|
|
execution["completedAt"] = _now()
|
|
execution["warnings"] = warnings + natural_result.warnings
|
|
await self._persist_execution(execution)
|
|
return execution
|
|
|
|
codebook_matches = codebook_service.search_examples(prompt, limit=4)
|
|
execution["codebookMatches"] = [
|
|
{
|
|
"exampleId": match.example_id,
|
|
"templateName": match.template_name,
|
|
"componentType": match.component_type,
|
|
"chapterName": match.chapter_name,
|
|
"subchapterName": match.subchapter_name,
|
|
"sourcePack": match.source_pack,
|
|
}
|
|
for match in codebook_matches
|
|
]
|
|
|
|
if codebook_matches:
|
|
retrieval_plan = _build_codebook_retrieval_plan(prompt, tenant_id, actor_role, codebook_matches)
|
|
execution["status"] = "validated"
|
|
elif runtime_llm_service._provider_catalog():
|
|
try:
|
|
retrieval_plan = await self._call_nemoclaw(prompt, conversation_context or [], ctx)
|
|
execution["status"] = "validated"
|
|
except Exception as exc:
|
|
logger.warning("ORCH Nemoclaw call failed, using fallback: %s", exc)
|
|
warnings.append(f"Model runtime unavailable ({exc}); using deterministic fallback.")
|
|
retrieval_plan = _build_demo_retrieval_plan(prompt, tenant_id, actor_role)
|
|
else:
|
|
retrieval_plan = _build_demo_retrieval_plan(prompt, tenant_id, actor_role)
|
|
|
|
execution["retrievalPlan"] = retrieval_plan
|
|
|
|
persona_plan = await persona_service.plan_for_prompt(
|
|
prompt=prompt,
|
|
tenant_id=tenant_id,
|
|
actor_role=actor_role,
|
|
)
|
|
execution["personaPlan"] = persona_plan
|
|
execution["workflowDispatch"] = nemoclaw_runtime.build_workflow_dispatch(
|
|
prompt=prompt,
|
|
tenant_id=tenant_id,
|
|
actor_role=actor_role,
|
|
component_templates=persona_plan["recommendedTemplates"],
|
|
)
|
|
|
|
# ── Step 2: Policy validation ─────────────────────────────────────────
|
|
policy_errors = []
|
|
for component_plan in retrieval_plan.get("components", []):
|
|
result = policy_svc.validate_retrieval_plan(component_plan, ctx)
|
|
if not result.passed:
|
|
policy_errors.extend(result.errors)
|
|
if result.warnings:
|
|
warnings.extend(result.warnings)
|
|
|
|
if policy_errors:
|
|
execution["status"] = "failed"
|
|
execution["warnings"] = warnings + policy_errors
|
|
execution["completedAt"] = _now()
|
|
logger.warning(
|
|
"ORCH policy_denial execution_id=%s actor=%s errors=%s",
|
|
execution_id, actor_id, policy_errors,
|
|
)
|
|
return execution
|
|
|
|
execution["status"] = "executing"
|
|
await self._persist_execution(execution)
|
|
|
|
page = await canvas_service.get_page(page_id, tenant_id)
|
|
existing_comps = page.get("components", []) if page else []
|
|
next_order_base = self._next_order_base(existing_comps)
|
|
section_id = f"sec_prompt_generated_{execution_id.replace('-', '')[:12]}"
|
|
|
|
# ── Step 3: Build visualization plan (component descriptors) ──────────
|
|
viz_plan = await self._build_visualization_plan(
|
|
retrieval_plan=retrieval_plan,
|
|
prompt=prompt,
|
|
execution_id=execution_id,
|
|
actor_id=actor_id,
|
|
tenant_id=tenant_id,
|
|
branch_id=branch_id,
|
|
placement_mode=placement_mode,
|
|
ctx=ctx,
|
|
persona_plan=persona_plan,
|
|
base_order=next_order_base,
|
|
section_id=section_id,
|
|
)
|
|
execution["visualizationPlan"] = viz_plan
|
|
|
|
# ── Step 4: Commit revision ───────────────────────────────────────────
|
|
component_ids = [c["componentId"] for c in viz_plan.get("components", [])]
|
|
execution["componentsCreated"] = component_ids
|
|
|
|
# Commit a revision bump with the new components
|
|
try:
|
|
if page:
|
|
new_comps = existing_comps + viz_plan.get("components", [])
|
|
revision = await canvas_service.commit_revision(
|
|
page_id=page_id,
|
|
tenant_id=tenant_id,
|
|
actor_id=actor_id,
|
|
commit_kind="prompt",
|
|
commit_summary=f"Oracle: {prompt[:80]}",
|
|
components=new_comps,
|
|
execution_id=execution_id,
|
|
idempotency_key=client_request_id,
|
|
)
|
|
execution["headRevision"] = revision["revisionNumber"]
|
|
except Exception as exc:
|
|
logger.warning("ORCH revision_commit failed (non-fatal): %s", exc)
|
|
warnings.append("Revision commit deferred — will retry on next sync.")
|
|
|
|
execution["status"] = "completed"
|
|
execution["summary"] = self._generate_summary(prompt, viz_plan)
|
|
execution["completedAt"] = _now()
|
|
execution["warnings"] = warnings
|
|
await self._persist_execution(execution)
|
|
return execution
|
|
|
|
async def _build_visualization_plan(
|
|
self,
|
|
*,
|
|
retrieval_plan: dict[str, Any],
|
|
prompt: str,
|
|
execution_id: str,
|
|
actor_id: str,
|
|
tenant_id: str,
|
|
branch_id: str,
|
|
placement_mode: str,
|
|
ctx: PolicyContext,
|
|
persona_plan: dict[str, Any],
|
|
base_order: int,
|
|
section_id: str,
|
|
) -> dict[str, Any]:
|
|
"""Converts a retrieval plan into a list of CanvasComponent descriptors."""
|
|
components = [
|
|
self._persona_text_canvas(
|
|
execution_id=execution_id,
|
|
actor_id=actor_id,
|
|
branch_id=branch_id,
|
|
prompt=prompt,
|
|
persona_plan=persona_plan,
|
|
order_index=base_order + 100,
|
|
section_id=section_id,
|
|
)
|
|
]
|
|
|
|
component_plans = retrieval_plan.get("components", [])
|
|
for i, plan in enumerate(component_plans):
|
|
ctype = plan["suggestedType"]
|
|
dataset = plan["dataset"]
|
|
component_id = str(uuid.uuid4())
|
|
query_result = await data_access_gateway.execute_component_plan(plan, ctx, prompt)
|
|
component_warnings = query_result.warnings
|
|
mapped_type = self._map_type(ctype)
|
|
data_rows = query_result.rows
|
|
|
|
comp: dict[str, Any] = {
|
|
"componentId": component_id,
|
|
"type": mapped_type,
|
|
"title": str(plan.get("titleHint") or self._generate_title(prompt, ctype)),
|
|
"description": f"Generated from: \"{prompt[:80]}\"",
|
|
"dataSourceDescriptor": {
|
|
"descriptorId": str(uuid.uuid4()),
|
|
"sourceType": "postgres",
|
|
"connectorId": "velocity-core-postgres",
|
|
"dataset": dataset,
|
|
"authContextRef": f"authctx_{actor_id}_scope",
|
|
"queryTemplate": plan.get("queryTemplate", f"SELECT * FROM {dataset} WHERE tenant_id = :tenant_id"),
|
|
"queryParameters": plan.get("queryParameters", {"tenant_id": tenant_id}),
|
|
"rowLimit": plan.get("rowLimit", 50),
|
|
"privacyTier": plan.get("privacyTier", "standard"),
|
|
"cachePolicy": {"mode": "ttl", "ttlSeconds": 120},
|
|
},
|
|
"visualizationParameters": self._default_viz_params(ctype, dataset, data_rows),
|
|
"dataBindings": self._default_bindings(ctype),
|
|
"version": 1,
|
|
"lifecycleState": "active",
|
|
"provenance": {
|
|
"originType": "prompt_generated",
|
|
"promptExecutionId": execution_id,
|
|
"sourceBranchId": branch_id,
|
|
"createdBy": actor_id,
|
|
"createdAt": _iso(_now()),
|
|
},
|
|
"renderingHints": self._rendering_hints(ctype),
|
|
"layout": {
|
|
"orderIndex": base_order + (i + 1) * 100,
|
|
"sectionId": section_id,
|
|
"widthMode": "full" if ctype in ("pipeline_board", "table", "geo_map") else "half",
|
|
"minHeightPx": 300,
|
|
"stickyHeader": False,
|
|
},
|
|
"accessControls": {
|
|
"visibilityScope": "private",
|
|
"allowedRoles": ["senior_broker", "sales_director", "marketing_operator", "data_steward", "compliance_reviewer", "platform_admin"],
|
|
"redactionPolicy": "none",
|
|
},
|
|
"styleSignature": {
|
|
"theme": "velocity_glass",
|
|
"paletteToken": "ocean_signal",
|
|
"motionProfile": "calm_reveal",
|
|
"density": "comfortable",
|
|
"radiusScale": "lg",
|
|
"typographyScale": "balanced",
|
|
},
|
|
"validationState": {
|
|
"schema": "pass",
|
|
"policy": "pass",
|
|
"a11y": "pass",
|
|
"performance": "pass",
|
|
"status": "validated",
|
|
},
|
|
"auditLog": [f"aud_{execution_id}_create"],
|
|
"dataRows": data_rows,
|
|
}
|
|
if component_warnings and not data_rows:
|
|
comp = self._error_component(
|
|
component_id=component_id,
|
|
execution_id=execution_id,
|
|
actor_id=actor_id,
|
|
branch_id=branch_id,
|
|
dataset=dataset,
|
|
warnings=component_warnings,
|
|
order_index=base_order + (i + 1) * 100,
|
|
section_id=section_id,
|
|
)
|
|
components.append(comp)
|
|
|
|
if len(components) > 1:
|
|
planning_component = components.pop(0)
|
|
planning_component["layout"]["orderIndex"] = base_order + (len(component_plans) + 1) * 100
|
|
components.append(planning_component)
|
|
|
|
return {"components": components}
|
|
|
|
def _build_natural_visualization_plan(
|
|
self,
|
|
*,
|
|
result: dict[str, Any],
|
|
prompt: str,
|
|
execution_id: str,
|
|
actor_id: str,
|
|
branch_id: str,
|
|
base_order: int,
|
|
section_id: str,
|
|
) -> dict[str, Any]:
|
|
rows = result.get("rows") or []
|
|
columns = result.get("columns") or (list(rows[0].keys()) if rows else [])
|
|
ctype_raw = str(result.get("componentType") or "table")
|
|
ctype = _canonical_plan_type(ctype_raw)
|
|
mapped_type = self._map_type(ctype_raw)
|
|
dataset = "oracle_natural_sql"
|
|
component_id = str(uuid.uuid4())
|
|
viz_decision = result.get("visualizationDecision") or {}
|
|
x_axis, y_axis = _infer_chart_axes(rows, columns)
|
|
bindings = dict(viz_decision.get("dataBindings") or self._default_bindings(ctype))
|
|
viz_params = {
|
|
**self._default_viz_params(ctype, dataset, rows),
|
|
**dict(viz_decision.get("vizParams") or {}),
|
|
"columns": columns,
|
|
"sqlSummary": result.get("summary"),
|
|
"sourceTables": result.get("sourceTables", []),
|
|
"rowCount": result.get("rowCount", len(rows)),
|
|
}
|
|
if mapped_type in {"barChart", "lineChart"}:
|
|
if not viz_params.get("xAxis") and x_axis:
|
|
viz_params["xAxis"] = x_axis
|
|
if not viz_params.get("yAxis") and y_axis:
|
|
viz_params["yAxis"] = y_axis
|
|
if not bindings.get("dimensions") and x_axis:
|
|
bindings["dimensions"] = [x_axis]
|
|
if not bindings.get("measures") and y_axis:
|
|
bindings["measures"] = [y_axis]
|
|
comp: dict[str, Any] = {
|
|
"componentId": component_id,
|
|
"type": mapped_type,
|
|
"title": result.get("title") or self._generate_title(prompt, ctype_raw),
|
|
"description": f"SQL-backed Oracle result from: \"{prompt[:96]}\"",
|
|
"dataSourceDescriptor": {
|
|
"descriptorId": str(uuid.uuid4()),
|
|
"sourceType": "postgres",
|
|
"connectorId": "velocity-core-postgres",
|
|
"dataset": dataset,
|
|
"authContextRef": f"authctx_{actor_id}_scope",
|
|
"queryTemplate": result.get("sql", ""),
|
|
"queryParameters": {},
|
|
"rowLimit": len(rows),
|
|
"privacyTier": "standard",
|
|
"cachePolicy": {"mode": "revision_scoped"},
|
|
},
|
|
"visualizationParameters": viz_params,
|
|
"dataBindings": bindings,
|
|
"version": 1,
|
|
"lifecycleState": "active",
|
|
"provenance": {
|
|
"originType": "prompt_generated",
|
|
"promptExecutionId": execution_id,
|
|
"sourceBranchId": branch_id,
|
|
"createdBy": actor_id,
|
|
"createdAt": _iso(_now()),
|
|
"sourceTables": result.get("sourceTables", []),
|
|
"sqlSummary": result.get("summary"),
|
|
},
|
|
"renderingHints": {
|
|
**self._rendering_hints(ctype),
|
|
**(
|
|
{
|
|
"estimatedHeightPx": int(viz_decision.get("minHeightPx", 0) or 0),
|
|
"skeletonVariant": str(viz_decision.get("skeletonVariant") or ""),
|
|
}
|
|
if viz_decision
|
|
else {}
|
|
),
|
|
},
|
|
"layout": {
|
|
"orderIndex": base_order + 100,
|
|
"sectionId": section_id,
|
|
"widthMode": str(viz_decision.get("widthMode") or ("full" if mapped_type in ("table", "pipelineBoard", "timeline", "activityStream") else "half")),
|
|
"minHeightPx": int(viz_decision.get("minHeightPx") or 320),
|
|
"stickyHeader": False,
|
|
},
|
|
"accessControls": {
|
|
"visibilityScope": "private",
|
|
"allowedRoles": ["senior_broker", "sales_director", "marketing_operator", "data_steward", "compliance_reviewer", "platform_admin"],
|
|
"redactionPolicy": "none",
|
|
},
|
|
"styleSignature": {
|
|
"theme": "velocity_glass",
|
|
"paletteToken": "ocean_signal",
|
|
"motionProfile": "calm_reveal",
|
|
"density": "comfortable",
|
|
"radiusScale": "lg",
|
|
"typographyScale": "balanced",
|
|
},
|
|
"validationState": {
|
|
"schema": "pass",
|
|
"policy": "pass",
|
|
"a11y": "pass",
|
|
"performance": "pass",
|
|
"status": "validated",
|
|
},
|
|
"auditLog": [f"aud_{execution_id}_natural_sql"],
|
|
"dataRows": rows,
|
|
}
|
|
return {"components": [comp]}
|
|
|
|
@staticmethod
|
|
def _next_order_base(existing_components: list[dict[str, Any]]) -> int:
|
|
max_existing = 0
|
|
for component in existing_components:
|
|
try:
|
|
order_index = int((component.get("layout") or {}).get("orderIndex", 0))
|
|
except (TypeError, ValueError):
|
|
order_index = 0
|
|
if order_index > max_existing:
|
|
max_existing = order_index
|
|
return ((max_existing // 100) + 1) * 100
|
|
|
|
@staticmethod
|
|
def _persona_text_canvas(
|
|
*,
|
|
execution_id: str,
|
|
actor_id: str,
|
|
branch_id: str,
|
|
prompt: str,
|
|
persona_plan: dict[str, Any],
|
|
order_index: int,
|
|
section_id: str,
|
|
) -> dict[str, Any]:
|
|
content = (
|
|
f"Oracle received: {prompt}\n\n"
|
|
"Execution policy: query live CRM data first, pick the strongest-fitting canvas components, "
|
|
"and synthesize any missing UI blocks before rendering the result."
|
|
)
|
|
return {
|
|
"componentId": str(uuid.uuid4()),
|
|
"type": "textCanvas",
|
|
"title": "Oracle Planning Notes",
|
|
"description": "Persona-driven guidance generated before data-bound components.",
|
|
"dataSourceDescriptor": {
|
|
"descriptorId": str(uuid.uuid4()),
|
|
"sourceType": "inline",
|
|
"connectorId": "oracle-persona",
|
|
"dataset": "oracle_persona_plan",
|
|
"authContextRef": f"authctx_{actor_id}_scope",
|
|
"queryTemplate": "",
|
|
"queryParameters": {},
|
|
"rowLimit": 1,
|
|
"privacyTier": "standard",
|
|
},
|
|
"visualizationParameters": {
|
|
"content": content,
|
|
"widthMode": "full",
|
|
"adjustableHeight": True,
|
|
},
|
|
"dataBindings": {"dimensions": [], "measures": [], "series": [], "filters": []},
|
|
"version": 1,
|
|
"lifecycleState": "active",
|
|
"provenance": {
|
|
"originType": "prompt_generated",
|
|
"promptExecutionId": execution_id,
|
|
"sourceBranchId": branch_id,
|
|
"createdBy": actor_id,
|
|
"createdAt": _iso(_now()),
|
|
},
|
|
"renderingHints": {"estimatedHeightPx": 180, "skeletonVariant": "text", "virtualizationPriority": 4},
|
|
"layout": {
|
|
"orderIndex": order_index,
|
|
"sectionId": section_id,
|
|
"widthMode": "full",
|
|
"minHeightPx": 180,
|
|
"stickyHeader": False,
|
|
},
|
|
"accessControls": {
|
|
"visibilityScope": "private",
|
|
"allowedRoles": ["senior_broker", "sales_director", "marketing_operator", "data_steward", "compliance_reviewer", "platform_admin"],
|
|
"redactionPolicy": "none",
|
|
},
|
|
"styleSignature": {
|
|
"theme": "velocity_glass",
|
|
"paletteToken": "ocean_signal",
|
|
"motionProfile": "calm_reveal",
|
|
"density": "comfortable",
|
|
"radiusScale": "lg",
|
|
"typographyScale": "balanced",
|
|
},
|
|
"validationState": {
|
|
"schema": "pass",
|
|
"policy": "pass",
|
|
"a11y": "pass",
|
|
"performance": "pass",
|
|
"status": "validated",
|
|
},
|
|
"auditLog": [f"aud_{execution_id}_persona"],
|
|
"dataRows": [],
|
|
}
|
|
|
|
@staticmethod
|
|
def _map_type(plan_type: str) -> str:
|
|
plan_type = _canonical_plan_type(plan_type)
|
|
mapping = {
|
|
"pipeline_board": "pipelineBoard",
|
|
"bar_chart": "barChart",
|
|
"geo_map": "geoMap",
|
|
"table": "table",
|
|
"line_chart": "lineChart",
|
|
"kpi_tile": "kpiTile",
|
|
"activity_stream": "activityStream",
|
|
}
|
|
return mapping.get(plan_type, "barChart")
|
|
|
|
@staticmethod
|
|
def _generate_title(prompt: str, comp_type: str) -> str:
|
|
comp_type = _canonical_plan_type(comp_type)
|
|
labels = {
|
|
"pipeline_board": "Pipeline View",
|
|
"bar_chart": "Comparative Analysis",
|
|
"geo_map": "Geographic Distribution",
|
|
"table": "Performance Table",
|
|
"line_chart": "Trend Analysis",
|
|
"kpi_tile": "Key Metric",
|
|
"activity_stream": "Activity Stream",
|
|
}
|
|
return labels.get(comp_type, "Oracle Canvas Component")
|
|
|
|
@staticmethod
|
|
def _default_viz_params(comp_type: str, dataset: str, rows: list[dict[str, Any]]) -> dict[str, Any]:
|
|
comp_type = _canonical_plan_type(comp_type)
|
|
first_row = rows[0] if rows else {}
|
|
inferred_columns = [key for key in first_row.keys() if key not in {"avatar"}] or ["name", "status"]
|
|
table_columns_by_dataset: dict[str, list[str]] = {
|
|
"broker_performance": ["name", "deals_closed", "revenue_generated"],
|
|
"crm_contacts_overview": ["name", "email", "phone", "city", "buyer_type", "qd_score"],
|
|
"crm_last_interacted_clients": ["name", "email", "phone", "last_interaction_at", "interaction_count", "qd_score"],
|
|
"crm_top_interested_clients": ["name", "email", "phone", "interest_count", "projects", "qd_score"],
|
|
"oracle_last_contacted_clients": ["name", "phone", "last_contacted_at", "last_contact_channel", "last_contact_summary", "interaction_count", "qd_score", "next_action"],
|
|
"oracle_top_interested_clients": ["name", "phone", "interest_count", "projects", "last_interest_at", "qd_score"],
|
|
"oracle_client_360_summary": ["name", "phone", "lead_status", "budget_band", "urgency", "qd_score", "interest_count", "interaction_count", "projects"],
|
|
}
|
|
defaults: dict[str, dict[str, Any]] = {
|
|
"bar_chart": {"xAxis": "category", "yAxis": "value", "sort": "desc", "showLabels": True, "legend": False},
|
|
"line_chart": {"showPoints": True, "smooth": True},
|
|
"kpi_tile": {
|
|
"label": first_row.get("metric_label", "Result"),
|
|
"trend": str(first_row.get("trend_value", "")),
|
|
"comparisonLabel": first_row.get("comparison_label", ""),
|
|
},
|
|
"geo_map": {"mapStyle": "dubai_district_heat", "intensityField": "lead_count", "interactive": True, "tooltipFields": ["district", "lead_count", "avg_qd_score"]},
|
|
"table": {
|
|
"rankBy": "revenue_generated",
|
|
"showTopBadge": True,
|
|
"columns": table_columns_by_dataset.get(
|
|
dataset,
|
|
inferred_columns,
|
|
),
|
|
"emptyStateTitle": "No matching records found",
|
|
"emptyStateDescription": "The query ran successfully but returned no rows for this prompt.",
|
|
},
|
|
"pipeline_board": {"showValue": True, "colorByStage": True},
|
|
"activity_stream": {"showUrgencyIndicator": True},
|
|
}
|
|
return defaults.get(comp_type, {})
|
|
|
|
@staticmethod
|
|
def _default_bindings(comp_type: str) -> dict[str, Any]:
|
|
del comp_type
|
|
return {"dimensions": [], "measures": [], "series": [], "filters": []}
|
|
|
|
@staticmethod
|
|
def _rendering_hints(comp_type: str) -> dict[str, Any]:
|
|
comp_type = _canonical_plan_type(comp_type)
|
|
priority_map = {
|
|
"pipeline_board": ("pipeline", 9), "bar_chart": ("chart", 8),
|
|
"geo_map": ("map", 9), "table": ("table", 7),
|
|
"line_chart": ("chart", 8), "kpi_tile": ("kpi", 6),
|
|
"activity_stream": ("table", 8),
|
|
}
|
|
skeleton, priority = priority_map.get(comp_type, ("chart", 7))
|
|
height_map = {
|
|
"pipeline_board": 400, "bar_chart": 320, "geo_map": 420,
|
|
"table": 300, "line_chart": 320, "kpi_tile": 140, "activity_stream": 360,
|
|
}
|
|
return {
|
|
"estimatedHeightPx": height_map.get(comp_type, 300),
|
|
"skeletonVariant": skeleton,
|
|
"virtualizationPriority": priority,
|
|
}
|
|
|
|
@staticmethod
|
|
def _generate_summary(prompt: str, viz_plan: dict[str, Any]) -> str:
|
|
count = len([component for component in viz_plan.get("components", []) if component.get("type") != "textCanvas"])
|
|
short_prompt = prompt[:60] + ("…" if len(prompt) > 60 else "")
|
|
return f'Generated {count} component{"s" if count != 1 else ""} for: "{short_prompt}"'
|
|
|
|
@staticmethod
|
|
def _error_component(
|
|
*,
|
|
component_id: str,
|
|
execution_id: str,
|
|
actor_id: str,
|
|
branch_id: str,
|
|
dataset: str,
|
|
warnings: list[str],
|
|
order_index: int,
|
|
section_id: str,
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"componentId": component_id,
|
|
"type": "errorNotice",
|
|
"title": f"{dataset} unavailable",
|
|
"description": "Oracle could not render live data for this component.",
|
|
"dataSourceDescriptor": {
|
|
"descriptorId": str(uuid.uuid4()),
|
|
"sourceType": "postgres",
|
|
"connectorId": "velocity-core-postgres",
|
|
"dataset": dataset,
|
|
"authContextRef": f"authctx_{actor_id}_scope",
|
|
"queryTemplate": "",
|
|
"queryParameters": {},
|
|
"rowLimit": 0,
|
|
"privacyTier": "standard",
|
|
},
|
|
"visualizationParameters": {
|
|
"errorCode": "oracle_live_query_failed",
|
|
"message": " | ".join(warnings[:2]),
|
|
"severity": "warning",
|
|
"retryable": True,
|
|
},
|
|
"dataBindings": {"dimensions": [], "measures": [], "series": [], "filters": []},
|
|
"version": 1,
|
|
"lifecycleState": "active",
|
|
"provenance": {
|
|
"originType": "prompt_generated",
|
|
"promptExecutionId": execution_id,
|
|
"sourceBranchId": branch_id,
|
|
"createdBy": actor_id,
|
|
"createdAt": _iso(_now()),
|
|
},
|
|
"renderingHints": {"estimatedHeightPx": 140, "skeletonVariant": "generic", "virtualizationPriority": 5},
|
|
"layout": {
|
|
"orderIndex": order_index,
|
|
"sectionId": section_id,
|
|
"widthMode": "full",
|
|
"minHeightPx": 140,
|
|
"stickyHeader": False,
|
|
},
|
|
"accessControls": {
|
|
"visibilityScope": "private",
|
|
"allowedRoles": ["senior_broker", "sales_director", "marketing_operator", "data_steward", "compliance_reviewer", "platform_admin"],
|
|
"redactionPolicy": "none",
|
|
},
|
|
"styleSignature": {
|
|
"theme": "velocity_glass",
|
|
"paletteToken": "ocean_signal",
|
|
"motionProfile": "calm_reveal",
|
|
"density": "comfortable",
|
|
"radiusScale": "lg",
|
|
"typographyScale": "balanced",
|
|
},
|
|
"validationState": {
|
|
"schema": "pass",
|
|
"policy": "pass",
|
|
"a11y": "pass",
|
|
"performance": "pass",
|
|
"status": "validated",
|
|
},
|
|
"auditLog": [f"aud_{execution_id}_error"],
|
|
"dataRows": [],
|
|
}
|
|
|
|
async def _call_nemoclaw(
|
|
self,
|
|
prompt: str,
|
|
context: list[dict[str, str]],
|
|
ctx: PolicyContext,
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Uses the shared runtime LLM service to propose a retrieval plan.
|
|
Raises on malformed output so the orchestrator can fall back safely.
|
|
"""
|
|
row_limit = _parse_prompt_row_limit(prompt, ctx.actor_role)
|
|
system_prompt = (
|
|
"You are the Oracle planner for Project Velocity. "
|
|
"Return JSON only. "
|
|
"Choose up to 4 analytical components for the prompt. "
|
|
"Allowed component types: pipeline_board, bar_chart, geo_map, table, line_chart, kpi_tile, activity_stream. "
|
|
"Allowed datasets: deals, lead_daily_snapshot, lead_geo_interest_rollup, broker_performance, inventory_absorption, "
|
|
"oracle_aggregated_metric, lead_activity_log, crm_contacts_overview, crm_opportunity_pipeline, "
|
|
"crm_property_interest_rollup, crm_interaction_timeline, crm_last_interacted_clients, crm_top_interested_clients, "
|
|
"oracle_property_interest_rollup, oracle_client_interaction_timeline, oracle_last_contacted_clients, "
|
|
"oracle_top_interested_clients, oracle_client_360_summary. "
|
|
"Return an object with keys semanticModelVersion, intentClass, components. "
|
|
"Each component must include suggestedType, dataset, and titleHint. "
|
|
"Do not emit SQL. Do not invent datasets outside the allowlist."
|
|
)
|
|
response = await runtime_llm_service.chat(
|
|
provider_id=None,
|
|
model=None,
|
|
system_prompt=system_prompt,
|
|
messages=[
|
|
*context,
|
|
{
|
|
"role": "user",
|
|
"content": json.dumps(
|
|
{
|
|
"prompt": prompt,
|
|
"tenantId": ctx.tenant_id,
|
|
"actorRole": ctx.actor_role,
|
|
"rowLimit": row_limit,
|
|
}
|
|
),
|
|
},
|
|
],
|
|
temperature=0.1,
|
|
response_format="json",
|
|
metadata={"planner": "oracle_canvas"},
|
|
)
|
|
payload = response.get("message", {}).get("parsedJson") or {}
|
|
components_payload = payload.get("components")
|
|
if not isinstance(components_payload, list) or not components_payload:
|
|
raise ValueError("Runtime LLM planner returned no components.")
|
|
|
|
normalized_components: list[dict[str, Any]] = []
|
|
for raw_component in components_payload[:4]:
|
|
if not isinstance(raw_component, dict):
|
|
continue
|
|
suggested_type = str(raw_component.get("suggestedType", "")).strip()
|
|
dataset = str(raw_component.get("dataset", "")).strip()
|
|
if suggested_type not in _DATASET_MAP or dataset not in _RUNTIME_ALLOWED_DATASETS:
|
|
continue
|
|
normalized_components.append(
|
|
{
|
|
"suggestedType": suggested_type,
|
|
"dataset": dataset,
|
|
"privacyTier": "standard",
|
|
"rowLimit": row_limit,
|
|
"joins": [],
|
|
"queryTemplate": f"SELECT * FROM {dataset} WHERE tenant_id = :tenant_id LIMIT :limit",
|
|
"queryParameters": {"tenant_id": ctx.tenant_id, "limit": row_limit},
|
|
"titleHint": str(raw_component.get("titleHint", "")).strip() or self._generate_title(prompt, suggested_type),
|
|
}
|
|
)
|
|
|
|
if not normalized_components:
|
|
raise ValueError("Runtime LLM planner returned no valid whitelisted components.")
|
|
|
|
return {
|
|
"planId": str(uuid.uuid4()),
|
|
"components": normalized_components,
|
|
"semanticModelVersion": str(payload.get("semanticModelVersion") or "oracle_runtime_llm_v2026_04_19_01"),
|
|
"intentClass": str(payload.get("intentClass") or "analytical"),
|
|
"planner": "runtime_llm",
|
|
}
|
|
|
|
async def get_execution(self, execution_id: str) -> dict[str, Any] | None:
|
|
return _DEMO_EXECUTIONS.get(execution_id)
|
|
|
|
async def _persist_execution(self, execution: dict[str, Any]) -> None:
|
|
_DEMO_EXECUTIONS[execution["executionId"]] = execution
|
|
if not _db_ready():
|
|
return
|
|
assert asyncpg is not None
|
|
conn = await asyncpg.connect(_DB_URL)
|
|
try:
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO oracle_prompt_executions (
|
|
execution_id, tenant_id, page_id, branch_id, actor_id, prompt, intent_class,
|
|
status, model_runtime, semantic_model_version, retrieval_plan, visualization_plan,
|
|
warnings, summary, components_created, client_request_id, created_at, completed_at
|
|
)
|
|
VALUES (
|
|
$1::uuid, $2, $3::uuid, $4, $5, $6, $7,
|
|
$8, $9, $10, $11::jsonb, $12::jsonb,
|
|
$13::text[], $14, $15::text[], $16, $17::timestamptz, $18::timestamptz
|
|
)
|
|
ON CONFLICT (execution_id)
|
|
DO UPDATE SET
|
|
status = EXCLUDED.status,
|
|
retrieval_plan = EXCLUDED.retrieval_plan,
|
|
visualization_plan = EXCLUDED.visualization_plan,
|
|
warnings = EXCLUDED.warnings,
|
|
summary = EXCLUDED.summary,
|
|
components_created = EXCLUDED.components_created,
|
|
completed_at = EXCLUDED.completed_at
|
|
""",
|
|
execution["executionId"],
|
|
execution["tenantId"],
|
|
execution["pageId"],
|
|
execution["branchId"],
|
|
execution["actorId"],
|
|
execution["prompt"],
|
|
execution["intentClass"],
|
|
execution["status"],
|
|
execution["modelRuntime"],
|
|
execution["semanticModelVersion"],
|
|
json.dumps(_json_safe(execution.get("retrievalPlan") or {})),
|
|
json.dumps(_json_safe(execution.get("visualizationPlan") or {})),
|
|
execution.get("warnings", []),
|
|
execution.get("summary"),
|
|
execution.get("componentsCreated", []),
|
|
execution.get("clientRequestId"),
|
|
_coerce_datetime(execution["createdAt"]),
|
|
_coerce_datetime(execution.get("completedAt")),
|
|
)
|
|
finally:
|
|
await conn.close()
|
|
|
|
|
|
# ── Singleton ─────────────────────────────────────────────────────────────────
|
|
|
|
prompt_orchestrator = PromptOrchestrator()
|