From e10452bfa5e4cd82c50ec70153d59cc3ae0f774e Mon Sep 17 00:00:00 2001 From: Kobbi Gal Date: Tue, 20 Jan 2026 14:50:05 -0500 Subject: [PATCH 1/9] init --- pyproject.toml | 10 + src/awx_plugins/credentials/akeyless.py | 308 ++++++++++++++++++++++++ tests/importable_test.py | 10 + tox.ini | 2 + 4 files changed, 330 insertions(+) create mode 100644 src/awx_plugins/credentials/akeyless.py diff --git a/pyproject.toml b/pyproject.toml index 03a30ed26b..d06bcbbe74 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,6 +75,8 @@ centrify_vault_kv = "awx_plugins.credentials.centrify_vault:centrify_plugin" thycotic_dsv = "awx_plugins.credentials.dsv:dsv_plugin" thycotic_tss = "awx_plugins.credentials.tss:tss_plugin" aws_secretsmanager_credential = "awx_plugins.credentials.aws_secretsmanager:aws_secretmanager_plugin" +akeyless = "awx_plugins.credentials.akeyless:akeyless_plugin" +akeyless_ssh = "awx_plugins.credentials.akeyless:akeyless_ssh_plugin" github_app_lookup = "awx_plugins.credentials.github_app:github_app_lookup" [project.entry-points."awx_plugins.managed_credentials"] # new entry points group name @@ -179,6 +181,14 @@ credentials-aws-secretsmanager-credential = [ "awx_plugins.interfaces", "boto3", ] +credentials-akeyless = [ + "awx_plugins.interfaces", + "akeyless >= 5.0.8", +] +credentials-akeyless-ssh = [ + "awx_plugins.interfaces", + "akeyless >= 5.0.8", +] inventory-azure-rm = [ "awx_plugins.interfaces", "PyYAML", diff --git a/src/awx_plugins/credentials/akeyless.py b/src/awx_plugins/credentials/akeyless.py new file mode 100644 index 0000000000..38a361d24e --- /dev/null +++ b/src/awx_plugins/credentials/akeyless.py @@ -0,0 +1,308 @@ +# FIXME: the following violations must be addressed gradually and unignored +# mypy: disable-error-code="import-not-found, import-untyped, no-untyped-def" + +import json + +from awx_plugins.interfaces._temporary_private_django_api import ( # noqa: WPS436 + gettext_noop as _, +) + +from akeyless import Auth, ApiClient, Configuration, DescribeItem, GetSecretValue, V2Api +from akeyless.models.get_ssh_certificate import GetSSHCertificate +from akeyless.rest import ApiException + +from .plugin import CertFiles, CredentialPlugin + + +SUPPORTED_ITEM_TYPES = {'STATIC_SECRET'} + + +common_plugin_inputs = [ + { + 'id': 'gateway_url', + 'label': _('Gateway URL'), + 'type': 'string', + 'help_text': _( + 'The URL of your Akeyless Gateway (e.g., https://api.akeyless.io, ' + 'https://my.akeyless.gw/api/v2)', + ), + 'default': 'https://api.akeyless.io', + }, + { + 'id': 'access_id', + 'label': _('Access ID'), + 'type': 'string', + 'help_text': _('Your Akeyless API Access ID'), + }, + { + 'id': 'access_key', + 'label': _('Access Key'), + 'type': 'string', + 'help_text': _('Your Akeyless API Access Key'), + 'secret': True, + }, + { + 'id': 'ca_cert', + 'label': _('CA Certificate'), + 'type': 'string', + 'multiline': True, + 'help_text': _( + 'CA certificate (PEM format) used to verify the gateway TLS ' + 'certificate.', + ), + }, +] + + +akeyless_inputs = { + 'fields': common_plugin_inputs, + 'metadata': [ + { + 'id': 'secret_path', + 'label': _('Secret Path'), + 'type': 'string', + 'help_text': _( + 'The path to the secret in Akeyless (e.g., ' + '/myapp/database/password).', + ), + }, + { + 'id': 'secret_key', + 'label': _('Secret Key'), + 'type': 'string', + 'help_text': _( + 'Optional key within the secret to retrieve (for JSON or ' + 'key-value secrets).', + ), + }, + ], + 'required': [ + 'gateway_url', + 'access_id', + 'access_key', + 'secret_path', + ], +} + + +akeyless_ssh_inputs = { + 'fields': common_plugin_inputs, + 'metadata': [ + { + 'id': 'cert_issue_name', + 'label': _('Certificate Issuer Name'), + 'type': 'string', + 'help_text': _( + 'The full path to the certificate issuer in Akeyless (e.g., ' + '/remote/ssh/certificate/issuer).', + ), + }, + { + 'id': 'cert_username', + 'label': _('Certificate Username'), + 'type': 'string', + 'help_text': _( + 'The username(s) to sign into the SSH certificate in a ' + 'comma-separated list, e.g., "ubuntu,nobody,nonroot".', + ), + }, + { + 'id': 'public_key_data', + 'label': _('Public Key Data'), + 'type': 'string', + 'help_text': _( + 'The public key data to sign (e.g. "ssh-rsa AAAAB3NzaC1yc2E...").', + ), + }, + { + 'id': 'ttl', + 'label': _('TTL'), + 'type': 'number', + 'help_text': _( + 'Time to live in seconds for the SSH certificate. If not ' + 'defined, the default issuer TTL is used.', + ), + }, + ], + 'required': [ + 'gateway_url', + 'access_id', + 'access_key', + 'cert_issue_name', + 'cert_username', + 'public_key_data', + ], +} + + +def _setup_client(gateway_url, ca_cert_path): + client_configuration = Configuration(host=gateway_url) + if ca_cert_path: + client_configuration.ssl_ca_cert = ca_cert_path + client_configuration.verify_ssl = True + api_client = ApiClient(client_configuration) + api_client.user_agent = 'AWX' + api_client.default_headers['akeylessclienttype'] = 'AWX' + return V2Api(api_client) + + +def _authenticate(api_instance, access_id, access_key): + auth_response = api_instance.auth( + Auth( + access_id=access_id, + access_key=access_key, + ), + ) + if not auth_response.token: + raise RuntimeError( + 'Failed to authenticate with Akeyless: no token received.', + ) + return auth_response.token + + +def _extract_secret_value( + secret_response, + secret_path, + secret_key, + static_secret_format, + static_secret_sub_type, +): + secret_data = secret_response[secret_path] + if static_secret_format == 'text': + if static_secret_sub_type == 'password': + if secret_key: + if secret_key not in ('username', 'password'): + raise NotImplementedError( + 'Password secrets only support "username" or "password" ' + 'keys.', + ) + secret_dict = json.loads(secret_data) + return secret_dict[secret_key] + return secret_data + if static_secret_sub_type == 'generic': + return secret_data + raise NotImplementedError( + 'Static secret sub type must be "password" or "generic".', + ) + + if static_secret_format in ('json', 'key-value'): + if secret_key: + secret_dict = json.loads(secret_data) + if secret_key not in secret_dict: + raise KeyError( + f'Key "{secret_key}" not found in secret at path: ' + f'{secret_path}', + ) + return secret_dict[secret_key] + return str(secret_data) + + raise NotImplementedError( + 'Static secret format must be "text", "json", or "key-value".', + ) + + +def akeyless_backend(**kwargs): + gateway_url = kwargs['gateway_url'].rstrip('/') + access_id = kwargs['access_id'] + access_key = kwargs['access_key'] + ca_cert = kwargs.get('ca_cert') or None + secret_path = kwargs['secret_path'] + secret_key = kwargs.get('secret_key') + + with CertFiles(ca_cert) as ca_cert_path: + try: + api_instance = _setup_client(gateway_url, ca_cert_path) + token = _authenticate(api_instance, access_id, access_key) + + describe_item_request = DescribeItem(name=secret_path, token=token) + describe_item_response = api_instance.describe_item( + describe_item_request, + ) + item_type = describe_item_response.item_type + if item_type not in SUPPORTED_ITEM_TYPES: + raise NotImplementedError( + f'Secret "{secret_path}" is of type "{item_type}". ' + f'Supported types: {sorted(SUPPORTED_ITEM_TYPES)}.', + ) + + static_secret_format = ( + describe_item_response.item_general_info.static_secret_info.format + ) + static_secret_sub_type = describe_item_response.item_sub_type + + secret_response = api_instance.get_secret_value( + GetSecretValue( + names=[secret_path], + token=token, + ), + ) + + return _extract_secret_value( + secret_response, + secret_path, + secret_key, + static_secret_format, + static_secret_sub_type, + ) + except ApiException as exc: + raise RuntimeError( + f'Akeyless API error: {exc.reason} (Status: {exc.status})', + ) from exc + + +def _coerce_ttl(ttl_value): + if ttl_value in (None, ''): + return None + try: + return int(ttl_value) + except (TypeError, ValueError) as exc: + raise ValueError('TTL must be an integer number of seconds.') from exc + + +def akeyless_ssh_backend(**kwargs): + gateway_url = kwargs['gateway_url'].rstrip('/') + access_id = kwargs['access_id'] + access_key = kwargs['access_key'] + ca_cert = kwargs.get('ca_cert') or None + cert_issue_name = kwargs['cert_issue_name'] + cert_username = kwargs['cert_username'] + public_key_data = kwargs['public_key_data'] + ttl = _coerce_ttl(kwargs.get('ttl')) + + with CertFiles(ca_cert) as ca_cert_path: + try: + api_instance = _setup_client(gateway_url, ca_cert_path) + token = _authenticate(api_instance, access_id, access_key) + + response = api_instance.get_ssh_certificate( + GetSSHCertificate( + token=token, + cert_issuer_name=cert_issue_name, + cert_username=cert_username, + ttl=ttl, + public_key_data=public_key_data, + ), + ) + if not response.data: + raise RuntimeError( + 'Failed to generate signed SSH certificate: no data returned.', + ) + return response.data + except ApiException as exc: + raise RuntimeError( + f'Akeyless API error: {exc.reason} (Status: {exc.status})', + ) from exc + + +akeyless_plugin = CredentialPlugin( + 'Akeyless', + inputs=akeyless_inputs, + backend=akeyless_backend, +) + + +akeyless_ssh_plugin = CredentialPlugin( + 'Akeyless SSH', + inputs=akeyless_ssh_inputs, + backend=akeyless_ssh_backend, +) diff --git a/tests/importable_test.py b/tests/importable_test.py index aad46693ab..a5473ea4e2 100644 --- a/tests/importable_test.py +++ b/tests/importable_test.py @@ -70,6 +70,16 @@ def __str__(self) -> str: 'aws_secretsmanager_credential', 'awx_plugins.credentials.aws_secretsmanager:aws_secretmanager_plugin', ), + EntryPointParam( + 'awx_plugins.credentials', + 'akeyless', + 'awx_plugins.credentials.akeyless:akeyless_plugin', + ), + EntryPointParam( + 'awx_plugins.credentials', + 'akeyless_ssh', + 'awx_plugins.credentials.akeyless:akeyless_ssh_plugin', + ), ) diff --git a/tox.ini b/tox.ini index a08cf80b7b..21a6c66122 100644 --- a/tox.ini +++ b/tox.ini @@ -33,6 +33,8 @@ extras = credentials-thycotic-dsv credentials-thycotic-tss credentials-aws-secretsmanager-credential + credentials-akeyless + credentials-akeyless-ssh inventory-azure-rm inventory-ec2 inventory-gce From e932273f523c34d847dad9fb81a332437748ba27 Mon Sep 17 00:00:00 2001 From: Kobbi Gal Date: Tue, 20 Jan 2026 16:02:23 -0500 Subject: [PATCH 2/9] fix precommit --- .flake8 | 1 + .mypy.ini | 8 + src/awx_plugins/credentials/akeyless.py | 285 +++++++++++++++--------- 3 files changed, 193 insertions(+), 101 deletions(-) diff --git a/.flake8 b/.flake8 index 7a7c37ccf0..4b0a4166fe 100644 --- a/.flake8 +++ b/.flake8 @@ -118,6 +118,7 @@ per-file-ignores = tests/**.py: DAR, DCO020, S101, S105, S108, S404, S603, WPS202, WPS210, WPS430, WPS436, WPS441, WPS442, WPS450 # The following ignores must be fixed and the entries removed from this config: + src/awx_plugins/credentials/akeyless.py: WPS202 src/awx_plugins/credentials/aim.py: ANN003, ANN201, B950, CCR001, D100, D103, LN001, Q003, WPS210, WPS221, WPS223, WPS231, WPS336, WPS432 src/awx_plugins/credentials/aws_secretsmanager.py: ANN003, ANN201, D100, D103, WPS111, WPS210, WPS329, WPS529 src/awx_plugins/credentials/azure_kv.py: ANN003, ANN201, D100, D103, WPS111, WPS361, WPS421 diff --git a/.mypy.ini b/.mypy.ini index 8520d2ae1a..226af05d9b 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -62,6 +62,14 @@ warn_unused_ignores = true # crashes with some decorators like `@functools.cache`: disallow_any_expr = false +[mypy-awx_plugins.credentials.akeyless] +# crashes with some decorators like `@functools.cache`: +disallow_any_expr = false +disallow_any_unimported = false +disallow_any_explicit = false +disallow_untyped_calls = false +warn_return_any = false + [mypy-awx_plugins.credentials.aws_secretsmanager] # crashes with some decorators like `@functools.cache`: disallow_any_expr = false diff --git a/src/awx_plugins/credentials/akeyless.py b/src/awx_plugins/credentials/akeyless.py index 38a361d24e..37b4959f63 100644 --- a/src/awx_plugins/credentials/akeyless.py +++ b/src/awx_plugins/credentials/akeyless.py @@ -1,20 +1,53 @@ +"""Akeyless credential plugins for AWX.""" # noqa: WPS202 +# pylint: disable=import-error, import-self, no-name-in-module # FIXME: the following violations must be addressed gradually and unignored # mypy: disable-error-code="import-not-found, import-untyped, no-untyped-def" import json +from collections.abc import Mapping +from typing import NotRequired, TypedDict, Unpack, cast from awx_plugins.interfaces._temporary_private_django_api import ( # noqa: WPS436 gettext_noop as _, ) -from akeyless import Auth, ApiClient, Configuration, DescribeItem, GetSecretValue, V2Api +from akeyless import ( + ApiClient, + Auth, + Configuration, + DescribeItem, + GetSecretValue, + V2Api, +) from akeyless.models.get_ssh_certificate import GetSSHCertificate from akeyless.rest import ApiException from .plugin import CertFiles, CredentialPlugin -SUPPORTED_ITEM_TYPES = {'STATIC_SECRET'} +SUPPORTED_ITEM_TYPES = frozenset(('STATIC_SECRET',)) + +STRUCTURED_SECRET_FORMATS = frozenset(('json', 'key-value')) +PASSWORD_KEYS = frozenset(('username', 'password')) + + +class _AkeylessCommonKwargs(TypedDict): + gateway_url: str + access_id: str + access_key: str + ca_cert: NotRequired[str | None] + + +class _AkeylessBackendKwargs(_AkeylessCommonKwargs): + secret_path: str + secret_key: NotRequired[str | None] + + +class _AkeylessSshBackendKwargs(_AkeylessCommonKwargs): + cert_issue_name: str + cert_username: str + public_key_data: str + ttl: NotRequired[int | str | None] common_plugin_inputs = [ @@ -135,7 +168,7 @@ } -def _setup_client(gateway_url, ca_cert_path): +def _setup_client(gateway_url: str, ca_cert_path: str | None) -> V2Api: client_configuration = Configuration(host=gateway_url) if ca_cert_path: client_configuration.ssl_ca_cert = ca_cert_path @@ -146,7 +179,7 @@ def _setup_client(gateway_url, ca_cert_path): return V2Api(api_client) -def _authenticate(api_instance, access_id, access_key): +def _authenticate(api_instance: V2Api, access_id: str, access_key: str) -> str: auth_response = api_instance.auth( Auth( access_id=access_id, @@ -160,89 +193,129 @@ def _authenticate(api_instance, access_id, access_key): return auth_response.token -def _extract_secret_value( - secret_response, - secret_path, - secret_key, - static_secret_format, - static_secret_sub_type, -): - secret_data = secret_response[secret_path] - if static_secret_format == 'text': - if static_secret_sub_type == 'password': - if secret_key: - if secret_key not in ('username', 'password'): - raise NotImplementedError( - 'Password secrets only support "username" or "password" ' - 'keys.', - ) - secret_dict = json.loads(secret_data) - return secret_dict[secret_key] - return secret_data - if static_secret_sub_type == 'generic': - return secret_data +def _extract_password_secret(secret_data: str, secret_key: str | None) -> str: + if not secret_key: + return secret_data + if secret_key not in PASSWORD_KEYS: raise NotImplementedError( - 'Static secret sub type must be "password" or "generic".', + 'Password secrets only support "username" or "password" keys.', ) + secret_dict = cast('dict[str, str]', json.loads(secret_data)) + return secret_dict[secret_key] + + +def _extract_text_secret( + secret_data: str, + secret_key: str | None, + static_secret_sub_type: str, +) -> str: + if static_secret_sub_type == 'password': + return _extract_password_secret(secret_data, secret_key) + if static_secret_sub_type == 'generic': + return secret_data + raise NotImplementedError( + 'Static secret sub type must be "password" or "generic".', + ) + - if static_secret_format in ('json', 'key-value'): - if secret_key: - secret_dict = json.loads(secret_data) - if secret_key not in secret_dict: - raise KeyError( - f'Key "{secret_key}" not found in secret at path: ' - f'{secret_path}', - ) - return secret_dict[secret_key] +def _extract_structured_secret( + secret_data: str, + secret_key: str | None, + secret_path: str, +) -> str: + if not secret_key: return str(secret_data) + secret_dict = cast('dict[str, str]', json.loads(secret_data)) + try: + return secret_dict[secret_key] + except KeyError as exc: + raise KeyError( + f'Key "{secret_key}" not found in secret at path: {secret_path}', + ) from exc + +def _extract_secret_value( + secret_response: Mapping[str, str], + secret_path: str, + secret_key: str | None, + static_secret_format: str, + static_secret_sub_type: str, +) -> str: + secret_data = secret_response[secret_path] + if static_secret_format == 'text': + return _extract_text_secret( + secret_data, + secret_key, + static_secret_sub_type, + ) + if static_secret_format in STRUCTURED_SECRET_FORMATS: + return _extract_structured_secret( + secret_data, + secret_key, + secret_path, + ) raise NotImplementedError( 'Static secret format must be "text", "json", or "key-value".', ) -def akeyless_backend(**kwargs): - gateway_url = kwargs['gateway_url'].rstrip('/') - access_id = kwargs['access_id'] - access_key = kwargs['access_key'] - ca_cert = kwargs.get('ca_cert') or None - secret_path = kwargs['secret_path'] - secret_key = kwargs.get('secret_key') +def _ensure_supported_item_type(secret_path: str, item_type: str) -> None: + if item_type not in SUPPORTED_ITEM_TYPES: + raise NotImplementedError( + f'Secret "{secret_path}" is of type "{item_type}". ' + f'Supported types: {sorted(SUPPORTED_ITEM_TYPES)}.', + ) + - with CertFiles(ca_cert) as ca_cert_path: - try: - api_instance = _setup_client(gateway_url, ca_cert_path) - token = _authenticate(api_instance, access_id, access_key) +def _fetch_secret_value( + api_instance: V2Api, + token: str, + secret_path: str, + secret_key: str | None, +) -> str: + describe_item_request = DescribeItem(name=secret_path, token=token) + describe_item_response = api_instance.describe_item(describe_item_request) + _ensure_supported_item_type(secret_path, describe_item_response.item_type) - describe_item_request = DescribeItem(name=secret_path, token=token) - describe_item_response = api_instance.describe_item( - describe_item_request, - ) - item_type = describe_item_response.item_type - if item_type not in SUPPORTED_ITEM_TYPES: - raise NotImplementedError( - f'Secret "{secret_path}" is of type "{item_type}". ' - f'Supported types: {sorted(SUPPORTED_ITEM_TYPES)}.', - ) - - static_secret_format = ( - describe_item_response.item_general_info.static_secret_info.format - ) - static_secret_sub_type = describe_item_response.item_sub_type + static_secret_format = ( + describe_item_response.item_general_info.static_secret_info.format + ) + static_secret_sub_type = describe_item_response.item_sub_type - secret_response = api_instance.get_secret_value( - GetSecretValue( - names=[secret_path], - token=token, - ), - ) + secret_response = api_instance.get_secret_value( + GetSecretValue( + names=[secret_path], + token=token, + ), + ) + + return _extract_secret_value( + secret_response, + secret_path, + secret_key, + static_secret_format, + static_secret_sub_type, + ) - return _extract_secret_value( - secret_response, - secret_path, - secret_key, - static_secret_format, - static_secret_sub_type, + +def akeyless_backend(**kwargs: Unpack[_AkeylessBackendKwargs]) -> str: + """Retrieve a secret value from Akeyless.""" + with CertFiles(kwargs.get('ca_cert') or None) as ca_cert_path: + api_instance = _setup_client( + kwargs['gateway_url'].rstrip('/'), + ca_cert_path, + ) + token = _authenticate( + api_instance, + kwargs['access_id'], + kwargs['access_key'], + ) + try: + return _fetch_secret_value( + api_instance, + token, + kwargs['secret_path'], + kwargs.get('secret_key'), ) except ApiException as exc: raise RuntimeError( @@ -250,8 +323,8 @@ def akeyless_backend(**kwargs): ) from exc -def _coerce_ttl(ttl_value): - if ttl_value in (None, ''): +def _coerce_ttl(ttl_value: int | str | None) -> int | None: + if ttl_value is None or ttl_value == '': return None try: return int(ttl_value) @@ -259,35 +332,45 @@ def _coerce_ttl(ttl_value): raise ValueError('TTL must be an integer number of seconds.') from exc -def akeyless_ssh_backend(**kwargs): - gateway_url = kwargs['gateway_url'].rstrip('/') - access_id = kwargs['access_id'] - access_key = kwargs['access_key'] - ca_cert = kwargs.get('ca_cert') or None - cert_issue_name = kwargs['cert_issue_name'] - cert_username = kwargs['cert_username'] - public_key_data = kwargs['public_key_data'] - ttl = _coerce_ttl(kwargs.get('ttl')) +def _fetch_ssh_certificate( + api_instance: V2Api, + token: str, + ssh_inputs: _AkeylessSshBackendKwargs, +) -> str: + response = api_instance.get_ssh_certificate( + GetSSHCertificate( + token=token, + cert_issuer_name=ssh_inputs['cert_issue_name'], + cert_username=ssh_inputs['cert_username'], + ttl=_coerce_ttl(ssh_inputs.get('ttl')), + public_key_data=ssh_inputs['public_key_data'], + ), + ) + if not response.data: + raise RuntimeError( + 'Failed to generate signed SSH certificate: no data returned.', + ) + return response.data + - with CertFiles(ca_cert) as ca_cert_path: +def akeyless_ssh_backend(**kwargs: Unpack[_AkeylessSshBackendKwargs]) -> str: + """Generate a signed SSH certificate using Akeyless.""" + with CertFiles(kwargs.get('ca_cert') or None) as ca_cert_path: + api_instance = _setup_client( + kwargs['gateway_url'].rstrip('/'), + ca_cert_path, + ) + token = _authenticate( + api_instance, + kwargs['access_id'], + kwargs['access_key'], + ) try: - api_instance = _setup_client(gateway_url, ca_cert_path) - token = _authenticate(api_instance, access_id, access_key) - - response = api_instance.get_ssh_certificate( - GetSSHCertificate( - token=token, - cert_issuer_name=cert_issue_name, - cert_username=cert_username, - ttl=ttl, - public_key_data=public_key_data, - ), + return _fetch_ssh_certificate( + api_instance, + token, + kwargs, ) - if not response.data: - raise RuntimeError( - 'Failed to generate signed SSH certificate: no data returned.', - ) - return response.data except ApiException as exc: raise RuntimeError( f'Akeyless API error: {exc.reason} (Status: {exc.status})', From 5124a2d1d95c2b4d660f94ebe142c430d262a8e9 Mon Sep 17 00:00:00 2001 From: Kobbi Gal Date: Tue, 20 Jan 2026 16:07:00 -0500 Subject: [PATCH 3/9] fix build docs --- src/awx_plugins/credentials/akeyless.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/awx_plugins/credentials/akeyless.py b/src/awx_plugins/credentials/akeyless.py index 37b4959f63..91ba996d8e 100644 --- a/src/awx_plugins/credentials/akeyless.py +++ b/src/awx_plugins/credentials/akeyless.py @@ -5,7 +5,7 @@ import json from collections.abc import Mapping -from typing import NotRequired, TypedDict, Unpack, cast +from typing import Any, NotRequired, TypedDict, Unpack, cast from awx_plugins.interfaces._temporary_private_django_api import ( # noqa: WPS436 gettext_noop as _, @@ -168,7 +168,7 @@ class _AkeylessSshBackendKwargs(_AkeylessCommonKwargs): } -def _setup_client(gateway_url: str, ca_cert_path: str | None) -> V2Api: +def _setup_client(gateway_url: str, ca_cert_path: str | None) -> Any: client_configuration = Configuration(host=gateway_url) if ca_cert_path: client_configuration.ssl_ca_cert = ca_cert_path @@ -179,7 +179,7 @@ def _setup_client(gateway_url: str, ca_cert_path: str | None) -> V2Api: return V2Api(api_client) -def _authenticate(api_instance: V2Api, access_id: str, access_key: str) -> str: +def _authenticate(api_instance: Any, access_id: str, access_key: str) -> str: auth_response = api_instance.auth( Auth( access_id=access_id, @@ -268,7 +268,7 @@ def _ensure_supported_item_type(secret_path: str, item_type: str) -> None: def _fetch_secret_value( - api_instance: V2Api, + api_instance: Any, token: str, secret_path: str, secret_key: str | None, @@ -333,7 +333,7 @@ def _coerce_ttl(ttl_value: int | str | None) -> int | None: def _fetch_ssh_certificate( - api_instance: V2Api, + api_instance: Any, token: str, ssh_inputs: _AkeylessSshBackendKwargs, ) -> str: From 84f425d7d330aa9a95d631859a531b2fa9b73770 Mon Sep 17 00:00:00 2001 From: Kobbi Gal Date: Tue, 20 Jan 2026 16:14:58 -0500 Subject: [PATCH 4/9] fix precommit --- .flake8 | 2 +- src/awx_plugins/credentials/akeyless.py | 49 ++++++++++++++++++++++--- 2 files changed, 45 insertions(+), 6 deletions(-) diff --git a/.flake8 b/.flake8 index 4b0a4166fe..98f3febd4c 100644 --- a/.flake8 +++ b/.flake8 @@ -118,7 +118,7 @@ per-file-ignores = tests/**.py: DAR, DCO020, S101, S105, S108, S404, S603, WPS202, WPS210, WPS430, WPS436, WPS441, WPS442, WPS450 # The following ignores must be fixed and the entries removed from this config: - src/awx_plugins/credentials/akeyless.py: WPS202 + src/awx_plugins/credentials/akeyless.py: ANN101, WPS110, WPS202 src/awx_plugins/credentials/aim.py: ANN003, ANN201, B950, CCR001, D100, D103, LN001, Q003, WPS210, WPS221, WPS223, WPS231, WPS336, WPS432 src/awx_plugins/credentials/aws_secretsmanager.py: ANN003, ANN201, D100, D103, WPS111, WPS210, WPS329, WPS529 src/awx_plugins/credentials/azure_kv.py: ANN003, ANN201, D100, D103, WPS111, WPS361, WPS421 diff --git a/src/awx_plugins/credentials/akeyless.py b/src/awx_plugins/credentials/akeyless.py index 91ba996d8e..1cfbf0f589 100644 --- a/src/awx_plugins/credentials/akeyless.py +++ b/src/awx_plugins/credentials/akeyless.py @@ -5,7 +5,7 @@ import json from collections.abc import Mapping -from typing import Any, NotRequired, TypedDict, Unpack, cast +from typing import NotRequired, Protocol, TypedDict, Unpack, cast from awx_plugins.interfaces._temporary_private_django_api import ( # noqa: WPS436 gettext_noop as _, @@ -50,6 +50,41 @@ class _AkeylessSshBackendKwargs(_AkeylessCommonKwargs): ttl: NotRequired[int | str | None] +class _AuthResponse(Protocol): + token: str | None + + +class _StaticSecretInfo(Protocol): + format: str + + +class _ItemGeneralInfo(Protocol): + static_secret_info: _StaticSecretInfo + + +class _DescribeItemResponse(Protocol): + item_type: str + item_sub_type: str + item_general_info: _ItemGeneralInfo + + +class _SshCertResponse(Protocol): + data: str | None + + +class _AkeylessApi(Protocol): + def auth(self, auth: Auth) -> _AuthResponse: ... + + def describe_item(self, req: DescribeItem) -> _DescribeItemResponse: ... + + def get_secret_value(self, req: GetSecretValue) -> Mapping[str, str]: ... + + def get_ssh_certificate( + self, + req: GetSSHCertificate, + ) -> _SshCertResponse: ... + + common_plugin_inputs = [ { 'id': 'gateway_url', @@ -168,7 +203,7 @@ class _AkeylessSshBackendKwargs(_AkeylessCommonKwargs): } -def _setup_client(gateway_url: str, ca_cert_path: str | None) -> Any: +def _setup_client(gateway_url: str, ca_cert_path: str | None) -> _AkeylessApi: client_configuration = Configuration(host=gateway_url) if ca_cert_path: client_configuration.ssl_ca_cert = ca_cert_path @@ -179,7 +214,11 @@ def _setup_client(gateway_url: str, ca_cert_path: str | None) -> Any: return V2Api(api_client) -def _authenticate(api_instance: Any, access_id: str, access_key: str) -> str: +def _authenticate( + api_instance: _AkeylessApi, + access_id: str, + access_key: str, +) -> str: auth_response = api_instance.auth( Auth( access_id=access_id, @@ -268,7 +307,7 @@ def _ensure_supported_item_type(secret_path: str, item_type: str) -> None: def _fetch_secret_value( - api_instance: Any, + api_instance: _AkeylessApi, token: str, secret_path: str, secret_key: str | None, @@ -333,7 +372,7 @@ def _coerce_ttl(ttl_value: int | str | None) -> int | None: def _fetch_ssh_certificate( - api_instance: Any, + api_instance: _AkeylessApi, token: str, ssh_inputs: _AkeylessSshBackendKwargs, ) -> str: From be2e93e05889f7c870702ffb7b093532b6945be3 Mon Sep 17 00:00:00 2001 From: Kobbi Gal Date: Tue, 20 Jan 2026 16:21:38 -0500 Subject: [PATCH 5/9] fix build docs --- src/awx_plugins/credentials/akeyless.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/awx_plugins/credentials/akeyless.py b/src/awx_plugins/credentials/akeyless.py index 1cfbf0f589..0a54931e0a 100644 --- a/src/awx_plugins/credentials/akeyless.py +++ b/src/awx_plugins/credentials/akeyless.py @@ -73,15 +73,15 @@ class _SshCertResponse(Protocol): class _AkeylessApi(Protocol): - def auth(self, auth: Auth) -> _AuthResponse: ... + def auth(self, auth: object) -> _AuthResponse: ... - def describe_item(self, req: DescribeItem) -> _DescribeItemResponse: ... + def describe_item(self, req: object) -> _DescribeItemResponse: ... - def get_secret_value(self, req: GetSecretValue) -> Mapping[str, str]: ... + def get_secret_value(self, req: object) -> Mapping[str, str]: ... def get_ssh_certificate( self, - req: GetSSHCertificate, + req: object, ) -> _SshCertResponse: ... From 0858c70158ca18ae96ee688c1909b0494211f67f Mon Sep 17 00:00:00 2001 From: Kobbi Gal Date: Wed, 18 Mar 2026 10:26:44 -0400 Subject: [PATCH 6/9] pr review fixes --- .flake8 | 1 - .mypy.ini | 3 +- src/awx_plugins/credentials/akeyless.py | 119 +++++----- tests/akeyless_test.py | 296 ++++++++++++++++++++++++ 4 files changed, 363 insertions(+), 56 deletions(-) create mode 100644 tests/akeyless_test.py diff --git a/.flake8 b/.flake8 index e50b582c02..62a48b1697 100644 --- a/.flake8 +++ b/.flake8 @@ -118,7 +118,6 @@ per-file-ignores = tests/**.py: DAR, DCO020, S101, S105, S108, S404, S603, WPS202, WPS210, WPS430, WPS436, WPS441, WPS442, WPS450 # The following ignores must be fixed and the entries removed from this config: - src/awx_plugins/credentials/akeyless.py: ANN101, WPS110, WPS202 src/awx_plugins/credentials/aim.py: ANN003, ANN201, B950, CCR001, D100, D103, LN001, Q003, WPS210, WPS221, WPS223, WPS231, WPS336, WPS432 src/awx_plugins/credentials/aws_secretsmanager.py: ANN003, ANN201, D100, D103, WPS111, WPS210, WPS329, WPS529 src/awx_plugins/credentials/azure_kv.py: ANN003, ANN201, D100, D103, WPS111, WPS361, WPS421 diff --git a/.mypy.ini b/.mypy.ini index 226af05d9b..03c1daf45d 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -63,7 +63,8 @@ warn_unused_ignores = true disallow_any_expr = false [mypy-awx_plugins.credentials.akeyless] -# crashes with some decorators like `@functools.cache`: +# The akeyless SDK does not ship type stubs. These suppressions are needed +# until the SDK gains proper typing support: disallow_any_expr = false disallow_any_unimported = false disallow_any_explicit = false diff --git a/src/awx_plugins/credentials/akeyless.py b/src/awx_plugins/credentials/akeyless.py index 0a54931e0a..51f69b3299 100644 --- a/src/awx_plugins/credentials/akeyless.py +++ b/src/awx_plugins/credentials/akeyless.py @@ -1,16 +1,18 @@ """Akeyless credential plugins for AWX.""" # noqa: WPS202 -# pylint: disable=import-error, import-self, no-name-in-module -# FIXME: the following violations must be addressed gradually and unignored -# mypy: disable-error-code="import-not-found, import-untyped, no-untyped-def" +# WPS202 is expected: implementing two credential plugins requires Protocol +# types, TypedDicts, and multiple helper functions per plugin, which exceeds +# the default module-member threshold. -import json -from collections.abc import Mapping -from typing import NotRequired, Protocol, TypedDict, Unpack, cast +import collections.abc as _c +import json as _json +import typing as _t from awx_plugins.interfaces._temporary_private_django_api import ( # noqa: WPS436 gettext_noop as _, ) +# The akeyless SDK does not ship type stubs; pylint cannot resolve its imports. +# pylint: disable=import-error,no-name-in-module from akeyless import ( ApiClient, Auth, @@ -21,71 +23,76 @@ ) from akeyless.models.get_ssh_certificate import GetSSHCertificate from akeyless.rest import ApiException +# pylint: enable=import-error,no-name-in-module from .plugin import CertFiles, CredentialPlugin -SUPPORTED_ITEM_TYPES = frozenset(('STATIC_SECRET',)) +_SUPPORTED_ITEM_TYPES = frozenset(('STATIC_SECRET',)) -STRUCTURED_SECRET_FORMATS = frozenset(('json', 'key-value')) -PASSWORD_KEYS = frozenset(('username', 'password')) +_STRUCTURED_SECRET_FORMATS = frozenset(('json', 'key-value')) +_PASSWORD_KEYS = frozenset(('username', 'password')) -class _AkeylessCommonKwargs(TypedDict): + +class _AkeylessCommonKwargs(_t.TypedDict): gateway_url: str access_id: str access_key: str - ca_cert: NotRequired[str | None] + ca_cert: _t.NotRequired[str | None] class _AkeylessBackendKwargs(_AkeylessCommonKwargs): secret_path: str - secret_key: NotRequired[str | None] + secret_key: _t.NotRequired[str | None] class _AkeylessSshBackendKwargs(_AkeylessCommonKwargs): cert_issue_name: str cert_username: str public_key_data: str - ttl: NotRequired[int | str | None] + ttl: _t.NotRequired[int | str | None] -class _AuthResponse(Protocol): +class _AuthResponse(_t.Protocol): token: str | None -class _StaticSecretInfo(Protocol): +class _StaticSecretInfo(_t.Protocol): format: str -class _ItemGeneralInfo(Protocol): +class _ItemGeneralInfo(_t.Protocol): static_secret_info: _StaticSecretInfo -class _DescribeItemResponse(Protocol): +class _DescribeItemResponse(_t.Protocol): item_type: str item_sub_type: str item_general_info: _ItemGeneralInfo -class _SshCertResponse(Protocol): - data: str | None +class _SshCertResponse(_t.Protocol): + data: str | None # noqa: WPS110 # must match the akeyless SDK response attribute name -class _AkeylessApi(Protocol): - def auth(self, auth: object) -> _AuthResponse: ... +class _AkeylessApi(_t.Protocol): + def auth(self: _t.Self, auth: object) -> _AuthResponse: ... - def describe_item(self, req: object) -> _DescribeItemResponse: ... + def describe_item(self: _t.Self, req: object) -> _DescribeItemResponse: ... - def get_secret_value(self, req: object) -> Mapping[str, str]: ... + def get_secret_value( + self: _t.Self, + req: object, + ) -> _c.Mapping[str, str]: ... def get_ssh_certificate( - self, + self: _t.Self, req: object, ) -> _SshCertResponse: ... -common_plugin_inputs = [ +_common_plugin_inputs = [ { 'id': 'gateway_url', 'label': _('Gateway URL'), @@ -122,8 +129,8 @@ def get_ssh_certificate( ] -akeyless_inputs = { - 'fields': common_plugin_inputs, +_akeyless_inputs = { + 'fields': _common_plugin_inputs, 'metadata': [ { 'id': 'secret_path', @@ -153,8 +160,8 @@ def get_ssh_certificate( } -akeyless_ssh_inputs = { - 'fields': common_plugin_inputs, +_akeyless_ssh_inputs = { + 'fields': _common_plugin_inputs, 'metadata': [ { 'id': 'cert_issue_name', @@ -226,7 +233,7 @@ def _authenticate( ), ) if not auth_response.token: - raise RuntimeError( + raise ValueError( 'Failed to authenticate with Akeyless: no token received.', ) return auth_response.token @@ -235,11 +242,11 @@ def _authenticate( def _extract_password_secret(secret_data: str, secret_key: str | None) -> str: if not secret_key: return secret_data - if secret_key not in PASSWORD_KEYS: + if secret_key not in _PASSWORD_KEYS: raise NotImplementedError( 'Password secrets only support "username" or "password" keys.', ) - secret_dict = cast('dict[str, str]', json.loads(secret_data)) + secret_dict = _t.cast('dict[str, str]', _json.loads(secret_data)) return secret_dict[secret_key] @@ -264,7 +271,7 @@ def _extract_structured_secret( ) -> str: if not secret_key: return str(secret_data) - secret_dict = cast('dict[str, str]', json.loads(secret_data)) + secret_dict = _t.cast('dict[str, str]', _json.loads(secret_data)) try: return secret_dict[secret_key] except KeyError as exc: @@ -274,7 +281,7 @@ def _extract_structured_secret( def _extract_secret_value( - secret_response: Mapping[str, str], + secret_response: _c.Mapping[str, str], secret_path: str, secret_key: str | None, static_secret_format: str, @@ -287,7 +294,7 @@ def _extract_secret_value( secret_key, static_secret_sub_type, ) - if static_secret_format in STRUCTURED_SECRET_FORMATS: + if static_secret_format in _STRUCTURED_SECRET_FORMATS: return _extract_structured_secret( secret_data, secret_key, @@ -299,10 +306,10 @@ def _extract_secret_value( def _ensure_supported_item_type(secret_path: str, item_type: str) -> None: - if item_type not in SUPPORTED_ITEM_TYPES: + if item_type not in _SUPPORTED_ITEM_TYPES: raise NotImplementedError( f'Secret "{secret_path}" is of type "{item_type}". ' - f'Supported types: {sorted(SUPPORTED_ITEM_TYPES)}.', + f'Supported types: {sorted(_SUPPORTED_ITEM_TYPES)}.', ) @@ -337,19 +344,19 @@ def _fetch_secret_value( ) -def akeyless_backend(**kwargs: Unpack[_AkeylessBackendKwargs]) -> str: +def akeyless_backend(**kwargs: _t.Unpack[_AkeylessBackendKwargs]) -> str: """Retrieve a secret value from Akeyless.""" - with CertFiles(kwargs.get('ca_cert') or None) as ca_cert_path: + with CertFiles(kwargs.get('ca_cert')) as ca_cert_path: api_instance = _setup_client( kwargs['gateway_url'].rstrip('/'), ca_cert_path, ) - token = _authenticate( - api_instance, - kwargs['access_id'], - kwargs['access_key'], - ) try: + token = _authenticate( + api_instance, + kwargs['access_id'], + kwargs['access_key'], + ) return _fetch_secret_value( api_instance, token, @@ -360,6 +367,8 @@ def akeyless_backend(**kwargs: Unpack[_AkeylessBackendKwargs]) -> str: raise RuntimeError( f'Akeyless API error: {exc.reason} (Status: {exc.status})', ) from exc + except ValueError as exc: + raise RuntimeError(str(exc)) from exc def _coerce_ttl(ttl_value: int | str | None) -> int | None: @@ -386,25 +395,25 @@ def _fetch_ssh_certificate( ), ) if not response.data: - raise RuntimeError( + raise ValueError( 'Failed to generate signed SSH certificate: no data returned.', ) return response.data -def akeyless_ssh_backend(**kwargs: Unpack[_AkeylessSshBackendKwargs]) -> str: +def akeyless_ssh_backend(**kwargs: _t.Unpack[_AkeylessSshBackendKwargs]) -> str: """Generate a signed SSH certificate using Akeyless.""" - with CertFiles(kwargs.get('ca_cert') or None) as ca_cert_path: + with CertFiles(kwargs.get('ca_cert')) as ca_cert_path: api_instance = _setup_client( kwargs['gateway_url'].rstrip('/'), ca_cert_path, ) - token = _authenticate( - api_instance, - kwargs['access_id'], - kwargs['access_key'], - ) try: + token = _authenticate( + api_instance, + kwargs['access_id'], + kwargs['access_key'], + ) return _fetch_ssh_certificate( api_instance, token, @@ -414,17 +423,19 @@ def akeyless_ssh_backend(**kwargs: Unpack[_AkeylessSshBackendKwargs]) -> str: raise RuntimeError( f'Akeyless API error: {exc.reason} (Status: {exc.status})', ) from exc + except ValueError as exc: + raise RuntimeError(str(exc)) from exc akeyless_plugin = CredentialPlugin( 'Akeyless', - inputs=akeyless_inputs, + inputs=_akeyless_inputs, backend=akeyless_backend, ) akeyless_ssh_plugin = CredentialPlugin( 'Akeyless SSH', - inputs=akeyless_ssh_inputs, + inputs=_akeyless_ssh_inputs, backend=akeyless_ssh_backend, ) diff --git a/tests/akeyless_test.py b/tests/akeyless_test.py new file mode 100644 index 0000000000..273e3c1a1c --- /dev/null +++ b/tests/akeyless_test.py @@ -0,0 +1,296 @@ +"""Tests for Akeyless credential plugins.""" + +import pytest +from pytest_mock import MockerFixture + +from akeyless.rest import ApiException # pylint: disable=import-error + +from awx_plugins.credentials import akeyless as akeyless_mod + + +_SECRET_PATH = '/test/secret' +_ACCESS_ID = 'p-test123' +_ACCESS_KEY = 'test-key' +_GATEWAY_URL = 'https://api.akeyless.io' + + +def _make_mock_api( + mocker: MockerFixture, + *, + token: str | None = 'test-token', + item_type: str = 'STATIC_SECRET', + item_sub_type: str = 'generic', + secret_format: str = 'text', + secret_path: str = _SECRET_PATH, + secret_data: str = 'my-secret-value', + ssh_cert_data: str | None = 'ssh-rsa-SIGNED-CERT', +) -> object: + """Create a mock Akeyless API instance with configurable responses.""" + mock_static_info = mocker.MagicMock() + mock_static_info.format = secret_format + + mock_general_info = mocker.MagicMock() + mock_general_info.static_secret_info = mock_static_info + + mock_describe = mocker.MagicMock() + mock_describe.item_type = item_type + mock_describe.item_sub_type = item_sub_type + mock_describe.item_general_info = mock_general_info + + mock_api = mocker.MagicMock() + mock_api.auth.return_value.token = token + mock_api.describe_item.return_value = mock_describe + mock_api.get_secret_value.return_value = {secret_path: secret_data} + mock_api.get_ssh_certificate.return_value.data = ssh_cert_data + return mock_api + + +def _backend_kwargs( + *, + secret_path: str = _SECRET_PATH, + secret_key: str | None = None, + ca_cert: str | None = None, +) -> dict[str, object]: + """Build a minimal set of kwargs for akeyless_backend.""" + kwargs: dict[str, object] = { + 'gateway_url': _GATEWAY_URL, + 'access_id': _ACCESS_ID, + 'access_key': _ACCESS_KEY, + 'secret_path': secret_path, + } + if secret_key is not None: + kwargs['secret_key'] = secret_key + if ca_cert is not None: + kwargs['ca_cert'] = ca_cert + return kwargs + + +def _ssh_kwargs( + *, + ttl: int | str | None = None, + ca_cert: str | None = None, +) -> dict[str, object]: + """Build a minimal set of kwargs for akeyless_ssh_backend.""" + kwargs: dict[str, object] = { + 'gateway_url': _GATEWAY_URL, + 'access_id': _ACCESS_ID, + 'access_key': _ACCESS_KEY, + 'cert_issue_name': '/ssh/issuers/my-issuer', + 'cert_username': 'ubuntu', + 'public_key_data': 'ssh-rsa AAAAB3NzaC1yc2E...', + } + if ttl is not None: + kwargs['ttl'] = ttl + if ca_cert is not None: + kwargs['ca_cert'] = ca_cert + return kwargs + + +# --------------------------------------------------------------------------- +# akeyless_backend – secret store plugin +# --------------------------------------------------------------------------- + + +def test_akeyless_backend_text_generic_secret( + mocker: MockerFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Retrieve a plain-text generic secret successfully.""" + mock_api = _make_mock_api(mocker, item_sub_type='generic', secret_format='text') + monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + + result = akeyless_mod.akeyless_backend(**_backend_kwargs()) # type: ignore[arg-type] + + assert result == 'my-secret-value' + + +def test_akeyless_backend_json_secret_with_key( + mocker: MockerFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Retrieve a specific key from a JSON-format structured secret.""" + mock_api = _make_mock_api( + mocker, + secret_format='json', + secret_data='{"db_password": "s3cr3t", "db_user": "admin"}', + ) + monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + + result = akeyless_mod.akeyless_backend( # type: ignore[arg-type] + **_backend_kwargs(secret_key='db_password'), + ) + + assert result == 's3cr3t' + + +def test_akeyless_backend_password_secret_username_key( + mocker: MockerFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Retrieve the 'username' field from a text/password sub-type secret.""" + payload = '{"username": "myuser", "password": "mypass"}' + mock_api = _make_mock_api( + mocker, + item_sub_type='password', + secret_format='text', + secret_data=payload, + ) + monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + + result = akeyless_mod.akeyless_backend( # type: ignore[arg-type] + **_backend_kwargs(secret_key='username'), + ) + + assert result == 'myuser' + + +def test_akeyless_backend_unsupported_item_type_raises( + mocker: MockerFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Unsupported secret types (e.g. dynamic) should raise NotImplementedError.""" + mock_api = _make_mock_api(mocker, item_type='DYNAMIC_SECRET') + monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + + with pytest.raises(NotImplementedError, match='DYNAMIC_SECRET'): + akeyless_mod.akeyless_backend(**_backend_kwargs()) # type: ignore[arg-type] + + +def test_akeyless_backend_api_exception_wraps_to_runtime_error( + mocker: MockerFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """ApiException from the SDK should be re-raised as RuntimeError.""" + mock_api = _make_mock_api(mocker) + mock_api.get_secret_value.side_effect = ApiException( + status=403, + reason='Forbidden', + ) + monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + + with pytest.raises(RuntimeError, match=r'Akeyless API error: Forbidden \(Status: 403\)'): + akeyless_mod.akeyless_backend(**_backend_kwargs()) # type: ignore[arg-type] + + +def test_akeyless_backend_auth_failure_raises_runtime_error( + mocker: MockerFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A missing auth token should surface as RuntimeError to the plugin caller.""" + mock_api = _make_mock_api(mocker, token=None) + monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + + with pytest.raises(RuntimeError, match='no token received'): + akeyless_mod.akeyless_backend(**_backend_kwargs()) # type: ignore[arg-type] + + +def test_akeyless_backend_missing_json_key_raises( + mocker: MockerFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A missing key in a structured secret should raise KeyError with the path.""" + mock_api = _make_mock_api( + mocker, + secret_format='json', + secret_data='{"other_key": "value"}', + ) + monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + + with pytest.raises(KeyError, match=_SECRET_PATH): + akeyless_mod.akeyless_backend( # type: ignore[arg-type] + **_backend_kwargs(secret_key='missing_key'), + ) + + +# --------------------------------------------------------------------------- +# akeyless_ssh_backend – SSH certificate plugin +# --------------------------------------------------------------------------- + + +def test_akeyless_ssh_backend_success( + mocker: MockerFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Successfully generate a signed SSH certificate.""" + mock_api = _make_mock_api(mocker, ssh_cert_data='ssh-rsa-cert-AAAAAA') + monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + + result = akeyless_mod.akeyless_ssh_backend(**_ssh_kwargs()) # type: ignore[arg-type] + + assert result == 'ssh-rsa-cert-AAAAAA' + + +def test_akeyless_ssh_backend_with_ttl( + mocker: MockerFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """SSH certificate request with an explicit TTL should succeed.""" + mock_api = _make_mock_api(mocker, ssh_cert_data='signed-cert') + monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + + result = akeyless_mod.akeyless_ssh_backend( # type: ignore[arg-type] + **_ssh_kwargs(ttl=3600), + ) + + assert result == 'signed-cert' + _, call_kwargs = mock_api.get_ssh_certificate.call_args + assert call_kwargs['body'].ttl == 3600 + + +def test_akeyless_ssh_backend_no_cert_data_raises( + mocker: MockerFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """An empty/None cert response should surface as RuntimeError.""" + mock_api = _make_mock_api(mocker, ssh_cert_data=None) + monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + + with pytest.raises(RuntimeError, match='no data returned'): + akeyless_mod.akeyless_ssh_backend(**_ssh_kwargs()) # type: ignore[arg-type] + + +def test_akeyless_ssh_backend_api_exception_wraps_to_runtime_error( + mocker: MockerFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """ApiException from the SSH cert call should be re-raised as RuntimeError.""" + mock_api = _make_mock_api(mocker) + mock_api.get_ssh_certificate.side_effect = ApiException( + status=404, + reason='Issuer not found', + ) + monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + + with pytest.raises( + RuntimeError, + match=r'Akeyless API error: Issuer not found \(Status: 404\)', + ): + akeyless_mod.akeyless_ssh_backend(**_ssh_kwargs()) # type: ignore[arg-type] + + +# --------------------------------------------------------------------------- +# _coerce_ttl helper +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + ('ttl_input', 'expected'), + ( + pytest.param(None, None, id='none'), + pytest.param('', None, id='empty-string'), + pytest.param(3600, 3600, id='int'), + pytest.param('7200', 7200, id='string-int'), + ), +) +def test_coerce_ttl_valid_inputs( + ttl_input: int | str | None, + expected: int | None, +) -> None: + """_coerce_ttl should convert numeric-like values and pass None/'' through.""" + assert akeyless_mod._coerce_ttl(ttl_input) == expected # noqa: WPS437 + + +def test_coerce_ttl_invalid_raises() -> None: + """_coerce_ttl should raise ValueError for non-numeric strings.""" + with pytest.raises(ValueError, match='integer number of seconds'): + akeyless_mod._coerce_ttl('not-a-number') # noqa: WPS437 From 2ee90140faad7a7017f042be2e79623ee4317c0a Mon Sep 17 00:00:00 2001 From: Kobbi Gal Date: Wed, 18 Mar 2026 13:23:40 -0400 Subject: [PATCH 7/9] fix precommit ci --- .flake8 | 5 + .mypy.ini | 15 +++ .pre-commit-config.yaml | 1 + .pylintrc.toml | 6 +- .ruff.toml | 3 + pytest.ini | 4 + src/awx_plugins/credentials/akeyless.py | 30 +++-- tests/akeyless_test.py | 170 ++++++++++++------------ 8 files changed, 140 insertions(+), 94 deletions(-) diff --git a/.flake8 b/.flake8 index 62a48b1697..19f3b4fc07 100644 --- a/.flake8 +++ b/.flake8 @@ -117,6 +117,11 @@ per-file-ignores = # additionally test docstrings don't need param lists (DAR, DCO020): tests/**.py: DAR, DCO020, S101, S105, S108, S404, S603, WPS202, WPS210, WPS430, WPS436, WPS441, WPS442, WPS450 + # The following ignores have been researched and should be considered permanent: + # WPS202: two-plugin module (secret store + SSH) with Protocol types, TypedDicts, + # and multiple helpers per plugin; restructuring would harm readability. + src/awx_plugins/credentials/akeyless.py: WPS202 + # The following ignores must be fixed and the entries removed from this config: src/awx_plugins/credentials/aim.py: ANN003, ANN201, B950, CCR001, D100, D103, LN001, Q003, WPS210, WPS221, WPS223, WPS231, WPS336, WPS432 src/awx_plugins/credentials/aws_secretsmanager.py: ANN003, ANN201, D100, D103, WPS111, WPS210, WPS329, WPS529 diff --git a/.mypy.ini b/.mypy.ini index 03c1daf45d..b29d37daee 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -115,8 +115,23 @@ disallow_any_expr = false # crashes with some decorators like `@tox.plugin.impl`: disallow_any_expr = false +[mypy-akeyless] +# The akeyless SDK does not ship type stubs: +ignore_missing_imports = true + +[mypy-akeyless.*] +# The akeyless SDK does not ship type stubs: +ignore_missing_imports = true + [mypy-tests.*] # crashes with some decorators like `@pytest.mark.parametrize`: disallow_any_expr = false # fails on `@hypothesis.given()`: disallow_any_decorated = false +# fixture return types like `Callable[..., object]` use `...` as Any: +disallow_any_explicit = false + +[mypy-tests.akeyless_test] +# Mock API objects are typed as `object`; test kwargs use generic dict +# rather than the specific TypedDicts the plugin functions expect: +disable_error_code = attr-defined, arg-type diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 422835b9d3..9b2f670284 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -319,6 +319,7 @@ repos: - pytest-mock # needed by pylint-pytest since it picks up pytest's args - pytest-subtests # needed by pylint-pytest since it picks up pytest's args - pytest-xdist # needed by pylint-pytest since it picks up pytest's args + - akeyless >= 5.0.8 # needed by credentials.akeyless and its tests - python-dsv-sdk # needed by credentials.dsv, credentials.thycotic_dsv - PyYAML # needed by credentials.injectors, inventory.plugins - Sphinx # needed by the Sphinx extension stub diff --git a/.pylintrc.toml b/.pylintrc.toml index 3ffdc0e837..d95ae6d132 100644 --- a/.pylintrc.toml +++ b/.pylintrc.toml @@ -106,7 +106,11 @@ py-version = "3.11" # source root is an absolute path or a path relative to the current working # directory used to determine a package namespace for modules located under the # source root. -# source-roots = +# Setting src/ prevents pylint from adding individual credential module +# directories to sys.path, which would cause false "module imports itself" +# warnings and circular-import errors for files with the same name as their +# third-party dependencies (e.g. akeyless.py vs the akeyless SDK): +source-roots = ["src"] # Allow loading of arbitrary C extensions. Extensions are imported into the # active Python interpreter and may run arbitrary code. diff --git a/.ruff.toml b/.ruff.toml index a8ecdbe6c6..52dd17cb11 100644 --- a/.ruff.toml +++ b/.ruff.toml @@ -173,6 +173,9 @@ testing = [ "S101", # Allow use of `assert` in test files "S105", # hardcoded-password-string "S106", # hardcoded-password-func-arg + "S107", # hardcoded-password-func-default: test helper factories use + # credential-like param names (token, secret_data) as configurable + # defaults — these are never real credentials "S108", # tmp dirs "S404", # Allow importing 'subprocess' module to testing call external tools needed by these hooks "S603", # subprocess calls diff --git a/pytest.ini b/pytest.ini index ed3eceeead..4bd799b508 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,8 @@ [pytest] +# Add src/ to sys.path so tests can import the package even in environments +# where `pip install -e .` has not been run (e.g. the pre-commit pylint venv): +pythonpath = src + addopts = # `pytest-xdist`: --numprocesses=auto diff --git a/src/awx_plugins/credentials/akeyless.py b/src/awx_plugins/credentials/akeyless.py index 51f69b3299..da65fa61a5 100644 --- a/src/awx_plugins/credentials/akeyless.py +++ b/src/awx_plugins/credentials/akeyless.py @@ -1,18 +1,20 @@ -"""Akeyless credential plugins for AWX.""" # noqa: WPS202 +"""Akeyless credential plugins for AWX.""" # WPS202 is expected: implementing two credential plugins requires Protocol # types, TypedDicts, and multiple helper functions per plugin, which exceeds # the default module-member threshold. -import collections.abc as _c import json as _json import typing as _t +from collections.abc import Mapping as _Mapping from awx_plugins.interfaces._temporary_private_django_api import ( # noqa: WPS436 gettext_noop as _, ) # The akeyless SDK does not ship type stubs; pylint cannot resolve its imports. -# pylint: disable=import-error,no-name-in-module +# When not installed, pylint misidentifies `from akeyless import` as a +# self-import because this file is named akeyless.py (import-self). +# pylint: disable=import-error,import-self,no-name-in-module from akeyless import ( ApiClient, Auth, @@ -23,8 +25,8 @@ ) from akeyless.models.get_ssh_certificate import GetSSHCertificate from akeyless.rest import ApiException -# pylint: enable=import-error,no-name-in-module +# pylint: enable=import-error,import-self,no-name-in-module from .plugin import CertFiles, CredentialPlugin @@ -84,7 +86,7 @@ def describe_item(self: _t.Self, req: object) -> _DescribeItemResponse: ... def get_secret_value( self: _t.Self, req: object, - ) -> _c.Mapping[str, str]: ... + ) -> _Mapping[str, str]: ... def get_ssh_certificate( self: _t.Self, @@ -281,7 +283,7 @@ def _extract_structured_secret( def _extract_secret_value( - secret_response: _c.Mapping[str, str], + secret_response: _Mapping[str, str], secret_path: str, secret_key: str | None, static_secret_format: str, @@ -357,6 +359,9 @@ def akeyless_backend(**kwargs: _t.Unpack[_AkeylessBackendKwargs]) -> str: kwargs['access_id'], kwargs['access_key'], ) + except ValueError as exc: + raise RuntimeError(str(exc)) from exc + try: return _fetch_secret_value( api_instance, token, @@ -367,8 +372,6 @@ def akeyless_backend(**kwargs: _t.Unpack[_AkeylessBackendKwargs]) -> str: raise RuntimeError( f'Akeyless API error: {exc.reason} (Status: {exc.status})', ) from exc - except ValueError as exc: - raise RuntimeError(str(exc)) from exc def _coerce_ttl(ttl_value: int | str | None) -> int | None: @@ -401,7 +404,9 @@ def _fetch_ssh_certificate( return response.data -def akeyless_ssh_backend(**kwargs: _t.Unpack[_AkeylessSshBackendKwargs]) -> str: +def akeyless_ssh_backend( + **kwargs: _t.Unpack[_AkeylessSshBackendKwargs], +) -> str: """Generate a signed SSH certificate using Akeyless.""" with CertFiles(kwargs.get('ca_cert')) as ca_cert_path: api_instance = _setup_client( @@ -414,6 +419,9 @@ def akeyless_ssh_backend(**kwargs: _t.Unpack[_AkeylessSshBackendKwargs]) -> str: kwargs['access_id'], kwargs['access_key'], ) + except ValueError as exc: + raise RuntimeError(str(exc)) from exc + try: return _fetch_ssh_certificate( api_instance, token, @@ -429,13 +437,13 @@ def akeyless_ssh_backend(**kwargs: _t.Unpack[_AkeylessSshBackendKwargs]) -> str: akeyless_plugin = CredentialPlugin( 'Akeyless', - inputs=_akeyless_inputs, + inputs=_akeyless_inputs, # type: ignore[arg-type] backend=akeyless_backend, ) akeyless_ssh_plugin = CredentialPlugin( 'Akeyless SSH', - inputs=_akeyless_ssh_inputs, + inputs=_akeyless_ssh_inputs, # type: ignore[arg-type] backend=akeyless_ssh_backend, ) diff --git a/tests/akeyless_test.py b/tests/akeyless_test.py index 273e3c1a1c..dfcaee25a3 100644 --- a/tests/akeyless_test.py +++ b/tests/akeyless_test.py @@ -1,5 +1,7 @@ """Tests for Akeyless credential plugins.""" +from collections.abc import Callable + import pytest from pytest_mock import MockerFixture @@ -13,8 +15,15 @@ _ACCESS_KEY = 'test-key' _GATEWAY_URL = 'https://api.akeyless.io' +_HTTP_FORBIDDEN = 403 +_HTTP_NOT_FOUND = 404 +_ONE_HOUR_SEC = 3600 +_TWO_HOURS_SEC = 7200 + +_MockApiFactory = Callable[..., object] + -def _make_mock_api( +def _make_mock_api( # noqa: WPS211 # pylint: disable=too-many-arguments mocker: MockerFixture, *, token: str | None = 'test-token', @@ -45,6 +54,21 @@ def _make_mock_api( return mock_api +@pytest.fixture +def patch_setup_client( + monkeypatch: pytest.MonkeyPatch, + mocker: MockerFixture, +) -> _MockApiFactory: + """Patch _setup_client, returning a factory that yields the configured mock.""" + + def _factory(**kwargs: object) -> object: + mock = _make_mock_api(mocker, **kwargs) + monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock) + return mock + + return _factory + + def _backend_kwargs( *, secret_path: str = _SECRET_PATH, @@ -87,185 +111,165 @@ def _ssh_kwargs( # --------------------------------------------------------------------------- -# akeyless_backend – secret store plugin +# akeyless_backend - secret store plugin # --------------------------------------------------------------------------- def test_akeyless_backend_text_generic_secret( - mocker: MockerFixture, - monkeypatch: pytest.MonkeyPatch, + patch_setup_client: _MockApiFactory, ) -> None: """Retrieve a plain-text generic secret successfully.""" - mock_api = _make_mock_api(mocker, item_sub_type='generic', secret_format='text') - monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + patch_setup_client(item_sub_type='generic', secret_format='text') - result = akeyless_mod.akeyless_backend(**_backend_kwargs()) # type: ignore[arg-type] + fetched_secret = akeyless_mod.akeyless_backend( + **_backend_kwargs(), + ) - assert result == 'my-secret-value' + assert fetched_secret == 'my-secret-value' def test_akeyless_backend_json_secret_with_key( - mocker: MockerFixture, - monkeypatch: pytest.MonkeyPatch, + patch_setup_client: _MockApiFactory, ) -> None: """Retrieve a specific key from a JSON-format structured secret.""" - mock_api = _make_mock_api( - mocker, + patch_setup_client( secret_format='json', secret_data='{"db_password": "s3cr3t", "db_user": "admin"}', ) - monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) - result = akeyless_mod.akeyless_backend( # type: ignore[arg-type] + fetched_secret = akeyless_mod.akeyless_backend( **_backend_kwargs(secret_key='db_password'), ) - assert result == 's3cr3t' + assert fetched_secret == 's3cr3t' -def test_akeyless_backend_password_secret_username_key( - mocker: MockerFixture, - monkeypatch: pytest.MonkeyPatch, +def test_backend_password_secret_username_key( + patch_setup_client: _MockApiFactory, ) -> None: """Retrieve the 'username' field from a text/password sub-type secret.""" payload = '{"username": "myuser", "password": "mypass"}' - mock_api = _make_mock_api( - mocker, + patch_setup_client( item_sub_type='password', secret_format='text', secret_data=payload, ) - monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) - result = akeyless_mod.akeyless_backend( # type: ignore[arg-type] + fetched_secret = akeyless_mod.akeyless_backend( **_backend_kwargs(secret_key='username'), ) - assert result == 'myuser' + assert fetched_secret == 'myuser' -def test_akeyless_backend_unsupported_item_type_raises( - mocker: MockerFixture, - monkeypatch: pytest.MonkeyPatch, +def test_backend_unsupported_type_raises( + patch_setup_client: _MockApiFactory, ) -> None: """Unsupported secret types (e.g. dynamic) should raise NotImplementedError.""" - mock_api = _make_mock_api(mocker, item_type='DYNAMIC_SECRET') - monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + patch_setup_client(item_type='DYNAMIC_SECRET') with pytest.raises(NotImplementedError, match='DYNAMIC_SECRET'): - akeyless_mod.akeyless_backend(**_backend_kwargs()) # type: ignore[arg-type] + akeyless_mod.akeyless_backend(**_backend_kwargs()) -def test_akeyless_backend_api_exception_wraps_to_runtime_error( - mocker: MockerFixture, - monkeypatch: pytest.MonkeyPatch, +def test_backend_api_exc_wraps_runtime( + patch_setup_client: _MockApiFactory, ) -> None: - """ApiException from the SDK should be re-raised as RuntimeError.""" - mock_api = _make_mock_api(mocker) + """An ApiException from get_secret_value surfaces as RuntimeError.""" + mock_api = patch_setup_client() mock_api.get_secret_value.side_effect = ApiException( - status=403, + status=_HTTP_FORBIDDEN, reason='Forbidden', ) - monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) - with pytest.raises(RuntimeError, match=r'Akeyless API error: Forbidden \(Status: 403\)'): - akeyless_mod.akeyless_backend(**_backend_kwargs()) # type: ignore[arg-type] + with pytest.raises( + RuntimeError, + match=r'Akeyless API error: Forbidden \(Status: 403\)', + ): + akeyless_mod.akeyless_backend(**_backend_kwargs()) -def test_akeyless_backend_auth_failure_raises_runtime_error( - mocker: MockerFixture, - monkeypatch: pytest.MonkeyPatch, +def test_backend_auth_failure_raises( + patch_setup_client: _MockApiFactory, ) -> None: """A missing auth token should surface as RuntimeError to the plugin caller.""" - mock_api = _make_mock_api(mocker, token=None) - monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + patch_setup_client(token=None) with pytest.raises(RuntimeError, match='no token received'): - akeyless_mod.akeyless_backend(**_backend_kwargs()) # type: ignore[arg-type] + akeyless_mod.akeyless_backend(**_backend_kwargs()) -def test_akeyless_backend_missing_json_key_raises( - mocker: MockerFixture, - monkeypatch: pytest.MonkeyPatch, +def test_backend_missing_json_key_raises( + patch_setup_client: _MockApiFactory, ) -> None: """A missing key in a structured secret should raise KeyError with the path.""" - mock_api = _make_mock_api( - mocker, + patch_setup_client( secret_format='json', secret_data='{"other_key": "value"}', ) - monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) with pytest.raises(KeyError, match=_SECRET_PATH): - akeyless_mod.akeyless_backend( # type: ignore[arg-type] + akeyless_mod.akeyless_backend( **_backend_kwargs(secret_key='missing_key'), ) # --------------------------------------------------------------------------- -# akeyless_ssh_backend – SSH certificate plugin +# akeyless_ssh_backend - SSH certificate plugin # --------------------------------------------------------------------------- def test_akeyless_ssh_backend_success( - mocker: MockerFixture, - monkeypatch: pytest.MonkeyPatch, + patch_setup_client: _MockApiFactory, ) -> None: """Successfully generate a signed SSH certificate.""" - mock_api = _make_mock_api(mocker, ssh_cert_data='ssh-rsa-cert-AAAAAA') - monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + patch_setup_client(ssh_cert_data='ssh-rsa-cert-AAAAAA') - result = akeyless_mod.akeyless_ssh_backend(**_ssh_kwargs()) # type: ignore[arg-type] + signed_cert = akeyless_mod.akeyless_ssh_backend(**_ssh_kwargs()) - assert result == 'ssh-rsa-cert-AAAAAA' + assert signed_cert == 'ssh-rsa-cert-AAAAAA' def test_akeyless_ssh_backend_with_ttl( - mocker: MockerFixture, - monkeypatch: pytest.MonkeyPatch, + patch_setup_client: _MockApiFactory, ) -> None: """SSH certificate request with an explicit TTL should succeed.""" - mock_api = _make_mock_api(mocker, ssh_cert_data='signed-cert') - monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + mock_api = patch_setup_client(ssh_cert_data='signed-cert') - result = akeyless_mod.akeyless_ssh_backend( # type: ignore[arg-type] - **_ssh_kwargs(ttl=3600), + signed_cert = akeyless_mod.akeyless_ssh_backend( + **_ssh_kwargs(ttl=_ONE_HOUR_SEC), ) - assert result == 'signed-cert' + assert signed_cert == 'signed-cert' _, call_kwargs = mock_api.get_ssh_certificate.call_args - assert call_kwargs['body'].ttl == 3600 + assert call_kwargs['body'].ttl == _ONE_HOUR_SEC -def test_akeyless_ssh_backend_no_cert_data_raises( - mocker: MockerFixture, - monkeypatch: pytest.MonkeyPatch, +def test_ssh_backend_no_cert_data_raises( + patch_setup_client: _MockApiFactory, ) -> None: """An empty/None cert response should surface as RuntimeError.""" - mock_api = _make_mock_api(mocker, ssh_cert_data=None) - monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) + patch_setup_client(ssh_cert_data=None) with pytest.raises(RuntimeError, match='no data returned'): - akeyless_mod.akeyless_ssh_backend(**_ssh_kwargs()) # type: ignore[arg-type] + akeyless_mod.akeyless_ssh_backend(**_ssh_kwargs()) -def test_akeyless_ssh_backend_api_exception_wraps_to_runtime_error( - mocker: MockerFixture, - monkeypatch: pytest.MonkeyPatch, +def test_ssh_api_exc_wraps_runtime( + patch_setup_client: _MockApiFactory, ) -> None: - """ApiException from the SSH cert call should be re-raised as RuntimeError.""" - mock_api = _make_mock_api(mocker) + """An ApiException from get_ssh_certificate surfaces as RuntimeError.""" + mock_api = patch_setup_client() mock_api.get_ssh_certificate.side_effect = ApiException( - status=404, + status=_HTTP_NOT_FOUND, reason='Issuer not found', ) - monkeypatch.setattr(akeyless_mod, '_setup_client', lambda *_: mock_api) with pytest.raises( RuntimeError, match=r'Akeyless API error: Issuer not found \(Status: 404\)', ): - akeyless_mod.akeyless_ssh_backend(**_ssh_kwargs()) # type: ignore[arg-type] + akeyless_mod.akeyless_ssh_backend(**_ssh_kwargs()) # --------------------------------------------------------------------------- @@ -278,8 +282,8 @@ def test_akeyless_ssh_backend_api_exception_wraps_to_runtime_error( ( pytest.param(None, None, id='none'), pytest.param('', None, id='empty-string'), - pytest.param(3600, 3600, id='int'), - pytest.param('7200', 7200, id='string-int'), + pytest.param(_ONE_HOUR_SEC, _ONE_HOUR_SEC, id='int'), + pytest.param('7200', _TWO_HOURS_SEC, id='string-int'), ), ) def test_coerce_ttl_valid_inputs( @@ -287,10 +291,12 @@ def test_coerce_ttl_valid_inputs( expected: int | None, ) -> None: """_coerce_ttl should convert numeric-like values and pass None/'' through.""" + # pylint: disable-next=protected-access assert akeyless_mod._coerce_ttl(ttl_input) == expected # noqa: WPS437 def test_coerce_ttl_invalid_raises() -> None: """_coerce_ttl should raise ValueError for non-numeric strings.""" with pytest.raises(ValueError, match='integer number of seconds'): + # pylint: disable-next=protected-access akeyless_mod._coerce_ttl('not-a-number') # noqa: WPS437 From 317988447045ffa0530b7de333440c03379453aa Mon Sep 17 00:00:00 2001 From: Kobbi Gal Date: Wed, 18 Mar 2026 15:16:04 -0400 Subject: [PATCH 8/9] pr review fixes --- docs/spelling_wordlist.txt | 1 + src/awx_plugins/credentials/akeyless.py | 87 +++++++++++++++---------- tests/akeyless_test.py | 6 +- 3 files changed, 56 insertions(+), 38 deletions(-) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index a0d15b1f16..7de124f9c4 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1,3 +1,4 @@ +Akeyless Ansible Approle async diff --git a/src/awx_plugins/credentials/akeyless.py b/src/awx_plugins/credentials/akeyless.py index da65fa61a5..8fc06e9a5a 100644 --- a/src/awx_plugins/credentials/akeyless.py +++ b/src/awx_plugins/credentials/akeyless.py @@ -16,18 +16,28 @@ # self-import because this file is named akeyless.py (import-self). # pylint: disable=import-error,import-self,no-name-in-module from akeyless import ( - ApiClient, - Auth, - Configuration, - DescribeItem, - GetSecretValue, - V2Api, + ApiClient as _ApiClient, + Auth as _Auth, + Configuration as _Configuration, + DescribeItem as _DescribeItem, + GetSecretValue as _GetSecretValue, + V2Api as _V2Api, ) -from akeyless.models.get_ssh_certificate import GetSSHCertificate -from akeyless.rest import ApiException +from akeyless.models.get_ssh_certificate import ( + GetSSHCertificate as _GetSSHCertificate, +) +from akeyless.rest import ApiException as _ApiException # pylint: enable=import-error,import-self,no-name-in-module -from .plugin import CertFiles, CredentialPlugin +from . import plugin as _plugin + + +__all__ = ( # noqa: WPS410 + 'akeyless_backend', + 'akeyless_plugin', + 'akeyless_ssh_backend', + 'akeyless_ssh_plugin', +) _SUPPORTED_ITEM_TYPES = frozenset(('STATIC_SECRET',)) @@ -79,18 +89,21 @@ class _SshCertResponse(_t.Protocol): class _AkeylessApi(_t.Protocol): - def auth(self: _t.Self, auth: object) -> _AuthResponse: ... + def auth(self: _t.Self, auth: _Auth) -> _AuthResponse: ... - def describe_item(self: _t.Self, req: object) -> _DescribeItemResponse: ... + def describe_item( + self: _t.Self, + req: _DescribeItem, + ) -> _DescribeItemResponse: ... def get_secret_value( self: _t.Self, - req: object, + req: _GetSecretValue, ) -> _Mapping[str, str]: ... def get_ssh_certificate( self: _t.Self, - req: object, + req: _GetSSHCertificate, ) -> _SshCertResponse: ... @@ -213,14 +226,14 @@ def get_ssh_certificate( def _setup_client(gateway_url: str, ca_cert_path: str | None) -> _AkeylessApi: - client_configuration = Configuration(host=gateway_url) + client_configuration = _Configuration(host=gateway_url) if ca_cert_path: client_configuration.ssl_ca_cert = ca_cert_path client_configuration.verify_ssl = True - api_client = ApiClient(client_configuration) + api_client = _ApiClient(client_configuration) api_client.user_agent = 'AWX' api_client.default_headers['akeylessclienttype'] = 'AWX' - return V2Api(api_client) + return _V2Api(api_client) def _authenticate( @@ -229,7 +242,7 @@ def _authenticate( access_key: str, ) -> str: auth_response = api_instance.auth( - Auth( + _Auth( access_id=access_id, access_key=access_key, ), @@ -321,7 +334,7 @@ def _fetch_secret_value( secret_path: str, secret_key: str | None, ) -> str: - describe_item_request = DescribeItem(name=secret_path, token=token) + describe_item_request = _DescribeItem(name=secret_path, token=token) describe_item_response = api_instance.describe_item(describe_item_request) _ensure_supported_item_type(secret_path, describe_item_response.item_type) @@ -331,7 +344,7 @@ def _fetch_secret_value( static_secret_sub_type = describe_item_response.item_sub_type secret_response = api_instance.get_secret_value( - GetSecretValue( + _GetSecretValue( names=[secret_path], token=token, ), @@ -348,7 +361,7 @@ def _fetch_secret_value( def akeyless_backend(**kwargs: _t.Unpack[_AkeylessBackendKwargs]) -> str: """Retrieve a secret value from Akeyless.""" - with CertFiles(kwargs.get('ca_cert')) as ca_cert_path: + with _plugin.CertFiles(kwargs.get('ca_cert')) as ca_cert_path: api_instance = _setup_client( kwargs['gateway_url'].rstrip('/'), ca_cert_path, @@ -359,8 +372,8 @@ def akeyless_backend(**kwargs: _t.Unpack[_AkeylessBackendKwargs]) -> str: kwargs['access_id'], kwargs['access_key'], ) - except ValueError as exc: - raise RuntimeError(str(exc)) from exc + except ValueError as val_err: + raise RuntimeError(str(val_err)) from val_err try: return _fetch_secret_value( api_instance, @@ -368,10 +381,11 @@ def akeyless_backend(**kwargs: _t.Unpack[_AkeylessBackendKwargs]) -> str: kwargs['secret_path'], kwargs.get('secret_key'), ) - except ApiException as exc: + except _ApiException as api_exc: raise RuntimeError( - f'Akeyless API error: {exc.reason} (Status: {exc.status})', - ) from exc + f'Akeyless API error: {api_exc.reason}' + f' (Status: {api_exc.status})', + ) from api_exc def _coerce_ttl(ttl_value: int | str | None) -> int | None: @@ -389,7 +403,7 @@ def _fetch_ssh_certificate( ssh_inputs: _AkeylessSshBackendKwargs, ) -> str: response = api_instance.get_ssh_certificate( - GetSSHCertificate( + _GetSSHCertificate( token=token, cert_issuer_name=ssh_inputs['cert_issue_name'], cert_username=ssh_inputs['cert_username'], @@ -408,7 +422,7 @@ def akeyless_ssh_backend( **kwargs: _t.Unpack[_AkeylessSshBackendKwargs], ) -> str: """Generate a signed SSH certificate using Akeyless.""" - with CertFiles(kwargs.get('ca_cert')) as ca_cert_path: + with _plugin.CertFiles(kwargs.get('ca_cert')) as ca_cert_path: api_instance = _setup_client( kwargs['gateway_url'].rstrip('/'), ca_cert_path, @@ -419,30 +433,31 @@ def akeyless_ssh_backend( kwargs['access_id'], kwargs['access_key'], ) - except ValueError as exc: - raise RuntimeError(str(exc)) from exc + except ValueError as val_err: + raise RuntimeError(str(val_err)) from val_err try: return _fetch_ssh_certificate( api_instance, token, kwargs, ) - except ApiException as exc: + except _ApiException as api_exc: raise RuntimeError( - f'Akeyless API error: {exc.reason} (Status: {exc.status})', - ) from exc - except ValueError as exc: - raise RuntimeError(str(exc)) from exc + f'Akeyless API error: {api_exc.reason}' + f' (Status: {api_exc.status})', + ) from api_exc + except ValueError as val_err: + raise RuntimeError(str(val_err)) from val_err -akeyless_plugin = CredentialPlugin( +akeyless_plugin = _plugin.CredentialPlugin( 'Akeyless', inputs=_akeyless_inputs, # type: ignore[arg-type] backend=akeyless_backend, ) -akeyless_ssh_plugin = CredentialPlugin( +akeyless_ssh_plugin = _plugin.CredentialPlugin( 'Akeyless SSH', inputs=_akeyless_ssh_inputs, # type: ignore[arg-type] backend=akeyless_ssh_backend, diff --git a/tests/akeyless_test.py b/tests/akeyless_test.py index dfcaee25a3..3b32701c40 100644 --- a/tests/akeyless_test.py +++ b/tests/akeyless_test.py @@ -241,8 +241,8 @@ def test_akeyless_ssh_backend_with_ttl( ) assert signed_cert == 'signed-cert' - _, call_kwargs = mock_api.get_ssh_certificate.call_args - assert call_kwargs['body'].ttl == _ONE_HOUR_SEC + ssh_request = mock_api.get_ssh_certificate.call_args.args[0] + assert ssh_request.ttl == _ONE_HOUR_SEC def test_ssh_backend_no_cert_data_raises( @@ -291,6 +291,7 @@ def test_coerce_ttl_valid_inputs( expected: int | None, ) -> None: """_coerce_ttl should convert numeric-like values and pass None/'' through.""" + # WPS437: _coerce_ttl is private but is tested directly here intentionally # pylint: disable-next=protected-access assert akeyless_mod._coerce_ttl(ttl_input) == expected # noqa: WPS437 @@ -298,5 +299,6 @@ def test_coerce_ttl_valid_inputs( def test_coerce_ttl_invalid_raises() -> None: """_coerce_ttl should raise ValueError for non-numeric strings.""" with pytest.raises(ValueError, match='integer number of seconds'): + # WPS437: _coerce_ttl is private but is tested directly here intentionally # pylint: disable-next=protected-access akeyless_mod._coerce_ttl('not-a-number') # noqa: WPS437 From ae8022e68d6148d13bdfa5f733f29b0b176a5b18 Mon Sep 17 00:00:00 2001 From: Kobbi Gal Date: Wed, 18 Mar 2026 15:29:53 -0400 Subject: [PATCH 9/9] fix doc build, ignore akeyless sdk types --- docs/conf.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/conf.py b/docs/conf.py index 1cb0f78ee1..c94a71da94 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -234,6 +234,12 @@ 'awx_plugins.interfaces._temporary_private_credential_api.Credential', ), ('py:class', 'EnvVarsType'), + # Akeyless SDK types: auto-generated from OpenAPI + # without type annotations or .pyi stubs + ('py:class', 'akeyless.models.auth.Auth'), + ('py:class', 'akeyless.models.describe_item.DescribeItem'), + ('py:class', 'akeyless.models.get_secret_value.GetSecretValue'), + ('py:class', 'akeyless.models.get_ssh_certificate.GetSSHCertificate'), ]