"""
oracle/collaboration_service.py

Implements fork creation, MergeRequest lifecycle, three-way diff engine,
conflict classification (all 7 classes from spec §17.2), and merge commits.
"""
from __future__ import annotations

import copy
import logging
import uuid
from collections import Counter
from datetime import datetime, timezone
from typing import Any

logger = logging.getLogger(__name__)

# ── In-memory store (demo mode) ───────────────────────────────────────────────
_DEMO_FORKS: dict[str, dict[str, Any]] = {}
_DEMO_MRS: dict[str, dict[str, Any]] = {}

# Keys of a stored merge request that are never handed back to callers.
_INTERNAL_MR_KEYS = frozenset({"_mergedComponents"})

# Reviewer decision → resulting merge-request status.
_DECISION_STATUS = {
    "approve": "merged",
    "reject": "closed",
    "changes_requested": "changes_requested",
}

# Sort position used for components that carry no usable orderIndex.
_UNORDERED_SORT_KEY = 9999
# Spacing between consecutive orderIndex values after normalisation.
_ORDER_STEP = 100


def _now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _public_view(mr: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of a stored merge request without internal keys."""
    return {key: value for key, value in mr.items() if key not in _INTERNAL_MR_KEYS}


# ── Three-way diff engine ─────────────────────────────────────────────────────
def _make_conflict(
    conflict_class: str,
    component_id: str,
    field: str | None = None,
    source_val: Any = None,
    target_val: Any = None,
    description: str = "",
) -> dict[str, Any]:
    """Build one conflict record with a fresh random ``conflictId``."""
    return {
        "conflictId": str(uuid.uuid4()),
        "conflictClass": conflict_class,
        "componentId": component_id,
        "field": field,
        "sourceValue": source_val,
        "targetValue": target_val,
        "description": description,
    }


def _order_index(component: dict[str, Any]) -> Any:
    """Return ``layout.orderIndex`` of a component, or None when absent."""
    return (component.get("layout") or {}).get("orderIndex")


def _layout_rest(component: dict[str, Any]) -> dict[str, Any]:
    """Return the component's layout without its ``orderIndex`` entry."""
    layout = component.get("layout") or {}
    return {key: value for key, value in layout.items() if key != "orderIndex"}


def _prefer_source(base_val: Any, source_val: Any, target_val: Any) -> tuple[bool, bool]:
    """Resolve one value three-way; return ``(take_source, conflicted)``.

    Source wins only when it is the sole side that moved away from base.
    When both sides moved to different values the result is a conflict and
    target is kept as the default.
    """
    if source_val == target_val or source_val == base_val:
        return False, False
    if target_val == base_val:
        return True, False
    return False, True


def _sort_key(component: dict[str, Any]) -> Any:
    """Sort key for normalisation; components without an order go last."""
    order = _order_index(component)
    return _UNORDERED_SORT_KEY if order is None else order


def _merge_component(
    cid: str,
    base_c: dict[str, Any],
    src_c: dict[str, Any],
    tgt_c: dict[str, Any],
    conflicts: list[dict[str, Any]],
) -> dict[str, Any]:
    """Merge a component present in base, source and target, field by field.

    Appends any conflict records to ``conflicts`` and returns the merged
    component (a fresh object; inputs are not mutated).
    """
    # Layout is split into the slot (orderIndex) and everything else so a
    # move can be classified separately from a content edit.
    src_order, tgt_order = _order_index(src_c), _order_index(tgt_c)
    take_src_order, order_clash = _prefer_source(_order_index(base_c), src_order, tgt_order)
    take_src_rest, rest_clash = _prefer_source(
        _layout_rest(base_c), _layout_rest(src_c), _layout_rest(tgt_c)
    )
    merged_layout = copy.deepcopy(_layout_rest(src_c if take_src_rest else tgt_c))
    chosen_order = src_order if take_src_order else tgt_order
    if chosen_order is not None:
        merged_layout["orderIndex"] = chosen_order

    merged: dict[str, Any] = {}
    clashed: set[str] = set()
    # Target's key order first, then keys that only source carries.
    for key in dict.fromkeys([*tgt_c, *src_c]):
        if key == "layout":
            merged[key] = merged_layout
            continue
        take_source, clash = _prefer_source(base_c.get(key), src_c.get(key), tgt_c.get(key))
        winner = src_c if take_source else tgt_c
        if key in winner:  # a key removed on the winning side stays removed
            merged[key] = copy.deepcopy(winner[key])
        if clash:
            clashed.add(key)

    if "dataSourceDescriptor" in clashed:
        conflicts.append(_make_conflict(
            "query_descriptor_conflict", cid,
            field="dataSourceDescriptor",
            description="Data source descriptor modified in both branches.",
        ))
    if "accessControls" in clashed:
        conflicts.append(_make_conflict(
            "access_policy_conflict", cid,
            field="accessControls",
            source_val=src_c.get("accessControls"),
            target_val=tgt_c.get("accessControls"),
            description="Access control policies diverge in both branches.",
        ))
    if order_clash:
        conflicts.append(_make_conflict(
            "layout_slot_conflict", cid,
            field="layout.orderIndex",
            source_val=src_order,
            target_val=tgt_order,
            description="Layout order index conflicts.",
        ))
    elif take_src_order:
        # Only source moved the component: the move is applied, not disputed.
        conflicts.append(_make_conflict("safe_reorder", cid, description="Component reordered."))
    if rest_clash or clashed - {"dataSourceDescriptor", "accessControls"}:
        conflicts.append(_make_conflict(
            "component_content_conflict", cid,
            description="Component content diverges in both branches.",
        ))
    return merged


def _three_way_diff(
    base_components: list[dict[str, Any]],
    source_components: list[dict[str, Any]],
    target_components: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """
    Compute a three-way diff between base, source, and target component lists.

    Returns (merged_components, conflicts). Components are matched by their
    ``componentId``. Inputs are never mutated; merged components are copies.

    Records emitted into ``conflicts`` (class names from spec §17.2):
      1. safe_append                — component added only in source
      2. safe_reorder               — only source changed layout.orderIndex
      3. component_content_conflict — both sides changed the same field to
                                      different values, or both added the
                                      same id with different content
      4. query_descriptor_conflict  — dataSourceDescriptor changed in both
      5. layout_slot_conflict       — both sides moved the component to
                                      different orderIndex values
      6. access_policy_conflict     — accessControls changed in both
      7. delete_edit_conflict       — deleted in one, edited in other

    For every real conflict the target value is kept as the default. A field
    changed on only one side is taken from that side without a conflict.
    Finally the merged list is sorted by orderIndex (components without one
    go last) and orderIndex is rewritten to 100, 200, 300, …
    """
    base_map = {c["componentId"]: c for c in base_components}
    source_map = {c["componentId"]: c for c in source_components}
    target_map = {c["componentId"]: c for c in target_components}

    merged: list[dict[str, Any]] = []
    conflicts: list[dict[str, Any]] = []

    # Walk ids in a reproducible order (target, then source, then base) so
    # conflict order and sort ties do not depend on set/hash ordering.
    for cid in dict.fromkeys([*target_map, *source_map, *base_map]):
        base_c = base_map.get(cid)
        src_c = source_map.get(cid)
        tgt_c = target_map.get(cid)

        # Deleted in both branches → nothing to keep.
        if src_c is None and tgt_c is None:
            continue

        if base_c is None:
            if tgt_c is None:
                # Added only in source.
                conflicts.append(_make_conflict(
                    "safe_append", cid,
                    description=f"Component '{cid}' added in source branch; will be appended."
                ))
                merged.append(copy.deepcopy(src_c))
            elif src_c is None or src_c == tgt_c:
                # Added only in target, or added identically in both.
                merged.append(copy.deepcopy(tgt_c))
            else:
                conflicts.append(_make_conflict(
                    "component_content_conflict", cid,
                    description="Component added in both branches with different content."
                ))
                merged.append(copy.deepcopy(tgt_c))  # Default: keep target
            continue

        if src_c is None:
            # Deleted in source; safe only when target left it untouched.
            if base_c != tgt_c:
                conflicts.append(_make_conflict(
                    "delete_edit_conflict", cid,
                    description="Component deleted in source but edited in target."
                ))
                merged.append(copy.deepcopy(tgt_c))
            continue

        if tgt_c is None:
            # Deleted in target; safe only when source left it untouched.
            if base_c != src_c:
                conflicts.append(_make_conflict(
                    "delete_edit_conflict", cid,
                    description="Component deleted in target but edited in source."
                ))
                merged.append(copy.deepcopy(src_c))
            continue

        # Present everywhere.
        if src_c == tgt_c:
            merged.append(copy.deepcopy(tgt_c))
        else:
            merged.append(_merge_component(cid, base_c, src_c, tgt_c, conflicts))

    # Normalize orderIndex; tolerate a missing/None layout or orderIndex.
    merged.sort(key=_sort_key)
    for position, comp in enumerate(merged, start=1):
        layout = comp.get("layout")
        if not isinstance(layout, dict):
            layout = comp["layout"] = {}
        layout["orderIndex"] = position * _ORDER_STEP

    return merged, conflicts


# ── CollaborationService ──────────────────────────────────────────────────────
class CollaborationService:
    """
    Manages fork creation and merge request lifecycle.

    All state lives in the module-level in-memory demo stores.
    """

    async def create_fork(
        self,
        *,
        source_page: dict[str, Any],
        recipient_user_id: str,
        created_by: str,
        visibility: str = "private",
        message: str = "",
    ) -> dict[str, Any]:
        """
        Creates a fork from the source_page snapshot at its current headRevision.

        ``source_page`` must provide ``pageId``, ``branchId`` and
        ``headRevision`` (KeyError otherwise). Fresh ids are generated for
        the fork, its page and its branch. Returns the stored ForkRecord.
        """
        fork_id = str(uuid.uuid4())
        fork = {
            "forkId": fork_id,
            "sourcePageId": source_page["pageId"],
            "sourceBranchId": source_page["branchId"],
            "sourceRevision": source_page["headRevision"],
            "forkPageId": str(uuid.uuid4()),
            "forkBranchId": str(uuid.uuid4()),
            "recipientUserId": recipient_user_id,
            "createdBy": created_by,
            "visibility": visibility,
            "message": message,
            "status": "active",
            "createdAt": _now(),
        }
        _DEMO_FORKS[fork_id] = fork
        logger.info(
            "COLLAB fork_created fork_id=%s source_page=%s revision=%d recipient=%s",
            fork_id, source_page["pageId"], source_page["headRevision"], recipient_user_id,
        )
        return fork

    async def open_merge_request(
        self,
        *,
        tenant_id: str,
        source_page_id: str,
        source_branch_id: str,
        source_head_revision: int,
        target_page_id: str,
        target_branch_id: str,
        target_base_revision: int,
        title: str,
        description: str = "",
        created_by: str,
        source_components: list[dict[str, Any]],
        target_components: list[dict[str, Any]],
        base_components: list[dict[str, Any]],
    ) -> dict[str, Any]:
        """
        Creates a MergeRequest with pre-computed conflicts via three-way diff.

        The ``diffSummary`` counters are derived from the conflict records:
        added = safe_append, edited = component_content_conflict,
        reordered = safe_reorder + layout_slot_conflict,
        deleted = delete_edit_conflict.

        Returns the stored merge request without its internal
        ``_mergedComponents`` entry.
        """
        merged, conflicts = _three_way_diff(base_components, source_components, target_components)
        per_class = Counter(conflict["conflictClass"] for conflict in conflicts)

        # One timestamp so a new MR has createdAt == updatedAt.
        opened_at = _now()
        mr_id = str(uuid.uuid4())
        mr = {
            "mergeRequestId": mr_id,
            "tenantId": tenant_id,
            "sourcePageId": source_page_id,
            "sourceBranchId": source_branch_id,
            "sourceHeadRevision": source_head_revision,
            "targetPageId": target_page_id,
            "targetBranchId": target_branch_id,
            "targetBaseRevision": target_base_revision,
            "title": title,
            "description": description,
            "status": "open",
            "conflicts": conflicts,
            "diffSummary": {
                "componentsAdded": per_class["safe_append"],
                "componentsEdited": per_class["component_content_conflict"],
                "componentsReordered": per_class["safe_reorder"] + per_class["layout_slot_conflict"],
                "componentsDeleted": per_class["delete_edit_conflict"],
            },
            "_mergedComponents": merged,  # internal — used during merge
            "createdBy": created_by,
            "createdAt": opened_at,
            "updatedAt": opened_at,
        }
        _DEMO_MRS[mr_id] = mr
        logger.info(
            "COLLAB mr_opened mr_id=%s conflicts=%d source=%s → target=%s",
            mr_id, len(conflicts), source_branch_id, target_branch_id,
        )
        return _public_view(mr)

    async def review_merge_request(
        self,
        *,
        mr_id: str,
        decision: str,
        reviewer_id: str,
        comment: str = "",
        resolutions: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """
        Applies a reviewer decision: approve → status "merged";
        reject → "closed"; changes_requested → "changes_requested".

        ``resolutions`` is accepted for interface compatibility but is not
        applied by this implementation.

        Raises ValueError when the merge request does not exist or when
        ``decision`` is not one of the three supported values; in both
        cases nothing is modified.
        """
        mr = _DEMO_MRS.get(mr_id)
        if not mr:
            raise ValueError(f"MergeRequest {mr_id} not found")
        # Validate before touching the record so a typo in `decision`
        # cannot leave a half-recorded review behind.
        new_status = _DECISION_STATUS.get(decision)
        if new_status is None:
            raise ValueError(
                f"Unknown decision {decision!r}; expected one of {sorted(_DECISION_STATUS)}"
            )

        mr["reviewedBy"] = reviewer_id
        mr["reviewerComment"] = comment
        mr["updatedAt"] = _now()
        mr["status"] = new_status
        if decision == "approve":
            logger.info("COLLAB mr_merged mr_id=%s by=%s", mr_id, reviewer_id)

        return _public_view(mr)

    async def get_merge_request(self, mr_id: str) -> dict[str, Any] | None:
        """Return the public view of one merge request, or None if unknown."""
        mr = _DEMO_MRS.get(mr_id)
        return _public_view(mr) if mr else None

    async def list_merge_requests(
        self, target_page_id: str, status: str | None = None
    ) -> list[dict[str, Any]]:
        """
        List merge requests targeting ``target_page_id``.

        A falsy ``status`` (None or "") disables the status filter.
        """
        return [
            _public_view(mr)
            for mr in _DEMO_MRS.values()
            if mr["targetPageId"] == target_page_id
            and (not status or mr["status"] == status)
        ]


# ── Public three-way-diff (for testing) ───────────────────────────────────────
def three_way_diff(
    base: list[dict[str, Any]],
    source: list[dict[str, Any]],
    target: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Public alias of ``_three_way_diff``; returns (merged, conflicts)."""
    return _three_way_diff(base, source, target)


# ── Singleton ─────────────────────────────────────────────────────────────────
collaboration_service = CollaborationService()