The complete code integration is done. Co-authored-by: Sagnik <sagnik7896@gmail.com> Reviewed-on: #18
41 lines
1.3 KiB
Python
41 lines
1.3 KiB
Python
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import hmac
|
|
import os
|
|
from typing import Any
|
|
|
|
|
|
class NemoclawRuntime:
    """Stateless helpers for event claims, webhook checks and workflow dispatch.

    No method reads or writes instance attributes. The only external input
    besides the arguments is the ``NEMOCLAW_WEBHOOK_SECRET`` environment
    variable, read by :meth:`verify_webhook_challenge` on every call.
    """

    def claim_event(self, source_id: str, payload: dict[str, Any]) -> dict[str, Any]:
        """Build a claim record for an event.

        The claim ID is the first 24 hex characters of the SHA-256 digest of
        ``"<source_id>:<payload>"`` encoded as UTF-8, where the payload is
        rendered with ``str()``.

        NOTE(review): because the payload is rendered with ``str()``, two
        dicts that compare equal but were built in a different key order
        yield different claim IDs. Left as-is so existing IDs stay stable.

        Args:
            source_id: Identifier of the event source; echoed in the result.
            payload: Event payload; only used as input to the digest.

        Returns:
            A dict with ``claim_id``, ``source_id`` and a fixed ``status`` of
            ``"claimed"``.
        """
        claim = hashlib.sha256(f"{source_id}:{payload}".encode("utf-8")).hexdigest()[:24]
        return {"claim_id": claim, "source_id": source_id, "status": "claimed"}

    def verify_webhook_challenge(self, challenge: str, signature: str) -> bool:
        """Check a webhook challenge against its HMAC-SHA256 signature.

        The expected signature is the hex HMAC-SHA256 of ``challenge`` keyed
        with the ``NEMOCLAW_WEBHOOK_SECRET`` environment variable.

        Args:
            challenge: The challenge string that was signed.
            signature: The hex signature supplied by the caller.

        Returns:
            ``True`` only when the secret is configured and ``signature``
            matches the expected digest. ``False`` when the secret is unset
            or empty, when ``signature`` is not a string, or on any mismatch.
        """
        secret = os.getenv("NEMOCLAW_WEBHOOK_SECRET", "")
        # Fail closed: without a secret nothing can be verified, and a
        # missing signature (e.g. an absent header passed as None) never matches.
        if not secret or not isinstance(signature, str):
            return False
        expected = hmac.new(
            secret.encode("utf-8"),
            challenge.encode("utf-8"),
            hashlib.sha256,
        ).hexdigest()
        # Compare as bytes: compare_digest raises TypeError for str arguments
        # containing non-ASCII characters, which an untrusted signature may have.
        return hmac.compare_digest(expected.encode("ascii"), signature.encode("utf-8"))

    def build_workflow_dispatch(
        self,
        *,
        prompt: str,
        tenant_id: str,
        actor_role: str,
        component_templates: list[str],
    ) -> dict[str, Any]:
        """Assemble the dispatch payload for the canvas generation workflow.

        All arguments are keyword-only and are copied into the payload
        unchanged; ``component_templates`` is stored by reference, not copied.

        Args:
            prompt: Stored under ``"prompt"``.
            tenant_id: Stored under ``"tenantId"``.
            actor_role: Stored under ``"actorRole"``.
            component_templates: Stored under ``"componentTemplates"``.

        Returns:
            A dict combining the arguments with the fixed ``runtime``,
            ``workflow`` and ``executionBackend`` values.
        """
        return {
            "runtime": "python_native_nemoclaw",
            "tenantId": tenant_id,
            "actorRole": actor_role,
            "workflow": "oracle_canvas_generation",
            "prompt": prompt,
            "componentTemplates": component_templates,
            "executionBackend": "comfyui_orchestrated",
        }
|
|
|
|
|
|
# Module-level instance, created once when this module is imported.
# NOTE(review): presumably imported elsewhere as a shared singleton — confirm against callers.
nemoclaw_runtime = NemoclawRuntime()
|