Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ and this project adheres to

## [unreleased]

### Added
- perms: Support deprecation of actions with optional replacement in
authorization policy definition (#64).

### Fixed
- auth: Escape user-provided values in LDAP search filters to prevent LDAP
injection attacks and support values with parenthesis (#63).
Expand Down
96 changes: 92 additions & 4 deletions src/permissions/rfl/permissions/rbac.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

import configparser
import logging
import warnings
from pathlib import Path
from typing import Set, Tuple
from typing import Optional, Set, Tuple

from rfl.authentication.user import AuthenticatedUser
import yaml
Expand All @@ -21,6 +22,28 @@
ALL_MEMBER = "ALL"


class ActionMetadata:
"""Metadata describing an action in the policy definition."""

def __init__(
self,
name: str,
description: str,
deprecated: bool = False,
replaced_by: Optional[str] = None,
):
self.name = name
self.description = description
self.deprecated = deprecated
self.replaced_by = replaced_by

def __repr__(self):
return (
f"ActionMetadata(name={self.name}, deprecated={self.deprecated}, "
f"replaced_by={self.replaced_by})"
)


class RBACPolicyRole:
def __init__(self, name: str, members: Set[str], actions: Set[str]):
self.name = name
Expand Down Expand Up @@ -58,7 +81,7 @@ def __init__(self, path: Path = None, raw: str = None):
f"Invalid YAML policy definition: {str(err)}"
) from err
try:
self.actions = set(content["actions"].keys())
actions_content = content["actions"]
except KeyError as err:
raise RBACPolicyDefinitionLoadError(
"Actions key not found in YAML policy definition"
Expand All @@ -68,6 +91,71 @@ def __init__(self, path: Path = None, raw: str = None):
"Unable to extract the set of actions from actions key in YAML policy "
"definition"
) from err
if not isinstance(actions_content, dict):
raise RBACPolicyDefinitionLoadError(
"Actions definition must be a mapping of action names to definitions"
)

self.actions = {}
for action, definition in actions_content.items():
if isinstance(definition, str):
description = definition
deprecated = False
replaced_by = None
elif isinstance(definition, dict):
try:
description = definition["description"]
except KeyError as err:
raise RBACPolicyDefinitionLoadError(
f"Description is mandatory for action {action}"
) from err
deprecated = definition.get("deprecated", False)
if not isinstance(deprecated, bool):
raise RBACPolicyDefinitionLoadError(
f"Invalid deprecated value for action {action}"
)
replaced_by = definition.get("replaced_by")
if replaced_by is not None and not isinstance(replaced_by, str):
raise RBACPolicyDefinitionLoadError(
f"Invalid replaced_by value for action {action}"
)
else:
raise RBACPolicyDefinitionLoadError(
f"Invalid definition for action {action}"
)
self.actions[action] = ActionMetadata(
name=action,
description=description,
deprecated=deprecated,
replaced_by=replaced_by,
)

# Validate replacement references
for action, metadata in self.actions.items():
if metadata.replaced_by and metadata.replaced_by not in self.actions:
raise RBACPolicyDefinitionLoadError(
f"Replacement action {metadata.replaced_by} for {action} not found "
"in policy definition"
)

def apply(self, action: str) -> Set[str]:
"""Return effective actions for the given action name, handling deprecation.

Deprecated actions are stripped, a UserWarning is emitted, and a single
replacement action (if declared) is added instead. Unknown actions still raise
RBACPolicyRolesLoadError.
"""
metadata = self.actions[action]
if metadata.deprecated:
warning_msg = f"Action {action} is deprecated"
if metadata.replaced_by:
warning_msg += f"; use {metadata.replaced_by} instead"
warnings.warn(warning_msg, UserWarning)
if metadata.replaced_by:
return self.apply(metadata.replaced_by)
return set()

return {action}


class RBACPolicyRolesLoader:
Expand Down Expand Up @@ -132,7 +220,7 @@ def _load(self):
members = self._expand_members(self.content.get("roles", role))
self.roles.add(RBACPolicyRole(role, members, actions))

def _expand_actions(self, actions_str):
def _expand_actions(self, actions_str: str) -> Set[str]:
"""Return the set of actions declared in comma-separated list provided in
actions_str argument. If an item is prefixed by @, the set is expanded with the
actions of the role name that follows."""
Expand All @@ -145,7 +233,7 @@ def _expand_actions(self, actions_str):
raise RBACPolicyRolesLoadError(
f"Action {action} not found in policy definition"
)
actions.add(action)
actions.update(self.definition.apply(action))
return actions

def _expand_members(self, members_str):
Expand Down
193 changes: 189 additions & 4 deletions src/permissions/rfl/tests/test_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
#
# SPDX-License-Identifier: LGPL-3.0-or-later

import textwrap
import unittest
import warnings

from rfl.authentication.user import AuthenticatedUser, AnonymousUser
from rfl.permissions.rbac import (
Expand All @@ -24,7 +26,21 @@
add-users: Add users to the system
view-tasks: View all tasks on the system
launch-tasks: Launch tasks on the system
edit-tasks: Edit all submitted tasks
edit-tasks:
description: Edit all submitted tasks
"""

DEFINITION_DEPRECATED = """
actions:
old-action:
description: Old action
deprecated: true
replaced_by: new-action
old-no-replace:
description: Old action without replacement
deprecated: true
new-action:
description: New action
"""

VALID_ROLES = """
Expand Down Expand Up @@ -61,7 +77,7 @@ class TestRBACPolicyDefinitionYAMLLoader(unittest.TestCase):
def test_load_valid_definition(self):
loader = RBACPolicyDefinitionYAMLLoader(raw=VALID_DEFINITION)
self.assertEqual(
loader.actions,
set(loader.actions.keys()),
{"view-users", "add-users", "view-tasks", "launch-tasks", "edit-tasks"},
)

Expand All @@ -84,11 +100,131 @@ def test_load_invalid_actions_content(self):
raw_definition = "actions: fail"
with self.assertRaisesRegex(
RBACPolicyDefinitionLoadError,
r"^Unable to extract the set of actions from actions key in YAML policy "
r"definition$",
r"^Actions definition must be a mapping of action names to definitions$",
):
RBACPolicyDefinitionYAMLLoader(raw=raw_definition)

def test_load_missing_description(self):
definition = textwrap.dedent("""
actions:
old-action:
deprecated: true
""")
with self.assertRaisesRegex(
RBACPolicyDefinitionLoadError,
r"^Description is mandatory for action old-action$",
):
RBACPolicyDefinitionYAMLLoader(raw=definition)

def test_load_invalid_deprecated_not_boolean(self):
definition = textwrap.dedent("""
actions:
old-action:
description: Old action
deprecated: "true"
""")
with self.assertRaisesRegex(
RBACPolicyDefinitionLoadError,
r"^Invalid deprecated value for action old-action$",
):
RBACPolicyDefinitionYAMLLoader(raw=definition)

def test_load_invalid_replaced_by_list(self):
definition = textwrap.dedent("""
actions:
old-action:
description: Old action
deprecated: true
replaced_by:
- a
- b
""")
with self.assertRaisesRegex(
RBACPolicyDefinitionLoadError,
r"^Invalid replaced_by value for action old-action$",
):
RBACPolicyDefinitionYAMLLoader(raw=definition)

def test_load_invalid_replaced_by_unknown(self):
definition = textwrap.dedent("""
actions:
old-action:
description: Old action
deprecated: true
replaced_by: missing
""")
with self.assertRaisesRegex(
RBACPolicyDefinitionLoadError,
r"^Replacement action missing for old-action not found in policy "
r"definition$",
):
RBACPolicyDefinitionYAMLLoader(raw=definition)

def test_load_legacy_string_description(self):
loader = RBACPolicyDefinitionYAMLLoader(raw=VALID_DEFINITION)
expected = {
"view-users": "View all users information",
"add-users": "Add users to the system",
"view-tasks": "View all tasks on the system",
"launch-tasks": "Launch tasks on the system",
"edit-tasks": "Edit all submitted tasks",
}
self.assertEqual(
{action: meta.description for action, meta in loader.actions.items()},
expected,
)

def test_apply_non_deprecated_action(self):
loader = RBACPolicyDefinitionYAMLLoader(raw=VALID_DEFINITION)
result = loader.apply("view-users")
self.assertEqual(result, {"view-users"})

def test_apply_deprecated_action_without_replacement(self):
loader = RBACPolicyDefinitionYAMLLoader(raw=DEFINITION_DEPRECATED)
with self.assertWarns(UserWarning) as warning:
result = loader.apply("old-no-replace")
self.assertEqual(result, set())
self.assertIn("Action old-no-replace is deprecated", str(warning.warning))

def test_apply_deprecated_action_with_replacement(self):
loader = RBACPolicyDefinitionYAMLLoader(raw=DEFINITION_DEPRECATED)
with self.assertWarns(UserWarning) as warning:
result = loader.apply("old-action")
self.assertEqual(result, {"new-action"})
self.assertIn("Action old-action is deprecated", str(warning.warning))
self.assertIn("use new-action instead", str(warning.warning))

def test_apply_deprecated_action_recursive_replacement(self):
definition = textwrap.dedent("""
actions:
old-action:
description: Old action
deprecated: true
replaced_by: intermediate-action
intermediate-action:
description: Intermediate action
deprecated: true
replaced_by: final-action
final-action:
description: Final action
""")
loader = RBACPolicyDefinitionYAMLLoader(raw=definition)
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = loader.apply("old-action")
self.assertEqual(result, {"final-action"})
# Should warn about both deprecated actions
self.assertEqual(len(w), 2, f"Expected 2 warnings, got {len(w)}")
warnings_messages = [str(warning.message) for warning in w]
self.assertTrue(
any("old-action" in msg for msg in warnings_messages),
f"Expected warning about old-action in {warnings_messages}",
)
self.assertTrue(
any("intermediate-action" in msg for msg in warnings_messages),
f"Expected warning about intermediate-action in {warnings_messages}",
)


class TestRBACPolicyRolesIniLoader(unittest.TestCase):
def test_load_valid_roles(self):
Expand Down Expand Up @@ -171,6 +307,55 @@ def test_load_roles_undefined_expand(self):
):
loader._load()

def test_load_roles_deprecated_replaced(self):
definition = RBACPolicyDefinitionYAMLLoader(raw=DEFINITION_DEPRECATED)
roles = textwrap.dedent("""
[roles]
user=ALL

[user]
actions=old-action
""")
with self.assertWarns(UserWarning):
loader = RBACPolicyRolesIniLoader(definition=definition, raw=roles)
# old-action stripped, new-action added
user_role = next(role for role in loader.roles if role.name == "user")
self.assertEqual(user_role.actions, {"new-action"})

def test_load_roles_deprecated_without_replacement(self):
definition = RBACPolicyDefinitionYAMLLoader(raw=DEFINITION_DEPRECATED)
roles = textwrap.dedent("""
[roles]
user=ALL

[user]
actions=old-no-replace
""")
with self.assertWarns(UserWarning):
loader = RBACPolicyRolesIniLoader(definition=definition, raw=roles)
user_role = next(role for role in loader.roles if role.name == "user")
self.assertEqual(user_role.actions, set())

def test_load_roles_inheritance_with_deprecated(self):
definition = RBACPolicyDefinitionYAMLLoader(raw=DEFINITION_DEPRECATED)
roles = textwrap.dedent("""
[roles]
base=ALL
user=ALL

[base]
actions=old-action

[user]
actions=@base
""")
with self.assertWarns(UserWarning):
loader = RBACPolicyRolesIniLoader(definition=definition, raw=roles)
base_role = next(role for role in loader.roles if role.name == "base")
user_role = next(role for role in loader.roles if role.name == "user")
self.assertEqual(base_role.actions, {"new-action"})
self.assertEqual(user_role.actions, {"new-action"})


class TestRBACPolicyManager(unittest.TestCase):
def test_manager_init(self):
Expand Down