# Files
# Project_Velocity/backend/tests/oracle/test_policy_service.py
# 2026-04-12 02:01:36 +05:30
#
# 143 lines
# 5.5 KiB
# Python

"""
test_policy_service.py — Unit tests for Oracle policy engine.
"""
import sys
import os
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from oracle.policy_service import PolicyService, PolicyContext
@pytest.fixture
def svc():
    """Provide a newly constructed PolicyService to tests that request ``svc``."""
    service = PolicyService()
    return service
def _ctx(role: str = "sales_director") -> PolicyContext:
    """Build a PolicyContext for the fixed test tenant and actor.

    Args:
        role: Value passed as ``actor_role``; defaults to ``"sales_director"``.

    Returns:
        A PolicyContext with tenant ``tenant_test_001`` and actor
        ``user_test_001``.
    """
    context_fields = {
        "tenant_id": "tenant_test_001",
        "actor_id": "user_test_001",
        "actor_role": role,
    }
    return PolicyContext(**context_fields)
# ── Privacy tier tests ────────────────────────────────────────────────────────
def test_junior_broker_denied_restricted(svc):
    """A junior broker's restricted-tier plan fails with a privacy-tier escalation error."""
    restricted_plan = {
        "dataset": "lead_contacts",
        "privacyTier": "restricted",
        "rowLimit": 50,
        "joins": [],
    }
    context = _ctx("junior_broker")

    outcome = svc.validate_retrieval_plan(restricted_plan, context)

    assert not outcome.passed
    assert any("POLICY_PRIVACY_TIER_ESCALATION" in err for err in outcome.errors)
def test_senior_broker_allowed_restricted_with_redaction(svc):
    """A senior broker's restricted-tier plan passes with the ``aggregate_only`` redaction policy."""
    restricted_plan = {
        "dataset": "lead_contacts",
        "privacyTier": "restricted",
        "rowLimit": 50,
        "joins": [],
    }
    context = _ctx("senior_broker")

    outcome = svc.validate_retrieval_plan(restricted_plan, context)

    assert outcome.passed
    assert outcome.redaction_policy == "aggregate_only"
def test_junior_broker_denied_sensitive(svc):
    """A junior broker's sensitive-tier plan does not pass validation."""
    sensitive_plan = {
        "dataset": "pii_records",
        "privacyTier": "sensitive",
        "rowLimit": 10,
        "joins": [],
    }
    context = _ctx("junior_broker")

    outcome = svc.validate_retrieval_plan(sensitive_plan, context)

    assert not outcome.passed
def test_data_steward_allowed_sensitive(svc):
    """A data steward's sensitive-tier plan passes validation."""
    sensitive_plan = {
        "dataset": "pii_records",
        "privacyTier": "sensitive",
        "rowLimit": 100,
        "joins": [],
    }
    context = _ctx("data_steward")

    outcome = svc.validate_retrieval_plan(sensitive_plan, context)

    assert outcome.passed
# ── Row limit tests ───────────────────────────────────────────────────────────
def test_row_limit_capped_for_junior_broker(svc):
    """A junior broker asking for 5000 rows passes, capped to 100 with a warning."""
    oversized_plan = {
        "dataset": "leads",
        "privacyTier": "standard",
        "rowLimit": 5000,
        "joins": [],
    }
    context = _ctx("junior_broker")

    outcome = svc.validate_retrieval_plan(oversized_plan, context)

    assert outcome.passed
    assert outcome.effective_row_limit == 100
    assert any("ROW_LIMIT_CAPPED" in warning for warning in outcome.warnings)
def test_row_limit_respected_for_admin(svc):
    """A platform admin asking for 5000 rows passes with the limit left at 5000."""
    large_plan = {
        "dataset": "leads",
        "privacyTier": "standard",
        "rowLimit": 5000,
        "joins": [],
    }
    context = _ctx("platform_admin")

    outcome = svc.validate_retrieval_plan(large_plan, context)

    assert outcome.passed
    assert outcome.effective_row_limit == 5000
# ── Cross-tenant join tests ───────────────────────────────────────────────────
def test_cross_tenant_join_denied(svc):
    """A plan on ``global_lead_market`` fails with a cross-tenant join error despite listing no joins."""
    global_dataset_plan = {
        "dataset": "global_lead_market",
        "privacyTier": "standard",
        "rowLimit": 50,
        "joins": [],
    }
    context = _ctx("sales_director")

    outcome = svc.validate_retrieval_plan(global_dataset_plan, context)

    assert not outcome.passed
    assert any("POLICY_CROSS_TENANT_JOIN_DENIED" in err for err in outcome.errors)
def test_explicit_cross_tenant_join_denied(svc):
    """A plan whose join names a tenant other than the caller's does not pass validation."""
    foreign_join = {"tenantId": "tenant_other_999"}
    joined_plan = {
        "dataset": "deals",
        "privacyTier": "standard",
        "rowLimit": 50,
        "joins": [foreign_join],
    }
    context = _ctx("sales_director")

    outcome = svc.validate_retrieval_plan(joined_plan, context)

    assert not outcome.passed
# ── Tenant predicate enforcement ──────────────────────────────────────────────
def test_enforce_tenant_predicate_overrides(svc):
    """A caller-supplied ``tenant_id`` is replaced by the context tenant; ``limit`` is kept."""
    spoofed_params = {"tenant_id": "attacker_tenant", "limit": 100}
    context = _ctx("sales_director")

    result_params = svc.enforce_tenant_predicate(spoofed_params, context)

    assert result_params["tenant_id"] == "tenant_test_001"
    assert result_params["limit"] == 100
# ── Component access control ──────────────────────────────────────────────────
def test_component_access_granted_for_allowed_role(svc):
    """A role listed in ``allowedRoles`` gets exactly ``True`` from validate_component_access."""
    access_control = {
        "allowedRoles": ["sales_director", "senior_broker"],
        "visibilityScope": "private",
    }
    context = _ctx("sales_director")

    granted = svc.validate_component_access(access_control, context)

    assert granted is True
def test_component_access_denied_for_wrong_role(svc):
    """A role absent from ``allowedRoles`` gets exactly ``False`` from validate_component_access."""
    access_control = {
        "allowedRoles": ["data_steward"],
        "visibilityScope": "private",
    }
    context = _ctx("junior_broker")

    granted = svc.validate_component_access(access_control, context)

    assert granted is False
# ── Redaction tests ───────────────────────────────────────────────────────────
def test_redact_full():
    """``full_redact`` collapses the rows into one marker record carrying only the row count."""
    service = PolicyService()
    source_rows = [{"name": "Alice", "email": "alice@test.com", "deal": 1000}]
    expected = [{"__redacted__": True, "count": 1}]

    output = service.redact(source_rows, "full_redact")

    assert output == expected
def test_redact_aggregate_only():
    """``aggregate_only`` drops ``name`` and ``email`` but keeps ``count`` and ``stage``."""
    service = PolicyService()
    source_rows = [
        {
            "name": "Alice",
            "count": 5,
            "stage": "Qualified",
            "email": "alice@test.com",
        },
    ]

    output = service.redact(source_rows, "aggregate_only")

    assert len(output) == 1
    # Identifying fields must be gone from the single surviving row.
    assert "email" not in output[0]
    assert "name" not in output[0]
    # Aggregate-style fields must come through unchanged.
    assert output[0].get("count") == 5
    assert output[0].get("stage") == "Qualified"
def test_redact_none_passes_through():
    """The ``none`` policy returns rows equal to the input."""
    service = PolicyService()
    source_rows = [{"name": "Alice", "value": 999}]

    output = service.redact(source_rows, "none")

    assert output == source_rows