""" test_policy_service.py — Unit tests for Oracle policy engine. """ import sys import os import pytest sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) from oracle.policy_service import PolicyService, PolicyContext @pytest.fixture def svc(): return PolicyService() def _ctx(role: str = "sales_director") -> PolicyContext: return PolicyContext( tenant_id="tenant_test_001", actor_id="user_test_001", actor_role=role, ) # ── Privacy tier tests ──────────────────────────────────────────────────────── def test_junior_broker_denied_restricted(svc): plan = {"dataset": "lead_contacts", "privacyTier": "restricted", "rowLimit": 50, "joins": []} result = svc.validate_retrieval_plan(plan, _ctx("junior_broker")) assert not result.passed assert any("POLICY_PRIVACY_TIER_ESCALATION" in e for e in result.errors) def test_senior_broker_allowed_restricted_with_redaction(svc): plan = {"dataset": "lead_contacts", "privacyTier": "restricted", "rowLimit": 50, "joins": []} result = svc.validate_retrieval_plan(plan, _ctx("senior_broker")) assert result.passed assert result.redaction_policy == "aggregate_only" def test_junior_broker_denied_sensitive(svc): plan = {"dataset": "pii_records", "privacyTier": "sensitive", "rowLimit": 10, "joins": []} result = svc.validate_retrieval_plan(plan, _ctx("junior_broker")) assert not result.passed def test_data_steward_allowed_sensitive(svc): plan = {"dataset": "pii_records", "privacyTier": "sensitive", "rowLimit": 100, "joins": []} result = svc.validate_retrieval_plan(plan, _ctx("data_steward")) assert result.passed # ── Row limit tests ─────────────────────────────────────────────────────────── def test_row_limit_capped_for_junior_broker(svc): plan = {"dataset": "leads", "privacyTier": "standard", "rowLimit": 5000, "joins": []} result = svc.validate_retrieval_plan(plan, _ctx("junior_broker")) assert result.passed assert result.effective_row_limit == 100 assert any("ROW_LIMIT_CAPPED" in w for w in result.warnings) def test_row_limit_respected_for_admin(svc): plan = {"dataset": "leads", "privacyTier": "standard", "rowLimit": 5000, "joins": []} result = svc.validate_retrieval_plan(plan, _ctx("platform_admin")) assert result.passed assert result.effective_row_limit == 5000 # ── Cross-tenant join tests ─────────────────────────────────────────────────── def test_cross_tenant_join_denied(svc): plan = { "dataset": "global_lead_market", "privacyTier": "standard", "rowLimit": 50, "joins": [], } result = svc.validate_retrieval_plan(plan, _ctx("sales_director")) assert not result.passed assert any("POLICY_CROSS_TENANT_JOIN_DENIED" in e for e in result.errors) def test_explicit_cross_tenant_join_denied(svc): plan = { "dataset": "deals", "privacyTier": "standard", "rowLimit": 50, "joins": [{"tenantId": "tenant_other_999"}], } result = svc.validate_retrieval_plan(plan, _ctx("sales_director")) assert not result.passed # ── Tenant predicate enforcement ────────────────────────────────────────────── def test_enforce_tenant_predicate_overrides(svc): params = {"tenant_id": "attacker_tenant", "limit": 100} ctx = _ctx("sales_director") enforced = svc.enforce_tenant_predicate(params, ctx) assert enforced["tenant_id"] == "tenant_test_001" assert enforced["limit"] == 100 # ── Component access control ────────────────────────────────────────────────── def test_component_access_granted_for_allowed_role(svc): ac = {"allowedRoles": ["sales_director", "senior_broker"], "visibilityScope": "private"} assert svc.validate_component_access(ac, _ctx("sales_director")) is True def test_component_access_denied_for_wrong_role(svc): ac = {"allowedRoles": ["data_steward"], "visibilityScope": "private"} assert svc.validate_component_access(ac, _ctx("junior_broker")) is False # ── Redaction tests ─────────────────────────────────────────────────────────── def test_redact_full(): svc = PolicyService() rows = [{"name": "Alice", "email": "alice@test.com", "deal": 1000}] redacted = svc.redact(rows, "full_redact") assert redacted == [{"__redacted__": True, "count": 1}] def test_redact_aggregate_only(): svc = PolicyService() rows = [{"name": "Alice", "count": 5, "stage": "Qualified", "email": "alice@test.com"}] redacted = svc.redact(rows, "aggregate_only") assert len(redacted) == 1 assert "email" not in redacted[0] assert "name" not in redacted[0] assert redacted[0].get("count") == 5 assert redacted[0].get("stage") == "Qualified" def test_redact_none_passes_through(): svc = PolicyService() rows = [{"name": "Alice", "value": 999}] result = svc.redact(rows, "none") assert result == rows