diff --git a/changelog/7741-pbac-evaluation-service.yaml b/changelog/7741-pbac-evaluation-service.yaml new file mode 100644 index 0000000000..65a8d2c3f4 --- /dev/null +++ b/changelog/7741-pbac-evaluation-service.yaml @@ -0,0 +1,4 @@ +type: Added +description: Add PBAC evaluation service for purpose-based access control checks +pr: 7741 +labels: [] diff --git a/clients/admin-ui/src/features/common/nav/nav-config.tsx b/clients/admin-ui/src/features/common/nav/nav-config.tsx index 5fcd091c07..2c6a90ac0a 100644 --- a/clients/admin-ui/src/features/common/nav/nav-config.tsx +++ b/clients/admin-ui/src/features/common/nav/nav-config.tsx @@ -407,7 +407,7 @@ if (process.env.NEXT_PUBLIC_APP_ENV === "development") { requiresPlus: true, }, { - title: "Seed Data", + title: "Seed data", path: routes.SEED_DATA_ROUTE, scopes: [ScopeRegistryEnum.DEVELOPER_READ], requiresPlus: true, diff --git a/pyproject.toml b/pyproject.toml index 7497dfd4a7..83cbae4512 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -104,6 +104,7 @@ dependencies = [ "sqlalchemy-redshift==0.8.11", "sqlalchemy-stubs==0.4", "sqlalchemy[asyncio]==1.4.27", + "sqlglot~=30.0.3", "sshtunnel==0.4.0", "starlette~=0.50.0", "stream-zip==0.0.83", diff --git a/src/fides/service/pbac/README.md b/src/fides/service/pbac/README.md new file mode 100644 index 0000000000..505fa23a77 --- /dev/null +++ b/src/fides/service/pbac/README.md @@ -0,0 +1,195 @@ +# Purpose-Based Access Control (PBAC) + +Open-source evaluation service for purpose-based data access control. Determines whether a data consumer has the declared purposes required to access datasets, collections, and fields. + +## Quick Start + +### 1. Register consumers (who accesses data) + +``` +POST /api/v1/data-consumer +{ + "name": "Analytics Team", + "type": "group", + "external_id": "analytics-team", + "contact_email": "analytics-lead@company.com", + "members": ["analyst@company.com", "lead@company.com"] +} +``` + +### 2. Define purposes (why data is accessed) + +``` +POST /api/v1/data-purpose +{ + "fides_key": "marketing_analytics", + "name": "Marketing Analytics", + "data_use": "marketing.advertising" +} +``` + +### 3. Assign purposes to consumers + +``` +PUT /api/v1/data-consumer/{id}/purposes +{ "purpose_fides_keys": ["marketing_analytics"] } +``` + +### 4. Annotate datasets with purposes + +Datasets declare their allowed purposes via fideslang `data_purposes` on datasets, collections, and fields. + +### 5. Evaluate a SQL query + +``` +POST /api/v1/pbac/evaluate +{ + "query_text": "SELECT email, purchase_history FROM customers.orders", + "user_identity": "analyst@company.com" +} +``` + +Response: + +```json +{ + "query_id": "abc-123", + "is_compliant": false, + "consumer": { + "name": "Analytics Team", + "external_id": "analytics-team", + "purpose_fides_keys": ["marketing_analytics"] + }, + "violations": [ + { + "dataset_key": "customers", + "consumer_purposes": ["marketing_analytics"], + "dataset_purposes": ["billing_operations"], + "reason": "Consumer purposes do not overlap with dataset purposes", + "control": "purpose_restriction" + } + ] +} +``` + +## How It Works + +``` +raw SQL + | + v +SQL Parser (sqlglot) -- extracts table references + | + v +RawQueryLogEntry + | + v +PBACEvaluationService + | + +-- 1. Identity Resolution + | user email --> consumer (via contact_email, external_id, or members list) + | + +-- 2. Dataset Resolution + | table references --> fides dataset keys + | + +-- 3. Purpose Map + | consumer --> declared purposes + | dataset --> declared purposes + | + +-- 4. 
PBAC Engine + | Do consumer purposes overlap with dataset purposes? + | yes --> COMPLIANT + | no --> VIOLATION + | + +-- 5. Policy v2 Engine (optional) + | Does any access policy override the violation? + | ALLOW --> suppress violation + | DENY --> confirm violation + | NO_DECISION --> confirm violation + | + v +EvaluationResult + is_compliant: bool + violations: [...] +``` + +## Identity Resolution + +The system resolves who is running a query by matching the user identity against registered consumers. + +**Resolution chain (in order):** + +1. **contact_email** -- exact match on the consumer's contact email +2. **external_id** -- match on the consumer's external identifier (group name, role ID, etc.) + +> **Note:** Members-based resolution (matching by members list) is not yet implemented in the OSS path. The `RedisIdentityResolver` currently supports steps 1 and 2 only. + +If no consumer matches, the user is marked as "unresolved" with no declared purposes, which means all dataset accesses are violations. + +### Group membership + +Consumers can represent groups or teams. Add user emails to the `members` list: + +```json +{ + "name": "Analytics Team", + "type": "group", + "external_id": "analytics-team", + "members": [ + "analyst@company.com", + "data-scientist@company.com", + "intern@company.com" + ] +} +``` + +When `analyst@company.com` runs a query, they inherit the Analytics Team's purposes. + +## Policy v2 (Override Engine) + +When PBAC finds a violation (purposes don't overlap), the Policy v2 engine gets a chance to override it. Policies use priority-based, first-decisive-match-wins evaluation. + +A policy can ALLOW access even when purposes don't match: + +``` +POST /api/v1/policy +{ + "fides_key": "allow_analytics_on_orders", + "decision": "ALLOW", + "priority": 100, + "match": { + "data_use": { "any": ["marketing.advertising"] } + } +} +``` + +PBAC is always checked first. Policies only evaluated when purposes don't match. + +## Package Structure + +``` +fides/service/pbac/ + types.py -- RawQueryLogEntry, TableRef + sql_parser.py -- Generic SQL --> RawQueryLogEntry (sqlglot) + engine/ -- PBAC evaluation engine (zero dependencies) + types.py -- ConsumerPurposes, DatasetPurposes, QueryAccess, etc. + evaluate.py -- evaluate_access() + reason.py -- Human-readable violation reasons + evaluation/ -- Service boundary + types.py -- EvaluationResult, ResolvedConsumer, EvaluationViolation + interface.py -- PBACEvaluationService Protocol + identity/ -- Consumer identity resolution + interface.py -- IdentityResolver Protocol + basic.py -- BasicIdentityResolver (email + external_id + members) + policies/ -- Policy v2 evaluation interface + interface.py -- AccessPolicyEvaluator Protocol + types + noop.py -- NoOpPolicyEvaluator (default) +``` + +## Extending (Fidesplus) + +Fidesplus adds platform-specific capabilities on top of the OSS evaluation: + +- **Platform connectors** -- BigQuery, Snowflake, Databricks audit log ingestion +- **Platform identity resolution** -- queries BigQuery IAM / Snowflake RBAC to resolve user roles, then matches roles against consumer `external_id` +- **Access control dashboard** -- violation logs, timeseries, per-consumer breakdowns diff --git a/src/fides/service/pbac/__init__.py b/src/fides/service/pbac/__init__.py new file mode 100644 index 0000000000..f6644db68b --- /dev/null +++ b/src/fides/service/pbac/__init__.py @@ -0,0 +1,12 @@ +"""Purpose-Based Access Control (PBAC) evaluation service. 
+ +Import types and pure functions directly:: + + from fides.service.pbac.types import RawQueryLogEntry, EvaluationResult + from fides.service.pbac.evaluate import evaluate_access + from fides.service.pbac.service import PBACEvaluationService + +The service module (PBACEvaluationService, InProcessPBACEvaluationService) +requires Redis/Celery deps and should be imported explicitly from +``fides.service.pbac.service``. +""" diff --git a/src/fides/service/pbac/consumers/__init__.py b/src/fides/service/pbac/consumers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/fides/service/pbac/consumers/entities.py b/src/fides/service/pbac/consumers/entities.py new file mode 100644 index 0000000000..1ade9e285d --- /dev/null +++ b/src/fides/service/pbac/consumers/entities.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import dataclasses +from dataclasses import dataclass, field +from datetime import datetime +from typing import TYPE_CHECKING, Any, Optional + +from fides.service.pbac.purposes.entities import DataPurposeEntity + +if TYPE_CHECKING: + from fides.api.models.data_consumer import ( # type: ignore[import-not-found] + DataConsumer, + ) + from fides.api.models.sql_models import System # type: ignore[attr-defined] + + +@dataclass +class DataConsumerEntity: + """Unified domain entity for both DataConsumer rows and System-as-consumer.""" + + id: str + name: str + type: str + created_at: datetime + updated_at: datetime + description: Optional[str] = None + external_id: Optional[str] = None + purposes: list[DataPurposeEntity] = field(default_factory=list) + purpose_fides_keys: list[str] = field(default_factory=list) + system_fides_key: Optional[str] = None + vendor_id: Optional[str] = None + egress: Optional[dict[str, Any]] = None + ingress: Optional[dict[str, Any]] = None + data_shared_with_third_parties: Optional[bool] = None + third_parties: Optional[str] = None + shared_categories: Optional[list[str]] = None + tags: list[str] = field(default_factory=list) + contact_email: Optional[str] = None + contact_slack_channel: Optional[str] = None + contact_details: Optional[dict[str, Any]] = None + + def to_dict(self) -> dict: + d = dataclasses.asdict(self) + d["created_at"] = self.created_at.isoformat() + d["updated_at"] = self.updated_at.isoformat() + # Store purpose references as fides_keys only (not nested entities) + d["purpose_fides_keys"] = self.purpose_fides_keys + del d["purposes"] + return d + + @classmethod + def from_dict(cls, d: dict) -> DataConsumerEntity: + d = dict(d) + d["created_at"] = datetime.fromisoformat(d["created_at"]) + d["updated_at"] = datetime.fromisoformat(d["updated_at"]) + d.setdefault("purpose_fides_keys", []) + d.setdefault("purposes", []) + return cls(**d) + + @classmethod + def from_consumer(cls, obj: DataConsumer) -> DataConsumerEntity: + purposes = [ + DataPurposeEntity.from_orm(cp.data_purpose) for cp in obj.consumer_purposes + ] + return cls( + id=obj.id, + name=obj.name, + description=obj.description, + type=obj.type, + external_id=obj.external_id, + purposes=purposes, + purpose_fides_keys=[p.fides_key for p in purposes], + egress=obj.egress, + ingress=obj.ingress, + data_shared_with_third_parties=obj.data_shared_with_third_parties, + third_parties=obj.third_parties, + shared_categories=obj.shared_categories or [], + tags=obj.tags or [], + contact_email=obj.contact_email, + contact_slack_channel=obj.contact_slack_channel, + contact_details=obj.contact_details, + created_at=obj.created_at, + updated_at=obj.updated_at, + ) + + 
@classmethod + def from_system(cls, obj: System) -> DataConsumerEntity: + purposes = [ + DataPurposeEntity.from_orm(sp.data_purpose) for sp in obj.system_purposes + ] + return cls( + id=obj.id, + name=obj.name or obj.fides_key, + description=obj.description, + type="system", + purposes=purposes, + purpose_fides_keys=[p.fides_key for p in purposes], + system_fides_key=obj.fides_key, + vendor_id=getattr(obj, "vendor_id", None), + egress=obj.egress if isinstance(obj.egress, dict) else None, + ingress=obj.ingress if isinstance(obj.ingress, dict) else None, + tags=obj.tags or [], + created_at=obj.created_at, + updated_at=obj.updated_at, + ) diff --git a/src/fides/service/pbac/consumers/repository.py b/src/fides/service/pbac/consumers/repository.py new file mode 100644 index 0000000000..e3c5dd18ea --- /dev/null +++ b/src/fides/service/pbac/consumers/repository.py @@ -0,0 +1,210 @@ +from __future__ import annotations + +import json +from datetime import datetime, timezone +from typing import Optional +from uuid import uuid4 + +from fastapi_pagination import Page, Params + +from fides.api.util.cache import FidesopsRedis +from fides.service.pbac.consumers.entities import DataConsumerEntity +from fides.service.pbac.exceptions import ResourceNotFoundError +from fides.service.pbac.purposes.repository import DataPurposeRedisRepository +from fides.service.pbac.redis_repository import RedisRepository + + +class DataConsumerRedisRepository(RedisRepository[DataConsumerEntity]): + """Redis-backed repository for non-system DataConsumer entities.""" + + PREFIX = "data_consumer" + + def __init__( + self, + cache: FidesopsRedis, + purpose_repo: DataPurposeRedisRepository, + ) -> None: + super().__init__(cache) + self._purpose_repo = purpose_repo + + def _serialize(self, entity: DataConsumerEntity) -> str: + return json.dumps(entity.to_dict()) + + def _deserialize(self, data: str) -> DataConsumerEntity: + return DataConsumerEntity.from_dict(json.loads(data)) + + def _get_pk(self, entity: DataConsumerEntity) -> str: + return entity.id + + def _get_index_entries(self, entity: DataConsumerEntity) -> list[tuple[str, str]]: + entries: list[tuple[str, str]] = [("type", entity.type)] + if entity.contact_email: + entries.append(("contact_email", entity.contact_email)) + if entity.external_id: + entries.append(("external_id", entity.external_id)) + for tag in entity.tags: + entries.append(("tag", tag)) + for fk in entity.purpose_fides_keys: + entries.append(("purpose", fk)) + return entries + + def _resolve_purposes(self, entity: DataConsumerEntity) -> DataConsumerEntity: + """Populate the purposes list from purpose_fides_keys.""" + if entity.purpose_fides_keys: + entity.purposes = [ + p + for fk in entity.purpose_fides_keys + if (p := self._purpose_repo.get(fk)) is not None + ] + else: + entity.purposes = [] + return entity + + # Override get/get_many to auto-resolve purposes + + def get(self, pk: str) -> Optional[DataConsumerEntity]: + entity = super().get(pk) + if entity: + return self._resolve_purposes(entity) + return None + + def get_many(self, pks: list[str]) -> list[DataConsumerEntity]: + entities = super().get_many(pks) + return [self._resolve_purposes(e) for e in entities] + + # ── Domain methods ──────────────────────────────────────────────── + + def create(self, data: dict) -> DataConsumerEntity: + now = datetime.now(timezone.utc) + entity = DataConsumerEntity( + id=str(uuid4()), + name=data["name"], + type=data["type"], + description=data.get("description"), + external_id=data.get("external_id"), + 
egress=data.get("egress"), + ingress=data.get("ingress"), + data_shared_with_third_parties=data.get("data_shared_with_third_parties"), + third_parties=data.get("third_parties"), + shared_categories=data.get("shared_categories"), + tags=data.get("tags") or [], + contact_email=data.get("contact_email"), + contact_slack_channel=data.get("contact_slack_channel"), + contact_details=data.get("contact_details"), + purpose_fides_keys=[], + purposes=[], + created_at=now, + updated_at=now, + ) + return self.save(entity) + + def update(self, id: str, data: dict) -> DataConsumerEntity: + entity = self.get(id) + if not entity: + raise ResourceNotFoundError("DataConsumer", id) + MUTABLE_FIELDS = { + "name", + "description", + "external_id", + "egress", + "ingress", + "data_shared_with_third_parties", + "third_parties", + "shared_categories", + "tags", + "contact_email", + "contact_slack_channel", + "contact_details", + "vendor_id", + "type", + } + for field, value in data.items(): + if field in MUTABLE_FIELDS: + setattr(entity, field, value) + entity.updated_at = datetime.now(timezone.utc) + self.save(entity) + return self._resolve_purposes(entity) + + def delete_by_id(self, id: str) -> None: + if not self.delete(id): + raise ResourceNotFoundError("DataConsumer", id) + + def assign_purposes( + self, + id: str, + purpose_fides_keys: list[str], + ) -> DataConsumerEntity: + """Replace all purposes (replace semantics).""" + entity = self.get(id) + if not entity: + raise ResourceNotFoundError("DataConsumer", id) + # Validate all purpose keys exist + self._validate_purpose_keys(purpose_fides_keys) + entity.purpose_fides_keys = purpose_fides_keys + entity.updated_at = datetime.now(timezone.utc) + self.save(entity) + return self._resolve_purposes(entity) + + def add_purpose( + self, + id: str, + purpose_fides_key: str, + ) -> DataConsumerEntity: + """Add a single purpose (idempotent).""" + entity = self.get(id) + if not entity: + raise ResourceNotFoundError("DataConsumer", id) + self._validate_purpose_keys([purpose_fides_key]) + if purpose_fides_key not in entity.purpose_fides_keys: + entity.purpose_fides_keys.append(purpose_fides_key) + entity.updated_at = datetime.now(timezone.utc) + self.save(entity) + return self._resolve_purposes(entity) + + def remove_purpose( + self, + id: str, + purpose_fides_key: str, + ) -> DataConsumerEntity: + """Remove a single purpose.""" + entity = self.get(id) + if not entity: + raise ResourceNotFoundError("DataConsumer", id) + if purpose_fides_key in entity.purpose_fides_keys: + entity.purpose_fides_keys.remove(purpose_fides_key) + entity.updated_at = datetime.now(timezone.utc) + self.save(entity) + return self._resolve_purposes(entity) + + def remove_purpose_from_all(self, purpose_fides_key: str) -> None: + """Remove a purpose reference from all consumers that have it.""" + consumers = self.get_by_index("purpose", purpose_fides_key) + for consumer in consumers: + if purpose_fides_key in consumer.purpose_fides_keys: + consumer.purpose_fides_keys.remove(purpose_fides_key) + consumer.updated_at = datetime.now(timezone.utc) + # Use base save to skip purpose resolution overhead + RedisRepository.save(self, consumer) + + def list_page( # type: ignore[override] + self, + params: Params, + type: Optional[str] = None, + purpose_fides_key: Optional[str] = None, + tags: Optional[list[str]] = None, + ) -> Page[DataConsumerEntity]: + filters: list[tuple[str, str]] = [] + if type: + filters.append(("type", type)) + if purpose_fides_key: + filters.append(("purpose", purpose_fides_key)) + if 
tags: + for tag in tags: + filters.append(("tag", tag)) + return super().list_page(params, filters=filters or None) + + def _validate_purpose_keys(self, fides_keys: list[str]) -> None: + """Check that all purpose fides_keys exist in the purpose repo.""" + for fk in fides_keys: + if not self._purpose_repo.exists(fk): + raise ResourceNotFoundError("DataPurpose", fk) diff --git a/src/fides/service/pbac/engine/__init__.py b/src/fides/service/pbac/engine/__init__.py new file mode 100644 index 0000000000..4faf38c2f7 --- /dev/null +++ b/src/fides/service/pbac/engine/__init__.py @@ -0,0 +1,21 @@ +"""Backward-compatible re-exports — types moved to fides.service.pbac.types.""" + +from fides.service.pbac.evaluate import evaluate_access +from fides.service.pbac.reason import generate_violation_reason +from fides.service.pbac.types import ( + ConsumerPurposes, + DatasetPurposes, + QueryAccess, + ValidationResult, + Violation, +) + +__all__ = [ + "ConsumerPurposes", + "DatasetPurposes", + "QueryAccess", + "ValidationResult", + "Violation", + "evaluate_access", + "generate_violation_reason", +] diff --git a/src/fides/service/pbac/evaluate.py b/src/fides/service/pbac/evaluate.py new file mode 100644 index 0000000000..bd2e4461a5 --- /dev/null +++ b/src/fides/service/pbac/evaluate.py @@ -0,0 +1,135 @@ +"""Purpose-based access control evaluation engine. + +This module has ZERO external dependencies. Given a consumer's purposes, +a map of dataset purposes, and a query access event, it determines whether +the access is compliant and produces a list of violations. +""" + +from __future__ import annotations + +from datetime import datetime, timezone + +from fides.service.pbac.types import ( + ConsumerPurposes, + DatasetPurposes, + QueryAccess, + ValidationResult, + Violation, +) + + +def evaluate_access( + consumer: ConsumerPurposes, + datasets: dict[str, DatasetPurposes], + query: QueryAccess, +) -> ValidationResult: + """Evaluate a query access event against purpose assignments. + + Rules: + 1. If the consumer has NO declared purposes, every dataset access is a + violation ("consumer has no declared purposes"). + 2. If a dataset has declared purposes AND the consumer's purposes do not + intersect with the dataset's effective purposes, it is a violation. + 3. If a dataset has NO declared purposes, access is considered compliant + (no restrictions on this dataset yet). + + When collections are specified in the query, each collection is checked + individually using additive purpose inheritance. 
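    A rough usage sketch -- keyword construction of the dataclasses in
    ``fides.service.pbac.types`` is assumed, and ``orders_dataset_purposes``
    stands in for a real ``DatasetPurposes`` value::

        result = evaluate_access(
            consumer=ConsumerPurposes(
                consumer_id="c1",
                consumer_name="Analytics Team",
                purpose_keys=frozenset({"marketing_analytics"}),
            ),
            datasets={"customers": orders_dataset_purposes},
            query=QueryAccess(
                query_id="abc-123",
                dataset_keys=("customers",),
                collections={},
            ),
        )
        # If the dataset's effective purposes are e.g. {"billing_operations"},
        # Rule 2 yields one Violation and result.is_compliant is False.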
+ """ + violations: list[Violation] = [] + total_accesses = 0 + + for dataset_key in query.dataset_keys: + ds_purposes = datasets.get(dataset_key) + collections = query.collections.get(dataset_key, ()) + + if collections: + # Check each collection individually + for collection in collections: + total_accesses += 1 + violation = _check_access( + consumer=consumer, + ds_purposes=ds_purposes, + dataset_key=dataset_key, + collection=collection, + query_id=query.query_id, + ) + if violation: + violations.append(violation) + else: + # No collection info — check at dataset level + total_accesses += 1 + violation = _check_access( + consumer=consumer, + ds_purposes=ds_purposes, + dataset_key=dataset_key, + collection=None, + query_id=query.query_id, + ) + if violation: + violations.append(violation) + + return ValidationResult( + violations=violations, + is_compliant=len(violations) == 0, + total_accesses=total_accesses, + checked_at=datetime.now(timezone.utc), + ) + + +def _check_access( + *, + consumer: ConsumerPurposes, + ds_purposes: DatasetPurposes | None, + dataset_key: str, + collection: str | None, + query_id: str, +) -> Violation | None: + """Check a single dataset/collection access against consumer purposes.""" + + # Rule 1: consumer has no purposes at all. + # This is intentionally asymmetric with Rule 3: a consumer with no purposes + # is always a violation (even if the dataset also has no purposes), because + # an unregistered consumer should never silently pass evaluation. + if not consumer.purpose_keys: + effective = ( + ds_purposes.effective_purposes(collection) if ds_purposes else frozenset() + ) + return Violation( + query_id=query_id, + consumer_id=consumer.consumer_id, + consumer_name=consumer.consumer_name, + dataset_key=dataset_key, + collection=collection, + consumer_purposes=consumer.purpose_keys, + dataset_purposes=effective, + reason="Consumer has no declared purposes", + ) + + # Dataset not registered in Fides — no purpose metadata available + if ds_purposes is None: + return None + + effective = ds_purposes.effective_purposes(collection) + + # Rule 3: dataset has no declared purposes — no restrictions + if not effective: + return None + + # Rule 2: check intersection + if not consumer.purpose_keys & effective: + return Violation( + query_id=query_id, + consumer_id=consumer.consumer_id, + consumer_name=consumer.consumer_name, + dataset_key=dataset_key, + collection=collection, + consumer_purposes=consumer.purpose_keys, + dataset_purposes=effective, + reason=( + f"Consumer purposes {sorted(consumer.purpose_keys)} do not overlap " + f"with dataset purposes {sorted(effective)}" + ), + ) + + return None diff --git a/src/fides/service/pbac/evaluation/__init__.py b/src/fides/service/pbac/evaluation/__init__.py new file mode 100644 index 0000000000..4ab10e87f0 --- /dev/null +++ b/src/fides/service/pbac/evaluation/__init__.py @@ -0,0 +1,34 @@ +"""Backward-compatible re-exports. + +Types moved to ``fides.service.pbac.types``. +Service moved to ``fides.service.pbac.service``. + +Import directly from the canonical modules instead of this shim. 
+""" + +from fides.service.pbac.types import ( + EvaluationResult, + EvaluationViolation, + ResolvedConsumer, +) + +__all__ = [ + "EvaluationResult", + "EvaluationViolation", + "ResolvedConsumer", +] + + +def __getattr__(name: str) -> object: + """Lazy imports for service classes that pull in Redis/Celery deps.""" + if name in ("PBACEvaluationService", "InProcessPBACEvaluationService"): + from fides.service.pbac.service import ( + InProcessPBACEvaluationService, + PBACEvaluationService, + ) + + return { + "PBACEvaluationService": PBACEvaluationService, + "InProcessPBACEvaluationService": InProcessPBACEvaluationService, + }[name] + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/fides/service/pbac/exceptions.py b/src/fides/service/pbac/exceptions.py new file mode 100644 index 0000000000..3836aa9726 --- /dev/null +++ b/src/fides/service/pbac/exceptions.py @@ -0,0 +1,15 @@ +class ResourceNotFoundError(Exception): + """Raised when a requested resource does not exist.""" + + def __init__(self, resource_type: str, resource_id: str): + self.resource_type = resource_type + self.resource_id = resource_id + super().__init__(f"{resource_type} '{resource_id}' not found") + + +class ResourceAlreadyExistsError(Exception): + """Raised when creating a resource that already exists.""" + + +class ResourceInUseError(Exception): + """Raised when deleting a resource that is still referenced.""" diff --git a/src/fides/service/pbac/identity/__init__.py b/src/fides/service/pbac/identity/__init__.py new file mode 100644 index 0000000000..a83513add2 --- /dev/null +++ b/src/fides/service/pbac/identity/__init__.py @@ -0,0 +1,14 @@ +"""Identity resolution for PBAC evaluation.""" + +from fides.service.pbac.identity.interface import IdentityResolver + +__all__ = ["IdentityResolver", "RedisIdentityResolver"] + + +def __getattr__(name: str) -> object: + """Lazy import for RedisIdentityResolver (requires Redis deps).""" + if name == "RedisIdentityResolver": + from fides.service.pbac.identity.resolver import RedisIdentityResolver + + return RedisIdentityResolver + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/fides/service/pbac/identity/basic.py b/src/fides/service/pbac/identity/basic.py new file mode 100644 index 0000000000..b49eed796c --- /dev/null +++ b/src/fides/service/pbac/identity/basic.py @@ -0,0 +1,77 @@ +"""BasicIdentityResolver — OSS identity resolution. + +Resolves user identities to consumers using: +1. contact_email match +2. external_id match +3. members list match +""" + +from __future__ import annotations + +from fides.service.pbac.types import ResolvedConsumer + + +class BasicIdentityResolver: + """Resolve user identities using in-memory consumer data. + + This is the OSS implementation. Consumers are loaded from a + consumer store (list of ResolvedConsumer) and matched against + user email or principal subject. + + Fidesplus extends this with PlatformIdentityResolver that queries + BigQuery IAM / Snowflake RBAC before falling back to this chain. + """ + + def __init__( + self, + consumers: list[ResolvedConsumer], + members_map: dict[str, list[str]] | None = None, + ) -> None: + """Initialize with a list of consumers. + + Args: + consumers: All registered consumers. + members_map: Optional mapping of consumer ID -> member emails. + Used for group membership resolution. 
+ """ + self._consumers = consumers + self._members_map = members_map or {} + + # Build indexes for fast lookup + self._by_email: dict[str, ResolvedConsumer] = {} + self._by_external_id: dict[str, ResolvedConsumer] = {} + for c in consumers: + if c.email: + self._by_email[c.email] = c + if c.external_id: + self._by_external_id[c.external_id] = c + + def resolve( + self, + user_email: str, + principal_subject: str | None = None, + ) -> ResolvedConsumer | None: + """Resolve a user identity to a consumer. + + Resolution chain: + 1. Exact match on consumer contact_email + 2. Exact match on consumer external_id + 3. Members list match (user email in consumer's members) + """ + # 1. contact_email match + if user_email in self._by_email: + return self._by_email[user_email] + + # 2. external_id match (try principal_subject first, then email) + if principal_subject and principal_subject in self._by_external_id: + return self._by_external_id[principal_subject] + if user_email in self._by_external_id: + return self._by_external_id[user_email] + + # 3. members list match + for consumer in self._consumers: + if consumer.id and consumer.id in self._members_map: + if user_email in self._members_map[consumer.id]: + return consumer + + return None diff --git a/src/fides/service/pbac/identity/interface.py b/src/fides/service/pbac/identity/interface.py new file mode 100644 index 0000000000..ba54ece0a2 --- /dev/null +++ b/src/fides/service/pbac/identity/interface.py @@ -0,0 +1,37 @@ +"""IdentityResolver Protocol — maps user identities to consumers. + +OSS provides BasicIdentityResolver (email, external_id, members). +Fidesplus extends with PlatformIdentityResolver (BigQuery IAM, Snowflake RBAC). +""" + +from __future__ import annotations + +from typing import Protocol, runtime_checkable + +from fides.service.pbac.types import ResolvedConsumer + + +@runtime_checkable +class IdentityResolver(Protocol): + """Resolve a query user identity to a consumer. + + Resolution should attempt to match the user to a registered + consumer. Returns None if the identity cannot be resolved. + """ + + def resolve( + self, + user_email: str, + principal_subject: str | None = None, + ) -> ResolvedConsumer | None: + """Resolve a user identity to a consumer. + + Args: + user_email: Email of the user who ran the query. + principal_subject: Optional platform-specific principal + (e.g., BigQuery service account, Snowflake role). + + Returns: + ResolvedConsumer if matched, None if unresolved. + """ + ... diff --git a/src/fides/service/pbac/identity/resolver.py b/src/fides/service/pbac/identity/resolver.py new file mode 100644 index 0000000000..36e8e903d9 --- /dev/null +++ b/src/fides/service/pbac/identity/resolver.py @@ -0,0 +1,112 @@ +"""Identity and dataset resolution for query log entries. + +Maps platform-specific identifiers (emails, table references) to Fides +domain entities (DataConsumer, Dataset). +""" + +from __future__ import annotations + +from typing import Optional + +from loguru import logger + +from fides.api.util.cache import get_cache +from fides.service.pbac.consumers.entities import DataConsumerEntity +from fides.service.pbac.consumers.repository import DataConsumerRedisRepository +from fides.service.pbac.purposes.repository import DataPurposeRedisRepository +from fides.service.pbac.types import TableRef + + +class RedisIdentityResolver: + """Resolve a query log user identity to a DataConsumer. + + Resolution chain: + 1. Exact match on DataConsumer.contact_email + 2. 
Match on DataConsumer.external_id (for group/project consumers) + + Returns None if the identity cannot be resolved (recorded as a gap). + """ + + def __init__(self, consumer_repo: DataConsumerRedisRepository) -> None: + self._consumer_repo = consumer_repo + + def resolve( + self, + user_email: str, + principal_subject: str | None = None, + ) -> Optional[DataConsumerEntity]: + """Attempt to resolve a user identity to a DataConsumer.""" + + # Strategy 1: match by contact_email + matches = self._consumer_repo.get_by_index("contact_email", user_email) + if matches: + return matches[0] + + # Strategy 2: match by external_id (e.g., "user:email@example.com") + if principal_subject: + matches = self._consumer_repo.get_by_index("external_id", principal_subject) + if matches: + return matches[0] + + # Also try the email as external_id + matches = self._consumer_repo.get_by_index("external_id", user_email) + if matches: + return matches[0] + + logger.debug( + "Could not resolve identity for user_email={}, principal_subject={}", + user_email, + principal_subject, + ) + return None + + +class DatasetResolver: + """Resolve platform table references to Fides dataset fides_keys. + + Uses a configurable mapping strategy: + - Direct match: ``{dataset}.{table}`` → Fides dataset ``fides_key`` + - Qualified match: ``{project}.{dataset}.{table}`` → Fides dataset + - Custom prefix mapping (e.g., strip project prefix) + """ + + def __init__( + self, + dataset_mappings: Optional[dict[str, str]] = None, + ) -> None: + # qualified_name -> fides_key + self._mappings: dict[str, str] = dataset_mappings or {} + + def resolve(self, table_ref: TableRef) -> str | None: + """Resolve a table reference to a Fides dataset fides_key. + + Returns None if no mapping is found. + """ + # Try full qualified name first + if table_ref.qualified_name in self._mappings: + return self._mappings[table_ref.qualified_name] + + # Try dataset.table (without project) + short_name = f"{table_ref.dataset}.{table_ref.table}" + if short_name in self._mappings: + return self._mappings[short_name] + + # Try just the dataset name as fides_key + if table_ref.dataset in self._mappings: + return self._mappings[table_ref.dataset] + + # Fall back to using the dataset name directly as fides_key + return table_ref.dataset + + def add_mapping(self, platform_ref: str, fides_key: str) -> None: + """Add a table reference → fides_key mapping.""" + self._mappings[platform_ref] = fides_key + + +def build_identity_resolver( + purpose_repo: DataPurposeRedisRepository, +) -> RedisIdentityResolver: + """Factory to create a RedisIdentityResolver with proper dependencies.""" + cache = get_cache() + consumer_repo = DataConsumerRedisRepository(cache, purpose_repo) + return RedisIdentityResolver(consumer_repo) diff --git a/src/fides/service/pbac/policies/IMPLEMENTATION_GUIDE.md b/src/fides/service/pbac/policies/IMPLEMENTATION_GUIDE.md new file mode 100644 index 0000000000..296873d595 --- /dev/null +++ b/src/fides/service/pbac/policies/IMPLEMENTATION_GUIDE.md @@ -0,0 +1,314 @@ +# Policy v2 Implementation Guide + +This guide covers how to implement the Policy v2 evaluation engine. The interface is defined, the integration point is wired up, and a `NoOpPolicyEvaluator` is in place. The next step is to replace the no-op with real policy matching and evaluation. 
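As a starting point, the replacement only needs to satisfy the `AccessPolicyEvaluator` Protocol. A minimal skeleton is sketched below -- the `policy_store` parameter is a placeholder for whatever storage you build in step 1:

```python
from fides.service.pbac.policies.interface import (
    AccessEvaluationRequest,
    PolicyDecision,
    PolicyEvaluationResult,
)


class PolicyV2Evaluator:
    """Skeleton evaluator -- behaves like the no-op until matching is implemented."""

    def __init__(self, policy_store) -> None:
        self._policy_store = policy_store

    def evaluate(self, request: AccessEvaluationRequest) -> PolicyEvaluationResult:
        # TODO: load enabled policies by priority, apply match/unless blocks,
        # and return the first decisive result (see the algorithm below).
        return PolicyEvaluationResult(decision=PolicyDecision.NO_DECISION)
```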
+ +## How it fits in + +``` +SQL query arrives + | + v +PBAC engine checks purpose overlap + | + +-- purposes match --> COMPLIANT (policies never consulted) + | + +-- purposes DON'T match --> policy evaluator runs here + | + v + AccessPolicyEvaluator.evaluate(request) + | + +-- ALLOW --> violation suppressed + +-- DENY --> violation confirmed + +-- NO_DECISION --> violation confirmed (default today) +``` + +PBAC always runs first. The policy evaluator is only called when purposes don't overlap. An `ALLOW` result suppresses the violation. Anything else keeps it. + +## The interface to implement + +```python +# fides/service/pbac/policies/interface.py + +class AccessPolicyEvaluator(Protocol): + def evaluate(self, request: AccessEvaluationRequest) -> PolicyEvaluationResult: ... +``` + +### Input: `AccessEvaluationRequest` + +Fields available to the evaluator: + +```python +@dataclass(frozen=True) +class AccessEvaluationRequest: + # Who is accessing (from the PBAC violation) + consumer_id: str # fides ID or email if unresolved + consumer_name: str + consumer_purposes: frozenset[str] # purposes the consumer declared + + # What they're accessing + dataset_key: str + dataset_purposes: frozenset[str] # purposes declared on the dataset + collection: str | None # specific collection, if known + + # For policy match resolution (enrichment from the consumer's system) + system_fides_key: str | None # the consumer's system in fides + data_uses: tuple[str, ...] # from system privacy declarations + data_categories: tuple[str, ...] # from system privacy declarations + data_subjects: tuple[str, ...] # from system privacy declarations + + # Runtime context for unless conditions + context: dict[str, Any] # e.g. {"environment": {"geo_location": "US-CA"}, + # "subject": {"email": "user@example.com"}} +``` + +### Output: `PolicyEvaluationResult` + +```python +@dataclass +class PolicyEvaluationResult: + decision: PolicyDecision # ALLOW, DENY, or NO_DECISION + decisive_policy_key: str | None # which policy decided (None if NO_DECISION) + decisive_policy_priority: int | None + unless_triggered: bool # True if the decisive policy's unless block fired + evaluated_policies: list[EvaluatedPolicyInfo] # audit trail of all matched policies + action: PolicyAction | None # denial message from the decisive policy + reason: str | None # human-readable explanation +``` + +## What you need to build + +### 1. Policy storage + +Policies need to be stored and retrievable. Each policy looks like: + +```yaml +fides_key: ccpa_sale_blocker +decision: ALLOW # or DENY +priority: 100 # higher = evaluated first +enabled: true +controls: [ccpa_compliance] # grouping (organizational, not evaluated) + +match: # which declarations this applies to + data_use: + any: [marketing.advertising] + data_category: + all: [user.contact.email] + +unless: # conditions that invert the decision + - type: consent + privacy_notice_key: ccpa_do_not_sell + requirement: opt_out + - type: geo_location + field: environment.geo_location + operator: in + values: ["US-CA"] + +action: # what to show when blocking + message: "User opted out of data sales." +``` + +For now, follow the `RedisRepository` pattern used by consumers and purposes (`fides/service/pbac/redis_repository.py`). This keeps things consistent with the rest of the PBAC layer while we settle on the final models. We'll migrate to Postgres once the models are finalized. + +### 2. Match evaluation + +The `match` block determines which policies apply to a given request. All dimensions are AND'd. 
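One way to sketch the per-dimension check, assuming `any` means "at least one listed value matches a declared value" and `all` means "every listed value matches some declared value" (helper names here are placeholders, not existing code):

```python
def _taxonomy_matches(selector: str, declared: str) -> bool:
    """Hierarchical match: 'user.contact' matches 'user.contact.email' but not 'user'."""
    return declared == selector or declared.startswith(selector + ".")


def _dimension_matches(spec: dict, declared: tuple[str, ...]) -> bool:
    """Evaluate one match dimension, e.g. {"any": ["marketing.advertising"]}."""
    if "any" in spec:
        return any(_taxonomy_matches(sel, d) for sel in spec["any"] for d in declared)
    if "all" in spec:
        return all(
            any(_taxonomy_matches(sel, d) for d in declared) for sel in spec["all"]
        )
    return True  # an empty spec places no constraint on this dimension
```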
+ +**Built-in dimensions** (match against `AccessEvaluationRequest` fields): + +| Match dimension | Request field | Operator | +|----------------|---------------|----------| +| `data_use` | `request.data_uses` | `any` or `all` | +| `data_category` | `request.data_categories` | `any` or `all` | +| `data_subject` | `request.data_subjects` | `any` or `all` | +| `` | looked up via taxonomy key | `any` or `all` | + +**Hierarchical matching**: `user.contact` matches `user.contact.email` (child) but not `user` (parent). + +**Empty match** (`match: {}`) matches everything -- used for catch-all default policies. + +### 3. Unless evaluation + +The `unless` block defines conditions that **invert** the decision when ALL conditions are met (AND'd). + +Behavior: +- ALLOW + unless triggered = **DENY** (decisive, stops evaluation) +- DENY + unless triggered = **suppressed** (not decisive, evaluation continues) + +Condition types to implement: + +| Type | What it checks | Where the data comes from | +|------|---------------|--------------------------| +| `consent` | Privacy notice opt-in/opt-out status | `request.context["subject"]` -> consent DB lookup | +| `geo_location` | Geographic location of the request | `request.context["environment"]["geo_location"]` | +| `data_flow` | Ingress/egress system relationships | System's ingress/egress config in fides | + +### 4. Evaluation algorithm + +```python +def evaluate(self, request: AccessEvaluationRequest) -> PolicyEvaluationResult: + # 1. Load all enabled policies, sorted by priority (highest first) + policies = self._policy_store.get_enabled_sorted_by_priority() + + # 2. Find matching policies + evaluated = [] + for policy in policies: + if not self._matches(policy, request): + continue + + # 3. Check unless conditions + unless_triggered = self._evaluate_unless(policy, request) + + if unless_triggered: + if policy.decision == "ALLOW": + # ALLOW inverted to DENY -- decisive, stop + return PolicyEvaluationResult( + decision=PolicyDecision.DENY, + decisive_policy_key=policy.fides_key, + decisive_policy_priority=policy.priority, + unless_triggered=True, + action=policy.action, + evaluated_policies=evaluated, + ) + else: + # DENY suppressed -- not decisive, continue + evaluated.append(EvaluatedPolicyInfo( + policy_key=policy.fides_key, + priority=policy.priority, + matched=True, + result="SUPPRESSED", + unless_triggered=True, + )) + continue + else: + # Decision stands as-is -- decisive, stop + return PolicyEvaluationResult( + decision=PolicyDecision(policy.decision), + decisive_policy_key=policy.fides_key, + decisive_policy_priority=policy.priority, + unless_triggered=False, + action=policy.action if policy.decision == "DENY" else None, + evaluated_policies=evaluated, + ) + + # 4. No policy matched + return PolicyEvaluationResult(decision=PolicyDecision.NO_DECISION) +``` + +### 5. Wire it up + +The `NoOpPolicyEvaluator` gets replaced in the evaluation service. 
`InProcessPBACEvaluationService.__init__` in `fides/service/pbac/service.py` accepts a `policy_evaluator` parameter: + +```python +# Before (current default): +self._policy_evaluator = policy_evaluator or NoOpPolicyEvaluator() + +# After (with the real implementation): +self._policy_evaluator = policy_evaluator or PolicyV2Evaluator(policy_store) +``` + +The DI factory in fidesplus `deps.py` creates the evaluation service: + +```python +def get_pbac_evaluation_service() -> PBACEvaluationService: + return InProcessPBACEvaluationService() # uses NoOp by default +``` + +Update it to inject the real evaluator: + +```python +def get_pbac_evaluation_service() -> PBACEvaluationService: + return InProcessPBACEvaluationService( + policy_evaluator=PolicyV2Evaluator(policy_store=...) + ) +``` + +## Files to create + +| File | Purpose | +|------|---------| +| `fides/service/pbac/policies/evaluator.py` | `PolicyV2Evaluator` class | +| `fides/service/pbac/policies/store.py` | Policy storage (follow `RedisRepository` pattern from `fides/service/pbac/redis_repository.py`) | +| `fides/service/pbac/policies/match.py` | Match block evaluation (hierarchy, operators) | +| `fides/service/pbac/policies/unless.py` | Unless condition evaluation (consent, geo, data_flow) | +| `fides/service/pbac/policies/entities.py` | Policy entity dataclass (follow `DataPurposeEntity` pattern) | + +## Files that shouldn't need changes + +| File | Why | +|------|-----| +| `fides/service/pbac/evaluate.py` | PBAC engine -- runs before the policy evaluator | +| `fides/service/pbac/service.py` | Orchestrator -- calls the evaluator automatically | +| `fides/service/pbac/types.py` | Shared types -- already defined | +| `fides/service/pbac/policies/interface.py` | The contract -- already defined | + +## Testing + +```python +from fides.service.pbac.policies.interface import ( + AccessEvaluationRequest, + PolicyDecision, + PolicyEvaluationResult, +) + +def test_allow_policy_suppresses_violation(): + evaluator = PolicyV2Evaluator(policy_store=InMemoryPolicyStore([ + Policy(fides_key="allow_marketing", decision="ALLOW", priority=100, + match={"data_use": {"any": ["marketing.advertising"]}}), + ])) + request = AccessEvaluationRequest( + consumer_id="c1", + consumer_name="Analytics Team", + consumer_purposes=frozenset({"marketing"}), + dataset_key="ds1", + dataset_purposes=frozenset({"billing"}), + data_uses=("marketing.advertising",), + ) + result = evaluator.evaluate(request) + assert result.decision == PolicyDecision.ALLOW + assert result.decisive_policy_key == "allow_marketing" + + +def test_no_matching_policy_returns_no_decision(): + evaluator = PolicyV2Evaluator(policy_store=InMemoryPolicyStore([])) + request = AccessEvaluationRequest( + consumer_id="c1", + consumer_name="Test", + consumer_purposes=frozenset(), + dataset_key="ds1", + dataset_purposes=frozenset(), + ) + result = evaluator.evaluate(request) + assert result.decision == PolicyDecision.NO_DECISION + + +def test_unless_inverts_allow_to_deny(): + evaluator = PolicyV2Evaluator(policy_store=InMemoryPolicyStore([ + Policy(fides_key="ccpa_blocker", decision="ALLOW", priority=100, + match={"data_use": {"any": ["marketing"]}}, + unless=[{"type": "consent", "privacy_notice_key": "do_not_sell", + "requirement": "opt_out"}]), + ])) + request = AccessEvaluationRequest( + consumer_id="c1", + consumer_name="Test", + consumer_purposes=frozenset({"marketing"}), + dataset_key="ds1", + dataset_purposes=frozenset({"billing"}), + data_uses=("marketing",), + context={"subject": {"email": 
"user@example.com"}}, + ) + # Mock consent lookup to return opt_out=True + result = evaluator.evaluate(request) + assert result.decision == PolicyDecision.DENY + assert result.unless_triggered is True +``` + +## PRD reference + +The full policy schema, evaluation algorithm, and examples are in the Confluence PRD: +[Fides Policy Engine v2 Schema Redesign](https://ethyca.atlassian.net/wiki/spaces/PM/pages/4244275202) + +Key sections: +- Section 2: Policy schema (match, unless, action, controls) +- Section 3: Evaluation algorithm (priority-based, first-decisive-match-wins) +- Section 4: Worked examples with evaluation walkthroughs diff --git a/src/fides/service/pbac/policies/__init__.py b/src/fides/service/pbac/policies/__init__.py new file mode 100644 index 0000000000..01278e4098 --- /dev/null +++ b/src/fides/service/pbac/policies/__init__.py @@ -0,0 +1,26 @@ +"""Access policy evaluation interface for Policies v2. + +Defines the Protocol boundary between PBAC enforcement and the +Policy v2 evaluation engine. PBAC checks purpose overlap first; +when purposes do NOT match, the AccessPolicyEvaluator is consulted. +""" + +from fides.service.pbac.policies.interface import ( + AccessEvaluationRequest, + AccessPolicyEvaluator, + EvaluatedPolicyInfo, + PolicyAction, + PolicyDecision, + PolicyEvaluationResult, +) +from fides.service.pbac.policies.noop import NoOpPolicyEvaluator + +__all__ = [ + "AccessEvaluationRequest", + "AccessPolicyEvaluator", + "EvaluatedPolicyInfo", + "NoOpPolicyEvaluator", + "PolicyAction", + "PolicyDecision", + "PolicyEvaluationResult", +] diff --git a/src/fides/service/pbac/policies/interface.py b/src/fides/service/pbac/policies/interface.py new file mode 100644 index 0000000000..73a05da14a --- /dev/null +++ b/src/fides/service/pbac/policies/interface.py @@ -0,0 +1,88 @@ +"""Protocol and types for the Policies v2 evaluation interface. + +Evaluation priority: + 1. PBAC checks consumer-purpose / dataset-purpose overlap. + 2. If purposes match -> compliant (evaluator never called). + 3. If purposes do NOT match -> AccessPolicyEvaluator.evaluate() + 4. ALLOW -> suppress the PBAC violation. + 5. DENY / NO_DECISION -> violation stands. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import StrEnum +from typing import Any, Protocol, runtime_checkable + + +class PolicyDecision(StrEnum): + """Possible outcomes of a policy evaluation.""" + + ALLOW = "ALLOW" + DENY = "DENY" + NO_DECISION = "NO_DECISION" + + +@dataclass(frozen=True) +class AccessEvaluationRequest: + """Context provided to the policy evaluator after a PBAC violation.""" + + # From PBAC violation context + consumer_id: str + consumer_name: str + consumer_purposes: frozenset[str] + dataset_key: str + dataset_purposes: frozenset[str] + collection: str | None = None + + # For Policy v2 match resolution + system_fides_key: str | None = None + data_uses: tuple[str, ...] = () + data_categories: tuple[str, ...] = () + data_subjects: tuple[str, ...] 
= () + + # Runtime context for unless conditions + context: dict[str, Any] = field(default_factory=dict) + + +@dataclass(frozen=True) +class PolicyAction: + """Action block from a decisive policy.""" + + message: str | None = None + + +@dataclass(frozen=True) +class EvaluatedPolicyInfo: + """Summary of a single policy that was evaluated.""" + + policy_key: str + priority: int + matched: bool + result: str # "ALLOW", "DENY", or "SUPPRESSED" + unless_triggered: bool = False + + +@dataclass +class PolicyEvaluationResult: + """Result of evaluating access policies against a request.""" + + decision: PolicyDecision + decisive_policy_key: str | None = None + decisive_policy_priority: int | None = None + unless_triggered: bool = False + evaluated_policies: list[EvaluatedPolicyInfo] = field(default_factory=list) + action: PolicyAction | None = None + reason: str | None = None + + +@runtime_checkable +class AccessPolicyEvaluator(Protocol): + """Interface for the Policies v2 evaluation engine. + + Called only when PBAC purpose checks find a violation. + """ + + def evaluate(self, request: AccessEvaluationRequest) -> PolicyEvaluationResult: + """Evaluate access policies for the given request.""" + ... diff --git a/src/fides/service/pbac/policies/noop.py b/src/fides/service/pbac/policies/noop.py new file mode 100644 index 0000000000..1406fd7655 --- /dev/null +++ b/src/fides/service/pbac/policies/noop.py @@ -0,0 +1,21 @@ +"""No-op policy evaluator -- default until Policy v2 is implemented.""" + +from __future__ import annotations + +from fides.service.pbac.policies.interface import ( + AccessEvaluationRequest, + PolicyDecision, + PolicyEvaluationResult, +) + + +class NoOpPolicyEvaluator: + """Always returns NO_DECISION. + + Used as the default evaluator so that all PBAC violations are + recorded as-is. Replace with the real Policy v2 evaluator by + swapping the factory in deps.py. 
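    For any request::

        NoOpPolicyEvaluator().evaluate(request).decision
        # -> PolicyDecision.NO_DECISION, so the PBAC violation always stands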
+ """ + + def evaluate(self, request: AccessEvaluationRequest) -> PolicyEvaluationResult: + return PolicyEvaluationResult(decision=PolicyDecision.NO_DECISION) diff --git a/src/fides/service/pbac/purposes/__init__.py b/src/fides/service/pbac/purposes/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/fides/service/pbac/purposes/entities.py b/src/fides/service/pbac/purposes/entities.py new file mode 100644 index 0000000000..ce28c669f3 --- /dev/null +++ b/src/fides/service/pbac/purposes/entities.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +import dataclasses +from dataclasses import dataclass +from datetime import datetime +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from fides.api.models.data_purpose import ( # type: ignore[import-not-found] + DataPurpose, + ) + + +@dataclass +class DataPurposeEntity: + id: str + fides_key: str + name: str + description: Optional[str] + organization_fides_key: Optional[str] + tags: Optional[list[str]] + data_use: str + data_subject: Optional[str] + data_categories: list[str] + legal_basis_for_processing: Optional[str] + flexible_legal_basis_for_processing: bool + special_category_legal_basis: Optional[str] + impact_assessment_location: Optional[str] + retention_period: Optional[str] + features: list[str] + created_at: datetime + updated_at: datetime + + def to_dict(self) -> dict: + d = dataclasses.asdict(self) + d["created_at"] = self.created_at.isoformat() + d["updated_at"] = self.updated_at.isoformat() + return d + + @classmethod + def from_dict(cls, d: dict) -> DataPurposeEntity: + d = dict(d) + d["created_at"] = datetime.fromisoformat(d["created_at"]) + d["updated_at"] = datetime.fromisoformat(d["updated_at"]) + return cls(**d) + + @classmethod + def from_orm(cls, obj: DataPurpose) -> DataPurposeEntity: + return cls( + id=obj.id, + fides_key=obj.fides_key, + name=obj.name, + description=obj.description, + organization_fides_key=obj.organization_fides_key, + tags=obj.tags, + data_use=obj.data_use, + data_subject=obj.data_subject, + data_categories=obj.data_categories or [], + legal_basis_for_processing=obj.legal_basis_for_processing, + flexible_legal_basis_for_processing=obj.flexible_legal_basis_for_processing, + special_category_legal_basis=obj.special_category_legal_basis, + impact_assessment_location=obj.impact_assessment_location, + retention_period=obj.retention_period, + features=obj.features or [], + created_at=obj.created_at, + updated_at=obj.updated_at, + ) diff --git a/src/fides/service/pbac/purposes/repository.py b/src/fides/service/pbac/purposes/repository.py new file mode 100644 index 0000000000..3068f886f6 --- /dev/null +++ b/src/fides/service/pbac/purposes/repository.py @@ -0,0 +1,113 @@ +from __future__ import annotations + +import json +from datetime import datetime, timezone +from typing import Any, Optional +from uuid import uuid4 + +from fastapi_pagination import Page, Params + +from fides.service.pbac.exceptions import ( + ResourceAlreadyExistsError, + ResourceNotFoundError, +) +from fides.service.pbac.purposes.entities import DataPurposeEntity +from fides.service.pbac.redis_repository import RedisRepository + + +class DataPurposeRedisRepository(RedisRepository[DataPurposeEntity]): + """Redis-backed repository for DataPurpose entities.""" + + PREFIX = "data_purpose" + + def _serialize(self, entity: DataPurposeEntity) -> str: + return json.dumps(entity.to_dict()) + + def _deserialize(self, data: str) -> DataPurposeEntity: + return 
DataPurposeEntity.from_dict(json.loads(data)) + + def _get_pk(self, entity: DataPurposeEntity) -> str: + return entity.fides_key + + def _get_index_entries(self, entity: DataPurposeEntity) -> list[tuple[str, str]]: + entries: list[tuple[str, str]] = [("data_use", entity.data_use)] + if entity.data_subject: + entries.append(("data_subject", entity.data_subject)) + return entries + + # ── Domain methods ──────────────────────────────────────────────── + + def get_by_fides_key(self, fides_key: str) -> Optional[DataPurposeEntity]: + return self.get(fides_key) + + def create(self, **kwargs: Any) -> DataPurposeEntity: + fides_key = kwargs["fides_key"] + if self.exists(fides_key): + raise ResourceAlreadyExistsError( + f"DataPurpose with fides_key '{fides_key}' already exists." + ) + now = datetime.now(timezone.utc) + entity = DataPurposeEntity( + id=kwargs.get("id", str(uuid4())), + fides_key=fides_key, + name=kwargs["name"], + description=kwargs.get("description"), + organization_fides_key=kwargs.get("organization_fides_key"), + tags=kwargs.get("tags"), + data_use=kwargs["data_use"], + data_subject=kwargs.get("data_subject"), + data_categories=kwargs.get("data_categories", []), + legal_basis_for_processing=kwargs.get("legal_basis_for_processing"), + flexible_legal_basis_for_processing=kwargs.get( + "flexible_legal_basis_for_processing", False + ), + special_category_legal_basis=kwargs.get("special_category_legal_basis"), + impact_assessment_location=kwargs.get("impact_assessment_location"), + retention_period=kwargs.get("retention_period"), + features=kwargs.get("features", []), + created_at=now, + updated_at=now, + ) + return self.save(entity) + + def update(self, fides_key: str, **kwargs: Any) -> DataPurposeEntity: + entity = self.get(fides_key) + if not entity: + raise ResourceNotFoundError("DataPurpose", fides_key) + MUTABLE_FIELDS = { + "name", + "description", + "organization_fides_key", + "tags", + "data_use", + "data_subject", + "data_categories", + "legal_basis_for_processing", + "flexible_legal_basis_for_processing", + "special_category_legal_basis", + "impact_assessment_location", + "retention_period", + "features", + } + for field, value in kwargs.items(): + if field in MUTABLE_FIELDS: + setattr(entity, field, value) + entity.updated_at = datetime.now(timezone.utc) + return self.save(entity) + + def delete_by_fides_key(self, fides_key: str) -> None: + if not self.delete(fides_key): + raise ResourceNotFoundError("DataPurpose", fides_key) + + def list_page( # type: ignore[override] + self, + params: Params, + data_use: Optional[str] = None, + data_subject: Optional[str] = None, + ) -> Page[DataPurposeEntity]: + filters: list[tuple[str, str]] = [] + if data_use: + filters.append(("data_use", data_use)) + if data_subject: + filters.append(("data_subject", data_subject)) + return super().list_page(params, filters=filters or None) diff --git a/src/fides/service/pbac/reason.py b/src/fides/service/pbac/reason.py new file mode 100644 index 0000000000..91dd7e3b7e --- /dev/null +++ b/src/fides/service/pbac/reason.py @@ -0,0 +1,42 @@ +"""Human-readable violation reason generation. + +This module has ZERO external dependencies. 
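For a purpose-mismatch violation the generated text has this shape
(illustrative values)::

    Consumer 'Analytics Team' (ID: c1) accessed dataset 'customers', collection 'orders'.

    Consumer's declared purposes: marketing_analytics
    Dataset's allowed purposes: billing_operations

    None of the consumer's declared purposes match any of the
    dataset's allowed purposes. The consumer is not authorized
    to access this data under its current purpose assignments.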
+""" + +from __future__ import annotations + +from fides.service.pbac.types import Violation + + +def generate_violation_reason(violation: Violation) -> str: + """Produce a detailed, human-readable explanation of a violation.""" + parts: list[str] = [] + + parts.append( + f"Consumer '{violation.consumer_name}' (ID: {violation.consumer_id}) " + f"accessed dataset '{violation.dataset_key}'" + ) + + if violation.collection: + parts.append(f", collection '{violation.collection}'") + + parts.append(".\n\n") + + if not violation.consumer_purposes: + parts.append( + "The consumer has no declared data purposes. All dataset accesses " + "are violations until purposes are assigned to this consumer." + ) + else: + consumer_list = ", ".join(sorted(violation.consumer_purposes)) + dataset_list = ", ".join(sorted(violation.dataset_purposes)) + + parts.append(f"Consumer's declared purposes: {consumer_list}\n") + parts.append(f"Dataset's allowed purposes: {dataset_list}\n\n") + parts.append( + "None of the consumer's declared purposes match any of the " + "dataset's allowed purposes. The consumer is not authorized " + "to access this data under its current purpose assignments." + ) + + return "".join(parts) diff --git a/src/fides/service/pbac/redis_repository.py b/src/fides/service/pbac/redis_repository.py new file mode 100644 index 0000000000..f674faa6c2 --- /dev/null +++ b/src/fides/service/pbac/redis_repository.py @@ -0,0 +1,219 @@ +"""Generic Redis-backed repository base class.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Generic, Optional, TypeVar + +from fastapi_pagination import Page, Params +from loguru import logger + +from fides.api.common_exceptions import RedisConnectionError +from fides.api.util.cache import FidesopsRedis + +T = TypeVar("T") + + +class RedisRepository(ABC, Generic[T]): + """Reusable base class for Redis-backed entity storage. + + Subclasses declare PREFIX, implement serialization hooks, and optionally + declare secondary indexes. + + Public interface: + save, get, delete, exists — single-entity CRUD + get_many, get_all — bulk reads + list_page — paginated listing with optional filters + get_by_index — fetch all entities matching a field value + """ + + PREFIX: str # e.g. 
"data_purpose" + + def __init__(self, cache: FidesopsRedis) -> None: + self._cache = cache + + # ── Key construction (private) ──────────────────────────────────── + + def _entity_key(self, pk: str) -> str: + return f"{self.PREFIX}:{pk}" + + def _all_key(self) -> str: + return f"{self.PREFIX}:_all" + + def _index_key(self, field: str, value: str) -> str: + return f"{self.PREFIX}:idx:{field}:{value}" + + # ── Subclass hooks ──────────────────────────────────────────────── + + @abstractmethod + def _serialize(self, entity: T) -> str: + """Entity -> JSON string.""" + + @abstractmethod + def _deserialize(self, data: str) -> T: + """JSON string -> Entity.""" + + @abstractmethod + def _get_pk(self, entity: T) -> str: + """Extract primary key from entity.""" + + def _get_index_entries(self, entity: T) -> list[tuple[str, str]]: + """Return (field, value) pairs for secondary indexes.""" + return [] + + # ── CRUD ────────────────────────────────────────────────────────── + + def save(self, entity: T) -> T: + """Write entity + update indexes using a pipeline.""" + pk = self._get_pk(entity) + key = self._entity_key(pk) + + try: + pipe = self._cache.pipeline() + # Remove stale index entries if entity already exists + existing_data = self._cache.get(key) + if existing_data: + old_entity = self._deserialize(existing_data) + for field, value in self._get_index_entries(old_entity): + pipe.srem(self._index_key(field, value), pk) + + # Write entity + add new indexes + pipe.set(key, self._serialize(entity)) + pipe.sadd(self._all_key(), pk) + for field, value in self._get_index_entries(entity): + pipe.sadd(self._index_key(field, value), pk) + pipe.execute() + except RedisConnectionError: + logger.error("Redis connection error during save of {}:{}", self.PREFIX, pk) + raise + + return entity + + def get(self, pk: str) -> Optional[T]: + """Read a single entity by primary key.""" + try: + data = self._cache.get(self._entity_key(pk)) + if data: + return self._deserialize(data) + return None + except RedisConnectionError: + logger.error("Redis connection error during get of {}:{}", self.PREFIX, pk) + raise + + def delete(self, pk: str) -> bool: + """Remove entity + clean indexes. 
Returns True if entity existed.""" + try: + key = self._entity_key(pk) + data = self._cache.get(key) + if not data: + return False + + entity = self._deserialize(data) + pipe = self._cache.pipeline() + pipe.delete(key) + pipe.srem(self._all_key(), pk) + for field, value in self._get_index_entries(entity): + pipe.srem(self._index_key(field, value), pk) + pipe.execute() + return True + except RedisConnectionError: + logger.error( + "Redis connection error during delete of {}:{}", self.PREFIX, pk + ) + raise + + def exists(self, pk: str) -> bool: + """Check if an entity exists by primary key.""" + try: + return bool(self._cache.exists(self._entity_key(pk))) + except RedisConnectionError: + logger.error( + "Redis connection error during exists check of {}:{}", self.PREFIX, pk + ) + raise + + # ── Bulk / pagination ───────────────────────────────────────────── + + def get_many(self, pks: list[str]) -> list[T]: + """Fetch multiple entities by primary key.""" + if not pks: + return [] + try: + keys = [self._entity_key(pk) for pk in pks] + values = self._cache.mget(keys) + return [self._deserialize(v) for v in values if v is not None] + except RedisConnectionError: + logger.error("Redis connection error during get_many of {}", self.PREFIX) + raise + + def get_all(self) -> list[T]: + """Return all entities.""" + try: + pks = self._cache.smembers(self._all_key()) + if not pks: + return [] + return self.get_many(sorted(pks)) + except RedisConnectionError: + logger.error("Redis connection error during get_all of {}", self.PREFIX) + raise + + def list_page( + self, + params: Params, + filters: Optional[list[tuple[str, str]]] = None, + ) -> Page[T]: + """Paginate over entities, optionally filtered by index values. + + Args: + params: Pagination parameters. + filters: Optional list of (field, value) tuples. When provided, + only entities matching ALL filters are returned (intersection). 
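+
+        Example (illustrative; with two filters, only entities matching
+        both filters are returned)::
+
+            repo.list_page(
+                Params(page=1, size=50),
+                filters=[("data_use", "marketing.advertising"), ("data_subject", "customer")],
+            )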
+ """ + try: + pks = self._resolve_pks(filters) + sorted_pks = sorted(pks) if pks else [] + total = len(sorted_pks) + + page_num = params.page or 1 + page_size = params.size or 50 + start = (page_num - 1) * page_size + end = start + page_size + page_pks = sorted_pks[start:end] + + items = self.get_many(page_pks) if page_pks else [] + + return Page.create( + items=items, + params=params, + total=total, + ) + except RedisConnectionError: + logger.error("Redis connection error during list_page of {}", self.PREFIX) + raise + + def get_by_index(self, field: str, value: str) -> list[T]: + """Return all entities matching a secondary index value.""" + try: + pks = self._cache.smembers(self._index_key(field, value)) + if not pks: + return [] + return self.get_many(sorted(pks)) + except RedisConnectionError: + logger.error( + "Redis connection error during get_by_index of {}", self.PREFIX + ) + raise + + # ── Private helpers ─────────────────────────────────────────────── + + def _resolve_pks(self, filters: Optional[list[tuple[str, str]]] = None) -> set[str]: + """Resolve the set of primary keys matching the given filters.""" + if not filters: + return self._cache.smembers(self._all_key()) or set() + + index_keys = [self._index_key(field, value) for field, value in filters] + + if len(index_keys) == 1: + return self._cache.smembers(index_keys[0]) or set() + + return self._cache.sinter(index_keys) or set() diff --git a/src/fides/service/pbac/service.py b/src/fides/service/pbac/service.py new file mode 100644 index 0000000000..1a26699a88 --- /dev/null +++ b/src/fides/service/pbac/service.py @@ -0,0 +1,293 @@ +"""PBACEvaluationService Protocol and in-process implementation. + +Input: RawQueryLogEntry (platform-agnostic query log data). +Output: EvaluationResult (flat, serializable). + +The implementation owns consumer resolution, dataset resolution, +purpose lookup, PBAC evaluation, and policy filtering. +""" + +from __future__ import annotations + +from typing import Optional, Protocol, runtime_checkable + +from loguru import logger + +from fides.api.util.cache import FidesopsRedis, get_cache +from fides.service.pbac.consumers.entities import DataConsumerEntity +from fides.service.pbac.consumers.repository import DataConsumerRedisRepository +from fides.service.pbac.evaluate import evaluate_access +from fides.service.pbac.identity.resolver import DatasetResolver, RedisIdentityResolver +from fides.service.pbac.policies import ( + AccessEvaluationRequest, + AccessPolicyEvaluator, + NoOpPolicyEvaluator, + PolicyDecision, +) +from fides.service.pbac.purposes.repository import DataPurposeRedisRepository +from fides.service.pbac.types import ( + ConsumerPurposes, + DatasetPurposes, + EvaluationResult, + EvaluationViolation, + QueryAccess, + RawQueryLogEntry, + ResolvedConsumer, + ValidationResult, + Violation, +) + +PBAC_CONTROL_TYPE = "purpose_restriction" + + +@runtime_checkable +class PBACEvaluationService(Protocol): + """Service boundary for PBAC evaluation. + + Implementations own the full evaluation pipeline: + 1. Resolve consumer identity (email -> consumer + purposes) + 2. Resolve datasets (table refs -> fides keys) + 3. Build purpose maps + 4. Run PBAC engine (purpose overlap check) + 5. Run Policy v2 engine (override check) + 6. Return flat EvaluationResult + + To swap in a Go service later, implement this Protocol as a + gRPC/HTTP client. 
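+
+    A minimal usage sketch (a reachable Redis cache is assumed for the
+    default in-process implementation; ``parse_query`` lives in
+    ``fides.service.pbac.sql_parser`` and the query/email values are
+    illustrative):
+
+        entry = parse_query(
+            "SELECT email FROM customers.orders",
+            user_email="analyst@company.com",
+        )
+        service = InProcessPBACEvaluationService()
+        result = service.evaluate(entry)
+        if not result.is_compliant:
+            for violation in result.violations:
+                print(violation.reason)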
+ """ + + def evaluate( + self, + entry: RawQueryLogEntry, + dataset_purpose_overrides: dict[str, list[str]] | None = None, + ) -> EvaluationResult: + """Evaluate a query log entry for PBAC compliance. + + Args: + entry: Normalized query log entry (from SQL parser or connector). + dataset_purpose_overrides: Optional explicit dataset -> purpose + mapping. When provided, overrides the default purpose lookup. + + Returns: + EvaluationResult with compliance verdict and any violations. + """ + ... + + +class InProcessPBACEvaluationService: + """In-process implementation of the PBAC evaluation service. + + Owns the full evaluation pipeline: + 1. Resolve consumer identity + 2. Resolve datasets + 3. Build purpose maps + 4. Run PBAC engine + 5. Filter through Policy v2 + 6. Return flat EvaluationResult + """ + + def __init__( + self, + cache: Optional[FidesopsRedis] = None, + policy_evaluator: Optional[AccessPolicyEvaluator] = None, + dataset_purposes: dict[str, DatasetPurposes] | None = None, + ) -> None: + if cache is None: + cache = get_cache() + self._cache = cache + self._purpose_repo = DataPurposeRedisRepository(cache) + self._consumer_repo = DataConsumerRedisRepository(cache, self._purpose_repo) + self._identity_resolver = RedisIdentityResolver(self._consumer_repo) + self._dataset_resolver = DatasetResolver() + self._policy_evaluator: AccessPolicyEvaluator = ( + policy_evaluator or NoOpPolicyEvaluator() + ) + self._dataset_purposes = dataset_purposes or {} + + def evaluate( + self, + entry: RawQueryLogEntry, + dataset_purpose_overrides: dict[str, list[str]] | None = None, + ) -> EvaluationResult: + """Evaluate a query log entry for PBAC compliance.""" + # 1. Resolve consumer + consumer = self._identity_resolver.resolve( + user_email=entry.user_email, + principal_subject=entry.principal_subject, + ) + + # 2. Resolve datasets + dataset_keys: list[str] = [] + for table_ref in entry.referenced_tables: + fides_key = self._dataset_resolver.resolve(table_ref) + if fides_key: + dataset_keys.append(fides_key) + + # 3. Build engine inputs + consumer_purposes = self._build_consumer_purposes(consumer, entry) + + if dataset_purpose_overrides: + ds_purposes_map = { + dataset_key: DatasetPurposes( + dataset_key=dataset_key, + purpose_keys=frozenset( + dataset_purpose_overrides.get(dataset_key, []) + ), + ) + for dataset_key in dataset_keys + } + else: + ds_purposes_map = self._build_dataset_purposes(dataset_keys) + + query_access = QueryAccess( + query_id=entry.external_job_id, + consumer_id=consumer.id if consumer else entry.user_email, + dataset_keys=tuple(dataset_keys), + timestamp=entry.timestamp, + raw_query=entry.query_text, + ) + + # 4. PBAC engine + result = evaluate_access(consumer_purposes, ds_purposes_map, query_access) + + # 5. Filter through Policy v2 + result = self._filter_violations_through_policies(result, consumer) + + # 6. 
Build flat EvaluationResult + resolved = self._build_resolved_consumer(consumer, entry) + violations = self._build_evaluation_violations(result) + + return EvaluationResult( + query_id=entry.external_job_id, + consumer=resolved, + dataset_keys=tuple(dataset_keys), + is_compliant=result.is_compliant, + violations=violations, + total_accesses=result.total_accesses, + timestamp=entry.timestamp, + query_text=entry.query_text, + ) + + # ── Private helpers ──────────────────────────────────────────────── + + def _build_consumer_purposes( + self, + consumer: DataConsumerEntity | None, + entry: RawQueryLogEntry, + ) -> ConsumerPurposes: + if consumer: + return ConsumerPurposes( + consumer_id=consumer.id, + consumer_name=consumer.name, + purpose_keys=frozenset(consumer.purpose_fides_keys), + ) + return ConsumerPurposes( + consumer_id=entry.user_email, + consumer_name=entry.user_email, + purpose_keys=frozenset(), + ) + + def _build_dataset_purposes( + self, + dataset_keys: list[str], + ) -> dict[str, DatasetPurposes]: + result: dict[str, DatasetPurposes] = {} + for dataset_key in dataset_keys: + if dataset_key in self._dataset_purposes: + result[dataset_key] = self._dataset_purposes[dataset_key] + else: + result[dataset_key] = DatasetPurposes( + dataset_key=dataset_key, + purpose_keys=frozenset(), + ) + return result + + def _filter_violations_through_policies( + self, + validation_result: ValidationResult, + consumer: DataConsumerEntity | None, + ) -> ValidationResult: + """Filter PBAC violations through the access policy evaluator.""" + if validation_result.is_compliant: + return validation_result + + remaining: list[Violation] = [] + for violation in validation_result.violations: + request = AccessEvaluationRequest( + consumer_id=violation.consumer_id, + consumer_name=violation.consumer_name, + consumer_purposes=violation.consumer_purposes, + dataset_key=violation.dataset_key, + collection=violation.collection, + dataset_purposes=violation.dataset_purposes, + system_fides_key=consumer.system_fides_key if consumer else None, + ) + result = self._policy_evaluator.evaluate(request) + if result.decision == PolicyDecision.ALLOW: + logger.info( + "Policy '{}' overrides PBAC violation for consumer '{}' on '{}'", + result.decisive_policy_key, + violation.consumer_name, + violation.dataset_key, + ) + else: + remaining.append(violation) + + return ValidationResult( + violations=remaining, + is_compliant=len(remaining) == 0, + total_accesses=validation_result.total_accesses, + checked_at=validation_result.checked_at, + ) + + def _build_resolved_consumer( + self, + consumer: DataConsumerEntity | None, + entry: RawQueryLogEntry, + ) -> ResolvedConsumer: + if consumer: + return ResolvedConsumer( + id=consumer.id, + name=consumer.name, + email=consumer.contact_email, + type=consumer.type, + external_id=consumer.external_id, + system_fides_key=consumer.system_fides_key, + purpose_fides_keys=tuple(consumer.purpose_fides_keys), + ) + return ResolvedConsumer( + id=None, + name=entry.user_email, + email=entry.user_email, + type="unresolved", + external_id=None, + system_fides_key=None, + purpose_fides_keys=(), + ) + + def _build_evaluation_violations( + self, + validation_result: ValidationResult, + ) -> tuple[EvaluationViolation, ...]: + violations: list[EvaluationViolation] = [] + for violation in validation_result.violations: + # Resolve data_use from dataset purposes + data_use = None + for purpose_key in violation.dataset_purposes: + purpose = self._purpose_repo.get(purpose_key) + if purpose and 
purpose.data_use: + data_use = purpose.data_use + break + + violations.append( + EvaluationViolation( + dataset_key=violation.dataset_key, + collection=violation.collection, + consumer_purposes=tuple(sorted(violation.consumer_purposes)), + dataset_purposes=tuple(sorted(violation.dataset_purposes)), + data_use=data_use, + reason=violation.reason, + control=PBAC_CONTROL_TYPE, + ) + ) + return tuple(violations) diff --git a/src/fides/service/pbac/sql_parser.py b/src/fides/service/pbac/sql_parser.py new file mode 100644 index 0000000000..0f5d5badd6 --- /dev/null +++ b/src/fides/service/pbac/sql_parser.py @@ -0,0 +1,92 @@ +"""Generic SQL parser — extracts table references from SQL text. + +Uses sqlglot for dialect-agnostic SQL parsing. Produces a +RawQueryLogEntry ready for the PBACEvaluationService. +""" + +from __future__ import annotations + +import logging +import re +from datetime import datetime, timezone +from uuid import uuid4 + +import sqlglot +import sqlglot.expressions as exp + +from fides.service.pbac.types import RawQueryLogEntry, TableRef + +logger = logging.getLogger(__name__) + + +def parse_query( + query_text: str, + user_email: str, + timestamp: datetime | None = None, + source_id: str = "sql_parser", +) -> RawQueryLogEntry: + """Parse generic SQL into a RawQueryLogEntry. + + Extracts table references and statement type using sqlglot + (dialect-agnostic). The resulting entry can be passed directly + to ``PBACEvaluationService.evaluate()``. + """ + tables = extract_table_refs(query_text) + stmt_type = detect_statement_type(query_text) + return RawQueryLogEntry( + source_id=source_id, + external_job_id=str(uuid4()), + user_email=user_email, + query_text=query_text, + statement_type=stmt_type, + referenced_tables=tables, + timestamp=timestamp or datetime.now(timezone.utc), + ) + + +def extract_table_refs(query_text: str) -> list[TableRef]: + """Extract table references from SQL using sqlglot.""" + refs: list[TableRef] = [] + try: + parsed = sqlglot.parse(query_text) + except Exception: + logger.warning("Failed to parse SQL query for PBAC evaluation", exc_info=True) + return refs + + for statement in parsed: + if statement is None: + continue + for table in statement.find_all(exp.Table): + # Skip subquery aliases and CTEs that show up as Table nodes + if not table.name: + continue + refs.append( + TableRef( + project=table.catalog or "", + dataset=table.db or "", + table=table.name, + ) + ) + return refs + + +def detect_statement_type(query_text: str) -> str: + """Detect the SQL statement type from the query text.""" + # Strip leading single-line comments (-- ...) and block comments (/* ... */) + normalized = query_text.strip() + normalized = re.sub(r"^(--[^\n]*\n\s*)+", "", normalized, flags=re.MULTILINE) + normalized = re.sub(r"^(/\*.*?\*/\s*)+", "", normalized, flags=re.DOTALL) + normalized = normalized.strip().upper() + for stmt_type in ( + "SELECT", + "INSERT", + "UPDATE", + "DELETE", + "MERGE", + "CREATE", + "DROP", + "ALTER", + ): + if normalized.startswith(stmt_type): + return stmt_type + return "UNKNOWN" diff --git a/src/fides/service/pbac/types.py b/src/fides/service/pbac/types.py new file mode 100644 index 0000000000..56044219f0 --- /dev/null +++ b/src/fides/service/pbac/types.py @@ -0,0 +1,163 @@ +"""All PBAC types: input types, engine types, and service boundary types. + +These types are plain dataclasses with zero external dependencies — directly +serializable to JSON or protobuf. 
They are the shared contract between the +SQL parser, platform connectors, the evaluation engine, and the service layer. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any + +# ── Input types ──────────────────────────────────────────────────────── + + +@dataclass +class TableRef: + """A reference to a table in an external data platform.""" + + project: str + dataset: str + table: str + + @property + def qualified_name(self) -> str: + """Full dot-separated identifier.""" + parts = [p for p in (self.project, self.dataset, self.table) if p] + return ".".join(parts) + + +@dataclass +class RawQueryLogEntry: + """Platform-agnostic normalized query log entry. + + Produced by the SQL parser (OSS) or platform connectors (fidesplus). + Consumed by the PBACEvaluationService. + """ + + source_id: str + external_job_id: str + user_email: str + query_text: str + statement_type: str + referenced_tables: list[TableRef] + timestamp: datetime + principal_subject: str | None = None + raw_payload: dict[str, Any] = field(default_factory=dict) + + +# ── Engine types ─────────────────────────────────────────────────────── + + +@dataclass(frozen=True) +class ConsumerPurposes: + """The declared purposes for a data consumer.""" + + consumer_id: str + consumer_name: str + purpose_keys: frozenset[str] + + +@dataclass(frozen=True) +class DatasetPurposes: + """The declared purposes for a dataset, including per-collection purposes. + + Purpose inheritance is **additive**: a collection's effective purposes are + ``purpose_keys | collection_purposes.get(collection, frozenset())``. + """ + + dataset_key: str + purpose_keys: frozenset[str] # dataset-level purposes + collection_purposes: dict[str, frozenset[str]] = field( + default_factory=dict + ) # collection name -> additional purposes + + def effective_purposes(self, collection: str | None = None) -> frozenset[str]: + """Return the effective purposes for a collection (additive inheritance).""" + base = self.purpose_keys + if collection and collection in self.collection_purposes: + return base | self.collection_purposes[collection] + return base + + +@dataclass(frozen=True) +class QueryAccess: + """A single query event to validate.""" + + query_id: str + consumer_id: str + dataset_keys: tuple[str, ...] + timestamp: datetime + collections: dict[str, tuple[str, ...]] = field( + default_factory=dict + ) # dataset_key -> (collection_names) + raw_query: str | None = None + + +@dataclass(frozen=True) +class Violation: + """A single purpose-based access violation.""" + + query_id: str + consumer_id: str + consumer_name: str + dataset_key: str + collection: str | None + consumer_purposes: frozenset[str] + dataset_purposes: frozenset[str] + reason: str + + +@dataclass +class ValidationResult: + """The result of validating a query against purpose assignments.""" + + violations: list[Violation] + is_compliant: bool + total_accesses: int + checked_at: datetime + + +# ── Service boundary types ───────────────────────────────────────────── + + +@dataclass(frozen=True) +class ResolvedConsumer: + """Consumer identity as resolved by the evaluation service.""" + + id: str | None # None if unresolved + name: str # email if unresolved + email: str | None + type: str # "group", "project", "system", "unresolved" + external_id: str | None # reference to outside user groups or roles + system_fides_key: str | None + purpose_fides_keys: tuple[str, ...] 
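+
+    # Illustrative: when no registered consumer matches, the service builds
+    # ResolvedConsumer(id=None, name=<user email>, email=<user email>,
+    # type="unresolved", external_id=None, system_fides_key=None,
+    # purpose_fides_keys=()).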
+ + +@dataclass(frozen=True) +class EvaluationViolation: + """A single violation produced by evaluation.""" + + dataset_key: str + collection: str | None + consumer_purposes: tuple[str, ...] + dataset_purposes: tuple[str, ...] + data_use: str | None # resolved from dataset purposes + reason: str + control: str # "purpose_restriction" for PBAC + + +@dataclass(frozen=True) +class EvaluationResult: + """Complete result of evaluating a RawQueryLogEntry.""" + + query_id: str + consumer: ResolvedConsumer + dataset_keys: tuple[str, ...] + is_compliant: bool + violations: tuple[EvaluationViolation, ...] + total_accesses: int + timestamp: datetime + query_text: str | None diff --git a/tests/service/pbac/__init__.py b/tests/service/pbac/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/service/pbac/evaluation/__init__.py b/tests/service/pbac/evaluation/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/service/pbac/evaluation/test_evaluate_access.py b/tests/service/pbac/evaluation/test_evaluate_access.py new file mode 100644 index 0000000000..79d757b010 --- /dev/null +++ b/tests/service/pbac/evaluation/test_evaluate_access.py @@ -0,0 +1,242 @@ +"""Tests for the PBAC evaluation engine (evaluate_access). + +Covers: +- Rule 1: consumer with no purposes -> violation for every dataset +- Rule 2: consumer purposes don't intersect dataset purposes -> violation +- Rule 3: dataset with no declared purposes -> compliant +- Collection-level additive purpose inheritance +- Multiple datasets in a single query +""" + +from datetime import datetime, timezone + +import pytest + +from fides.service.pbac.evaluate import evaluate_access +from fides.service.pbac.types import ( + ConsumerPurposes, + DatasetPurposes, + QueryAccess, +) + + +@pytest.fixture +def now(): + return datetime.now(timezone.utc) + + +def _make_consumer(purpose_keys: frozenset[str]) -> ConsumerPurposes: + return ConsumerPurposes( + consumer_id="consumer-1", + consumer_name="test-consumer", + purpose_keys=purpose_keys, + ) + + +def _make_query( + dataset_keys: tuple[str, ...], + collections: dict[str, tuple[str, ...]] | None = None, + now: datetime | None = None, +) -> QueryAccess: + return QueryAccess( + query_id="q-1", + consumer_id="consumer-1", + dataset_keys=dataset_keys, + timestamp=now or datetime.now(timezone.utc), + collections=collections or {}, + ) + + +# --- Rule 1: consumer with no purposes --- + + +def test_no_consumer_purposes_produces_violation(now): + consumer = _make_consumer(frozenset()) + datasets = { + "ds_billing": DatasetPurposes( + dataset_key="ds_billing", + purpose_keys=frozenset({"billing"}), + ) + } + query = _make_query(("ds_billing",), now=now) + + result = evaluate_access(consumer, datasets, query) + + assert not result.is_compliant + assert len(result.violations) == 1 + assert "no declared purposes" in result.violations[0].reason + + +def test_no_consumer_purposes_with_unrestricted_dataset(now): + """Rule 1 takes precedence over Rule 3 -- even if the dataset has no purposes, + a consumer with no purposes is still a violation.""" + consumer = _make_consumer(frozenset()) + datasets = { + "ds_open": DatasetPurposes( + dataset_key="ds_open", + purpose_keys=frozenset(), + ) + } + query = _make_query(("ds_open",), now=now) + + result = evaluate_access(consumer, datasets, query) + + assert not result.is_compliant + assert len(result.violations) == 1 + + +# --- Rule 2: purpose mismatch --- + + +def test_purpose_mismatch_produces_violation(now): + consumer = 
_make_consumer(frozenset({"analytics"})) + datasets = { + "ds_billing": DatasetPurposes( + dataset_key="ds_billing", + purpose_keys=frozenset({"billing"}), + ) + } + query = _make_query(("ds_billing",), now=now) + + result = evaluate_access(consumer, datasets, query) + + assert not result.is_compliant + assert len(result.violations) == 1 + assert "do not overlap" in result.violations[0].reason + + +def test_purpose_overlap_is_compliant(now): + consumer = _make_consumer(frozenset({"billing", "analytics"})) + datasets = { + "ds_billing": DatasetPurposes( + dataset_key="ds_billing", + purpose_keys=frozenset({"billing"}), + ) + } + query = _make_query(("ds_billing",), now=now) + + result = evaluate_access(consumer, datasets, query) + + assert result.is_compliant + assert len(result.violations) == 0 + + +# --- Rule 3: dataset with no declared purposes --- + + +def test_unrestricted_dataset_is_compliant(now): + consumer = _make_consumer(frozenset({"billing"})) + datasets = { + "ds_open": DatasetPurposes( + dataset_key="ds_open", + purpose_keys=frozenset(), + ) + } + query = _make_query(("ds_open",), now=now) + + result = evaluate_access(consumer, datasets, query) + + assert result.is_compliant + + +def test_unknown_dataset_is_compliant(now): + """A dataset not in the purposes map at all is treated as compliant.""" + consumer = _make_consumer(frozenset({"billing"})) + query = _make_query(("ds_unknown",), now=now) + + result = evaluate_access(consumer, {}, query) + + assert result.is_compliant + + +# --- Collection-level additive inheritance --- + + +def test_collection_inherits_dataset_purposes(now): + """A collection with no extra purposes inherits dataset-level purposes.""" + consumer = _make_consumer(frozenset({"billing"})) + datasets = { + "ds_billing": DatasetPurposes( + dataset_key="ds_billing", + purpose_keys=frozenset({"billing"}), + collection_purposes={}, + ) + } + query = _make_query( + ("ds_billing",), + collections={"ds_billing": ("users",)}, + now=now, + ) + + result = evaluate_access(consumer, datasets, query) + + assert result.is_compliant + + +def test_collection_adds_own_purposes(now): + """Collection purposes are additive -- consumer matches collection-level purpose + even if it doesn't match dataset-level.""" + consumer = _make_consumer(frozenset({"analytics"})) + datasets = { + "ds_billing": DatasetPurposes( + dataset_key="ds_billing", + purpose_keys=frozenset({"billing"}), + collection_purposes={"events": frozenset({"analytics"})}, + ) + } + query = _make_query( + ("ds_billing",), + collections={"ds_billing": ("events",)}, + now=now, + ) + + result = evaluate_access(consumer, datasets, query) + + assert result.is_compliant + + +def test_collection_mismatch_produces_violation(now): + """Consumer purpose doesn't overlap with either dataset or collection purposes.""" + consumer = _make_consumer(frozenset({"marketing"})) + datasets = { + "ds_billing": DatasetPurposes( + dataset_key="ds_billing", + purpose_keys=frozenset({"billing"}), + collection_purposes={"events": frozenset({"analytics"})}, + ) + } + query = _make_query( + ("ds_billing",), + collections={"ds_billing": ("events",)}, + now=now, + ) + + result = evaluate_access(consumer, datasets, query) + + assert not result.is_compliant + assert len(result.violations) == 1 + + +# --- Multiple datasets --- + + +def test_multiple_datasets_mixed_compliance(now): + consumer = _make_consumer(frozenset({"billing"})) + datasets = { + "ds_billing": DatasetPurposes( + dataset_key="ds_billing", + purpose_keys=frozenset({"billing"}), + ), 
+ "ds_analytics": DatasetPurposes( + dataset_key="ds_analytics", + purpose_keys=frozenset({"analytics"}), + ), + } + query = _make_query(("ds_billing", "ds_analytics"), now=now) + + result = evaluate_access(consumer, datasets, query) + + assert not result.is_compliant + assert result.total_accesses == 2 + assert len(result.violations) == 1 + assert result.violations[0].dataset_key == "ds_analytics" diff --git a/tests/service/pbac/evaluation/test_evaluation_service.py b/tests/service/pbac/evaluation/test_evaluation_service.py new file mode 100644 index 0000000000..3f5b7b3478 --- /dev/null +++ b/tests/service/pbac/evaluation/test_evaluation_service.py @@ -0,0 +1,242 @@ +"""Integration tests for InProcessPBACEvaluationService.evaluate(). + +Covers the full pipeline through the public interface: +- Consumer resolution via Redis (registered vs unregistered) +- Dataset purpose lookup from the init-time map +- Collection-level purpose inheritance end-to-end +- Interaction between dataset_purpose_overrides and the init map +""" + +from datetime import datetime, timezone + +import pytest + +from fides.service.pbac.consumers.entities import DataConsumerEntity +from fides.service.pbac.consumers.repository import DataConsumerRedisRepository +from fides.service.pbac.purposes.repository import DataPurposeRedisRepository +from fides.service.pbac.service import InProcessPBACEvaluationService +from fides.service.pbac.types import DatasetPurposes, RawQueryLogEntry, TableRef + + +@pytest.fixture +def purpose_repo(cache): + return DataPurposeRedisRepository(cache) + + +@pytest.fixture +def consumer_repo(cache, purpose_repo): + return DataConsumerRedisRepository(cache, purpose_repo) + + +@pytest.fixture +def registered_consumer(consumer_repo): + """A consumer registered in Redis with billing + analytics purposes.""" + now = datetime.now(timezone.utc) + entity = DataConsumerEntity( + id="consumer-integ-1", + name="Billing Pipeline", + type="group", + contact_email="billing@example.com", + purpose_fides_keys=["billing", "analytics"], + created_at=now, + updated_at=now, + ) + consumer_repo.save(entity) + yield entity + # Cleanup + try: + consumer_repo.delete(entity.id) + except Exception: + pass + + +@pytest.fixture +def dataset_purposes_map(): + return { + "billing_db": DatasetPurposes( + dataset_key="billing_db", + purpose_keys=frozenset({"billing"}), + collection_purposes={ + "invoices": frozenset({"accounting"}), + }, + ), + "analytics_db": DatasetPurposes( + dataset_key="analytics_db", + purpose_keys=frozenset({"analytics"}), + ), + "marketing_db": DatasetPurposes( + dataset_key="marketing_db", + purpose_keys=frozenset({"marketing"}), + ), + } + + +def _make_entry( + user_email: str, + tables: list[TableRef], +) -> RawQueryLogEntry: + return RawQueryLogEntry( + source_id="test", + external_job_id="job-1", + user_email=user_email, + query_text="SELECT 1", + statement_type="SELECT", + referenced_tables=tables, + timestamp=datetime.now(timezone.utc), + ) + + +# --- Registered consumer + dataset purposes from init map --- + + +@pytest.mark.integration +class TestRegisteredConsumerWithDatasetPurposes: + def test_compliant_when_purposes_overlap( + self, cache, registered_consumer, dataset_purposes_map + ): + service = InProcessPBACEvaluationService( + cache=cache, + dataset_purposes=dataset_purposes_map, + ) + entry = _make_entry( + "billing@example.com", + [TableRef(project="", dataset="billing_db", table="invoices")], + ) + + result = service.evaluate(entry) + + assert result.is_compliant + assert 
result.consumer.id == "consumer-integ-1" + assert result.consumer.type == "group" + + def test_violation_when_purposes_do_not_overlap( + self, cache, registered_consumer, dataset_purposes_map + ): + service = InProcessPBACEvaluationService( + cache=cache, + dataset_purposes=dataset_purposes_map, + ) + entry = _make_entry( + "billing@example.com", + [TableRef(project="", dataset="marketing_db", table="campaigns")], + ) + + result = service.evaluate(entry) + + assert not result.is_compliant + assert len(result.violations) == 1 + assert result.violations[0].dataset_key == "marketing_db" + assert result.violations[0].control == "purpose_restriction" + + def test_mixed_compliance_across_datasets( + self, cache, registered_consumer, dataset_purposes_map + ): + service = InProcessPBACEvaluationService( + cache=cache, + dataset_purposes=dataset_purposes_map, + ) + entry = _make_entry( + "billing@example.com", + [ + TableRef(project="", dataset="billing_db", table="invoices"), + TableRef(project="", dataset="marketing_db", table="campaigns"), + ], + ) + + result = service.evaluate(entry) + + assert not result.is_compliant + assert result.total_accesses == 2 + assert len(result.violations) == 1 + assert result.violations[0].dataset_key == "marketing_db" + + +# --- Unregistered consumer --- + + +@pytest.mark.integration +class TestUnregisteredConsumer: + def test_unregistered_consumer_always_violates(self, cache, dataset_purposes_map): + service = InProcessPBACEvaluationService( + cache=cache, + dataset_purposes=dataset_purposes_map, + ) + entry = _make_entry( + "unknown@example.com", + [TableRef(project="", dataset="billing_db", table="invoices")], + ) + + result = service.evaluate(entry) + + assert not result.is_compliant + assert result.consumer.type == "unresolved" + assert "no declared purposes" in result.violations[0].reason + + +# --- Dataset not in the purposes map --- + + +@pytest.mark.integration +class TestDatasetNotInMap: + def test_unknown_dataset_is_compliant_for_registered_consumer( + self, cache, registered_consumer, dataset_purposes_map + ): + """A dataset with no entry in the map has no declared purposes -> Rule 3.""" + service = InProcessPBACEvaluationService( + cache=cache, + dataset_purposes=dataset_purposes_map, + ) + entry = _make_entry( + "billing@example.com", + [TableRef(project="", dataset="unknown_db", table="things")], + ) + + result = service.evaluate(entry) + + assert result.is_compliant + + +# --- dataset_purpose_overrides takes precedence --- + + +@pytest.mark.integration +class TestDatasetPurposeOverrides: + def test_overrides_take_precedence_over_init_map( + self, cache, registered_consumer, dataset_purposes_map + ): + """Explicit overrides passed to evaluate() override the init-time map.""" + service = InProcessPBACEvaluationService( + cache=cache, + dataset_purposes=dataset_purposes_map, + ) + entry = _make_entry( + "billing@example.com", + [TableRef(project="", dataset="marketing_db", table="campaigns")], + ) + + # marketing_db would normally violate, but override gives it billing purpose + result = service.evaluate( + entry, + dataset_purpose_overrides={"marketing_db": ["billing"]}, + ) + + assert result.is_compliant + + +# --- No dataset purposes provided (backward compat) --- + + +@pytest.mark.integration +class TestNoDatasetPurposes: + def test_no_map_means_all_datasets_unrestricted(self, cache, registered_consumer): + """Without a dataset_purposes map, all datasets fall back to empty + purposes -> Rule 3 (compliant for registered consumers).""" + service = 
InProcessPBACEvaluationService(cache=cache) + entry = _make_entry( + "billing@example.com", + [TableRef(project="", dataset="billing_db", table="invoices")], + ) + + result = service.evaluate(entry) + + assert result.is_compliant diff --git a/tests/service/pbac/policies/__init__.py b/tests/service/pbac/policies/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/service/pbac/policies/test_interface.py b/tests/service/pbac/policies/test_interface.py new file mode 100644 index 0000000000..0b94c41ce9 --- /dev/null +++ b/tests/service/pbac/policies/test_interface.py @@ -0,0 +1,155 @@ +"""Tests for the Policies v2 evaluation interface types.""" + +from fides.service.pbac.policies.interface import ( + AccessEvaluationRequest, + AccessPolicyEvaluator, + EvaluatedPolicyInfo, + PolicyAction, + PolicyDecision, + PolicyEvaluationResult, +) +from fides.service.pbac.policies.noop import NoOpPolicyEvaluator + + +class TestPolicyDecision: + def test_enum_values(self) -> None: + assert PolicyDecision.ALLOW == "ALLOW" + assert PolicyDecision.DENY == "DENY" + assert PolicyDecision.NO_DECISION == "NO_DECISION" + + def test_enum_membership(self) -> None: + assert "ALLOW" in PolicyDecision.__members__ + assert "DENY" in PolicyDecision.__members__ + assert "NO_DECISION" in PolicyDecision.__members__ + + +class TestAccessEvaluationRequest: + def test_minimal_construction(self) -> None: + request = AccessEvaluationRequest( + consumer_id="consumer-1", + consumer_name="Analytics Team", + consumer_purposes=frozenset({"marketing"}), + dataset_key="postgres_main", + dataset_purposes=frozenset({"billing"}), + ) + assert request.consumer_id == "consumer-1" + assert request.consumer_name == "Analytics Team" + assert request.consumer_purposes == frozenset({"marketing"}) + assert request.dataset_key == "postgres_main" + assert request.dataset_purposes == frozenset({"billing"}) + assert request.collection is None + assert request.system_fides_key is None + assert request.data_uses == () + assert request.data_categories == () + assert request.data_subjects == () + assert request.context == {} + + def test_full_construction(self) -> None: + request = AccessEvaluationRequest( + consumer_id="consumer-1", + consumer_name="Analytics Team", + consumer_purposes=frozenset({"marketing"}), + dataset_key="postgres_main", + dataset_purposes=frozenset({"billing"}), + collection="users", + system_fides_key="analytics_platform", + data_uses=("marketing.advertising",), + data_categories=("user.contact.email",), + data_subjects=("customer",), + context={"environment": {"geo_location": "US-CA"}}, + ) + assert request.collection == "users" + assert request.system_fides_key == "analytics_platform" + assert request.data_uses == ("marketing.advertising",) + assert request.data_categories == ("user.contact.email",) + assert request.data_subjects == ("customer",) + assert request.context == {"environment": {"geo_location": "US-CA"}} + + def test_frozen(self) -> None: + request = AccessEvaluationRequest( + consumer_id="c1", + consumer_name="Test", + consumer_purposes=frozenset(), + dataset_key="ds1", + dataset_purposes=frozenset(), + ) + try: + request.consumer_id = "c2" # type: ignore[misc] + assert False, "Should have raised FrozenInstanceError" + except AttributeError: + pass + + +class TestPolicyEvaluationResult: + def test_no_decision(self) -> None: + result = PolicyEvaluationResult(decision=PolicyDecision.NO_DECISION) + assert result.decision == PolicyDecision.NO_DECISION + assert result.decisive_policy_key is None 
+ assert result.decisive_policy_priority is None + assert result.unless_triggered is False + assert result.evaluated_policies == [] + assert result.action is None + assert result.reason is None + + def test_allow_with_policy(self) -> None: + result = PolicyEvaluationResult( + decision=PolicyDecision.ALLOW, + decisive_policy_key="ccpa_sale_blocker", + decisive_policy_priority=100, + evaluated_policies=[ + EvaluatedPolicyInfo( + policy_key="ccpa_sale_blocker", + priority=100, + matched=True, + result="ALLOW", + ) + ], + ) + assert result.decision == PolicyDecision.ALLOW + assert result.decisive_policy_key == "ccpa_sale_blocker" + assert len(result.evaluated_policies) == 1 + + def test_deny_with_action(self) -> None: + result = PolicyEvaluationResult( + decision=PolicyDecision.DENY, + decisive_policy_key="block_third_party_ads", + decisive_policy_priority=200, + action=PolicyAction(message="Third-party advertising is not permitted."), + reason="Policy explicitly denies this data use.", + ) + assert result.decision == PolicyDecision.DENY + assert result.action is not None + assert result.action.message == "Third-party advertising is not permitted." + assert result.reason is not None + + +class TestEvaluatedPolicyInfo: + def test_construction(self) -> None: + info = EvaluatedPolicyInfo( + policy_key="test_policy", + priority=50, + matched=True, + result="SUPPRESSED", + unless_triggered=True, + ) + assert info.policy_key == "test_policy" + assert info.priority == 50 + assert info.matched is True + assert info.result == "SUPPRESSED" + assert info.unless_triggered is True + + +class TestProtocolCompliance: + def test_noop_satisfies_protocol(self) -> None: + evaluator = NoOpPolicyEvaluator() + assert isinstance(evaluator, AccessPolicyEvaluator) + + def test_custom_evaluator_satisfies_protocol(self) -> None: + class CustomEvaluator: + def evaluate( + self, request: AccessEvaluationRequest + ) -> PolicyEvaluationResult: + return PolicyEvaluationResult(decision=PolicyDecision.ALLOW) + + evaluator = CustomEvaluator() + assert isinstance(evaluator, AccessPolicyEvaluator) diff --git a/tests/service/pbac/policies/test_noop.py b/tests/service/pbac/policies/test_noop.py new file mode 100644 index 0000000000..a1494f0314 --- /dev/null +++ b/tests/service/pbac/policies/test_noop.py @@ -0,0 +1,56 @@ +"""Tests for the NoOp policy evaluator.""" + +import pytest + +from fides.service.pbac.policies.interface import ( + AccessEvaluationRequest, + PolicyDecision, +) +from fides.service.pbac.policies.noop import NoOpPolicyEvaluator + + +@pytest.fixture() +def evaluator() -> NoOpPolicyEvaluator: + return NoOpPolicyEvaluator() + + +def _make_request(**overrides: object) -> AccessEvaluationRequest: + defaults: dict = { + "consumer_id": "consumer-1", + "consumer_name": "Test Consumer", + "consumer_purposes": frozenset({"marketing"}), + "dataset_key": "postgres_main", + "dataset_purposes": frozenset({"billing"}), + } + defaults.update(overrides) + return AccessEvaluationRequest(**defaults) + + +class TestNoOpPolicyEvaluator: + def test_returns_no_decision(self, evaluator: NoOpPolicyEvaluator) -> None: + result = evaluator.evaluate(_make_request()) + assert result.decision == PolicyDecision.NO_DECISION + + def test_no_decisive_policy(self, evaluator: NoOpPolicyEvaluator) -> None: + result = evaluator.evaluate(_make_request()) + assert result.decisive_policy_key is None + assert result.decisive_policy_priority is None + + def test_no_evaluated_policies(self, evaluator: NoOpPolicyEvaluator) -> None: + result = 
evaluator.evaluate(_make_request()) + assert result.evaluated_policies == [] + + def test_no_action(self, evaluator: NoOpPolicyEvaluator) -> None: + result = evaluator.evaluate(_make_request()) + assert result.action is None + + def test_ignores_request_content(self, evaluator: NoOpPolicyEvaluator) -> None: + result = evaluator.evaluate( + _make_request( + consumer_purposes=frozenset(), + system_fides_key="some_system", + data_uses=("marketing.advertising",), + context={"environment": {"geo_location": "EU"}}, + ) + ) + assert result.decision == PolicyDecision.NO_DECISION diff --git a/tests/service/pbac/policies/test_policy_filtering.py b/tests/service/pbac/policies/test_policy_filtering.py new file mode 100644 index 0000000000..6227302b2a --- /dev/null +++ b/tests/service/pbac/policies/test_policy_filtering.py @@ -0,0 +1,193 @@ +"""Tests for violation filtering through policy evaluation.""" + +from datetime import datetime, timezone + +from fides.service.pbac.policies.interface import ( + AccessEvaluationRequest, + PolicyDecision, + PolicyEvaluationResult, +) +from fides.service.pbac.policies.noop import NoOpPolicyEvaluator +from fides.service.pbac.types import ValidationResult, Violation + +# -- Helpers ----------------------------------------------------------------- + + +def _make_violation( + dataset_key: str = "ds_1", + collection: str | None = None, + consumer_id: str = "consumer-1", + consumer_name: str = "Test Consumer", +) -> Violation: + return Violation( + query_id="query-1", + consumer_id=consumer_id, + consumer_name=consumer_name, + dataset_key=dataset_key, + collection=collection, + consumer_purposes=frozenset({"marketing"}), + dataset_purposes=frozenset({"billing"}), + reason="Purposes do not overlap", + ) + + +def _make_validation_result( + violations: list[Violation] | None = None, +) -> ValidationResult: + viol = violations or [] + return ValidationResult( + violations=viol, + is_compliant=len(viol) == 0, + total_accesses=max(len(viol), 1), + checked_at=datetime.now(timezone.utc), + ) + + +class AllowSpecificDatasetEvaluator: + """Test evaluator that returns ALLOW for specific datasets.""" + + def __init__(self, allowed_datasets: set[str]) -> None: + self._allowed = allowed_datasets + + def evaluate(self, request: AccessEvaluationRequest) -> PolicyEvaluationResult: + if request.dataset_key in self._allowed: + return PolicyEvaluationResult( + decision=PolicyDecision.ALLOW, + decisive_policy_key="test_allow_policy", + decisive_policy_priority=100, + ) + return PolicyEvaluationResult(decision=PolicyDecision.NO_DECISION) + + +class AlwaysDenyEvaluator: + """Test evaluator that always returns DENY.""" + + def evaluate(self, request: AccessEvaluationRequest) -> PolicyEvaluationResult: + return PolicyEvaluationResult( + decision=PolicyDecision.DENY, + decisive_policy_key="test_deny_policy", + decisive_policy_priority=200, + reason="Policy explicitly denies this access.", + ) + + +def _filter_violations( + validation_result: ValidationResult, + evaluator: object, + system_fides_key: str | None = None, +) -> ValidationResult: + """Replicate the filtering logic from InProcessPBACEvaluationService.""" + if validation_result.is_compliant: + return validation_result + + remaining: list[Violation] = [] + for violation in validation_result.violations: + request = AccessEvaluationRequest( + consumer_id=violation.consumer_id, + consumer_name=violation.consumer_name, + consumer_purposes=violation.consumer_purposes, + dataset_key=violation.dataset_key, + collection=violation.collection, + 
dataset_purposes=violation.dataset_purposes, + system_fides_key=system_fides_key, + ) + result = evaluator.evaluate(request) # type: ignore[union-attr] + if result.decision != PolicyDecision.ALLOW: + remaining.append(violation) + + return ValidationResult( + violations=remaining, + is_compliant=len(remaining) == 0, + total_accesses=validation_result.total_accesses, + checked_at=validation_result.checked_at, + ) + + +# -- Tests ------------------------------------------------------------------- + + +class TestFilterWithNoOp: + def test_compliant_result_passthrough(self) -> None: + result = _make_validation_result(violations=[]) + filtered = _filter_violations(result, NoOpPolicyEvaluator()) + assert filtered.is_compliant is True + assert filtered.violations == [] + + def test_violations_preserved(self) -> None: + violations = [_make_violation("ds_1"), _make_violation("ds_2")] + result = _make_validation_result(violations=violations) + filtered = _filter_violations(result, NoOpPolicyEvaluator()) + assert len(filtered.violations) == 2 + assert filtered.is_compliant is False + + def test_total_accesses_preserved(self) -> None: + result = _make_validation_result(violations=[_make_violation()]) + result.total_accesses = 5 + filtered = _filter_violations(result, NoOpPolicyEvaluator()) + assert filtered.total_accesses == 5 + + +class TestFilterWithAllowEvaluator: + def test_allowed_dataset_suppressed(self) -> None: + violations = [_make_violation("ds_allowed"), _make_violation("ds_denied")] + result = _make_validation_result(violations=violations) + evaluator = AllowSpecificDatasetEvaluator(allowed_datasets={"ds_allowed"}) + filtered = _filter_violations(result, evaluator) + assert len(filtered.violations) == 1 + assert filtered.violations[0].dataset_key == "ds_denied" + assert filtered.is_compliant is False + + def test_all_violations_suppressed(self) -> None: + violations = [_make_violation("ds_a"), _make_violation("ds_b")] + result = _make_validation_result(violations=violations) + evaluator = AllowSpecificDatasetEvaluator(allowed_datasets={"ds_a", "ds_b"}) + filtered = _filter_violations(result, evaluator) + assert len(filtered.violations) == 0 + assert filtered.is_compliant is True + + def test_no_violations_suppressed(self) -> None: + violations = [_make_violation("ds_x")] + result = _make_validation_result(violations=violations) + evaluator = AllowSpecificDatasetEvaluator(allowed_datasets={"ds_other"}) + filtered = _filter_violations(result, evaluator) + assert len(filtered.violations) == 1 + assert filtered.is_compliant is False + + +class TestFilterWithDenyEvaluator: + def test_deny_preserves_violations(self) -> None: + violations = [_make_violation()] + result = _make_validation_result(violations=violations) + filtered = _filter_violations(result, AlwaysDenyEvaluator()) + assert len(filtered.violations) == 1 + assert filtered.is_compliant is False + + +class TestRequestConstruction: + def test_request_fields_from_violation(self) -> None: + violation = Violation( + query_id="q1", + consumer_id="c1", + consumer_name="Analytics Team", + dataset_key="postgres_main", + collection="users", + consumer_purposes=frozenset({"marketing", "analytics"}), + dataset_purposes=frozenset({"billing", "operations"}), + reason="No overlap", + ) + request = AccessEvaluationRequest( + consumer_id=violation.consumer_id, + consumer_name=violation.consumer_name, + consumer_purposes=violation.consumer_purposes, + dataset_key=violation.dataset_key, + collection=violation.collection, + 
dataset_purposes=violation.dataset_purposes, + system_fides_key="analytics_system", + ) + assert request.consumer_id == "c1" + assert request.consumer_name == "Analytics Team" + assert request.consumer_purposes == frozenset({"marketing", "analytics"}) + assert request.dataset_key == "postgres_main" + assert request.collection == "users" + assert request.dataset_purposes == frozenset({"billing", "operations"}) + assert request.system_fides_key == "analytics_system" diff --git a/uv.lock b/uv.lock index f192dae6af..ea54cd1fb1 100644 --- a/uv.lock +++ b/uv.lock @@ -1006,6 +1006,7 @@ dependencies = [ { name = "sqlalchemy-redshift" }, { name = "sqlalchemy-stubs" }, { name = "sqlalchemy-utils" }, + { name = "sqlglot" }, { name = "sshtunnel" }, { name = "starlette" }, { name = "stream-zip" }, @@ -1170,6 +1171,7 @@ requires-dist = [ { name = "sqlalchemy-redshift", specifier = "==0.8.11" }, { name = "sqlalchemy-stubs", specifier = "==0.4" }, { name = "sqlalchemy-utils", specifier = "==0.38.3" }, + { name = "sqlglot", specifier = "~=30.0.3" }, { name = "sshtunnel", specifier = "==0.4.0" }, { name = "starlette", specifier = "~=0.50.0" }, { name = "stream-zip", specifier = "==0.0.83" }, @@ -4277,6 +4279,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e1/90/6ff1829b55cf752994be76bdf5f718db053dcb8d738f9df5b622207a4ab8/SQLAlchemy_Utils-0.38.3-py3-none-any.whl", hash = "sha256:5c13b5d08adfaa85f3d4e8ec09a75136216fad41346980d02974a70a77988bf9", size = 100264, upload-time = "2022-07-12T05:08:31.496Z" }, ] +[[package]] +name = "sqlglot" +version = "30.0.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/5b/7a/3b6a8853fc2fa166f785a8ea4fecde46f70588e35471bc7811373da31a49/sqlglot-30.0.3.tar.gz", hash = "sha256:35ba7514c132b54f87fd1732a65a73615efa9fd83f6e1eed0a315bc9ee3e1027", size = 5802632, upload-time = "2026-03-19T16:51:39.166Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c6/13/b57ab75b0f60b5ee8cb8924bc01a5c419ed3221e00f8f11f8c059a707eb7/sqlglot-30.0.3-py3-none-any.whl", hash = "sha256:5489cc98b5666f1fafc21e0304ca286e513e142aa054ee5760806a2139d07a05", size = 651853, upload-time = "2026-03-19T16:51:36.241Z" }, +] + [[package]] name = "sshtunnel" version = "0.4.0"