The complete code integration is done. Co-authored-by: Sagnik <sagnik7896@gmail.com> Reviewed-on: #18
137 lines
5.5 KiB
Python
137 lines
5.5 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
|
|
class MCPRegistry:
    """Catalogue and dispatcher for the MCP tools offered by this module.

    Three tools are registered:

    * ``local_property_rag`` and ``crm_search`` run substring searches
      against the ``leads`` table through a caller-supplied connection pool.
    * ``external_search`` queries Brave web search when ``BRAVE_API_KEY`` is
      set to a non-placeholder value, otherwise DuckDuckGo's JSON API.
    """

    # Timeout (seconds) applied to every outbound HTTP search request.
    _HTTP_TIMEOUT_SECONDS = 15.0

    def __init__(self) -> None:
        """Build the static tool catalogue (name -> description/transport)."""
        self._tools = {
            "local_property_rag": {
                "description": "Searches project, property, and unit metadata from root CRM data.",
                "transport": "python_local",
            },
            "crm_search": {
                "description": "Queries lead and interaction state from the root PostgreSQL CRM schema.",
                "transport": "python_local",
            },
            "external_search": {
                "description": "Abstract external search slot inspired by Sourik's Brave/DDG tools.",
                "transport": "adapter_slot",
            },
        }

    def list_tools(self) -> list[dict[str, Any]]:
        """Return one dict per tool: its ``name`` merged with its metadata."""
        return [{"name": name, **meta} for name, meta in self._tools.items()]

    async def execute(self, tool_name: str, query: str, *, crm_pool: Any | None = None) -> dict[str, Any]:
        """Run the named tool against ``query`` and return its result dict.

        Args:
            tool_name: Key of a registered tool.
            query: Free-text search string forwarded to the tool.
            crm_pool: Connection pool used by the CRM-backed tools. It must
                provide ``acquire()`` as an async context manager yielding a
                connection with an awaitable ``fetch(sql, *args)``.

        Returns:
            A dict that always carries ``tool``, ``query`` and ``status``.

        Raises:
            KeyError: If ``tool_name`` is not registered.
        """
        if tool_name not in self._tools:
            raise KeyError(f"Unknown MCP tool '{tool_name}'.")
        if tool_name == "external_search":
            return await self._external_search(query)
        if tool_name == "crm_search":
            return await self._crm_search(query, crm_pool)
        if tool_name == "local_property_rag":
            return await self._local_property_rag(query, crm_pool)
        # Only reachable for a tool that is registered but has no handler yet.
        return {"tool": tool_name, "query": query, "status": "unsupported"}

    async def _external_search(self, query: str) -> dict[str, Any]:
        """Search the web via Brave if a real API key is set, else DuckDuckGo.

        HTTP error statuses propagate as exceptions from
        ``response.raise_for_status()``; a Brave failure does not fall back
        to DuckDuckGo.
        """
        brave_key = os.getenv("BRAVE_API_KEY", "")
        if brave_key and not brave_key.startswith("PLACEHOLDER"):
            return await self._brave_search(query, brave_key)
        return await self._duckduckgo_search(query)

    async def _brave_search(self, query: str, api_key: str) -> dict[str, Any]:
        """Fetch up to five Brave web results as title/url/snippet dicts."""
        async with httpx.AsyncClient(timeout=self._HTTP_TIMEOUT_SECONDS) as client:
            response = await client.get(
                "https://api.search.brave.com/res/v1/web/search",
                headers={"Accept": "application/json", "X-Subscription-Token": api_key},
                params={"q": query, "count": 5},
            )
            response.raise_for_status()
            payload = response.json()
        # ``or`` guards keep a JSON null for "web"/"results" from crashing.
        items = (payload.get("web") or {}).get("results") or []
        results = [
            {
                "title": item.get("title"),
                "url": item.get("url"),
                "snippet": item.get("description"),
            }
            for item in items
        ]
        return {"tool": "external_search", "query": query, "status": "ok", "provider": "brave", "results": results}

    async def _duckduckgo_search(self, query: str) -> dict[str, Any]:
        """Fetch the DuckDuckGo abstract plus related topics for ``query``."""
        async with httpx.AsyncClient(timeout=self._HTTP_TIMEOUT_SECONDS) as client:
            response = await client.get(
                "https://api.duckduckgo.com/",
                params={"q": query, "format": "json", "no_html": 1, "no_redirect": 1},
            )
            response.raise_for_status()
            payload = response.json()
        results: list[dict[str, Any]] = []
        abstract = payload.get("AbstractText")
        if abstract:
            results.append(
                {
                    "title": payload.get("Heading") or query,
                    "url": payload.get("AbstractURL"),
                    "snippet": abstract,
                }
            )
        # Only the first five entries are inspected; entries without a
        # "Text" field are skipped.
        for topic in (payload.get("RelatedTopics") or [])[:5]:
            if isinstance(topic, dict) and topic.get("Text"):
                text = topic["Text"]
                results.append(
                    {
                        "title": text[:80],
                        "url": topic.get("FirstURL"),
                        "snippet": text,
                    }
                )
        return {"tool": "external_search", "query": query, "status": "ok", "provider": "duckduckgo", "results": results}

    @staticmethod
    def _like_pattern(query: str) -> str:
        """Return a lower-cased ``%...%`` LIKE pattern matching ``query`` literally.

        Backslash, ``%`` and ``_`` are escaped with a backslash (the default
        LIKE escape character in PostgreSQL) so user input cannot act as a
        wildcard. The backslash itself is escaped first so the escapes added
        for ``%`` and ``_`` are not doubled.
        """
        escaped = query.lower().replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        return f"%{escaped}%"

    async def _search_leads(self, tool: str, sql: str, query: str, crm_pool: Any | None) -> dict[str, Any]:
        """Run ``sql`` with the LIKE pattern for ``query`` bound as ``$1``.

        Returns an ``unavailable`` result when no pool was supplied,
        otherwise an ``ok`` result whose rows are converted to plain dicts.
        """
        if crm_pool is None:
            return {"tool": tool, "query": query, "status": "unavailable", "reason": "crm_pool_missing"}
        async with crm_pool.acquire() as conn:
            rows = await conn.fetch(sql, self._like_pattern(query))
        return {
            "tool": tool,
            "query": query,
            "status": "ok",
            "results": [dict(row) for row in rows],
        }

    async def _crm_search(self, query: str, crm_pool: Any | None) -> dict[str, Any]:
        """Find up to ten leads whose name, email, phone or notes contain ``query``."""
        return await self._search_leads(
            "crm_search",
            """
            SELECT id, name, email, phone, source, qualification, score, kanban_status, budget, unit_interest
            FROM leads
            WHERE LOWER(name) LIKE $1
               OR LOWER(COALESCE(email, '')) LIKE $1
               OR LOWER(COALESCE(phone, '')) LIKE $1
               OR LOWER(COALESCE(notes, '')) LIKE $1
            ORDER BY score DESC, updated_at DESC
            LIMIT 10
            """,
            query,
            crm_pool,
        )

    async def _local_property_rag(self, query: str, crm_pool: Any | None) -> dict[str, Any]:
        """Find up to ten leads whose unit interest or notes contain ``query``."""
        return await self._search_leads(
            "local_property_rag",
            """
            SELECT id, name, source, budget, unit_interest, metadata
            FROM leads
            WHERE LOWER(COALESCE(unit_interest, '')) LIKE $1
               OR LOWER(COALESCE(notes, '')) LIKE $1
            ORDER BY score DESC, updated_at DESC
            LIMIT 10
            """,
            query,
            crm_pool,
        )
|
|
|
|
|
|
# Module-level registry instance, created once when this module is imported.
mcp_registry = MCPRegistry()
|