diff --git a/.flake8 b/.flake8 index 62a48b1697..19f3b4fc07 100644 --- a/.flake8 +++ b/.flake8 @@ -117,6 +117,11 @@ per-file-ignores = # additionally test docstrings don't need param lists (DAR, DCO020): tests/**.py: DAR, DCO020, S101, S105, S108, S404, S603, WPS202, WPS210, WPS430, WPS436, WPS441, WPS442, WPS450 + # The following ignores have been researched and should be considered permanent: + # WPS202: two-plugin module (secret store + SSH) with Protocol types, TypedDicts, + # and multiple helpers per plugin; restructuring would harm readability. + src/awx_plugins/credentials/akeyless.py: WPS202 + # The following ignores must be fixed and the entries removed from this config: src/awx_plugins/credentials/aim.py: ANN003, ANN201, B950, CCR001, D100, D103, LN001, Q003, WPS210, WPS221, WPS223, WPS231, WPS336, WPS432 src/awx_plugins/credentials/aws_secretsmanager.py: ANN003, ANN201, D100, D103, WPS111, WPS210, WPS329, WPS529 diff --git a/.mypy.ini b/.mypy.ini index 8520d2ae1a..b29d37daee 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -62,6 +62,15 @@ warn_unused_ignores = true # crashes with some decorators like `@functools.cache`: disallow_any_expr = false +[mypy-awx_plugins.credentials.akeyless] +# The akeyless SDK does not ship type stubs. These suppressions are needed +# until the SDK gains proper typing support: +disallow_any_expr = false +disallow_any_unimported = false +disallow_any_explicit = false +disallow_untyped_calls = false +warn_return_any = false + [mypy-awx_plugins.credentials.aws_secretsmanager] # crashes with some decorators like `@functools.cache`: disallow_any_expr = false @@ -106,8 +115,23 @@ disallow_any_expr = false # crashes with some decorators like `@tox.plugin.impl`: disallow_any_expr = false +[mypy-akeyless] +# The akeyless SDK does not ship type stubs: +ignore_missing_imports = true + +[mypy-akeyless.*] +# The akeyless SDK does not ship type stubs: +ignore_missing_imports = true + [mypy-tests.*] # crashes with some decorators like `@pytest.mark.parametrize`: disallow_any_expr = false # fails on `@hypothesis.given()`: disallow_any_decorated = false +# fixture return types like `Callable[..., object]` use `...` as Any: +disallow_any_explicit = false + +[mypy-tests.akeyless_test] +# Mock API objects are typed as `object`; test kwargs use generic dict +# rather than the specific TypedDicts the plugin functions expect: +disable_error_code = attr-defined, arg-type diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 422835b9d3..9b2f670284 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -319,6 +319,7 @@ repos: - pytest-mock # needed by pylint-pytest since it picks up pytest's args - pytest-subtests # needed by pylint-pytest since it picks up pytest's args - pytest-xdist # needed by pylint-pytest since it picks up pytest's args + - akeyless >= 5.0.8 # needed by credentials.akeyless and its tests - python-dsv-sdk # needed by credentials.dsv, credentials.thycotic_dsv - PyYAML # needed by credentials.injectors, inventory.plugins - Sphinx # needed by the Sphinx extension stub diff --git a/.pylintrc.toml b/.pylintrc.toml index 3ffdc0e837..d95ae6d132 100644 --- a/.pylintrc.toml +++ b/.pylintrc.toml @@ -106,7 +106,11 @@ py-version = "3.11" # source root is an absolute path or a path relative to the current working # directory used to determine a package namespace for modules located under the # source root. -# source-roots = +# Setting src/ prevents pylint from adding individual credential module +# directories to sys.path, which would cause false "module imports itself" +# warnings and circular-import errors for files with the same name as their +# third-party dependencies (e.g. akeyless.py vs the akeyless SDK): +source-roots = ["src"] # Allow loading of arbitrary C extensions. Extensions are imported into the # active Python interpreter and may run arbitrary code. diff --git a/.ruff.toml b/.ruff.toml index a8ecdbe6c6..52dd17cb11 100644 --- a/.ruff.toml +++ b/.ruff.toml @@ -173,6 +173,9 @@ testing = [ "S101", # Allow use of `assert` in test files "S105", # hardcoded-password-string "S106", # hardcoded-password-func-arg + "S107", # hardcoded-password-func-default: test helper factories use + # credential-like param names (token, secret_data) as configurable + # defaults — these are never real credentials "S108", # tmp dirs "S404", # Allow importing 'subprocess' module to testing call external tools needed by these hooks "S603", # subprocess calls diff --git a/docs/conf.py b/docs/conf.py index 1cb0f78ee1..c94a71da94 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -234,6 +234,12 @@ 'awx_plugins.interfaces._temporary_private_credential_api.Credential', ), ('py:class', 'EnvVarsType'), + # Akeyless SDK types: auto-generated from OpenAPI + # without type annotations or .pyi stubs + ('py:class', 'akeyless.models.auth.Auth'), + ('py:class', 'akeyless.models.describe_item.DescribeItem'), + ('py:class', 'akeyless.models.get_secret_value.GetSecretValue'), + ('py:class', 'akeyless.models.get_ssh_certificate.GetSSHCertificate'), ] diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index a0d15b1f16..7de124f9c4 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1,3 +1,4 @@ +Akeyless Ansible Approle async diff --git a/pyproject.toml b/pyproject.toml index 0606243dac..a7f9c9bd84 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,6 +77,8 @@ centrify_vault_kv = "awx_plugins.credentials.centrify_vault:centrify_plugin" thycotic_dsv = "awx_plugins.credentials.dsv:dsv_plugin" thycotic_tss = "awx_plugins.credentials.tss:tss_plugin" aws_secretsmanager_credential = "awx_plugins.credentials.aws_secretsmanager:aws_secretmanager_plugin" +akeyless = "awx_plugins.credentials.akeyless:akeyless_plugin" +akeyless_ssh = "awx_plugins.credentials.akeyless:akeyless_ssh_plugin" github_app_lookup = "awx_plugins.credentials.github_app:github_app_lookup" [project.entry-points."awx_plugins.managed_credentials"] # new entry points group name @@ -189,6 +191,14 @@ credentials-aws-secretsmanager-credential = [ "awx_plugins.interfaces", "boto3", ] +credentials-akeyless = [ + "awx_plugins.interfaces", + "akeyless >= 5.0.8", +] +credentials-akeyless-ssh = [ + "awx_plugins.interfaces", + "akeyless >= 5.0.8", +] inventory-azure-rm = [ "awx_plugins.interfaces", "PyYAML", diff --git a/pytest.ini b/pytest.ini index ed3eceeead..4bd799b508 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,8 @@ [pytest] +# Add src/ to sys.path so tests can import the package even in environments +# where `pip install -e .` has not been run (e.g. the pre-commit pylint venv): +pythonpath = src + addopts = # `pytest-xdist`: --numprocesses=auto diff --git a/src/awx_plugins/credentials/akeyless.py b/src/awx_plugins/credentials/akeyless.py new file mode 100644 index 0000000000..8fc06e9a5a --- /dev/null +++ b/src/awx_plugins/credentials/akeyless.py @@ -0,0 +1,464 @@ +"""Akeyless credential plugins for AWX.""" +# WPS202 is expected: implementing two credential plugins requires Protocol +# types, TypedDicts, and multiple helper functions per plugin, which exceeds +# the default module-member threshold. + +import json as _json +import typing as _t +from collections.abc import Mapping as _Mapping + +from awx_plugins.interfaces._temporary_private_django_api import ( # noqa: WPS436 + gettext_noop as _, +) + +# The akeyless SDK does not ship type stubs; pylint cannot resolve its imports. +# When not installed, pylint misidentifies `from akeyless import` as a +# self-import because this file is named akeyless.py (import-self). +# pylint: disable=import-error,import-self,no-name-in-module +from akeyless import ( + ApiClient as _ApiClient, + Auth as _Auth, + Configuration as _Configuration, + DescribeItem as _DescribeItem, + GetSecretValue as _GetSecretValue, + V2Api as _V2Api, +) +from akeyless.models.get_ssh_certificate import ( + GetSSHCertificate as _GetSSHCertificate, +) +from akeyless.rest import ApiException as _ApiException + +# pylint: enable=import-error,import-self,no-name-in-module +from . import plugin as _plugin + + +__all__ = ( # noqa: WPS410 + 'akeyless_backend', + 'akeyless_plugin', + 'akeyless_ssh_backend', + 'akeyless_ssh_plugin', +) + + +_SUPPORTED_ITEM_TYPES = frozenset(('STATIC_SECRET',)) + +_STRUCTURED_SECRET_FORMATS = frozenset(('json', 'key-value')) + +_PASSWORD_KEYS = frozenset(('username', 'password')) + + +class _AkeylessCommonKwargs(_t.TypedDict): + gateway_url: str + access_id: str + access_key: str + ca_cert: _t.NotRequired[str | None] + + +class _AkeylessBackendKwargs(_AkeylessCommonKwargs): + secret_path: str + secret_key: _t.NotRequired[str | None] + + +class _AkeylessSshBackendKwargs(_AkeylessCommonKwargs): + cert_issue_name: str + cert_username: str + public_key_data: str + ttl: _t.NotRequired[int | str | None] + + +class _AuthResponse(_t.Protocol): + token: str | None + + +class _StaticSecretInfo(_t.Protocol): + format: str + + +class _ItemGeneralInfo(_t.Protocol): + static_secret_info: _StaticSecretInfo + + +class _DescribeItemResponse(_t.Protocol): + item_type: str + item_sub_type: str + item_general_info: _ItemGeneralInfo + + +class _SshCertResponse(_t.Protocol): + data: str | None # noqa: WPS110 # must match the akeyless SDK response attribute name + + +class _AkeylessApi(_t.Protocol): + def auth(self: _t.Self, auth: _Auth) -> _AuthResponse: ... + + def describe_item( + self: _t.Self, + req: _DescribeItem, + ) -> _DescribeItemResponse: ... + + def get_secret_value( + self: _t.Self, + req: _GetSecretValue, + ) -> _Mapping[str, str]: ... + + def get_ssh_certificate( + self: _t.Self, + req: _GetSSHCertificate, + ) -> _SshCertResponse: ... + + +_common_plugin_inputs = [ + { + 'id': 'gateway_url', + 'label': _('Gateway URL'), + 'type': 'string', + 'help_text': _( + 'The URL of your Akeyless Gateway (e.g., https://api.akeyless.io, ' + 'https://my.akeyless.gw/api/v2)', + ), + 'default': 'https://api.akeyless.io', + }, + { + 'id': 'access_id', + 'label': _('Access ID'), + 'type': 'string', + 'help_text': _('Your Akeyless API Access ID'), + }, + { + 'id': 'access_key', + 'label': _('Access Key'), + 'type': 'string', + 'help_text': _('Your Akeyless API Access Key'), + 'secret': True, + }, + { + 'id': 'ca_cert', + 'label': _('CA Certificate'), + 'type': 'string', + 'multiline': True, + 'help_text': _( + 'CA certificate (PEM format) used to verify the gateway TLS ' + 'certificate.', + ), + }, +] + + +_akeyless_inputs = { + 'fields': _common_plugin_inputs, + 'metadata': [ + { + 'id': 'secret_path', + 'label': _('Secret Path'), + 'type': 'string', + 'help_text': _( + 'The path to the secret in Akeyless (e.g., ' + '/myapp/database/password).', + ), + }, + { + 'id': 'secret_key', + 'label': _('Secret Key'), + 'type': 'string', + 'help_text': _( + 'Optional key within the secret to retrieve (for JSON or ' + 'key-value secrets).', + ), + }, + ], + 'required': [ + 'gateway_url', + 'access_id', + 'access_key', + 'secret_path', + ], +} + + +_akeyless_ssh_inputs = { + 'fields': _common_plugin_inputs, + 'metadata': [ + { + 'id': 'cert_issue_name', + 'label': _('Certificate Issuer Name'), + 'type': 'string', + 'help_text': _( + 'The full path to the certificate issuer in Akeyless (e.g., ' + '/remote/ssh/certificate/issuer).', + ), + }, + { + 'id': 'cert_username', + 'label': _('Certificate Username'), + 'type': 'string', + 'help_text': _( + 'The username(s) to sign into the SSH certificate in a ' + 'comma-separated list, e.g., "ubuntu,nobody,nonroot".', + ), + }, + { + 'id': 'public_key_data', + 'label': _('Public Key Data'), + 'type': 'string', + 'help_text': _( + 'The public key data to sign (e.g. "ssh-rsa AAAAB3NzaC1yc2E...").', + ), + }, + { + 'id': 'ttl', + 'label': _('TTL'), + 'type': 'number', + 'help_text': _( + 'Time to live in seconds for the SSH certificate. If not ' + 'defined, the default issuer TTL is used.', + ), + }, + ], + 'required': [ + 'gateway_url', + 'access_id', + 'access_key', + 'cert_issue_name', + 'cert_username', + 'public_key_data', + ], +} + + +def _setup_client(gateway_url: str, ca_cert_path: str | None) -> _AkeylessApi: + client_configuration = _Configuration(host=gateway_url) + if ca_cert_path: + client_configuration.ssl_ca_cert = ca_cert_path + client_configuration.verify_ssl = True + api_client = _ApiClient(client_configuration) + api_client.user_agent = 'AWX' + api_client.default_headers['akeylessclienttype'] = 'AWX' + return _V2Api(api_client) + + +def _authenticate( + api_instance: _AkeylessApi, + access_id: str, + access_key: str, +) -> str: + auth_response = api_instance.auth( + _Auth( + access_id=access_id, + access_key=access_key, + ), + ) + if not auth_response.token: + raise ValueError( + 'Failed to authenticate with Akeyless: no token received.', + ) + return auth_response.token + + +def _extract_password_secret(secret_data: str, secret_key: str | None) -> str: + if not secret_key: + return secret_data + if secret_key not in _PASSWORD_KEYS: + raise NotImplementedError( + 'Password secrets only support "username" or "password" keys.', + ) + secret_dict = _t.cast('dict[str, str]', _json.loads(secret_data)) + return secret_dict[secret_key] + + +def _extract_text_secret( + secret_data: str, + secret_key: str | None, + static_secret_sub_type: str, +) -> str: + if static_secret_sub_type == 'password': + return _extract_password_secret(secret_data, secret_key) + if static_secret_sub_type == 'generic': + return secret_data + raise NotImplementedError( + 'Static secret sub type must be "password" or "generic".', + ) + + +def _extract_structured_secret( + secret_data: str, + secret_key: str | None, + secret_path: str, +) -> str: + if not secret_key: + return str(secret_data) + secret_dict = _t.cast('dict[str, str]', _json.loads(secret_data)) + try: + return secret_dict[secret_key] + except KeyError as exc: + raise KeyError( + f'Key "{secret_key}" not found in secret at path: {secret_path}', + ) from exc + + +def _extract_secret_value( + secret_response: _Mapping[str, str], + secret_path: str, + secret_key: str | None, + static_secret_format: str, + static_secret_sub_type: str, +) -> str: + secret_data = secret_response[secret_path] + if static_secret_format == 'text': + return _extract_text_secret( + secret_data, + secret_key, + static_secret_sub_type, + ) + if static_secret_format in _STRUCTURED_SECRET_FORMATS: + return _extract_structured_secret( + secret_data, + secret_key, + secret_path, + ) + raise NotImplementedError( + 'Static secret format must be "text", "json", or "key-value".', + ) + + +def _ensure_supported_item_type(secret_path: str, item_type: str) -> None: + if item_type not in _SUPPORTED_ITEM_TYPES: + raise NotImplementedError( + f'Secret "{secret_path}" is of type "{item_type}". ' + f'Supported types: {sorted(_SUPPORTED_ITEM_TYPES)}.', + ) + + +def _fetch_secret_value( + api_instance: _AkeylessApi, + token: str, + secret_path: str, + secret_key: str | None, +) -> str: + describe_item_request = _DescribeItem(name=secret_path, token=token) + describe_item_response = api_instance.describe_item(describe_item_request) + _ensure_supported_item_type(secret_path, describe_item_response.item_type) + + static_secret_format = ( + describe_item_response.item_general_info.static_secret_info.format + ) + static_secret_sub_type = describe_item_response.item_sub_type + + secret_response = api_instance.get_secret_value( + _GetSecretValue( + names=[secret_path], + token=token, + ), + ) + + return _extract_secret_value( + secret_response, + secret_path, + secret_key, + static_secret_format, + static_secret_sub_type, + ) + + +def akeyless_backend(**kwargs: _t.Unpack[_AkeylessBackendKwargs]) -> str: + """Retrieve a secret value from Akeyless.""" + with _plugin.CertFiles(kwargs.get('ca_cert')) as ca_cert_path: + api_instance = _setup_client( + kwargs['gateway_url'].rstrip('/'), + ca_cert_path, + ) + try: + token = _authenticate( + api_instance, + kwargs['access_id'], + kwargs['access_key'], + ) + except ValueError as val_err: + raise RuntimeError(str(val_err)) from val_err + try: + return _fetch_secret_value( + api_instance, + token, + kwargs['secret_path'], + kwargs.get('secret_key'), + ) + except _ApiException as api_exc: + raise RuntimeError( + f'Akeyless API error: {api_exc.reason}' + f' (Status: {api_exc.status})', + ) from api_exc + + +def _coerce_ttl(ttl_value: int | str | None) -> int | None: + if ttl_value is None or ttl_value == '': + return None + try: + return int(ttl_value) + except (TypeError, ValueError) as exc: + raise ValueError('TTL must be an integer number of seconds.') from exc + + +def _fetch_ssh_certificate( + api_instance: _AkeylessApi, + token: str, + ssh_inputs: _AkeylessSshBackendKwargs, +) -> str: + response = api_instance.get_ssh_certificate( + _GetSSHCertificate( + token=token, + cert_issuer_name=ssh_inputs['cert_issue_name'], + cert_username=ssh_inputs['cert_username'], + ttl=_coerce_ttl(ssh_inputs.get('ttl')), + public_key_data=ssh_inputs['public_key_data'], + ), + ) + if not response.data: + raise ValueError( + 'Failed to generate signed SSH certificate: no data returned.', + ) + return response.data + + +def akeyless_ssh_backend( + **kwargs: _t.Unpack[_AkeylessSshBackendKwargs], +) -> str: + """Generate a signed SSH certificate using Akeyless.""" + with _plugin.CertFiles(kwargs.get('ca_cert')) as ca_cert_path: + api_instance = _setup_client( + kwargs['gateway_url'].rstrip('/'), + ca_cert_path, + ) + try: + token = _authenticate( + api_instance, + kwargs['access_id'], + kwargs['access_key'], + ) + except ValueError as val_err: + raise RuntimeError(str(val_err)) from val_err + try: + return _fetch_ssh_certificate( + api_instance, + token, + kwargs, + ) + except _ApiException as api_exc: + raise RuntimeError( + f'Akeyless API error: {api_exc.reason}' + f' (Status: {api_exc.status})', + ) from api_exc + except ValueError as val_err: + raise RuntimeError(str(val_err)) from val_err + + +akeyless_plugin = _plugin.CredentialPlugin( + 'Akeyless', + inputs=_akeyless_inputs, # type: ignore[arg-type] + backend=akeyless_backend, +) + + +akeyless_ssh_plugin = _plugin.CredentialPlugin( + 'Akeyless SSH', + inputs=_akeyless_ssh_inputs, # type: ignore[arg-type] + backend=akeyless_ssh_backend, +) diff --git a/tests/akeyless_test.py b/tests/akeyless_test.py new file mode 100644 index 0000000000..3b32701c40 --- /dev/null +++ b/tests/akeyless_test.py @@ -0,0 +1,304 @@ +"""Tests for Akeyless credential plugins.""" + +from collections.abc import Callable + +import pytest +from pytest_mock import MockerFixture + +from akeyless.rest import ApiException # pylint: disable=import-error + +from awx_plugins.credentials import akeyless as akeyless_mod + + +_SECRET_PATH = '/test/secret' +_ACCESS_ID = 'p-test123' +_ACCESS_KEY = 'test-key' +_GATEWAY_URL = 'https://api.akeyless.io' + +_HTTP_FORBIDDEN = 403 +_HTTP_NOT_FOUND = 404 +_ONE_HOUR_SEC = 3600 +_TWO_HOURS_SEC = 7200 + +_MockApiFactory = Callable[..., object] + + +def _make_mock_api( # noqa: WPS211 # pylint: disable=too-many-arguments + mocker: MockerFixture, + *, + token: str | None = 'test-token', + item_type: str = 'STATIC_SECRET', + item_sub_type: str = 'generic', + secret_format: str = 'text', + secret_path: str = _SECRET_PATH, + secret_data: str = 'my-secret-value', + ssh_cert_data: str | None = 'ssh-rsa-SIGNED-CERT', +) -> object: + """Create a mock Akeyless API instance with configurable responses.""" + mock_static_info = mocker.MagicMock() + mock_static_info.format = secret_format + + mock_general_info = mocker.MagicMock() + mock_general_info.static_secret_info = mock_static_info + + mock_describe = mocker.MagicMock() + mock_describe.item_type = item_type + mock_describe.item_sub_type = item_sub_type + mock_describe.item_general_info = mock_general_info + + mock_api = mocker.MagicMock() + mock_api.auth.return_value.token = token + mock_api.describe_item.return_value = mock_describe + mock_api.get_secret_value.return_value = {secret_path: secret_data} + mock_api.get_ssh_certificate.return_value.data = ssh_cert_data + return mock_api + + +@pytest.fixture +def patch_setup_client( + monkeypatch: pytest.MonkeyPatch, + mocker: MockerFixture, +) -> _MockApiFactory: + """Patch _setup_client, returning a factory that yields the configured mock.""" + + def _factory(**kwargs: object) -> object: + mock = _make_mock_api(mocker, **kwargs) + monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock) + return mock + + return _factory + + +def _backend_kwargs( + *, + secret_path: str = _SECRET_PATH, + secret_key: str | None = None, + ca_cert: str | None = None, +) -> dict[str, object]: + """Build a minimal set of kwargs for akeyless_backend.""" + kwargs: dict[str, object] = { + 'gateway_url': _GATEWAY_URL, + 'access_id': _ACCESS_ID, + 'access_key': _ACCESS_KEY, + 'secret_path': secret_path, + } + if secret_key is not None: + kwargs['secret_key'] = secret_key + if ca_cert is not None: + kwargs['ca_cert'] = ca_cert + return kwargs + + +def _ssh_kwargs( + *, + ttl: int | str | None = None, + ca_cert: str | None = None, +) -> dict[str, object]: + """Build a minimal set of kwargs for akeyless_ssh_backend.""" + kwargs: dict[str, object] = { + 'gateway_url': _GATEWAY_URL, + 'access_id': _ACCESS_ID, + 'access_key': _ACCESS_KEY, + 'cert_issue_name': '/ssh/issuers/my-issuer', + 'cert_username': 'ubuntu', + 'public_key_data': 'ssh-rsa AAAAB3NzaC1yc2E...', + } + if ttl is not None: + kwargs['ttl'] = ttl + if ca_cert is not None: + kwargs['ca_cert'] = ca_cert + return kwargs + + +# --------------------------------------------------------------------------- +# akeyless_backend - secret store plugin +# --------------------------------------------------------------------------- + + +def test_akeyless_backend_text_generic_secret( + patch_setup_client: _MockApiFactory, +) -> None: + """Retrieve a plain-text generic secret successfully.""" + patch_setup_client(item_sub_type='generic', secret_format='text') + + fetched_secret = akeyless_mod.akeyless_backend( + **_backend_kwargs(), + ) + + assert fetched_secret == 'my-secret-value' + + +def test_akeyless_backend_json_secret_with_key( + patch_setup_client: _MockApiFactory, +) -> None: + """Retrieve a specific key from a JSON-format structured secret.""" + patch_setup_client( + secret_format='json', + secret_data='{"db_password": "s3cr3t", "db_user": "admin"}', + ) + + fetched_secret = akeyless_mod.akeyless_backend( + **_backend_kwargs(secret_key='db_password'), + ) + + assert fetched_secret == 's3cr3t' + + +def test_backend_password_secret_username_key( + patch_setup_client: _MockApiFactory, +) -> None: + """Retrieve the 'username' field from a text/password sub-type secret.""" + payload = '{"username": "myuser", "password": "mypass"}' + patch_setup_client( + item_sub_type='password', + secret_format='text', + secret_data=payload, + ) + + fetched_secret = akeyless_mod.akeyless_backend( + **_backend_kwargs(secret_key='username'), + ) + + assert fetched_secret == 'myuser' + + +def test_backend_unsupported_type_raises( + patch_setup_client: _MockApiFactory, +) -> None: + """Unsupported secret types (e.g. dynamic) should raise NotImplementedError.""" + patch_setup_client(item_type='DYNAMIC_SECRET') + + with pytest.raises(NotImplementedError, match='DYNAMIC_SECRET'): + akeyless_mod.akeyless_backend(**_backend_kwargs()) + + +def test_backend_api_exc_wraps_runtime( + patch_setup_client: _MockApiFactory, +) -> None: + """An ApiException from get_secret_value surfaces as RuntimeError.""" + mock_api = patch_setup_client() + mock_api.get_secret_value.side_effect = ApiException( + status=_HTTP_FORBIDDEN, + reason='Forbidden', + ) + + with pytest.raises( + RuntimeError, + match=r'Akeyless API error: Forbidden \(Status: 403\)', + ): + akeyless_mod.akeyless_backend(**_backend_kwargs()) + + +def test_backend_auth_failure_raises( + patch_setup_client: _MockApiFactory, +) -> None: + """A missing auth token should surface as RuntimeError to the plugin caller.""" + patch_setup_client(token=None) + + with pytest.raises(RuntimeError, match='no token received'): + akeyless_mod.akeyless_backend(**_backend_kwargs()) + + +def test_backend_missing_json_key_raises( + patch_setup_client: _MockApiFactory, +) -> None: + """A missing key in a structured secret should raise KeyError with the path.""" + patch_setup_client( + secret_format='json', + secret_data='{"other_key": "value"}', + ) + + with pytest.raises(KeyError, match=_SECRET_PATH): + akeyless_mod.akeyless_backend( + **_backend_kwargs(secret_key='missing_key'), + ) + + +# --------------------------------------------------------------------------- +# akeyless_ssh_backend - SSH certificate plugin +# --------------------------------------------------------------------------- + + +def test_akeyless_ssh_backend_success( + patch_setup_client: _MockApiFactory, +) -> None: + """Successfully generate a signed SSH certificate.""" + patch_setup_client(ssh_cert_data='ssh-rsa-cert-AAAAAA') + + signed_cert = akeyless_mod.akeyless_ssh_backend(**_ssh_kwargs()) + + assert signed_cert == 'ssh-rsa-cert-AAAAAA' + + +def test_akeyless_ssh_backend_with_ttl( + patch_setup_client: _MockApiFactory, +) -> None: + """SSH certificate request with an explicit TTL should succeed.""" + mock_api = patch_setup_client(ssh_cert_data='signed-cert') + + signed_cert = akeyless_mod.akeyless_ssh_backend( + **_ssh_kwargs(ttl=_ONE_HOUR_SEC), + ) + + assert signed_cert == 'signed-cert' + ssh_request = mock_api.get_ssh_certificate.call_args.args[0] + assert ssh_request.ttl == _ONE_HOUR_SEC + + +def test_ssh_backend_no_cert_data_raises( + patch_setup_client: _MockApiFactory, +) -> None: + """An empty/None cert response should surface as RuntimeError.""" + patch_setup_client(ssh_cert_data=None) + + with pytest.raises(RuntimeError, match='no data returned'): + akeyless_mod.akeyless_ssh_backend(**_ssh_kwargs()) + + +def test_ssh_api_exc_wraps_runtime( + patch_setup_client: _MockApiFactory, +) -> None: + """An ApiException from get_ssh_certificate surfaces as RuntimeError.""" + mock_api = patch_setup_client() + mock_api.get_ssh_certificate.side_effect = ApiException( + status=_HTTP_NOT_FOUND, + reason='Issuer not found', + ) + + with pytest.raises( + RuntimeError, + match=r'Akeyless API error: Issuer not found \(Status: 404\)', + ): + akeyless_mod.akeyless_ssh_backend(**_ssh_kwargs()) + + +# --------------------------------------------------------------------------- +# _coerce_ttl helper +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + ('ttl_input', 'expected'), + ( + pytest.param(None, None, id='none'), + pytest.param('', None, id='empty-string'), + pytest.param(_ONE_HOUR_SEC, _ONE_HOUR_SEC, id='int'), + pytest.param('7200', _TWO_HOURS_SEC, id='string-int'), + ), +) +def test_coerce_ttl_valid_inputs( + ttl_input: int | str | None, + expected: int | None, +) -> None: + """_coerce_ttl should convert numeric-like values and pass None/'' through.""" + # WPS437: _coerce_ttl is private but is tested directly here intentionally + # pylint: disable-next=protected-access + assert akeyless_mod._coerce_ttl(ttl_input) == expected # noqa: WPS437 + + +def test_coerce_ttl_invalid_raises() -> None: + """_coerce_ttl should raise ValueError for non-numeric strings.""" + with pytest.raises(ValueError, match='integer number of seconds'): + # WPS437: _coerce_ttl is private but is tested directly here intentionally + # pylint: disable-next=protected-access + akeyless_mod._coerce_ttl('not-a-number') # noqa: WPS437 diff --git a/tests/importable_test.py b/tests/importable_test.py index 535cbebcfd..f5ac6cfb84 100644 --- a/tests/importable_test.py +++ b/tests/importable_test.py @@ -80,6 +80,16 @@ def __str__(self) -> str: 'aws_secretsmanager_credential', 'awx_plugins.credentials.aws_secretsmanager:aws_secretmanager_plugin', ), + EntryPointParam( + 'awx_plugins.credentials', + 'akeyless', + 'awx_plugins.credentials.akeyless:akeyless_plugin', + ), + EntryPointParam( + 'awx_plugins.credentials', + 'akeyless_ssh', + 'awx_plugins.credentials.akeyless:akeyless_ssh_plugin', + ), ) diff --git a/tox.ini b/tox.ini index a08cf80b7b..21a6c66122 100644 --- a/tox.ini +++ b/tox.ini @@ -33,6 +33,8 @@ extras = credentials-thycotic-dsv credentials-thycotic-tss credentials-aws-secretsmanager-credential + credentials-akeyless + credentials-akeyless-ssh inventory-azure-rm inventory-ec2 inventory-gce