feat: New Chat, Search Chat and Master Slave DB Architecture for CRM and Oracle Canvas

This commit is contained in:
Sagnik
2026-04-24 04:18:33 +05:30
parent f04571bd7b
commit eabecf7a25
7 changed files with 1117 additions and 520 deletions

View File

@@ -1,9 +1,9 @@
"""
Natural DB-first Oracle agent.
The LLM can plan arbitrary analytical SELECT statements over the Velocity CRM,
intel, inventory, and read-model tables. The executor enforces a read-only SQL
contract and a UI row cap; write paths stay behind typed API endpoints.
The LLM can plan arbitrary analytical SELECT statements over the full public
Velocity app schema. The executor enforces only a read-only SQL contract and a
UI row cap; write paths stay behind typed API endpoints.
"""
from __future__ import annotations
@@ -25,25 +25,12 @@ except Exception: # pragma: no cover
logger = logging.getLogger(__name__)
MAX_ROW_CAP = 500
ALLOWED_TABLES = {
"crm_people", "crm_leads", "crm_accounts", "crm_households", "crm_relationships",
"crm_opportunities", "crm_property_interests", "crm_stage_history",
"intel_interactions", "intel_messages", "intel_calls", "intel_transcripts",
"intel_emails", "intel_email_threads", "intel_whatsapp_threads", "intel_visits",
"intel_reminders", "intel_qd_scores", "intel_qd_timeseries",
"intel_extracted_facts", "intel_call_objections", "intel_cctv_links",
"intel_perception_events", "intel_vehicle_events",
"inventory_projects", "inventory_units",
"read_last_contacted", "read_next_best_action",
}
DESTRUCTIVE_SQL = re.compile(
r"\b(insert|update|delete|drop|alter|truncate|copy|create|grant|revoke|call|execute|do|merge)\b",
re.IGNORECASE,
)
TABLE_REF_RE = re.compile(r"\b(?:from|join)\s+([a-zA-Z_][\w.]*)(?:\s|$)", re.IGNORECASE)
CTE_NAME_RE = re.compile(r"\b(?:with|,)\s*([a-zA-Z_][\w]*)\s+as\s*\(", re.IGNORECASE)
def _json_safe(value: Any) -> Any:
@@ -61,6 +48,9 @@ def _json_safe(value: Any) -> Any:
def db_ready() -> bool:
if asyncpg is None:
return False
read_database_url = os.getenv("ORACLE_READ_DATABASE_URL", "")
if read_database_url and not read_database_url.startswith("PLACEHOLDER"):
return True
database_url = os.getenv("DATABASE_URL", "")
return bool(database_url and not database_url.startswith("PLACEHOLDER")) or all(
os.getenv(name) for name in ("VELOCITY_DB_NAME", "VELOCITY_DB_USER", "VELOCITY_DB_PASSWORD")
@@ -70,6 +60,17 @@ def db_ready() -> bool:
async def connect_db() -> Any:
if asyncpg is None:
raise RuntimeError("asyncpg is not installed.")
read_database_url = os.getenv("ORACLE_READ_DATABASE_URL", "")
if read_database_url and not read_database_url.startswith("PLACEHOLDER"):
return await asyncpg.connect(read_database_url)
if all(os.getenv(name) for name in ("VELOCITY_DB_READ_NAME", "VELOCITY_DB_READ_USER", "VELOCITY_DB_READ_PASSWORD")):
return await asyncpg.connect(
host=os.getenv("VELOCITY_DB_READ_HOST", os.getenv("VELOCITY_DB_HOST", "127.0.0.1")),
port=int(os.getenv("VELOCITY_DB_READ_PORT", os.getenv("VELOCITY_DB_PORT", "5432"))),
database=os.environ["VELOCITY_DB_READ_NAME"],
user=os.environ["VELOCITY_DB_READ_USER"],
password=os.environ["VELOCITY_DB_READ_PASSWORD"],
)
database_url = os.getenv("DATABASE_URL", "")
if database_url and not database_url.startswith("PLACEHOLDER"):
return await asyncpg.connect(database_url)
@@ -124,13 +125,6 @@ def sanitize_sql(sql: str, row_limit: int) -> tuple[str, list[str], list[str]]:
continue
if table and table not in tables:
tables.append(table)
blocked = [table for table in tables if table not in ALLOWED_TABLES]
if blocked:
raise ValueError(f"Oracle SQL agent blocked unknown tables: {', '.join(blocked)}")
capped = max(1, min(int(row_limit or 100), MAX_ROW_CAP))
if not re.search(r"\blimit\s+\d+\b", clean, re.IGNORECASE):
clean = f"SELECT * FROM ({clean}) oracle_limited_rows LIMIT {capped}"
warnings.append(f"Applied UI row cap LIMIT {capped}.")
return clean, tables, warnings
@@ -151,6 +145,18 @@ def infer_component_type(prompt: str, columns: list[str], rows: list[dict[str, A
return "table"
def _looks_like_property_rollup_prompt(prompt: str) -> bool:
lower = prompt.lower()
property_terms = ("property", "properties", "project", "projects")
aggregate_terms = ("top", "most", "majority", "highest", "popular", "common")
interest_terms = ("interest", "interested", "liked", "preference", "preferences")
return (
any(term in lower for term in property_terms)
and any(term in lower for term in aggregate_terms)
and any(term in lower for term in interest_terms)
)
def title_from_prompt(prompt: str) -> str:
words = re.sub(r"\s+", " ", prompt.strip()).strip(" ?.!")
return words[:1].upper() + words[1:80] if words else "Oracle Query Result"
@@ -164,19 +170,27 @@ class NaturalDbAgent:
return {"tables": [], "available": False}
conn = await connect_db()
try:
table_names = await conn.fetch(
"""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
ORDER BY table_name
"""
)
public_tables = [row["table_name"] for row in table_names]
rows = await conn.fetch(
"""
SELECT c.table_name, c.column_name, c.data_type, c.udt_name, c.is_nullable
FROM information_schema.columns c
WHERE c.table_schema = 'public' AND c.table_name = ANY($1::text[])
WHERE c.table_schema = 'public'
ORDER BY c.table_name, c.ordinal_position
""",
sorted(ALLOWED_TABLES),
"""
)
counts = {}
for table in sorted(ALLOWED_TABLES):
for table in public_tables:
exists = await conn.fetchval("SELECT to_regclass($1)", f"public.{table}")
counts[table] = None if not exists else int(await conn.fetchval(f"SELECT COUNT(*) FROM {table}"))
counts[table] = None if not exists else int(await conn.fetchval(f'SELECT COUNT(*) FROM "{table}"'))
tables: dict[str, dict[str, Any]] = {}
for row in rows:
entry = tables.setdefault(row["table_name"], {"columns": [], "rowCount": counts.get(row["table_name"])})
@@ -186,7 +200,7 @@ class NaturalDbAgent:
"udtName": row["udt_name"],
"nullable": row["is_nullable"] == "YES",
})
return {"available": True, "tables": tables, "allowedTables": sorted(ALLOWED_TABLES)}
return {"available": True, "tables": tables, "allowedTables": public_tables}
finally:
if own_conn:
await conn.close()
@@ -210,7 +224,7 @@ class NaturalDbAgent:
"read_next_best_action": 250,
}
tables = catalog.get("tables", {})
counts = {table: (tables.get(table) or {}).get("rowCount") for table in sorted(ALLOWED_TABLES)}
counts = {table: (meta or {}).get("rowCount") for table, meta in sorted(tables.items())}
return {
"counts": counts,
"expectedSyntheticV2Counts": expected,
@@ -238,27 +252,12 @@ class NaturalDbAgent:
async def _run_plan(self, conn: Any, prompt: str, plan: dict[str, Any], row_limit: int) -> NaturalQueryResult:
raw_sql = str(plan.get("sql") or "").strip()
if not raw_sql:
raw_sql = self._fallback_sql(prompt, row_limit)
raise RuntimeError("Natural SQL planner returned no SQL.")
sql, tables, warnings = sanitize_sql(raw_sql, row_limit)
try:
records = await conn.fetch(sql)
except Exception as exc:
retry = await self._repair_sql(prompt, raw_sql, str(exc), row_limit)
sql, tables, retry_warnings = sanitize_sql(retry, row_limit)
warnings.extend(retry_warnings)
warnings.append(f"Initial SQL repaired after database error: {exc}")
records = await conn.fetch(sql)
if not records:
retry_sql = self._zero_row_retry_sql(prompt, row_limit, raw_sql)
if retry_sql and retry_sql.strip() != raw_sql.strip():
retry_clean, retry_tables, retry_warnings = sanitize_sql(retry_sql, row_limit)
retry_records = await conn.fetch(retry_clean)
if retry_records:
sql = retry_clean
tables = retry_tables
records = retry_records
warnings.extend(retry_warnings)
warnings.append("Initial SQL returned zero rows; Oracle retried with a broader CRM read query.")
raise RuntimeError(f"Natural SQL execution failed: {exc}") from exc
rows = [_json_safe(dict(record)) for record in records]
columns = list(rows[0].keys()) if rows else []
component_type = infer_component_type(prompt, columns, rows)
@@ -276,17 +275,29 @@ class NaturalDbAgent:
)
async def _plan_sql(self, prompt: str, catalog: dict[str, Any], row_limit: int) -> dict[str, Any]:
fallback = {"sql": self._fallback_sql(prompt, row_limit), "title": title_from_prompt(prompt), "rationale": "Deterministic SQL planner fallback."}
try:
providers = runtime_llm_service._provider_catalog()
except Exception:
providers = {}
if not providers:
return fallback
raise RuntimeError("No runtime LLM providers are configured for Oracle natural planning.")
schema_brief = json.dumps(catalog.get("tables", {}), default=str)[:16000]
semantic_rules = """
Velocity SQL semantics:
- QD score means intel_qd_scores.current_value. Do not use crm_people.engagement_score, crm_leads.engagement_score, or intel_interactions.engagement_score as QD.
- For project/property scoped prompts such as "in Atri Surya Toron", "interested in", "for project", or "for property", use crm_property_interests as the primary scoping table.
- Prefer crm_property_interests.project_name for textual project matching. inventory_projects is optional for enrichment, not the primary client-to-project relationship.
- For client lists scoped to a project, join crm_people to crm_property_interests on person_id and filter project_name case-insensitively.
- For lowest/highest/best/worst QD prompts, sort on intel_qd_scores.current_value ASC/DESC as requested.
- Respect the user-requested cardinality exactly when possible. If the prompt says five/top 5/lowest 5, return LIMIT 5.
- When listing clients, include person identity fields from crm_people such as person_id, full_name, primary_phone, and primary_email.
- When aggregating top properties/projects, group by crm_property_interests.project_name and count DISTINCT person_id.
- You may use any table in the public schema that is relevant to the question.
- Use only read-only PostgreSQL SELECT/CTE queries.
"""
system = (
"You are Oracle's read-only PostgreSQL planner. Generate one useful SELECT or WITH query "
"for the user's CRM question. Use only the provided schema. Return JSON with sql, title, rationale. "
"for the user's CRM question. You have access to the full public schema. Return JSON with sql, title, rationale. "
"Never generate INSERT, UPDATE, DELETE, DDL, COPY, or permission statements."
)
try:
@@ -294,7 +305,16 @@ class NaturalDbAgent:
provider_id="sglang",
model=None,
system_prompt=system,
messages=[{"role": "user", "content": f"Schema:\n{schema_brief}\n\nQuestion:\n{prompt}\n\nRow cap: {row_limit}"}],
messages=[{
"role": "user",
"content": (
f"Schema:\n{schema_brief}\n\n"
f"Semantic rules:\n{semantic_rules}\n\n"
f"Question:\n{prompt}\n\n"
f"Row cap: {row_limit}\n\n"
"Return strict JSON with keys: sql, title, rationale."
),
}],
temperature=0.05,
response_format="json",
metadata={"agent": "oracle_natural_db_agent"},
@@ -307,162 +327,7 @@ class NaturalDbAgent:
if isinstance(parsed, dict) and parsed.get("sql"):
return parsed
except Exception as exc:
logger.warning("Natural DB planner LLM failed, using fallback: %s", exc)
return fallback
async def _repair_sql(self, prompt: str, failed_sql: str, error: str, row_limit: int) -> str:
# Keep retry operationally deterministic if model is unavailable.
if "read_last_contacted" in failed_sql and "does not exist" in error.lower():
return self._base_last_contacted_sql(row_limit)
if "read_next_best_action" in failed_sql and "does not exist" in error.lower():
return self._base_last_contacted_sql(row_limit)
return self._fallback_sql(prompt, row_limit)
def _zero_row_retry_sql(self, prompt: str, row_limit: int, previous_sql: str) -> str | None:
lower = prompt.lower()
if any(term in lower for term in ("contact", "recent", "last", "call", "message", "email", "whatsapp", "follow")):
return self._base_last_contacted_sql(row_limit)
if any(term in lower for term in ("interest", "interested", "property", "project", "unit", "budget", "bhk")):
return self._base_property_interest_sql(row_limit)
if "from crm_people" not in previous_sql.lower():
return self._generic_clients_sql(row_limit)
return None
def _base_last_contacted_sql(self, row_limit: int) -> str:
limit = max(1, min(row_limit, MAX_ROW_CAP))
return f"""
WITH contact_events AS (
SELECT i.person_id, i.happened_at AS event_at, i.channel::text AS channel,
i.interaction_type AS event_type, i.summary AS summary, i.broker_name AS actor
FROM intel_interactions i
WHERE i.happened_at IS NOT NULL
UNION ALL
SELECT i.person_id, m.delivered_at, 'message', COALESCE(m.sender_role, 'message'), m.message_text, m.sender_name
FROM intel_messages m
JOIN intel_interactions i ON i.interaction_id = m.interaction_id
WHERE m.delivered_at IS NOT NULL
UNION ALL
SELECT i.person_id, e.sent_at, 'email', COALESCE(e.direction::text, 'email'), e.subject, e.from_address
FROM intel_emails e
JOIN intel_interactions i ON i.interaction_id = e.interaction_id
WHERE e.sent_at IS NOT NULL
UNION ALL
SELECT v.person_id, v.visited_at, 'site_visit', 'visit', v.outcome, v.hosted_by
FROM intel_visits v
WHERE v.visited_at IS NOT NULL
),
ranked AS (
SELECT *, row_number() OVER (PARTITION BY person_id ORDER BY event_at DESC) AS rn,
count(*) OVER (PARTITION BY person_id) AS interaction_count
FROM contact_events
)
SELECT p.person_id::text, p.full_name AS name, p.primary_phone AS phone,
p.primary_email AS email, r.event_at AS last_contacted_at,
r.channel AS last_contact_channel, r.event_type AS last_interaction_type,
r.summary AS last_contact_summary, r.actor AS last_contact_actor,
r.interaction_count::int,
q.current_value AS qd_score
FROM ranked r
JOIN crm_people p ON p.person_id = r.person_id
LEFT JOIN LATERAL (
SELECT current_value FROM intel_qd_scores q
WHERE q.person_id = p.person_id
ORDER BY q.current_value DESC, q.computed_at DESC
LIMIT 1
) q ON TRUE
WHERE r.rn = 1
ORDER BY r.event_at DESC
LIMIT {limit}
"""
def _base_property_interest_sql(self, row_limit: int) -> str:
limit = max(1, min(row_limit, MAX_ROW_CAP))
return f"""
SELECT p.person_id::text, p.full_name AS name, p.primary_phone AS phone, p.primary_email AS email,
COUNT(pi.interest_id)::int AS interest_count,
string_agg(DISTINCT COALESCE(pi.project_name, pr.project_name), ', ') AS projects,
string_agg(DISTINCT pi.configuration, ', ') AS configurations,
MIN(pi.budget_min) AS budget_min, MAX(pi.budget_max) AS budget_max,
MAX(pi.last_discussed_at) AS last_interest_at,
MAX(q.current_value) AS qd_score
FROM crm_people p
JOIN crm_property_interests pi ON pi.person_id = p.person_id
LEFT JOIN inventory_projects pr ON pr.project_id = pi.project_id
LEFT JOIN intel_qd_scores q ON q.person_id = p.person_id
GROUP BY p.person_id, p.full_name, p.primary_phone, p.primary_email
HAVING COUNT(pi.interest_id) > 0
ORDER BY interest_count DESC, qd_score DESC NULLS LAST, last_interest_at DESC NULLS LAST
LIMIT {limit}
"""
def _generic_clients_sql(self, row_limit: int) -> str:
limit = max(1, min(row_limit, MAX_ROW_CAP))
return f"""
SELECT p.person_id::text, p.full_name AS name, p.primary_email AS email, p.primary_phone AS phone,
p.buyer_type, l.status::text AS lead_status, l.budget_band, l.urgency,
q.current_value AS qd_score
FROM crm_people p
LEFT JOIN LATERAL (
SELECT * FROM crm_leads l WHERE l.person_id = p.person_id ORDER BY l.updated_at DESC LIMIT 1
) l ON TRUE
LEFT JOIN LATERAL (
SELECT current_value FROM intel_qd_scores q
WHERE q.person_id = p.person_id
ORDER BY q.current_value DESC, q.computed_at DESC
LIMIT 1
) q ON TRUE
ORDER BY qd_score DESC NULLS LAST, p.full_name ASC
LIMIT {limit}
"""
def _fallback_sql(self, prompt: str, row_limit: int) -> str:
lower = prompt.lower()
limit = max(1, min(row_limit, MAX_ROW_CAP))
if "objection" in lower:
return f"""
SELECT p.person_id::text, p.full_name AS name, co.objection_type, co.category, co.severity,
co.client_quote, co.agent_response, co.extracted_at
FROM intel_call_objections co
JOIN intel_calls c ON c.call_id = co.call_id
JOIN intel_interactions i ON i.interaction_id = c.interaction_id
JOIN crm_people p ON p.person_id = i.person_id
ORDER BY co.extracted_at DESC
LIMIT {limit}
"""
if "whatsapp" in lower or "message" in lower or "conversation" in lower:
return f"""
SELECT p.person_id::text, p.full_name AS name, 'whatsapp' AS type,
m.message_text AS summary, m.sender_role AS actor, m.delivered_at AS date
FROM intel_messages m
JOIN intel_interactions i ON i.interaction_id = m.interaction_id
JOIN crm_people p ON p.person_id = i.person_id
WHERE lower(m.message_text) LIKE '%' || lower(split_part($${prompt}$$, ' ', 1)) || '%'
OR i.channel = 'whatsapp'
ORDER BY m.delivered_at DESC
LIMIT {limit}
"""
if "contact" in lower or "recent" in lower or "last" in lower:
return f"""
SELECT p.person_id::text, p.full_name AS name, p.primary_phone AS phone,
lc.last_contact_at AS last_contacted_at, lc.last_channel AS last_contact_channel,
lc.last_interaction_type, lc.days_since_contact, lc.total_interactions AS interaction_count,
nba.recommended_action AS next_action, q.current_value AS qd_score
FROM crm_people p
LEFT JOIN read_last_contacted lc ON lc.person_id = p.person_id
LEFT JOIN read_next_best_action nba ON nba.person_id = p.person_id
LEFT JOIN LATERAL (
SELECT current_value FROM intel_qd_scores q
WHERE q.person_id = p.person_id
ORDER BY q.current_value DESC, q.computed_at DESC
LIMIT 1
) q ON TRUE
WHERE lc.last_contact_at IS NOT NULL
ORDER BY lc.last_contact_at DESC
LIMIT {limit}
"""
if "4 bhk" in lower or "budget" in lower or "interest" in lower or "property" in lower or "client" in lower:
return self._base_property_interest_sql(limit)
return self._generic_clients_sql(limit)
raise RuntimeError(f"Natural DB planner LLM failed: {exc}") from exc
raise RuntimeError("Natural DB planner returned no valid SQL.")
natural_db_agent = NaturalDbAgent()