""" oracle/data_access_gateway.py Read-only, policy-aware PostgreSQL query executor for Oracle datasets. Nemoclaw is treated strictly as a planner. The gateway executes only whitelisted dataset queries and always injects the actor's tenant scope. """ from __future__ import annotations import logging import os from dataclasses import dataclass from typing import Any try: import asyncpg # type: ignore except Exception: # pragma: no cover asyncpg = None # type: ignore from .policy_service import PolicyContext, PolicyService logger = logging.getLogger(__name__) _DB_URL = os.getenv("DATABASE_URL", "") _ALLOW_IN_MEMORY = os.getenv("ORACLE_ALLOW_IN_MEMORY_FALLBACK", "").lower() in {"1", "true", "yes"} @dataclass class QueryExecutionResult: rows: list[dict[str, Any]] warnings: list[str] def _db_ready() -> bool: return bool(_DB_URL and not _DB_URL.startswith("PLACEHOLDER") and asyncpg is not None) class DataAccessGateway: def __init__(self) -> None: self.policy_service = PolicyService() async def execute_component_plan( self, component_plan: dict[str, Any], ctx: PolicyContext, prompt: str, ) -> QueryExecutionResult: dataset = str(component_plan.get("dataset", "")).strip() if not dataset: return QueryExecutionResult(rows=[], warnings=["Dataset missing from retrieval plan."]) validation = self.policy_service.validate_retrieval_plan(component_plan, ctx) self.policy_service.audit_policy_check(ctx, dataset, validation) if not validation.passed: return QueryExecutionResult(rows=[], warnings=validation.errors) if not _db_ready(): if _ALLOW_IN_MEMORY or "PYTEST_CURRENT_TEST" in os.environ: return QueryExecutionResult(rows=[], warnings=[]) raise RuntimeError("Oracle requires DATABASE_URL and asyncpg for real-time data access.") try: rows = await self._query_dataset( dataset=dataset, row_limit=validation.effective_row_limit, ctx=ctx, prompt=prompt, ) except Exception as exc: logger.warning("DATA_GATEWAY query_failed dataset=%s error=%s", dataset, exc) return QueryExecutionResult(rows=[], warnings=[f"{dataset}: {exc}"]) redacted = self.policy_service.redact(rows, validation.redaction_policy) return QueryExecutionResult(rows=redacted, warnings=validation.warnings) async def _query_dataset( self, *, dataset: str, row_limit: int, ctx: PolicyContext, prompt: str, ) -> list[dict[str, Any]]: sql, params = self._build_whitelisted_query(dataset, row_limit, ctx, prompt) assert asyncpg is not None conn = await asyncpg.connect(_DB_URL) try: records = await conn.fetch(sql, *params) finally: await conn.close() return [dict(record) for record in records] def _build_whitelisted_query( self, dataset: str, row_limit: int, ctx: PolicyContext, prompt: str, ) -> tuple[str, list[Any]]: lower_prompt = prompt.lower() if dataset == "deals": sql = """ SELECT stage, COUNT(*)::int AS count, COALESCE(SUM(value), 0)::float AS value, COALESCE( json_agg( json_build_object( 'id', lead_id, 'name', lead_name, 'company', company, 'value', value_label, 'avatar', avatar_url ) ORDER BY value DESC NULLS LAST ) FILTER (WHERE lead_id IS NOT NULL), '[]'::json ) AS leads FROM deals WHERE tenant_id = $1 GROUP BY stage ORDER BY COALESCE(SUM(value), 0) DESC, stage ASC LIMIT $2 """ return sql, [ctx.tenant_id, row_limit] if dataset == "lead_daily_snapshot": sql = """ SELECT source, COALESCE(SUM(qd_weighted_score), 0)::float AS qd_weighted_volume FROM lead_daily_snapshot WHERE tenant_id = $1 GROUP BY source ORDER BY qd_weighted_volume DESC, source ASC LIMIT $2 """ return sql, [ctx.tenant_id, row_limit] if dataset == "lead_geo_interest_rollup": sql = """ SELECT district, lat, lng, COALESCE(lead_count, 0)::int AS lead_count, COALESCE(avg_qd_score, 0)::float AS avg_qd_score, COALESCE(x, 0)::float AS x, COALESCE(y, 0)::float AS y FROM lead_geo_interest_rollup WHERE tenant_id = $1 ORDER BY lead_count DESC, district ASC LIMIT $2 """ return sql, [ctx.tenant_id, row_limit] if dataset == "broker_performance": sql = """ SELECT ROW_NUMBER() OVER ( ORDER BY COUNT(DISTINCT l.person_id) DESC, COALESCE(u.full_name, u.email, u.id::text) ASC )::int AS rank, COALESCE(u.full_name, u.email, u.id::text) AS name, COUNT(DISTINCT l.person_id)::int AS deals_closed, COALESCE(SUM(o.value), 0)::float AS revenue_generated, u.avatar_url AS avatar FROM users_and_roles u LEFT JOIN crm_leads l ON l.assigned_user_id = u.id LEFT JOIN crm_opportunities o ON o.lead_id = l.lead_id WHERE u.is_active = TRUE GROUP BY u.id, u.full_name, u.email, u.avatar_url HAVING COUNT(DISTINCT l.person_id) > 0 OR COALESCE(SUM(o.value), 0) > 0 ORDER BY revenue_generated DESC, name ASC LIMIT $2 """ return sql, [ctx.tenant_id, row_limit] if dataset == "inventory_absorption": sql = """ SELECT period_label AS period, COALESCE(absorption_rate, 0)::float AS absorption_rate, COALESCE(target_rate, 0)::float AS target_rate FROM inventory_absorption WHERE tenant_id = $1 ORDER BY period_start ASC LIMIT $2 """ return sql, [ctx.tenant_id, row_limit] if dataset == "oracle_aggregated_metric": metric_name = "total_leads" if "pipeline" in lower_prompt: metric_name = "total_pipeline_value" elif "quota" in lower_prompt or "attainment" in lower_prompt: metric_name = "quota_attainment" sql = """ SELECT metric_value, metric_label, trend_value, comparison_label FROM oracle_aggregated_metric WHERE tenant_id = $1 AND metric_name = $2 ORDER BY observed_at DESC LIMIT 1 """ return sql, [ctx.tenant_id, metric_name] if dataset == "lead_activity_log": if "follow-up" in lower_prompt or "queue" in lower_prompt: sql = """ SELECT lead_name AS name, assigned_broker, COALESCE(last_contact_hours_ago, 0)::int AS last_contact_hours_ago, COALESCE(qd_score, 0)::float AS qd_score, urgency, avatar_url AS avatar FROM lead_activity_log WHERE tenant_id = $1 ORDER BY last_contact_hours_ago DESC, qd_score DESC LIMIT $2 """ return sql, [ctx.tenant_id, row_limit] sql = """ SELECT activity_type AS type, COALESCE(activity_title, activity_summary, activity_type) AS title, activity_summary AS summary, actor_name AS actor, TO_CHAR(activity_at, 'YYYY-MM-DD HH24:MI') AS date FROM lead_activity_log WHERE tenant_id = $1 ORDER BY activity_at DESC LIMIT $2 """ return sql, [ctx.tenant_id, row_limit] if dataset == "crm_contacts_overview": sql = """ SELECT p.person_id::text AS id, p.full_name AS name, COALESCE(p.primary_email, '') AS email, COALESCE(p.primary_phone, '') AS phone, COALESCE(p.city, '') AS city, COALESCE(p.buyer_type, 'unclassified') AS buyer_type, COALESCE(q.current_value, 0)::float AS qd_score FROM crm_people p LEFT JOIN LATERAL ( SELECT current_value FROM intel_qd_scores q WHERE q.person_id = p.person_id ORDER BY CASE WHEN q.score_type = 'engagement_score' THEN 0 WHEN q.score_type = 'intent_score' THEN 1 WHEN q.score_type = 'urgency_score' THEN 2 ELSE 3 END, q.computed_at DESC LIMIT 1 ) q ON TRUE ORDER BY qd_score DESC, p.full_name ASC LIMIT $1 """ return sql, [row_limit] if dataset == "crm_opportunity_pipeline": sql = """ SELECT o.stage::text AS stage, COUNT(*)::int AS count, COALESCE(SUM(o.value), 0)::float AS value, COALESCE( json_agg( json_build_object( 'id', o.opportunity_id, 'name', p.full_name, 'company', COALESCE(a.account_name, ''), 'value', COALESCE(o.value, 0), 'nextAction', COALESCE(o.next_action, '') ) ORDER BY o.value DESC NULLS LAST ) FILTER (WHERE o.opportunity_id IS NOT NULL), '[]'::json ) AS leads FROM crm_opportunities o JOIN crm_leads l ON l.lead_id = o.lead_id JOIN crm_people p ON p.person_id = l.person_id LEFT JOIN crm_accounts a ON a.account_id = l.account_id GROUP BY o.stage ORDER BY COALESCE(SUM(o.value), 0) DESC, o.stage::text ASC LIMIT $1 """ return sql, [row_limit] if dataset == "crm_property_interest_rollup": sql = """ SELECT project_name AS category, COUNT(*)::int AS value, ROUND(AVG(COALESCE((budget_min + budget_max) / 2.0, budget_max, budget_min, 0)), 2)::float AS average_budget FROM crm_property_interests GROUP BY project_name ORDER BY value DESC, project_name ASC LIMIT $1 """ return sql, [row_limit] if dataset == "crm_last_interacted_clients": sql = """ SELECT p.person_id::text AS id, p.full_name AS name, COALESCE(p.primary_email, '') AS email, COALESCE(p.primary_phone, '') AS phone, COALESCE(MAX(i.happened_at), p.updated_at, p.created_at) AS last_interaction_at, COUNT(i.interaction_id)::int AS interaction_count, COALESCE(q.current_value, 0)::float AS qd_score FROM crm_people p LEFT JOIN intel_interactions i ON i.person_id = p.person_id LEFT JOIN LATERAL ( SELECT current_value FROM intel_qd_scores q WHERE q.person_id = p.person_id ORDER BY CASE WHEN q.score_type = 'engagement_score' THEN 0 WHEN q.score_type = 'intent_score' THEN 1 WHEN q.score_type = 'urgency_score' THEN 2 ELSE 3 END, q.computed_at DESC LIMIT 1 ) q ON TRUE GROUP BY p.person_id, p.full_name, p.primary_email, p.primary_phone, p.updated_at, p.created_at, q.current_value ORDER BY last_interaction_at DESC NULLS LAST, interaction_count DESC, p.full_name ASC LIMIT $1 """ return sql, [row_limit] if dataset == "crm_top_interested_clients": sql = """ SELECT p.person_id::text AS id, p.full_name AS name, COALESCE(p.primary_email, '') AS email, COALESCE(p.primary_phone, '') AS phone, COUNT(pi.interest_id)::int AS interest_count, STRING_AGG(DISTINCT pi.project_name, ', ' ORDER BY pi.project_name) AS projects, COALESCE(MAX(pi.created_at), p.updated_at, p.created_at) AS last_interest_at, COALESCE(q.current_value, 0)::float AS qd_score FROM crm_people p INNER JOIN crm_property_interests pi ON pi.person_id = p.person_id LEFT JOIN LATERAL ( SELECT current_value FROM intel_qd_scores q WHERE q.person_id = p.person_id ORDER BY CASE WHEN q.score_type = 'engagement_score' THEN 0 WHEN q.score_type = 'intent_score' THEN 1 WHEN q.score_type = 'urgency_score' THEN 2 ELSE 3 END, q.computed_at DESC LIMIT 1 ) q ON TRUE GROUP BY p.person_id, p.full_name, p.primary_email, p.primary_phone, p.updated_at, p.created_at, q.current_value ORDER BY interest_count DESC, qd_score DESC, last_interest_at DESC NULLS LAST, p.full_name ASC LIMIT $1 """ return sql, [row_limit] if dataset == "crm_interaction_timeline": sql = """ SELECT i.interaction_type AS type, COALESCE(i.summary, i.interaction_type) AS title, CONCAT(p.full_name, ' ยท ', i.channel::text) AS summary, p.full_name AS actor, TO_CHAR(i.happened_at, 'YYYY-MM-DD HH24:MI') AS date FROM intel_interactions i JOIN crm_people p ON p.person_id = i.person_id ORDER BY i.happened_at DESC LIMIT $1 """ return sql, [row_limit] raise ValueError(f"Dataset '{dataset}' is not whitelisted for Oracle execution.") data_access_gateway = DataAccessGateway()