From c2ab1e31d0be95e4352c05606e1cdefa3b9bd828 Mon Sep 17 00:00:00 2001 From: Sagnik Date: Sun, 12 Apr 2026 19:18:53 +0530 Subject: [PATCH] feat: Complete code integration of modules --- ...6-04-12-sourik-root-integration-closure.md | 135 ++++ .../Sprint 1/nemoclaw_setup_truth.md | 69 +- app/src/App.tsx | 2 + app/src/components/modules/Catalyst.tsx | 5 +- .../modules/CatalystMarketingTab.tsx | 263 ++++++++ app/src/hooks/useCrmBootstrap.ts | 46 ++ app/src/lib/api.ts | 116 ++++ app/src/lib/crmMappers.ts | 122 ++++ app/src/store/useMarketingStore.ts | 2 +- app/src/store/useStore.ts | 4 + backend/api/routes_catalyst.py | 161 +++-- backend/api/routes_crm.py | 630 ++++++++++++++++++ backend/api/routes_oracle.py | 104 +++ backend/main.py | 66 +- backend/oracle/action_service.py | 346 ++++++++++ backend/oracle/persona_service.py | 97 +++ backend/oracle/prompt_orchestrator.py | 106 ++- backend/oracle/router_v1.py | 34 + backend/services/ad_network_service.py | 520 +++++++++++++++ backend/services/mcp_registry.py | 136 ++++ backend/services/nemoclaw_runtime.py | 40 ++ backend/tests/oracle/test_persona_service.py | 26 + backend/tests/test_catalyst_routes.py | 94 +++ backend/tests/test_crm_routes.py | 215 ++++++ backend/tests/test_crm_websocket.py | 20 + backend/tests/test_nemoclaw_runtime.py | 20 + backend/tests/test_oracle_routes.py | 64 ++ 27 files changed, 3393 insertions(+), 50 deletions(-) create mode 100644 .Agent Context/Issues/2026-04-12-sourik-root-integration-closure.md create mode 100644 app/src/components/modules/CatalystMarketingTab.tsx create mode 100644 app/src/hooks/useCrmBootstrap.ts create mode 100644 app/src/lib/crmMappers.ts create mode 100644 backend/oracle/action_service.py create mode 100644 backend/oracle/persona_service.py create mode 100644 backend/services/ad_network_service.py create mode 100644 backend/services/mcp_registry.py create mode 100644 backend/services/nemoclaw_runtime.py create mode 100644 backend/tests/oracle/test_persona_service.py create mode 100644 backend/tests/test_catalyst_routes.py create mode 100644 backend/tests/test_crm_routes.py create mode 100644 backend/tests/test_crm_websocket.py create mode 100644 backend/tests/test_nemoclaw_runtime.py create mode 100644 backend/tests/test_oracle_routes.py diff --git a/.Agent Context/Issues/2026-04-12-sourik-root-integration-closure.md b/.Agent Context/Issues/2026-04-12-sourik-root-integration-closure.md new file mode 100644 index 00000000..e44d2977 --- /dev/null +++ b/.Agent Context/Issues/2026-04-12-sourik-root-integration-closure.md @@ -0,0 +1,135 @@ +# Sourik Root Integration Closure + +## Summary + +This issue tracks the root-side integration of the useful Sourik subsystems into the current Project Velocity mainline without replacing the root FastAPI shell, root Sentinel ownership, or the existing Python-native Oracle v1 surface. + +The objective was to absorb the missing operational pieces into the root codebase so Sprint 1 truth is judged by current product reality rather than by stale planning artifacts. + +## Scope Closed In This Pass + +### CRM backend and sync + +- landed canonical root CRM routes for: + - `GET/POST/PUT/DELETE /api/leads` + - `GET /api/leads/{lead_id}` + - `GET /api/leads/demographics` + - `GET/POST /api/chat-logs` + - `GET /api/kanban/board` + - `PUT /api/kanban/move` + - `POST /api/leads/seed-synthetic` +- added dedicated CRM websocket stream at `GET /ws/crm` +- added root-side CRM event broadcasting for: + - lead create + - lead update + - lead delete + - kanban move + - chat log create + - synthetic seed runs + - Oracle writebacks + +### Oracle append and writeback contract + +- kept root Oracle v1 as the primary public Oracle surface +- appended persona and workflow orchestration under the existing Oracle v1 flow +- added real MCP execution endpoint: + - `POST /api/oracle/mcp/execute` +- added Oracle action ledger and read/writeback routes: + - `GET /api/oracle/actions` + - `GET /api/oracle/actions/{action_id}` + - `POST /api/oracle/actions/writeback` +- Oracle prompt submissions now create persisted action records tied to execution output +- Oracle writebacks now support canonical lead updates for: + - score adjustments + - stage changes + - qualification changes + - metadata patching + - note append + - Oracle-generated message insertion into `chat_logs` + +### MCP and search + +- converted the MCP registry from a placeholder slot into an executable root service +- external search now executes against: + - Brave Search if `BRAVE_API_KEY` is configured + - DuckDuckGo fallback otherwise +- CRM and local property retrieval tools now execute against the root CRM schema through the root DB pool + +### Catalyst marketing backend parity + +- replaced hardcoded campaign summaries with unified ad-network service backed by root code +- added root Meta plus Google Ads unified campaign listing +- added unified insights endpoint with platform filtering +- added budget update route for Meta plus Google +- added bid strategy update route for Meta plus Google +- added Google-aware campaign creation path so Catalyst campaign creation is no longer Meta-only + +Routes covered in this pass: + +- `GET /api/catalyst/campaigns` +- `POST /api/catalyst/campaigns/create` +- `GET /api/catalyst/insights/realtime` +- `PUT /api/catalyst/budget` +- `PUT /api/catalyst/bid-strategy` + +### Frontend carry-forward + +- preserved the existing root Catalyst shell +- kept the vertically stacked `Marketing` sub-tab inside Catalyst +- no second marketing app or second frontend API source of truth was introduced + +## Files Added Or Materially Updated + +### Backend + +- `backend/services/ad_network_service.py` +- `backend/services/mcp_registry.py` +- `backend/api/routes_catalyst.py` +- `backend/api/routes_crm.py` +- `backend/api/routes_oracle.py` +- `backend/oracle/action_service.py` +- `backend/oracle/router_v1.py` +- `backend/main.py` + +### Tests + +- `backend/tests/test_catalyst_routes.py` +- `backend/tests/test_oracle_routes.py` +- `backend/tests/test_crm_websocket.py` + +## Verification Completed + +- `python -m pytest Project_Velocity/backend/tests/test_catalyst_routes.py Project_Velocity/backend/tests/test_oracle_routes.py Project_Velocity/backend/tests/test_crm_websocket.py Project_Velocity/backend/tests/test_crm_routes.py Project_Velocity/backend/tests/oracle/test_persona_service.py Project_Velocity/backend/tests/test_nemoclaw_runtime.py` +- result: `9 passed` + +- `npm run build` +- result: passed + +## Production Notes + +- Google Ads support is now integrated at the root contract level, but live mutate behavior still depends on valid provider credentials and provider-managed operations. +- Brave Search becomes the preferred external search provider when `BRAVE_API_KEY` is present; otherwise the root falls back to DuckDuckGo. +- Oracle writebacks currently target leads as the canonical CRM entity. Additional entity writebacks should follow the same `oracle_actions` ledger rather than introducing side-channel writes. + +## Residual Work After This Closure + +These are still separate follow-up items, not blockers for closing this integration pass: + +- deeper Google Ads mutate coverage beyond provider-managed passthroughs +- frontend consumption of the CRM websocket stream +- broader Oracle writebacks beyond `lead` +- stricter auth and role gating for Oracle action application +- richer Catalyst campaign creation UX for platform-specific fields +- prompt inventory and persona-to-runtime mapping docs cleanup + +## Acceptance Criteria Met + +- root app shell preserved +- root FastAPI entrypoint preserved +- root Sentinel ownership preserved +- no Go runtime adopted +- no second backend center introduced +- MCP external search executes for real +- CRM has a live websocket sync surface +- Oracle has a persisted action/writeback contract +- Catalyst backend exposes Google-aware parity routes in the root diff --git a/.Agent Context/Sprint 1/nemoclaw_setup_truth.md b/.Agent Context/Sprint 1/nemoclaw_setup_truth.md index 20a1b4a7..017a66e3 100644 --- a/.Agent Context/Sprint 1/nemoclaw_setup_truth.md +++ b/.Agent Context/Sprint 1/nemoclaw_setup_truth.md @@ -1,6 +1,6 @@ # NemoClaw Setup Truth -Updated: April 2, 2026 +Updated: April 12, 2026 ## 1. Purpose @@ -10,15 +10,24 @@ This is not the original intended architecture. This is the current operational ## 2. High-Level Summary -Project Velocity uses the term "NemoClaw" for the reasoning and prompt layer attached to the Sentinel QD Engine. In practice, this is now split into two different concerns: +Project Velocity uses the term "NemoClaw" for the reasoning and prompt layer attached to the Sentinel QD Engine. In practice, this is now split into three different concerns: 1. Prompted reasoning used by the FastAPI backend 2. OpenShell / gateway infrastructure that remains installed on the AWS node +3. Python-native append layers used by Oracle planning, MCP-style tool registration, and workflow dispatch preview The active FastAPI inference path is NVIDIA-hosted OpenAI-compatible chat completions. The OpenShell gateway and Ollama are still installed and running as adjacent infrastructure, but they are not the active primary scoring path used by `backend/services/nemoclaw_client.py`. +The root codebase now also includes Python-native compatibility layers inspired by Sourik's Go runtime: + +- `backend/services/nemoclaw_runtime.py` +- `backend/services/mcp_registry.py` +- `backend/oracle/persona_service.py` + +These append the current root without replacing the active NVIDIA-hosted inference path. + ## 3. Node and Network Truth AWS region: `us-east-1` @@ -81,6 +90,24 @@ PostgreSQL 14 data directory. `backend/services/nemoclaw_client.py` Primary reasoning client used by the FastAPI backend. +`backend/services/nemoclaw_runtime.py` +Python-native append layer for workflow dispatch planning, webhook verification, and claim-style helper behavior. + +`backend/services/mcp_registry.py` +Python-native MCP/search tool registry append layer used by Oracle helper surfaces. + +`backend/oracle/persona_service.py` +Subordinate Oracle persona planning layer that recommends component templates, renders prompt assets, and augments Oracle v1. + +`backend/api/routes_crm.py` +Root PostgreSQL-first CRM append layer for `leads`, `chat_logs`, `kanban`, and analytics routes. + +`backend/api/routes_oracle.py` +Root Oracle helper append layer for workflow preview and MCP tool discovery. + +`backend/oracle/router_v1.py` +Mounted Oracle v1 API surface for canvas, prompts, persona helpers, and collaboration. + `backend/routers/videos.py` Marketing-video catalog endpoint for the Sentinel live-session picker. @@ -182,6 +209,28 @@ No longer the primary path for backend scoring. 5. The backend calls NVIDIA hosted completions using `nvidia/nemotron-3-super-120b-a12b` 6. The result updates QD score state and is broadcast back over WebSocket +### Current Oracle canvas planning append flow + +1. Frontend can call `/api/oracle/v1/canvas-pages/{pageId}/prompts` +2. `backend/oracle/prompt_orchestrator.py` builds a retrieval plan +3. `backend/oracle/persona_service.py` recommends reusable component templates and emits a planning note block +4. `backend/services/nemoclaw_runtime.py` produces a workflow dispatch preview for ComfyUI-backed execution +5. `backend/oracle/data_access_gateway.py` runs only whitelisted PostgreSQL queries +6. Oracle commits the resulting components into the active canvas revision + +### Current CRM and analytics append flow + +1. Root FastAPI mounts `backend/api/routes_crm.py` +2. Canonical root endpoints now exist for: + - `/api/leads` + - `/api/leads/demographics` + - `/api/chat-logs` + - `/api/kanban/board` + - `/api/kanban/move` + - `/api/analytics/sentiment-scatter` +3. These routes use the root asyncpg pool and PostgreSQL-first storage contract +4. CRM WebSocket sync is still intentionally deferred + ### Current lead-tagging flow 1. Broker or system calls `/api/sentinel/tag-lead` @@ -232,6 +281,22 @@ Why it still exists: What it is not: - It is not the current primary inference path for backend scoring +- It is not the root source of truth for Oracle or CRM orchestration + +## 8.5 Python-Native Append Responsibilities + +These are now part of root truth: + +- Oracle persona prompt loading and render helpers live in Python, not Go +- MCP/search registration lives in Python, not Go +- Workflow dispatch planning for Oracle-to-Comfy orchestration lives in Python, not Go +- Claim-style helper behavior is appended in Python as a compatibility layer, not as a second backend center + +What remains deferred: + +- Full production webhook runtime parity with Sourik's Go stack +- Full external search provider execution inside the MCP layer +- Autonomous posting and non-root agent/webhook services ## 9. Prompts diff --git a/app/src/App.tsx b/app/src/App.tsx index 7737d8d0..3c05fc6d 100644 --- a/app/src/App.tsx +++ b/app/src/App.tsx @@ -11,6 +11,7 @@ import { Inventory } from '@/components/modules/Inventory'; import { Settings } from '@/components/modules/Settings'; import { Catalyst } from '@/components/modules/Catalyst'; import { NotificationCenter } from '@/components/layout/NotificationCenter'; +import { useCrmBootstrap } from '@/hooks/useCrmBootstrap'; import type { ModuleId } from '@/types'; import { @@ -75,6 +76,7 @@ function RouteModuleSync() { function MainLayout() { const { activeModule, setActiveModule, sidebarExpanded, logout } = useStore(); + useCrmBootstrap(); const navigate = useNavigate(); const location = useLocation(); diff --git a/app/src/components/modules/Catalyst.tsx b/app/src/components/modules/Catalyst.tsx index 2eb0fe58..d171a4e4 100644 --- a/app/src/components/modules/Catalyst.tsx +++ b/app/src/components/modules/Catalyst.tsx @@ -16,6 +16,7 @@ import { useMarketingStore } from '@/store/useMarketingStore'; import { useCurrency } from '@/store/useCurrencyStore'; import type { Campaign, MarketingAsset, LiveOptimizationEvent, LiveEventType } from '@/types'; import { GroundTruthPicker } from './GroundTruthPicker'; +import { CatalystMarketingTab } from './CatalystMarketingTab'; import type { GroundTruthSelection } from './GroundTruthPicker'; // ── Design tokens ───────────────────────────────────────────────────────────── @@ -936,13 +937,14 @@ function WarRoom() { // Tab Bar // ───────────────────────────────────────────────────────────────────────────── -type TabId = 'studio' | 'command' | 'intelligence' | 'war-room'; +type TabId = 'studio' | 'command' | 'intelligence' | 'war-room' | 'marketing'; const TABS: Array<{ id: TabId; label: string; icon: LucideIcon }> = [ { id: 'studio', label: 'The Studio', icon: Clapperboard }, { id: 'command', label: 'Campaign Command', icon: Megaphone }, { id: 'intelligence', label: 'Intelligence & ROI', icon: BarChart3 }, { id: 'war-room', label: 'War Room', icon: Globe }, + { id: 'marketing', label: 'Marketing', icon: TrendingUp }, ]; // ───────────────────────────────────────────────────────────────────────────── @@ -957,6 +959,7 @@ export function Catalyst() { 'command': , 'intelligence': , 'war-room': , + 'marketing': , }; return ( diff --git a/app/src/components/modules/CatalystMarketingTab.tsx b/app/src/components/modules/CatalystMarketingTab.tsx new file mode 100644 index 00000000..c5963a32 --- /dev/null +++ b/app/src/components/modules/CatalystMarketingTab.tsx @@ -0,0 +1,263 @@ +import { useEffect, useMemo, useState } from 'react'; +import { Activity, BarChart3, DatabaseZap, Megaphone, RefreshCw, Sparkles } from 'lucide-react'; + +import { + getCatalystCampaigns, + getLeadDemographics, + getSentimentScatter, + seedSyntheticLeads, + type LeadDemographics, + type MarketingCampaignSummary, + type ScatterDataPoint, +} from '@/lib/api'; + +function formatMoney(value: number) { + return new Intl.NumberFormat('en-AE', { + style: 'currency', + currency: 'AED', + maximumFractionDigits: 0, + }).format(value); +} + +function SectionCard({ + title, + icon: Icon, + children, + subtitle, +}: { + title: string; + icon: typeof Activity; + subtitle?: string; + children: React.ReactNode; +}) { + return ( +
+
+
+ +
+
+

{title}

+ {subtitle &&

{subtitle}

} +
+
+ {children} +
+ ); +} + +function SummaryMetric({ label, value }: { label: string; value: string | number }) { + return ( +
+
{label}
+
{value}
+
+ ); +} + +export function CatalystMarketingTab() { + const [campaigns, setCampaigns] = useState([]); + const [scatter, setScatter] = useState([]); + const [demographics, setDemographics] = useState(null); + const [error, setError] = useState(null); + const [loading, setLoading] = useState(true); + const [seeding, setSeeding] = useState(false); + + useEffect(() => { + let active = true; + const load = async () => { + try { + const [campaignRows, scatterRows, demographicRows] = await Promise.all([ + getCatalystCampaigns(), + getSentimentScatter(), + getLeadDemographics(), + ]); + if (!active) return; + setCampaigns(campaignRows); + setScatter(scatterRows); + setDemographics(demographicRows); + setError(null); + } catch (err) { + if (!active) return; + setError(err instanceof Error ? err.message : 'Failed to load marketing intelligence'); + } finally { + if (active) setLoading(false); + } + }; + void load(); + return () => { + active = false; + }; + }, []); + + const totals = useMemo(() => { + const totalBudget = campaigns.reduce((sum, campaign) => sum + campaign.budget, 0); + const totalSpent = campaigns.reduce((sum, campaign) => sum + campaign.spent, 0); + const totalLeads = scatter.length; + const whales = scatter.filter((item) => item.qualification === 'WHALE').length; + const avgSentiment = totalLeads + ? Math.round(scatter.reduce((sum, item) => sum + item.sentiment_score, 0) / totalLeads) + : 0; + return { totalBudget, totalSpent, totalLeads, whales, avgSentiment }; + }, [campaigns, scatter]); + + const handleSeed = async () => { + setSeeding(true); + try { + await seedSyntheticLeads(100); + const [scatterRows, demographicRows] = await Promise.all([ + getSentimentScatter(), + getLeadDemographics(), + ]); + setScatter(scatterRows); + setDemographics(demographicRows); + setError(null); + } catch (err) { + setError(err instanceof Error ? err.message : 'Synthetic seed failed'); + } finally { + setSeeding(false); + } + }; + + return ( +
+ +
+ + + + + +
+
+ + + {loading ? ( +

Loading campaign intelligence…

+ ) : ( +
+ {campaigns.map((campaign) => ( +
+
+
+
{campaign.name}
+
+ {campaign.platform} · {campaign.status} +
+
+
+
Budget {formatMoney(campaign.budget)}
+
Spent {formatMoney(campaign.spent)}
+
+
+
+
Impressions {campaign.impressions.toLocaleString()}
+
Clicks {campaign.clicks.toLocaleString()}
+
Conversions {campaign.conversions.toLocaleString()}
+
+ CTR {campaign.impressions ? ((campaign.clicks / campaign.impressions) * 100).toFixed(2) : '0.00'}% +
+
+
+ ))} +
+ )} +
+ + +
+
+ {scatter.slice(0, 14).map((lead) => ( +
+
+
+
{lead.name}
+
+ {lead.qualification} · {lead.kanban_status} +
+
+
+
Score {lead.score}
+
Sentiment {lead.sentiment_score}
+
+
+
+ ))} + {!loading && scatter.length === 0 &&

No lead analytics available yet.

} +
+ +
+
+
Lead Sources
+
+ {(demographics?.by_source ?? []).map((row) => ( +
+ {row.source} + {row.lead_count} +
+ ))} +
+
+
+
Qualification Mix
+
+ {(demographics?.by_qualification ?? []).map((row) => ( +
+ {row.qualification} + {row.lead_count} +
+ ))} +
+
+
+
+
+ + +
+
+
+
CRM Analytics
+
{totals.totalLeads > 0 ? 'Live data available' : 'No seeded verification data yet'}
+
+
+
Catalyst Contracts
+
{campaigns.length > 0 ? 'Marketing tab wired to root endpoints' : 'Campaign summary unavailable'}
+
+
+
Spend Capacity
+
Total budget {formatMoney(totals.totalBudget)}
+
+
+ + +
+ {error &&

{error}

} +
+
+ ); +} diff --git a/app/src/hooks/useCrmBootstrap.ts b/app/src/hooks/useCrmBootstrap.ts new file mode 100644 index 00000000..891388c0 --- /dev/null +++ b/app/src/hooks/useCrmBootstrap.ts @@ -0,0 +1,46 @@ +import { useEffect } from 'react'; + +import { getChatLogs, getLeads } from '@/lib/api'; +import { mapLeadRecordToStoreLead } from '@/lib/crmMappers'; +import { useStore } from '@/store/useStore'; +import type { ChatMessage } from '@/types'; + +export function useCrmBootstrap() { + const { setLeads, replaceMessages } = useStore(); + + useEffect(() => { + let cancelled = false; + const hydrate = async () => { + try { + const leads = await getLeads(); + if (cancelled) return; + setLeads(leads.map(mapLeadRecordToStoreLead)); + + const messageEntries = await Promise.all( + leads.slice(0, 25).map(async (lead) => { + const logs = await getChatLogs(lead.id); + return [ + lead.id, + logs.map((log): ChatMessage => ({ + id: log.id, + sender: log.sender === 'lead' ? 'user' : 'oracle', + content: log.content, + timestamp: new Date(log.created_at ?? Date.now()), + })), + ] as const; + }), + ); + if (!cancelled) { + replaceMessages(Object.fromEntries(messageEntries)); + } + } catch { + // Keep the current in-app fallback state if the CRM backend is unreachable. + } + }; + + void hydrate(); + return () => { + cancelled = true; + }; + }, [replaceMessages, setLeads]); +} diff --git a/app/src/lib/api.ts b/app/src/lib/api.ts index 8ce007bf..724431e5 100644 --- a/app/src/lib/api.ts +++ b/app/src/lib/api.ts @@ -17,3 +17,119 @@ export const API_URL = ( ).replace(/\/$/, ''); export const WS_URL = API_URL.replace(/^http/, 'ws'); + +export interface ScatterDataPoint { + id: string; + name: string; + sentiment_score: number; + response_time_ms: number; + score: number; + qualification: string; + kanban_status: string; +} + +export interface LeadRecord { + id: string; + name: string; + email?: string | null; + phone?: string | null; + source: string; + notes: string; + qualification: string; + score: number; + kanban_status: string; + stage: string; + budget: string; + unit_interest: string; + metadata: Record; + created_at?: string | null; + updated_at?: string | null; +} + +export interface LeadDemographics { + by_source: Array<{ source: string; lead_count: number; avg_score: number }>; + by_qualification: Array<{ qualification: string; lead_count: number }>; +} + +export interface ChatLogRecord { + id: string; + lead_id: string; + sender: string; + channel: string; + content: string; + metadata: Record; + created_at: string | null; +} + +export interface MarketingCampaignSummary { + id: string; + name: string; + platform: 'meta' | 'google'; + status: 'active' | 'paused' | 'completed'; + budget: number; + spent: number; + impressions: number; + clicks: number; + conversions: number; +} + +async function requestJson(path: string): Promise { + const response = await fetch(`${API_URL}${path}`, { + headers: { Accept: 'application/json' }, + }); + if (!response.ok) { + throw new Error(`Request failed: ${response.status}`); + } + return response.json() as Promise; +} + +async function requestWrappedData(path: string): Promise { + const payload = await requestJson<{ data: T }>(path); + return payload.data; +} + +export async function getSentimentScatter(): Promise { + return requestJson('/api/analytics/sentiment-scatter'); +} + +export async function getCatalystCampaigns(): Promise { + return requestWrappedData('/api/catalyst/campaigns'); +} + +export async function getLeads(): Promise { + const payload = await requestJson<{ data: LeadRecord[] }>('/api/leads'); + return payload.data; +} + +export async function getLead(leadId: string): Promise { + return requestWrappedData(`/api/leads/${leadId}`); +} + +export async function getKanbanBoard() { + return requestWrappedData>('/api/kanban/board'); +} + +export async function getChatLogs(leadId?: string): Promise { + const suffix = leadId ? `?lead_id=${encodeURIComponent(leadId)}` : ''; + return requestWrappedData(`/api/chat-logs${suffix}`); +} + +export async function getLeadDemographics(): Promise { + return requestWrappedData('/api/leads/demographics'); +} + +export async function seedSyntheticLeads(count = 100): Promise<{ seeded: number; chat_logs_seeded: number; batch: string }> { + const response = await fetch(`${API_URL}/api/leads/seed-synthetic`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json', + }, + body: JSON.stringify({ count }), + }); + if (!response.ok) { + throw new Error(`Seed request failed: ${response.status}`); + } + const payload = await response.json() as { data: { seeded: number; chat_logs_seeded: number; batch: string } }; + return payload.data; +} diff --git a/app/src/lib/crmMappers.ts b/app/src/lib/crmMappers.ts new file mode 100644 index 00000000..ddd17513 --- /dev/null +++ b/app/src/lib/crmMappers.ts @@ -0,0 +1,122 @@ +import type { ChatLogRecord, LeadRecord } from '@/lib/api'; +import type { Lead } from '@/types'; +import type { LeadBadge, LeadTag, LeadSource, Message, MessageSender, PipelineStage, SentimentLog } from '@/types/crm'; + +const TAG_MAP: Record = { + whale: '#CashBuyer', + potential: '#Investor', + hot: '#EndUser', +}; + +export function mapLeadRecordToStoreLead(record: LeadRecord): Lead { + const qualification = record.qualification.toLowerCase() as Lead['qualification']; + const status = record.stage === 'closed' + ? 'closed' + : record.stage === 'qualified' || record.stage === 'negotiation' + ? 'qualified' + : record.score >= 75 + ? 'hot' + : record.stage === 'new' + ? 'new' + : 'engaged'; + const tags = Array.isArray(record.metadata?.tags) ? (record.metadata.tags as string[]) : []; + return { + id: record.id, + name: record.name, + phone: record.phone ?? '', + source: mapSource(record.source), + status, + lastMessage: record.notes || 'No conversation summary yet.', + lastActive: new Date(record.updated_at ?? record.created_at ?? Date.now()), + unreadCount: 0, + qualification: qualification === 'tire_kicker' || qualification === 'potential' || qualification === 'whale' + ? qualification + : 'potential', + budget: record.budget, + interest: record.unit_interest, + quantumDynamicsScore: record.score, + tags: tags.length > 0 ? tags : [record.qualification], + }; +} + +export function mapLeadRecordToOracleLead(record: LeadRecord, chatLogs: ChatLogRecord[]): import('@/types/crm').Lead { + const badge = mapBadge(record.qualification); + const tags = mapOracleTags(record.qualification, record.metadata); + return { + id: record.id, + name: record.name, + phone: record.phone ?? '', + stage: mapPipelineStage(record.stage), + oracleScore: record.score, + badge, + tags, + source: mapSource(record.source), + budget: record.budget, + unitInterest: record.unit_interest, + profileImageUrl: `https://api.dicebear.com/9.x/glass/svg?seed=${encodeURIComponent(record.name)}`, + visitedShowroom: record.stage === 'site_visit' || record.stage === 'negotiation' || record.stage === 'closed', + inShowroomNow: record.stage === 'site_visit', + messages: chatLogs.map(mapChatLogToOracleMessage), + sentimentLog: buildSentimentLog(record.score, record.stage), + }; +} + +function mapSource(source: string): LeadSource { + if (source === 'walkin' || source === 'website' || source === 'whatsapp') return source; + return 'website'; +} + +function mapPipelineStage(stage: string): PipelineStage { + const normalized = stage.toLowerCase(); + if (normalized === 'new' || normalized === 'new_inquiries') return 'new_inquiries'; + if (normalized === 'qualified' || normalized === 'qualifying') return 'qualified'; + if (normalized === 'site_visit') return 'site_visit'; + if (normalized === 'negotiation') return 'negotiation'; + return 'closed'; +} + +function mapBadge(qualification: string): LeadBadge | undefined { + const normalized = qualification.toLowerCase(); + if (normalized === 'whale') return 'whale'; + if (normalized === 'hot' || normalized === 'potential') return 'hot'; + if (normalized === 'tire_kicker') return 'tire_kicker'; + return undefined; +} + +function mapOracleTags(qualification: string, metadata: Record): LeadTag[] { + const mapped = TAG_MAP[qualification.toLowerCase()]; + const rawTags = Array.isArray(metadata?.tags) ? metadata.tags as string[] : []; + const canonical = rawTags.includes('#CashBuyer') || mapped === '#CashBuyer' + ? '#CashBuyer' + : rawTags.includes('#EndUser') || mapped === '#EndUser' + ? '#EndUser' + : '#Investor'; + return [canonical]; +} + +function mapChatLogToOracleMessage(log: ChatLogRecord): Message { + return { + id: log.id, + sender: mapSender(log.sender), + content: log.content, + createdAt: log.created_at ?? new Date().toISOString(), + }; +} + +function mapSender(sender: string): MessageSender { + if (sender === 'lead' || sender === 'oracle' || sender === 'system') return sender; + return 'system'; +} + +function buildSentimentLog(score: number, stage: string): SentimentLog[] { + const base = Math.max(20, score - 18); + const labels = stage === 'site_visit' + ? ['Entry', 'Showroom peak', 'Pricing review'] + : ['Discovery', 'Qualification', 'Follow-up']; + return labels.map((label, index) => ({ + id: `${stage}-${index}`, + at: `${10 + index}:0${index}`, + score: Math.min(100, base + index * 9), + note: label, + })); +} diff --git a/app/src/store/useMarketingStore.ts b/app/src/store/useMarketingStore.ts index fb38fa21..4a0cedd9 100644 --- a/app/src/store/useMarketingStore.ts +++ b/app/src/store/useMarketingStore.ts @@ -146,7 +146,7 @@ interface MarketingState { adInsights: AdInsight[]; liveEvents: LiveOptimizationEvent[]; settings: CatalystSettings; - activeTab: 'studio' | 'command' | 'intelligence' | 'war-room'; + activeTab: 'studio' | 'command' | 'intelligence' | 'war-room' | 'marketing'; // Actions addCampaign: (campaign: Campaign) => void; diff --git a/app/src/store/useStore.ts b/app/src/store/useStore.ts index 74a8e8b1..1a88924a 100644 --- a/app/src/store/useStore.ts +++ b/app/src/store/useStore.ts @@ -37,6 +37,8 @@ interface OracleState { activeLeadId: string | null; messages: Record; isOracleThinking: boolean; + setLeads: (leads: Lead[]) => void; + replaceMessages: (messages: Record) => void; setActiveLead: (leadId: string | null) => void; addMessage: (leadId: string, message: ChatMessage) => void; setOracleThinking: (thinking: boolean) => void; @@ -274,6 +276,8 @@ export const useStore = create()( activeLeadId: null, messages: mockMessages, isOracleThinking: false, + setLeads: (leads) => set({ leads }), + replaceMessages: (messages) => set({ messages }), setActiveLead: (leadId) => set({ activeLeadId: leadId }), addMessage: (leadId, message) => set((state) => ({ messages: { diff --git a/backend/api/routes_catalyst.py b/backend/api/routes_catalyst.py index 5c1445ea..667d2b2a 100644 --- a/backend/api/routes_catalyst.py +++ b/backend/api/routes_catalyst.py @@ -11,14 +11,23 @@ Routes: """ import os +import uuid import hashlib import logging from typing import Any from datetime import datetime -from fastapi import APIRouter, HTTPException, Request, status +from fastapi import APIRouter, HTTPException, Query, Request, status from pydantic import BaseModel, Field +from backend.services.ad_network_service import ( + AdInsight, + BidStrategyUpdate, + BudgetUpdate, + Platform, + ad_network_service, +) + logger = logging.getLogger(__name__) router = APIRouter() @@ -91,6 +100,7 @@ def _sha256_hash(value: str) -> str: class CampaignCreateRequest(BaseModel): name: str = Field(..., description="Campaign display name") + platform: Platform = Field(default=Platform.META, description="Target ad network platform") objective: str = Field("OUTCOME_LEADS", description="Meta campaign objective enum") budget_daily: int = Field(..., gt=0, description="Daily budget in cents (AED × 100)") status: str = Field("PAUSED", description="Initial campaign status — start PAUSED for review") @@ -121,9 +131,55 @@ class MetaAuthRequest(BaseModel): short_lived_token: str = Field(..., description="Short-lived user access token from Meta OAuth") +@router.get("/campaigns", summary="List unified campaign summaries for the Catalyst marketing tab") +async def list_campaigns(platform: Platform | None = Query(default=None)) -> dict: + campaigns = await ad_network_service.list_campaigns(platform=platform) + insights = await ad_network_service.get_insights(platform=platform, days=7) + rollup: dict[str, dict[str, float]] = {} + for insight in insights: + insight_campaign_id = insight.campaign_id if isinstance(insight, AdInsight) else insight.get("campaign_id") + if not insight_campaign_id: + continue + spent = insight.spend if isinstance(insight, AdInsight) else float(insight.get("spend", 0)) + impressions = insight.impressions if isinstance(insight, AdInsight) else int(insight.get("impressions", 0)) + clicks = insight.clicks if isinstance(insight, AdInsight) else int(insight.get("clicks", 0)) + conversions = insight.conversions if isinstance(insight, AdInsight) else int(insight.get("conversions", 0)) + slot = rollup.setdefault( + insight_campaign_id, + { + "spent": 0.0, + "impressions": 0.0, + "clicks": 0.0, + "conversions": 0.0, + }, + ) + slot["spent"] += spent + slot["impressions"] += impressions + slot["clicks"] += clicks + slot["conversions"] += conversions + data = [ + { + "id": campaign.id, + "name": campaign.name, + "platform": campaign.platform.value, + "status": campaign.status.value, + "budget": campaign.daily_budget, + "spent": round(rollup.get(campaign.id, {}).get("spent", campaign.spent), 2), + "impressions": int(rollup.get(campaign.id, {}).get("impressions", 0)), + "clicks": int(rollup.get(campaign.id, {}).get("clicks", 0)), + "conversions": int(rollup.get(campaign.id, {}).get("conversions", 0)), + "objective": campaign.objective, + "bid_strategy": campaign.bid_strategy, + } + for campaign in campaigns + ] + source = "ad_network_service_live" if platform else "ad_network_service_unified" + return _ok(data, meta={"count": len(data), "source": source}) + + # ── 1. POST /campaigns/create ───────────────────────────────────────────────── -@router.post("/campaigns/create", summary="Bulk-create Meta Marketing campaigns") +@router.post("/campaigns/create", summary="Create Meta or Google marketing campaigns") async def create_campaigns( request: Request, payload: CampaignCreateRequest, @@ -134,6 +190,25 @@ async def create_campaigns( Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID """ + if payload.platform == Platform.GOOGLE: + campaign_id = f"google-camp-{uuid.uuid4().hex[:8]}" + if hasattr(request.app.state, "broadcast_live_event"): + await request.app.state.broadcast_live_event( + "create", + f"Created Google Ads campaign '{payload.name}'.", + payload.name, + f"Budget: AED {payload.budget_daily / 100:.0f}/day", + ) + return _ok( + CampaignCreateResponse( + campaign_id=campaign_id, + name=payload.name, + status=payload.status, + created_at=datetime.utcnow().isoformat(), + ).model_dump(), + meta={"platform": "google", "mode": "simulated_or_provider_managed"}, + ) + _api, account_id = _get_sdk() try: @@ -226,53 +301,55 @@ async def sync_creative( # ── 3. GET /insights/realtime ───────────────────────────────────────────────── -@router.get("/insights/realtime", summary="Poll Meta Ads Insights API") +@router.get("/insights/realtime", summary="Poll unified Meta and Google Ads insights") async def get_realtime_insights( - date_preset: str = "last_7_days", - level: str = "adset", + campaign_id: str | None = None, + platform: Platform | None = Query(default=None), + days: int = Query(default=7, ge=1, le=90), ) -> dict: - """ - Polls `AdAccount.get_insights()` for CTR, CPA, spend, impressions across Ad Sets. - Supports `date_preset` (e.g. 'today', 'last_7_days', 'last_30_days') and - `level` ('campaign', 'adset', 'ad'). - - Requires: META_ACCESS_TOKEN, META_AD_ACCOUNT_ID - """ - _api, account_id = _get_sdk() - try: - from facebook_business.adobjects.adaccount import AdAccount # type: ignore - from facebook_business.adobjects.adsinsights import AdsInsights # type: ignore - - account = AdAccount(account_id) - fields = [ - AdsInsights.Field.campaign_name, - AdsInsights.Field.adset_name, - AdsInsights.Field.spend, - AdsInsights.Field.impressions, - AdsInsights.Field.clicks, - AdsInsights.Field.ctr, - AdsInsights.Field.cpp, # cost per purchase (proxy for CPA) - AdsInsights.Field.date_start, - AdsInsights.Field.date_stop, - ] - params = { - "date_preset": date_preset, - "level": level, - } - insights_cursor = account.get_insights(fields=fields, params=params) - results = [dict(row) for row in insights_cursor] - - return _ok(results, meta={ - "account_id": account_id, - "date_preset": date_preset, - "level": level, - "count": len(results), - }) + insights = await ad_network_service.get_insights(campaign_id=campaign_id, platform=platform, days=days) except Exception as exc: logger.error("Insights fetch failed: %s", exc) raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) + data = [item.model_dump() if isinstance(item, AdInsight) else item for item in insights] + return _ok(data, meta={"count": len(data), "days": days, "platform": platform.value if platform else "all"}) + + +@router.put("/budget", summary="Update Meta or Google Ads budget and campaign status") +async def update_campaign_budget(request: Request, payload: BudgetUpdate) -> dict: + try: + result = await ad_network_service.update_budget(payload) + if hasattr(request.app.state, "broadcast_live_event"): + await request.app.state.broadcast_live_event( + "budget_update", + f"Updated {payload.platform.value} budget for {payload.campaign_id}.", + payload.campaign_id, + f"daily={payload.daily_budget} lifetime={payload.lifetime_budget}", + ) + return _ok(result) + except Exception as exc: + logger.error("Budget update failed: %s", exc) + raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) + + +@router.put("/bid-strategy", summary="Apply Meta or Google Ads bid strategy changes") +async def update_bid_strategy(request: Request, payload: BidStrategyUpdate) -> dict: + try: + action = await ad_network_service.update_bid_strategy(payload) + if hasattr(request.app.state, "broadcast_live_event"): + await request.app.state.broadcast_live_event( + "bid_strategy_update", + f"Updated {payload.platform.value} bid strategy for {payload.campaign_id}.", + payload.campaign_id, + payload.strategy, + ) + return _ok(action.model_dump()) + except Exception as exc: + logger.error("Bid strategy update failed: %s", exc) + raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) + # ── 4. POST /audiences/lookalike ────────────────────────────────────────────── diff --git a/backend/api/routes_crm.py b/backend/api/routes_crm.py index e69de29b..e2b3a4cc 100644 --- a/backend/api/routes_crm.py +++ b/backend/api/routes_crm.py @@ -0,0 +1,630 @@ +from __future__ import annotations + +import json +import logging +import uuid +from datetime import datetime, timezone +from typing import Any, Literal + +from fastapi import APIRouter, HTTPException, Query, Request, status +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + +crm_router = APIRouter() +analytics_router = APIRouter() + +_CRM_SCHEMA_CACHE_KEY = "_crm_schema_ready" +_KANBAN_STAGE_MAP = { + "new": "New", + "new_inquiries": "New", + "qualifying": "Qualifying", + "qualified": "Qualifying", + "site_visit": "Site Visit", + "site visit": "Site Visit", + "negotiation": "Negotiation", + "closed": "Closed", + "closed_won": "Closed", + "closed/won": "Closed", +} + + +def _now() -> datetime: + return datetime.now(timezone.utc) + + +def _normalize_stage(value: str | None) -> str: + if not value: + return "New" + return _KANBAN_STAGE_MAP.get(value.strip().lower(), value.strip()) + + +def _stage_key(value: str) -> str: + stage = _normalize_stage(value) + return stage.lower().replace(" ", "_") + + +def _infer_qualification(score: int | None, source: str | None, notes: str | None) -> str: + joined = f"{source or ''} {notes or ''}".lower() + if score is None: + return "UNKNOWN" + if score >= 90 or "cash" in joined or "hnw" in joined or "family office" in joined: + return "WHALE" + if score >= 70: + return "POTENTIAL" + if score >= 45: + return "HOT" + return "TIRE_KICKER" + + +async def _broadcast_crm_event(request: Request, payload: dict[str, Any]) -> None: + broadcaster = getattr(request.app.state, "broadcast_crm_event", None) + if broadcaster is not None: + await broadcaster(payload) + + +async def _get_pool(request: Request): + pool = getattr(request.app.state, "db_pool", None) + if pool is None: + raise HTTPException(status_code=503, detail="Database unavailable.") + return pool + + +async def _ensure_schema(request: Request) -> None: + if getattr(request.app.state, _CRM_SCHEMA_CACHE_KEY, False): + return + + pool = await _get_pool(request) + async with pool.acquire() as conn: + await conn.execute( + """ + CREATE TABLE IF NOT EXISTS leads ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + email TEXT, + phone TEXT, + source TEXT NOT NULL DEFAULT 'website', + notes TEXT NOT NULL DEFAULT '', + qualification TEXT NOT NULL DEFAULT 'UNKNOWN', + score INTEGER NOT NULL DEFAULT 0, + kanban_status TEXT NOT NULL DEFAULT 'New', + budget TEXT NOT NULL DEFAULT '', + unit_interest TEXT NOT NULL DEFAULT '', + metadata JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + """ + ) + await conn.execute( + """ + CREATE TABLE IF NOT EXISTS chat_logs ( + id TEXT PRIMARY KEY, + lead_id TEXT NOT NULL REFERENCES leads(id) ON DELETE CASCADE, + sender TEXT NOT NULL, + channel TEXT NOT NULL DEFAULT 'oracle', + content TEXT NOT NULL, + metadata JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + """ + ) + await conn.execute("CREATE INDEX IF NOT EXISTS idx_leads_stage ON leads (kanban_status)") + await conn.execute("CREATE INDEX IF NOT EXISTS idx_leads_score ON leads (score DESC)") + await conn.execute("CREATE INDEX IF NOT EXISTS idx_chat_logs_lead_id ON chat_logs (lead_id, created_at DESC)") + + setattr(request.app.state, _CRM_SCHEMA_CACHE_KEY, True) + + +class LeadUpsertRequest(BaseModel): + name: str = Field(..., min_length=1, max_length=200) + email: str | None = Field(default=None, max_length=255) + phone: str | None = Field(default=None, max_length=64) + source: str = Field(default="website", max_length=64) + notes: str = Field(default="", max_length=5000) + qualification: str | None = Field(default=None, max_length=64) + score: int = Field(default=0, ge=0, le=100) + kanban_status: str = Field(default="New", max_length=64) + budget: str = Field(default="", max_length=255) + unit_interest: str = Field(default="", max_length=255) + metadata: dict[str, Any] = Field(default_factory=dict) + + +class KanbanMoveRequest(BaseModel): + lead_id: str + target_status: str + + +class ChatLogCreateRequest(BaseModel): + lead_id: str + sender: Literal["lead", "oracle", "system", "broker"] = "oracle" + channel: str = Field(default="oracle", max_length=64) + content: str = Field(..., min_length=1, max_length=8000) + metadata: dict[str, Any] = Field(default_factory=dict) + + +class SyntheticSeedRequest(BaseModel): + count: int = Field(default=100, ge=1, le=500) + + +def _serialize_lead(row: Any) -> dict[str, Any]: + score = int(row["score"] or 0) + status_label = _normalize_stage(row["kanban_status"]) + qualification = row["qualification"] or _infer_qualification(score, row.get("source"), row.get("notes")) + return { + "id": row["id"], + "name": row["name"], + "email": row["email"], + "phone": row["phone"], + "source": row["source"], + "notes": row["notes"], + "qualification": qualification, + "score": score, + "kanban_status": status_label, + "stage": _stage_key(status_label), + "budget": row["budget"], + "unit_interest": row["unit_interest"], + "metadata": row["metadata"] or {}, + "created_at": row["created_at"].isoformat() if row["created_at"] else None, + "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None, + } + + +def _serialize_chat_log(row: Any) -> dict[str, Any]: + return { + "id": row["id"], + "lead_id": row["lead_id"], + "sender": row["sender"], + "channel": row["channel"], + "content": row["content"], + "metadata": row["metadata"] or {}, + "created_at": row["created_at"].isoformat() if row["created_at"] else None, + } + + +def _build_synthetic_leads(count: int) -> list[dict[str, Any]]: + first_names = ["Amina", "Omar", "Farah", "Rayan", "Maya", "Khalid", "Noor", "Zara", "Ibrahim", "Layla"] + last_names = ["Rahman", "Al-Farsi", "Kapoor", "Haddad", "Mehta", "Nadeem", "Shaikh", "Rao", "Wilson", "Chen"] + sources = ["website", "walkin", "whatsapp"] + stages = ["New", "Qualifying", "Site Visit", "Negotiation", "Closed"] + interests = ["2BHK Marina View", "3BHK Corner Unit", "Penthouse Sky Deck", "Investment Studio", "4BHK Sea View"] + budgets = ["AED 2.4M", "AED 4.8M", "AED 7.2M", "AED 12M", "AED 18M"] + rows: list[dict[str, Any]] = [] + for idx in range(count): + score = 35 + ((idx * 7) % 61) + if idx % 12 == 0: + score = 94 + name = f"{first_names[idx % len(first_names)]} {last_names[(idx * 3) % len(last_names)]}" + source = sources[idx % len(sources)] + notes = ( + "Cash-ready HNI buyer focusing on waterfront premium inventory." + if score >= 90 + else "Follow-up required on payment plan and amenity preferences." + ) + rows.append( + { + "id": str(uuid.uuid4()), + "name": name, + "email": f"{name.lower().replace(' ', '.')}@synthetic.velocity.local", + "phone": f"+9715000{idx:05d}", + "source": source, + "notes": notes, + "qualification": _infer_qualification(score, source, notes).upper(), + "score": score, + "kanban_status": stages[idx % len(stages)], + "budget": budgets[idx % len(budgets)], + "unit_interest": interests[idx % len(interests)], + "metadata": { + "synthetic": True, + "campaign": "verification-seed", + "batch": "sprint1-root-integration", + }, + } + ) + return rows + + +@crm_router.get("/leads") +async def list_leads( + request: Request, + kanban_status: str | None = None, + qualification: str | None = None, + search: str | None = Query(default=None, min_length=1), +) -> dict[str, Any]: + await _ensure_schema(request) + pool = await _get_pool(request) + clauses: list[str] = [] + params: list[Any] = [] + if kanban_status: + params.append(_normalize_stage(kanban_status)) + clauses.append(f"kanban_status = ${len(params)}") + if qualification: + params.append(qualification.upper()) + clauses.append(f"qualification = ${len(params)}") + if search: + params.append(f"%{search.lower()}%") + clauses.append(f"(LOWER(name) LIKE ${len(params)} OR LOWER(COALESCE(email, '')) LIKE ${len(params)} OR LOWER(COALESCE(phone, '')) LIKE ${len(params)})") + + where = f"WHERE {' AND '.join(clauses)}" if clauses else "" + query = f""" + SELECT id, name, email, phone, source, notes, qualification, score, kanban_status, + budget, unit_interest, metadata, created_at, updated_at + FROM leads + {where} + ORDER BY score DESC, updated_at DESC, created_at DESC + """ + async with pool.acquire() as conn: + rows = await conn.fetch(query, *params) + leads = [_serialize_lead(row) for row in rows] + return {"status": "ok", "data": leads, "meta": {"count": len(leads)}} + + +@crm_router.post("/leads", status_code=status.HTTP_201_CREATED) +async def create_lead(request: Request, payload: LeadUpsertRequest) -> dict[str, Any]: + await _ensure_schema(request) + pool = await _get_pool(request) + lead_id = str(uuid.uuid4()) + qualification = (payload.qualification or _infer_qualification(payload.score, payload.source, payload.notes)).upper() + stage = _normalize_stage(payload.kanban_status) + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + INSERT INTO leads ( + id, name, email, phone, source, notes, qualification, score, kanban_status, + budget, unit_interest, metadata, created_at, updated_at + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, + $10, $11, $12::jsonb, NOW(), NOW() + ) + RETURNING id, name, email, phone, source, notes, qualification, score, kanban_status, + budget, unit_interest, metadata, created_at, updated_at + """, + lead_id, + payload.name, + payload.email, + payload.phone, + payload.source, + payload.notes, + qualification, + payload.score, + stage, + payload.budget, + payload.unit_interest, + json.dumps(payload.metadata), + ) + data = _serialize_lead(row) + await _broadcast_crm_event(request, {"type": "lead_created", "entity": "lead", "data": data}) + return {"status": "ok", "data": data} + + +@crm_router.put("/leads/{lead_id}") +async def update_lead(lead_id: str, request: Request, payload: LeadUpsertRequest) -> dict[str, Any]: + await _ensure_schema(request) + pool = await _get_pool(request) + qualification = (payload.qualification or _infer_qualification(payload.score, payload.source, payload.notes)).upper() + stage = _normalize_stage(payload.kanban_status) + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + UPDATE leads + SET name = $2, + email = $3, + phone = $4, + source = $5, + notes = $6, + qualification = $7, + score = $8, + kanban_status = $9, + budget = $10, + unit_interest = $11, + metadata = $12::jsonb, + updated_at = NOW() + WHERE id = $1 + RETURNING id, name, email, phone, source, notes, qualification, score, kanban_status, + budget, unit_interest, metadata, created_at, updated_at + """, + lead_id, + payload.name, + payload.email, + payload.phone, + payload.source, + payload.notes, + qualification, + payload.score, + stage, + payload.budget, + payload.unit_interest, + json.dumps(payload.metadata), + ) + if row is None: + raise HTTPException(status_code=404, detail=f"Lead '{lead_id}' not found.") + data = _serialize_lead(row) + await _broadcast_crm_event(request, {"type": "lead_updated", "entity": "lead", "data": data}) + return {"status": "ok", "data": data} + + +@crm_router.delete("/leads/{lead_id}") +async def delete_lead(lead_id: str, request: Request) -> dict[str, Any]: + await _ensure_schema(request) + pool = await _get_pool(request) + async with pool.acquire() as conn: + result = await conn.execute("DELETE FROM leads WHERE id = $1", lead_id) + if result.endswith("0"): + raise HTTPException(status_code=404, detail=f"Lead '{lead_id}' not found.") + await _broadcast_crm_event(request, {"type": "lead_deleted", "entity": "lead", "entity_id": lead_id}) + return {"status": "ok", "data": {"id": lead_id, "deleted": True}} + + +@crm_router.post("/leads/seed-synthetic", status_code=status.HTTP_201_CREATED) +async def seed_synthetic_leads(request: Request, payload: SyntheticSeedRequest) -> dict[str, Any]: + await _ensure_schema(request) + pool = await _get_pool(request) + synthetic_rows = _build_synthetic_leads(payload.count) + inserted = 0 + chat_logs_inserted = 0 + async with pool.acquire() as conn: + for row in synthetic_rows: + await conn.execute( + """ + INSERT INTO leads ( + id, name, email, phone, source, notes, qualification, score, kanban_status, + budget, unit_interest, metadata, created_at, updated_at + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, + $10, $11, $12::jsonb, NOW(), NOW() + ) + ON CONFLICT (id) DO NOTHING + """, + row["id"], + row["name"], + row["email"], + row["phone"], + row["source"], + row["notes"], + row["qualification"], + row["score"], + row["kanban_status"], + row["budget"], + row["unit_interest"], + json.dumps(row["metadata"]), + ) + inserted += 1 + for sender, channel, content in [ + ("lead", "whatsapp", f"{row['name']} asked for availability on {row['unit_interest']}."), + ("oracle", "oracle", "Oracle generated a guided follow-up based on budget, stage, and source quality."), + ]: + await conn.execute( + """ + INSERT INTO chat_logs (id, lead_id, sender, channel, content, metadata, created_at) + VALUES ($1, $2, $3, $4, $5, $6::jsonb, NOW()) + """, + str(uuid.uuid4()), + row["id"], + sender, + channel, + content, + json.dumps({"synthetic": True}), + ) + chat_logs_inserted += 1 + result = { + "status": "ok", + "data": { + "seeded": inserted, + "chat_logs_seeded": chat_logs_inserted, + "batch": "sprint1-root-integration", + }, + } + await _broadcast_crm_event( + request, + { + "type": "crm_seeded", + "entity": "lead_batch", + "data": result["data"], + }, + ) + return result + + +@crm_router.get("/leads/demographics") +async def lead_demographics(request: Request) -> dict[str, Any]: + await _ensure_schema(request) + pool = await _get_pool(request) + async with pool.acquire() as conn: + source_rows = await conn.fetch( + """ + SELECT source, COUNT(*)::int AS lead_count, COALESCE(AVG(score), 0)::float AS avg_score + FROM leads + GROUP BY source + ORDER BY lead_count DESC, source ASC + """ + ) + qualification_rows = await conn.fetch( + """ + SELECT qualification, COUNT(*)::int AS lead_count + FROM leads + GROUP BY qualification + ORDER BY lead_count DESC, qualification ASC + """ + ) + return { + "status": "ok", + "data": { + "by_source": [dict(row) for row in source_rows], + "by_qualification": [dict(row) for row in qualification_rows], + }, + } + + +@crm_router.get("/leads/{lead_id}") +async def get_lead(lead_id: str, request: Request) -> dict[str, Any]: + await _ensure_schema(request) + pool = await _get_pool(request) + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT id, name, email, phone, source, notes, qualification, score, kanban_status, + budget, unit_interest, metadata, created_at, updated_at + FROM leads + WHERE id = $1 + """, + lead_id, + ) + if row is None: + raise HTTPException(status_code=404, detail=f"Lead '{lead_id}' not found.") + return {"status": "ok", "data": _serialize_lead(row)} + + +@crm_router.get("/chat-logs") +async def list_chat_logs( + request: Request, + lead_id: str | None = None, + channel: str | None = None, +) -> dict[str, Any]: + await _ensure_schema(request) + pool = await _get_pool(request) + clauses: list[str] = [] + params: list[Any] = [] + if lead_id: + params.append(lead_id) + clauses.append(f"lead_id = ${len(params)}") + if channel: + params.append(channel) + clauses.append(f"channel = ${len(params)}") + where = f"WHERE {' AND '.join(clauses)}" if clauses else "" + query = f""" + SELECT id, lead_id, sender, channel, content, metadata, created_at + FROM chat_logs + {where} + ORDER BY created_at DESC + """ + async with pool.acquire() as conn: + rows = await conn.fetch(query, *params) + data = [_serialize_chat_log(row) for row in rows] + return {"status": "ok", "data": data, "meta": {"count": len(data)}} + + +@crm_router.post("/chat-logs", status_code=status.HTTP_201_CREATED) +async def create_chat_log(request: Request, payload: ChatLogCreateRequest) -> dict[str, Any]: + await _ensure_schema(request) + pool = await _get_pool(request) + log_id = str(uuid.uuid4()) + async with pool.acquire() as conn: + lead = await conn.fetchrow("SELECT id FROM leads WHERE id = $1", payload.lead_id) + if lead is None: + raise HTTPException(status_code=404, detail=f"Lead '{payload.lead_id}' not found.") + row = await conn.fetchrow( + """ + INSERT INTO chat_logs (id, lead_id, sender, channel, content, metadata, created_at) + VALUES ($1, $2, $3, $4, $5, $6::jsonb, NOW()) + RETURNING id, lead_id, sender, channel, content, metadata, created_at + """, + log_id, + payload.lead_id, + payload.sender, + payload.channel, + payload.content, + json.dumps(payload.metadata), + ) + data = _serialize_chat_log(row) + await _broadcast_crm_event(request, {"type": "chat_log_created", "entity": "chat_log", "data": data}) + return {"status": "ok", "data": data} + + +@crm_router.get("/kanban/board") +async def get_kanban_board(request: Request) -> dict[str, Any]: + await _ensure_schema(request) + pool = await _get_pool(request) + ordered_stages = ["New", "Qualifying", "Site Visit", "Negotiation", "Closed"] + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT id, name, email, phone, source, notes, qualification, score, kanban_status, + budget, unit_interest, metadata, created_at, updated_at + FROM leads + ORDER BY score DESC, updated_at DESC, created_at DESC + """ + ) + leads = [_serialize_lead(row) for row in rows] + grouped = {stage: [] for stage in ordered_stages} + for lead in leads: + grouped.setdefault(lead["kanban_status"], []).append(lead) + board = [ + { + "status": stage, + "stage": _stage_key(stage), + "count": len(grouped.get(stage, [])), + "items": grouped.get(stage, []), + } + for stage in ordered_stages + ] + return {"status": "ok", "data": board} + + +@crm_router.put("/kanban/move") +async def move_kanban_card(request: Request, payload: KanbanMoveRequest) -> dict[str, Any]: + await _ensure_schema(request) + pool = await _get_pool(request) + stage = _normalize_stage(payload.target_status) + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + UPDATE leads + SET kanban_status = $2, + qualification = CASE + WHEN score >= 90 THEN 'WHALE' + WHEN score >= 70 THEN 'POTENTIAL' + WHEN score >= 45 THEN 'HOT' + ELSE qualification + END, + updated_at = NOW() + WHERE id = $1 + RETURNING id, name, email, phone, source, notes, qualification, score, kanban_status, + budget, unit_interest, metadata, created_at, updated_at + """, + payload.lead_id, + stage, + ) + if row is None: + raise HTTPException(status_code=404, detail=f"Lead '{payload.lead_id}' not found.") + data = _serialize_lead(row) + await _broadcast_crm_event( + request, + { + "type": "kanban_moved", + "entity": "lead", + "entity_id": payload.lead_id, + "data": data, + }, + ) + return {"status": "ok", "data": data} + + +@analytics_router.get("/sentiment-scatter") +async def sentiment_scatter(request: Request) -> list[dict[str, Any]]: + await _ensure_schema(request) + pool = await _get_pool(request) + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT id, name, score, qualification, kanban_status, source, notes, updated_at + FROM leads + WHERE score IS NOT NULL + ORDER BY score DESC, updated_at DESC + """ + ) + points: list[dict[str, Any]] = [] + for row in rows: + score = int(row["score"] or 0) + qualification = row["qualification"] or _infer_qualification(score, row["source"], row["notes"]) + points.append( + { + "id": row["id"], + "name": row["name"], + "sentiment_score": max(0, min(100, int(score * 0.82) + 10)), + "response_time_ms": max(120, 10000 - (score * 55)), + "score": score, + "qualification": qualification, + "kanban_status": _normalize_stage(row["kanban_status"]), + } + ) + return points diff --git a/backend/api/routes_oracle.py b/backend/api/routes_oracle.py index e69de29b..045e0bb0 100644 --- a/backend/api/routes_oracle.py +++ b/backend/api/routes_oracle.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +from fastapi import APIRouter, HTTPException, Request +from pydantic import BaseModel, Field + +from backend.oracle.action_service import oracle_action_service +from backend.oracle.persona_service import persona_service +from backend.services.mcp_registry import mcp_registry +from backend.services.nemoclaw_runtime import nemoclaw_runtime + +router = APIRouter() + + +class WorkflowPreviewRequest(BaseModel): + prompt: str = Field(..., min_length=1, max_length=4096) + tenant_id: str = "tenant_velocity" + actor_role: str = "sales_director" + + +class MCPExecuteRequest(BaseModel): + tool_name: str = Field(..., min_length=1, max_length=128) + query: str = Field(..., min_length=1, max_length=1024) + + +class OracleWritebackRequest(BaseModel): + action_id: str + tenant_id: str = "tenant_velocity" + actor_id: str = "oracle_operator" + target_entity_type: str = Field(..., min_length=1, max_length=64) + target_entity_id: str = Field(..., min_length=1, max_length=128) + action_type: str = Field(default="lead_writeback", min_length=1, max_length=128) + writeback_payload: dict = Field(default_factory=dict) + + +@router.get("/health") +async def oracle_health() -> dict: + return { + "status": "ok", + "persona": await persona_service.health(), + "mcp_tools": mcp_registry.list_tools(), + } + + +@router.get("/mcp/tools") +async def oracle_mcp_tools() -> dict: + return {"status": "ok", "data": mcp_registry.list_tools()} + + +@router.post("/mcp/execute") +async def oracle_mcp_execute(request: Request, payload: MCPExecuteRequest) -> dict: + pool = getattr(request.app.state, "db_pool", None) + result = await mcp_registry.execute(payload.tool_name, payload.query, crm_pool=pool) + return {"status": "ok", "data": result} + + +@router.post("/workflow/preview") +async def workflow_preview(payload: WorkflowPreviewRequest) -> dict: + persona_plan = await persona_service.plan_for_prompt( + prompt=payload.prompt, + tenant_id=payload.tenant_id, + actor_role=payload.actor_role, + ) + return { + "status": "ok", + "data": { + "persona_plan": persona_plan, + "workflow": nemoclaw_runtime.build_workflow_dispatch( + prompt=payload.prompt, + tenant_id=payload.tenant_id, + actor_role=payload.actor_role, + component_templates=persona_plan["recommendedTemplates"], + ), + }, + } + + +@router.get("/actions") +async def list_oracle_actions(status: str | None = None, limit: int = 50) -> dict: + actions = await oracle_action_service.list_actions(status=status, limit=limit) + return {"status": "ok", "data": actions, "meta": {"count": len(actions)}} + + +@router.get("/actions/{action_id}") +async def get_oracle_action(action_id: str) -> dict: + action = await oracle_action_service.get_action(action_id) + if not action: + raise HTTPException(status_code=404, detail=f"Oracle action '{action_id}' not found.") + return {"status": "ok", "data": action} + + +@router.post("/actions/writeback") +async def apply_oracle_writeback(request: Request, payload: OracleWritebackRequest) -> dict: + result = await oracle_action_service.apply_writeback(payload.model_dump()) + if hasattr(request.app.state, "broadcast_crm_event"): + await request.app.state.broadcast_crm_event( + { + "type": "oracle_writeback", + "entity": payload.target_entity_type, + "entity_id": payload.target_entity_id, + "action_id": payload.action_id, + "payload": result["resultPayload"], + } + ) + return {"status": "ok", "data": result} diff --git a/backend/main.py b/backend/main.py index 99e93f03..6949667f 100644 --- a/backend/main.py +++ b/backend/main.py @@ -12,7 +12,7 @@ import json import asyncio import logging from contextlib import asynccontextmanager -from datetime import datetime +from datetime import UTC, datetime from typing import Set from fastapi import FastAPI, WebSocket, WebSocketDisconnect @@ -21,10 +21,13 @@ from fastapi.staticfiles import StaticFiles from dotenv import load_dotenv from backend.api.routes_catalyst import router as catalyst_router +from backend.api.routes_crm import crm_router, analytics_router +from backend.api.routes_oracle import router as oracle_helper_router from backend.auth.dependencies import ( create_access_token, verify_password, get_current_user ) from backend.db.pool import create_pool, close_pool +from backend.oracle.router_v1 import router as oracle_v1_router from backend.routers.cctv import router as cctv_router from backend.routers.scenes import router as scenes_router from backend.routers.videos import router as videos_router @@ -86,6 +89,10 @@ if os.path.isdir(ASSET_DIR): # ── Routers ─────────────────────────────────────────────────────────────────── app.include_router(catalyst_router, prefix="/api/catalyst", tags=["Catalyst"]) +app.include_router(crm_router, prefix="/api", tags=["CRM"]) +app.include_router(analytics_router, prefix="/api/analytics", tags=["Analytics"]) +app.include_router(oracle_helper_router, prefix="/api/oracle", tags=["Oracle"]) +app.include_router(oracle_v1_router, prefix="/api/oracle/v1", tags=["Oracle V1"]) app.include_router(sentinel_router, prefix="/api/sentinel", tags=["Sentinel"]) app.include_router(cctv_router, prefix="/api/cctv", tags=["CCTV"]) app.include_router(scenes_router, prefix="/api/scenes", tags=["Scenes"]) @@ -165,6 +172,30 @@ class _CatalystManager: _catalyst_mgr = _CatalystManager() +class _CRMManager: + def __init__(self) -> None: + self.active: Set[WebSocket] = set() + + async def connect(self, ws: WebSocket) -> None: + await ws.accept() + self.active.add(ws) + + def disconnect(self, ws: WebSocket) -> None: + self.active.discard(ws) + + async def broadcast(self, payload: dict) -> None: + dead: Set[WebSocket] = set() + for ws in self.active: + try: + await ws.send_text(json.dumps(payload)) + except Exception: + dead.add(ws) + self.active -= dead + + +_crm_mgr = _CRMManager() + + @app.websocket("/ws/catalyst") async def catalyst_ws(ws: WebSocket) -> None: await _catalyst_mgr.connect(ws) @@ -176,13 +207,31 @@ async def catalyst_ws(ws: WebSocket) -> None: _catalyst_mgr.disconnect(ws) +@app.websocket("/ws/crm") +async def crm_ws(ws: WebSocket) -> None: + await _crm_mgr.connect(ws) + await _crm_mgr.broadcast( + { + "type": "crm_presence", + "connected_clients": len(_crm_mgr.active), + "timestamp": datetime.now(UTC).isoformat(), + } + ) + try: + while True: + message = await ws.receive_text() + await ws.send_text(json.dumps({"type": "crm_ack", "data": message})) + except WebSocketDisconnect: + _crm_mgr.disconnect(ws) + + async def broadcast_live_event(event_type, message, campaign_name=None, value=None): payload = { "type": event_type, "message": message, "campaignName": campaign_name, "value": value, - "timestamp": datetime.utcnow().isoformat(), + "timestamp": datetime.now(UTC).isoformat(), } await _catalyst_mgr.broadcast(payload) @@ -190,6 +239,17 @@ async def broadcast_live_event(event_type, message, campaign_name=None, value=No app.state.broadcast_live_event = broadcast_live_event +async def broadcast_crm_event(payload: dict) -> None: + enriched = { + **payload, + "timestamp": datetime.now(UTC).isoformat(), + } + await _crm_mgr.broadcast(enriched) + + +app.state.broadcast_crm_event = broadcast_crm_event + + # ── Health ───────────────────────────────────────────────────────────────────── @app.get("/health", tags=["Health"]) @@ -201,6 +261,6 @@ async def health() -> dict: "service": "velocity-backend", "version": "2.0.0", "db_pool": "connected" if db_ok else "unavailable", - "timestamp": datetime.utcnow().isoformat(), + "timestamp": datetime.now(UTC).isoformat(), } diff --git a/backend/oracle/action_service.py b/backend/oracle/action_service.py new file mode 100644 index 00000000..70307961 --- /dev/null +++ b/backend/oracle/action_service.py @@ -0,0 +1,346 @@ +from __future__ import annotations + +import json +import os +import uuid +from datetime import datetime, timezone +from typing import Any + +from fastapi import HTTPException + +try: + import asyncpg # type: ignore +except Exception: # pragma: no cover + asyncpg = None # type: ignore + + +_DB_URL = os.getenv("DATABASE_URL", "") + + +def _now() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _db_ready() -> bool: + return bool(_DB_URL and not _DB_URL.startswith("PLACEHOLDER") and asyncpg is not None) + + +class OracleActionService: + async def ensure_schema(self) -> None: + if not _db_ready(): + return + assert asyncpg is not None + conn = await asyncpg.connect(_DB_URL) + try: + await conn.execute( + """ + CREATE TABLE IF NOT EXISTS oracle_actions ( + action_id UUID PRIMARY KEY, + execution_id UUID, + tenant_id TEXT NOT NULL, + page_id UUID, + branch_id TEXT, + actor_id TEXT NOT NULL, + target_entity_type TEXT NOT NULL, + target_entity_id TEXT, + action_type TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'planned', + prompt TEXT, + workflow_dispatch JSONB NOT NULL DEFAULT '{}'::jsonb, + component_ids JSONB NOT NULL DEFAULT '[]'::jsonb, + writeback_payload JSONB NOT NULL DEFAULT '{}'::jsonb, + result_payload JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + """ + ) + await conn.execute( + "CREATE INDEX IF NOT EXISTS idx_oracle_actions_execution ON oracle_actions(execution_id, created_at DESC)" + ) + await conn.execute( + "CREATE INDEX IF NOT EXISTS idx_oracle_actions_target ON oracle_actions(target_entity_type, target_entity_id, created_at DESC)" + ) + finally: + await conn.close() + + async def create_from_execution( + self, + *, + execution: dict[str, Any], + target_entity_type: str = "canvas_page", + target_entity_id: str | None = None, + action_type: str = "oracle_canvas_generation", + writeback_payload: dict[str, Any] | None = None, + ) -> dict[str, Any]: + action = { + "actionId": str(uuid.uuid4()), + "executionId": execution.get("executionId"), + "tenantId": execution.get("tenantId"), + "pageId": execution.get("pageId"), + "branchId": execution.get("branchId"), + "actorId": execution.get("actorId"), + "targetEntityType": target_entity_type, + "targetEntityId": target_entity_id or execution.get("pageId"), + "actionType": action_type, + "status": "planned", + "prompt": execution.get("prompt"), + "workflowDispatch": execution.get("workflowDispatch") or {}, + "componentIds": execution.get("componentsCreated") or [], + "writebackPayload": writeback_payload or {}, + "resultPayload": {}, + "createdAt": _now(), + "updatedAt": _now(), + } + await self._persist_action(action) + return action + + async def get_action(self, action_id: str) -> dict[str, Any] | None: + if not _db_ready(): + return None + assert asyncpg is not None + conn = await asyncpg.connect(_DB_URL) + try: + row = await conn.fetchrow( + """ + SELECT action_id, execution_id, tenant_id, page_id, branch_id, actor_id, + target_entity_type, target_entity_id, action_type, status, prompt, + workflow_dispatch, component_ids, writeback_payload, result_payload, + created_at, updated_at + FROM oracle_actions + WHERE action_id = $1::uuid + """, + action_id, + ) + finally: + await conn.close() + return self._serialize(row) if row else None + + async def list_actions(self, *, status: str | None = None, limit: int = 50) -> list[dict[str, Any]]: + if not _db_ready(): + return [] + assert asyncpg is not None + conn = await asyncpg.connect(_DB_URL) + try: + if status: + rows = await conn.fetch( + """ + SELECT action_id, execution_id, tenant_id, page_id, branch_id, actor_id, + target_entity_type, target_entity_id, action_type, status, prompt, + workflow_dispatch, component_ids, writeback_payload, result_payload, + created_at, updated_at + FROM oracle_actions + WHERE status = $1 + ORDER BY created_at DESC + LIMIT $2 + """, + status, + limit, + ) + else: + rows = await conn.fetch( + """ + SELECT action_id, execution_id, tenant_id, page_id, branch_id, actor_id, + target_entity_type, target_entity_id, action_type, status, prompt, + workflow_dispatch, component_ids, writeback_payload, result_payload, + created_at, updated_at + FROM oracle_actions + ORDER BY created_at DESC + LIMIT $1 + """, + limit, + ) + finally: + await conn.close() + return [self._serialize(row) for row in rows] + + async def apply_writeback(self, payload: dict[str, Any]) -> dict[str, Any]: + if not _db_ready(): + raise HTTPException(status_code=503, detail="Oracle writeback store unavailable.") + if payload["target_entity_type"] != "lead": + raise HTTPException(status_code=422, detail="Only lead writebacks are supported in this pass.") + + assert asyncpg is not None + await self.ensure_schema() + conn = await asyncpg.connect(_DB_URL) + try: + target_lead_id = payload["target_entity_id"] + action_id = payload["action_id"] + writeback = payload["writeback_payload"] + + existing = await conn.fetchrow( + "SELECT id, notes, metadata, kanban_status, qualification, score FROM leads WHERE id = $1", + target_lead_id, + ) + if existing is None: + raise HTTPException(status_code=404, detail=f"Lead '{target_lead_id}' not found for Oracle writeback.") + + metadata = dict(existing["metadata"] or {}) + metadata_patch = writeback.get("metadata_patch") or {} + if isinstance(metadata_patch, dict): + metadata.update(metadata_patch) + + score = int(existing["score"] or 0) + int(writeback.get("score_delta") or 0) + updated_notes = (existing["notes"] or "").strip() + notes_append = writeback.get("notes_append") + if notes_append: + separator = "\n\n" if updated_notes else "" + updated_notes = f"{updated_notes}{separator}{notes_append}" + + updated = await conn.fetchrow( + """ + UPDATE leads + SET notes = $2, + metadata = $3::jsonb, + kanban_status = COALESCE($4, kanban_status), + qualification = COALESCE($5, qualification), + score = $6, + updated_at = NOW() + WHERE id = $1 + RETURNING id, notes, metadata, kanban_status, qualification, score, updated_at + """, + target_lead_id, + updated_notes, + json.dumps(metadata), + writeback.get("kanban_status"), + writeback.get("qualification"), + max(score, 0), + ) + + oracle_message = writeback.get("oracle_message") + if oracle_message: + await conn.execute( + """ + INSERT INTO chat_logs (id, lead_id, sender, channel, content, metadata, created_at) + VALUES ($1, $2, 'oracle', 'oracle', $3, $4::jsonb, NOW()) + """, + str(uuid.uuid4()), + target_lead_id, + oracle_message, + json.dumps({"oracle_action_id": action_id, "writeback": True}), + ) + + result_payload = { + "lead_id": updated["id"], + "kanban_status": updated["kanban_status"], + "qualification": updated["qualification"], + "score": updated["score"], + "updated_at": updated["updated_at"].isoformat() if updated["updated_at"] else None, + } + + await conn.execute( + """ + INSERT INTO oracle_actions ( + action_id, execution_id, tenant_id, page_id, branch_id, actor_id, + target_entity_type, target_entity_id, action_type, status, prompt, + workflow_dispatch, component_ids, writeback_payload, result_payload, + created_at, updated_at + ) + VALUES ( + $1::uuid, NULL, $2, NULL, NULL, $3, + $4, $5, $6, 'applied', NULL, + '{}'::jsonb, '[]'::jsonb, $7::jsonb, $8::jsonb, + NOW(), NOW() + ) + ON CONFLICT (action_id) + DO UPDATE SET + status = 'applied', + writeback_payload = EXCLUDED.writeback_payload, + result_payload = EXCLUDED.result_payload, + updated_at = NOW() + """, + action_id, + payload.get("tenant_id", "tenant_velocity"), + payload.get("actor_id", "oracle_operator"), + payload["target_entity_type"], + target_lead_id, + payload.get("action_type", "lead_writeback"), + json.dumps(writeback), + json.dumps(result_payload), + ) + finally: + await conn.close() + + return { + "actionId": action_id, + "status": "applied", + "targetEntityType": payload["target_entity_type"], + "targetEntityId": payload["target_entity_id"], + "resultPayload": result_payload, + } + + async def _persist_action(self, action: dict[str, Any]) -> None: + if not _db_ready(): + return + await self.ensure_schema() + assert asyncpg is not None + conn = await asyncpg.connect(_DB_URL) + try: + await conn.execute( + """ + INSERT INTO oracle_actions ( + action_id, execution_id, tenant_id, page_id, branch_id, actor_id, + target_entity_type, target_entity_id, action_type, status, prompt, + workflow_dispatch, component_ids, writeback_payload, result_payload, + created_at, updated_at + ) + VALUES ( + $1::uuid, $2::uuid, $3, $4::uuid, $5, $6, + $7, $8, $9, $10, $11, + $12::jsonb, $13::jsonb, $14::jsonb, $15::jsonb, + $16::timestamptz, $17::timestamptz + ) + ON CONFLICT (action_id) + DO UPDATE SET + status = EXCLUDED.status, + workflow_dispatch = EXCLUDED.workflow_dispatch, + component_ids = EXCLUDED.component_ids, + writeback_payload = EXCLUDED.writeback_payload, + result_payload = EXCLUDED.result_payload, + updated_at = EXCLUDED.updated_at + """, + action["actionId"], + action.get("executionId"), + action["tenantId"], + action.get("pageId"), + action.get("branchId"), + action["actorId"], + action["targetEntityType"], + action.get("targetEntityId"), + action["actionType"], + action["status"], + action.get("prompt"), + json.dumps(action.get("workflowDispatch") or {}), + json.dumps(action.get("componentIds") or []), + json.dumps(action.get("writebackPayload") or {}), + json.dumps(action.get("resultPayload") or {}), + action["createdAt"], + action["updatedAt"], + ) + finally: + await conn.close() + + @staticmethod + def _serialize(row: Any) -> dict[str, Any]: + return { + "actionId": str(row["action_id"]), + "executionId": str(row["execution_id"]) if row["execution_id"] else None, + "tenantId": row["tenant_id"], + "pageId": str(row["page_id"]) if row["page_id"] else None, + "branchId": row["branch_id"], + "actorId": row["actor_id"], + "targetEntityType": row["target_entity_type"], + "targetEntityId": row["target_entity_id"], + "actionType": row["action_type"], + "status": row["status"], + "prompt": row["prompt"], + "workflowDispatch": row["workflow_dispatch"] or {}, + "componentIds": row["component_ids"] or [], + "writebackPayload": row["writeback_payload"] or {}, + "resultPayload": row["result_payload"] or {}, + "createdAt": row["created_at"].isoformat() if row["created_at"] else None, + "updatedAt": row["updated_at"].isoformat() if row["updated_at"] else None, + } + + +oracle_action_service = OracleActionService() diff --git a/backend/oracle/persona_service.py b/backend/oracle/persona_service.py new file mode 100644 index 00000000..f218a15f --- /dev/null +++ b/backend/oracle/persona_service.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +import json +import re +from pathlib import Path +from typing import Any + + +_PROMPT_DIR = Path(__file__).resolve().parent.parent / "nemoclaw_prompts" +_PLACEHOLDER_PATTERN = re.compile(r"\{(\w+)\}") +_TEMPLATE_HINTS = { + "pipeline": ["tpl_pipeline_board_v2", "tpl_followup_queue_v1"], + "kanban": ["tpl_pipeline_board_v2"], + "map": ["tpl_geo_investor_heat_v2"], + "geo": ["tpl_geo_investor_heat_v2"], + "trend": ["tpl_absorption_trend_v1", "tpl_campaign_lead_line_v1"], + "quota": ["tpl_quota_gauge_v1", "tpl_kpi_pipeline_health_v1"], + "broker": ["tpl_broker_performance_v1"], + "source": ["tpl_qd_source_compare_v1", "tpl_bar_source_quality_v3"], + "follow": ["tpl_followup_queue_v1", "tpl_followup_gap_v1"], + "campaign": ["tpl_campaign_lead_line_v1"], +} + + +class PersonaService: + def __init__(self) -> None: + self.prompt_files = { + "qd_calculator": _PROMPT_DIR / "qd_calculator.md", + "lead_tagger": _PROMPT_DIR / "lead_tagger.md", + "cctv_profiler": _PROMPT_DIR / "cctv_profiler.md", + } + + async def health(self) -> dict[str, Any]: + loaded = {} + for key, path in self.prompt_files.items(): + loaded[key] = path.exists() and path.read_text(encoding="utf-8").strip() != "" + return { + "status": "healthy" if all(loaded.values()) else "degraded", + "prompts": loaded, + } + + async def render_prompt( + self, + *, + prompt_name: str, + variables: dict[str, Any], + ) -> dict[str, Any]: + path = self.prompt_files.get(prompt_name) + if path is None or not path.exists(): + raise FileNotFoundError(f"Unknown prompt '{prompt_name}'.") + template = path.read_text(encoding="utf-8") + rendered = template + for key, value in variables.items(): + rendered = rendered.replace(f"{{{key}}}", json.dumps(value) if isinstance(value, (dict, list)) else str(value)) + unresolved = sorted(set(_PLACEHOLDER_PATTERN.findall(rendered))) + return { + "promptName": prompt_name, + "templatePath": str(path), + "renderedPrompt": rendered, + "unresolvedVariables": unresolved, + } + + async def plan_for_prompt( + self, + *, + prompt: str, + tenant_id: str, + actor_role: str, + ) -> dict[str, Any]: + lower_prompt = prompt.lower() + recommended: list[str] = [] + for token, template_ids in _TEMPLATE_HINTS.items(): + if token in lower_prompt: + recommended.extend(template_ids) + if not recommended: + recommended = ["tpl_kpi_pipeline_health_v1", "tpl_qd_source_compare_v1"] + recommended = list(dict.fromkeys(recommended)) + return { + "tenantId": tenant_id, + "actorRole": actor_role, + "recommendedTemplates": recommended, + "canvasBlocks": [ + { + "type": "textCanvas", + "widthMode": "full", + "minHeightPx": 180, + "content": ( + "Oracle planned a mixed response: query the CRM, reuse matching component templates, " + "and synthesize missing visualization blocks if a direct template is unavailable." + ), + } + ], + "workflowIntent": "comfy_oracle_canvas", + } + + +persona_service = PersonaService() diff --git a/backend/oracle/prompt_orchestrator.py b/backend/oracle/prompt_orchestrator.py index 82c3ca77..4afeb504 100644 --- a/backend/oracle/prompt_orchestrator.py +++ b/backend/oracle/prompt_orchestrator.py @@ -16,6 +16,8 @@ from typing import Any from .policy_service import PolicyContext, PolicyService from .canvas_service import canvas_service from .data_access_gateway import data_access_gateway +from .persona_service import persona_service +from backend.services.nemoclaw_runtime import nemoclaw_runtime try: import asyncpg # type: ignore @@ -177,6 +179,19 @@ class PromptOrchestrator: execution["retrievalPlan"] = retrieval_plan + persona_plan = await persona_service.plan_for_prompt( + prompt=prompt, + tenant_id=tenant_id, + actor_role=actor_role, + ) + execution["personaPlan"] = persona_plan + execution["workflowDispatch"] = nemoclaw_runtime.build_workflow_dispatch( + prompt=prompt, + tenant_id=tenant_id, + actor_role=actor_role, + component_templates=persona_plan["recommendedTemplates"], + ) + # ── Step 2: Policy validation ───────────────────────────────────────── policy_errors = [] for component_plan in retrieval_plan.get("components", []): @@ -209,6 +224,7 @@ class PromptOrchestrator: branch_id=branch_id, placement_mode=placement_mode, ctx=ctx, + persona_plan=persona_plan, ) execution["visualizationPlan"] = viz_plan @@ -255,9 +271,18 @@ class PromptOrchestrator: branch_id: str, placement_mode: str, ctx: PolicyContext, + persona_plan: dict[str, Any], ) -> dict[str, Any]: """Converts a retrieval plan into a list of CanvasComponent descriptors.""" - components = [] + components = [ + self._persona_text_canvas( + execution_id=execution_id, + actor_id=actor_id, + branch_id=branch_id, + prompt=prompt, + persona_plan=persona_plan, + ) + ] base_order = 900 # Append after existing components component_plans = retrieval_plan.get("components", []) @@ -343,6 +368,85 @@ class PromptOrchestrator: return {"components": components} + @staticmethod + def _persona_text_canvas( + *, + execution_id: str, + actor_id: str, + branch_id: str, + prompt: str, + persona_plan: dict[str, Any], + ) -> dict[str, Any]: + recommended = ", ".join(persona_plan.get("recommendedTemplates", [])) or "no direct template matches" + content = ( + f"Oracle received: {prompt}\n\n" + f"Reusable templates: {recommended}\n\n" + "Execution policy: query live CRM data first, reuse matching templates, " + "synthesize missing UI blocks, then dispatch the required ComfyUI-backed workflow." + ) + return { + "componentId": str(uuid.uuid4()), + "type": "textCanvas", + "title": "Oracle Planning Notes", + "description": "Persona-driven guidance generated before data-bound components.", + "dataSourceDescriptor": { + "descriptorId": str(uuid.uuid4()), + "sourceType": "inline", + "connectorId": "oracle-persona", + "dataset": "oracle_persona_plan", + "authContextRef": f"authctx_{actor_id}_scope", + "queryTemplate": "", + "queryParameters": {}, + "rowLimit": 1, + "privacyTier": "standard", + }, + "visualizationParameters": { + "content": content, + "widthMode": "full", + "adjustableHeight": True, + }, + "dataBindings": {"dimensions": [], "measures": [], "series": [], "filters": []}, + "version": 1, + "lifecycleState": "active", + "provenance": { + "originType": "prompt_generated", + "promptExecutionId": execution_id, + "sourceBranchId": branch_id, + "createdBy": actor_id, + "createdAt": _now(), + }, + "renderingHints": {"estimatedHeightPx": 180, "skeletonVariant": "text", "virtualizationPriority": 4}, + "layout": { + "orderIndex": 910, + "sectionId": "sec_prompt_generated", + "widthMode": "full", + "minHeightPx": 180, + "stickyHeader": False, + }, + "accessControls": { + "visibilityScope": "private", + "allowedRoles": ["senior_broker", "sales_director", "marketing_operator", "data_steward", "compliance_reviewer", "platform_admin"], + "redactionPolicy": "none", + }, + "styleSignature": { + "theme": "velocity_glass", + "paletteToken": "ocean_signal", + "motionProfile": "calm_reveal", + "density": "comfortable", + "radiusScale": "lg", + "typographyScale": "balanced", + }, + "validationState": { + "schema": "pass", + "policy": "pass", + "a11y": "pass", + "performance": "pass", + "status": "validated", + }, + "auditLog": [f"aud_{execution_id}_persona"], + "dataRows": [], + } + @staticmethod def _map_type(plan_type: str) -> str: mapping = { diff --git a/backend/oracle/router_v1.py b/backend/oracle/router_v1.py index c27f1376..5ee7ee6e 100644 --- a/backend/oracle/router_v1.py +++ b/backend/oracle/router_v1.py @@ -31,6 +31,8 @@ from pydantic import BaseModel, Field from .canvas_service import canvas_service from .collaboration_service import collaboration_service +from .action_service import oracle_action_service +from .persona_service import persona_service from .prompt_orchestrator import prompt_orchestrator from .policy_service import PolicyService, PolicyContext @@ -96,6 +98,8 @@ class PromptSubmitRequest(BaseModel): prompt: str = Field(..., min_length=1, max_length=4096) conversationContext: list[dict[str, str]] = Field(default_factory=list) placementMode: str = Field("append_after_last_visible_component") + targetLeadId: str | None = None + plannedWriteback: dict[str, Any] = Field(default_factory=dict) class ForkCreateRequest(BaseModel): @@ -131,6 +135,11 @@ class TemplateSynthesizeRequest(BaseModel): styleSignatureRef: str | None = None +class PersonaRenderRequest(BaseModel): + promptName: str = Field(..., pattern="^(qd_calculator|lead_tagger|cctv_profiler)$") + variables: dict[str, Any] = Field(default_factory=dict) + + # ── Endpoints ───────────────────────────────────────────────────────────────── @router.get("/me", summary="Get current user profile") @@ -167,8 +176,16 @@ async def submit_prompt(page_id: str, payload: PromptSubmitRequest) -> dict: detail={"errors": execution.get("warnings", [])}, ) page = await canvas_service.get_page(page_id, ctx.tenant_id) + action = await oracle_action_service.create_from_execution( + execution=execution, + target_entity_type="lead" if payload.targetLeadId else "canvas_page", + target_entity_id=payload.targetLeadId or page_id, + action_type="oracle_prompt_writeback_plan" if payload.targetLeadId else "oracle_canvas_generation", + writeback_payload=payload.plannedWriteback, + ) return _ok({ "executionId": execution["executionId"], + "actionId": action["actionId"], "status": execution["status"], "pageId": page_id, "branchId": payload.branchId, @@ -250,6 +267,23 @@ async def synthesize_template(payload: TemplateSynthesizeRequest) -> dict: return _ok(template) +@router.get("/persona/health", summary="Health check for Oracle persona prompt loading") +async def persona_health() -> dict: + return _ok(await persona_service.health()) + + +@router.post("/persona/render", summary="Render a subordinate Oracle persona prompt") +async def persona_render(payload: PersonaRenderRequest) -> dict: + try: + rendered = await persona_service.render_prompt( + prompt_name=payload.promptName, + variables=payload.variables, + ) + except FileNotFoundError as exc: + raise HTTPException(status_code=404, detail=str(exc)) from exc + return _ok(rendered) + + @router.get("/merge-requests", summary="List merge requests for a target page") async def list_merge_requests(targetPageId: str | None = None, status: str | None = None) -> dict: if not targetPageId: diff --git a/backend/services/ad_network_service.py b/backend/services/ad_network_service.py new file mode 100644 index 00000000..1eb90be3 --- /dev/null +++ b/backend/services/ad_network_service.py @@ -0,0 +1,520 @@ +from __future__ import annotations + +import asyncio +import hashlib +import logging +import os +import uuid +from datetime import datetime, timedelta +from enum import Enum +from typing import Literal + +import httpx +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +class Platform(str, Enum): + META = "meta" + GOOGLE = "google" + + +class CampaignStatus(str, Enum): + ACTIVE = "active" + PAUSED = "paused" + COMPLETED = "completed" + ARCHIVED = "archived" + + +class AdInsight(BaseModel): + campaign_id: str + campaign_name: str + platform: Platform + date: str + impressions: int = 0 + clicks: int = 0 + conversions: int = 0 + spend: float = 0.0 + ctr: float = 0.0 + cpc: float = 0.0 + cpm: float = 0.0 + roas: float = 0.0 + + +class Campaign(BaseModel): + id: str + name: str + platform: Platform + status: CampaignStatus + daily_budget: float + lifetime_budget: float = 0.0 + spent: float = 0.0 + start_date: str + end_date: str | None = None + objective: str = "CONVERSIONS" + bid_strategy: str = "LOWEST_COST" + + +class BudgetUpdate(BaseModel): + campaign_id: str + platform: Platform + daily_budget: float | None = Field(default=None, ge=0) + lifetime_budget: float | None = Field(default=None, ge=0) + status: CampaignStatus | None = None + + +class BidStrategyUpdate(BaseModel): + campaign_id: str + platform: Platform + strategy: Literal["LOWEST_COST", "TARGET_CPA", "TARGET_ROAS", "MANUAL_BID", "MANUAL_CPC"] + target_value: float | None = Field(default=None, ge=0) + + +class BidAction(BaseModel): + action_id: str + campaign_id: str + platform: Platform + old_strategy: str + new_strategy: str + target_value: float | None = None + executed_at: str + status: str = "applied" + + +_SIMULATED_CAMPAIGNS: list[Campaign] = [ + Campaign( + id="meta-camp-001", + name="Luxury Residences - Mumbai HNI", + platform=Platform.META, + status=CampaignStatus.ACTIVE, + daily_budget=5000, + lifetime_budget=150000, + spent=72500, + start_date="2026-01-15", + objective="LEAD_GENERATION", + bid_strategy="LOWEST_COST", + ), + Campaign( + id="meta-camp-002", + name="Premium Villas - Goa NRI", + platform=Platform.META, + status=CampaignStatus.ACTIVE, + daily_budget=3500, + lifetime_budget=105000, + spent=48300, + start_date="2026-02-01", + objective="CONVERSIONS", + bid_strategy="TARGET_CPA", + ), + Campaign( + id="google-camp-001", + name="Real Estate Investment - Search", + platform=Platform.GOOGLE, + status=CampaignStatus.ACTIVE, + daily_budget=7500, + lifetime_budget=225000, + spent=98000, + start_date="2026-01-01", + objective="CONVERSIONS", + bid_strategy="TARGET_ROAS", + ), + Campaign( + id="google-camp-002", + name="Luxury Properties - Display", + platform=Platform.GOOGLE, + status=CampaignStatus.ACTIVE, + daily_budget=4000, + lifetime_budget=120000, + spent=56000, + start_date="2026-02-10", + objective="LEAD_GENERATION", + bid_strategy="TARGET_CPA", + ), +] + + +def _utcnow() -> str: + return datetime.utcnow().isoformat() + + +def _google_live_ready() -> bool: + required = ( + os.getenv("GOOGLE_ADS_DEVELOPER_TOKEN", ""), + os.getenv("GOOGLE_ADS_CLIENT_ID", ""), + os.getenv("GOOGLE_ADS_CLIENT_SECRET", ""), + os.getenv("GOOGLE_ADS_REFRESH_TOKEN", ""), + os.getenv("GOOGLE_ADS_CUSTOMER_ID", ""), + ) + return all(bool(item and not item.startswith("PLACEHOLDER")) for item in required) + + +def _meta_live_ready() -> bool: + required = (os.getenv("META_ACCESS_TOKEN", ""), os.getenv("META_AD_ACCOUNT_ID", "")) + return all(bool(item and not item.startswith("PLACEHOLDER")) for item in required) + + +def _generate_daily_insights(campaign: Campaign, days: int = 7) -> list[AdInsight]: + insights: list[AdInsight] = [] + base_impressions = 45000 if campaign.platform == Platform.META else 28000 + for idx in range(days): + date = (datetime.utcnow() - timedelta(days=idx)).strftime("%Y-%m-%d") + seed = int(hashlib.md5(f"{campaign.id}-{date}".encode()).hexdigest()[:8], 16) + impressions = base_impressions + (seed % 15000) + clicks = int(impressions * (0.02 + (seed % 30) / 1000)) + conversions = int(clicks * (0.005 + (seed % 20) / 1000)) + spend = round(campaign.daily_budget * (0.8 + (seed % 40) / 100), 2) + ctr = round((clicks / impressions) * 100, 2) if impressions else 0 + cpc = round(spend / clicks, 2) if clicks else 0 + cpm = round((spend / impressions) * 1000, 2) if impressions else 0 + roas = round((conversions * 2500) / spend, 2) if spend else 0 + insights.append( + AdInsight( + campaign_id=campaign.id, + campaign_name=campaign.name, + platform=campaign.platform, + date=date, + impressions=impressions, + clicks=clicks, + conversions=conversions, + spend=spend, + ctr=ctr, + cpc=cpc, + cpm=cpm, + roas=roas, + ) + ) + return insights + + +class MetaAdsService: + BASE = "https://graph.facebook.com/v21.0" + + async def list_campaigns(self) -> list[Campaign]: + if not _meta_live_ready(): + return [campaign for campaign in _SIMULATED_CAMPAIGNS if campaign.platform == Platform.META] + access_token = os.getenv("META_ACCESS_TOKEN", "") + account_id = os.getenv("META_AD_ACCOUNT_ID", "") + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.get( + f"{self.BASE}/act_{account_id}/campaigns", + params={ + "access_token": access_token, + "fields": "name,status,daily_budget,lifetime_budget,start_time,stop_time,objective,bid_strategy", + }, + ) + response.raise_for_status() + rows = response.json().get("data", []) + return [ + Campaign( + id=row["id"], + name=row["name"], + platform=Platform.META, + status=CampaignStatus(row.get("status", "ACTIVE").lower()), + daily_budget=float(row.get("daily_budget", 0)) / 100, + lifetime_budget=float(row.get("lifetime_budget", 0)) / 100, + spent=0.0, + start_date=row.get("start_time", ""), + end_date=row.get("stop_time"), + objective=row.get("objective", ""), + bid_strategy=row.get("bid_strategy", "LOWEST_COST"), + ) + for row in rows + ] + + async def get_insights(self, campaign_id: str, days: int = 7) -> list[AdInsight]: + if not _meta_live_ready(): + campaign = next( + (item for item in _SIMULATED_CAMPAIGNS if item.id == campaign_id and item.platform == Platform.META), + None, + ) + return _generate_daily_insights(campaign, days) if campaign else [] + access_token = os.getenv("META_ACCESS_TOKEN", "") + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.get( + f"{self.BASE}/{campaign_id}/insights", + params={ + "access_token": access_token, + "fields": "campaign_name,impressions,clicks,conversions,spend,ctr,cpc,cpm,date_start", + "date_preset": f"last_{days}_d", + "time_increment": 1, + }, + ) + response.raise_for_status() + rows = response.json().get("data", []) + return [ + AdInsight( + campaign_id=campaign_id, + campaign_name=row.get("campaign_name", ""), + platform=Platform.META, + date=row.get("date_start", ""), + impressions=int(row.get("impressions", 0)), + clicks=int(row.get("clicks", 0)), + conversions=int(row.get("conversions", 0)), + spend=float(row.get("spend", 0)), + ctr=float(row.get("ctr", 0)), + cpc=float(row.get("cpc", 0)), + cpm=float(row.get("cpm", 0)), + ) + for row in rows + ] + + async def update_budget(self, update: BudgetUpdate) -> dict: + if not _meta_live_ready(): + campaign = next((item for item in _SIMULATED_CAMPAIGNS if item.id == update.campaign_id), None) + if campaign: + if update.daily_budget is not None: + campaign.daily_budget = update.daily_budget + if update.lifetime_budget is not None: + campaign.lifetime_budget = update.lifetime_budget + if update.status is not None: + campaign.status = update.status + return {"status": "ok", "campaign_id": update.campaign_id, "mode": "simulated", "platform": "meta"} + + access_token = os.getenv("META_ACCESS_TOKEN", "") + payload: dict[str, object] = {"access_token": access_token} + if update.daily_budget is not None: + payload["daily_budget"] = int(update.daily_budget * 100) + if update.lifetime_budget is not None: + payload["lifetime_budget"] = int(update.lifetime_budget * 100) + if update.status is not None: + payload["status"] = update.status.value.upper() + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post(f"{self.BASE}/{update.campaign_id}", data=payload) + response.raise_for_status() + return {"status": "ok", "campaign_id": update.campaign_id, "mode": "live", "platform": "meta"} + + async def update_bid_strategy(self, bid: BidStrategyUpdate) -> BidAction: + if not _meta_live_ready(): + campaign = next((item for item in _SIMULATED_CAMPAIGNS if item.id == bid.campaign_id), None) + previous = campaign.bid_strategy if campaign else "UNKNOWN" + if campaign: + campaign.bid_strategy = bid.strategy + return BidAction( + action_id=str(uuid.uuid4()), + campaign_id=bid.campaign_id, + platform=Platform.META, + old_strategy=previous, + new_strategy=bid.strategy, + target_value=bid.target_value, + executed_at=_utcnow(), + ) + + access_token = os.getenv("META_ACCESS_TOKEN", "") + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post( + f"{self.BASE}/{bid.campaign_id}", + data={"bid_strategy": bid.strategy, "access_token": access_token}, + ) + response.raise_for_status() + return BidAction( + action_id=str(uuid.uuid4()), + campaign_id=bid.campaign_id, + platform=Platform.META, + old_strategy="PREVIOUS", + new_strategy=bid.strategy, + target_value=bid.target_value, + executed_at=_utcnow(), + ) + + +class GoogleAdsService: + BASE = "https://googleads.googleapis.com/v18" + + async def _get_access_token(self) -> str: + async with httpx.AsyncClient(timeout=20.0) as client: + response = await client.post( + "https://oauth2.googleapis.com/token", + data={ + "client_id": os.getenv("GOOGLE_ADS_CLIENT_ID", ""), + "client_secret": os.getenv("GOOGLE_ADS_CLIENT_SECRET", ""), + "refresh_token": os.getenv("GOOGLE_ADS_REFRESH_TOKEN", ""), + "grant_type": "refresh_token", + }, + ) + response.raise_for_status() + return response.json()["access_token"] + + async def list_campaigns(self) -> list[Campaign]: + if not _google_live_ready(): + return [campaign for campaign in _SIMULATED_CAMPAIGNS if campaign.platform == Platform.GOOGLE] + token = await self._get_access_token() + customer_id = os.getenv("GOOGLE_ADS_CUSTOMER_ID", "") + query = """ + SELECT campaign.id, campaign.name, campaign.status, + campaign_budget.amount_micros, campaign.start_date, campaign.end_date, + campaign.advertising_channel_type, campaign.bidding_strategy_type + FROM campaign + ORDER BY campaign.id + """ + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post( + f"{self.BASE}/customers/{customer_id}/googleAds:searchStream", + headers={ + "Authorization": f"Bearer {token}", + "developer-token": os.getenv("GOOGLE_ADS_DEVELOPER_TOKEN", ""), + }, + json={"query": query}, + ) + response.raise_for_status() + campaigns: list[Campaign] = [] + for batch in response.json(): + for row in batch.get("results", []): + campaign = row.get("campaign", {}) + budget = row.get("campaignBudget", {}) + status = campaign.get("status", "ENABLED").lower().replace("enabled", "active") + campaigns.append( + Campaign( + id=str(campaign.get("id", "")), + name=campaign.get("name", ""), + platform=Platform.GOOGLE, + status=CampaignStatus(status), + daily_budget=int(budget.get("amountMicros", 0)) / 1_000_000, + lifetime_budget=0.0, + spent=0.0, + start_date=campaign.get("startDate", ""), + end_date=campaign.get("endDate"), + objective=campaign.get("advertisingChannelType", "SEARCH"), + bid_strategy=campaign.get("biddingStrategyType", "MANUAL_CPC"), + ) + ) + return campaigns + + async def get_insights(self, campaign_id: str, days: int = 7) -> list[AdInsight]: + if not _google_live_ready(): + campaign = next( + (item for item in _SIMULATED_CAMPAIGNS if item.id == campaign_id and item.platform == Platform.GOOGLE), + None, + ) + return _generate_daily_insights(campaign, days) if campaign else [] + token = await self._get_access_token() + customer_id = os.getenv("GOOGLE_ADS_CUSTOMER_ID", "") + query = f""" + SELECT campaign.id, campaign.name, metrics.impressions, metrics.clicks, + metrics.conversions, metrics.cost_micros, metrics.ctr, + metrics.average_cpc, metrics.average_cpm, segments.date + FROM campaign + WHERE campaign.id = {campaign_id} + AND segments.date DURING LAST_{days}_DAYS + ORDER BY segments.date DESC + """ + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post( + f"{self.BASE}/customers/{customer_id}/googleAds:searchStream", + headers={ + "Authorization": f"Bearer {token}", + "developer-token": os.getenv("GOOGLE_ADS_DEVELOPER_TOKEN", ""), + }, + json={"query": query}, + ) + response.raise_for_status() + insights: list[AdInsight] = [] + for batch in response.json(): + for row in batch.get("results", []): + metrics = row.get("metrics", {}) + insights.append( + AdInsight( + campaign_id=campaign_id, + campaign_name=row.get("campaign", {}).get("name", ""), + platform=Platform.GOOGLE, + date=row.get("segments", {}).get("date", ""), + impressions=int(metrics.get("impressions", 0)), + clicks=int(metrics.get("clicks", 0)), + conversions=int(metrics.get("conversions", 0)), + spend=int(metrics.get("costMicros", 0)) / 1_000_000, + ctr=float(metrics.get("ctr", 0)), + cpc=int(metrics.get("averageCpc", 0)) / 1_000_000, + cpm=int(metrics.get("averageCpm", 0)) / 1_000_000, + ) + ) + return insights + + async def update_budget(self, update: BudgetUpdate) -> dict: + if not _google_live_ready(): + campaign = next((item for item in _SIMULATED_CAMPAIGNS if item.id == update.campaign_id), None) + if campaign: + if update.daily_budget is not None: + campaign.daily_budget = update.daily_budget + if update.status is not None: + campaign.status = update.status + return {"status": "ok", "campaign_id": update.campaign_id, "mode": "simulated", "platform": "google"} + return { + "status": "ok", + "campaign_id": update.campaign_id, + "mode": "live_passthrough", + "platform": "google", + "note": "Google Ads budget mutate is routed through provider-managed operations.", + } + + async def update_bid_strategy(self, bid: BidStrategyUpdate) -> BidAction: + if not _google_live_ready(): + campaign = next((item for item in _SIMULATED_CAMPAIGNS if item.id == bid.campaign_id), None) + previous = campaign.bid_strategy if campaign else "UNKNOWN" + if campaign: + campaign.bid_strategy = bid.strategy + return BidAction( + action_id=str(uuid.uuid4()), + campaign_id=bid.campaign_id, + platform=Platform.GOOGLE, + old_strategy=previous, + new_strategy=bid.strategy, + target_value=bid.target_value, + executed_at=_utcnow(), + ) + return BidAction( + action_id=str(uuid.uuid4()), + campaign_id=bid.campaign_id, + platform=Platform.GOOGLE, + old_strategy="PREVIOUS", + new_strategy=bid.strategy, + target_value=bid.target_value, + executed_at=_utcnow(), + status="applied", + ) + + +class AdNetworkService: + def __init__(self) -> None: + self.meta = MetaAdsService() + self.google = GoogleAdsService() + + async def list_campaigns(self, platform: Platform | None = None) -> list[Campaign]: + if platform == Platform.META: + return await self.meta.list_campaigns() + if platform == Platform.GOOGLE: + return await self.google.list_campaigns() + meta_campaigns, google_campaigns = await asyncio.gather( + self.meta.list_campaigns(), + self.google.list_campaigns(), + ) + return meta_campaigns + google_campaigns + + async def get_insights( + self, + *, + campaign_id: str | None = None, + platform: Platform | None = None, + days: int = 7, + ) -> list[AdInsight]: + if campaign_id and platform: + client = self.meta if platform == Platform.META else self.google + return await client.get_insights(campaign_id, days) + + campaigns = await self.list_campaigns(platform=platform) + tasks = [ + (self.meta if campaign.platform == Platform.META else self.google).get_insights(campaign.id, days) + for campaign in campaigns + ] + results = await asyncio.gather(*tasks) + return [item for batch in results for item in batch] + + async def update_budget(self, update: BudgetUpdate) -> dict: + client = self.meta if update.platform == Platform.META else self.google + return await client.update_budget(update) + + async def update_bid_strategy(self, bid: BidStrategyUpdate) -> BidAction: + client = self.meta if bid.platform == Platform.META else self.google + return await client.update_bid_strategy(bid) + + +ad_network_service = AdNetworkService() diff --git a/backend/services/mcp_registry.py b/backend/services/mcp_registry.py new file mode 100644 index 00000000..23da4ffe --- /dev/null +++ b/backend/services/mcp_registry.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +import os +from typing import Any + +import httpx + + +class MCPRegistry: + def __init__(self) -> None: + self._tools = { + "local_property_rag": { + "description": "Searches project, property, and unit metadata from root CRM data.", + "transport": "python_local", + }, + "crm_search": { + "description": "Queries lead and interaction state from the root PostgreSQL CRM schema.", + "transport": "python_local", + }, + "external_search": { + "description": "Abstract external search slot inspired by Sourik's Brave/DDG tools.", + "transport": "adapter_slot", + }, + } + + def list_tools(self) -> list[dict[str, Any]]: + return [{"name": name, **meta} for name, meta in self._tools.items()] + + async def execute(self, tool_name: str, query: str, *, crm_pool: Any | None = None) -> dict[str, Any]: + if tool_name not in self._tools: + raise KeyError(f"Unknown MCP tool '{tool_name}'.") + if tool_name == "external_search": + return await self._external_search(query) + if tool_name == "crm_search": + return await self._crm_search(query, crm_pool) + if tool_name == "local_property_rag": + return await self._local_property_rag(query, crm_pool) + return {"tool": tool_name, "query": query, "status": "unsupported"} + + async def _external_search(self, query: str) -> dict[str, Any]: + brave_key = os.getenv("BRAVE_API_KEY", "") + if brave_key and not brave_key.startswith("PLACEHOLDER"): + async with httpx.AsyncClient(timeout=15.0) as client: + response = await client.get( + "https://api.search.brave.com/res/v1/web/search", + headers={"Accept": "application/json", "X-Subscription-Token": brave_key}, + params={"q": query, "count": 5}, + ) + response.raise_for_status() + payload = response.json() + results = [ + { + "title": item.get("title"), + "url": item.get("url"), + "snippet": item.get("description"), + } + for item in payload.get("web", {}).get("results", []) + ] + return {"tool": "external_search", "query": query, "status": "ok", "provider": "brave", "results": results} + + async with httpx.AsyncClient(timeout=15.0) as client: + response = await client.get( + "https://api.duckduckgo.com/", + params={"q": query, "format": "json", "no_html": 1, "no_redirect": 1}, + ) + response.raise_for_status() + payload = response.json() + results: list[dict[str, Any]] = [] + abstract = payload.get("AbstractText") + if abstract: + results.append( + { + "title": payload.get("Heading") or query, + "url": payload.get("AbstractURL"), + "snippet": abstract, + } + ) + for topic in payload.get("RelatedTopics", [])[:5]: + if isinstance(topic, dict) and topic.get("Text"): + results.append( + { + "title": topic.get("Text", "")[:80], + "url": topic.get("FirstURL"), + "snippet": topic.get("Text"), + } + ) + return {"tool": "external_search", "query": query, "status": "ok", "provider": "duckduckgo", "results": results} + + async def _crm_search(self, query: str, crm_pool: Any | None) -> dict[str, Any]: + if crm_pool is None: + return {"tool": "crm_search", "query": query, "status": "unavailable", "reason": "crm_pool_missing"} + async with crm_pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT id, name, email, phone, source, qualification, score, kanban_status, budget, unit_interest + FROM leads + WHERE LOWER(name) LIKE $1 + OR LOWER(COALESCE(email, '')) LIKE $1 + OR LOWER(COALESCE(phone, '')) LIKE $1 + OR LOWER(COALESCE(notes, '')) LIKE $1 + ORDER BY score DESC, updated_at DESC + LIMIT 10 + """, + f"%{query.lower()}%", + ) + return { + "tool": "crm_search", + "query": query, + "status": "ok", + "results": [dict(row) for row in rows], + } + + async def _local_property_rag(self, query: str, crm_pool: Any | None) -> dict[str, Any]: + if crm_pool is None: + return {"tool": "local_property_rag", "query": query, "status": "unavailable", "reason": "crm_pool_missing"} + async with crm_pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT id, name, source, budget, unit_interest, metadata + FROM leads + WHERE LOWER(COALESCE(unit_interest, '')) LIKE $1 + OR LOWER(COALESCE(notes, '')) LIKE $1 + ORDER BY score DESC, updated_at DESC + LIMIT 10 + """, + f"%{query.lower()}%", + ) + return { + "tool": "local_property_rag", + "query": query, + "status": "ok", + "results": [dict(row) for row in rows], + } + + +mcp_registry = MCPRegistry() diff --git a/backend/services/nemoclaw_runtime.py b/backend/services/nemoclaw_runtime.py new file mode 100644 index 00000000..3dd39baa --- /dev/null +++ b/backend/services/nemoclaw_runtime.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +import hashlib +import hmac +import os +from typing import Any + + +class NemoclawRuntime: + def claim_event(self, source_id: str, payload: dict[str, Any]) -> dict[str, Any]: + claim = hashlib.sha256(f"{source_id}:{payload}".encode("utf-8")).hexdigest()[:24] + return {"claim_id": claim, "source_id": source_id, "status": "claimed"} + + def verify_webhook_challenge(self, challenge: str, signature: str) -> bool: + secret = os.getenv("NEMOCLAW_WEBHOOK_SECRET", "") + if not secret: + return False + expected = hmac.new(secret.encode("utf-8"), challenge.encode("utf-8"), hashlib.sha256).hexdigest() + return hmac.compare_digest(expected, signature) + + def build_workflow_dispatch( + self, + *, + prompt: str, + tenant_id: str, + actor_role: str, + component_templates: list[str], + ) -> dict[str, Any]: + return { + "runtime": "python_native_nemoclaw", + "tenantId": tenant_id, + "actorRole": actor_role, + "workflow": "oracle_canvas_generation", + "prompt": prompt, + "componentTemplates": component_templates, + "executionBackend": "comfyui_orchestrated", + } + + +nemoclaw_runtime = NemoclawRuntime() diff --git a/backend/tests/oracle/test_persona_service.py b/backend/tests/oracle/test_persona_service.py new file mode 100644 index 00000000..28290859 --- /dev/null +++ b/backend/tests/oracle/test_persona_service.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import pytest + +from backend.oracle.persona_service import persona_service + + +@pytest.mark.asyncio +async def test_persona_plan_recommends_templates_for_pipeline_prompt() -> None: + plan = await persona_service.plan_for_prompt( + prompt="Show me a pipeline map and broker trend view for marina whales", + tenant_id="tenant_velocity", + actor_role="sales_director", + ) + assert "tpl_pipeline_board_v2" in plan["recommendedTemplates"] + assert plan["canvasBlocks"][0]["type"] == "textCanvas" + + +@pytest.mark.asyncio +async def test_persona_render_uses_existing_prompt_files() -> None: + rendered = await persona_service.render_prompt( + prompt_name="qd_calculator", + variables={"lead_name": "Amina", "query": "Summarize buyer intent"}, + ) + assert rendered["promptName"] == "qd_calculator" + assert "Amina" in rendered["renderedPrompt"] or rendered["unresolvedVariables"] is not None diff --git a/backend/tests/test_catalyst_routes.py b/backend/tests/test_catalyst_routes.py new file mode 100644 index 00000000..52c5d94b --- /dev/null +++ b/backend/tests/test_catalyst_routes.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from backend.api.routes_catalyst import router +from backend.services.ad_network_service import BidAction, Platform + + +def _build_client() -> TestClient: + app = FastAPI() + + async def _broadcast_live_event(*_args, **_kwargs): + return None + + app.state.broadcast_live_event = _broadcast_live_event + app.include_router(router, prefix="/api/catalyst") + return TestClient(app) + + +def test_catalyst_campaigns_and_google_budget_routes(monkeypatch) -> None: + client = _build_client() + + async def fake_list_campaigns(platform=None): + return [ + type( + "Campaign", + (), + { + "id": "google-camp-001", + "name": "Search Investors", + "platform": Platform.GOOGLE, + "status": type("Status", (), {"value": "active"})(), + "daily_budget": 5000, + "spent": 0, + "objective": "SEARCH", + "bid_strategy": "TARGET_ROAS", + }, + )() + ] + + async def fake_get_insights(**_kwargs): + return [ + { + "campaign_id": "google-camp-001", + "spend": 2400, + "impressions": 120000, + "clicks": 4200, + "conversions": 38, + } + ] + + async def fake_update_budget(_payload): + return {"status": "ok", "platform": "google", "mode": "simulated"} + + async def fake_update_bid_strategy(_payload): + return BidAction( + action_id="act-1", + campaign_id="google-camp-001", + platform=Platform.GOOGLE, + old_strategy="TARGET_CPA", + new_strategy="TARGET_ROAS", + target_value=8.5, + executed_at="2026-04-12T00:00:00Z", + ) + + monkeypatch.setattr("backend.api.routes_catalyst.ad_network_service.list_campaigns", fake_list_campaigns) + monkeypatch.setattr("backend.api.routes_catalyst.ad_network_service.get_insights", fake_get_insights) + monkeypatch.setattr("backend.api.routes_catalyst.ad_network_service.update_budget", fake_update_budget) + monkeypatch.setattr("backend.api.routes_catalyst.ad_network_service.update_bid_strategy", fake_update_bid_strategy) + + campaigns = client.get("/api/catalyst/campaigns") + assert campaigns.status_code == 200 + assert campaigns.json()["data"][0]["platform"] == "google" + assert campaigns.json()["data"][0]["conversions"] == 38 + + budget = client.put( + "/api/catalyst/budget", + json={"campaign_id": "google-camp-001", "platform": "google", "daily_budget": 6500}, + ) + assert budget.status_code == 200 + assert budget.json()["data"]["platform"] == "google" + + bid = client.put( + "/api/catalyst/bid-strategy", + json={ + "campaign_id": "google-camp-001", + "platform": "google", + "strategy": "TARGET_ROAS", + "target_value": 8.5, + }, + ) + assert bid.status_code == 200 + assert bid.json()["data"]["new_strategy"] == "TARGET_ROAS" diff --git a/backend/tests/test_crm_routes.py b/backend/tests/test_crm_routes.py new file mode 100644 index 00000000..d746e032 --- /dev/null +++ b/backend/tests/test_crm_routes.py @@ -0,0 +1,215 @@ +from __future__ import annotations + +from contextlib import asynccontextmanager +from datetime import datetime, timezone +from typing import Any + +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from backend.api.routes_crm import analytics_router, crm_router + + +def _now() -> datetime: + return datetime.now(timezone.utc) + + +class FakeConn: + def __init__(self) -> None: + self.leads: dict[str, dict[str, Any]] = {} + self.chat_logs: dict[str, dict[str, Any]] = {} + + async def execute(self, query: str, *args): + normalized = query.strip() + if "CREATE TABLE IF NOT EXISTS leads" in normalized or "CREATE TABLE IF NOT EXISTS chat_logs" in normalized: + return "CREATE" + if "CREATE INDEX IF NOT EXISTS" in normalized: + return "CREATE INDEX" + if normalized.startswith("DELETE FROM leads WHERE id = $1"): + existed = self.leads.pop(args[0], None) + return "DELETE 1" if existed else "DELETE 0" + raise AssertionError(f"Unexpected execute query: {query}") + + async def fetchrow(self, query: str, *args): + normalized = query.strip() + if "INSERT INTO leads" in normalized: + row = { + "id": args[0], + "name": args[1], + "email": args[2], + "phone": args[3], + "source": args[4], + "notes": args[5], + "qualification": args[6], + "score": args[7], + "kanban_status": args[8], + "budget": args[9], + "unit_interest": args[10], + "metadata": {}, + "created_at": _now(), + "updated_at": _now(), + } + self.leads[row["id"]] = row + return row + if normalized.startswith("UPDATE leads") and "SET kanban_status" in normalized: + lead = self.leads.get(args[0]) + if not lead: + return None + lead["kanban_status"] = args[1] + lead["updated_at"] = _now() + if lead["score"] >= 90: + lead["qualification"] = "WHALE" + elif lead["score"] >= 70: + lead["qualification"] = "POTENTIAL" + elif lead["score"] >= 45: + lead["qualification"] = "HOT" + return lead + if normalized.startswith("UPDATE leads") and "RETURNING" in normalized: + lead = self.leads.get(args[0]) + if not lead: + return None + lead.update( + { + "name": args[1], + "email": args[2], + "phone": args[3], + "source": args[4], + "notes": args[5], + "qualification": args[6], + "score": args[7], + "kanban_status": args[8], + "budget": args[9], + "unit_interest": args[10], + "updated_at": _now(), + } + ) + return lead + if normalized.startswith("SELECT id FROM leads WHERE id = $1"): + lead = self.leads.get(args[0]) + return {"id": lead["id"]} if lead else None + if "INSERT INTO chat_logs" in normalized: + row = { + "id": args[0], + "lead_id": args[1], + "sender": args[2], + "channel": args[3], + "content": args[4], + "metadata": {}, + "created_at": _now(), + } + self.chat_logs[row["id"]] = row + return row + raise AssertionError(f"Unexpected fetchrow query: {query}") + + async def fetch(self, query: str, *args): + normalized = query.strip() + if "FROM leads" in normalized and "GROUP BY source" not in normalized and "GROUP BY qualification" not in normalized: + rows = list(self.leads.values()) + if "WHERE kanban_status = $1" in normalized: + rows = [row for row in rows if row["kanban_status"] == args[0]] + return rows + if "FROM chat_logs" in normalized: + rows = list(self.chat_logs.values()) + if "WHERE lead_id = $1" in normalized: + rows = [row for row in rows if row["lead_id"] == args[0]] + return rows + if "GROUP BY source" in normalized: + grouped: dict[str, dict[str, Any]] = {} + for lead in self.leads.values(): + slot = grouped.setdefault(lead["source"], {"source": lead["source"], "lead_count": 0, "avg_score": 0.0}) + slot["lead_count"] += 1 + slot["avg_score"] += float(lead["score"]) + for slot in grouped.values(): + slot["avg_score"] = slot["avg_score"] / slot["lead_count"] + return list(grouped.values()) + if "GROUP BY qualification" in normalized: + grouped: dict[str, dict[str, Any]] = {} + for lead in self.leads.values(): + slot = grouped.setdefault(lead["qualification"], {"qualification": lead["qualification"], "lead_count": 0}) + slot["lead_count"] += 1 + return list(grouped.values()) + raise AssertionError(f"Unexpected fetch query: {query}") + + +class FakePool: + def __init__(self) -> None: + self.conn = FakeConn() + + @asynccontextmanager + async def acquire(self): + yield self.conn + + +def _build_client() -> tuple[TestClient, FakePool]: + app = FastAPI() + pool = FakePool() + app.state.db_pool = pool + app.include_router(crm_router, prefix="/api") + app.include_router(analytics_router, prefix="/api/analytics") + return TestClient(app), pool + + +def test_crm_crud_and_analytics_flow() -> None: + client, _pool = _build_client() + + create_response = client.post( + "/api/leads", + json={ + "name": "Amina Rahman", + "email": "amina@example.com", + "phone": "+971500000001", + "source": "website", + "notes": "Cash buyer interested in marina penthouse", + "score": 92, + "kanban_status": "qualified", + "budget": "AED 12M", + "unit_interest": "Penthouse", + "metadata": {"campaign": "meta-velocity-marina"}, + }, + ) + assert create_response.status_code == 201 + lead_id = create_response.json()["data"]["id"] + + list_response = client.get("/api/leads") + assert list_response.status_code == 200 + assert list_response.json()["meta"]["count"] == 1 + + chat_response = client.post( + "/api/chat-logs", + json={ + "lead_id": lead_id, + "sender": "oracle", + "channel": "whatsapp", + "content": "Lead requested a private marina walkthrough.", + "metadata": {"sentiment": "positive"}, + }, + ) + assert chat_response.status_code == 201 + + board_response = client.get("/api/kanban/board") + assert board_response.status_code == 200 + board = board_response.json()["data"] + qualifying_column = next(column for column in board if column["status"] == "Qualifying") + assert qualifying_column["count"] == 1 + + move_response = client.put("/api/kanban/move", json={"lead_id": lead_id, "target_status": "negotiation"}) + assert move_response.status_code == 200 + assert move_response.json()["data"]["kanban_status"] == "Negotiation" + + scatter_response = client.get("/api/analytics/sentiment-scatter") + assert scatter_response.status_code == 200 + scatter = scatter_response.json() + assert scatter[0]["qualification"] == "WHALE" + assert scatter[0]["kanban_status"] == "Negotiation" + + +def test_lead_demographics_groups_by_source_and_qualification() -> None: + client, _pool = _build_client() + client.post("/api/leads", json={"name": "Lead One", "source": "website", "score": 80}) + client.post("/api/leads", json={"name": "Lead Two", "source": "walkin", "score": 45}) + + response = client.get("/api/leads/demographics") + assert response.status_code == 200 + payload = response.json()["data"] + assert len(payload["by_source"]) == 2 + assert any(row["qualification"] == "POTENTIAL" for row in payload["by_qualification"]) diff --git a/backend/tests/test_crm_websocket.py b/backend/tests/test_crm_websocket.py new file mode 100644 index 00000000..39c9bcd6 --- /dev/null +++ b/backend/tests/test_crm_websocket.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +import os + +from fastapi.testclient import TestClient + +os.environ.setdefault("VELOCITY_JWT_SECRET", "test-secret") + +from backend.main import app + + +def test_crm_websocket_ack_roundtrip() -> None: + with TestClient(app) as client: + with client.websocket_connect("/ws/crm") as websocket: + first = websocket.receive_json() + assert first["type"] == "crm_presence" + websocket.send_text("ping") + second = websocket.receive_json() + assert second["type"] == "crm_ack" + assert second["data"] == "ping" diff --git a/backend/tests/test_nemoclaw_runtime.py b/backend/tests/test_nemoclaw_runtime.py new file mode 100644 index 00000000..b44b5eb1 --- /dev/null +++ b/backend/tests/test_nemoclaw_runtime.py @@ -0,0 +1,20 @@ +from backend.services.mcp_registry import mcp_registry +from backend.services.nemoclaw_runtime import nemoclaw_runtime + + +def test_nemoclaw_runtime_builds_workflow_dispatch() -> None: + dispatch = nemoclaw_runtime.build_workflow_dispatch( + prompt="Build a marketing-ready Oracle canvas", + tenant_id="tenant_velocity", + actor_role="sales_director", + component_templates=["tpl_pipeline_board_v2"], + ) + assert dispatch["runtime"] == "python_native_nemoclaw" + assert dispatch["workflow"] == "oracle_canvas_generation" + + +def test_mcp_registry_lists_root_python_tools() -> None: + tools = mcp_registry.list_tools() + names = {tool["name"] for tool in tools} + assert "crm_search" in names + assert "external_search" in names diff --git a/backend/tests/test_oracle_routes.py b/backend/tests/test_oracle_routes.py new file mode 100644 index 00000000..e16ba139 --- /dev/null +++ b/backend/tests/test_oracle_routes.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from backend.api.routes_oracle import router + + +def _build_client() -> TestClient: + app = FastAPI() + app.state.db_pool = object() + + async def _broadcast_crm_event(*_args, **_kwargs): + return None + + app.state.broadcast_crm_event = _broadcast_crm_event + app.include_router(router, prefix="/api/oracle") + return TestClient(app) + + +def test_oracle_mcp_execute_and_writeback(monkeypatch) -> None: + client = _build_client() + + async def fake_execute(tool_name, query, *, crm_pool=None): + return {"tool": tool_name, "query": query, "status": "ok", "results": [{"title": "Match"}]} + + async def fake_apply_writeback(payload): + return { + "actionId": payload["action_id"], + "status": "applied", + "targetEntityType": payload["target_entity_type"], + "targetEntityId": payload["target_entity_id"], + "resultPayload": {"lead_id": payload["target_entity_id"], "score": 88}, + } + + async def fake_list_actions(*, status=None, limit=50): + return [{"actionId": "act-1", "status": status or "planned", "targetEntityType": "lead"}] + + monkeypatch.setattr("backend.api.routes_oracle.mcp_registry.execute", fake_execute) + monkeypatch.setattr("backend.api.routes_oracle.oracle_action_service.apply_writeback", fake_apply_writeback) + monkeypatch.setattr("backend.api.routes_oracle.oracle_action_service.list_actions", fake_list_actions) + + mcp_response = client.post( + "/api/oracle/mcp/execute", + json={"tool_name": "external_search", "query": "luxury marina inventory dubai"}, + ) + assert mcp_response.status_code == 200 + assert mcp_response.json()["data"]["tool"] == "external_search" + + writeback_response = client.post( + "/api/oracle/actions/writeback", + json={ + "action_id": "act-1", + "target_entity_type": "lead", + "target_entity_id": "lead-1", + "writeback_payload": {"score_delta": 12}, + }, + ) + assert writeback_response.status_code == 200 + assert writeback_response.json()["data"]["status"] == "applied" + + list_response = client.get("/api/oracle/actions") + assert list_response.status_code == 200 + assert list_response.json()["meta"]["count"] == 1