From 0626205fcd201725e406812727d2a70841322f31 Mon Sep 17 00:00:00 2001 From: James Sandford Date: Thu, 12 Feb 2026 13:53:02 +0000 Subject: [PATCH 1/5] Move utils to own folder --- examples/ingest_hls.py | 4 ++-- examples/outgest_file.py | 4 ++-- examples/simple_edit.py | 4 ++-- examples/utils/__init__.py | 0 examples/{ => utils}/client.py | 0 examples/{ => utils}/credentials.py | 6 ++++++ 6 files changed, 12 insertions(+), 6 deletions(-) create mode 100644 examples/utils/__init__.py rename examples/{ => utils}/client.py (100%) rename examples/{ => utils}/credentials.py (93%) diff --git a/examples/ingest_hls.py b/examples/ingest_hls.py index 151b5932..3e7202a1 100755 --- a/examples/ingest_hls.py +++ b/examples/ingest_hls.py @@ -15,8 +15,8 @@ import mediajson import av -from credentials import Credentials, BasicCredentials, OAuth2ClientCredentials -from client import post_request, put_request +from utils.credentials import Credentials, BasicCredentials, OAuth2ClientCredentials +from utils.client import post_request, put_request logging.basicConfig() logger = logging.getLogger() diff --git a/examples/outgest_file.py b/examples/outgest_file.py index e6a84e4a..52acf3cd 100755 --- a/examples/outgest_file.py +++ b/examples/outgest_file.py @@ -14,8 +14,8 @@ import aiohttp import av -from credentials import Credentials, BasicCredentials, OAuth2ClientCredentials -from client import get_request +from utils.credentials import Credentials, BasicCredentials, OAuth2ClientCredentials +from utils.client import get_request logging.basicConfig() logger = logging.getLogger() diff --git a/examples/simple_edit.py b/examples/simple_edit.py index ccfa0729..37615989 100755 --- a/examples/simple_edit.py +++ b/examples/simple_edit.py @@ -13,8 +13,8 @@ from mediatimestamp import TimeRange, Timestamp import mediajson -from credentials import Credentials, BasicCredentials, OAuth2ClientCredentials -from client import post_request, put_request, get_request +from utils.credentials import 
Credentials, BasicCredentials, OAuth2ClientCredentials +from utils.client import post_request, put_request, get_request logging.basicConfig() logger = logging.getLogger() diff --git a/examples/utils/__init__.py b/examples/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/client.py b/examples/utils/client.py similarity index 100% rename from examples/client.py rename to examples/utils/client.py diff --git a/examples/credentials.py b/examples/utils/credentials.py similarity index 93% rename from examples/credentials.py rename to examples/utils/credentials.py index e6322e0d..bcc2bf74 100644 --- a/examples/credentials.py +++ b/examples/utils/credentials.py @@ -19,6 +19,12 @@ def header(self) -> dict[str, str]: return {} +class EmptyCredentials(Credentials): + """Empty Credentials class for when none are used""" + def header(self) -> dict[str, str]: + return {} + + class BasicCredentials(Credentials): """Basic username/password credentials""" def __init__(self, username: str, password: str) -> None: From 9ceb9046bc79a1fd3709c1dda22bbd831a4e0727 Mon Sep 17 00:00:00 2001 From: James Sandford Date: Thu, 12 Feb 2026 13:56:20 +0000 Subject: [PATCH 2/5] Add fine-grained auth proxy example --- examples/authz_proxy/Makefile | 44 ++ examples/authz_proxy/README.md | 93 ++++ examples/authz_proxy/__init__.py | 0 examples/authz_proxy/api.py | 588 +++++++++++++++++++++ examples/authz_proxy/permissions.py | 54 ++ examples/authz_proxy/py.typed | 0 examples/authz_proxy/requirements.txt | 5 + examples/authz_proxy/resources.py | 485 +++++++++++++++++ examples/authz_proxy/test-requirements.txt | 2 + examples/authz_proxy/token_checker.py | 44 ++ 10 files changed, 1315 insertions(+) create mode 100644 examples/authz_proxy/Makefile create mode 100644 examples/authz_proxy/README.md create mode 100644 examples/authz_proxy/__init__.py create mode 100644 examples/authz_proxy/api.py create mode 100644 examples/authz_proxy/permissions.py create mode 100644 
examples/authz_proxy/py.typed create mode 100644 examples/authz_proxy/requirements.txt create mode 100644 examples/authz_proxy/resources.py create mode 100644 examples/authz_proxy/test-requirements.txt create mode 100644 examples/authz_proxy/token_checker.py diff --git a/examples/authz_proxy/Makefile b/examples/authz_proxy/Makefile new file mode 100644 index 00000000..6fb493c4 --- /dev/null +++ b/examples/authz_proxy/Makefile @@ -0,0 +1,44 @@ +all: help + +venv: + mkdir -p venv + python -m venv venv + . venv/bin/activate && pip install -r requirements.txt && deactivate + +testvenv: + mkdir -p testvenv + python -m venv testvenv + . testvenv/bin/activate && pip install -r test-requirements.txt -r requirements.txt && deactivate + +env_exports: + @echo "export API_URL=" + @echo "export JWKS_URL=" + @echo "export GROUP_CLAIM=" + +run: venv + . venv/bin/activate && sanic api:app ; deactivate + +lint: testvenv + . testvenv/bin/activate && flake8 ; deactivate + +typecheck: testvenv + . testvenv/bin/activate && mypy api.py ; deactivate + +clean: + @rm -rf venv + @rm -rf testvenv + @rm -rf .mypy_cache + @rm -rf __pycache__ + @rm -rf .python-version + +help: + @echo "TAMS Authorisation Proxy Example" + @echo "make venv - Prepare a Python virtual environment in venv/ for running the examples" + @echo "make testvenv - Prepare a Python virtual environment in testvenv/ for running the tests" + @echo "make env_exports - Print environment variable exports that may used in the examples" + @echo "make run - Start the auth proxy" + @echo "make lint - Lint the source code" + @echo "make typecheck - Typecheck the source code" + @echo "make clean - Delete files that were created" + +.PHONY: all help clean env_exports run lint typecheck diff --git a/examples/authz_proxy/README.md b/examples/authz_proxy/README.md new file mode 100644 index 00000000..f63e962f --- /dev/null +++ b/examples/authz_proxy/README.md @@ -0,0 +1,93 @@ +# Authorisation Proxy Example +This example demonstrates 
Fine-Grained Authorisation (FGA) using a reverse proxy in front of a TAMS API instance, by matching user group membership to an `auth_classes` tag on Sources, Flows and Webhooks. +For a more complete discussion of FGA, see [AppNote 0016 Authorisation in TAMS Workflows](../../docs/appnotes/0016-authorisation-in-tams-workflows.md). + +> [!IMPORTANT] +> This is intended as a proof-of-concept demo and a starting point. +See the [Limitations](#limitations) section for further details on security, feature, and performance compromises. + +## How it works +The proxy receives requests from your client with the JSON Web Token (JWT) access token in the headers. +A JWT is a base64 encoded signed block of JSON containing claims about the user of the token (the "Bearer"), such as their identity or group membership. +Using the JSON Web Key Sets (JWKS) endpoint for the token, the proxy can check the authenticity of that token, using the provided public key to confirm the signature was constructed using the authorisation server's private key; however, the token can be decoded and the claims read without any secret material. + +Having read the token, the proxy extracts the user's groups, then makes a request to the upstream TAMS API to decide whether to allow the incoming request, by comparing the user's groups with the Resource(s) `auth_classes` tag. +Finally, if the request is allowed, it makes the request and returns the response to the original client. +Requests to the upstream TAMS API use the user's token. + +## How to use it + +### Pre-requisites +- A deployed copy of the TAMS API - this demo was tested against the [AWS implementation](https://github.com/awslabs/time-addressable-media-store). +- A user login method that issues a JWT access token containing the user's groups (such as AWS Cognito or Keycloak) +- A way to validate that JWT - e.g.
a JWKS endpoint + +> [!CAUTION] +> Making requests with the user's token instead of separate credentials means users can bypass the proxy and go directly to the upstream API with no authorisation rules, if that upstream Service is accessible on the user's network. +> This is a requirement of using their token, in order for the proxy to use it to get tags and make authorisation decisions. + +### Usage as a Proxy + +The proxy logic is implemented as a [Sanic](https://github.com/sanic-org/sanic) app. +To run the proxy, set the following environment variables and start the proxy: + +```bash +export API_URL= +export JWKS_URL= +export GROUP_CLAIM=groups +make run +``` + +Note: `GROUP_CLAIM` does not need to be set when using Cognito. An appropriate default is set. + +If you visit http://127.0.0.1:8000 with a valid access token (against the provided JWKS URL) you will be granted access based on your user groups. + +When using the [AWS TAMS Tools UI](https://github.com/aws-samples/time-addressable-media-store-tools/tree/main/frontend), set up your `env.local` file with `VITE_APP_AWS_API_ENDPOINT=http://127.0.0.1:8000` for example. + +## Resource Permissions + +This example implements the fine-grained authorisation logic described in [AppNote0016](../../docs/appnotes/0016-authorisation-in-tams-workflows.md#finer-grained-authorisation) using [tags](../../docs/appnotes/0016-authorisation-in-tams-workflows.md#implementation-using-tags) to manage base permissions. + +In general this implementation reads the list of groups in the request, and only grants access to resources that have a corresponding `auth_classes` tag entry. +For demonstration purposes, basic logic has been provided that grants read, write, and delete permissions where matching classes end with `read`, `write`, and `delete` respectively. +i.e. a request that claims the `news_write` group will have write permissions against resources that have `news_write` in the `auth_classes` list.
+A request that claims the `admin` group will be granted admin permissions. +This logic exists in the [`resources.py`](./resources.py) file and may be replaced with different/more complex logic as required. + +## Limitations + +> [!WARNING] +> This implementation is deliberately simple to serve as a starting point. +> It comes with some notable, and potentially dangerous, limitations. + +- This example implementation re-uses the client's token for upstream requests to the Store service + - If the user can make requests to the Store directly, they can bypass this auth proxy entirely + - The correct approach would be for the proxy to possess its own client ID and secret, and make requests to the upstream API using the Client Credentials Grant + - The auth proxy would separately authenticate users by whatever mechanism generates tokens, however their user tokens would not have access to the upstream API + - Alternatively a network level solution could be applied, where the upstream API is not available to users other than the proxy itself +- This example implementation has not been fully security tested/hardened + - It only exists to provide documentation as code for how fine-grained auth may be implemented + - It WILL NOT be supported for production use cases +- Webhook permissions are evaluated on registration/update of configuration only + - Changes in permissions after registration/update will not be picked up + - New resources created after registration/update of the webhook which match the configuration will not be picked up + - Wildcard webhooks (i.e.
those which consume all resources of a given type) are only available to admins +- This example implementation assumes API-wide read/write/delete permissions are implemented in the upstream Store implementation + - The exception is admin permissions which result in a pass-through behaviour in this auth proxy +- This example implementation is reliant on the Groups OIDC claim + - Many auth providers do not support Groups on machine users +- This example implementation may return empty lists in listings before the final page + - This will happen where all items in a page are filtered out + - A more complete implementation may recurse to the next page in this case +- This example implementation is designed to be readable, not performant + - In many cases, it will make multiple requests to the Service where one would suffice + +See [CONTRIBUTING.md](../../CONTRIBUTING.md) should you wish to improve on these! + +## Tests + +Tests can be run with the following command. + +```bash +make lint typecheck +``` diff --git a/examples/authz_proxy/__init__.py b/examples/authz_proxy/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/authz_proxy/api.py b/examples/authz_proxy/api.py new file mode 100644 index 00000000..cfda9b2f --- /dev/null +++ b/examples/authz_proxy/api.py @@ -0,0 +1,588 @@ +import os +from functools import wraps +from uuid import UUID + +from token_checker import verify_token, get_signing_key, get_groups_in_token +from resources import Tams +from permissions import any_is_admin, filter_read +from httpx import AsyncClient +from sanic import Sanic +from sanic.log import logger +from sanic.request import Request +from sanic.exceptions import Forbidden, InvalidUsage, NotFound + +app = Sanic("ProxyApp") +app.config.CORS_ORIGINS = "*" + + +@app.before_server_start +async def setup_client(app): + client = AsyncClient() + app.ctx.api_url = os.environ["API_URL"] + app.ctx.jwks_client = get_signing_key(os.environ.get("JWKS_URL")) + app.ctx.tams 
= Tams(client, logger, app.ctx.api_url) + + +def handle_token(): + """Decorator that verifies token in request, and extracts the embedded groups""" + def decorator(f): + @wraps(f) + async def decorated_function(request: Request, *args, **kwargs): + del (request.headers["host"]) + + token = verify_token(request, app.ctx.jwks_client) + groups = get_groups_in_token(token) + + return await f(request, groups, *args, **kwargs) + return decorated_function + return decorator + + +def passthrough_if_admin(): + """Decorator that passes through the request if the user is an admin""" + def decorator(f): + @wraps(f) + async def decorated_function(request: Request, groups: list[str], *args, **kwargs): + # Passthrough request if request has admin permissions + if any_is_admin(groups): + return await app.ctx.tams.passthrough_request(request) + + # Otherwise process based on decorated function + return await f(request, groups, *args, **kwargs) + return decorated_function + return decorator + + +@app.route('/', methods=["OPTIONS"]) +async def options_all(request: Request, path: str): + del (request.headers["host"]) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/', methods=['GET', 'HEAD']) +@handle_token() +async def root(request: Request, groups: list[str]): + # Available to all + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/service', methods=['GET', 'HEAD']) +@handle_token() +async def service(request: Request, groups: list[str]): + # Available to all + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/service', methods=['POST']) +@handle_token() +@passthrough_if_admin() +async def post_service(request: Request, groups: list[str]): + # Admin only (bypassed by `passthrough_if_admin` decorator above) + raise Forbidden("Insufficient permissions") + + +@app.route('/service/storage-backends', methods=['GET', 'HEAD']) +@handle_token() +async def storage_backends(request: Request, groups: list[str]): + # 
Available to all + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/service/webhooks', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def webhooks(request: Request, groups: list[str]): + # Restrict returned data to only the webhooks that the request has read permission on. + filtered_groups = filter_read(groups) + return await app.ctx.tams.filtered_webhooks(request, filtered_groups) + + +@app.route('/service/webhooks', methods=['POST']) +@handle_token() +@passthrough_if_admin() +async def post_webhooks(request: Request, groups: list[str]): + # If the request includes Source or Flow filters, + # the request must have read permissions on all Source or Flow IDs requested. + # Otherwise, reject. + # Note that this endpoint only allows creation, not modification, of webhooks. + # + # NOTE: This implementation will only accept requests where IDs are provided. + # Where collected by IDs are provided, they are evaluated on webhook generation only + # Wildcard webhooks may be created by an admin, in which case the request is passed through. + + # Verify and flatten webhook Source/Flow IDs based on permissions + await app.ctx.tams.validate_webhook(request, groups) + static_request = await app.ctx.tams.static_webhook(request, groups) + return await app.ctx.tams.passthrough_request(static_request) + + +@app.route('/service/webhooks/', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def webhook(request: Request, groups: list[str], webhook_id: UUID): + # Request must have read permissions on {webhookId}. + (await app.ctx.tams.Webhook(request, webhook_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/service/webhooks/', methods=['PUT']) +@handle_token() +@passthrough_if_admin() +async def put_webhook(request: Request, groups: list[str], webhook_id: UUID): + # Request must have write permissions on {webhookId}. 
+ # If the request includes Source or Flow filters, + # the request must have read permissions on all Source or Flow IDs requested. + # + # NOTE: This implementation will only accept requests where IDs are provided. + # Where collected by IDs are provided, they are evaluated on webhook generation only + # Wildcard webhooks may be created by an admin, in which case the request is passed through. + webhook = await app.ctx.tams.Webhook(request, webhook_id) + webhook.has_write(groups, throw=True) + + new_auth_classes = request.json.get("tags", {}).get("auth_classes", []) + webhook.validate_modify_auth_classes(new_auth_classes, groups) + + # Verify and flatten webhook Source/Flow IDs based on permissions + await app.ctx.tams.validate_webhook(request, groups) + static_request = await app.ctx.tams.static_webhook(request, groups) + + return await app.ctx.tams.passthrough_request(static_request) + + +@app.route('/service/webhooks/', methods=['DELETE']) +@handle_token() +@passthrough_if_admin() +async def delete_webhook(request: Request, groups: list[str], webhook_id: UUID): + (await app.ctx.tams.Webhook(request, webhook_id)).has_delete(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/sources', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def sources(request: Request, groups: list[str]): + # Restrict the returned data to only the Sources that the request has read permission on. + filtered_groups = filter_read(groups) + return await app.ctx.tams.filtered_sources(request, filtered_groups) + + +@app.route('/sources/', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def source(request: Request, groups: list[str], source_id: UUID): + # Request must have read permissions on {sourceId}. 
+ (await app.ctx.tams.Source(request, source_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/sources//tags', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def source_tags(request: Request, groups: list[str], source_id: UUID): + # Request must have read permissions on {sourceId}. + (await app.ctx.tams.Source(request, source_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/sources//tags/', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def source_tag(request: Request, groups: list[str], source_id: UUID, tag_name: str): + # Request must have read permissions on {sourceId}. + (await app.ctx.tams.Source(request, source_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/sources//tags/', methods=['PUT']) +@handle_token() +@passthrough_if_admin() +async def put_source_tag(request: Request, groups: list[str], source_id: UUID, tag_name: str): + # Request must have write permissions on {sourceId}. + # Must not permit addition of permissions the request doesn't claim. + source = await app.ctx.tams.Source(request, source_id) + source.has_write(groups, throw=True) + + if tag_name == "auth_classes": + new_auth_classes = request.json + source.validate_modify_auth_classes(new_auth_classes, groups) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/sources//tags/', methods=['DELETE']) +@handle_token() +@passthrough_if_admin() +async def delete_source_tag(request: Request, groups: list[str], source_id: UUID, tag_name: str): + # Request must have write permissions on {sourceId}. 
+ source = await app.ctx.tams.Source(request, source_id) + source.has_write(groups, throw=True) + + if tag_name == "auth_classes": + # Check has permissions that will be removed + source.validate_modify_auth_classes([], groups) + + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/sources//description', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def source_description(request: Request, groups: list[str], source_id: UUID): + # Request must have read permissions on {sourceId}. + (await app.ctx.tams.Source(request, source_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/sources//description', methods=['PUT', 'DELETE']) +@handle_token() +@passthrough_if_admin() +async def put_del_source_description(request: Request, groups: list[str], source_id: UUID): + # Request must have write permissions on {sourceId}. + (await app.ctx.tams.Source(request, source_id)).has_write(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/sources//label', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def source_label(request: Request, groups: list[str], source_id: UUID): + # Request must have read permissions on {sourceId}. + (await app.ctx.tams.Source(request, source_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/sources//label', methods=['PUT', 'DELETE']) +@handle_token() +@passthrough_if_admin() +async def put_del_source_label(request: Request, groups: list[str], source_id: UUID): + # Request must have write permissions on {sourceId}. 
+ (await app.ctx.tams.Source(request, source_id)).has_write(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def flows(request: Request, groups: list[str]): + # Restrict returned data to only the Flows that the request has read permission on. + filtered_groups = filter_read(groups) + return await app.ctx.tams.filtered_flows(request, filtered_groups) + + +@app.route('/flows/', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def flow(request: Request, groups: list[str], flow_id: UUID): + # Request must have read permissions on {flowID}. + (await app.ctx.tams.Flow(request, flow_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows/', methods=['PUT']) +@handle_token() +@passthrough_if_admin() +async def put_flow(request: Request, groups: list[str], flow_id: UUID): + # If {flowId} does not currently exist, request must have write permissions on the Flow's Source ID + # if it already exists in this TAMS instance. + # If neither {flowId} nor the Source ID exist, allow if the request has create permission + # (see Creating new Flows and Sources). + # If {flowId} already exists, request must have write permissions on {flowId}. 
+ + # Note: It is assumed that the "create permission" restriction mentioned above + # is implemented in the upstream API + flow = await app.ctx.tams.Flow(request, flow_id) + if flow.exists: + # Update Flow + flow.has_write(groups, throw=True) + + new_auth_classes = request.json.get("tags", {}).get("auth_classes", []) + flow.validate_modify_auth_classes(new_auth_classes, groups) + else: + # Create flow + source_id = request.json.get("source_id", None) + if not source_id: + raise InvalidUsage("Missing source_id") + else: + source = await app.ctx.tams.Source(request, source_id) + if source.exists: + # If the Source exists, verify the request has write access on it + source.has_write(groups, throw=True) + + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows/', methods=['DELETE']) +@handle_token() +@passthrough_if_admin() +async def delete_flow(request: Request, groups: list[str], flow_id: UUID): + # Request must have delete permissions on {flowId}. + (await app.ctx.tams.Flow(request, flow_id)).has_delete(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//tags', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def flow_tags(request: Request, groups: list[str], flow_id: UUID): + # Request must have read permissions on {flowID}. + (await app.ctx.tams.Flow(request, flow_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//tags/', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def flow_tag(request: Request, groups: list[str], flow_id: UUID, tag_name: str): + # Request must have read permissions on {flowID}. 
+ (await app.ctx.tams.Flow(request, flow_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//tags/', methods=['PUT']) +@handle_token() +@passthrough_if_admin() +async def put_flow_tag(request: Request, groups: list[str], flow_id: UUID, tag_name: str): + # Request must have write permissions on {flowId}. + # Must not permit addition of permissions the request doesn't claim. + flow = await app.ctx.tams.Flow(request, flow_id) + flow.has_write(groups, throw=True) + + if tag_name == "auth_classes": + new_auth_classes = request.json + flow.validate_modify_auth_classes(new_auth_classes, groups) + + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//tags/', methods=['DELETE']) +@handle_token() +@passthrough_if_admin() +async def delete_flow_tag(request: Request, groups: list[str], flow_id: UUID, tag_name: str): + # Request must have write permissions on {flowId}. + flow = await app.ctx.tams.Flow(request, flow_id) + flow.has_write(groups, throw=True) + + if tag_name == "auth_classes": + # Validate has permissions that will be removed + flow.validate_modify_auth_classes([], groups) + + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//description', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def flow_description(request: Request, groups: list[str], flow_id: UUID): + # Request must have read permissions on {flowID}. + (await app.ctx.tams.Flow(request, flow_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//description', methods=['PUT', 'DELETE']) +@handle_token() +@passthrough_if_admin() +async def put_del_flow_description(request: Request, groups: list[str], flow_id: UUID): + # Request must have write permissions on {flowId}. 
+ (await app.ctx.tams.Flow(request, flow_id)).has_write(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//label', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def flow_label(request: Request, groups: list[str], flow_id: UUID): + # Request must have read permissions on {flowID}. + (await app.ctx.tams.Flow(request, flow_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//label', methods=['PUT', 'DELETE']) +@handle_token() +@passthrough_if_admin() +async def put_del_flow_label(request: Request, groups: list[str], flow_id: UUID): + # Request must have write permissions on {flowId}. + (await app.ctx.tams.Flow(request, flow_id)).has_write(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//read_only', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def flow_read_only(request: Request, groups: list[str], flow_id: UUID): + # Request must have read permissions on {flowID}. + (await app.ctx.tams.Flow(request, flow_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//read_only', methods=['PUT']) +@handle_token() +@passthrough_if_admin() +async def put_flow_read_only(request: Request, groups: list[str], flow_id: UUID): + # Request must have write permissions on {flowId}. + (await app.ctx.tams.Flow(request, flow_id)).has_write(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//flow_collection', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def flow_collection(request: Request, groups: list[str], flow_id: UUID): + # Request must have read permissions on {flowID}. 
+ (await app.ctx.tams.Flow(request, flow_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//flow_collection', methods=['PUT', 'DELETE']) +@handle_token() +@passthrough_if_admin() +async def put_del_flow_collection(request: Request, groups: list[str], flow_id: UUID): + # Request must have write permissions on {flowId}. + (await app.ctx.tams.Flow(request, flow_id)).has_write(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//max_bit_rate', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def flow_max_bit_rate(request: Request, groups: list[str], flow_id: UUID): + # Request must have read permissions on {flowID}. + (await app.ctx.tams.Flow(request, flow_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//max_bit_rate', methods=['PUT', 'DELETE']) +@handle_token() +@passthrough_if_admin() +async def put_del_flow_max_bit_rate(request: Request, groups: list[str], flow_id: UUID): + # Request must have write permissions on {flowId}. + (await app.ctx.tams.Flow(request, flow_id)).has_write(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//avg_bit_rate', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def flow_avg_bit_rate(request: Request, groups: list[str], flow_id: UUID): + # Request must have read permissions on {flowID}. + (await app.ctx.tams.Flow(request, flow_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//avg_bit_rate', methods=['PUT', 'DELETE']) +@handle_token() +@passthrough_if_admin() +async def put_del_flow_avg_bit_rate(request: Request, groups: list[str], flow_id: UUID): + # Request must have write permissions on {flowId}. 
+ (await app.ctx.tams.Flow(request, flow_id)).has_write(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//segments', methods=['GET', 'HEAD']) +@handle_token() +@passthrough_if_admin() +async def flow_segments(request: Request, groups: list[str], flow_id: UUID): + # Request must have read permissions on {flowID}. + (await app.ctx.tams.Flow(request, flow_id)).has_read(groups, throw=True) + return await app.ctx.tams.passthrough_request(request) + + +@app.route('/flows//segments', methods=['POST']) +@handle_token() +@passthrough_if_admin() +async def post_flow_segments(request: Request, groups: list[str], flow_id: UUID): + # Request must have write permissions on {flowId}, + # and either this must be the first registration of the Media Object(s) (i.e. /objects/{objectId} returns 404) + # or the request must have read access to the Media Object(s) being written. + # Otherwise reject. + (await app.ctx.tams.Flow(request, flow_id)).has_write(groups, throw=True) + + # Note: The following for-loop is quite inefficient and potentially slow. + # While the use of asyncio in this implementation allows for requests to be + # processed in parallel, the items in this for-loop will be processed sequentially + # but without blocking. A complete implementation may wish to parallelise this + # loop. This hasn't been done here to help with readability. 
+ segments = request.json
+ if type(segments) is not list:
+ segments = [segments]
+ for segment in segments:
+ object_id = segment.get("object_id", None)
+ if not object_id:
+ raise InvalidUsage("Missing segments `object_id`")
+
+ media_object = await app.ctx.tams.MediaObject(request, object_id)
+
+ if media_object.exists:
+ try:
+ await media_object.has_read(groups, throw=True)
+ except NotFound, Forbidden:
+ raise InvalidUsage(f"Object {object_id} not found")
+
+ return await app.ctx.tams.passthrough_request(request)
+
+
+@app.route('/flows/<flow_id:uuid>/segments', methods=['DELETE'])
+@handle_token()
+@passthrough_if_admin()
+async def del_flow_segments(request: Request, groups: list[str], flow_id: UUID):
+ # Request must have delete permissions on {flowId}.
+ (await app.ctx.tams.Flow(request, flow_id)).has_delete(groups, throw=True)
+ return await app.ctx.tams.passthrough_request(request)
+
+
+@app.route('/flows/<flow_id:uuid>/storage', methods=['POST'])
+@handle_token()
+@passthrough_if_admin()
+async def post_flow_storage(request: Request, groups: list[str], flow_id: UUID):
+ # Request must have write permissions on {flowId}.
+ (await app.ctx.tams.Flow(request, flow_id)).has_write(groups, throw=True)
+ return await app.ctx.tams.passthrough_request(request)
+
+
+@app.route('/objects/<object_id:uuid>', methods=['GET', 'HEAD'])
+@handle_token()
+@passthrough_if_admin()
+async def object(request: Request, groups: list[str], object_id: UUID):
+ # Restrict returned data in referenced_by_flows property to only the Flows that the request has read access to.
+ # If the request has read access to no Flows of this object, return 404,
+ # however if the request has access but all of the Flows have been filtered out,
+ # return the response with an empty referenced_by_flows list. 
+ await (await app.ctx.tams.MediaObject(request, object_id)).has_read(groups, throw=True)
+ return await app.ctx.tams.filtered_object(request, object_id, groups)
+
+
+@app.route('/objects/<object_id:uuid>/instances', methods=['POST', 'DELETE'])
+@handle_token()
+@passthrough_if_admin()
+async def post_del_object_instances(request: Request, groups: list[str], object_id: UUID):
+ # Request must have write permissions on {objectId}.
+ await (await app.ctx.tams.MediaObject(request, object_id)).has_write(groups, throw=True)
+ return await app.ctx.tams.passthrough_request(request)
+
+
+@app.route('/flow-delete-requests', methods=['HEAD', 'GET'])
+@handle_token()
+@passthrough_if_admin()
+async def flow_delete_requests(auth_request: Request, groups):
+ # Admin only
+ raise NotFound()
+
+
+@app.route('/flow-delete-requests/<request_id:uuid>', methods=['HEAD', 'GET'])
+@handle_token()
+@passthrough_if_admin()
+async def flow_delete_request(request: Request, groups: list[str], request_id: UUID):
+ # Request must have delete permissions on the Delete Request's Flow ID.
+ # NOTE: This implementation will only work with TAMS Service implementations that
+ # continue to return Flow metadata while the Flow is being deleted. 
+ # Partial deletes should work with all implementations + + # Needs to be a GET to retrieve `flow_id` + flow_delete_request_res = await app.ctx.tams.get_upstream(request, f"flow-delete-requests/{request_id}") + flow_id = flow_delete_request_res.json["flow_id"] + (await app.ctx.tams.Flow(request, flow_id)).has_delete(groups, throw=True) + + # Handle both GET & HEAD + return await app.ctx.tams.passthrough_request(request) + + +if __name__ == "__main__": + app.run(access_log=True) diff --git a/examples/authz_proxy/permissions.py b/examples/authz_proxy/permissions.py new file mode 100644 index 00000000..f552ec54 --- /dev/null +++ b/examples/authz_proxy/permissions.py @@ -0,0 +1,54 @@ +def is_read(group: str) -> bool: + # NOTE: This may be replaced with any logic to determine if a supplied "group" has Read permissions + return group.endswith("read") + + +def is_write(group: str) -> bool: + # NOTE: This may be replaced with any logic to determine if a supplied "group" has Write permissions + return group.endswith("write") + + +def is_delete(group: str) -> bool: + # NOTE: This may be replaced with any logic to determine if a supplied "group" has Delete permissions + return group.endswith("delete") + + +def is_admin(group: str) -> bool: + # NOTE: This may be replaced with any logic to determine if a supplied "group" has Admin permissions + return group == "admin" + + +def is_any(group: str) -> bool: + return is_read(group) or is_write(group) or is_delete(group) or is_admin(group) + + +def any_is_read(groups: list[str]) -> bool: + return any([is_read(x) for x in groups]) + + +def any_is_write(groups: list[str]) -> bool: + return any([is_write(x) for x in groups]) + + +def any_is_delete(groups: list[str]) -> bool: + return any([is_delete(x) for x in groups]) + + +def any_is_admin(groups: list[str]) -> bool: + return any([is_admin(x) for x in groups]) + + +def any_is_any(groups: list[str]) -> bool: + return any([is_any(x) for x in groups]) + + +def filter_read(groups: 
list[str]) -> list[str]: + return list(filter(is_read, groups)) + + +def filter_write(groups: list[str]) -> list[str]: + return list(filter(is_write, groups)) + + +def filter_delete(groups: list[str]) -> list[str]: + return list(filter(is_delete, groups)) diff --git a/examples/authz_proxy/py.typed b/examples/authz_proxy/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/examples/authz_proxy/requirements.txt b/examples/authz_proxy/requirements.txt new file mode 100644 index 00000000..2498e5ea --- /dev/null +++ b/examples/authz_proxy/requirements.txt @@ -0,0 +1,5 @@ +authlib +httpx +sanic +sanic[ext] +PyJWT \ No newline at end of file diff --git a/examples/authz_proxy/resources.py b/examples/authz_proxy/resources.py new file mode 100644 index 00000000..802831ac --- /dev/null +++ b/examples/authz_proxy/resources.py @@ -0,0 +1,485 @@ +import dataclasses +import json +from uuid import UUID +from copy import deepcopy +from typing import Any, Callable, cast, Optional, List, Dict + +from httpx import AsyncClient +from sanic.exceptions import Forbidden, NotFound, InvalidUsage +from sanic.request import Request +from sanic.response import raw as raw_resp +from sanic.response import json as json_resp +from sanic.response.types import HTTPResponse +from logging import Logger + +from permissions import any_is_read, any_is_write, any_is_delete, any_is_any, filter_read, filter_write, filter_delete + + +@dataclasses.dataclass +class Tams(object): + client: AsyncClient + logger: Logger + api_url: str + + async def Flow(self, request: Request, flow_id: UUID): + return await Flow(self.client, self.api_url, request, flow_id)._async_init() + + async def Source(self, request: Request, source_id: UUID): + return await Source(self.client, self.api_url, request, source_id)._async_init() + + async def Webhook(self, request: Request, webhook_id: UUID): + return await Webhook(self.client, self.api_url, request, webhook_id)._async_init() + + async def MediaObject(self, 
request: Request, object_id: UUID): + return await MediaObject(self.client, self.api_url, request, object_id)._async_init() + + async def passthrough_request(self, request: Request) -> HTTPResponse: + if request.path == "": + target_url = self.api_url + else: + target_url = f"{self.api_url}{request.path}" + + self.logger.info(f"Proxying {request.method} request to {request.path} -> {target_url}") + + # Suppress any HTML render, since the links won't work + if "accept" in request.headers: + del (request.headers["accept"]) + + if "content-length" in request.headers: + del (request.headers["content-length"]) + + res = await self.client.request( + method=request.method, + headers=request.headers, + url=target_url, + json=request.json, + params=request.args + ) + + if res.status_code == 301 or res.status_code == 302: + # Rewrite the location header to avoid redirecting upstream + assert request.conn_info + original_server = request.conn_info.server + protocol = "https" if request.conn_info.ssl else "http" + original_origin = f"{protocol}://{original_server}" + res.headers["Location"] = res.headers["Location"].replace(self.api_url, original_origin) + + return raw_resp(res.text, res.status_code, cast(dict[str, str], res.headers)) + + async def get_upstream(self, request: Request, url: str, params: Optional[dict] = None) -> Any: + headers_copy = deepcopy(request.headers) + if "content-length" in headers_copy: + del (headers_copy["content-length"]) + + return await self.client.request( + method="GET", + headers=headers_copy, + url=f"{self.api_url}{url}", + params=params + ) + + async def _filtered_resource_listing(self, request: Request, groups: list[str]) -> HTTPResponse: + # First request the resources of this type the user has access to + orig_auth_classes_str = request.args.get("tag.auth_classes", "") + orig_auth_classes = orig_auth_classes_str.split(",") if orig_auth_classes_str else [] + request.args["tag.auth_classes"] = ",".join(groups) + + res = await 
self.passthrough_request(request) + + json_body = json.loads(res.body) if res.body else None + + filtered_body: Any = [] + + # Handle user specified filter on `auth_classes` + if len(orig_auth_classes) > 0 and (type(json_body) is list): + for item in json_body: + # Note: This could return empty lists before the last page + # A more complete implementation could recurse + auth_classes = item.get("tags", {}).get("auth_classes", []) + if set(auth_classes).intersection(orig_auth_classes): + filtered_body.append(item) + else: + filtered_body = json_body + + return json_resp(filtered_body, res.status, cast(dict[str, str], res.headers)) + + async def filtered_sources(self, request: Request, groups: list[str]) -> HTTPResponse: + return await self._filtered_resource_listing(request, groups) + + async def filtered_flows(self, request: Request, groups: list[str]) -> HTTPResponse: + return await self._filtered_resource_listing(request, groups) + + async def filtered_webhooks(self, request: Request, groups: list[str]) -> HTTPResponse: + return await self._filtered_resource_listing(request, groups) + + async def filtered_object(self, request: Request, object_id: UUID, groups: list[str]) -> HTTPResponse: + orig_auth_classes_str = request.args.get("tag.auth_classes", "") + orig_auth_classes = orig_auth_classes_str.split(",") if orig_auth_classes_str else [] + + request.args["tag.auth_classes"] = ",".join(groups) + + res = await self.passthrough_request(request) + + assert res.body + json_body = json.loads(res.body) + + filtered_referenced_by_flows = [] + + # Handle user specified filter on `auth_classes` + if len(orig_auth_classes) > 0: + for flow in json_body["referenced_by_flows"]: + flow = await self.Flow(request, flow) + if flow.has_any(orig_auth_classes): + filtered_referenced_by_flows.append(flow) + else: + filtered_referenced_by_flows = json_body["referenced_by_flows"] + + json_body["referenced_by_flows"] = filtered_referenced_by_flows + + return json_resp(json_body, 
res.status, cast(dict[str, str], res.headers)) + + async def validate_webhook(self, request: Request, groups: list[str]): + # Validates webhook based on permissions + + # Reject wildcard webhooks + events = request.json.get("events", []) + if any(event.startswith("flows/") for event in events): + if ( + not request.json.get("flow_ids", []) + and not request.json.get("source_ids", []) + and not request.json.get("flow_collected_by_ids", []) + and not request.json.get("source_collected_by_ids", [])): + raise InvalidUsage( + "When subscribing to Flow events, one or more of 'flow_ids', " + "'source_ids', 'flow_collected_by_ids', and 'source_collected_by_ids' must be specified") + + if any(event.startswith("sources/") for event in events): + if not request.json.get("source_ids", []) and not request.json.get("source_collected_by_ids", []): + raise InvalidUsage("When subscribing to Source events, one or both of " + "'source_ids' and 'source_collected_by_ids' must be specified") + + # If any filter Flows are invalid, reject immediately + combined_flow_ids = (set(request.json.get("flow_ids", [])) + | set(request.json.get("flow_collected_by_ids", []))) + for flow_id in combined_flow_ids: + if not (await self.Flow(request, flow_id)).has_read(groups): + raise InvalidUsage("One or more filter Flow IDs don't exist or have insufficient permissions") + + # If any filter Sources are invalid, reject immediately + combined_source_ids = (set(request.json.get("source_ids", [])) + | set(request.json.get("source_collected_by_ids", []))) + for source_id in combined_source_ids: + if not (await self.Source(request, source_id)).has_read(groups): + raise InvalidUsage("One or more filter Source IDs don't exist or have insufficient permissions") + + def decode_link_header(self, link_header: str) -> List[Dict[str, Any]]: + processed_links = [] + working_links = link_header.split(",") + for working_link in working_links: + parts = working_link.split(";") + url = 
parts[0].strip().lstrip("<").rstrip(">") + + link_params = {} + for working_param in parts[1:]: + param, val = working_param.strip().split("=", 1) + + # Remove leading/trailing quotation marks + val = val.split("\"", 1)[-1].rsplit("\"", 1)[0] + link_params[param] = val + + processed_links.append({ + "url": url, + "params": link_params + }) + + return processed_links + + async def static_webhook(self, request: Request, groups: list[str]): + # Generates a staticly computed list of Flows and Sources to subscribe to based on permissions + # Source/Flow ID filters are applied such that all must be satisfied + + # Statically evaluate Sources to include + static_sources = set(request.json.get("source_ids", [])) + + for source_id in request.json.get("source_collected_by_ids", []): + res = (await self.get_upstream(request, f"/sources/{source_id}")) + collected_sources = res.json().get("source_collection", []) if res.status_code == 200 else [] + for collected_source in collected_sources: + collected_source_id = collected_source["id"] + if (await self.Source(request, collected_source_id)).has_read(groups): + if "source_ids" in request.json: + static_sources &= {collected_source_id} + else: + static_sources.add(collected_source_id) + + # Statically evaluate Flows to include + static_flows = set(request.json.get("flow_ids", [])) + + for flow_id in request.json.get("flow_collected_by_ids", []): + res = await self.get_upstream(request, f"/flows/{flow_id}/flow_collection") + collected_flows = res.json() if res.status_code == 200 else [] + + for collected_flow in collected_flows: + collected_flow_id = collected_flow["id"] + if (await self.Flow(request, collected_flow_id)).has_read(groups): + if "flow_ids" in request.json: + static_flows &= {collected_flow_id} + else: + static_flows.add(collected_flow_id) + + # If subscribed to flow events, get flows from sources + events = request.json.get("events", []) + if any(event.startswith("flows/") for event in events): + for source_id in 
static_sources: + res = await self.get_upstream(request, "/flows", {"source_id": source_id}) + while True: + for flow in res.json(): + flow_id = flow["id"] + if (await self.Flow(request, flow_id)).has_read(groups): + if any([x in request.json for x in ["flow_ids", "flow_collected_by_ids"]]): + static_flows &= {flow_id} + else: + static_flows.add(flow_id) + + # Follow paging links + links = self.decode_link_header(res.headers.get("link", "")) + for link in links: + if link["params"].get("rel", "") == "next": + next_link = link["url"] + break + else: + # No "next" link found. Exit while loop. + break + + next_path = next_link.split(self.api_url, 1)[-1] + res = await self.get_upstream(request, next_path) + + # Verify flattend list isn't empty + # This step is really important! If the flattened lists are empty, they will be + # interpreted as a wildcard match and return data the user shouldn't have access to! + events = request.json.get("events", []) + if any(event.startswith("flows/") for event in events): + if not static_flows: + raise InvalidUsage("Webhook subscribes to Flow events, but doesn't match any Flow IDs") + + if any(event.startswith("sources/") for event in events): + if not static_sources: + raise InvalidUsage("Webhook subscribes to Source events, but doesn't match any Source IDs") + + # Build statically filtered request + static_body = request.json + static_body.pop("flow_collected_by_ids", None) + static_body.pop("source_collected_by_ids", None) + if static_flows: + static_body["flow_ids"] = list(static_flows) + if static_sources: + static_body["source_ids"] = list(static_sources) + + # Note: We can't use deepcopy() here due to C optimisations in Request + static_request = Request( + url_bytes=request.path.encode(), + headers=request.headers, + version=request.version, + method=request.method, + transport=request.transport, + app=request.app, + head=request.head + ) + # Make sure the signalled encoding matches the new bodies encoding + 
static_request.headers["Content-Type"] = "application/json; charset=utf-8" + static_request.body = json.dumps(static_body).encode("UTF-8") + + return static_request + + +@dataclasses.dataclass +class Resource(object): + client: AsyncClient + api_url: str + request: Request + + classes: list[str] = dataclasses.field(init=False, repr=False, default_factory=list) + exists: bool = dataclasses.field(init=False, default=False) + + async def _get_upstream(self, url: str, throw: bool = False) -> Any: + headers_copy = deepcopy(self.request.headers) + if "content-length" in headers_copy: + del (headers_copy["content-length"]) + + res = await self.client.request( + method="GET", + headers=headers_copy, + url=f"{self.api_url}/{url}" + ) + + if res.status_code != 200: + if throw: + raise NotFound() + return {} + else: + self.exists = True + return res.json() + + def has_permission(self, + check_function: Callable[[list[str]], bool], + groups: list[str], + throw: bool = False) -> bool: + if throw and not self.exists: + raise NotFound() + + has_permission = check_function(list(set(self.classes) & set(groups))) + + if throw and not has_permission: + self.has_any(groups, throw) # Will raise NotFound if has no permissions + raise Forbidden("Insufficient permissions") + + return has_permission + + def has_read(self, groups: list[str], throw: bool = False) -> bool: + return self.has_permission(any_is_read, groups, throw) + + def has_write(self, groups: list[str], throw: bool = False) -> bool: + return self.has_permission(any_is_write, groups, throw) + + def has_delete(self, groups: list[str], throw: bool = False) -> bool: + return self.has_permission(any_is_delete, groups, throw) + + def has_any(self, groups: list[str], throw: bool = False) -> bool: + if throw and not self.exists: + raise NotFound() + + has_permission = any_is_any(list(set(self.classes) & set(groups))) + + if throw and not has_permission: + raise NotFound() + + return has_permission + + def 
validate_modify_auth_classes(self, new_auth_classes: list[str], groups: list[str]) -> None: + added_classes = list(set(new_auth_classes) - set(self.classes)) + removed_classes = list(set(self.classes) - set(new_auth_classes)) + changed_classes = list(set(added_classes + removed_classes)) + + self.has_write(groups, throw=True) + + if any_is_read(changed_classes): + self.has_read(groups, throw=True) + + if any_is_delete(changed_classes): + self.has_delete(groups, throw=True) + + +class Flow(Resource): + def __init__(self, client: AsyncClient, api_url: str, request: Request, flow_id: UUID) -> None: + super().__init__(client, api_url, request) + self.flow_id = flow_id + + async def _async_init(self): + _flow_data = await self._get_upstream(f"flows/{self.flow_id}") + + if self.exists: + self.classes = _flow_data.get("tags", {}).get("auth_classes", []) + self.source_id = _flow_data["source_id"] + + return self + + +class Source(Resource): + def __init__(self, client: AsyncClient, api_url: str, request: Request, source_id: UUID) -> None: + super().__init__(client, api_url, request) + self.source_id = source_id + + async def _async_init(self): + _source_data = await self._get_upstream(f"sources/{self.source_id}") + + if self.exists: + self.classes = _source_data.get("tags", {}).get("auth_classes", []) + return self + + +class Webhook(Resource): + def __init__(self, client: AsyncClient, api_url: str, request: Request, webhook_id: UUID) -> None: + super().__init__(client, api_url, request) + self.webhook_id = webhook_id + + async def _async_init(self): + _webhook_data = await self._get_upstream(f"service/webhooks/{self.webhook_id}") + + if self.exists: + self.classes = _webhook_data.get("tags", {}).get("auth_classes", []) + return self + + +@dataclasses.dataclass +class MediaObject(object): + client: AsyncClient + api_url: str + request: Request + object_id: UUID + + exists: bool = dataclasses.field(init=False, default=False) + + async def _get_upstream(self, groups: 
list[str], throw: bool = False) -> Any: + headers_copy = deepcopy(self.request.headers) + if "content-length" in headers_copy: + del (headers_copy["content-length"]) + + args = {"flow_tag.auth_classes": ",".join(groups)} + + res = await self.client.request( + method="GET", + headers=headers_copy, + url=f"{self.api_url}/objects/{self.object_id}", + params=args + ) + + if res.status_code != 200: + if throw: + raise NotFound() + return {} + else: + self.exists = True + + return res.json() + + async def _async_init(self): + # Prime self.exists + await self._get_upstream([]) + + return self + + async def has_permission(self, groups: list[str], throw: bool = False) -> bool: + media_object = await self._get_upstream(groups) + + if throw and not self.exists: + raise NotFound() + + has_permission = (media_object.get("referenced_by_flows", []) != []) + + if throw and not has_permission: + await self.has_any(groups, throw) # Will raise NotFound if has no permissions + raise Forbidden("Insufficient permissions") + + return has_permission + + async def has_read(self, groups: list[str], throw: bool = False) -> bool: + return await self.has_permission(filter_read(groups), throw) + + async def has_write(self, groups: list[str], throw: bool = False) -> bool: + return await self.has_permission(filter_write(groups), throw) + + async def has_delete(self, groups: list[str], throw: bool = False) -> bool: + return await self.has_permission(filter_delete(groups), throw) + + async def has_any(self, groups: list[str], throw: bool = False) -> bool: + media_object = await self._get_upstream(groups) + + if throw and not self.exists: + raise NotFound() + + has_permission = (media_object.get("referenced_by_flows", []) != []) + + if throw and not has_permission: + raise NotFound() + + return has_permission diff --git a/examples/authz_proxy/test-requirements.txt b/examples/authz_proxy/test-requirements.txt new file mode 100644 index 00000000..b3f4974f --- /dev/null +++ 
b/examples/authz_proxy/test-requirements.txt @@ -0,0 +1,2 @@ +flake8 +mypy \ No newline at end of file diff --git a/examples/authz_proxy/token_checker.py b/examples/authz_proxy/token_checker.py new file mode 100644 index 00000000..aef85d83 --- /dev/null +++ b/examples/authz_proxy/token_checker.py @@ -0,0 +1,44 @@ +import os +import sanic +from sanic.exceptions import Unauthorized +from sanic.log import logger + +import jwt +from jwt import PyJWKClient, InvalidSignatureError, ExpiredSignatureError + +GROUPS_CLAIM = os.environ.get("GROUPS_CLAIM", "cognito:groups") + + +def get_signing_key(jwks_url: str): + jwks_client = PyJWKClient(jwks_url) + jwks_client.get_signing_keys() + logger.info("Got signing keys") + return jwks_client + + +def verify_token(request: sanic.Request, jwks_client: PyJWKClient) -> dict: + try: + token_header = request.headers["Authorization"] + except KeyError: + raise Unauthorized() + + token = token_header[7:] + signing_key = jwks_client.get_signing_key_from_jwt(token) + + try: + token_decoded = jwt.decode(token, signing_key, algorithms=["RS256"]) + except InvalidSignatureError as e: + logger.exception(e) + raise Unauthorized() + except ExpiredSignatureError as e: + logger.exception(e) + raise Unauthorized() + + return token_decoded + + +def get_groups_in_token(token: dict) -> list[str]: + try: + return token[GROUPS_CLAIM] + except KeyError: + return [] From d6171d2ba52ded11a7731b9c890bb199b4521e40 Mon Sep 17 00:00:00 2001 From: James Sandford Date: Thu, 12 Feb 2026 15:29:35 +0000 Subject: [PATCH 3/5] Add auth proxy to examples readme --- examples/README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/examples/README.md b/examples/README.md index f508847f..eebdaa16 100644 --- a/examples/README.md +++ b/examples/README.md @@ -178,3 +178,7 @@ This mode can be used by adding the `--cut-interval-sec ` parameter to will cut between the two Flows on that interval. 
The resulting Flow will not be playable using simple tools (such as direct HLS mappings) and will require a client that fully implements the TAMS specification, including handling long-GOP precharge if necessary. + +### Authorization Proxy ([authz_proxy](./authz_proxy)) + +This [authorization proxy](./authz_proxy) demonstrates Fine-Grained Authorisation (FGA) using a reverse proxy in front of a TAMS API instance, by matching user group membership to an `auth_classes` tag on Sources, Flows and Webhooks. From d64fad74bf9f1915c0a9e54ac6002c8e256e2b34 Mon Sep 17 00:00:00 2001 From: James Sandford Date: Thu, 12 Feb 2026 15:30:20 +0000 Subject: [PATCH 4/5] Fix links in examples readme --- examples/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/README.md b/examples/README.md index eebdaa16..511cb33f 100644 --- a/examples/README.md +++ b/examples/README.md @@ -113,7 +113,7 @@ If the timing is known to be 30 Hz (which it is for this sample content) then th ### Outgest File ([outgest_file.py](./outgest_file.py)) -The [outgest_file.py](./ingest_hls.py) script demonstrates how Flow media can be exported to a local file. +The [outgest_file.py](./outgest_file.py) script demonstrates how Flow media can be exported to a local file. Run the script as follows (replace `` and set `` to the Flow ID logged by the [ingest HLS script](#ingest-hls-ingest_hlspy)), @@ -139,7 +139,7 @@ The script follows these steps: ### Simple Edit ([simple_edit.py](./simple_edit.py)) -The [simple_edit.py](./ingest_hls.py) script demonstrates how media can be shared between Flows using a lightweight metadata-only operation that constructs a Flow from timeranges of other Flows. +The [simple_edit.py](./simple_edit.py) script demonstrates how media can be shared between Flows using a lightweight metadata-only operation that constructs a Flow from timeranges of other Flows. 
The script takes 2 Flows and timeranges as inputs, and creates an output Flow that is a concatenation of the 2 inputs, containing at most one page of segments from each. Firstly, create the 2 input Flows from the sample content. From e821db88619b406ee9237ca6cd159b7f2c132b3d Mon Sep 17 00:00:00 2001 From: James Sandford Date: Tue, 17 Feb 2026 10:59:13 +0000 Subject: [PATCH 5/5] Fix tests --- examples/authz_proxy/.flake8 | 3 +++ examples/authz_proxy/api.py | 2 +- examples/utils/client.py | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 examples/authz_proxy/.flake8 diff --git a/examples/authz_proxy/.flake8 b/examples/authz_proxy/.flake8 new file mode 100644 index 00000000..e74feb55 --- /dev/null +++ b/examples/authz_proxy/.flake8 @@ -0,0 +1,3 @@ +[flake8] +max-line-length = 120 +exclude = .git,.tox,.tox-package,.mypy_cache,dist,deb_dist,__pycache__,venv,testvenv diff --git a/examples/authz_proxy/api.py b/examples/authz_proxy/api.py index cfda9b2f..d09c1aff 100644 --- a/examples/authz_proxy/api.py +++ b/examples/authz_proxy/api.py @@ -513,7 +513,7 @@ async def post_flow_segments(request: Request, groups: list[str], flow_id: UUID) if media_object.exists: try: await media_object.has_read(groups, throw=True) - except NotFound, Forbidden: + except (NotFound, Forbidden): raise InvalidUsage(f"Object {object_id} not found") return await app.ctx.tams.passthrough_request(request) diff --git a/examples/utils/client.py b/examples/utils/client.py index 050815af..31b6b60a 100644 --- a/examples/utils/client.py +++ b/examples/utils/client.py @@ -6,7 +6,7 @@ import aiohttp -from credentials import Credentials, RenewableCredentials +from .credentials import Credentials, RenewableCredentials @asynccontextmanager