Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 15 additions & 27 deletions asab/web/auth/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,14 @@ def __init__(self, claims: dict, id_token: str = None):


def __repr__(self):
try:
return "<Authorization {}cid={!r}>".format(
"[SUPERUSER] " if self.has_superuser_access() else "",
self.CredentialsId,
)
except NotAuthenticatedError:
if not self.is_valid():
return "<Authorization [EXPIRED] cid={!r}>".format(
self.CredentialsId
)
return "<Authorization {}cid={!r}>".format(
"[SUPERUSER] " if self.has_superuser_access() else "",
self.CredentialsId,
)


def is_valid(self):
Expand All @@ -84,10 +83,7 @@ def has_superuser_access(self) -> bool:
Check whether the agent is a superuser.

Returns:
bool: Does the agent have superuser access?

Raises:
NotAuthenticatedError: When the authorization is expired or otherwise invalid.
bool: Does the subject have superuser access?

Examples:
>>> import asab.contextvars
Expand All @@ -97,8 +93,7 @@ def has_superuser_access(self) -> bool:
>>> else:
>>> print("I am but a mere mortal.")
"""
self.require_valid()
return is_superuser(self._Resources)
return self.is_valid() and is_superuser(self._Resources)


def has_resource_access(self, *resources: str) -> bool:
Expand All @@ -109,10 +104,7 @@ def has_resource_access(self, *resources: str) -> bool:
*resources (str): A variable number of resource IDs whose authorization is requested.

Returns:
bool: Is the agent authorized to access requested resources?

Raises:
NotAuthenticatedError: When the authorization is expired or otherwise invalid.
bool: Is the subject authorized to access requested resources?

Examples:
>>> import asab.contextvars
Expand All @@ -122,8 +114,7 @@ def has_resource_access(self, *resources: str) -> bool:
>>> else:
>>> print("Not much to do here.")
"""
self.require_valid()
return has_resource_access(self._Resources, resources, tenant=Tenant.get(None))
return self.is_valid() and has_resource_access(self._Resources, resources, tenant=Tenant.get(None))


def has_tenant_access(self, tenant=None) -> bool:
Expand All @@ -134,10 +125,7 @@ def has_tenant_access(self, tenant=None) -> bool:
tenant (str, optional): The tenant to check access for. If None, uses the tenant from context.

Returns:
bool: Is the agent authorized to access requested tenant?

Raises:
NotAuthenticatedError: When the authorization is expired or otherwise invalid.
bool: Is the subject authorized to access requested tenant?

Examples:
>>> # Using tenant context
Expand All @@ -155,15 +143,13 @@ def has_tenant_access(self, tenant=None) -> bool:
>>> # Specifying tenant directly
>>> authz.has_tenant_access("big-corporation")
"""
self.require_valid()

if tenant is None:
try:
tenant = Tenant.get()
except LookupError as e:
raise ValueError("No tenant in context.") from e

return has_tenant_access(self._Resources, tenant)
return self.is_valid() and has_tenant_access(self._Resources, tenant)


def require_valid(self):
Expand Down Expand Up @@ -193,7 +179,8 @@ def require_superuser_access(self):
>>> authz.require_superuser_access()
>>> print("I am a superuser and can do anything!")
"""
if not self.has_superuser_access():
self.require_valid()
if not is_superuser(self._Resources):
L.warning("Superuser authorization required.", struct_data={
"cid": self.CredentialsId})
raise AccessDeniedError()
Expand All @@ -216,7 +203,8 @@ def require_resource_access(self, *resources: str):
>>> authz.require_resource_access("article:read", "article:write")
>>> print("I can read and write articles!")
"""
if not self.has_resource_access(*resources):
self.require_valid()
if not has_resource_access(self._Resources, resources, tenant=Tenant.get(None)):
L.warning("Resource authorization required.", struct_data={
"resource": resources, "cid": self.CredentialsId})
scope = set()
Expand Down
24 changes: 16 additions & 8 deletions test/test_auth/test_authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,29 +77,37 @@ def test_valid(self):
self.Authz.require_valid()

def test_resource_access(self):
with self.assertRaises(asab.exceptions.NotAuthenticatedError):
self.Authz.has_resource_access(RESOURCE_1)
self.assertFalse(
self.Authz.has_resource_access(RESOURCE_1),
"Access to RESOURCE_1 is not authorized with expired authorization.",
)

with self.assertRaises(asab.exceptions.NotAuthenticatedError):
self.Authz.require_resource_access(RESOURCE_1)

def test_tenant_access(self):
with self.assertRaises(asab.exceptions.NotAuthenticatedError):
self.Authz.has_tenant_access()
self.assertFalse(
self.Authz.has_tenant_access(),
"Access to TENANT_1 is not authorized with expired authorization.",
)

with self.assertRaises(asab.exceptions.NotAuthenticatedError):
self.Authz.require_tenant_access()

# Explicit tenant param
with self.assertRaises(asab.exceptions.NotAuthenticatedError):
self.Authz.has_tenant_access(TENANT_1)
self.assertFalse(
self.Authz.has_tenant_access(TENANT_1),
"Access to TENANT_1 is not authorized with expired authorization.",
)

with self.assertRaises(asab.exceptions.NotAuthenticatedError):
self.Authz.require_tenant_access(TENANT_1)

def test_superuser_access(self):
with self.assertRaises(asab.exceptions.NotAuthenticatedError):
self.Authz.has_superuser_access()
self.assertFalse(
self.Authz.has_superuser_access(),
"Superuser access is not valid with expired authorization.",
)

with self.assertRaises(asab.exceptions.NotAuthenticatedError):
self.Authz.require_superuser_access()
Expand Down
Loading