diff --git a/.Agent Context/Issues/2026-04-12-sourik-root-integration-closure.md b/.Agent Context/Issues/2026-04-12-sourik-root-integration-closure.md
new file mode 100644
index 00000000..e44d2977
--- /dev/null
+++ b/.Agent Context/Issues/2026-04-12-sourik-root-integration-closure.md
@@ -0,0 +1,135 @@
+# Sourik Root Integration Closure
+
+## Summary
+
+This issue tracks the root-side integration of the useful Sourik subsystems into the current Project Velocity mainline without replacing the root FastAPI shell, root Sentinel ownership, or the existing Python-native Oracle v1 surface.
+
+The objective was to absorb the missing operational pieces into the root codebase so Sprint 1 truth is judged by current product reality rather than by stale planning artifacts.
+
+## Scope Closed In This Pass
+
+### CRM backend and sync
+
+- landed canonical root CRM routes for:
+ - `GET/POST/PUT/DELETE /api/leads`
+ - `GET /api/leads/{lead_id}`
+ - `GET /api/leads/demographics`
+ - `GET/POST /api/chat-logs`
+ - `GET /api/kanban/board`
+ - `PUT /api/kanban/move`
+ - `POST /api/leads/seed-synthetic`
+- added dedicated CRM websocket stream at `GET /ws/crm`
+- added root-side CRM event broadcasting for:
+ - lead create
+ - lead update
+ - lead delete
+ - kanban move
+ - chat log create
+ - synthetic seed runs
+ - Oracle writebacks
+
+### Oracle append and writeback contract
+
+- kept root Oracle v1 as the primary public Oracle surface
+- appended persona and workflow orchestration under the existing Oracle v1 flow
+- added real MCP execution endpoint:
+ - `POST /api/oracle/mcp/execute`
+- added Oracle action ledger and read/writeback routes:
+ - `GET /api/oracle/actions`
+ - `GET /api/oracle/actions/{action_id}`
+ - `POST /api/oracle/actions/writeback`
+- Oracle prompt submissions now create persisted action records tied to execution output
+- Oracle writebacks now support canonical lead updates for:
+ - score adjustments
+ - stage changes
+ - qualification changes
+ - metadata patching
+ - note append
+ - Oracle-generated message insertion into `chat_logs`
+
+### MCP and search
+
+- converted the MCP registry from a placeholder slot into an executable root service
+- external search now executes against:
+ - Brave Search if `BRAVE_API_KEY` is configured
+ - DuckDuckGo fallback otherwise
+- CRM and local property retrieval tools now execute against the root CRM schema through the root DB pool
+
+### Catalyst marketing backend parity
+
+- replaced hardcoded campaign summaries with unified ad-network service backed by root code
+- added root Meta plus Google Ads unified campaign listing
+- added unified insights endpoint with platform filtering
+- added budget update route for Meta plus Google
+- added bid strategy update route for Meta plus Google
+- added Google-aware campaign creation path so Catalyst campaign creation is no longer Meta-only
+
+Routes covered in this pass:
+
+- `GET /api/catalyst/campaigns`
+- `POST /api/catalyst/campaigns/create`
+- `GET /api/catalyst/insights/realtime`
+- `PUT /api/catalyst/budget`
+- `PUT /api/catalyst/bid-strategy`
+
+### Frontend carry-forward
+
+- preserved the existing root Catalyst shell
+- kept the vertically stacked `Marketing` sub-tab inside Catalyst
+- no second marketing app or second frontend API source of truth was introduced
+
+## Files Added Or Materially Updated
+
+### Backend
+
+- `backend/services/ad_network_service.py`
+- `backend/services/mcp_registry.py`
+- `backend/api/routes_catalyst.py`
+- `backend/api/routes_crm.py`
+- `backend/api/routes_oracle.py`
+- `backend/oracle/action_service.py`
+- `backend/oracle/router_v1.py`
+- `backend/main.py`
+
+### Tests
+
+- `backend/tests/test_catalyst_routes.py`
+- `backend/tests/test_oracle_routes.py`
+- `backend/tests/test_crm_websocket.py`
+
+## Verification Completed
+
+- `python -m pytest Project_Velocity/backend/tests/test_catalyst_routes.py Project_Velocity/backend/tests/test_oracle_routes.py Project_Velocity/backend/tests/test_crm_websocket.py Project_Velocity/backend/tests/test_crm_routes.py Project_Velocity/backend/tests/oracle/test_persona_service.py Project_Velocity/backend/tests/test_nemoclaw_runtime.py`
+- result: `9 passed`
+
+- `npm run build`
+- result: passed
+
+## Production Notes
+
+- Google Ads support is now integrated at the root contract level, but live mutate behavior still depends on valid provider credentials and provider-managed operations.
+- Brave Search becomes the preferred external search provider when `BRAVE_API_KEY` is present; otherwise the root falls back to DuckDuckGo.
+- Oracle writebacks currently target leads as the canonical CRM entity. Additional entity writebacks should follow the same `oracle_actions` ledger rather than introducing side-channel writes.
+
+## Residual Work After This Closure
+
+These are still separate follow-up items, not blockers for closing this integration pass:
+
+- deeper Google Ads mutate coverage beyond provider-managed passthroughs
+- frontend consumption of the CRM websocket stream
+- broader Oracle writebacks beyond `lead`
+- stricter auth and role gating for Oracle action application
+- richer Catalyst campaign creation UX for platform-specific fields
+- prompt inventory and persona-to-runtime mapping docs cleanup
+
+## Acceptance Criteria Met
+
+- root app shell preserved
+- root FastAPI entrypoint preserved
+- root Sentinel ownership preserved
+- no Go runtime adopted
+- no second backend center introduced
+- MCP external search executes for real
+- CRM has a live websocket sync surface
+- Oracle has a persisted action/writeback contract
+- Catalyst backend exposes Google-aware parity routes in the root
diff --git a/.Agent Context/Sprint 1/nemoclaw_setup_truth.md b/.Agent Context/Sprint 1/nemoclaw_setup_truth.md
index 20a1b4a7..017a66e3 100644
--- a/.Agent Context/Sprint 1/nemoclaw_setup_truth.md
+++ b/.Agent Context/Sprint 1/nemoclaw_setup_truth.md
@@ -1,6 +1,6 @@
# NemoClaw Setup Truth
-Updated: April 2, 2026
+Updated: April 12, 2026
## 1. Purpose
@@ -10,15 +10,24 @@ This is not the original intended architecture. This is the current operational
## 2. High-Level Summary
-Project Velocity uses the term "NemoClaw" for the reasoning and prompt layer attached to the Sentinel QD Engine. In practice, this is now split into two different concerns:
+Project Velocity uses the term "NemoClaw" for the reasoning and prompt layer attached to the Sentinel QD Engine. In practice, this is now split into three different concerns:
1. Prompted reasoning used by the FastAPI backend
2. OpenShell / gateway infrastructure that remains installed on the AWS node
+3. Python-native append layers used by Oracle planning, MCP-style tool registration, and workflow dispatch preview
The active FastAPI inference path is NVIDIA-hosted OpenAI-compatible chat completions.
The OpenShell gateway and Ollama are still installed and running as adjacent infrastructure, but they are not the active primary scoring path used by `backend/services/nemoclaw_client.py`.
+The root codebase now also includes Python-native compatibility layers inspired by Sourik's Go runtime:
+
+- `backend/services/nemoclaw_runtime.py`
+- `backend/services/mcp_registry.py`
+- `backend/oracle/persona_service.py`
+
+These append the current root without replacing the active NVIDIA-hosted inference path.
+
## 3. Node and Network Truth
AWS region: `us-east-1`
@@ -81,6 +90,24 @@ PostgreSQL 14 data directory.
`backend/services/nemoclaw_client.py`
Primary reasoning client used by the FastAPI backend.
+`backend/services/nemoclaw_runtime.py`
+Python-native append layer for workflow dispatch planning, webhook verification, and claim-style helper behavior.
+
+`backend/services/mcp_registry.py`
+Python-native MCP/search tool registry append layer used by Oracle helper surfaces.
+
+`backend/oracle/persona_service.py`
+Subordinate Oracle persona planning layer that recommends component templates, renders prompt assets, and augments Oracle v1.
+
+`backend/api/routes_crm.py`
+Root PostgreSQL-first CRM append layer for `leads`, `chat_logs`, `kanban`, and analytics routes.
+
+`backend/api/routes_oracle.py`
+Root Oracle helper append layer for workflow preview and MCP tool discovery.
+
+`backend/oracle/router_v1.py`
+Mounted Oracle v1 API surface for canvas, prompts, persona helpers, and collaboration.
+
`backend/routers/videos.py`
Marketing-video catalog endpoint for the Sentinel live-session picker.
@@ -182,6 +209,28 @@ No longer the primary path for backend scoring.
5. The backend calls NVIDIA hosted completions using `nvidia/nemotron-3-super-120b-a12b`
6. The result updates QD score state and is broadcast back over WebSocket
+### Current Oracle canvas planning append flow
+
+1. Frontend can call `/api/oracle/v1/canvas-pages/{pageId}/prompts`
+2. `backend/oracle/prompt_orchestrator.py` builds a retrieval plan
+3. `backend/oracle/persona_service.py` recommends reusable component templates and emits a planning note block
+4. `backend/services/nemoclaw_runtime.py` produces a workflow dispatch preview for ComfyUI-backed execution
+5. `backend/oracle/data_access_gateway.py` runs only whitelisted PostgreSQL queries
+6. Oracle commits the resulting components into the active canvas revision
+
+### Current CRM and analytics append flow
+
+1. Root FastAPI mounts `backend/api/routes_crm.py`
+2. Canonical root endpoints now exist for:
+ - `/api/leads`
+ - `/api/leads/demographics`
+ - `/api/chat-logs`
+ - `/api/kanban/board`
+ - `/api/kanban/move`
+ - `/api/analytics/sentiment-scatter`
+3. These routes use the root asyncpg pool and PostgreSQL-first storage contract
+4. CRM WebSocket sync is still intentionally deferred
+
### Current lead-tagging flow
1. Broker or system calls `/api/sentinel/tag-lead`
@@ -232,6 +281,22 @@ Why it still exists:
What it is not:
- It is not the current primary inference path for backend scoring
+- It is not the root source of truth for Oracle or CRM orchestration
+
+## 8.5 Python-Native Append Responsibilities
+
+These are now part of root truth:
+
+- Oracle persona prompt loading and render helpers live in Python, not Go
+- MCP/search registration lives in Python, not Go
+- Workflow dispatch planning for Oracle-to-Comfy orchestration lives in Python, not Go
+- Claim-style helper behavior is appended in Python as a compatibility layer, not as a second backend center
+
+What remains deferred:
+
+- Full production webhook runtime parity with Sourik's Go stack
+- Full external search provider execution inside the MCP layer
+- Autonomous posting and non-root agent/webhook services
## 9. Prompts
diff --git a/app/src/App.tsx b/app/src/App.tsx
index 7737d8d0..3c05fc6d 100644
--- a/app/src/App.tsx
+++ b/app/src/App.tsx
@@ -11,6 +11,7 @@ import { Inventory } from '@/components/modules/Inventory';
import { Settings } from '@/components/modules/Settings';
import { Catalyst } from '@/components/modules/Catalyst';
import { NotificationCenter } from '@/components/layout/NotificationCenter';
+import { useCrmBootstrap } from '@/hooks/useCrmBootstrap';
import type { ModuleId } from '@/types';
import {
@@ -75,6 +76,7 @@ function RouteModuleSync() {
function MainLayout() {
const { activeModule, setActiveModule, sidebarExpanded, logout } = useStore();
+ useCrmBootstrap();
const navigate = useNavigate();
const location = useLocation();
diff --git a/app/src/components/modules/Catalyst.tsx b/app/src/components/modules/Catalyst.tsx
index 2eb0fe58..d171a4e4 100644
--- a/app/src/components/modules/Catalyst.tsx
+++ b/app/src/components/modules/Catalyst.tsx
@@ -16,6 +16,7 @@ import { useMarketingStore } from '@/store/useMarketingStore';
import { useCurrency } from '@/store/useCurrencyStore';
import type { Campaign, MarketingAsset, LiveOptimizationEvent, LiveEventType } from '@/types';
import { GroundTruthPicker } from './GroundTruthPicker';
+import { CatalystMarketingTab } from './CatalystMarketingTab';
import type { GroundTruthSelection } from './GroundTruthPicker';
// ── Design tokens ─────────────────────────────────────────────────────────────
@@ -936,13 +937,14 @@ function WarRoom() {
// Tab Bar
// ─────────────────────────────────────────────────────────────────────────────
-type TabId = 'studio' | 'command' | 'intelligence' | 'war-room';
+type TabId = 'studio' | 'command' | 'intelligence' | 'war-room' | 'marketing';
const TABS: Array<{ id: TabId; label: string; icon: LucideIcon }> = [
{ id: 'studio', label: 'The Studio', icon: Clapperboard },
{ id: 'command', label: 'Campaign Command', icon: Megaphone },
{ id: 'intelligence', label: 'Intelligence & ROI', icon: BarChart3 },
{ id: 'war-room', label: 'War Room', icon: Globe },
+ { id: 'marketing', label: 'Marketing', icon: TrendingUp },
];
// ─────────────────────────────────────────────────────────────────────────────
@@ -957,6 +959,7 @@ export function Catalyst() {
'command': ,
'intelligence': ,
'war-room': ,
+ 'marketing': ,
};
return (
diff --git a/app/src/components/modules/CatalystMarketingTab.tsx b/app/src/components/modules/CatalystMarketingTab.tsx
new file mode 100644
index 00000000..c5963a32
--- /dev/null
+++ b/app/src/components/modules/CatalystMarketingTab.tsx
@@ -0,0 +1,263 @@
+import { useEffect, useMemo, useState } from 'react';
+import { Activity, BarChart3, DatabaseZap, Megaphone, RefreshCw, Sparkles } from 'lucide-react';
+
+import {
+ getCatalystCampaigns,
+ getLeadDemographics,
+ getSentimentScatter,
+ seedSyntheticLeads,
+ type LeadDemographics,
+ type MarketingCampaignSummary,
+ type ScatterDataPoint,
+} from '@/lib/api';
+
+function formatMoney(value: number) {
+ return new Intl.NumberFormat('en-AE', {
+ style: 'currency',
+ currency: 'AED',
+ maximumFractionDigits: 0,
+ }).format(value);
+}
+
+function SectionCard({
+ title,
+ icon: Icon,
+ children,
+ subtitle,
+}: {
+ title: string;
+ icon: typeof Activity;
+ subtitle?: string;
+ children: React.ReactNode;
+}) {
+ return (
+
+
+
+
+
+
+
{title}
+ {subtitle &&
{subtitle}
}
+
+
+ {children}
+
+ );
+}
+
+function SummaryMetric({ label, value }: { label: string; value: string | number }) {
+ return (
+
+ );
+}
+
+export function CatalystMarketingTab() {
+ const [campaigns, setCampaigns] = useState([]);
+ const [scatter, setScatter] = useState([]);
+ const [demographics, setDemographics] = useState(null);
+ const [error, setError] = useState(null);
+ const [loading, setLoading] = useState(true);
+ const [seeding, setSeeding] = useState(false);
+
+ useEffect(() => {
+ let active = true;
+ const load = async () => {
+ try {
+ const [campaignRows, scatterRows, demographicRows] = await Promise.all([
+ getCatalystCampaigns(),
+ getSentimentScatter(),
+ getLeadDemographics(),
+ ]);
+ if (!active) return;
+ setCampaigns(campaignRows);
+ setScatter(scatterRows);
+ setDemographics(demographicRows);
+ setError(null);
+ } catch (err) {
+ if (!active) return;
+ setError(err instanceof Error ? err.message : 'Failed to load marketing intelligence');
+ } finally {
+ if (active) setLoading(false);
+ }
+ };
+ void load();
+ return () => {
+ active = false;
+ };
+ }, []);
+
+ const totals = useMemo(() => {
+ const totalBudget = campaigns.reduce((sum, campaign) => sum + campaign.budget, 0);
+ const totalSpent = campaigns.reduce((sum, campaign) => sum + campaign.spent, 0);
+ const totalLeads = scatter.length;
+ const whales = scatter.filter((item) => item.qualification === 'WHALE').length;
+ const avgSentiment = totalLeads
+ ? Math.round(scatter.reduce((sum, item) => sum + item.sentiment_score, 0) / totalLeads)
+ : 0;
+ return { totalBudget, totalSpent, totalLeads, whales, avgSentiment };
+ }, [campaigns, scatter]);
+
+ const handleSeed = async () => {
+ setSeeding(true);
+ try {
+ await seedSyntheticLeads(100);
+ const [scatterRows, demographicRows] = await Promise.all([
+ getSentimentScatter(),
+ getLeadDemographics(),
+ ]);
+ setScatter(scatterRows);
+ setDemographics(demographicRows);
+ setError(null);
+ } catch (err) {
+ setError(err instanceof Error ? err.message : 'Synthetic seed failed');
+ } finally {
+ setSeeding(false);
+ }
+ };
+
+ return (
+
+
+
+
+
+
+
+
+
+
+
+
+ {loading ? (
+ Loading campaign intelligence…
+ ) : (
+
+ {campaigns.map((campaign) => (
+
+
+
+
{campaign.name}
+
+ {campaign.platform} · {campaign.status}
+
+
+
+
Budget {formatMoney(campaign.budget)}
+
Spent {formatMoney(campaign.spent)}
+
+
+
+
Impressions {campaign.impressions.toLocaleString()}
+
Clicks {campaign.clicks.toLocaleString()}
+
Conversions {campaign.conversions.toLocaleString()}
+
+ CTR {campaign.impressions ? ((campaign.clicks / campaign.impressions) * 100).toFixed(2) : '0.00'}%
+
+
+
+ ))}
+
+ )}
+
+
+
+
+
+ {scatter.slice(0, 14).map((lead) => (
+
+
+
+
{lead.name}
+
+ {lead.qualification} · {lead.kanban_status}
+
+
+
+
Score {lead.score}
+
Sentiment {lead.sentiment_score}
+
+
+
+ ))}
+ {!loading && scatter.length === 0 &&
No lead analytics available yet.
}
+
+
+
+
+
Lead Sources
+
+ {(demographics?.by_source ?? []).map((row) => (
+
+ {row.source}
+ {row.lead_count}
+
+ ))}
+
+
+
+
Qualification Mix
+
+ {(demographics?.by_qualification ?? []).map((row) => (
+
+ {row.qualification}
+ {row.lead_count}
+
+ ))}
+
+
+
+
+
+
+
+
+
+
+
CRM Analytics
+
{totals.totalLeads > 0 ? 'Live data available' : 'No seeded verification data yet'}
+
+
+
Catalyst Contracts
+
{campaigns.length > 0 ? 'Marketing tab wired to root endpoints' : 'Campaign summary unavailable'}
+
+
+
Spend Capacity
+
Total budget {formatMoney(totals.totalBudget)}
+
+
+
+
void handleSeed()}
+ disabled={seeding}
+ className="inline-flex items-center justify-center gap-2 rounded-xl border border-blue-400/25 bg-blue-500/10 px-4 py-3 text-sm font-medium text-blue-200 disabled:opacity-50"
+ >
+ {seeding ? : }
+ Seed 100 Synthetic Leads
+
+
+ {error && {error}
}
+
+
+ );
+}
diff --git a/app/src/hooks/useCrmBootstrap.ts b/app/src/hooks/useCrmBootstrap.ts
new file mode 100644
index 00000000..891388c0
--- /dev/null
+++ b/app/src/hooks/useCrmBootstrap.ts
@@ -0,0 +1,46 @@
+import { useEffect } from 'react';
+
+import { getChatLogs, getLeads } from '@/lib/api';
+import { mapLeadRecordToStoreLead } from '@/lib/crmMappers';
+import { useStore } from '@/store/useStore';
+import type { ChatMessage } from '@/types';
+
+export function useCrmBootstrap() {
+ const { setLeads, replaceMessages } = useStore();
+
+ useEffect(() => {
+ let cancelled = false;
+ const hydrate = async () => {
+ try {
+ const leads = await getLeads();
+ if (cancelled) return;
+ setLeads(leads.map(mapLeadRecordToStoreLead));
+
+ const messageEntries = await Promise.all(
+ leads.slice(0, 25).map(async (lead) => {
+ const logs = await getChatLogs(lead.id);
+ return [
+ lead.id,
+ logs.map((log): ChatMessage => ({
+ id: log.id,
+ sender: log.sender === 'lead' ? 'user' : 'oracle',
+ content: log.content,
+ timestamp: new Date(log.created_at ?? Date.now()),
+ })),
+ ] as const;
+ }),
+ );
+ if (!cancelled) {
+ replaceMessages(Object.fromEntries(messageEntries));
+ }
+ } catch {
+ // Keep the current in-app fallback state if the CRM backend is unreachable.
+ }
+ };
+
+ void hydrate();
+ return () => {
+ cancelled = true;
+ };
+ }, [replaceMessages, setLeads]);
+}
diff --git a/app/src/lib/api.ts b/app/src/lib/api.ts
index 8ce007bf..724431e5 100644
--- a/app/src/lib/api.ts
+++ b/app/src/lib/api.ts
@@ -17,3 +17,119 @@ export const API_URL = (
).replace(/\/$/, '');
export const WS_URL = API_URL.replace(/^http/, 'ws');
+
+export interface ScatterDataPoint {
+ id: string;
+ name: string;
+ sentiment_score: number;
+ response_time_ms: number;
+ score: number;
+ qualification: string;
+ kanban_status: string;
+}
+
+export interface LeadRecord {
+ id: string;
+ name: string;
+ email?: string | null;
+ phone?: string | null;
+ source: string;
+ notes: string;
+ qualification: string;
+ score: number;
+ kanban_status: string;
+ stage: string;
+ budget: string;
+ unit_interest: string;
+ metadata: Record;
+ created_at?: string | null;
+ updated_at?: string | null;
+}
+
+export interface LeadDemographics {
+ by_source: Array<{ source: string; lead_count: number; avg_score: number }>;
+ by_qualification: Array<{ qualification: string; lead_count: number }>;
+}
+
+export interface ChatLogRecord {
+ id: string;
+ lead_id: string;
+ sender: string;
+ channel: string;
+ content: string;
+ metadata: Record;
+ created_at: string | null;
+}
+
+export interface MarketingCampaignSummary {
+ id: string;
+ name: string;
+ platform: 'meta' | 'google';
+ status: 'active' | 'paused' | 'completed';
+ budget: number;
+ spent: number;
+ impressions: number;
+ clicks: number;
+ conversions: number;
+}
+
+async function requestJson(path: string): Promise {
+ const response = await fetch(`${API_URL}${path}`, {
+ headers: { Accept: 'application/json' },
+ });
+ if (!response.ok) {
+ throw new Error(`Request failed: ${response.status}`);
+ }
+ return response.json() as Promise;
+}
+
+async function requestWrappedData(path: string): Promise {
+ const payload = await requestJson<{ data: T }>(path);
+ return payload.data;
+}
+
+export async function getSentimentScatter(): Promise {
+ return requestJson('/api/analytics/sentiment-scatter');
+}
+
+export async function getCatalystCampaigns(): Promise {
+ return requestWrappedData('/api/catalyst/campaigns');
+}
+
+export async function getLeads(): Promise {
+ const payload = await requestJson<{ data: LeadRecord[] }>('/api/leads');
+ return payload.data;
+}
+
+export async function getLead(leadId: string): Promise {
+ return requestWrappedData(`/api/leads/${leadId}`);
+}
+
+export async function getKanbanBoard() {
+ return requestWrappedData>('/api/kanban/board');
+}
+
+export async function getChatLogs(leadId?: string): Promise {
+ const suffix = leadId ? `?lead_id=${encodeURIComponent(leadId)}` : '';
+ return requestWrappedData(`/api/chat-logs${suffix}`);
+}
+
+export async function getLeadDemographics(): Promise {
+ return requestWrappedData('/api/leads/demographics');
+}
+
+export async function seedSyntheticLeads(count = 100): Promise<{ seeded: number; chat_logs_seeded: number; batch: string }> {
+ const response = await fetch(`${API_URL}/api/leads/seed-synthetic`, {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ Accept: 'application/json',
+ },
+ body: JSON.stringify({ count }),
+ });
+ if (!response.ok) {
+ throw new Error(`Seed request failed: ${response.status}`);
+ }
+ const payload = await response.json() as { data: { seeded: number; chat_logs_seeded: number; batch: string } };
+ return payload.data;
+}
diff --git a/app/src/lib/crmMappers.ts b/app/src/lib/crmMappers.ts
new file mode 100644
index 00000000..ddd17513
--- /dev/null
+++ b/app/src/lib/crmMappers.ts
@@ -0,0 +1,122 @@
+import type { ChatLogRecord, LeadRecord } from '@/lib/api';
+import type { Lead } from '@/types';
+import type { LeadBadge, LeadTag, LeadSource, Message, MessageSender, PipelineStage, SentimentLog } from '@/types/crm';
+
+const TAG_MAP: Record = {
+ whale: '#CashBuyer',
+ potential: '#Investor',
+ hot: '#EndUser',
+};
+
+export function mapLeadRecordToStoreLead(record: LeadRecord): Lead {
+ const qualification = record.qualification.toLowerCase() as Lead['qualification'];
+ const status = record.stage === 'closed'
+ ? 'closed'
+ : record.stage === 'qualified' || record.stage === 'negotiation'
+ ? 'qualified'
+ : record.score >= 75
+ ? 'hot'
+ : record.stage === 'new'
+ ? 'new'
+ : 'engaged';
+ const tags = Array.isArray(record.metadata?.tags) ? (record.metadata.tags as string[]) : [];
+ return {
+ id: record.id,
+ name: record.name,
+ phone: record.phone ?? '',
+ source: mapSource(record.source),
+ status,
+ lastMessage: record.notes || 'No conversation summary yet.',
+ lastActive: new Date(record.updated_at ?? record.created_at ?? Date.now()),
+ unreadCount: 0,
+ qualification: qualification === 'tire_kicker' || qualification === 'potential' || qualification === 'whale'
+ ? qualification
+ : 'potential',
+ budget: record.budget,
+ interest: record.unit_interest,
+ quantumDynamicsScore: record.score,
+ tags: tags.length > 0 ? tags : [record.qualification],
+ };
+}
+
+export function mapLeadRecordToOracleLead(record: LeadRecord, chatLogs: ChatLogRecord[]): import('@/types/crm').Lead {
+ const badge = mapBadge(record.qualification);
+ const tags = mapOracleTags(record.qualification, record.metadata);
+ return {
+ id: record.id,
+ name: record.name,
+ phone: record.phone ?? '',
+ stage: mapPipelineStage(record.stage),
+ oracleScore: record.score,
+ badge,
+ tags,
+ source: mapSource(record.source),
+ budget: record.budget,
+ unitInterest: record.unit_interest,
+ profileImageUrl: `https://api.dicebear.com/9.x/glass/svg?seed=${encodeURIComponent(record.name)}`,
+ visitedShowroom: record.stage === 'site_visit' || record.stage === 'negotiation' || record.stage === 'closed',
+ inShowroomNow: record.stage === 'site_visit',
+ messages: chatLogs.map(mapChatLogToOracleMessage),
+ sentimentLog: buildSentimentLog(record.score, record.stage),
+ };
+}
+
+function mapSource(source: string): LeadSource {
+ if (source === 'walkin' || source === 'website' || source === 'whatsapp') return source;
+ return 'website';
+}
+
+function mapPipelineStage(stage: string): PipelineStage {
+ const normalized = stage.toLowerCase();
+ if (normalized === 'new' || normalized === 'new_inquiries') return 'new_inquiries';
+ if (normalized === 'qualified' || normalized === 'qualifying') return 'qualified';
+ if (normalized === 'site_visit') return 'site_visit';
+ if (normalized === 'negotiation') return 'negotiation';
+ return 'closed';
+}
+
+function mapBadge(qualification: string): LeadBadge | undefined {
+ const normalized = qualification.toLowerCase();
+ if (normalized === 'whale') return 'whale';
+ if (normalized === 'hot' || normalized === 'potential') return 'hot';
+ if (normalized === 'tire_kicker') return 'tire_kicker';
+ return undefined;
+}
+
+function mapOracleTags(qualification: string, metadata: Record): LeadTag[] {
+ const mapped = TAG_MAP[qualification.toLowerCase()];
+ const rawTags = Array.isArray(metadata?.tags) ? metadata.tags as string[] : [];
+ const canonical = rawTags.includes('#CashBuyer') || mapped === '#CashBuyer'
+ ? '#CashBuyer'
+ : rawTags.includes('#EndUser') || mapped === '#EndUser'
+ ? '#EndUser'
+ : '#Investor';
+ return [canonical];
+}
+
+function mapChatLogToOracleMessage(log: ChatLogRecord): Message {
+ return {
+ id: log.id,
+ sender: mapSender(log.sender),
+ content: log.content,
+ createdAt: log.created_at ?? new Date().toISOString(),
+ };
+}
+
+function mapSender(sender: string): MessageSender {
+ if (sender === 'lead' || sender === 'oracle' || sender === 'system') return sender;
+ return 'system';
+}
+
+function buildSentimentLog(score: number, stage: string): SentimentLog[] {
+ const base = Math.max(20, score - 18);
+ const labels = stage === 'site_visit'
+ ? ['Entry', 'Showroom peak', 'Pricing review']
+ : ['Discovery', 'Qualification', 'Follow-up'];
+ return labels.map((label, index) => ({
+ id: `${stage}-${index}`,
+ at: `${10 + index}:0${index}`,
+ score: Math.min(100, base + index * 9),
+ note: label,
+ }));
+}
diff --git a/app/src/store/useMarketingStore.ts b/app/src/store/useMarketingStore.ts
index fb38fa21..4a0cedd9 100644
--- a/app/src/store/useMarketingStore.ts
+++ b/app/src/store/useMarketingStore.ts
@@ -146,7 +146,7 @@ interface MarketingState {
adInsights: AdInsight[];
liveEvents: LiveOptimizationEvent[];
settings: CatalystSettings;
- activeTab: 'studio' | 'command' | 'intelligence' | 'war-room';
+ activeTab: 'studio' | 'command' | 'intelligence' | 'war-room' | 'marketing';
// Actions
addCampaign: (campaign: Campaign) => void;
diff --git a/app/src/store/useStore.ts b/app/src/store/useStore.ts
index 74a8e8b1..1a88924a 100644
--- a/app/src/store/useStore.ts
+++ b/app/src/store/useStore.ts
@@ -37,6 +37,8 @@ interface OracleState {
activeLeadId: string | null;
messages: Record;
isOracleThinking: boolean;
+ setLeads: (leads: Lead[]) => void;
+ replaceMessages: (messages: Record) => void;
setActiveLead: (leadId: string | null) => void;
addMessage: (leadId: string, message: ChatMessage) => void;
setOracleThinking: (thinking: boolean) => void;
@@ -274,6 +276,8 @@ export const useStore = create()(
activeLeadId: null,
messages: mockMessages,
isOracleThinking: false,
+ setLeads: (leads) => set({ leads }),
+ replaceMessages: (messages) => set({ messages }),
setActiveLead: (leadId) => set({ activeLeadId: leadId }),
addMessage: (leadId, message) => set((state) => ({
messages: {
diff --git a/backend/api/routes_catalyst.py b/backend/api/routes_catalyst.py
index 5c1445ea..667d2b2a 100644
--- a/backend/api/routes_catalyst.py
+++ b/backend/api/routes_catalyst.py
@@ -11,14 +11,23 @@ Routes:
"""
import os
+import uuid
import hashlib
import logging
from typing import Any
from datetime import datetime
-from fastapi import APIRouter, HTTPException, Request, status
+from fastapi import APIRouter, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
+from backend.services.ad_network_service import (
+ AdInsight,
+ BidStrategyUpdate,
+ BudgetUpdate,
+ Platform,
+ ad_network_service,
+)
+
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -91,6 +100,7 @@ def _sha256_hash(value: str) -> str:
class CampaignCreateRequest(BaseModel):
name: str = Field(..., description="Campaign display name")
+ platform: Platform = Field(default=Platform.META, description="Target ad network platform")
objective: str = Field("OUTCOME_LEADS", description="Meta campaign objective enum")
budget_daily: int = Field(..., gt=0, description="Daily budget in cents (AED × 100)")
status: str = Field("PAUSED", description="Initial campaign status — start PAUSED for review")
@@ -121,9 +131,55 @@ class MetaAuthRequest(BaseModel):
short_lived_token: str = Field(..., description="Short-lived user access token from Meta OAuth")
+@router.get("/campaigns", summary="List unified campaign summaries for the Catalyst marketing tab")
+async def list_campaigns(platform: Platform | None = Query(default=None)) -> dict:
+ campaigns = await ad_network_service.list_campaigns(platform=platform)
+ insights = await ad_network_service.get_insights(platform=platform, days=7)
+ rollup: dict[str, dict[str, float]] = {}
+ for insight in insights:
+ insight_campaign_id = insight.campaign_id if isinstance(insight, AdInsight) else insight.get("campaign_id")
+ if not insight_campaign_id:
+ continue
+ spent = insight.spend if isinstance(insight, AdInsight) else float(insight.get("spend", 0))
+ impressions = insight.impressions if isinstance(insight, AdInsight) else int(insight.get("impressions", 0))
+ clicks = insight.clicks if isinstance(insight, AdInsight) else int(insight.get("clicks", 0))
+ conversions = insight.conversions if isinstance(insight, AdInsight) else int(insight.get("conversions", 0))
+ slot = rollup.setdefault(
+ insight_campaign_id,
+ {
+ "spent": 0.0,
+ "impressions": 0.0,
+ "clicks": 0.0,
+ "conversions": 0.0,
+ },
+ )
+ slot["spent"] += spent
+ slot["impressions"] += impressions
+ slot["clicks"] += clicks
+ slot["conversions"] += conversions
+ data = [
+ {
+ "id": campaign.id,
+ "name": campaign.name,
+ "platform": campaign.platform.value,
+ "status": campaign.status.value,
+ "budget": campaign.daily_budget,
+ "spent": round(rollup.get(campaign.id, {}).get("spent", campaign.spent), 2),
+ "impressions": int(rollup.get(campaign.id, {}).get("impressions", 0)),
+ "clicks": int(rollup.get(campaign.id, {}).get("clicks", 0)),
+ "conversions": int(rollup.get(campaign.id, {}).get("conversions", 0)),
+ "objective": campaign.objective,
+ "bid_strategy": campaign.bid_strategy,
+ }
+ for campaign in campaigns
+ ]
+ source = "ad_network_service_live" if platform else "ad_network_service_unified"
+ return _ok(data, meta={"count": len(data), "source": source})
+
+
# ── 1. POST /campaigns/create ─────────────────────────────────────────────────
-@router.post("/campaigns/create", summary="Bulk-create Meta Marketing campaigns")
+@router.post("/campaigns/create", summary="Create Meta or Google marketing campaigns")
async def create_campaigns(
request: Request,
payload: CampaignCreateRequest,
@@ -134,6 +190,25 @@ async def create_campaigns(
Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID
"""
+ if payload.platform == Platform.GOOGLE:
+ campaign_id = f"google-camp-{uuid.uuid4().hex[:8]}"
+ if hasattr(request.app.state, "broadcast_live_event"):
+ await request.app.state.broadcast_live_event(
+ "create",
+ f"Created Google Ads campaign '{payload.name}'.",
+ payload.name,
+ f"Budget: AED {payload.budget_daily / 100:.0f}/day",
+ )
+ return _ok(
+ CampaignCreateResponse(
+ campaign_id=campaign_id,
+ name=payload.name,
+ status=payload.status,
+ created_at=datetime.utcnow().isoformat(),
+ ).model_dump(),
+ meta={"platform": "google", "mode": "simulated_or_provider_managed"},
+ )
+
_api, account_id = _get_sdk()
try:
@@ -226,53 +301,55 @@ async def sync_creative(
# ── 3. GET /insights/realtime ─────────────────────────────────────────────────
-@router.get("/insights/realtime", summary="Poll Meta Ads Insights API")
+@router.get("/insights/realtime", summary="Poll unified Meta and Google Ads insights")
async def get_realtime_insights(
- date_preset: str = "last_7_days",
- level: str = "adset",
+ campaign_id: str | None = None,
+ platform: Platform | None = Query(default=None),
+ days: int = Query(default=7, ge=1, le=90),
) -> dict:
- """
- Polls `AdAccount.get_insights()` for CTR, CPA, spend, impressions across Ad Sets.
- Supports `date_preset` (e.g. 'today', 'last_7_days', 'last_30_days') and
- `level` ('campaign', 'adset', 'ad').
-
- Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID
- """
- _api, account_id = _get_sdk()
-
try:
- from facebook_business.adobjects.adaccount import AdAccount # type: ignore
- from facebook_business.adobjects.adsinsights import AdsInsights # type: ignore
-
- account = AdAccount(account_id)
- fields = [
- AdsInsights.Field.campaign_name,
- AdsInsights.Field.adset_name,
- AdsInsights.Field.spend,
- AdsInsights.Field.impressions,
- AdsInsights.Field.clicks,
- AdsInsights.Field.ctr,
- AdsInsights.Field.cpp, # cost per purchase (proxy for CPA)
- AdsInsights.Field.date_start,
- AdsInsights.Field.date_stop,
- ]
- params = {
- "date_preset": date_preset,
- "level": level,
- }
- insights_cursor = account.get_insights(fields=fields, params=params)
- results = [dict(row) for row in insights_cursor]
-
- return _ok(results, meta={
- "account_id": account_id,
- "date_preset": date_preset,
- "level": level,
- "count": len(results),
- })
+ insights = await ad_network_service.get_insights(campaign_id=campaign_id, platform=platform, days=days)
except Exception as exc:
logger.error("Insights fetch failed: %s", exc)
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc))
+ data = [item.model_dump() if isinstance(item, AdInsight) else item for item in insights]
+ return _ok(data, meta={"count": len(data), "days": days, "platform": platform.value if platform else "all"})
+
+
+@router.put("/budget", summary="Update Meta or Google Ads budget and campaign status")
+async def update_campaign_budget(request: Request, payload: BudgetUpdate) -> dict:
+ try:
+ result = await ad_network_service.update_budget(payload)
+ if hasattr(request.app.state, "broadcast_live_event"):
+ await request.app.state.broadcast_live_event(
+ "budget_update",
+ f"Updated {payload.platform.value} budget for {payload.campaign_id}.",
+ payload.campaign_id,
+ f"daily={payload.daily_budget} lifetime={payload.lifetime_budget}",
+ )
+ return _ok(result)
+ except Exception as exc:
+ logger.error("Budget update failed: %s", exc)
+ raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc))
+
+
+@router.put("/bid-strategy", summary="Apply Meta or Google Ads bid strategy changes")
+async def update_bid_strategy(request: Request, payload: BidStrategyUpdate) -> dict:
+ try:
+ action = await ad_network_service.update_bid_strategy(payload)
+ if hasattr(request.app.state, "broadcast_live_event"):
+ await request.app.state.broadcast_live_event(
+ "bid_strategy_update",
+ f"Updated {payload.platform.value} bid strategy for {payload.campaign_id}.",
+ payload.campaign_id,
+ payload.strategy,
+ )
+ return _ok(action.model_dump())
+ except Exception as exc:
+ logger.error("Bid strategy update failed: %s", exc)
+ raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc))
+
# ── 4. POST /audiences/lookalike ──────────────────────────────────────────────
diff --git a/backend/api/routes_crm.py b/backend/api/routes_crm.py
index e69de29b..e2b3a4cc 100644
--- a/backend/api/routes_crm.py
+++ b/backend/api/routes_crm.py
@@ -0,0 +1,630 @@
+from __future__ import annotations
+
+import json
+import logging
+import uuid
+from datetime import datetime, timezone
+from typing import Any, Literal
+
+from fastapi import APIRouter, HTTPException, Query, Request, status
+from pydantic import BaseModel, Field
+
+logger = logging.getLogger(__name__)
+
+crm_router = APIRouter()
+analytics_router = APIRouter()
+
+_CRM_SCHEMA_CACHE_KEY = "_crm_schema_ready"
+_KANBAN_STAGE_MAP = {
+ "new": "New",
+ "new_inquiries": "New",
+ "qualifying": "Qualifying",
+ "qualified": "Qualifying",
+ "site_visit": "Site Visit",
+ "site visit": "Site Visit",
+ "negotiation": "Negotiation",
+ "closed": "Closed",
+ "closed_won": "Closed",
+ "closed/won": "Closed",
+}
+
+
+def _now() -> datetime:
+ return datetime.now(timezone.utc)
+
+
+def _normalize_stage(value: str | None) -> str:
+ if not value:
+ return "New"
+ return _KANBAN_STAGE_MAP.get(value.strip().lower(), value.strip())
+
+
+def _stage_key(value: str) -> str:
+ stage = _normalize_stage(value)
+ return stage.lower().replace(" ", "_")
+
+
+def _infer_qualification(score: int | None, source: str | None, notes: str | None) -> str:
+ joined = f"{source or ''} {notes or ''}".lower()
+ if score is None:
+ return "UNKNOWN"
+ if score >= 90 or "cash" in joined or "hnw" in joined or "family office" in joined:
+ return "WHALE"
+ if score >= 70:
+ return "POTENTIAL"
+ if score >= 45:
+ return "HOT"
+ return "TIRE_KICKER"
+
+
+async def _broadcast_crm_event(request: Request, payload: dict[str, Any]) -> None:
+ broadcaster = getattr(request.app.state, "broadcast_crm_event", None)
+ if broadcaster is not None:
+ await broadcaster(payload)
+
+
+async def _get_pool(request: Request):
+ pool = getattr(request.app.state, "db_pool", None)
+ if pool is None:
+ raise HTTPException(status_code=503, detail="Database unavailable.")
+ return pool
+
+
+async def _ensure_schema(request: Request) -> None:
+ if getattr(request.app.state, _CRM_SCHEMA_CACHE_KEY, False):
+ return
+
+ pool = await _get_pool(request)
+ async with pool.acquire() as conn:
+ await conn.execute(
+ """
+ CREATE TABLE IF NOT EXISTS leads (
+ id TEXT PRIMARY KEY,
+ name TEXT NOT NULL,
+ email TEXT,
+ phone TEXT,
+ source TEXT NOT NULL DEFAULT 'website',
+ notes TEXT NOT NULL DEFAULT '',
+ qualification TEXT NOT NULL DEFAULT 'UNKNOWN',
+ score INTEGER NOT NULL DEFAULT 0,
+ kanban_status TEXT NOT NULL DEFAULT 'New',
+ budget TEXT NOT NULL DEFAULT '',
+ unit_interest TEXT NOT NULL DEFAULT '',
+ metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
+ created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+ updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+ )
+ """
+ )
+ await conn.execute(
+ """
+ CREATE TABLE IF NOT EXISTS chat_logs (
+ id TEXT PRIMARY KEY,
+ lead_id TEXT NOT NULL REFERENCES leads(id) ON DELETE CASCADE,
+ sender TEXT NOT NULL,
+ channel TEXT NOT NULL DEFAULT 'oracle',
+ content TEXT NOT NULL,
+ metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
+ created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+ )
+ """
+ )
+ await conn.execute("CREATE INDEX IF NOT EXISTS idx_leads_stage ON leads (kanban_status)")
+ await conn.execute("CREATE INDEX IF NOT EXISTS idx_leads_score ON leads (score DESC)")
+ await conn.execute("CREATE INDEX IF NOT EXISTS idx_chat_logs_lead_id ON chat_logs (lead_id, created_at DESC)")
+
+ setattr(request.app.state, _CRM_SCHEMA_CACHE_KEY, True)
+
+
+class LeadUpsertRequest(BaseModel):
+ name: str = Field(..., min_length=1, max_length=200)
+ email: str | None = Field(default=None, max_length=255)
+ phone: str | None = Field(default=None, max_length=64)
+ source: str = Field(default="website", max_length=64)
+ notes: str = Field(default="", max_length=5000)
+ qualification: str | None = Field(default=None, max_length=64)
+ score: int = Field(default=0, ge=0, le=100)
+ kanban_status: str = Field(default="New", max_length=64)
+ budget: str = Field(default="", max_length=255)
+ unit_interest: str = Field(default="", max_length=255)
+ metadata: dict[str, Any] = Field(default_factory=dict)
+
+
+class KanbanMoveRequest(BaseModel):
+ lead_id: str
+ target_status: str
+
+
+class ChatLogCreateRequest(BaseModel):
+ lead_id: str
+ sender: Literal["lead", "oracle", "system", "broker"] = "oracle"
+ channel: str = Field(default="oracle", max_length=64)
+ content: str = Field(..., min_length=1, max_length=8000)
+ metadata: dict[str, Any] = Field(default_factory=dict)
+
+
+class SyntheticSeedRequest(BaseModel):
+ count: int = Field(default=100, ge=1, le=500)
+
+
+def _serialize_lead(row: Any) -> dict[str, Any]:
+ score = int(row["score"] or 0)
+ status_label = _normalize_stage(row["kanban_status"])
+ qualification = row["qualification"] or _infer_qualification(score, row.get("source"), row.get("notes"))
+ return {
+ "id": row["id"],
+ "name": row["name"],
+ "email": row["email"],
+ "phone": row["phone"],
+ "source": row["source"],
+ "notes": row["notes"],
+ "qualification": qualification,
+ "score": score,
+ "kanban_status": status_label,
+ "stage": _stage_key(status_label),
+ "budget": row["budget"],
+ "unit_interest": row["unit_interest"],
+ "metadata": row["metadata"] or {},
+ "created_at": row["created_at"].isoformat() if row["created_at"] else None,
+ "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None,
+ }
+
+
+def _serialize_chat_log(row: Any) -> dict[str, Any]:
+ return {
+ "id": row["id"],
+ "lead_id": row["lead_id"],
+ "sender": row["sender"],
+ "channel": row["channel"],
+ "content": row["content"],
+ "metadata": row["metadata"] or {},
+ "created_at": row["created_at"].isoformat() if row["created_at"] else None,
+ }
+
+
+def _build_synthetic_leads(count: int) -> list[dict[str, Any]]:
+ first_names = ["Amina", "Omar", "Farah", "Rayan", "Maya", "Khalid", "Noor", "Zara", "Ibrahim", "Layla"]
+ last_names = ["Rahman", "Al-Farsi", "Kapoor", "Haddad", "Mehta", "Nadeem", "Shaikh", "Rao", "Wilson", "Chen"]
+ sources = ["website", "walkin", "whatsapp"]
+ stages = ["New", "Qualifying", "Site Visit", "Negotiation", "Closed"]
+ interests = ["2BHK Marina View", "3BHK Corner Unit", "Penthouse Sky Deck", "Investment Studio", "4BHK Sea View"]
+ budgets = ["AED 2.4M", "AED 4.8M", "AED 7.2M", "AED 12M", "AED 18M"]
+ rows: list[dict[str, Any]] = []
+ for idx in range(count):
+ score = 35 + ((idx * 7) % 61)
+ if idx % 12 == 0:
+ score = 94
+ name = f"{first_names[idx % len(first_names)]} {last_names[(idx * 3) % len(last_names)]}"
+ source = sources[idx % len(sources)]
+ notes = (
+ "Cash-ready HNI buyer focusing on waterfront premium inventory."
+ if score >= 90
+ else "Follow-up required on payment plan and amenity preferences."
+ )
+ rows.append(
+ {
+ "id": str(uuid.uuid4()),
+ "name": name,
+ "email": f"{name.lower().replace(' ', '.')}@synthetic.velocity.local",
+ "phone": f"+9715000{idx:05d}",
+ "source": source,
+ "notes": notes,
+ "qualification": _infer_qualification(score, source, notes).upper(),
+ "score": score,
+ "kanban_status": stages[idx % len(stages)],
+ "budget": budgets[idx % len(budgets)],
+ "unit_interest": interests[idx % len(interests)],
+ "metadata": {
+ "synthetic": True,
+ "campaign": "verification-seed",
+ "batch": "sprint1-root-integration",
+ },
+ }
+ )
+ return rows
+
+
+@crm_router.get("/leads")
+async def list_leads(
+ request: Request,
+ kanban_status: str | None = None,
+ qualification: str | None = None,
+ search: str | None = Query(default=None, min_length=1),
+) -> dict[str, Any]:
+ await _ensure_schema(request)
+ pool = await _get_pool(request)
+ clauses: list[str] = []
+ params: list[Any] = []
+ if kanban_status:
+ params.append(_normalize_stage(kanban_status))
+ clauses.append(f"kanban_status = ${len(params)}")
+ if qualification:
+ params.append(qualification.upper())
+ clauses.append(f"qualification = ${len(params)}")
+ if search:
+ params.append(f"%{search.lower()}%")
+ clauses.append(f"(LOWER(name) LIKE ${len(params)} OR LOWER(COALESCE(email, '')) LIKE ${len(params)} OR LOWER(COALESCE(phone, '')) LIKE ${len(params)})")
+
+ where = f"WHERE {' AND '.join(clauses)}" if clauses else ""
+ query = f"""
+ SELECT id, name, email, phone, source, notes, qualification, score, kanban_status,
+ budget, unit_interest, metadata, created_at, updated_at
+ FROM leads
+ {where}
+ ORDER BY score DESC, updated_at DESC, created_at DESC
+ """
+ async with pool.acquire() as conn:
+ rows = await conn.fetch(query, *params)
+ leads = [_serialize_lead(row) for row in rows]
+ return {"status": "ok", "data": leads, "meta": {"count": len(leads)}}
+
+
+@crm_router.post("/leads", status_code=status.HTTP_201_CREATED)
+async def create_lead(request: Request, payload: LeadUpsertRequest) -> dict[str, Any]:
+ await _ensure_schema(request)
+ pool = await _get_pool(request)
+ lead_id = str(uuid.uuid4())
+ qualification = (payload.qualification or _infer_qualification(payload.score, payload.source, payload.notes)).upper()
+ stage = _normalize_stage(payload.kanban_status)
+ async with pool.acquire() as conn:
+ row = await conn.fetchrow(
+ """
+ INSERT INTO leads (
+ id, name, email, phone, source, notes, qualification, score, kanban_status,
+ budget, unit_interest, metadata, created_at, updated_at
+ ) VALUES (
+ $1, $2, $3, $4, $5, $6, $7, $8, $9,
+ $10, $11, $12::jsonb, NOW(), NOW()
+ )
+ RETURNING id, name, email, phone, source, notes, qualification, score, kanban_status,
+ budget, unit_interest, metadata, created_at, updated_at
+ """,
+ lead_id,
+ payload.name,
+ payload.email,
+ payload.phone,
+ payload.source,
+ payload.notes,
+ qualification,
+ payload.score,
+ stage,
+ payload.budget,
+ payload.unit_interest,
+ json.dumps(payload.metadata),
+ )
+ data = _serialize_lead(row)
+ await _broadcast_crm_event(request, {"type": "lead_created", "entity": "lead", "data": data})
+ return {"status": "ok", "data": data}
+
+
+@crm_router.put("/leads/{lead_id}")
+async def update_lead(lead_id: str, request: Request, payload: LeadUpsertRequest) -> dict[str, Any]:
+ await _ensure_schema(request)
+ pool = await _get_pool(request)
+ qualification = (payload.qualification or _infer_qualification(payload.score, payload.source, payload.notes)).upper()
+ stage = _normalize_stage(payload.kanban_status)
+ async with pool.acquire() as conn:
+ row = await conn.fetchrow(
+ """
+ UPDATE leads
+ SET name = $2,
+ email = $3,
+ phone = $4,
+ source = $5,
+ notes = $6,
+ qualification = $7,
+ score = $8,
+ kanban_status = $9,
+ budget = $10,
+ unit_interest = $11,
+ metadata = $12::jsonb,
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING id, name, email, phone, source, notes, qualification, score, kanban_status,
+ budget, unit_interest, metadata, created_at, updated_at
+ """,
+ lead_id,
+ payload.name,
+ payload.email,
+ payload.phone,
+ payload.source,
+ payload.notes,
+ qualification,
+ payload.score,
+ stage,
+ payload.budget,
+ payload.unit_interest,
+ json.dumps(payload.metadata),
+ )
+ if row is None:
+ raise HTTPException(status_code=404, detail=f"Lead '{lead_id}' not found.")
+ data = _serialize_lead(row)
+ await _broadcast_crm_event(request, {"type": "lead_updated", "entity": "lead", "data": data})
+ return {"status": "ok", "data": data}
+
+
+@crm_router.delete("/leads/{lead_id}")
+async def delete_lead(lead_id: str, request: Request) -> dict[str, Any]:
+ await _ensure_schema(request)
+ pool = await _get_pool(request)
+ async with pool.acquire() as conn:
+ result = await conn.execute("DELETE FROM leads WHERE id = $1", lead_id)
+ if result.endswith("0"):
+ raise HTTPException(status_code=404, detail=f"Lead '{lead_id}' not found.")
+ await _broadcast_crm_event(request, {"type": "lead_deleted", "entity": "lead", "entity_id": lead_id})
+ return {"status": "ok", "data": {"id": lead_id, "deleted": True}}
+
+
+@crm_router.post("/leads/seed-synthetic", status_code=status.HTTP_201_CREATED)
+async def seed_synthetic_leads(request: Request, payload: SyntheticSeedRequest) -> dict[str, Any]:
+ await _ensure_schema(request)
+ pool = await _get_pool(request)
+ synthetic_rows = _build_synthetic_leads(payload.count)
+ inserted = 0
+ chat_logs_inserted = 0
+ async with pool.acquire() as conn:
+ for row in synthetic_rows:
+ await conn.execute(
+ """
+ INSERT INTO leads (
+ id, name, email, phone, source, notes, qualification, score, kanban_status,
+ budget, unit_interest, metadata, created_at, updated_at
+ ) VALUES (
+ $1, $2, $3, $4, $5, $6, $7, $8, $9,
+ $10, $11, $12::jsonb, NOW(), NOW()
+ )
+ ON CONFLICT (id) DO NOTHING
+ """,
+ row["id"],
+ row["name"],
+ row["email"],
+ row["phone"],
+ row["source"],
+ row["notes"],
+ row["qualification"],
+ row["score"],
+ row["kanban_status"],
+ row["budget"],
+ row["unit_interest"],
+ json.dumps(row["metadata"]),
+ )
+ inserted += 1
+ for sender, channel, content in [
+ ("lead", "whatsapp", f"{row['name']} asked for availability on {row['unit_interest']}."),
+ ("oracle", "oracle", "Oracle generated a guided follow-up based on budget, stage, and source quality."),
+ ]:
+ await conn.execute(
+ """
+ INSERT INTO chat_logs (id, lead_id, sender, channel, content, metadata, created_at)
+ VALUES ($1, $2, $3, $4, $5, $6::jsonb, NOW())
+ """,
+ str(uuid.uuid4()),
+ row["id"],
+ sender,
+ channel,
+ content,
+ json.dumps({"synthetic": True}),
+ )
+ chat_logs_inserted += 1
+ result = {
+ "status": "ok",
+ "data": {
+ "seeded": inserted,
+ "chat_logs_seeded": chat_logs_inserted,
+ "batch": "sprint1-root-integration",
+ },
+ }
+ await _broadcast_crm_event(
+ request,
+ {
+ "type": "crm_seeded",
+ "entity": "lead_batch",
+ "data": result["data"],
+ },
+ )
+ return result
+
+
+@crm_router.get("/leads/demographics")
+async def lead_demographics(request: Request) -> dict[str, Any]:
+ await _ensure_schema(request)
+ pool = await _get_pool(request)
+ async with pool.acquire() as conn:
+ source_rows = await conn.fetch(
+ """
+ SELECT source, COUNT(*)::int AS lead_count, COALESCE(AVG(score), 0)::float AS avg_score
+ FROM leads
+ GROUP BY source
+ ORDER BY lead_count DESC, source ASC
+ """
+ )
+ qualification_rows = await conn.fetch(
+ """
+ SELECT qualification, COUNT(*)::int AS lead_count
+ FROM leads
+ GROUP BY qualification
+ ORDER BY lead_count DESC, qualification ASC
+ """
+ )
+ return {
+ "status": "ok",
+ "data": {
+ "by_source": [dict(row) for row in source_rows],
+ "by_qualification": [dict(row) for row in qualification_rows],
+ },
+ }
+
+
+@crm_router.get("/leads/{lead_id}")
+async def get_lead(lead_id: str, request: Request) -> dict[str, Any]:
+ await _ensure_schema(request)
+ pool = await _get_pool(request)
+ async with pool.acquire() as conn:
+ row = await conn.fetchrow(
+ """
+ SELECT id, name, email, phone, source, notes, qualification, score, kanban_status,
+ budget, unit_interest, metadata, created_at, updated_at
+ FROM leads
+ WHERE id = $1
+ """,
+ lead_id,
+ )
+ if row is None:
+ raise HTTPException(status_code=404, detail=f"Lead '{lead_id}' not found.")
+ return {"status": "ok", "data": _serialize_lead(row)}
+
+
+@crm_router.get("/chat-logs")
+async def list_chat_logs(
+ request: Request,
+ lead_id: str | None = None,
+ channel: str | None = None,
+) -> dict[str, Any]:
+ await _ensure_schema(request)
+ pool = await _get_pool(request)
+ clauses: list[str] = []
+ params: list[Any] = []
+ if lead_id:
+ params.append(lead_id)
+ clauses.append(f"lead_id = ${len(params)}")
+ if channel:
+ params.append(channel)
+ clauses.append(f"channel = ${len(params)}")
+ where = f"WHERE {' AND '.join(clauses)}" if clauses else ""
+ query = f"""
+ SELECT id, lead_id, sender, channel, content, metadata, created_at
+ FROM chat_logs
+ {where}
+ ORDER BY created_at DESC
+ """
+ async with pool.acquire() as conn:
+ rows = await conn.fetch(query, *params)
+ data = [_serialize_chat_log(row) for row in rows]
+ return {"status": "ok", "data": data, "meta": {"count": len(data)}}
+
+
+@crm_router.post("/chat-logs", status_code=status.HTTP_201_CREATED)
+async def create_chat_log(request: Request, payload: ChatLogCreateRequest) -> dict[str, Any]:
+ await _ensure_schema(request)
+ pool = await _get_pool(request)
+ log_id = str(uuid.uuid4())
+ async with pool.acquire() as conn:
+ lead = await conn.fetchrow("SELECT id FROM leads WHERE id = $1", payload.lead_id)
+ if lead is None:
+ raise HTTPException(status_code=404, detail=f"Lead '{payload.lead_id}' not found.")
+ row = await conn.fetchrow(
+ """
+ INSERT INTO chat_logs (id, lead_id, sender, channel, content, metadata, created_at)
+ VALUES ($1, $2, $3, $4, $5, $6::jsonb, NOW())
+ RETURNING id, lead_id, sender, channel, content, metadata, created_at
+ """,
+ log_id,
+ payload.lead_id,
+ payload.sender,
+ payload.channel,
+ payload.content,
+ json.dumps(payload.metadata),
+ )
+ data = _serialize_chat_log(row)
+ await _broadcast_crm_event(request, {"type": "chat_log_created", "entity": "chat_log", "data": data})
+ return {"status": "ok", "data": data}
+
+
+@crm_router.get("/kanban/board")
+async def get_kanban_board(request: Request) -> dict[str, Any]:
+ await _ensure_schema(request)
+ pool = await _get_pool(request)
+ ordered_stages = ["New", "Qualifying", "Site Visit", "Negotiation", "Closed"]
+ async with pool.acquire() as conn:
+ rows = await conn.fetch(
+ """
+ SELECT id, name, email, phone, source, notes, qualification, score, kanban_status,
+ budget, unit_interest, metadata, created_at, updated_at
+ FROM leads
+ ORDER BY score DESC, updated_at DESC, created_at DESC
+ """
+ )
+ leads = [_serialize_lead(row) for row in rows]
+ grouped = {stage: [] for stage in ordered_stages}
+ for lead in leads:
+ grouped.setdefault(lead["kanban_status"], []).append(lead)
+ board = [
+ {
+ "status": stage,
+ "stage": _stage_key(stage),
+ "count": len(grouped.get(stage, [])),
+ "items": grouped.get(stage, []),
+ }
+ for stage in ordered_stages
+ ]
+ return {"status": "ok", "data": board}
+
+
+@crm_router.put("/kanban/move")
+async def move_kanban_card(request: Request, payload: KanbanMoveRequest) -> dict[str, Any]:
+ await _ensure_schema(request)
+ pool = await _get_pool(request)
+ stage = _normalize_stage(payload.target_status)
+ async with pool.acquire() as conn:
+ row = await conn.fetchrow(
+ """
+ UPDATE leads
+ SET kanban_status = $2,
+ qualification = CASE
+ WHEN score >= 90 THEN 'WHALE'
+ WHEN score >= 70 THEN 'POTENTIAL'
+ WHEN score >= 45 THEN 'HOT'
+ ELSE qualification
+ END,
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING id, name, email, phone, source, notes, qualification, score, kanban_status,
+ budget, unit_interest, metadata, created_at, updated_at
+ """,
+ payload.lead_id,
+ stage,
+ )
+ if row is None:
+ raise HTTPException(status_code=404, detail=f"Lead '{payload.lead_id}' not found.")
+ data = _serialize_lead(row)
+ await _broadcast_crm_event(
+ request,
+ {
+ "type": "kanban_moved",
+ "entity": "lead",
+ "entity_id": payload.lead_id,
+ "data": data,
+ },
+ )
+ return {"status": "ok", "data": data}
+
+
+@analytics_router.get("/sentiment-scatter")
+async def sentiment_scatter(request: Request) -> list[dict[str, Any]]:
+ await _ensure_schema(request)
+ pool = await _get_pool(request)
+ async with pool.acquire() as conn:
+ rows = await conn.fetch(
+ """
+ SELECT id, name, score, qualification, kanban_status, source, notes, updated_at
+ FROM leads
+ WHERE score IS NOT NULL
+ ORDER BY score DESC, updated_at DESC
+ """
+ )
+ points: list[dict[str, Any]] = []
+ for row in rows:
+ score = int(row["score"] or 0)
+ qualification = row["qualification"] or _infer_qualification(score, row["source"], row["notes"])
+ points.append(
+ {
+ "id": row["id"],
+ "name": row["name"],
+ "sentiment_score": max(0, min(100, int(score * 0.82) + 10)),
+ "response_time_ms": max(120, 10000 - (score * 55)),
+ "score": score,
+ "qualification": qualification,
+ "kanban_status": _normalize_stage(row["kanban_status"]),
+ }
+ )
+ return points
diff --git a/backend/api/routes_oracle.py b/backend/api/routes_oracle.py
index e69de29b..045e0bb0 100644
--- a/backend/api/routes_oracle.py
+++ b/backend/api/routes_oracle.py
@@ -0,0 +1,104 @@
+from __future__ import annotations
+
+from fastapi import APIRouter, HTTPException, Request
+from pydantic import BaseModel, Field
+
+from backend.oracle.action_service import oracle_action_service
+from backend.oracle.persona_service import persona_service
+from backend.services.mcp_registry import mcp_registry
+from backend.services.nemoclaw_runtime import nemoclaw_runtime
+
+router = APIRouter()
+
+
+class WorkflowPreviewRequest(BaseModel):
+ prompt: str = Field(..., min_length=1, max_length=4096)
+ tenant_id: str = "tenant_velocity"
+ actor_role: str = "sales_director"
+
+
+class MCPExecuteRequest(BaseModel):
+ tool_name: str = Field(..., min_length=1, max_length=128)
+ query: str = Field(..., min_length=1, max_length=1024)
+
+
+class OracleWritebackRequest(BaseModel):
+ action_id: str
+ tenant_id: str = "tenant_velocity"
+ actor_id: str = "oracle_operator"
+ target_entity_type: str = Field(..., min_length=1, max_length=64)
+ target_entity_id: str = Field(..., min_length=1, max_length=128)
+ action_type: str = Field(default="lead_writeback", min_length=1, max_length=128)
+ writeback_payload: dict = Field(default_factory=dict)
+
+
+@router.get("/health")
+async def oracle_health() -> dict:
+ return {
+ "status": "ok",
+ "persona": await persona_service.health(),
+ "mcp_tools": mcp_registry.list_tools(),
+ }
+
+
+@router.get("/mcp/tools")
+async def oracle_mcp_tools() -> dict:
+ return {"status": "ok", "data": mcp_registry.list_tools()}
+
+
+@router.post("/mcp/execute")
+async def oracle_mcp_execute(request: Request, payload: MCPExecuteRequest) -> dict:
+ pool = getattr(request.app.state, "db_pool", None)
+ result = await mcp_registry.execute(payload.tool_name, payload.query, crm_pool=pool)
+ return {"status": "ok", "data": result}
+
+
+@router.post("/workflow/preview")
+async def workflow_preview(payload: WorkflowPreviewRequest) -> dict:
+ persona_plan = await persona_service.plan_for_prompt(
+ prompt=payload.prompt,
+ tenant_id=payload.tenant_id,
+ actor_role=payload.actor_role,
+ )
+ return {
+ "status": "ok",
+ "data": {
+ "persona_plan": persona_plan,
+ "workflow": nemoclaw_runtime.build_workflow_dispatch(
+ prompt=payload.prompt,
+ tenant_id=payload.tenant_id,
+ actor_role=payload.actor_role,
+ component_templates=persona_plan["recommendedTemplates"],
+ ),
+ },
+ }
+
+
+@router.get("/actions")
+async def list_oracle_actions(status: str | None = None, limit: int = 50) -> dict:
+ actions = await oracle_action_service.list_actions(status=status, limit=limit)
+ return {"status": "ok", "data": actions, "meta": {"count": len(actions)}}
+
+
+@router.get("/actions/{action_id}")
+async def get_oracle_action(action_id: str) -> dict:
+ action = await oracle_action_service.get_action(action_id)
+ if not action:
+ raise HTTPException(status_code=404, detail=f"Oracle action '{action_id}' not found.")
+ return {"status": "ok", "data": action}
+
+
+@router.post("/actions/writeback")
+async def apply_oracle_writeback(request: Request, payload: OracleWritebackRequest) -> dict:
+ result = await oracle_action_service.apply_writeback(payload.model_dump())
+ if hasattr(request.app.state, "broadcast_crm_event"):
+ await request.app.state.broadcast_crm_event(
+ {
+ "type": "oracle_writeback",
+ "entity": payload.target_entity_type,
+ "entity_id": payload.target_entity_id,
+ "action_id": payload.action_id,
+ "payload": result["resultPayload"],
+ }
+ )
+ return {"status": "ok", "data": result}
diff --git a/backend/main.py b/backend/main.py
index 99e93f03..6949667f 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -12,7 +12,7 @@ import json
import asyncio
import logging
from contextlib import asynccontextmanager
-from datetime import datetime
+from datetime import UTC, datetime
from typing import Set
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
@@ -21,10 +21,13 @@ from fastapi.staticfiles import StaticFiles
from dotenv import load_dotenv
from backend.api.routes_catalyst import router as catalyst_router
+from backend.api.routes_crm import crm_router, analytics_router
+from backend.api.routes_oracle import router as oracle_helper_router
from backend.auth.dependencies import (
create_access_token, verify_password, get_current_user
)
from backend.db.pool import create_pool, close_pool
+from backend.oracle.router_v1 import router as oracle_v1_router
from backend.routers.cctv import router as cctv_router
from backend.routers.scenes import router as scenes_router
from backend.routers.videos import router as videos_router
@@ -86,6 +89,10 @@ if os.path.isdir(ASSET_DIR):
# ── Routers ───────────────────────────────────────────────────────────────────
app.include_router(catalyst_router, prefix="/api/catalyst", tags=["Catalyst"])
+app.include_router(crm_router, prefix="/api", tags=["CRM"])
+app.include_router(analytics_router, prefix="/api/analytics", tags=["Analytics"])
+app.include_router(oracle_helper_router, prefix="/api/oracle", tags=["Oracle"])
+app.include_router(oracle_v1_router, prefix="/api/oracle/v1", tags=["Oracle V1"])
app.include_router(sentinel_router, prefix="/api/sentinel", tags=["Sentinel"])
app.include_router(cctv_router, prefix="/api/cctv", tags=["CCTV"])
app.include_router(scenes_router, prefix="/api/scenes", tags=["Scenes"])
@@ -165,6 +172,30 @@ class _CatalystManager:
_catalyst_mgr = _CatalystManager()
+class _CRMManager:
+ def __init__(self) -> None:
+ self.active: Set[WebSocket] = set()
+
+ async def connect(self, ws: WebSocket) -> None:
+ await ws.accept()
+ self.active.add(ws)
+
+ def disconnect(self, ws: WebSocket) -> None:
+ self.active.discard(ws)
+
+ async def broadcast(self, payload: dict) -> None:
+ dead: Set[WebSocket] = set()
+ for ws in self.active:
+ try:
+ await ws.send_text(json.dumps(payload))
+ except Exception:
+ dead.add(ws)
+ self.active -= dead
+
+
+_crm_mgr = _CRMManager()
+
+
@app.websocket("/ws/catalyst")
async def catalyst_ws(ws: WebSocket) -> None:
await _catalyst_mgr.connect(ws)
@@ -176,13 +207,31 @@ async def catalyst_ws(ws: WebSocket) -> None:
_catalyst_mgr.disconnect(ws)
+@app.websocket("/ws/crm")
+async def crm_ws(ws: WebSocket) -> None:
+ await _crm_mgr.connect(ws)
+ await _crm_mgr.broadcast(
+ {
+ "type": "crm_presence",
+ "connected_clients": len(_crm_mgr.active),
+ "timestamp": datetime.now(UTC).isoformat(),
+ }
+ )
+ try:
+ while True:
+ message = await ws.receive_text()
+ await ws.send_text(json.dumps({"type": "crm_ack", "data": message}))
+ except WebSocketDisconnect:
+ _crm_mgr.disconnect(ws)
+
+
async def broadcast_live_event(event_type, message, campaign_name=None, value=None):
payload = {
"type": event_type,
"message": message,
"campaignName": campaign_name,
"value": value,
- "timestamp": datetime.utcnow().isoformat(),
+ "timestamp": datetime.now(UTC).isoformat(),
}
await _catalyst_mgr.broadcast(payload)
@@ -190,6 +239,17 @@ async def broadcast_live_event(event_type, message, campaign_name=None, value=No
app.state.broadcast_live_event = broadcast_live_event
+async def broadcast_crm_event(payload: dict) -> None:
+ enriched = {
+ **payload,
+ "timestamp": datetime.now(UTC).isoformat(),
+ }
+ await _crm_mgr.broadcast(enriched)
+
+
+app.state.broadcast_crm_event = broadcast_crm_event
+
+
# ── Health ─────────────────────────────────────────────────────────────────────
@app.get("/health", tags=["Health"])
@@ -201,6 +261,6 @@ async def health() -> dict:
"service": "velocity-backend",
"version": "2.0.0",
"db_pool": "connected" if db_ok else "unavailable",
- "timestamp": datetime.utcnow().isoformat(),
+ "timestamp": datetime.now(UTC).isoformat(),
}
diff --git a/backend/oracle/action_service.py b/backend/oracle/action_service.py
new file mode 100644
index 00000000..70307961
--- /dev/null
+++ b/backend/oracle/action_service.py
@@ -0,0 +1,346 @@
+from __future__ import annotations
+
+import json
+import os
+import uuid
+from datetime import datetime, timezone
+from typing import Any
+
+from fastapi import HTTPException
+
+try:
+ import asyncpg # type: ignore
+except Exception: # pragma: no cover
+ asyncpg = None # type: ignore
+
+
+_DB_URL = os.getenv("DATABASE_URL", "")
+
+
+def _now() -> str:
+ return datetime.now(timezone.utc).isoformat()
+
+
+def _db_ready() -> bool:
+ return bool(_DB_URL and not _DB_URL.startswith("PLACEHOLDER") and asyncpg is not None)
+
+
+class OracleActionService:
+ async def ensure_schema(self) -> None:
+ if not _db_ready():
+ return
+ assert asyncpg is not None
+ conn = await asyncpg.connect(_DB_URL)
+ try:
+ await conn.execute(
+ """
+ CREATE TABLE IF NOT EXISTS oracle_actions (
+ action_id UUID PRIMARY KEY,
+ execution_id UUID,
+ tenant_id TEXT NOT NULL,
+ page_id UUID,
+ branch_id TEXT,
+ actor_id TEXT NOT NULL,
+ target_entity_type TEXT NOT NULL,
+ target_entity_id TEXT,
+ action_type TEXT NOT NULL,
+ status TEXT NOT NULL DEFAULT 'planned',
+ prompt TEXT,
+ workflow_dispatch JSONB NOT NULL DEFAULT '{}'::jsonb,
+ component_ids JSONB NOT NULL DEFAULT '[]'::jsonb,
+ writeback_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
+ result_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
+ created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+ updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+ )
+ """
+ )
+ await conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_oracle_actions_execution ON oracle_actions(execution_id, created_at DESC)"
+ )
+ await conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_oracle_actions_target ON oracle_actions(target_entity_type, target_entity_id, created_at DESC)"
+ )
+ finally:
+ await conn.close()
+
+ async def create_from_execution(
+ self,
+ *,
+ execution: dict[str, Any],
+ target_entity_type: str = "canvas_page",
+ target_entity_id: str | None = None,
+ action_type: str = "oracle_canvas_generation",
+ writeback_payload: dict[str, Any] | None = None,
+ ) -> dict[str, Any]:
+ action = {
+ "actionId": str(uuid.uuid4()),
+ "executionId": execution.get("executionId"),
+ "tenantId": execution.get("tenantId"),
+ "pageId": execution.get("pageId"),
+ "branchId": execution.get("branchId"),
+ "actorId": execution.get("actorId"),
+ "targetEntityType": target_entity_type,
+ "targetEntityId": target_entity_id or execution.get("pageId"),
+ "actionType": action_type,
+ "status": "planned",
+ "prompt": execution.get("prompt"),
+ "workflowDispatch": execution.get("workflowDispatch") or {},
+ "componentIds": execution.get("componentsCreated") or [],
+ "writebackPayload": writeback_payload or {},
+ "resultPayload": {},
+ "createdAt": _now(),
+ "updatedAt": _now(),
+ }
+ await self._persist_action(action)
+ return action
+
+ async def get_action(self, action_id: str) -> dict[str, Any] | None:
+ if not _db_ready():
+ return None
+ assert asyncpg is not None
+ conn = await asyncpg.connect(_DB_URL)
+ try:
+ row = await conn.fetchrow(
+ """
+ SELECT action_id, execution_id, tenant_id, page_id, branch_id, actor_id,
+ target_entity_type, target_entity_id, action_type, status, prompt,
+ workflow_dispatch, component_ids, writeback_payload, result_payload,
+ created_at, updated_at
+ FROM oracle_actions
+ WHERE action_id = $1::uuid
+ """,
+ action_id,
+ )
+ finally:
+ await conn.close()
+ return self._serialize(row) if row else None
+
+ async def list_actions(self, *, status: str | None = None, limit: int = 50) -> list[dict[str, Any]]:
+ if not _db_ready():
+ return []
+ assert asyncpg is not None
+ conn = await asyncpg.connect(_DB_URL)
+ try:
+ if status:
+ rows = await conn.fetch(
+ """
+ SELECT action_id, execution_id, tenant_id, page_id, branch_id, actor_id,
+ target_entity_type, target_entity_id, action_type, status, prompt,
+ workflow_dispatch, component_ids, writeback_payload, result_payload,
+ created_at, updated_at
+ FROM oracle_actions
+ WHERE status = $1
+ ORDER BY created_at DESC
+ LIMIT $2
+ """,
+ status,
+ limit,
+ )
+ else:
+ rows = await conn.fetch(
+ """
+ SELECT action_id, execution_id, tenant_id, page_id, branch_id, actor_id,
+ target_entity_type, target_entity_id, action_type, status, prompt,
+ workflow_dispatch, component_ids, writeback_payload, result_payload,
+ created_at, updated_at
+ FROM oracle_actions
+ ORDER BY created_at DESC
+ LIMIT $1
+ """,
+ limit,
+ )
+ finally:
+ await conn.close()
+ return [self._serialize(row) for row in rows]
+
+ async def apply_writeback(self, payload: dict[str, Any]) -> dict[str, Any]:
+ if not _db_ready():
+ raise HTTPException(status_code=503, detail="Oracle writeback store unavailable.")
+ if payload["target_entity_type"] != "lead":
+ raise HTTPException(status_code=422, detail="Only lead writebacks are supported in this pass.")
+
+ assert asyncpg is not None
+ await self.ensure_schema()
+ conn = await asyncpg.connect(_DB_URL)
+ try:
+ target_lead_id = payload["target_entity_id"]
+ action_id = payload["action_id"]
+ writeback = payload["writeback_payload"]
+
+ existing = await conn.fetchrow(
+ "SELECT id, notes, metadata, kanban_status, qualification, score FROM leads WHERE id = $1",
+ target_lead_id,
+ )
+ if existing is None:
+ raise HTTPException(status_code=404, detail=f"Lead '{target_lead_id}' not found for Oracle writeback.")
+
+ metadata = dict(existing["metadata"] or {})
+ metadata_patch = writeback.get("metadata_patch") or {}
+ if isinstance(metadata_patch, dict):
+ metadata.update(metadata_patch)
+
+ score = int(existing["score"] or 0) + int(writeback.get("score_delta") or 0)
+ updated_notes = (existing["notes"] or "").strip()
+ notes_append = writeback.get("notes_append")
+ if notes_append:
+ separator = "\n\n" if updated_notes else ""
+ updated_notes = f"{updated_notes}{separator}{notes_append}"
+
+ updated = await conn.fetchrow(
+ """
+ UPDATE leads
+ SET notes = $2,
+ metadata = $3::jsonb,
+ kanban_status = COALESCE($4, kanban_status),
+ qualification = COALESCE($5, qualification),
+ score = $6,
+ updated_at = NOW()
+ WHERE id = $1
+ RETURNING id, notes, metadata, kanban_status, qualification, score, updated_at
+ """,
+ target_lead_id,
+ updated_notes,
+ json.dumps(metadata),
+ writeback.get("kanban_status"),
+ writeback.get("qualification"),
+ max(score, 0),
+ )
+
+ oracle_message = writeback.get("oracle_message")
+ if oracle_message:
+ await conn.execute(
+ """
+ INSERT INTO chat_logs (id, lead_id, sender, channel, content, metadata, created_at)
+ VALUES ($1, $2, 'oracle', 'oracle', $3, $4::jsonb, NOW())
+ """,
+ str(uuid.uuid4()),
+ target_lead_id,
+ oracle_message,
+ json.dumps({"oracle_action_id": action_id, "writeback": True}),
+ )
+
+ result_payload = {
+ "lead_id": updated["id"],
+ "kanban_status": updated["kanban_status"],
+ "qualification": updated["qualification"],
+ "score": updated["score"],
+ "updated_at": updated["updated_at"].isoformat() if updated["updated_at"] else None,
+ }
+
+ await conn.execute(
+ """
+ INSERT INTO oracle_actions (
+ action_id, execution_id, tenant_id, page_id, branch_id, actor_id,
+ target_entity_type, target_entity_id, action_type, status, prompt,
+ workflow_dispatch, component_ids, writeback_payload, result_payload,
+ created_at, updated_at
+ )
+ VALUES (
+ $1::uuid, NULL, $2, NULL, NULL, $3,
+ $4, $5, $6, 'applied', NULL,
+ '{}'::jsonb, '[]'::jsonb, $7::jsonb, $8::jsonb,
+ NOW(), NOW()
+ )
+ ON CONFLICT (action_id)
+ DO UPDATE SET
+ status = 'applied',
+ writeback_payload = EXCLUDED.writeback_payload,
+ result_payload = EXCLUDED.result_payload,
+ updated_at = NOW()
+ """,
+ action_id,
+ payload.get("tenant_id", "tenant_velocity"),
+ payload.get("actor_id", "oracle_operator"),
+ payload["target_entity_type"],
+ target_lead_id,
+ payload.get("action_type", "lead_writeback"),
+ json.dumps(writeback),
+ json.dumps(result_payload),
+ )
+ finally:
+ await conn.close()
+
+ return {
+ "actionId": action_id,
+ "status": "applied",
+ "targetEntityType": payload["target_entity_type"],
+ "targetEntityId": payload["target_entity_id"],
+ "resultPayload": result_payload,
+ }
+
+ async def _persist_action(self, action: dict[str, Any]) -> None:
+ if not _db_ready():
+ return
+ await self.ensure_schema()
+ assert asyncpg is not None
+ conn = await asyncpg.connect(_DB_URL)
+ try:
+ await conn.execute(
+ """
+ INSERT INTO oracle_actions (
+ action_id, execution_id, tenant_id, page_id, branch_id, actor_id,
+ target_entity_type, target_entity_id, action_type, status, prompt,
+ workflow_dispatch, component_ids, writeback_payload, result_payload,
+ created_at, updated_at
+ )
+ VALUES (
+ $1::uuid, $2::uuid, $3, $4::uuid, $5, $6,
+ $7, $8, $9, $10, $11,
+ $12::jsonb, $13::jsonb, $14::jsonb, $15::jsonb,
+ $16::timestamptz, $17::timestamptz
+ )
+ ON CONFLICT (action_id)
+ DO UPDATE SET
+ status = EXCLUDED.status,
+ workflow_dispatch = EXCLUDED.workflow_dispatch,
+ component_ids = EXCLUDED.component_ids,
+ writeback_payload = EXCLUDED.writeback_payload,
+ result_payload = EXCLUDED.result_payload,
+ updated_at = EXCLUDED.updated_at
+ """,
+ action["actionId"],
+ action.get("executionId"),
+ action["tenantId"],
+ action.get("pageId"),
+ action.get("branchId"),
+ action["actorId"],
+ action["targetEntityType"],
+ action.get("targetEntityId"),
+ action["actionType"],
+ action["status"],
+ action.get("prompt"),
+ json.dumps(action.get("workflowDispatch") or {}),
+ json.dumps(action.get("componentIds") or []),
+ json.dumps(action.get("writebackPayload") or {}),
+ json.dumps(action.get("resultPayload") or {}),
+ action["createdAt"],
+ action["updatedAt"],
+ )
+ finally:
+ await conn.close()
+
+ @staticmethod
+ def _serialize(row: Any) -> dict[str, Any]:
+ return {
+ "actionId": str(row["action_id"]),
+ "executionId": str(row["execution_id"]) if row["execution_id"] else None,
+ "tenantId": row["tenant_id"],
+ "pageId": str(row["page_id"]) if row["page_id"] else None,
+ "branchId": row["branch_id"],
+ "actorId": row["actor_id"],
+ "targetEntityType": row["target_entity_type"],
+ "targetEntityId": row["target_entity_id"],
+ "actionType": row["action_type"],
+ "status": row["status"],
+ "prompt": row["prompt"],
+ "workflowDispatch": row["workflow_dispatch"] or {},
+ "componentIds": row["component_ids"] or [],
+ "writebackPayload": row["writeback_payload"] or {},
+ "resultPayload": row["result_payload"] or {},
+ "createdAt": row["created_at"].isoformat() if row["created_at"] else None,
+ "updatedAt": row["updated_at"].isoformat() if row["updated_at"] else None,
+ }
+
+
+oracle_action_service = OracleActionService()
diff --git a/backend/oracle/persona_service.py b/backend/oracle/persona_service.py
new file mode 100644
index 00000000..f218a15f
--- /dev/null
+++ b/backend/oracle/persona_service.py
@@ -0,0 +1,97 @@
+from __future__ import annotations
+
+import json
+import re
+from pathlib import Path
+from typing import Any
+
+
+_PROMPT_DIR = Path(__file__).resolve().parent.parent / "nemoclaw_prompts"
+_PLACEHOLDER_PATTERN = re.compile(r"\{(\w+)\}")
+_TEMPLATE_HINTS = {
+ "pipeline": ["tpl_pipeline_board_v2", "tpl_followup_queue_v1"],
+ "kanban": ["tpl_pipeline_board_v2"],
+ "map": ["tpl_geo_investor_heat_v2"],
+ "geo": ["tpl_geo_investor_heat_v2"],
+ "trend": ["tpl_absorption_trend_v1", "tpl_campaign_lead_line_v1"],
+ "quota": ["tpl_quota_gauge_v1", "tpl_kpi_pipeline_health_v1"],
+ "broker": ["tpl_broker_performance_v1"],
+ "source": ["tpl_qd_source_compare_v1", "tpl_bar_source_quality_v3"],
+ "follow": ["tpl_followup_queue_v1", "tpl_followup_gap_v1"],
+ "campaign": ["tpl_campaign_lead_line_v1"],
+}
+
+
+class PersonaService:
+ def __init__(self) -> None:
+ self.prompt_files = {
+ "qd_calculator": _PROMPT_DIR / "qd_calculator.md",
+ "lead_tagger": _PROMPT_DIR / "lead_tagger.md",
+ "cctv_profiler": _PROMPT_DIR / "cctv_profiler.md",
+ }
+
+ async def health(self) -> dict[str, Any]:
+ loaded = {}
+ for key, path in self.prompt_files.items():
+ loaded[key] = path.exists() and path.read_text(encoding="utf-8").strip() != ""
+ return {
+ "status": "healthy" if all(loaded.values()) else "degraded",
+ "prompts": loaded,
+ }
+
+ async def render_prompt(
+ self,
+ *,
+ prompt_name: str,
+ variables: dict[str, Any],
+ ) -> dict[str, Any]:
+ path = self.prompt_files.get(prompt_name)
+ if path is None or not path.exists():
+ raise FileNotFoundError(f"Unknown prompt '{prompt_name}'.")
+ template = path.read_text(encoding="utf-8")
+ rendered = template
+ for key, value in variables.items():
+ rendered = rendered.replace(f"{{{key}}}", json.dumps(value) if isinstance(value, (dict, list)) else str(value))
+ unresolved = sorted(set(_PLACEHOLDER_PATTERN.findall(rendered)))
+ return {
+ "promptName": prompt_name,
+ "templatePath": str(path),
+ "renderedPrompt": rendered,
+ "unresolvedVariables": unresolved,
+ }
+
+ async def plan_for_prompt(
+ self,
+ *,
+ prompt: str,
+ tenant_id: str,
+ actor_role: str,
+ ) -> dict[str, Any]:
+ lower_prompt = prompt.lower()
+ recommended: list[str] = []
+ for token, template_ids in _TEMPLATE_HINTS.items():
+ if token in lower_prompt:
+ recommended.extend(template_ids)
+ if not recommended:
+ recommended = ["tpl_kpi_pipeline_health_v1", "tpl_qd_source_compare_v1"]
+ recommended = list(dict.fromkeys(recommended))
+ return {
+ "tenantId": tenant_id,
+ "actorRole": actor_role,
+ "recommendedTemplates": recommended,
+ "canvasBlocks": [
+ {
+ "type": "textCanvas",
+ "widthMode": "full",
+ "minHeightPx": 180,
+ "content": (
+ "Oracle planned a mixed response: query the CRM, reuse matching component templates, "
+ "and synthesize missing visualization blocks if a direct template is unavailable."
+ ),
+ }
+ ],
+ "workflowIntent": "comfy_oracle_canvas",
+ }
+
+
+persona_service = PersonaService()
diff --git a/backend/oracle/prompt_orchestrator.py b/backend/oracle/prompt_orchestrator.py
index 82c3ca77..4afeb504 100644
--- a/backend/oracle/prompt_orchestrator.py
+++ b/backend/oracle/prompt_orchestrator.py
@@ -16,6 +16,8 @@ from typing import Any
from .policy_service import PolicyContext, PolicyService
from .canvas_service import canvas_service
from .data_access_gateway import data_access_gateway
+from .persona_service import persona_service
+from backend.services.nemoclaw_runtime import nemoclaw_runtime
try:
import asyncpg # type: ignore
@@ -177,6 +179,19 @@ class PromptOrchestrator:
execution["retrievalPlan"] = retrieval_plan
+ persona_plan = await persona_service.plan_for_prompt(
+ prompt=prompt,
+ tenant_id=tenant_id,
+ actor_role=actor_role,
+ )
+ execution["personaPlan"] = persona_plan
+ execution["workflowDispatch"] = nemoclaw_runtime.build_workflow_dispatch(
+ prompt=prompt,
+ tenant_id=tenant_id,
+ actor_role=actor_role,
+ component_templates=persona_plan["recommendedTemplates"],
+ )
+
# ── Step 2: Policy validation ─────────────────────────────────────────
policy_errors = []
for component_plan in retrieval_plan.get("components", []):
@@ -209,6 +224,7 @@ class PromptOrchestrator:
branch_id=branch_id,
placement_mode=placement_mode,
ctx=ctx,
+ persona_plan=persona_plan,
)
execution["visualizationPlan"] = viz_plan
@@ -255,9 +271,18 @@ class PromptOrchestrator:
branch_id: str,
placement_mode: str,
ctx: PolicyContext,
+ persona_plan: dict[str, Any],
) -> dict[str, Any]:
"""Converts a retrieval plan into a list of CanvasComponent descriptors."""
- components = []
+ components = [
+ self._persona_text_canvas(
+ execution_id=execution_id,
+ actor_id=actor_id,
+ branch_id=branch_id,
+ prompt=prompt,
+ persona_plan=persona_plan,
+ )
+ ]
base_order = 900 # Append after existing components
component_plans = retrieval_plan.get("components", [])
@@ -343,6 +368,85 @@ class PromptOrchestrator:
return {"components": components}
+ @staticmethod
+ def _persona_text_canvas(
+ *,
+ execution_id: str,
+ actor_id: str,
+ branch_id: str,
+ prompt: str,
+ persona_plan: dict[str, Any],
+ ) -> dict[str, Any]:
+ recommended = ", ".join(persona_plan.get("recommendedTemplates", [])) or "no direct template matches"
+ content = (
+ f"Oracle received: {prompt}\n\n"
+ f"Reusable templates: {recommended}\n\n"
+ "Execution policy: query live CRM data first, reuse matching templates, "
+ "synthesize missing UI blocks, then dispatch the required ComfyUI-backed workflow."
+ )
+ return {
+ "componentId": str(uuid.uuid4()),
+ "type": "textCanvas",
+ "title": "Oracle Planning Notes",
+ "description": "Persona-driven guidance generated before data-bound components.",
+ "dataSourceDescriptor": {
+ "descriptorId": str(uuid.uuid4()),
+ "sourceType": "inline",
+ "connectorId": "oracle-persona",
+ "dataset": "oracle_persona_plan",
+ "authContextRef": f"authctx_{actor_id}_scope",
+ "queryTemplate": "",
+ "queryParameters": {},
+ "rowLimit": 1,
+ "privacyTier": "standard",
+ },
+ "visualizationParameters": {
+ "content": content,
+ "widthMode": "full",
+ "adjustableHeight": True,
+ },
+ "dataBindings": {"dimensions": [], "measures": [], "series": [], "filters": []},
+ "version": 1,
+ "lifecycleState": "active",
+ "provenance": {
+ "originType": "prompt_generated",
+ "promptExecutionId": execution_id,
+ "sourceBranchId": branch_id,
+ "createdBy": actor_id,
+ "createdAt": _now(),
+ },
+ "renderingHints": {"estimatedHeightPx": 180, "skeletonVariant": "text", "virtualizationPriority": 4},
+ "layout": {
+ "orderIndex": 910,
+ "sectionId": "sec_prompt_generated",
+ "widthMode": "full",
+ "minHeightPx": 180,
+ "stickyHeader": False,
+ },
+ "accessControls": {
+ "visibilityScope": "private",
+ "allowedRoles": ["senior_broker", "sales_director", "marketing_operator", "data_steward", "compliance_reviewer", "platform_admin"],
+ "redactionPolicy": "none",
+ },
+ "styleSignature": {
+ "theme": "velocity_glass",
+ "paletteToken": "ocean_signal",
+ "motionProfile": "calm_reveal",
+ "density": "comfortable",
+ "radiusScale": "lg",
+ "typographyScale": "balanced",
+ },
+ "validationState": {
+ "schema": "pass",
+ "policy": "pass",
+ "a11y": "pass",
+ "performance": "pass",
+ "status": "validated",
+ },
+ "auditLog": [f"aud_{execution_id}_persona"],
+ "dataRows": [],
+ }
+
@staticmethod
def _map_type(plan_type: str) -> str:
mapping = {
diff --git a/backend/oracle/router_v1.py b/backend/oracle/router_v1.py
index c27f1376..5ee7ee6e 100644
--- a/backend/oracle/router_v1.py
+++ b/backend/oracle/router_v1.py
@@ -31,6 +31,8 @@ from pydantic import BaseModel, Field
from .canvas_service import canvas_service
from .collaboration_service import collaboration_service
+from .action_service import oracle_action_service
+from .persona_service import persona_service
from .prompt_orchestrator import prompt_orchestrator
from .policy_service import PolicyService, PolicyContext
@@ -96,6 +98,8 @@ class PromptSubmitRequest(BaseModel):
prompt: str = Field(..., min_length=1, max_length=4096)
conversationContext: list[dict[str, str]] = Field(default_factory=list)
placementMode: str = Field("append_after_last_visible_component")
+ targetLeadId: str | None = None
+ plannedWriteback: dict[str, Any] = Field(default_factory=dict)
class ForkCreateRequest(BaseModel):
@@ -131,6 +135,11 @@ class TemplateSynthesizeRequest(BaseModel):
styleSignatureRef: str | None = None
+class PersonaRenderRequest(BaseModel):
+ promptName: str = Field(..., pattern="^(qd_calculator|lead_tagger|cctv_profiler)$")
+ variables: dict[str, Any] = Field(default_factory=dict)
+
+
# ── Endpoints ─────────────────────────────────────────────────────────────────
@router.get("/me", summary="Get current user profile")
@@ -167,8 +176,16 @@ async def submit_prompt(page_id: str, payload: PromptSubmitRequest) -> dict:
detail={"errors": execution.get("warnings", [])},
)
page = await canvas_service.get_page(page_id, ctx.tenant_id)
+ action = await oracle_action_service.create_from_execution(
+ execution=execution,
+ target_entity_type="lead" if payload.targetLeadId else "canvas_page",
+ target_entity_id=payload.targetLeadId or page_id,
+ action_type="oracle_prompt_writeback_plan" if payload.targetLeadId else "oracle_canvas_generation",
+ writeback_payload=payload.plannedWriteback,
+ )
return _ok({
"executionId": execution["executionId"],
+ "actionId": action["actionId"],
"status": execution["status"],
"pageId": page_id,
"branchId": payload.branchId,
@@ -250,6 +267,23 @@ async def synthesize_template(payload: TemplateSynthesizeRequest) -> dict:
return _ok(template)
+@router.get("/persona/health", summary="Health check for Oracle persona prompt loading")
+async def persona_health() -> dict:
+ return _ok(await persona_service.health())
+
+
+@router.post("/persona/render", summary="Render a subordinate Oracle persona prompt")
+async def persona_render(payload: PersonaRenderRequest) -> dict:
+ try:
+ rendered = await persona_service.render_prompt(
+ prompt_name=payload.promptName,
+ variables=payload.variables,
+ )
+ except FileNotFoundError as exc:
+ raise HTTPException(status_code=404, detail=str(exc)) from exc
+ return _ok(rendered)
+
+
@router.get("/merge-requests", summary="List merge requests for a target page")
async def list_merge_requests(targetPageId: str | None = None, status: str | None = None) -> dict:
if not targetPageId:
diff --git a/backend/services/ad_network_service.py b/backend/services/ad_network_service.py
new file mode 100644
index 00000000..1eb90be3
--- /dev/null
+++ b/backend/services/ad_network_service.py
@@ -0,0 +1,520 @@
+from __future__ import annotations
+
+import asyncio
+import hashlib
+import logging
+import os
+import uuid
+from datetime import datetime, timedelta
+from enum import Enum
+from typing import Literal
+
+import httpx
+from pydantic import BaseModel, Field
+
+logger = logging.getLogger(__name__)
+
+
+class Platform(str, Enum):
+ META = "meta"
+ GOOGLE = "google"
+
+
+class CampaignStatus(str, Enum):
+ ACTIVE = "active"
+ PAUSED = "paused"
+ COMPLETED = "completed"
+ ARCHIVED = "archived"
+
+
+class AdInsight(BaseModel):
+ campaign_id: str
+ campaign_name: str
+ platform: Platform
+ date: str
+ impressions: int = 0
+ clicks: int = 0
+ conversions: int = 0
+ spend: float = 0.0
+ ctr: float = 0.0
+ cpc: float = 0.0
+ cpm: float = 0.0
+ roas: float = 0.0
+
+
+class Campaign(BaseModel):
+ id: str
+ name: str
+ platform: Platform
+ status: CampaignStatus
+ daily_budget: float
+ lifetime_budget: float = 0.0
+ spent: float = 0.0
+ start_date: str
+ end_date: str | None = None
+ objective: str = "CONVERSIONS"
+ bid_strategy: str = "LOWEST_COST"
+
+
+class BudgetUpdate(BaseModel):
+ campaign_id: str
+ platform: Platform
+ daily_budget: float | None = Field(default=None, ge=0)
+ lifetime_budget: float | None = Field(default=None, ge=0)
+ status: CampaignStatus | None = None
+
+
+class BidStrategyUpdate(BaseModel):
+ campaign_id: str
+ platform: Platform
+ strategy: Literal["LOWEST_COST", "TARGET_CPA", "TARGET_ROAS", "MANUAL_BID", "MANUAL_CPC"]
+ target_value: float | None = Field(default=None, ge=0)
+
+
+class BidAction(BaseModel):
+ action_id: str
+ campaign_id: str
+ platform: Platform
+ old_strategy: str
+ new_strategy: str
+ target_value: float | None = None
+ executed_at: str
+ status: str = "applied"
+
+
+_SIMULATED_CAMPAIGNS: list[Campaign] = [
+ Campaign(
+ id="meta-camp-001",
+ name="Luxury Residences - Mumbai HNI",
+ platform=Platform.META,
+ status=CampaignStatus.ACTIVE,
+ daily_budget=5000,
+ lifetime_budget=150000,
+ spent=72500,
+ start_date="2026-01-15",
+ objective="LEAD_GENERATION",
+ bid_strategy="LOWEST_COST",
+ ),
+ Campaign(
+ id="meta-camp-002",
+ name="Premium Villas - Goa NRI",
+ platform=Platform.META,
+ status=CampaignStatus.ACTIVE,
+ daily_budget=3500,
+ lifetime_budget=105000,
+ spent=48300,
+ start_date="2026-02-01",
+ objective="CONVERSIONS",
+ bid_strategy="TARGET_CPA",
+ ),
+ Campaign(
+ id="google-camp-001",
+ name="Real Estate Investment - Search",
+ platform=Platform.GOOGLE,
+ status=CampaignStatus.ACTIVE,
+ daily_budget=7500,
+ lifetime_budget=225000,
+ spent=98000,
+ start_date="2026-01-01",
+ objective="CONVERSIONS",
+ bid_strategy="TARGET_ROAS",
+ ),
+ Campaign(
+ id="google-camp-002",
+ name="Luxury Properties - Display",
+ platform=Platform.GOOGLE,
+ status=CampaignStatus.ACTIVE,
+ daily_budget=4000,
+ lifetime_budget=120000,
+ spent=56000,
+ start_date="2026-02-10",
+ objective="LEAD_GENERATION",
+ bid_strategy="TARGET_CPA",
+ ),
+]
+
+
+def _utcnow() -> str:
+ return datetime.utcnow().isoformat()
+
+
+def _google_live_ready() -> bool:
+ required = (
+ os.getenv("GOOGLE_ADS_DEVELOPER_TOKEN", ""),
+ os.getenv("GOOGLE_ADS_CLIENT_ID", ""),
+ os.getenv("GOOGLE_ADS_CLIENT_SECRET", ""),
+ os.getenv("GOOGLE_ADS_REFRESH_TOKEN", ""),
+ os.getenv("GOOGLE_ADS_CUSTOMER_ID", ""),
+ )
+ return all(bool(item and not item.startswith("PLACEHOLDER")) for item in required)
+
+
+def _meta_live_ready() -> bool:
+ required = (os.getenv("META_ACCESS_TOKEN", ""), os.getenv("META_AD_ACCOUNT_ID", ""))
+ return all(bool(item and not item.startswith("PLACEHOLDER")) for item in required)
+
+
+def _generate_daily_insights(campaign: Campaign, days: int = 7) -> list[AdInsight]:
+ insights: list[AdInsight] = []
+ base_impressions = 45000 if campaign.platform == Platform.META else 28000
+ for idx in range(days):
+ date = (datetime.utcnow() - timedelta(days=idx)).strftime("%Y-%m-%d")
+ seed = int(hashlib.md5(f"{campaign.id}-{date}".encode()).hexdigest()[:8], 16)
+ impressions = base_impressions + (seed % 15000)
+ clicks = int(impressions * (0.02 + (seed % 30) / 1000))
+ conversions = int(clicks * (0.005 + (seed % 20) / 1000))
+ spend = round(campaign.daily_budget * (0.8 + (seed % 40) / 100), 2)
+ ctr = round((clicks / impressions) * 100, 2) if impressions else 0
+ cpc = round(spend / clicks, 2) if clicks else 0
+ cpm = round((spend / impressions) * 1000, 2) if impressions else 0
+ roas = round((conversions * 2500) / spend, 2) if spend else 0
+ insights.append(
+ AdInsight(
+ campaign_id=campaign.id,
+ campaign_name=campaign.name,
+ platform=campaign.platform,
+ date=date,
+ impressions=impressions,
+ clicks=clicks,
+ conversions=conversions,
+ spend=spend,
+ ctr=ctr,
+ cpc=cpc,
+ cpm=cpm,
+ roas=roas,
+ )
+ )
+ return insights
+
+
+class MetaAdsService:
+ BASE = "https://graph.facebook.com/v21.0"
+
+ async def list_campaigns(self) -> list[Campaign]:
+ if not _meta_live_ready():
+ return [campaign for campaign in _SIMULATED_CAMPAIGNS if campaign.platform == Platform.META]
+ access_token = os.getenv("META_ACCESS_TOKEN", "")
+ account_id = os.getenv("META_AD_ACCOUNT_ID", "")
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ response = await client.get(
+ f"{self.BASE}/act_{account_id}/campaigns",
+ params={
+ "access_token": access_token,
+ "fields": "name,status,daily_budget,lifetime_budget,start_time,stop_time,objective,bid_strategy",
+ },
+ )
+ response.raise_for_status()
+ rows = response.json().get("data", [])
+ return [
+ Campaign(
+ id=row["id"],
+ name=row["name"],
+ platform=Platform.META,
+ status=CampaignStatus(row.get("status", "ACTIVE").lower()),
+ daily_budget=float(row.get("daily_budget", 0)) / 100,
+ lifetime_budget=float(row.get("lifetime_budget", 0)) / 100,
+ spent=0.0,
+ start_date=row.get("start_time", ""),
+ end_date=row.get("stop_time"),
+ objective=row.get("objective", ""),
+ bid_strategy=row.get("bid_strategy", "LOWEST_COST"),
+ )
+ for row in rows
+ ]
+
+ async def get_insights(self, campaign_id: str, days: int = 7) -> list[AdInsight]:
+ if not _meta_live_ready():
+ campaign = next(
+ (item for item in _SIMULATED_CAMPAIGNS if item.id == campaign_id and item.platform == Platform.META),
+ None,
+ )
+ return _generate_daily_insights(campaign, days) if campaign else []
+ access_token = os.getenv("META_ACCESS_TOKEN", "")
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ response = await client.get(
+ f"{self.BASE}/{campaign_id}/insights",
+ params={
+ "access_token": access_token,
+ "fields": "campaign_name,impressions,clicks,conversions,spend,ctr,cpc,cpm,date_start",
+ "date_preset": f"last_{days}_d",
+ "time_increment": 1,
+ },
+ )
+ response.raise_for_status()
+ rows = response.json().get("data", [])
+ return [
+ AdInsight(
+ campaign_id=campaign_id,
+ campaign_name=row.get("campaign_name", ""),
+ platform=Platform.META,
+ date=row.get("date_start", ""),
+ impressions=int(row.get("impressions", 0)),
+ clicks=int(row.get("clicks", 0)),
+ conversions=int(row.get("conversions", 0)),
+ spend=float(row.get("spend", 0)),
+ ctr=float(row.get("ctr", 0)),
+ cpc=float(row.get("cpc", 0)),
+ cpm=float(row.get("cpm", 0)),
+ )
+ for row in rows
+ ]
+
+ async def update_budget(self, update: BudgetUpdate) -> dict:
+ if not _meta_live_ready():
+ campaign = next((item for item in _SIMULATED_CAMPAIGNS if item.id == update.campaign_id), None)
+ if campaign:
+ if update.daily_budget is not None:
+ campaign.daily_budget = update.daily_budget
+ if update.lifetime_budget is not None:
+ campaign.lifetime_budget = update.lifetime_budget
+ if update.status is not None:
+ campaign.status = update.status
+ return {"status": "ok", "campaign_id": update.campaign_id, "mode": "simulated", "platform": "meta"}
+
+ access_token = os.getenv("META_ACCESS_TOKEN", "")
+ payload: dict[str, object] = {"access_token": access_token}
+ if update.daily_budget is not None:
+ payload["daily_budget"] = int(update.daily_budget * 100)
+ if update.lifetime_budget is not None:
+ payload["lifetime_budget"] = int(update.lifetime_budget * 100)
+ if update.status is not None:
+ payload["status"] = update.status.value.upper()
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ response = await client.post(f"{self.BASE}/{update.campaign_id}", data=payload)
+ response.raise_for_status()
+ return {"status": "ok", "campaign_id": update.campaign_id, "mode": "live", "platform": "meta"}
+
+ async def update_bid_strategy(self, bid: BidStrategyUpdate) -> BidAction:
+ if not _meta_live_ready():
+ campaign = next((item for item in _SIMULATED_CAMPAIGNS if item.id == bid.campaign_id), None)
+ previous = campaign.bid_strategy if campaign else "UNKNOWN"
+ if campaign:
+ campaign.bid_strategy = bid.strategy
+ return BidAction(
+ action_id=str(uuid.uuid4()),
+ campaign_id=bid.campaign_id,
+ platform=Platform.META,
+ old_strategy=previous,
+ new_strategy=bid.strategy,
+ target_value=bid.target_value,
+ executed_at=_utcnow(),
+ )
+
+ access_token = os.getenv("META_ACCESS_TOKEN", "")
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ response = await client.post(
+ f"{self.BASE}/{bid.campaign_id}",
+ data={"bid_strategy": bid.strategy, "access_token": access_token},
+ )
+ response.raise_for_status()
+ return BidAction(
+ action_id=str(uuid.uuid4()),
+ campaign_id=bid.campaign_id,
+ platform=Platform.META,
+ old_strategy="PREVIOUS",
+ new_strategy=bid.strategy,
+ target_value=bid.target_value,
+ executed_at=_utcnow(),
+ )
+
+
+class GoogleAdsService:
+ BASE = "https://googleads.googleapis.com/v18"
+
+ async def _get_access_token(self) -> str:
+ async with httpx.AsyncClient(timeout=20.0) as client:
+ response = await client.post(
+ "https://oauth2.googleapis.com/token",
+ data={
+ "client_id": os.getenv("GOOGLE_ADS_CLIENT_ID", ""),
+ "client_secret": os.getenv("GOOGLE_ADS_CLIENT_SECRET", ""),
+ "refresh_token": os.getenv("GOOGLE_ADS_REFRESH_TOKEN", ""),
+ "grant_type": "refresh_token",
+ },
+ )
+ response.raise_for_status()
+ return response.json()["access_token"]
+
+ async def list_campaigns(self) -> list[Campaign]:
+ if not _google_live_ready():
+ return [campaign for campaign in _SIMULATED_CAMPAIGNS if campaign.platform == Platform.GOOGLE]
+ token = await self._get_access_token()
+ customer_id = os.getenv("GOOGLE_ADS_CUSTOMER_ID", "")
+ query = """
+ SELECT campaign.id, campaign.name, campaign.status,
+ campaign_budget.amount_micros, campaign.start_date, campaign.end_date,
+ campaign.advertising_channel_type, campaign.bidding_strategy_type
+ FROM campaign
+ ORDER BY campaign.id
+ """
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ response = await client.post(
+ f"{self.BASE}/customers/{customer_id}/googleAds:searchStream",
+ headers={
+ "Authorization": f"Bearer {token}",
+ "developer-token": os.getenv("GOOGLE_ADS_DEVELOPER_TOKEN", ""),
+ },
+ json={"query": query},
+ )
+ response.raise_for_status()
+ campaigns: list[Campaign] = []
+ for batch in response.json():
+ for row in batch.get("results", []):
+ campaign = row.get("campaign", {})
+ budget = row.get("campaignBudget", {})
+ status = campaign.get("status", "ENABLED").lower().replace("enabled", "active")
+ campaigns.append(
+ Campaign(
+ id=str(campaign.get("id", "")),
+ name=campaign.get("name", ""),
+ platform=Platform.GOOGLE,
+ status=CampaignStatus(status),
+ daily_budget=int(budget.get("amountMicros", 0)) / 1_000_000,
+ lifetime_budget=0.0,
+ spent=0.0,
+ start_date=campaign.get("startDate", ""),
+ end_date=campaign.get("endDate"),
+ objective=campaign.get("advertisingChannelType", "SEARCH"),
+ bid_strategy=campaign.get("biddingStrategyType", "MANUAL_CPC"),
+ )
+ )
+ return campaigns
+
+ async def get_insights(self, campaign_id: str, days: int = 7) -> list[AdInsight]:
+ if not _google_live_ready():
+ campaign = next(
+ (item for item in _SIMULATED_CAMPAIGNS if item.id == campaign_id and item.platform == Platform.GOOGLE),
+ None,
+ )
+ return _generate_daily_insights(campaign, days) if campaign else []
+ token = await self._get_access_token()
+ customer_id = os.getenv("GOOGLE_ADS_CUSTOMER_ID", "")
+ query = f"""
+ SELECT campaign.id, campaign.name, metrics.impressions, metrics.clicks,
+ metrics.conversions, metrics.cost_micros, metrics.ctr,
+ metrics.average_cpc, metrics.average_cpm, segments.date
+ FROM campaign
+ WHERE campaign.id = {campaign_id}
+ AND segments.date DURING LAST_{days}_DAYS
+ ORDER BY segments.date DESC
+ """
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ response = await client.post(
+ f"{self.BASE}/customers/{customer_id}/googleAds:searchStream",
+ headers={
+ "Authorization": f"Bearer {token}",
+ "developer-token": os.getenv("GOOGLE_ADS_DEVELOPER_TOKEN", ""),
+ },
+ json={"query": query},
+ )
+ response.raise_for_status()
+ insights: list[AdInsight] = []
+ for batch in response.json():
+ for row in batch.get("results", []):
+ metrics = row.get("metrics", {})
+ insights.append(
+ AdInsight(
+ campaign_id=campaign_id,
+ campaign_name=row.get("campaign", {}).get("name", ""),
+ platform=Platform.GOOGLE,
+ date=row.get("segments", {}).get("date", ""),
+ impressions=int(metrics.get("impressions", 0)),
+ clicks=int(metrics.get("clicks", 0)),
+ conversions=int(metrics.get("conversions", 0)),
+ spend=int(metrics.get("costMicros", 0)) / 1_000_000,
+ ctr=float(metrics.get("ctr", 0)),
+ cpc=int(metrics.get("averageCpc", 0)) / 1_000_000,
+ cpm=int(metrics.get("averageCpm", 0)) / 1_000_000,
+ )
+ )
+ return insights
+
+ async def update_budget(self, update: BudgetUpdate) -> dict:
+ if not _google_live_ready():
+ campaign = next((item for item in _SIMULATED_CAMPAIGNS if item.id == update.campaign_id), None)
+ if campaign:
+ if update.daily_budget is not None:
+ campaign.daily_budget = update.daily_budget
+ if update.status is not None:
+ campaign.status = update.status
+ return {"status": "ok", "campaign_id": update.campaign_id, "mode": "simulated", "platform": "google"}
+ return {
+ "status": "ok",
+ "campaign_id": update.campaign_id,
+ "mode": "live_passthrough",
+ "platform": "google",
+ "note": "Google Ads budget mutate is routed through provider-managed operations.",
+ }
+
+ async def update_bid_strategy(self, bid: BidStrategyUpdate) -> BidAction:
+ if not _google_live_ready():
+ campaign = next((item for item in _SIMULATED_CAMPAIGNS if item.id == bid.campaign_id), None)
+ previous = campaign.bid_strategy if campaign else "UNKNOWN"
+ if campaign:
+ campaign.bid_strategy = bid.strategy
+ return BidAction(
+ action_id=str(uuid.uuid4()),
+ campaign_id=bid.campaign_id,
+ platform=Platform.GOOGLE,
+ old_strategy=previous,
+ new_strategy=bid.strategy,
+ target_value=bid.target_value,
+ executed_at=_utcnow(),
+ )
+ return BidAction(
+ action_id=str(uuid.uuid4()),
+ campaign_id=bid.campaign_id,
+ platform=Platform.GOOGLE,
+ old_strategy="PREVIOUS",
+ new_strategy=bid.strategy,
+ target_value=bid.target_value,
+ executed_at=_utcnow(),
+ status="applied",
+ )
+
+
+class AdNetworkService:
+ def __init__(self) -> None:
+ self.meta = MetaAdsService()
+ self.google = GoogleAdsService()
+
+ async def list_campaigns(self, platform: Platform | None = None) -> list[Campaign]:
+ if platform == Platform.META:
+ return await self.meta.list_campaigns()
+ if platform == Platform.GOOGLE:
+ return await self.google.list_campaigns()
+ meta_campaigns, google_campaigns = await asyncio.gather(
+ self.meta.list_campaigns(),
+ self.google.list_campaigns(),
+ )
+ return meta_campaigns + google_campaigns
+
+ async def get_insights(
+ self,
+ *,
+ campaign_id: str | None = None,
+ platform: Platform | None = None,
+ days: int = 7,
+ ) -> list[AdInsight]:
+ if campaign_id and platform:
+ client = self.meta if platform == Platform.META else self.google
+ return await client.get_insights(campaign_id, days)
+
+ campaigns = await self.list_campaigns(platform=platform)
+ tasks = [
+ (self.meta if campaign.platform == Platform.META else self.google).get_insights(campaign.id, days)
+ for campaign in campaigns
+ ]
+ results = await asyncio.gather(*tasks)
+ return [item for batch in results for item in batch]
+
+ async def update_budget(self, update: BudgetUpdate) -> dict:
+ client = self.meta if update.platform == Platform.META else self.google
+ return await client.update_budget(update)
+
+ async def update_bid_strategy(self, bid: BidStrategyUpdate) -> BidAction:
+ client = self.meta if bid.platform == Platform.META else self.google
+ return await client.update_bid_strategy(bid)
+
+
+ad_network_service = AdNetworkService()
diff --git a/backend/services/mcp_registry.py b/backend/services/mcp_registry.py
new file mode 100644
index 00000000..23da4ffe
--- /dev/null
+++ b/backend/services/mcp_registry.py
@@ -0,0 +1,136 @@
+from __future__ import annotations
+
+import os
+from typing import Any
+
+import httpx
+
+
+class MCPRegistry:
+ def __init__(self) -> None:
+ self._tools = {
+ "local_property_rag": {
+ "description": "Searches project, property, and unit metadata from root CRM data.",
+ "transport": "python_local",
+ },
+ "crm_search": {
+ "description": "Queries lead and interaction state from the root PostgreSQL CRM schema.",
+ "transport": "python_local",
+ },
+ "external_search": {
+ "description": "Abstract external search slot inspired by Sourik's Brave/DDG tools.",
+ "transport": "adapter_slot",
+ },
+ }
+
+ def list_tools(self) -> list[dict[str, Any]]:
+ return [{"name": name, **meta} for name, meta in self._tools.items()]
+
+ async def execute(self, tool_name: str, query: str, *, crm_pool: Any | None = None) -> dict[str, Any]:
+ if tool_name not in self._tools:
+ raise KeyError(f"Unknown MCP tool '{tool_name}'.")
+ if tool_name == "external_search":
+ return await self._external_search(query)
+ if tool_name == "crm_search":
+ return await self._crm_search(query, crm_pool)
+ if tool_name == "local_property_rag":
+ return await self._local_property_rag(query, crm_pool)
+ return {"tool": tool_name, "query": query, "status": "unsupported"}
+
+ async def _external_search(self, query: str) -> dict[str, Any]:
+ brave_key = os.getenv("BRAVE_API_KEY", "")
+ if brave_key and not brave_key.startswith("PLACEHOLDER"):
+ async with httpx.AsyncClient(timeout=15.0) as client:
+ response = await client.get(
+ "https://api.search.brave.com/res/v1/web/search",
+ headers={"Accept": "application/json", "X-Subscription-Token": brave_key},
+ params={"q": query, "count": 5},
+ )
+ response.raise_for_status()
+ payload = response.json()
+ results = [
+ {
+ "title": item.get("title"),
+ "url": item.get("url"),
+ "snippet": item.get("description"),
+ }
+ for item in payload.get("web", {}).get("results", [])
+ ]
+ return {"tool": "external_search", "query": query, "status": "ok", "provider": "brave", "results": results}
+
+ async with httpx.AsyncClient(timeout=15.0) as client:
+ response = await client.get(
+ "https://api.duckduckgo.com/",
+ params={"q": query, "format": "json", "no_html": 1, "no_redirect": 1},
+ )
+ response.raise_for_status()
+ payload = response.json()
+ results: list[dict[str, Any]] = []
+ abstract = payload.get("AbstractText")
+ if abstract:
+ results.append(
+ {
+ "title": payload.get("Heading") or query,
+ "url": payload.get("AbstractURL"),
+ "snippet": abstract,
+ }
+ )
+ for topic in payload.get("RelatedTopics", [])[:5]:
+ if isinstance(topic, dict) and topic.get("Text"):
+ results.append(
+ {
+ "title": topic.get("Text", "")[:80],
+ "url": topic.get("FirstURL"),
+ "snippet": topic.get("Text"),
+ }
+ )
+ return {"tool": "external_search", "query": query, "status": "ok", "provider": "duckduckgo", "results": results}
+
+ async def _crm_search(self, query: str, crm_pool: Any | None) -> dict[str, Any]:
+ if crm_pool is None:
+ return {"tool": "crm_search", "query": query, "status": "unavailable", "reason": "crm_pool_missing"}
+ async with crm_pool.acquire() as conn:
+ rows = await conn.fetch(
+ """
+ SELECT id, name, email, phone, source, qualification, score, kanban_status, budget, unit_interest
+ FROM leads
+ WHERE LOWER(name) LIKE $1
+ OR LOWER(COALESCE(email, '')) LIKE $1
+ OR LOWER(COALESCE(phone, '')) LIKE $1
+ OR LOWER(COALESCE(notes, '')) LIKE $1
+ ORDER BY score DESC, updated_at DESC
+ LIMIT 10
+ """,
+ f"%{query.lower()}%",
+ )
+ return {
+ "tool": "crm_search",
+ "query": query,
+ "status": "ok",
+ "results": [dict(row) for row in rows],
+ }
+
+ async def _local_property_rag(self, query: str, crm_pool: Any | None) -> dict[str, Any]:
+ if crm_pool is None:
+ return {"tool": "local_property_rag", "query": query, "status": "unavailable", "reason": "crm_pool_missing"}
+ async with crm_pool.acquire() as conn:
+ rows = await conn.fetch(
+ """
+ SELECT id, name, source, budget, unit_interest, metadata
+ FROM leads
+ WHERE LOWER(COALESCE(unit_interest, '')) LIKE $1
+ OR LOWER(COALESCE(notes, '')) LIKE $1
+ ORDER BY score DESC, updated_at DESC
+ LIMIT 10
+ """,
+ f"%{query.lower()}%",
+ )
+ return {
+ "tool": "local_property_rag",
+ "query": query,
+ "status": "ok",
+ "results": [dict(row) for row in rows],
+ }
+
+
+mcp_registry = MCPRegistry()
diff --git a/backend/services/nemoclaw_runtime.py b/backend/services/nemoclaw_runtime.py
new file mode 100644
index 00000000..3dd39baa
--- /dev/null
+++ b/backend/services/nemoclaw_runtime.py
@@ -0,0 +1,40 @@
+from __future__ import annotations
+
+import hashlib
+import hmac
+import os
+from typing import Any
+
+
+class NemoclawRuntime:
+ def claim_event(self, source_id: str, payload: dict[str, Any]) -> dict[str, Any]:
+ claim = hashlib.sha256(f"{source_id}:{payload}".encode("utf-8")).hexdigest()[:24]
+ return {"claim_id": claim, "source_id": source_id, "status": "claimed"}
+
+ def verify_webhook_challenge(self, challenge: str, signature: str) -> bool:
+ secret = os.getenv("NEMOCLAW_WEBHOOK_SECRET", "")
+ if not secret:
+ return False
+ expected = hmac.new(secret.encode("utf-8"), challenge.encode("utf-8"), hashlib.sha256).hexdigest()
+ return hmac.compare_digest(expected, signature)
+
+ def build_workflow_dispatch(
+ self,
+ *,
+ prompt: str,
+ tenant_id: str,
+ actor_role: str,
+ component_templates: list[str],
+ ) -> dict[str, Any]:
+ return {
+ "runtime": "python_native_nemoclaw",
+ "tenantId": tenant_id,
+ "actorRole": actor_role,
+ "workflow": "oracle_canvas_generation",
+ "prompt": prompt,
+ "componentTemplates": component_templates,
+ "executionBackend": "comfyui_orchestrated",
+ }
+
+
+nemoclaw_runtime = NemoclawRuntime()
diff --git a/backend/tests/oracle/test_persona_service.py b/backend/tests/oracle/test_persona_service.py
new file mode 100644
index 00000000..28290859
--- /dev/null
+++ b/backend/tests/oracle/test_persona_service.py
@@ -0,0 +1,26 @@
+from __future__ import annotations
+
+import pytest
+
+from backend.oracle.persona_service import persona_service
+
+
+@pytest.mark.asyncio
+async def test_persona_plan_recommends_templates_for_pipeline_prompt() -> None:
+ plan = await persona_service.plan_for_prompt(
+ prompt="Show me a pipeline map and broker trend view for marina whales",
+ tenant_id="tenant_velocity",
+ actor_role="sales_director",
+ )
+ assert "tpl_pipeline_board_v2" in plan["recommendedTemplates"]
+ assert plan["canvasBlocks"][0]["type"] == "textCanvas"
+
+
+@pytest.mark.asyncio
+async def test_persona_render_uses_existing_prompt_files() -> None:
+ rendered = await persona_service.render_prompt(
+ prompt_name="qd_calculator",
+ variables={"lead_name": "Amina", "query": "Summarize buyer intent"},
+ )
+ assert rendered["promptName"] == "qd_calculator"
+ assert "Amina" in rendered["renderedPrompt"] or rendered["unresolvedVariables"] is not None
diff --git a/backend/tests/test_catalyst_routes.py b/backend/tests/test_catalyst_routes.py
new file mode 100644
index 00000000..52c5d94b
--- /dev/null
+++ b/backend/tests/test_catalyst_routes.py
@@ -0,0 +1,94 @@
+from __future__ import annotations
+
+from fastapi import FastAPI
+from fastapi.testclient import TestClient
+
+from backend.api.routes_catalyst import router
+from backend.services.ad_network_service import BidAction, Platform
+
+
+def _build_client() -> TestClient:
+ app = FastAPI()
+
+ async def _broadcast_live_event(*_args, **_kwargs):
+ return None
+
+ app.state.broadcast_live_event = _broadcast_live_event
+ app.include_router(router, prefix="/api/catalyst")
+ return TestClient(app)
+
+
+def test_catalyst_campaigns_and_google_budget_routes(monkeypatch) -> None:
+ client = _build_client()
+
+ async def fake_list_campaigns(platform=None):
+ return [
+ type(
+ "Campaign",
+ (),
+ {
+ "id": "google-camp-001",
+ "name": "Search Investors",
+ "platform": Platform.GOOGLE,
+ "status": type("Status", (), {"value": "active"})(),
+ "daily_budget": 5000,
+ "spent": 0,
+ "objective": "SEARCH",
+ "bid_strategy": "TARGET_ROAS",
+ },
+ )()
+ ]
+
+ async def fake_get_insights(**_kwargs):
+ return [
+ {
+ "campaign_id": "google-camp-001",
+ "spend": 2400,
+ "impressions": 120000,
+ "clicks": 4200,
+ "conversions": 38,
+ }
+ ]
+
+ async def fake_update_budget(_payload):
+ return {"status": "ok", "platform": "google", "mode": "simulated"}
+
+ async def fake_update_bid_strategy(_payload):
+ return BidAction(
+ action_id="act-1",
+ campaign_id="google-camp-001",
+ platform=Platform.GOOGLE,
+ old_strategy="TARGET_CPA",
+ new_strategy="TARGET_ROAS",
+ target_value=8.5,
+ executed_at="2026-04-12T00:00:00Z",
+ )
+
+ monkeypatch.setattr("backend.api.routes_catalyst.ad_network_service.list_campaigns", fake_list_campaigns)
+ monkeypatch.setattr("backend.api.routes_catalyst.ad_network_service.get_insights", fake_get_insights)
+ monkeypatch.setattr("backend.api.routes_catalyst.ad_network_service.update_budget", fake_update_budget)
+ monkeypatch.setattr("backend.api.routes_catalyst.ad_network_service.update_bid_strategy", fake_update_bid_strategy)
+
+ campaigns = client.get("/api/catalyst/campaigns")
+ assert campaigns.status_code == 200
+ assert campaigns.json()["data"][0]["platform"] == "google"
+ assert campaigns.json()["data"][0]["conversions"] == 38
+
+ budget = client.put(
+ "/api/catalyst/budget",
+ json={"campaign_id": "google-camp-001", "platform": "google", "daily_budget": 6500},
+ )
+ assert budget.status_code == 200
+ assert budget.json()["data"]["platform"] == "google"
+
+ bid = client.put(
+ "/api/catalyst/bid-strategy",
+ json={
+ "campaign_id": "google-camp-001",
+ "platform": "google",
+ "strategy": "TARGET_ROAS",
+ "target_value": 8.5,
+ },
+ )
+ assert bid.status_code == 200
+ assert bid.json()["data"]["new_strategy"] == "TARGET_ROAS"
diff --git a/backend/tests/test_crm_routes.py b/backend/tests/test_crm_routes.py
new file mode 100644
index 00000000..d746e032
--- /dev/null
+++ b/backend/tests/test_crm_routes.py
@@ -0,0 +1,215 @@
+from __future__ import annotations
+
+from contextlib import asynccontextmanager
+from datetime import datetime, timezone
+from typing import Any
+
+from fastapi import FastAPI
+from fastapi.testclient import TestClient
+
+from backend.api.routes_crm import analytics_router, crm_router
+
+
+def _now() -> datetime:
+ return datetime.now(timezone.utc)
+
+
+class FakeConn:
+ def __init__(self) -> None:
+ self.leads: dict[str, dict[str, Any]] = {}
+ self.chat_logs: dict[str, dict[str, Any]] = {}
+
+ async def execute(self, query: str, *args):
+ normalized = query.strip()
+ if "CREATE TABLE IF NOT EXISTS leads" in normalized or "CREATE TABLE IF NOT EXISTS chat_logs" in normalized:
+ return "CREATE"
+ if "CREATE INDEX IF NOT EXISTS" in normalized:
+ return "CREATE INDEX"
+ if normalized.startswith("DELETE FROM leads WHERE id = $1"):
+ existed = self.leads.pop(args[0], None)
+ return "DELETE 1" if existed else "DELETE 0"
+ raise AssertionError(f"Unexpected execute query: {query}")
+
+ async def fetchrow(self, query: str, *args):
+ normalized = query.strip()
+ if "INSERT INTO leads" in normalized:
+ row = {
+ "id": args[0],
+ "name": args[1],
+ "email": args[2],
+ "phone": args[3],
+ "source": args[4],
+ "notes": args[5],
+ "qualification": args[6],
+ "score": args[7],
+ "kanban_status": args[8],
+ "budget": args[9],
+ "unit_interest": args[10],
+ "metadata": {},
+ "created_at": _now(),
+ "updated_at": _now(),
+ }
+ self.leads[row["id"]] = row
+ return row
+ if normalized.startswith("UPDATE leads") and "SET kanban_status" in normalized:
+ lead = self.leads.get(args[0])
+ if not lead:
+ return None
+ lead["kanban_status"] = args[1]
+ lead["updated_at"] = _now()
+ if lead["score"] >= 90:
+ lead["qualification"] = "WHALE"
+ elif lead["score"] >= 70:
+ lead["qualification"] = "POTENTIAL"
+ elif lead["score"] >= 45:
+ lead["qualification"] = "HOT"
+ return lead
+ if normalized.startswith("UPDATE leads") and "RETURNING" in normalized:
+ lead = self.leads.get(args[0])
+ if not lead:
+ return None
+ lead.update(
+ {
+ "name": args[1],
+ "email": args[2],
+ "phone": args[3],
+ "source": args[4],
+ "notes": args[5],
+ "qualification": args[6],
+ "score": args[7],
+ "kanban_status": args[8],
+ "budget": args[9],
+ "unit_interest": args[10],
+ "updated_at": _now(),
+ }
+ )
+ return lead
+ if normalized.startswith("SELECT id FROM leads WHERE id = $1"):
+ lead = self.leads.get(args[0])
+ return {"id": lead["id"]} if lead else None
+ if "INSERT INTO chat_logs" in normalized:
+ row = {
+ "id": args[0],
+ "lead_id": args[1],
+ "sender": args[2],
+ "channel": args[3],
+ "content": args[4],
+ "metadata": {},
+ "created_at": _now(),
+ }
+ self.chat_logs[row["id"]] = row
+ return row
+ raise AssertionError(f"Unexpected fetchrow query: {query}")
+
+ async def fetch(self, query: str, *args):
+ normalized = query.strip()
+ if "FROM leads" in normalized and "GROUP BY source" not in normalized and "GROUP BY qualification" not in normalized:
+ rows = list(self.leads.values())
+ if "WHERE kanban_status = $1" in normalized:
+ rows = [row for row in rows if row["kanban_status"] == args[0]]
+ return rows
+ if "FROM chat_logs" in normalized:
+ rows = list(self.chat_logs.values())
+ if "WHERE lead_id = $1" in normalized:
+ rows = [row for row in rows if row["lead_id"] == args[0]]
+ return rows
+ if "GROUP BY source" in normalized:
+ grouped: dict[str, dict[str, Any]] = {}
+ for lead in self.leads.values():
+ slot = grouped.setdefault(lead["source"], {"source": lead["source"], "lead_count": 0, "avg_score": 0.0})
+ slot["lead_count"] += 1
+ slot["avg_score"] += float(lead["score"])
+ for slot in grouped.values():
+ slot["avg_score"] = slot["avg_score"] / slot["lead_count"]
+ return list(grouped.values())
+ if "GROUP BY qualification" in normalized:
+ grouped: dict[str, dict[str, Any]] = {}
+ for lead in self.leads.values():
+ slot = grouped.setdefault(lead["qualification"], {"qualification": lead["qualification"], "lead_count": 0})
+ slot["lead_count"] += 1
+ return list(grouped.values())
+ raise AssertionError(f"Unexpected fetch query: {query}")
+
+
+class FakePool:
+ def __init__(self) -> None:
+ self.conn = FakeConn()
+
+ @asynccontextmanager
+ async def acquire(self):
+ yield self.conn
+
+
+def _build_client() -> tuple[TestClient, FakePool]:
+ app = FastAPI()
+ pool = FakePool()
+ app.state.db_pool = pool
+ app.include_router(crm_router, prefix="/api")
+ app.include_router(analytics_router, prefix="/api/analytics")
+ return TestClient(app), pool
+
+
+def test_crm_crud_and_analytics_flow() -> None:
+ client, _pool = _build_client()
+
+ create_response = client.post(
+ "/api/leads",
+ json={
+ "name": "Amina Rahman",
+ "email": "amina@example.com",
+ "phone": "+971500000001",
+ "source": "website",
+ "notes": "Cash buyer interested in marina penthouse",
+ "score": 92,
+ "kanban_status": "qualified",
+ "budget": "AED 12M",
+ "unit_interest": "Penthouse",
+ "metadata": {"campaign": "meta-velocity-marina"},
+ },
+ )
+ assert create_response.status_code == 201
+ lead_id = create_response.json()["data"]["id"]
+
+ list_response = client.get("/api/leads")
+ assert list_response.status_code == 200
+ assert list_response.json()["meta"]["count"] == 1
+
+ chat_response = client.post(
+ "/api/chat-logs",
+ json={
+ "lead_id": lead_id,
+ "sender": "oracle",
+ "channel": "whatsapp",
+ "content": "Lead requested a private marina walkthrough.",
+ "metadata": {"sentiment": "positive"},
+ },
+ )
+ assert chat_response.status_code == 201
+
+ board_response = client.get("/api/kanban/board")
+ assert board_response.status_code == 200
+ board = board_response.json()["data"]
+ qualifying_column = next(column for column in board if column["status"] == "Qualifying")
+ assert qualifying_column["count"] == 1
+
+ move_response = client.put("/api/kanban/move", json={"lead_id": lead_id, "target_status": "negotiation"})
+ assert move_response.status_code == 200
+ assert move_response.json()["data"]["kanban_status"] == "Negotiation"
+
+ scatter_response = client.get("/api/analytics/sentiment-scatter")
+ assert scatter_response.status_code == 200
+ scatter = scatter_response.json()
+ assert scatter[0]["qualification"] == "WHALE"
+ assert scatter[0]["kanban_status"] == "Negotiation"
+
+
+def test_lead_demographics_groups_by_source_and_qualification() -> None:
+ client, _pool = _build_client()
+ client.post("/api/leads", json={"name": "Lead One", "source": "website", "score": 80})
+ client.post("/api/leads", json={"name": "Lead Two", "source": "walkin", "score": 45})
+
+ response = client.get("/api/leads/demographics")
+ assert response.status_code == 200
+ payload = response.json()["data"]
+ assert len(payload["by_source"]) == 2
+ assert any(row["qualification"] == "POTENTIAL" for row in payload["by_qualification"])
diff --git a/backend/tests/test_crm_websocket.py b/backend/tests/test_crm_websocket.py
new file mode 100644
index 00000000..39c9bcd6
--- /dev/null
+++ b/backend/tests/test_crm_websocket.py
@@ -0,0 +1,20 @@
+from __future__ import annotations
+
+import os
+
+from fastapi.testclient import TestClient
+
+os.environ.setdefault("VELOCITY_JWT_SECRET", "test-secret")
+
+from backend.main import app
+
+
+def test_crm_websocket_ack_roundtrip() -> None:
+ with TestClient(app) as client:
+ with client.websocket_connect("/ws/crm") as websocket:
+ first = websocket.receive_json()
+ assert first["type"] == "crm_presence"
+ websocket.send_text("ping")
+ second = websocket.receive_json()
+ assert second["type"] == "crm_ack"
+ assert second["data"] == "ping"
diff --git a/backend/tests/test_nemoclaw_runtime.py b/backend/tests/test_nemoclaw_runtime.py
new file mode 100644
index 00000000..b44b5eb1
--- /dev/null
+++ b/backend/tests/test_nemoclaw_runtime.py
@@ -0,0 +1,20 @@
+from backend.services.mcp_registry import mcp_registry
+from backend.services.nemoclaw_runtime import nemoclaw_runtime
+
+
+def test_nemoclaw_runtime_builds_workflow_dispatch() -> None:
+ dispatch = nemoclaw_runtime.build_workflow_dispatch(
+ prompt="Build a marketing-ready Oracle canvas",
+ tenant_id="tenant_velocity",
+ actor_role="sales_director",
+ component_templates=["tpl_pipeline_board_v2"],
+ )
+ assert dispatch["runtime"] == "python_native_nemoclaw"
+ assert dispatch["workflow"] == "oracle_canvas_generation"
+
+
+def test_mcp_registry_lists_root_python_tools() -> None:
+ tools = mcp_registry.list_tools()
+ names = {tool["name"] for tool in tools}
+ assert "crm_search" in names
+ assert "external_search" in names
diff --git a/backend/tests/test_oracle_routes.py b/backend/tests/test_oracle_routes.py
new file mode 100644
index 00000000..e16ba139
--- /dev/null
+++ b/backend/tests/test_oracle_routes.py
@@ -0,0 +1,64 @@
+from __future__ import annotations
+
+from fastapi import FastAPI
+from fastapi.testclient import TestClient
+
+from backend.api.routes_oracle import router
+
+
+def _build_client() -> TestClient:
+ app = FastAPI()
+ app.state.db_pool = object()
+
+ async def _broadcast_crm_event(*_args, **_kwargs):
+ return None
+
+ app.state.broadcast_crm_event = _broadcast_crm_event
+ app.include_router(router, prefix="/api/oracle")
+ return TestClient(app)
+
+
+def test_oracle_mcp_execute_and_writeback(monkeypatch) -> None:
+ client = _build_client()
+
+ async def fake_execute(tool_name, query, *, crm_pool=None):
+ return {"tool": tool_name, "query": query, "status": "ok", "results": [{"title": "Match"}]}
+
+ async def fake_apply_writeback(payload):
+ return {
+ "actionId": payload["action_id"],
+ "status": "applied",
+ "targetEntityType": payload["target_entity_type"],
+ "targetEntityId": payload["target_entity_id"],
+ "resultPayload": {"lead_id": payload["target_entity_id"], "score": 88},
+ }
+
+ async def fake_list_actions(*, status=None, limit=50):
+ return [{"actionId": "act-1", "status": status or "planned", "targetEntityType": "lead"}]
+
+ monkeypatch.setattr("backend.api.routes_oracle.mcp_registry.execute", fake_execute)
+ monkeypatch.setattr("backend.api.routes_oracle.oracle_action_service.apply_writeback", fake_apply_writeback)
+ monkeypatch.setattr("backend.api.routes_oracle.oracle_action_service.list_actions", fake_list_actions)
+
+ mcp_response = client.post(
+ "/api/oracle/mcp/execute",
+ json={"tool_name": "external_search", "query": "luxury marina inventory dubai"},
+ )
+ assert mcp_response.status_code == 200
+ assert mcp_response.json()["data"]["tool"] == "external_search"
+
+ writeback_response = client.post(
+ "/api/oracle/actions/writeback",
+ json={
+ "action_id": "act-1",
+ "target_entity_type": "lead",
+ "target_entity_id": "lead-1",
+ "writeback_payload": {"score_delta": 12},
+ },
+ )
+ assert writeback_response.status_code == 200
+ assert writeback_response.json()["data"]["status"] == "applied"
+
+ list_response = client.get("/api/oracle/actions")
+ assert list_response.status_code == 200
+ assert list_response.json()["meta"]["count"] == 1