Files
Project_Velocity/backend/services/comms_waha_provider.py
2026-04-28 13:41:14 +05:30

96 lines
3.2 KiB
Python

"""
WAHA (https://github.com/devlikeapro/waha) adapter.
WAHA exposes a simple HTTP API for WhatsApp Web.
"""
import httpx
from typing import Any, Dict, Optional
from .comms_provider import CommsProvider
class WahaProvider(CommsProvider):
    """CommsProvider adapter that talks to a WAHA server over its HTTP API.

    The methods below read ``self.base_url``, ``self.api_key`` and
    ``self.instance_id``. None of them is assigned in this class; they are
    presumably set by ``CommsProvider`` -- confirm against the base class.
    """

    # Webhook events that can carry a message received from a contact.
    # "message.any" also fires for our own messages, hence the extra
    # ``fromMe`` check in ``normalize_webhook``.
    _INBOUND_EVENTS = ("message", "message.any")

    @staticmethod
    def _to_chat_id(phone: str) -> str:
        """Turn a phone number into a WAHA chat id (``<digits>@c.us``).

        A value that already contains ``@`` is treated as a ready-made chat id
        and returned untouched. Otherwise everything that is not a digit
        (``+``, spaces, dashes, brackets) is dropped before the suffix is added.

        Raises:
            ValueError: if ``phone`` contains no digits at all.
        """
        raw = str(phone or "").strip()
        if "@" in raw:
            return raw
        digits = "".join(ch for ch in raw if ch.isdigit())
        if not digits:
            raise ValueError(f"Invalid WhatsApp phone number: {phone!r}")
        return f"{digits}@c.us"

    @staticmethod
    def _extract_message_id(result: Any) -> Optional[str]:
        """Pull a message id out of a WAHA send response.

        NOTE(review): some WAHA engines appear to return ``id`` as an object
        with a ``_serialized`` field instead of a plain string -- both shapes
        are handled here; confirm against the engine in use.
        """
        if not isinstance(result, dict):
            return None
        message_id = result.get("id")
        if isinstance(message_id, dict):
            return message_id.get("_serialized") or message_id.get("id")
        return message_id

    async def _request(self, method: str, path: str, json_data: Optional[Dict] = None) -> Any:
        """Send one HTTP request to the WAHA API and return the decoded JSON.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            path: Path below ``/api``, starting with ``/`` (may carry a query string).
            json_data: Optional JSON body.

        Returns:
            The decoded JSON body (a dict or a list depending on the endpoint),
            or ``{}`` when the response has no body.

        Raises:
            httpx.HTTPStatusError: on a 4xx/5xx response.
        """
        # Tolerate a configured base URL that ends with "/".
        base = str(self.base_url).rstrip("/")
        url = f"{base}/api{path}"
        headers = {"Content-Type": "application/json"}
        if self.api_key:
            headers["X-Api-Key"] = self.api_key
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.request(method, url, headers=headers, json=json_data)
            resp.raise_for_status()
            if not resp.content:
                # Nothing to decode; resp.json() would raise on an empty body.
                return {}
            return resp.json()

    async def send_message(self, phone: str, message: str, message_type: str = "text", **kwargs) -> Dict[str, Any]:
        """Send a text or image message through WAHA.

        Args:
            phone: Recipient phone number (formatting characters are ignored)
                or a complete chat id containing ``@``.
            message: Message text; used as the caption for images.
            message_type: ``"image"`` sends an image when ``media_url`` is
                given; anything else is sent as plain text.
            **kwargs: ``media_url`` (image URL) and optionally ``mimetype``.

        Returns:
            A dict with ``success``, ``provider``, ``external_message_id``,
            ``status`` and the raw WAHA response under ``raw``.

        Raises:
            ValueError: if ``phone`` contains no digits.
            httpx.HTTPStatusError: if WAHA answers with an error status.
        """
        import mimetypes  # local import: only needed for the image branch

        chat_id = self._to_chat_id(phone)
        session = self.instance_id or "default"
        media_url = kwargs.get("media_url")

        if message_type == "image" and media_url:
            # WAHA's /sendImage takes the media as a "file" object plus a
            # "caption"; it has no "text" or "media" field.
            guessed = mimetypes.guess_type(str(media_url))[0] or ""
            mimetype = kwargs.get("mimetype") or (
                guessed if guessed.startswith("image/") else "image/jpeg"
            )
            payload = {
                "chatId": chat_id,
                "session": session,
                "file": {"url": media_url, "mimetype": mimetype},
                "caption": message,
            }
            result = await self._request("POST", "/sendImage", payload)
        else:
            payload = {
                "chatId": chat_id,
                "text": message,
                "session": session,
            }
            result = await self._request("POST", "/sendText", payload)

        return {
            "success": True,
            "provider": "waha",
            "external_message_id": self._extract_message_id(result),
            "status": "sent",
            "raw": result,
        }

    async def normalize_webhook(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a WAHA webhook payload into the provider-neutral message dict.

        Expected WAHA webhook shape (v2024):
        {
            "event": "message",
            "session": "default",
            "payload": {
                "id": "true_123@c.us_3EB0...",
                "timestamp": 1710000000,
                "from": "123@c.us",
                "to": "456@c.us",
                "body": "Hello",
                "hasMedia": false, ...
            }
        }

        Missing or ``null`` fields are tolerated and come back as empty
        strings or ``None``.
        """
        event = payload.get("event", "")
        # "payload" / "from" may be present but null; fall back to empty values.
        pl = payload.get("payload") or {}
        from_jid = pl.get("from") or ""
        # Keep only the part before the JID suffix (@c.us, @g.us, ...).
        # NOTE(review): for outbound messages "from" is our own number; the
        # counterparty would be in "to" -- confirm what callers expect here.
        phone = str(from_jid).split("@", 1)[0]

        is_inbound = event in self._INBOUND_EVENTS and not pl.get("fromMe")

        # Newer WAHA versions nest the download URL under "media"; the
        # top-level "mediaUrl" is the older location.
        media = pl.get("media")
        nested_url = media.get("url") if isinstance(media, dict) else None

        return {
            "provider": "waha",
            "external_message_id": pl.get("id"),
            "phone_e164": phone,
            "direction": "inbound" if is_inbound else "outbound",
            # NOTE(review): every media message is reported as "image", even
            # audio/video/documents -- confirm whether callers need finer types.
            "message_type": "image" if pl.get("hasMedia") else "text",
            "body": pl.get("body", ""),
            "media_url": pl.get("mediaUrl") or nested_url,
            "raw": payload,
            "timestamp": pl.get("timestamp"),
        }

    async def test_connection(self) -> Dict[str, Any]:
        """Check that the WAHA server is reachable by listing its sessions.

        Never raises: any failure is reported as ``{"success": False, ...}``
        so callers can show the message directly.
        """
        try:
            sessions = await self._request("GET", "/sessions?all=true")
            return {
                "success": True,
                "message": f"Connected to WAHA. Sessions: {len(sessions)}",
                "account_info": {"sessions": sessions},
            }
        except Exception as exc:  # deliberate catch-all: this is a health check
            return {
                "success": False,
                "message": f"WAHA connection failed: {exc}",
            }

    async def get_media(self, media_id: str) -> Optional[bytes]:
        """Return the bytes of a media item; not implemented, always ``None``."""
        return None