From 69d505eadd65a1a034cb0a8cac3044eb8eb70bb3 Mon Sep 17 00:00:00 2001 From: Cal Paterson Date: Thu, 19 Sep 2024 15:26:22 +0300 Subject: [PATCH 1/2] Write demo version of alternate permissions implementation --- csvbase/auth.py | 77 ++++++++++++++++++++++++++++++++++++++++++++++ tests/test_auth.py | 22 +++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 csvbase/auth.py create mode 100644 tests/test_auth.py diff --git a/csvbase/auth.py b/csvbase/auth.py new file mode 100644 index 0000000..f0974dd --- /dev/null +++ b/csvbase/auth.py @@ -0,0 +1,77 @@ +"""Work in progress alternate permissions implementation""" + +from enum import Enum +from typing import Union +from typing_extensions import Literal +from uuid import UUID + +from sqlalchemy import cast, types as satypes +from sqlalchemy.orm import Session +from sqlalchemy.sql.expression import select, literal_column, union_all + +from csvbase.value_objs import Table +from csvbase import models +from csvbase.web.func import get_current_user + + +class ObjectType(Enum): + TABLE = 1 + COMMENT = 2 + + def as_sql(self): + return literal_column(str(self.value), satypes.SmallInteger) + + +class ActionType(Enum): + READ = 1 + WRITE = 2 + ADMIN = 3 + + def as_sql(self): + return literal_column(str(self.value), satypes.SmallInteger) + + +def _build_table_permissions_subselect(sesh: Session, user_uuid: UUID): + """Return all the table permissions that the given user has (as a query).""" + public_table_permissions = select( + ObjectType.TABLE.as_sql().label("object"), + models.Table.table_uuid, + ActionType.READ.as_sql().label("action"), + ).where(models.Table.public) + private_table_permissions = select( + ObjectType.TABLE.as_sql().label("object"), + models.Table.table_uuid, + ActionType.WRITE.as_sql().label("action"), + ).where(models.Table.user_uuid == user_uuid) + + return union_all(private_table_permissions, public_table_permissions) + + +def ensure_table_access( + sesh: Session, table: Table, mode: Union[Literal["read"], Literal["write"]] +) -> None: + """Return happily if user is allowed to access the given table, raise otherwise.""" + current_user = get_current_user() + action = ActionType.READ if mode == "read" else ActionType.WRITE + + # Users's current permissions + table_permissions = _build_table_permissions_subselect( + sesh, current_user.user_uuid + ).subquery() + + # Check that user has access to do what they are currently doing + exists_stmt = ( + select(table_permissions) + .where( + table_permissions.c.object == ObjectType.TABLE.as_sql(), + table_permissions.c.table_uuid == table.table_uuid, + table_permissions.c.action == action.as_sql(), + ) + .exists() + ) + + rv = sesh.execute(select(exists_stmt)).scalar() + + # If they do not have sufficient perms, raise + if not rv: + raise RuntimeError("not allowed") diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..daf7d0b --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,22 @@ +from csvbase import auth + +import pytest + +from . import utils + + +def test_auth__can_read_own_tables(sesh, test_user, ten_rows): + with utils.current_user(test_user): + auth.ensure_table_access(sesh, ten_rows, "read") + + +def test_auth__can_write_own_tables(sesh, test_user, ten_rows): + with utils.current_user(test_user): + auth.ensure_table_access(sesh, ten_rows, "write") + + +def test_auth__cannot_write_other_peoples_tables(sesh, test_user, ten_rows, crypt_context): + other_user = utils.make_user(sesh, crypt_context) + with utils.current_user(other_user): + with pytest.raises(RuntimeError): + auth.ensure_table_access(sesh, ten_rows, "write") From 18cabd5e2edf380bf9302c4f3035e794ff1d4d74 Mon Sep 17 00:00:00 2001 From: Cal Paterson Date: Thu, 19 Sep 2024 16:10:35 +0300 Subject: [PATCH 2/2] Lint --- csvbase/auth.py | 4 ++-- tests/test_auth.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/csvbase/auth.py b/csvbase/auth.py index f0974dd..7d793ef 100644 --- a/csvbase/auth.py +++ b/csvbase/auth.py @@ -5,7 +5,7 @@ from typing_extensions import Literal from uuid import UUID -from sqlalchemy import cast, types as satypes +from sqlalchemy import types as satypes from sqlalchemy.orm import Session from sqlalchemy.sql.expression import select, literal_column, union_all @@ -56,7 +56,7 @@ def ensure_table_access( # Users's current permissions table_permissions = _build_table_permissions_subselect( - sesh, current_user.user_uuid + sesh, current_user.user_uuid # type: ignore ).subquery() # Check that user has access to do what they are currently doing diff --git a/tests/test_auth.py b/tests/test_auth.py index daf7d0b..ce968b5 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -15,7 +15,9 @@ def test_auth__can_write_own_tables(sesh, test_user, ten_rows): auth.ensure_table_access(sesh, ten_rows, "write") -def test_auth__cannot_write_other_peoples_tables(sesh, test_user, ten_rows, crypt_context): +def test_auth__cannot_write_other_peoples_tables( + sesh, test_user, ten_rows, crypt_context +): other_user = utils.make_user(sesh, crypt_context) with utils.current_user(other_user): with pytest.raises(RuntimeError):