Merged
4 changes: 4 additions & 0 deletions changelog/7741-pbac-evaluation-service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type: Added
description: Add PBAC evaluation service for purpose-based access control checks
pr: 7741
labels: []
2 changes: 1 addition & 1 deletion clients/admin-ui/src/features/common/nav/nav-config.tsx
@@ -407,7 +407,7 @@ if (process.env.NEXT_PUBLIC_APP_ENV === "development") {
requiresPlus: true,
},
{
- title: "Seed Data",
+ title: "Seed data",
path: routes.SEED_DATA_ROUTE,
scopes: [ScopeRegistryEnum.DEVELOPER_READ],
requiresPlus: true,
1 change: 1 addition & 0 deletions pyproject.toml
@@ -104,6 +104,7 @@ dependencies = [
"sqlalchemy-redshift==0.8.11",
"sqlalchemy-stubs==0.4",
"sqlalchemy[asyncio]==1.4.27",
"sqlglot~=30.0.3",
"sshtunnel==0.4.0",
"starlette~=0.50.0",
"stream-zip==0.0.83",
195 changes: 195 additions & 0 deletions src/fides/service/pbac/README.md
@@ -0,0 +1,195 @@
# Purpose-Based Access Control (PBAC)

Open-source evaluation service for purpose-based data access control. Determines whether a data consumer has the declared purposes required to access datasets, collections, and fields.

## Quick Start

### 1. Register consumers (who accesses data)

```
POST /api/v1/data-consumer
{
  "name": "Analytics Team",
  "type": "group",
  "external_id": "analytics-team",
  "contact_email": "analytics-lead@company.com",
  "members": ["analyst@company.com", "lead@company.com"]
}
```

### 2. Define purposes (why data is accessed)

```
POST /api/v1/data-purpose
{
  "fides_key": "marketing_analytics",
  "name": "Marketing Analytics",
  "data_use": "marketing.advertising"
}
```

### 3. Assign purposes to consumers

```
PUT /api/v1/data-consumer/{id}/purposes
{ "purpose_fides_keys": ["marketing_analytics"] }
```

### 4. Annotate datasets with purposes

Datasets declare their allowed purposes via fideslang `data_purposes` on datasets, collections, and fields.

### 5. Evaluate a SQL query

```
POST /api/v1/pbac/evaluate
{
  "query_text": "SELECT email, purchase_history FROM customers.orders",
  "user_identity": "analyst@company.com"
}
```

Response:

```json
{
  "query_id": "abc-123",
  "is_compliant": false,
  "consumer": {
    "name": "Analytics Team",
    "external_id": "analytics-team",
    "purpose_fides_keys": ["marketing_analytics"]
  },
  "violations": [
    {
      "dataset_key": "customers",
      "consumer_purposes": ["marketing_analytics"],
      "dataset_purposes": ["billing_operations"],
      "reason": "Consumer purposes do not overlap with dataset purposes",
      "control": "purpose_restriction"
    }
  ]
}
```
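
A client could turn this response into a short report. The following is a minimal sketch that assumes only the response shape shown above; `summarize_evaluation` is a hypothetical helper for illustration and is not part of Fides.

```python
import json

# Response body copied from the example above.
RESPONSE_TEXT = """
{
  "query_id": "abc-123",
  "is_compliant": false,
  "consumer": {
    "name": "Analytics Team",
    "external_id": "analytics-team",
    "purpose_fides_keys": ["marketing_analytics"]
  },
  "violations": [
    {
      "dataset_key": "customers",
      "consumer_purposes": ["marketing_analytics"],
      "dataset_purposes": ["billing_operations"],
      "reason": "Consumer purposes do not overlap with dataset purposes",
      "control": "purpose_restriction"
    }
  ]
}
"""


def summarize_evaluation(response: dict) -> list[str]:
    """Hypothetical helper: one line per violation, empty list if compliant."""
    if response["is_compliant"]:
        return []
    return [
        f"{v['dataset_key']}: {v['reason']} "
        f"(consumer={v['consumer_purposes']}, dataset={v['dataset_purposes']})"
        for v in response["violations"]
    ]


summary = summarize_evaluation(json.loads(RESPONSE_TEXT))
for line in summary:
    print(line)
```

Keeping the summary a pure function of the parsed response makes it easy to reuse in logs, alerts, or tests without calling the API again.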

## How It Works

```
raw SQL
   |
   v
SQL Parser (sqlglot)  -- extracts table references
   |
   v
RawQueryLogEntry
   |
   v
PBACEvaluationService
   |
   +-- 1. Identity Resolution
   |       user email --> consumer (via contact_email or external_id;
   |       members-list matching is not yet implemented in the OSS path)
   |
   +-- 2. Dataset Resolution
   |       table references --> fides dataset keys
   |
   +-- 3. Purpose Map
   |       consumer --> declared purposes
   |       dataset  --> declared purposes
   |
   +-- 4. PBAC Engine
   |       Do consumer purposes overlap with dataset purposes?
   |         yes --> COMPLIANT
   |         no  --> VIOLATION
   |
   +-- 5. Policy v2 Engine (optional)
   |       Does any access policy override the violation?
   |         ALLOW       --> suppress violation
   |         DENY        --> confirm violation
   |         NO_DECISION --> confirm violation
   |
   v
EvaluationResult
   is_compliant: bool
   violations: [...]
```
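
Step 4 of the pipeline reduces to a set intersection per dataset. The sketch below illustrates that check under stated assumptions: `Violation` and `check_overlap` are hypothetical names (the real engine exposes `evaluate_access()`, whose signature is not shown here), and treating a dataset with no declared purposes as a violation is an assumption of this sketch, not documented behavior.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Violation:
    """Hypothetical record of one dataset the consumer may not access."""

    dataset_key: str
    consumer_purposes: tuple[str, ...]
    dataset_purposes: tuple[str, ...]


def check_overlap(
    consumer_purposes: set[str],
    dataset_purposes: dict[str, set[str]],
) -> list[Violation]:
    """Flag every dataset whose purposes share nothing with the consumer's."""
    violations = []
    for dataset_key, purposes in sorted(dataset_purposes.items()):
        if consumer_purposes & purposes:
            continue  # at least one shared purpose: compliant
        # Assumption: an empty purpose set on either side also lands here.
        violations.append(
            Violation(
                dataset_key=dataset_key,
                consumer_purposes=tuple(sorted(consumer_purposes)),
                dataset_purposes=tuple(sorted(purposes)),
            )
        )
    return violations


violations = check_overlap(
    {"marketing_analytics"},
    {
        "customers": {"billing_operations"},
        "events": {"marketing_analytics", "product_research"},
    },
)
is_compliant = not violations
print(is_compliant, [v.dataset_key for v in violations])
```

An unresolved consumer can be modeled by passing an empty set of consumer purposes, in which case every accessed dataset is flagged.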

## Identity Resolution

The system resolves who is running a query by matching the user identity against registered consumers.

**Resolution chain (in order):**

1. **contact_email** -- exact match on the consumer's contact email
2. **external_id** -- match on the consumer's external identifier (group name, role ID, etc.)

> **Note:** Members-based resolution (matching by members list) is not yet implemented in the OSS path. The `RedisIdentityResolver` currently supports steps 1 and 2 only.

If no consumer matches, the user is marked as "unresolved" with no declared purposes, which means all dataset accesses are violations.
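
The resolution chain can be sketched as two ordered lookups with an "unresolved" fallback. This is a minimal illustration, not the actual resolver: `Consumer`, `resolve_consumer`, and `purposes_for` are hypothetical names, and an in-memory list stands in for whatever store the real resolver uses.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Consumer:
    """Hypothetical, trimmed-down consumer record."""

    name: str
    purpose_fides_keys: list[str] = field(default_factory=list)
    contact_email: Optional[str] = None
    external_id: Optional[str] = None


def resolve_consumer(identity: str, consumers: list[Consumer]) -> Optional[Consumer]:
    """Try contact_email first, then external_id; None means unresolved."""
    for consumer in consumers:
        if consumer.contact_email == identity:  # step 1: exact email match
            return consumer
    for consumer in consumers:
        if consumer.external_id == identity:  # step 2: external identifier
            return consumer
    return None


def purposes_for(identity: str, consumers: list[Consumer]) -> list[str]:
    """An unresolved identity has no declared purposes."""
    consumer = resolve_consumer(identity, consumers)
    return consumer.purpose_fides_keys if consumer else []


consumers = [
    Consumer(
        name="Analytics Team",
        purpose_fides_keys=["marketing_analytics"],
        contact_email="analytics-lead@company.com",
        external_id="analytics-team",
    )
]
print(purposes_for("analytics-lead@company.com", consumers))
print(purposes_for("stranger@company.com", consumers))
```

Running the two passes separately, rather than checking both fields per consumer, keeps the documented ordering: an email match on any consumer wins over an `external_id` match on another.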

### Group membership

Consumers can represent groups or teams. Add user emails to the `members` list:

```json
{
  "name": "Analytics Team",
  "type": "group",
  "external_id": "analytics-team",
  "members": [
    "analyst@company.com",
    "data-scientist@company.com",
    "intern@company.com"
  ]
}
```

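Where members-based resolution is supported (it is not yet implemented in the OSS path; see the note under Identity Resolution), `analyst@company.com` inherits the Analytics Team's purposes when running a query.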

## Policy v2 (Override Engine)

When PBAC finds a violation (purposes don't overlap), the Policy v2 engine gets a chance to override it. Policies use priority-based, first-decisive-match-wins evaluation.

A policy can ALLOW access even when purposes don't match:

```
POST /api/v1/policy
{
  "fides_key": "allow_analytics_on_orders",
  "decision": "ALLOW",
  "priority": 100,
  "match": {
    "data_use": { "any": ["marketing.advertising"] }
  }
}
```

PBAC is always checked first; policies are evaluated only when purposes don't match.
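
The override step can be sketched as a sorted scan that stops at the first matching policy. The names `Decision`, `Policy`, `evaluate_policies`, and `violation_stands` are hypothetical, the matcher is reduced to the `data_use.any` form shown above, and evaluating higher `priority` values first is an assumption of this sketch; the README does not state the sort direction.

```python
from dataclasses import dataclass
from enum import Enum


class Decision(Enum):
    ALLOW = "ALLOW"
    DENY = "DENY"
    NO_DECISION = "NO_DECISION"


@dataclass(frozen=True)
class Policy:
    """Hypothetical policy with only a data_use 'any' matcher."""

    fides_key: str
    decision: Decision
    priority: int
    any_data_use: frozenset[str]


def evaluate_policies(policies: list[Policy], data_uses: set[str]) -> Decision:
    """Return the decision of the first matching policy, by priority."""
    # Assumption: a larger priority number is evaluated earlier.
    for policy in sorted(policies, key=lambda p: p.priority, reverse=True):
        if policy.any_data_use & data_uses:
            return policy.decision  # first decisive match wins
    return Decision.NO_DECISION


def violation_stands(decision: Decision) -> bool:
    """Only ALLOW suppresses a PBAC violation; DENY and NO_DECISION confirm it."""
    return decision is not Decision.ALLOW


policies = [
    Policy("deny_ads_fallback", Decision.DENY, 50, frozenset({"marketing.advertising"})),
    Policy("allow_analytics_on_orders", Decision.ALLOW, 100, frozenset({"marketing.advertising"})),
]
decision = evaluate_policies(policies, {"marketing.advertising"})
print(decision.value, violation_stands(decision))
```

Because this function is only reached after PBAC has already reported a violation, returning `NO_DECISION` when nothing matches leaves that violation in place.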

## Package Structure

```
fides/service/pbac/
  types.py          -- RawQueryLogEntry, TableRef
  sql_parser.py     -- Generic SQL --> RawQueryLogEntry (sqlglot)
  engine/           -- PBAC evaluation engine (zero dependencies)
    types.py        -- ConsumerPurposes, DatasetPurposes, QueryAccess, etc.
    evaluate.py     -- evaluate_access()
    reason.py       -- Human-readable violation reasons
  evaluation/       -- Service boundary
    types.py        -- EvaluationResult, ResolvedConsumer, EvaluationViolation
    interface.py    -- PBACEvaluationService Protocol
  identity/         -- Consumer identity resolution
    interface.py    -- IdentityResolver Protocol
    basic.py        -- BasicIdentityResolver (email + external_id + members)
  policies/         -- Policy v2 evaluation interface
    interface.py    -- AccessPolicyEvaluator Protocol + types
    noop.py         -- NoOpPolicyEvaluator (default)
```

## Extending (Fidesplus)

Fidesplus adds platform-specific capabilities on top of the OSS evaluation:

- **Platform connectors** -- BigQuery, Snowflake, Databricks audit log ingestion
- **Platform identity resolution** -- queries BigQuery IAM / Snowflake RBAC to resolve user roles, then matches roles against consumer `external_id`
- **Access control dashboard** -- violation logs, timeseries, per-consumer breakdowns
12 changes: 12 additions & 0 deletions src/fides/service/pbac/__init__.py
@@ -0,0 +1,12 @@
"""Purpose-Based Access Control (PBAC) evaluation service.

Import types and pure functions directly::

    from fides.service.pbac.types import RawQueryLogEntry, EvaluationResult
    from fides.service.pbac.evaluate import evaluate_access
    from fides.service.pbac.service import PBACEvaluationService

The service module (PBACEvaluationService, InProcessPBACEvaluationService)
requires Redis/Celery deps and should be imported explicitly from
``fides.service.pbac.service``.
"""
Empty file.
105 changes: 105 additions & 0 deletions src/fides/service/pbac/consumers/entities.py
@@ -0,0 +1,105 @@
from __future__ import annotations

import dataclasses
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Any, Optional

from fides.service.pbac.purposes.entities import DataPurposeEntity

if TYPE_CHECKING:
    from fides.api.models.data_consumer import (  # type: ignore[import-not-found]
        DataConsumer,
    )
    from fides.api.models.sql_models import System  # type: ignore[attr-defined]


@dataclass
class DataConsumerEntity:
    """Unified domain entity for both DataConsumer rows and System-as-consumer."""

    id: str
    name: str
    type: str
    created_at: datetime
    updated_at: datetime
    description: Optional[str] = None
    external_id: Optional[str] = None
    purposes: list[DataPurposeEntity] = field(default_factory=list)
    purpose_fides_keys: list[str] = field(default_factory=list)
    system_fides_key: Optional[str] = None
    vendor_id: Optional[str] = None
    egress: Optional[dict[str, Any]] = None
    ingress: Optional[dict[str, Any]] = None
    data_shared_with_third_parties: Optional[bool] = None
    third_parties: Optional[str] = None
    shared_categories: Optional[list[str]] = None
    tags: list[str] = field(default_factory=list)
    contact_email: Optional[str] = None
    contact_slack_channel: Optional[str] = None
    contact_details: Optional[dict[str, Any]] = None

    def to_dict(self) -> dict:
        d = dataclasses.asdict(self)
        d["created_at"] = self.created_at.isoformat()
        d["updated_at"] = self.updated_at.isoformat()
        # Store purpose references as fides_keys only (not nested entities)
        d["purpose_fides_keys"] = self.purpose_fides_keys
        del d["purposes"]
        return d

    @classmethod
    def from_dict(cls, d: dict) -> DataConsumerEntity:
        d = dict(d)
        d["created_at"] = datetime.fromisoformat(d["created_at"])
        d["updated_at"] = datetime.fromisoformat(d["updated_at"])
        d.setdefault("purpose_fides_keys", [])
        d.setdefault("purposes", [])
        return cls(**d)

    @classmethod
    def from_consumer(cls, obj: DataConsumer) -> DataConsumerEntity:
        purposes = [
            DataPurposeEntity.from_orm(cp.data_purpose) for cp in obj.consumer_purposes
        ]
        return cls(
            id=obj.id,
            name=obj.name,
            description=obj.description,
            type=obj.type,
            external_id=obj.external_id,
            purposes=purposes,
            purpose_fides_keys=[p.fides_key for p in purposes],
            egress=obj.egress,
            ingress=obj.ingress,
            data_shared_with_third_parties=obj.data_shared_with_third_parties,
            third_parties=obj.third_parties,
            shared_categories=obj.shared_categories or [],
            tags=obj.tags or [],
            contact_email=obj.contact_email,
            contact_slack_channel=obj.contact_slack_channel,
            contact_details=obj.contact_details,
            created_at=obj.created_at,
            updated_at=obj.updated_at,
        )

    @classmethod
    def from_system(cls, obj: System) -> DataConsumerEntity:
        purposes = [
            DataPurposeEntity.from_orm(sp.data_purpose) for sp in obj.system_purposes
        ]
        return cls(
            id=obj.id,
            name=obj.name or obj.fides_key,
            description=obj.description,
            type="system",
            purposes=purposes,
            purpose_fides_keys=[p.fides_key for p in purposes],
            system_fides_key=obj.fides_key,
            vendor_id=getattr(obj, "vendor_id", None),
            egress=obj.egress if isinstance(obj.egress, dict) else None,
            ingress=obj.ingress if isinstance(obj.ingress, dict) else None,
            tags=obj.tags or [],
            created_at=obj.created_at,
            updated_at=obj.updated_at,
        )