Files
Project_Velocity/backend/oracle/canvas_service.py
2026-04-11 19:33:13 +05:30

597 lines
23 KiB
Python

"""
oracle/canvas_service.py
Canvas persistence for Oracle pages, revisions, and current component projections.
"""
from __future__ import annotations
import json
import logging
import os
import uuid
from copy import deepcopy
from datetime import datetime, timezone
from typing import Any
try:
import asyncpg # type: ignore
except Exception: # pragma: no cover
asyncpg = None # type: ignore
logger = logging.getLogger(__name__)
_DB_URL = os.getenv("DATABASE_URL", "")
_DEMO_PAGES: dict[str, dict[str, Any]] = {}
_DEMO_REVISIONS: dict[str, list[dict[str, Any]]] = {}
_DEMO_COMPONENTS: dict[str, list[dict[str, Any]]] = {}
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _allow_in_memory() -> bool:
return (
os.getenv("ORACLE_ALLOW_IN_MEMORY_FALLBACK", "").lower() in {"1", "true", "yes"}
or "PYTEST_CURRENT_TEST" in os.environ
)
def _db_ready() -> bool:
return bool(_DB_URL and not _DB_URL.startswith("PLACEHOLDER") and asyncpg is not None)
def _is_demo() -> bool:
return not _db_ready() and _allow_in_memory()
def _ensure_ready() -> None:
if _db_ready() or _is_demo():
return
if asyncpg is None:
raise RuntimeError("Oracle backend requires asyncpg to connect to PostgreSQL.")
raise RuntimeError("Oracle backend requires DATABASE_URL for production persistence.")
def _stringify(value: Any) -> str:
return str(value) if value is not None else ""
def _normalize_component(component: dict[str, Any]) -> dict[str, Any]:
normalized = deepcopy(component)
normalized["componentId"] = _stringify(normalized.get("componentId"))
descriptor = normalized.get("dataSourceDescriptor") or {}
if descriptor.get("descriptorId") is not None:
descriptor["descriptorId"] = _stringify(descriptor["descriptorId"])
normalized["dataSourceDescriptor"] = descriptor
return normalized
def _deserialize_component_row(row: Any) -> dict[str, Any]:
return _normalize_component(
{
"componentId": _stringify(row["component_id"]),
"type": row["type"],
"title": row["title"],
"description": row["description"],
"version": row["version"],
"lifecycleState": row["lifecycle_state"],
"dataSourceDescriptor": row["data_source_descriptor"],
"visualizationParameters": row["visualization_parameters"],
"dataBindings": row["data_bindings"],
"provenance": row["provenance"],
"renderingHints": row["rendering_hints"],
"layout": row["layout"],
"accessControls": row["access_controls"],
"styleSignature": row["style_signature"],
"validationState": row["validation_state"],
"auditLog": list(row["audit_log"] or []),
}
)
def _deserialize_page_row(row: Any, components: list[dict[str, Any]]) -> dict[str, Any]:
page_id = _stringify(row["page_id"])
branch_id = _stringify(row["branch_id"])
head_revision = int(row["head_revision"])
return {
"pageId": page_id,
"tenantId": row["tenant_id"],
"ownerId": row["owner_id"],
"branchId": branch_id,
"branchName": row["branch_name"],
"pageType": row["page_type"],
"title": row["title"],
"isShared": bool(row["is_shared"]),
"headRevision": head_revision,
"baseRevision": int(row["base_revision"]),
"sharingPolicy": row["sharing_policy"] or {
"shareMode": "direct_fork_only",
"allowReshare": False,
"defaultForkVisibility": "private",
},
"forks": [],
"lineage": [],
"audit": {"lastAuditEventId": "", "eventCount": 0},
"presence": {"activeViewers": 0, "activeEditors": 0, "lastPresenceAt": row["updated_at"].isoformat()},
"mainBranchPointer": {"pageId": page_id, "branchId": branch_id, "revision": head_revision},
"components": components,
"createdAt": row["created_at"].isoformat(),
"updatedAt": row["updated_at"].isoformat(),
}
class CanvasService:
async def create_page(
self,
*,
tenant_id: str,
owner_id: str,
title: str = "Untitled Canvas",
page_type: str = "main",
branch_name: str = "main",
sharing_policy: dict[str, Any] | None = None,
) -> dict[str, Any]:
_ensure_ready()
if _is_demo():
page_id = str(uuid.uuid4())
branch_id = str(uuid.uuid4())
page = {
"pageId": page_id,
"tenantId": tenant_id,
"ownerId": owner_id,
"branchId": branch_id,
"branchName": branch_name,
"pageType": page_type,
"title": title,
"isShared": False,
"headRevision": 0,
"baseRevision": 0,
"sharingPolicy": sharing_policy or {"shareMode": "direct_fork_only", "allowReshare": False, "defaultForkVisibility": "private"},
"forks": [],
"lineage": [],
"audit": {"lastAuditEventId": "", "eventCount": 0},
"presence": {"activeViewers": 0, "activeEditors": 0, "lastPresenceAt": _now()},
"mainBranchPointer": {"pageId": page_id, "branchId": branch_id, "revision": 0},
"components": [],
"createdAt": _now(),
"updatedAt": _now(),
}
_DEMO_PAGES[page_id] = page
_DEMO_REVISIONS[page_id] = []
_DEMO_COMPONENTS[page_id] = []
return page
assert asyncpg is not None
conn = await asyncpg.connect(_DB_URL)
try:
row = await conn.fetchrow(
"""
INSERT INTO oracle_canvas_pages (
tenant_id, owner_id, branch_id, branch_name, page_type, title, sharing_policy
)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
RETURNING *
""",
tenant_id,
owner_id,
str(uuid.uuid4()),
branch_name,
page_type,
title,
json.dumps(sharing_policy or {"shareMode": "direct_fork_only", "allowReshare": False, "defaultForkVisibility": "private"}),
)
return _deserialize_page_row(row, [])
finally:
await conn.close()
async def ensure_default_page(
self,
*,
tenant_id: str,
owner_id: str,
title: str = "Oracle Main Canvas",
) -> dict[str, Any]:
page = await self.get_first_page_for_owner(tenant_id=tenant_id, owner_id=owner_id)
if page:
return page
return await self.create_page(tenant_id=tenant_id, owner_id=owner_id, title=title)
async def get_first_page_for_owner(self, *, tenant_id: str, owner_id: str) -> dict[str, Any] | None:
_ensure_ready()
if _is_demo():
for page in _DEMO_PAGES.values():
if page["tenantId"] == tenant_id and page["ownerId"] == owner_id:
return {**page, "components": deepcopy(_DEMO_COMPONENTS.get(page["pageId"], []))}
return None
assert asyncpg is not None
conn = await asyncpg.connect(_DB_URL)
try:
row = await conn.fetchrow(
"""
SELECT *
FROM oracle_canvas_pages
WHERE tenant_id = $1 AND owner_id = $2
ORDER BY created_at ASC
LIMIT 1
""",
tenant_id,
owner_id,
)
if not row:
return None
components = await self._pg_fetch_components(conn, _stringify(row["page_id"]), tenant_id)
return _deserialize_page_row(row, components)
finally:
await conn.close()
async def get_page(self, page_id: str, tenant_id: str) -> dict[str, Any] | None:
_ensure_ready()
if _is_demo():
page = _DEMO_PAGES.get(page_id)
if page and page["tenantId"] == tenant_id:
return {**page, "components": deepcopy(_DEMO_COMPONENTS.get(page_id, []))}
return None
assert asyncpg is not None
conn = await asyncpg.connect(_DB_URL)
try:
row = await conn.fetchrow(
"""
SELECT *
FROM oracle_canvas_pages
WHERE page_id = $1::uuid AND tenant_id = $2
""",
page_id,
tenant_id,
)
if not row:
return None
components = await self._pg_fetch_components(conn, page_id, tenant_id)
return _deserialize_page_row(row, components)
finally:
await conn.close()
async def commit_revision(
self,
*,
page_id: str,
tenant_id: str,
actor_id: str,
commit_kind: str,
commit_summary: str,
components: list[dict[str, Any]],
execution_id: str | None = None,
merge_request_id: str | None = None,
idempotency_key: str | None = None,
) -> dict[str, Any]:
_ensure_ready()
if _is_demo():
page = _DEMO_PAGES.get(page_id)
if not page or page["tenantId"] != tenant_id:
raise ValueError(f"Page {page_id} not found for tenant {tenant_id}")
if idempotency_key:
existing = next((r for r in _DEMO_REVISIONS.get(page_id, []) if r.get("idempotencyKey") == idempotency_key), None)
if existing:
return existing
new_revision_num = page["headRevision"] + 1
revision = {
"revisionId": str(uuid.uuid4()),
"pageId": page_id,
"tenantId": tenant_id,
"revisionNumber": new_revision_num,
"commitKind": commit_kind,
"commitSummary": commit_summary,
"actorId": actor_id,
"executionId": execution_id,
"mergeRequestId": merge_request_id,
"componentsSnapshot": json.dumps(components),
"idempotencyKey": idempotency_key,
"createdAt": _now(),
}
_DEMO_REVISIONS.setdefault(page_id, []).append(revision)
_DEMO_COMPONENTS[page_id] = deepcopy([_normalize_component(component) for component in components])
page["headRevision"] = new_revision_num
page["mainBranchPointer"]["revision"] = new_revision_num
page["updatedAt"] = _now()
return revision
assert asyncpg is not None
normalized_components = [_normalize_component(component) for component in components]
conn = await asyncpg.connect(_DB_URL)
try:
async with conn.transaction():
if idempotency_key:
existing = await conn.fetchrow(
"""
SELECT *
FROM oracle_canvas_page_revisions
WHERE idempotency_key = $1
""",
idempotency_key,
)
if existing:
return {
"revisionId": _stringify(existing["revision_id"]),
"pageId": _stringify(existing["page_id"]),
"tenantId": existing["tenant_id"],
"revisionNumber": int(existing["revision_number"]),
"commitKind": existing["commit_kind"],
"commitSummary": existing["commit_summary"],
"actorId": existing["actor_id"],
"executionId": _stringify(existing["execution_id"]) if existing["execution_id"] else None,
"mergeRequestId": _stringify(existing["merge_request_id"]) if existing["merge_request_id"] else None,
"componentsSnapshot": json.dumps(existing["components_snapshot"]),
"idempotencyKey": existing["idempotency_key"],
"createdAt": existing["created_at"].isoformat(),
}
page = await conn.fetchrow(
"""
SELECT *
FROM oracle_canvas_pages
WHERE page_id = $1::uuid AND tenant_id = $2
FOR UPDATE
""",
page_id,
tenant_id,
)
if not page:
raise ValueError(f"Page {page_id} not found for tenant {tenant_id}")
new_revision_number = int(page["head_revision"]) + 1
revision = await conn.fetchrow(
"""
INSERT INTO oracle_canvas_page_revisions (
page_id, tenant_id, revision_number, commit_kind, commit_summary,
actor_id, execution_id, merge_request_id, components_snapshot, idempotency_key
)
VALUES (
$1::uuid, $2, $3, $4, $5,
$6, NULLIF($7, '')::uuid, NULLIF($8, '')::uuid, $9::jsonb, $10
)
RETURNING *
""",
page_id,
tenant_id,
new_revision_number,
commit_kind,
commit_summary,
actor_id,
execution_id or "",
merge_request_id or "",
json.dumps(normalized_components),
idempotency_key,
)
await conn.execute(
"""
UPDATE oracle_canvas_pages
SET head_revision = $3, updated_at = NOW()
WHERE page_id = $1::uuid AND tenant_id = $2
""",
page_id,
tenant_id,
new_revision_number,
)
await self._pg_replace_components(conn, page_id=page_id, tenant_id=tenant_id, components=normalized_components)
return {
"revisionId": _stringify(revision["revision_id"]),
"pageId": _stringify(revision["page_id"]),
"tenantId": revision["tenant_id"],
"revisionNumber": int(revision["revision_number"]),
"commitKind": revision["commit_kind"],
"commitSummary": revision["commit_summary"],
"actorId": revision["actor_id"],
"executionId": _stringify(revision["execution_id"]) if revision["execution_id"] else None,
"mergeRequestId": _stringify(revision["merge_request_id"]) if revision["merge_request_id"] else None,
"componentsSnapshot": json.dumps(revision["components_snapshot"]),
"idempotencyKey": revision["idempotency_key"],
"createdAt": revision["created_at"].isoformat(),
}
finally:
await conn.close()
async def rollback(
self,
*,
page_id: str,
tenant_id: str,
actor_id: str,
target_revision: int,
idempotency_key: str,
) -> dict[str, Any]:
_ensure_ready()
if _is_demo():
page = _DEMO_PAGES.get(page_id)
if not page:
raise ValueError(f"Page {page_id} not found")
revisions = _DEMO_REVISIONS.get(page_id, [])
target_rev = next((r for r in revisions if r["revisionNumber"] == target_revision), None)
if not target_rev:
raise ValueError(f"Revision {target_revision} not found for page {page_id}")
snapshot = json.loads(target_rev["componentsSnapshot"])
return await self.commit_revision(
page_id=page_id,
tenant_id=tenant_id,
actor_id=actor_id,
commit_kind="rollback",
commit_summary=f"Rollback to revision {target_revision}",
components=snapshot,
idempotency_key=idempotency_key,
)
assert asyncpg is not None
conn = await asyncpg.connect(_DB_URL)
try:
revision = await conn.fetchrow(
"""
SELECT components_snapshot
FROM oracle_canvas_page_revisions
WHERE page_id = $1::uuid AND tenant_id = $2 AND revision_number = $3
""",
page_id,
tenant_id,
target_revision,
)
if not revision:
raise ValueError(f"Revision {target_revision} not found for page {page_id}")
return await self.commit_revision(
page_id=page_id,
tenant_id=tenant_id,
actor_id=actor_id,
commit_kind="rollback",
commit_summary=f"Rollback to revision {target_revision}",
components=list(revision["components_snapshot"]),
idempotency_key=idempotency_key,
)
finally:
await conn.close()
async def list_revisions(self, page_id: str, tenant_id: str) -> list[dict[str, Any]]:
_ensure_ready()
if _is_demo():
page = _DEMO_PAGES.get(page_id)
if not page or page["tenantId"] != tenant_id:
return []
return sorted(_DEMO_REVISIONS.get(page_id, []), key=lambda r: r["revisionNumber"], reverse=True)
assert asyncpg is not None
conn = await asyncpg.connect(_DB_URL)
try:
rows = await conn.fetch(
"""
SELECT revision_id, page_id, tenant_id, revision_number, commit_kind, commit_summary,
actor_id, execution_id, merge_request_id, created_at
FROM oracle_canvas_page_revisions
WHERE page_id = $1::uuid AND tenant_id = $2
ORDER BY revision_number DESC
""",
page_id,
tenant_id,
)
return [
{
"revisionId": _stringify(row["revision_id"]),
"pageId": _stringify(row["page_id"]),
"tenantId": row["tenant_id"],
"revisionNumber": int(row["revision_number"]),
"commitKind": row["commit_kind"],
"commitSummary": row["commit_summary"],
"actorId": row["actor_id"],
"executionId": _stringify(row["execution_id"]) if row["execution_id"] else None,
"mergeRequestId": _stringify(row["merge_request_id"]) if row["merge_request_id"] else None,
"createdAt": row["created_at"].isoformat(),
}
for row in rows
]
finally:
await conn.close()
async def upsert_component(
self,
*,
page_id: str,
tenant_id: str,
component: dict[str, Any],
) -> dict[str, Any]:
_ensure_ready()
if _is_demo():
comps = _DEMO_COMPONENTS.setdefault(page_id, [])
normalized = _normalize_component(component)
existing_idx = next((i for i, c in enumerate(comps) if c.get("componentId") == normalized.get("componentId")), None)
if existing_idx is not None:
comps[existing_idx] = normalized
else:
comps.append(normalized)
return normalized
assert asyncpg is not None
conn = await asyncpg.connect(_DB_URL)
try:
await self._pg_upsert_component(conn, page_id=page_id, tenant_id=tenant_id, component=_normalize_component(component))
return _normalize_component(component)
finally:
await conn.close()
async def _pg_fetch_components(self, conn: Any, page_id: str, tenant_id: str) -> list[dict[str, Any]]:
rows = await conn.fetch(
"""
SELECT *
FROM oracle_canvas_components
WHERE page_id = $1::uuid AND tenant_id = $2
ORDER BY COALESCE((layout->>'orderIndex')::int, 999999), created_at ASC
""",
page_id,
tenant_id,
)
return [_deserialize_component_row(row) for row in rows]
async def _pg_replace_components(self, conn: Any, *, page_id: str, tenant_id: str, components: list[dict[str, Any]]) -> None:
await conn.execute(
"""
DELETE FROM oracle_canvas_components
WHERE page_id = $1::uuid AND tenant_id = $2
""",
page_id,
tenant_id,
)
for component in components:
await self._pg_upsert_component(conn, page_id=page_id, tenant_id=tenant_id, component=component)
async def _pg_upsert_component(self, conn: Any, *, page_id: str, tenant_id: str, component: dict[str, Any]) -> None:
await conn.execute(
"""
INSERT INTO oracle_canvas_components (
component_id, page_id, tenant_id, type, title, description, version, lifecycle_state,
data_source_descriptor, visualization_parameters, data_bindings, provenance,
rendering_hints, layout, access_controls, style_signature, validation_state, audit_log
)
VALUES (
$1::uuid, $2::uuid, $3, $4, $5, $6, $7, $8,
$9::jsonb, $10::jsonb, $11::jsonb, $12::jsonb,
$13::jsonb, $14::jsonb, $15::jsonb, $16::jsonb, $17::jsonb, $18::text[]
)
ON CONFLICT (component_id)
DO UPDATE SET
title = EXCLUDED.title,
description = EXCLUDED.description,
version = EXCLUDED.version,
lifecycle_state = EXCLUDED.lifecycle_state,
data_source_descriptor = EXCLUDED.data_source_descriptor,
visualization_parameters = EXCLUDED.visualization_parameters,
data_bindings = EXCLUDED.data_bindings,
provenance = EXCLUDED.provenance,
rendering_hints = EXCLUDED.rendering_hints,
layout = EXCLUDED.layout,
access_controls = EXCLUDED.access_controls,
style_signature = EXCLUDED.style_signature,
validation_state = EXCLUDED.validation_state,
audit_log = EXCLUDED.audit_log,
updated_at = NOW()
""",
component["componentId"],
page_id,
tenant_id,
component["type"],
component["title"],
component.get("description"),
int(component.get("version", 1)),
component.get("lifecycleState", "active"),
json.dumps(component.get("dataSourceDescriptor", {})),
json.dumps(component.get("visualizationParameters", {})),
json.dumps(component.get("dataBindings", {})),
json.dumps(component.get("provenance", {})),
json.dumps(component.get("renderingHints", {})),
json.dumps(component.get("layout", {})),
json.dumps(component.get("accessControls", {})),
json.dumps(component.get("styleSignature", {})),
json.dumps(component.get("validationState", {})),
list(component.get("auditLog", [])),
)
canvas_service = CanvasService()