From f91bf4ffcda6e9447f3e3f293be1582abff4c164 Mon Sep 17 00:00:00 2001 From: Daniel Finca Date: Wed, 25 Mar 2026 16:48:16 +0100 Subject: [PATCH 01/19] feat: support for oidc credential /test endpoint --- awx/api/serializers.py | 21 +++++++++++++++++++++ awx/api/views/__init__.py | 31 +++++++++++++++++++++++++++++-- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 2a8e94d83392..e849f3003bc9 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -2932,6 +2932,27 @@ def to_representation(self, data): field['label'] = _(field['label']) if 'help_text' in field: field['help_text'] = _(field['help_text']) + + # Deep copy inputs to avoid modifying the original model data + if value.get('inputs'): + value['inputs'] = copy.deepcopy(value['inputs']) + fields = value['inputs'].get('fields', []) + found_wit_field = 'workload_identity_token' in [f.get('id') for f in fields] + + # Filter out internal fields from the API response + value['inputs']['fields'] = [f for f in fields if not f.get('internal')] + + # If workload identity token field exists, add job_template to metadata + if found_wit_field: + metadata = value['inputs'].get('metadata', []) + metadata.append({ + "id": "job_template_id", + "label": "ID of a Job Template", + "type": "string", + "help_text": "Job template ID to use when generating a token." + }) + value['inputs']['metadata'] = metadata + return value def filter_field_metadata(self, fields, method): diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 1456e8bb160b..016c794a2d7f 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -60,7 +60,8 @@ from ansible_base.lib.utils.schema import extend_schema_if_available # AWX -from awx.main.tasks.system import send_notifications, update_inventory_computed_fields +from awx.main.tasks.jobs import AutomationControllerJobScope, retrieve_workload_identity_jwt +from awx.main.tasks.system import flag_enabled, send_notifications, update_inventory_computed_fields from awx.main.access import get_user_queryset from awx.api.generics import ( APIView, @@ -1622,10 +1623,36 @@ def post(self, request, *args, **kwargs): if value != '$encrypted$': backend_kwargs[field_name] = value backend_kwargs.update(request.data.get('metadata', {})) + + # Add extra test functionality for OIDC-enabled credential types + response_body = {} + if flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'): + # Get a Workload Identity Token if credential contains an internal 'workload_identity_token' field + fields = obj.credential_type.inputs.get('fields', []) + for field in fields: + if field.get('internal') and field.get('id') == 'workload_identity_token': + # Make sure that the requesting user has access to the job template + job_template_id = backend_kwargs.get('job_template_id') + if job_template_id is None: + return Response({'detail': _('Job template ID is required.')}, status=status.HTTP_400_BAD_REQUEST) + job_template = models.JobTemplate.objects.get(id=int()) + if not request.user.can_access(models.JobTemplate, 'read', job_template): + raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) + + # Get a Workload Identity Token + workload_identity_token = retrieve_workload_identity_jwt( + unified_job=job_template, + audience=obj.get_input('jwt_aud'), + scope=AutomationControllerJobScope.name, + ) + backend_kwargs['workload_identity_token'] = workload_identity_token + from jwt import decode as _jwt_decode + response_body['sent_jwt_payload'] = _jwt_decode(workload_identity_token, algorithms=["RS256"], options={"verify_signature": False}) + try: with set_environ(**settings.AWX_TASK_ENV): obj.credential_type.plugin.backend(**backend_kwargs) - return Response({}, status=status.HTTP_202_ACCEPTED) + return Response(response_body, status=status.HTTP_202_ACCEPTED) except requests.exceptions.HTTPError: message = """Test operation is not supported for credential type {}. This endpoint only supports credentials that connect to From 45dc7e932201b9d199004a12c37380a2a3788fe9 Mon Sep 17 00:00:00 2001 From: Daniel Finca Date: Mon, 30 Mar 2026 10:44:32 +0200 Subject: [PATCH 02/19] fix: job_template id was missing in db lookup --- awx/api/views/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 016c794a2d7f..221562294ff4 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -1635,7 +1635,7 @@ def post(self, request, *args, **kwargs): job_template_id = backend_kwargs.get('job_template_id') if job_template_id is None: return Response({'detail': _('Job template ID is required.')}, status=status.HTTP_400_BAD_REQUEST) - job_template = models.JobTemplate.objects.get(id=int()) + job_template = models.JobTemplate.objects.get(id=int(job_template_id)) if not request.user.can_access(models.JobTemplate, 'read', job_template): raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) From 2d97334db7892fa41633c682a9638e341246e27b Mon Sep 17 00:00:00 2001 From: Daniel Finca Date: Mon, 30 Mar 2026 10:46:43 +0200 Subject: [PATCH 03/19] fix: move sent jwt payload into a generic object --- awx/api/views/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 221562294ff4..8678426d356b 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -1647,7 +1647,9 @@ def post(self, request, *args, **kwargs): ) backend_kwargs['workload_identity_token'] = workload_identity_token from jwt import decode as _jwt_decode - response_body['sent_jwt_payload'] = _jwt_decode(workload_identity_token, algorithms=["RS256"], options={"verify_signature": False}) + response_body['details'] = { + 'sent_jwt_payload': _jwt_decode(workload_identity_token, algorithms=["RS256"], options={"verify_signature": False}), + } try: with set_environ(**settings.AWX_TASK_ENV): From f89f4b85f38aa7f0f32056ccb1950c041db7a5a5 Mon Sep 17 00:00:00 2001 From: Daniel Finca Date: Mon, 30 Mar 2026 11:16:11 +0200 Subject: [PATCH 04/19] fix: remove dinamic job_template_id field --- awx/api/serializers.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index e849f3003bc9..18bbc7b59f0d 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -2937,22 +2937,10 @@ def to_representation(self, data): if value.get('inputs'): value['inputs'] = copy.deepcopy(value['inputs']) fields = value['inputs'].get('fields', []) - found_wit_field = 'workload_identity_token' in [f.get('id') for f in fields] # Filter out internal fields from the API response value['inputs']['fields'] = [f for f in fields if not f.get('internal')] - # If workload identity token field exists, add job_template to metadata - if found_wit_field: - metadata = value['inputs'].get('metadata', []) - metadata.append({ - "id": "job_template_id", - "label": "ID of a Job Template", - "type": "string", - "help_text": "Job template ID to use when generating a token." - }) - value['inputs']['metadata'] = metadata - return value def filter_field_metadata(self, fields, method): From 74cf247191ce6bd784a573ca710134e1cdab511c Mon Sep 17 00:00:00 2001 From: Daniel Finca Date: Mon, 30 Mar 2026 16:28:18 +0200 Subject: [PATCH 05/19] fix: pass job_template claims instead of unified job template credential external test plugin will gather known claims to perform the test operation over vault by using OIDC auth --- awx/api/views/__init__.py | 15 ++++++++++++--- awx/main/tasks/jobs.py | 23 ++++++++++++++++++++--- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 8678426d356b..e559f8401a79 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -60,7 +60,7 @@ from ansible_base.lib.utils.schema import extend_schema_if_available # AWX -from awx.main.tasks.jobs import AutomationControllerJobScope, retrieve_workload_identity_jwt +from awx.main.tasks.jobs import AutomationControllerJobScope, retrieve_workload_identity_jwt_with_claims from awx.main.tasks.system import flag_enabled, send_notifications, update_inventory_computed_fields from awx.main.access import get_user_queryset from awx.api.generics import ( @@ -1639,9 +1639,18 @@ def post(self, request, *args, **kwargs): if not request.user.can_access(models.JobTemplate, 'read', job_template): raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) + claims = { + AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: job_template.organization.name, + AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: job_template.organization.id, + AutomationControllerJobScope.CLAIM_PROJECT_NAME: job_template.project.name, + AutomationControllerJobScope.CLAIM_PROJECT_ID: job_template.project.id, + AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_NAME: job_template.name, + AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_ID: job_template.id, + AutomationControllerJobScope.CLAIM_PLAYBOOK_NAME: job_template.playbook, + } # Get a Workload Identity Token - workload_identity_token = retrieve_workload_identity_jwt( - unified_job=job_template, + workload_identity_token = retrieve_workload_identity_jwt_with_claims( + claims=claims, audience=obj.get_input('jwt_aud'), scope=AutomationControllerJobScope.name, ) diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index 80414390f887..a212357d2990 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -158,8 +158,8 @@ def populate_claims_for_workload(unified_job) -> dict: return claims -def retrieve_workload_identity_jwt( - unified_job: UnifiedJob, +def retrieve_workload_identity_jwt_with_claims( + claims: dict, audience: str, scope: str, workload_ttl_seconds: int | None = None, @@ -171,13 +171,30 @@ def retrieve_workload_identity_jwt( client = get_workload_identity_client() if client is None: raise RuntimeError("Workload identity client is not configured") - claims = populate_claims_for_workload(unified_job) kwargs = {"claims": claims, "scope": scope, "audience": audience} if workload_ttl_seconds: kwargs["workload_ttl_seconds"] = workload_ttl_seconds return client.request_workload_jwt(**kwargs).jwt +def retrieve_workload_identity_jwt( + unified_job: UnifiedJob, + audience: str, + scope: str, + workload_ttl_seconds: int | None = None, +) -> str: + """Retrieve JWT token from workload claims. + Raises: + RuntimeError: if the workload identity client is not configured. + """ + return retrieve_workload_identity_jwt_with_claims( + populate_claims_for_workload(unified_job), + audience, + scope, + workload_ttl_seconds, + ) + + def with_path_cleanup(f): @functools.wraps(f) def _wrapped(self, *args, **kwargs): From e5dae3b97512be219279a16c849aee199e7ff3fe Mon Sep 17 00:00:00 2001 From: Daniel Finca Date: Mon, 30 Mar 2026 17:31:49 +0200 Subject: [PATCH 06/19] feat: enable oidc testing for new credentials --- awx/api/views/__init__.py | 39 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index e559f8401a79..62cda256cacc 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -1723,9 +1723,46 @@ def post(self, request, *args, **kwargs): obj = self.get_object() backend_kwargs = request.data.get('inputs', {}) backend_kwargs.update(request.data.get('metadata', {})) + + # Add extra test functionality for OIDC-enabled credential types + response_body = {} + if flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'): + # Get a Workload Identity Token if credential contains an internal 'workload_identity_token' field + fields = obj.inputs.get('fields', []) + for field in fields: + if field.get('internal') and field.get('id') == 'workload_identity_token': + # Make sure that the requesting user has access to the job template + job_template_id = backend_kwargs.get('job_template_id') + if job_template_id is None: + return Response({'detail': _('Job template ID is required.')}, status=status.HTTP_400_BAD_REQUEST) + job_template = models.JobTemplate.objects.get(id=int(job_template_id)) + if not request.user.can_access(models.JobTemplate, 'read', job_template): + raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) + + claims = { + AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: job_template.organization.name, + AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: job_template.organization.id, + AutomationControllerJobScope.CLAIM_PROJECT_NAME: job_template.project.name, + AutomationControllerJobScope.CLAIM_PROJECT_ID: job_template.project.id, + AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_NAME: job_template.name, + AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_ID: job_template.id, + AutomationControllerJobScope.CLAIM_PLAYBOOK_NAME: job_template.playbook, + } + # Get a Workload Identity Token + workload_identity_token = retrieve_workload_identity_jwt_with_claims( + claims=claims, + audience=backend_kwargs.get('jwt_aud'), + scope=AutomationControllerJobScope.name, + ) + backend_kwargs['workload_identity_token'] = workload_identity_token + from jwt import decode as _jwt_decode + response_body['details'] = { + 'sent_jwt_payload': _jwt_decode(workload_identity_token, algorithms=["RS256"], options={"verify_signature": False}), + } + try: obj.plugin.backend(**backend_kwargs) - return Response({}, status=status.HTTP_202_ACCEPTED) + return Response(response_body, status=status.HTTP_202_ACCEPTED) except requests.exceptions.HTTPError as exc: message = 'HTTP {}'.format(exc.response.status_code) return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST) From 8926db5118bde7ed143bcb500b649194d1b8e691 Mon Sep 17 00:00:00 2001 From: Daniel Finca Date: Mon, 30 Mar 2026 17:41:32 +0200 Subject: [PATCH 07/19] chore: refactor jwt claims code in credential test endpoint --- awx/api/views/__init__.py | 56 +++++++++++++++------------------------ 1 file changed, 22 insertions(+), 34 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 62cda256cacc..fcbab7783f78 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -164,6 +164,24 @@ def api_exception_handler(exc, context): return exception_handler(exc, context) +def _get_workload_identity_token(job_template: models.JobTemplate, jwt_aud: str) -> str: + claims = { + AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: job_template.organization.name, + AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: job_template.organization.id, + AutomationControllerJobScope.CLAIM_PROJECT_NAME: job_template.project.name, + AutomationControllerJobScope.CLAIM_PROJECT_ID: job_template.project.id, + AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_NAME: job_template.name, + AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_ID: job_template.id, + AutomationControllerJobScope.CLAIM_PLAYBOOK_NAME: job_template.playbook, + } + # Get a Workload Identity Token + return retrieve_workload_identity_jwt_with_claims( + claims=claims, + audience=jwt_aud, + scope=AutomationControllerJobScope.name, + ) + + class DashboardView(APIView): deprecated = True @@ -1639,25 +1657,10 @@ def post(self, request, *args, **kwargs): if not request.user.can_access(models.JobTemplate, 'read', job_template): raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) - claims = { - AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: job_template.organization.name, - AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: job_template.organization.id, - AutomationControllerJobScope.CLAIM_PROJECT_NAME: job_template.project.name, - AutomationControllerJobScope.CLAIM_PROJECT_ID: job_template.project.id, - AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_NAME: job_template.name, - AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_ID: job_template.id, - AutomationControllerJobScope.CLAIM_PLAYBOOK_NAME: job_template.playbook, - } - # Get a Workload Identity Token - workload_identity_token = retrieve_workload_identity_jwt_with_claims( - claims=claims, - audience=obj.get_input('jwt_aud'), - scope=AutomationControllerJobScope.name, - ) - backend_kwargs['workload_identity_token'] = workload_identity_token + backend_kwargs['workload_identity_token'] = _get_workload_identity_token(job_template, backend_kwargs.get('jwt_aud')) from jwt import decode as _jwt_decode response_body['details'] = { - 'sent_jwt_payload': _jwt_decode(workload_identity_token, algorithms=["RS256"], options={"verify_signature": False}), + 'sent_jwt_payload': _jwt_decode(backend_kwargs['workload_identity_token'], algorithms=["RS256"], options={"verify_signature": False}), } try: @@ -1739,25 +1742,10 @@ def post(self, request, *args, **kwargs): if not request.user.can_access(models.JobTemplate, 'read', job_template): raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) - claims = { - AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: job_template.organization.name, - AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: job_template.organization.id, - AutomationControllerJobScope.CLAIM_PROJECT_NAME: job_template.project.name, - AutomationControllerJobScope.CLAIM_PROJECT_ID: job_template.project.id, - AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_NAME: job_template.name, - AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_ID: job_template.id, - AutomationControllerJobScope.CLAIM_PLAYBOOK_NAME: job_template.playbook, - } - # Get a Workload Identity Token - workload_identity_token = retrieve_workload_identity_jwt_with_claims( - claims=claims, - audience=backend_kwargs.get('jwt_aud'), - scope=AutomationControllerJobScope.name, - ) - backend_kwargs['workload_identity_token'] = workload_identity_token + backend_kwargs['workload_identity_token'] = _get_workload_identity_token(job_template, backend_kwargs.get('jwt_aud')) from jwt import decode as _jwt_decode response_body['details'] = { - 'sent_jwt_payload': _jwt_decode(workload_identity_token, algorithms=["RS256"], options={"verify_signature": False}), + 'sent_jwt_payload': _jwt_decode(backend_kwargs['workload_identity_token'], algorithms=["RS256"], options={"verify_signature": False}), } try: From 12017112f9a74b5db7fbeec7e723fffafba14bfa Mon Sep 17 00:00:00 2001 From: Daniel Finca Date: Tue, 31 Mar 2026 14:20:04 +0200 Subject: [PATCH 08/19] chore: lint views init file --- awx/api/views/__init__.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index fcbab7783f78..f4f72cef6544 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -1627,11 +1627,15 @@ class CredentialExternalTest(SubDetailAPIView): obj_permission_type = 'use' resource_purpose = 'test external credential' - @extend_schema_if_available(extensions={"x-ai-description": """Test update the input values and metadata of an external credential. + @extend_schema_if_available( + extensions={ + "x-ai-description": """Test update the input values and metadata of an external credential. This endpoint supports testing credentials that connect to external secret management systems such as CyberArk AIM, CyberArk Conjur, HashiCorp Vault, AWS Secrets Manager, Azure Key Vault, Centrify Vault, Thycotic DevOps Secrets Vault, and GitHub App Installation Access Token Lookup. - It does not support standard credential types such as Machine, SCM, and Cloud."""}) + It does not support standard credential types such as Machine, SCM, and Cloud.""" + } + ) def post(self, request, *args, **kwargs): obj = self.get_object() backend_kwargs = {} @@ -1656,9 +1660,10 @@ def post(self, request, *args, **kwargs): job_template = models.JobTemplate.objects.get(id=int(job_template_id)) if not request.user.can_access(models.JobTemplate, 'read', job_template): raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) - + backend_kwargs['workload_identity_token'] = _get_workload_identity_token(job_template, backend_kwargs.get('jwt_aud')) from jwt import decode as _jwt_decode + response_body['details'] = { 'sent_jwt_payload': _jwt_decode(backend_kwargs['workload_identity_token'], algorithms=["RS256"], options={"verify_signature": False}), } @@ -1671,7 +1676,9 @@ def post(self, request, *args, **kwargs): message = """Test operation is not supported for credential type {}. This endpoint only supports credentials that connect to external secret management systems such as CyberArk, HashiCorp - Vault, or cloud-based secret managers.""".format(obj.credential_type.kind) + Vault, or cloud-based secret managers.""".format( + obj.credential_type.kind + ) return Response({'detail': message}, status=status.HTTP_400_BAD_REQUEST) except Exception as exc: message = exc.__class__.__name__ @@ -1741,9 +1748,10 @@ def post(self, request, *args, **kwargs): job_template = models.JobTemplate.objects.get(id=int(job_template_id)) if not request.user.can_access(models.JobTemplate, 'read', job_template): raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) - + backend_kwargs['workload_identity_token'] = _get_workload_identity_token(job_template, backend_kwargs.get('jwt_aud')) from jwt import decode as _jwt_decode + response_body['details'] = { 'sent_jwt_payload': _jwt_decode(backend_kwargs['workload_identity_token'], algorithms=["RS256"], options={"verify_signature": False}), } From 86c0364a71ec6548cbfd6e04c18022a21266e6c2 Mon Sep 17 00:00:00 2001 From: Daniel Finca Date: Wed, 1 Apr 2026 14:30:20 +0200 Subject: [PATCH 09/19] fix: import AutomationControllerJobScope from dab --- awx/api/views/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index f4f72cef6544..3cb3f1790420 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -58,9 +58,10 @@ from ansible_base.lib.utils.requests import get_remote_hosts from ansible_base.rbac.models import RoleEvaluation from ansible_base.lib.utils.schema import extend_schema_if_available +from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope # AWX -from awx.main.tasks.jobs import AutomationControllerJobScope, retrieve_workload_identity_jwt_with_claims +from awx.main.tasks.jobs import retrieve_workload_identity_jwt_with_claims from awx.main.tasks.system import flag_enabled, send_notifications, update_inventory_computed_fields from awx.main.access import get_user_queryset from awx.api.generics import ( From 149773a16119c1cc4d139c7551eab529b0aa25f1 Mon Sep 17 00:00:00 2001 From: Daniel Finca Date: Wed, 1 Apr 2026 15:33:44 +0200 Subject: [PATCH 10/19] chore: move retrieve_workload_identity_jwt_with_claims to a shared module --- awx/main/tasks/jobs.py | 21 +-------------------- awx/main/utils/workload_identity.py | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index a212357d2990..fd385b39b80f 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -94,7 +94,7 @@ # Workload Identity from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope -from ansible_base.resource_registry.workload_identity_client import get_workload_identity_client +from awx.main.utils.workload_identity import retrieve_workload_identity_jwt_with_claims logger = logging.getLogger('awx.main.tasks.jobs') @@ -158,25 +158,6 @@ def populate_claims_for_workload(unified_job) -> dict: return claims -def retrieve_workload_identity_jwt_with_claims( - claims: dict, - audience: str, - scope: str, - workload_ttl_seconds: int | None = None, -) -> str: - """Retrieve JWT token from workload claims. - Raises: - RuntimeError: if the workload identity client is not configured. - """ - client = get_workload_identity_client() - if client is None: - raise RuntimeError("Workload identity client is not configured") - kwargs = {"claims": claims, "scope": scope, "audience": audience} - if workload_ttl_seconds: - kwargs["workload_ttl_seconds"] = workload_ttl_seconds - return client.request_workload_jwt(**kwargs).jwt - - def retrieve_workload_identity_jwt( unified_job: UnifiedJob, audience: str, diff --git a/awx/main/utils/workload_identity.py b/awx/main/utils/workload_identity.py index e69de29bb2d1..392613b8ec82 100644 --- a/awx/main/utils/workload_identity.py +++ b/awx/main/utils/workload_identity.py @@ -0,0 +1,23 @@ +from ansible_base.resource_registry.workload_identity_client import get_workload_identity_client + + +__all__ = ['retrieve_workload_identity_jwt_with_claims'] + + +def retrieve_workload_identity_jwt_with_claims( + claims: dict, + audience: str, + scope: str, + workload_ttl_seconds: int | None = None, +) -> str: + """Retrieve JWT token from workload claims. + Raises: + RuntimeError: if the workload identity client is not configured. + """ + client = get_workload_identity_client() + if client is None: + raise RuntimeError("Workload identity client is not configured") + kwargs = {"claims": claims, "scope": scope, "audience": audience} + if workload_ttl_seconds: + kwargs["workload_ttl_seconds"] = workload_ttl_seconds + return client.request_workload_jwt(**kwargs).jwt From 5ba4e65da60c32bc308acae5230c7e384d828883 Mon Sep 17 00:00:00 2001 From: Daniel Finca Date: Wed, 1 Apr 2026 16:00:30 +0200 Subject: [PATCH 11/19] fix: surround get workload id token in try catch --- awx/api/views/__init__.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 3cb3f1790420..b8e7fe8b0c29 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -14,6 +14,7 @@ import time from base64 import b64encode from collections import OrderedDict +from jwt import decode as _jwt_decode from urllib3.exceptions import ConnectTimeoutError @@ -1662,8 +1663,12 @@ def post(self, request, *args, **kwargs): if not request.user.can_access(models.JobTemplate, 'read', job_template): raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) - backend_kwargs['workload_identity_token'] = _get_workload_identity_token(job_template, backend_kwargs.get('jwt_aud')) - from jwt import decode as _jwt_decode + try: + backend_kwargs['workload_identity_token'] = _get_workload_identity_token( + job_template, backend_kwargs.get('jwt_aud') + ) + except RuntimeError as exc: + return Response({'detail': str(exc)}, status=status.HTTP_400_BAD_REQUEST) response_body['details'] = { 'sent_jwt_payload': _jwt_decode(backend_kwargs['workload_identity_token'], algorithms=["RS256"], options={"verify_signature": False}), @@ -1750,8 +1755,12 @@ def post(self, request, *args, **kwargs): if not request.user.can_access(models.JobTemplate, 'read', job_template): raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) - backend_kwargs['workload_identity_token'] = _get_workload_identity_token(job_template, backend_kwargs.get('jwt_aud')) - from jwt import decode as _jwt_decode + try: + backend_kwargs['workload_identity_token'] = _get_workload_identity_token( + job_template, backend_kwargs.get('jwt_aud') + ) + except RuntimeError as exc: + return Response({'detail': str(exc)}, status=status.HTTP_400_BAD_REQUEST) response_body['details'] = { 'sent_jwt_payload': _jwt_decode(backend_kwargs['workload_identity_token'], algorithms=["RS256"], options={"verify_signature": False}), From 2b23b27411347bc7117011351afd945767be4526 Mon Sep 17 00:00:00 2001 From: Daniel Finca Date: Wed, 1 Apr 2026 16:31:29 +0200 Subject: [PATCH 12/19] fix: make backend return jwt payload in error as well --- awx/api/views/__init__.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index b8e7fe8b0c29..5ff6b07860ee 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -1658,7 +1658,8 @@ def post(self, request, *args, **kwargs): # Make sure that the requesting user has access to the job template job_template_id = backend_kwargs.get('job_template_id') if job_template_id is None: - return Response({'detail': _('Job template ID is required.')}, status=status.HTTP_400_BAD_REQUEST) + response_body['details']['error_message'] = _('Job template ID is required.') + return Response(response_body, status=status.HTTP_400_BAD_REQUEST) job_template = models.JobTemplate.objects.get(id=int(job_template_id)) if not request.user.can_access(models.JobTemplate, 'read', job_template): raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) @@ -1668,7 +1669,8 @@ def post(self, request, *args, **kwargs): job_template, backend_kwargs.get('jwt_aud') ) except RuntimeError as exc: - return Response({'detail': str(exc)}, status=status.HTTP_400_BAD_REQUEST) + response_body['details']['error_message'] = str(exc) + return Response(response_body, status=status.HTTP_400_BAD_REQUEST) response_body['details'] = { 'sent_jwt_payload': _jwt_decode(backend_kwargs['workload_identity_token'], algorithms=["RS256"], options={"verify_signature": False}), @@ -1685,14 +1687,16 @@ def post(self, request, *args, **kwargs): Vault, or cloud-based secret managers.""".format( obj.credential_type.kind ) - return Response({'detail': message}, status=status.HTTP_400_BAD_REQUEST) + response_body['details']['error_message'] = message + return Response(response_body, status=status.HTTP_400_BAD_REQUEST) except Exception as exc: message = exc.__class__.__name__ exc_args = getattr(exc, 'args', []) for a in exc_args: if isinstance(getattr(a, 'reason', None), ConnectTimeoutError): message = str(a.reason) - return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST) + response_body['details']['error_message'] = message + return Response(response_body, status=status.HTTP_400_BAD_REQUEST) class CredentialInputSourceDetail(RetrieveUpdateDestroyAPIView): @@ -1750,7 +1754,8 @@ def post(self, request, *args, **kwargs): # Make sure that the requesting user has access to the job template job_template_id = backend_kwargs.get('job_template_id') if job_template_id is None: - return Response({'detail': _('Job template ID is required.')}, status=status.HTTP_400_BAD_REQUEST) + response_body['details']['error_message'] = _('Job template ID is required.') + return Response(response_body, status=status.HTTP_400_BAD_REQUEST) job_template = models.JobTemplate.objects.get(id=int(job_template_id)) if not request.user.can_access(models.JobTemplate, 'read', job_template): raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) @@ -1760,7 +1765,8 @@ def post(self, request, *args, **kwargs): job_template, backend_kwargs.get('jwt_aud') ) except RuntimeError as exc: - return Response({'detail': str(exc)}, status=status.HTTP_400_BAD_REQUEST) + response_body['details']['error_message'] = str(exc) + return Response(response_body, status=status.HTTP_400_BAD_REQUEST) response_body['details'] = { 'sent_jwt_payload': _jwt_decode(backend_kwargs['workload_identity_token'], algorithms=["RS256"], options={"verify_signature": False}), @@ -1771,14 +1777,16 @@ def post(self, request, *args, **kwargs): return Response(response_body, status=status.HTTP_202_ACCEPTED) except requests.exceptions.HTTPError as exc: message = 'HTTP {}'.format(exc.response.status_code) - return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST) + response_body['details']['error_message'] = message + return Response(response_body, status=status.HTTP_400_BAD_REQUEST) except Exception as exc: message = exc.__class__.__name__ args_exc = getattr(exc, 'args', []) for a in args_exc: if isinstance(getattr(a, 'reason', None), ConnectTimeoutError): message = str(a.reason) - return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST) + response_body['details']['error_message'] = message + return Response(response_body, status=status.HTTP_400_BAD_REQUEST) class HostRelatedSearchMixin(object): From be8059f61c71a2db9bf44dd9b2b427718cf87a7f Mon Sep 17 00:00:00 2001 From: melissalkelly Date: Thu, 2 Apr 2026 10:06:09 -0400 Subject: [PATCH 13/19] Fix bugs and refactor OIDC credential test endpoints - Fix KeyError when building error responses (8 occurrences) - Use .pop() to prevent passing job_template_id/jwt_aud to backend - Add exception handling for ValueError and JobTemplate.DoesNotExist - Fix Django translation pattern (use % formatting instead of f-strings) - Refactor duplicated OIDC logic into _handle_oidc_credential_test() --- awx/api/views/__init__.py | 126 ++++++++++++++++++-------------------- 1 file changed, 61 insertions(+), 65 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 5ff6b07860ee..0dc07f38cd37 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -184,6 +184,54 @@ def _get_workload_identity_token(job_template: models.JobTemplate, jwt_aud: str) ) +def _handle_oidc_credential_test(credential_type_inputs, backend_kwargs, request): + """ + Handle OIDC workload identity token generation for external credential test endpoints. + + Returns: + tuple: (response_body dict, error Response or None) + Modifies backend_kwargs in place to add workload_identity_token. + """ + response_body = {} + + if not flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'): + return response_body, None + + # Get a Workload Identity Token if credential contains an internal 'workload_identity_token' field + fields = credential_type_inputs.get('fields', []) + for field in fields: + if field.get('internal') and field.get('id') == 'workload_identity_token': + # Make sure that the requesting user has access to the job template + job_template_id = backend_kwargs.pop('job_template_id', None) + if job_template_id is None: + response_body['details'] = {'error_message': _('Job template ID is required.')} + return response_body, Response(response_body, status=status.HTTP_400_BAD_REQUEST) + + try: + job_template = models.JobTemplate.objects.get(id=int(job_template_id)) + except ValueError: + response_body['details'] = {'error_message': _('Job template ID must be an integer.')} + return response_body, Response(response_body, status=status.HTTP_400_BAD_REQUEST) + except models.JobTemplate.DoesNotExist: + response_body['details'] = {'error_message': _('Job template with ID %(id)s does not exist.') % {'id': job_template_id}} + return response_body, Response(response_body, status=status.HTTP_400_BAD_REQUEST) + + if not request.user.can_access(models.JobTemplate, 'read', job_template): + raise PermissionDenied(_('You do not have access to job template with id: %(id)s.') % {'id': job_template.id}) + + try: + backend_kwargs['workload_identity_token'] = _get_workload_identity_token(job_template, backend_kwargs.pop('jwt_aud', None)) + except RuntimeError as exc: + response_body['details'] = {'error_message': str(exc)} + return response_body, Response(response_body, status=status.HTTP_400_BAD_REQUEST) + + response_body['details'] = { + 'sent_jwt_payload': _jwt_decode(backend_kwargs['workload_identity_token'], algorithms=["RS256"], options={"verify_signature": False}), + } + + return response_body, None + + class DashboardView(APIView): deprecated = True @@ -1629,15 +1677,11 @@ class CredentialExternalTest(SubDetailAPIView): obj_permission_type = 'use' resource_purpose = 'test external credential' - @extend_schema_if_available( - extensions={ - "x-ai-description": """Test update the input values and metadata of an external credential. + @extend_schema_if_available(extensions={"x-ai-description": """Test update the input values and metadata of an external credential. This endpoint supports testing credentials that connect to external secret management systems such as CyberArk AIM, CyberArk Conjur, HashiCorp Vault, AWS Secrets Manager, Azure Key Vault, Centrify Vault, Thycotic DevOps Secrets Vault, and GitHub App Installation Access Token Lookup. - It does not support standard credential types such as Machine, SCM, and Cloud.""" - } - ) + It does not support standard credential types such as Machine, SCM, and Cloud."""}) def post(self, request, *args, **kwargs): obj = self.get_object() backend_kwargs = {} @@ -1649,32 +1693,9 @@ def post(self, request, *args, **kwargs): backend_kwargs.update(request.data.get('metadata', {})) # Add extra test functionality for OIDC-enabled credential types - response_body = {} - if flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'): - # Get a Workload Identity Token if credential contains an internal 'workload_identity_token' field - fields = obj.credential_type.inputs.get('fields', []) - for field in fields: - if field.get('internal') and field.get('id') == 'workload_identity_token': - # Make sure that the requesting user has access to the job template - job_template_id = backend_kwargs.get('job_template_id') - if job_template_id is None: - response_body['details']['error_message'] = _('Job template ID is required.') - return Response(response_body, status=status.HTTP_400_BAD_REQUEST) - job_template = models.JobTemplate.objects.get(id=int(job_template_id)) - if not request.user.can_access(models.JobTemplate, 'read', job_template): - raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) - - try: - backend_kwargs['workload_identity_token'] = _get_workload_identity_token( - job_template, backend_kwargs.get('jwt_aud') - ) - except RuntimeError as exc: - response_body['details']['error_message'] = str(exc) - return Response(response_body, status=status.HTTP_400_BAD_REQUEST) - - response_body['details'] = { - 'sent_jwt_payload': _jwt_decode(backend_kwargs['workload_identity_token'], algorithms=["RS256"], options={"verify_signature": False}), - } + response_body, error_response = _handle_oidc_credential_test(obj.credential_type.inputs, backend_kwargs, request) + if error_response: + return error_response try: with set_environ(**settings.AWX_TASK_ENV): @@ -1684,10 +1705,8 @@ def post(self, request, *args, **kwargs): message = """Test operation is not supported for credential type {}. This endpoint only supports credentials that connect to external secret management systems such as CyberArk, HashiCorp - Vault, or cloud-based secret managers.""".format( - obj.credential_type.kind - ) - response_body['details']['error_message'] = message + Vault, or cloud-based secret managers.""".format(obj.credential_type.kind) + response_body.setdefault('details', {})['error_message'] = message return Response(response_body, status=status.HTTP_400_BAD_REQUEST) except Exception as exc: message = exc.__class__.__name__ @@ -1695,7 +1714,7 @@ def post(self, request, *args, **kwargs): for a in exc_args: if isinstance(getattr(a, 'reason', None), ConnectTimeoutError): message = str(a.reason) - response_body['details']['error_message'] = message + response_body.setdefault('details', {})['error_message'] = message return Response(response_body, status=status.HTTP_400_BAD_REQUEST) @@ -1745,39 +1764,16 @@ def post(self, request, *args, **kwargs): backend_kwargs.update(request.data.get('metadata', {})) # Add extra test functionality for OIDC-enabled credential types - response_body = {} - if flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'): - # Get a Workload Identity Token if credential contains an internal 'workload_identity_token' field - fields = obj.inputs.get('fields', []) - for field in fields: - if field.get('internal') and field.get('id') == 'workload_identity_token': - # Make sure that the requesting user has access to the job template - job_template_id = backend_kwargs.get('job_template_id') - if job_template_id is None: - response_body['details']['error_message'] = _('Job template ID is required.') - return Response(response_body, status=status.HTTP_400_BAD_REQUEST) - job_template = models.JobTemplate.objects.get(id=int(job_template_id)) - if not request.user.can_access(models.JobTemplate, 'read', job_template): - raise PermissionDenied(_(f'You do not have access to job template with id: {job_template.id}.')) - - try: - backend_kwargs['workload_identity_token'] = _get_workload_identity_token( - job_template, backend_kwargs.get('jwt_aud') - ) - except RuntimeError as exc: - response_body['details']['error_message'] = str(exc) - return Response(response_body, status=status.HTTP_400_BAD_REQUEST) - - response_body['details'] = { - 'sent_jwt_payload': _jwt_decode(backend_kwargs['workload_identity_token'], algorithms=["RS256"], options={"verify_signature": False}), - } + response_body, error_response = _handle_oidc_credential_test(obj.inputs, backend_kwargs, request) + if error_response: + return error_response try: obj.plugin.backend(**backend_kwargs) return Response(response_body, status=status.HTTP_202_ACCEPTED) except requests.exceptions.HTTPError as exc: message = 'HTTP {}'.format(exc.response.status_code) - response_body['details']['error_message'] = message + response_body.setdefault('details', {})['error_message'] = message return Response(response_body, status=status.HTTP_400_BAD_REQUEST) except Exception as exc: message = exc.__class__.__name__ @@ -1785,7 +1781,7 @@ def post(self, request, *args, **kwargs): for a in args_exc: if isinstance(getattr(a, 'reason', None), ConnectTimeoutError): message = str(a.reason) - response_body['details']['error_message'] = message + response_body.setdefault('details', {})['error_message'] = message return Response(response_body, status=status.HTTP_400_BAD_REQUEST) From ec09e1e2a3dff06d60467b0b334866b74e0894b2 Mon Sep 17 00:00:00 2001 From: melissalkelly Date: Thu, 2 Apr 2026 10:15:59 -0400 Subject: [PATCH 14/19] Fix potential NoneType error in CredentialType serializer --- awx/api/serializers.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 18bbc7b59f0d..dd03400bdc34 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -2934,9 +2934,13 @@ def to_representation(self, data): field['help_text'] = _(field['help_text']) # Deep copy inputs to avoid modifying the original model data - if value.get('inputs'): - value['inputs'] = copy.deepcopy(value['inputs']) + inputs = value.get('inputs') + if not isinstance(inputs, dict): + inputs = {} + value['inputs'] = copy.deepcopy(inputs) fields = value['inputs'].get('fields', []) + if not isinstance(fields, list): + fields = [] # Filter out internal fields from the API response value['inputs']['fields'] = [f for f in fields if not f.get('internal')] From ee086dcf4209cdfd1b5dcfa865e26353d7da92d5 Mon Sep 17 00:00:00 2001 From: melissalkelly Date: Thu, 2 Apr 2026 14:23:33 -0400 Subject: [PATCH 15/19] Add comprehensive unit tests --- .../functional/api/test_credential_type.py | 96 ++++++- .../api/test_oidc_credential_test.py | 260 ++++++++++++++++++ awx/main/utils/workload_identity.py | 1 - 3 files changed, 355 insertions(+), 2 deletions(-) create mode 100644 awx/main/tests/functional/api/test_oidc_credential_test.py diff --git a/awx/main/tests/functional/api/test_credential_type.py b/awx/main/tests/functional/api/test_credential_type.py index ed0f1e9f28fb..3a92ce41899d 100644 --- a/awx/main/tests/functional/api/test_credential_type.py +++ b/awx/main/tests/functional/api/test_credential_type.py @@ -159,7 +159,8 @@ def test_create_as_admin(get, post, admin): response = get(reverse('api:credential_type_list'), admin) assert response.data['count'] == 1 assert response.data['results'][0]['name'] == 'Custom Credential Type' - assert response.data['results'][0]['inputs'] == {} + # Serializer normalizes empty inputs to {'fields': []} + assert response.data['results'][0]['inputs'] == {'fields': []} assert response.data['results'][0]['injectors'] == {} assert response.data['results'][0]['managed'] is False @@ -474,3 +475,96 @@ def test_credential_type_rbac_external_test(post, alice, admin, credentialtype_e data = {'inputs': {}, 'metadata': {}} assert post(url, data, admin).status_code == 202 assert post(url, data, alice).status_code == 403 + + +# --- Tests for internal field filtering with None/invalid inputs --- + + +@pytest.mark.django_db +def test_credential_type_with_none_inputs(get, admin): + """Test that credential type with None inputs doesn't cause NoneType error.""" + # Create a credential type with None inputs (edge case) + ct = CredentialType.objects.create( + kind='cloud', + name='Test Type', + managed=False, + inputs=None, # This could happen in edge cases + ) + + url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk}) + response = get(url, admin) + assert response.status_code == 200 + # Should have normalized inputs to empty dict + assert 'inputs' in response.data + assert isinstance(response.data['inputs'], dict) + assert response.data['inputs']['fields'] == [] + + +@pytest.mark.django_db +def test_credential_type_with_invalid_inputs_type(get, admin): + """Test that credential type with non-dict inputs doesn't cause errors.""" + # Create a credential type with invalid inputs type + ct = CredentialType.objects.create(kind='cloud', name='Test Type', managed=False, inputs={'fields': 'not-a-list'}) + + url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk}) + response = get(url, admin) + assert response.status_code == 200 + # Should gracefully handle invalid fields type + assert 'inputs' in response.data + assert response.data['inputs']['fields'] == [] + + +@pytest.mark.django_db +def test_credential_type_filters_internal_fields(get, admin): + """Test that internal fields are filtered from API responses.""" + ct = CredentialType.objects.create( + kind='cloud', + name='Test OIDC Type', + managed=False, + inputs={ + 'fields': [ + {'id': 'url', 'label': 'URL', 'type': 'string'}, + {'id': 'token', 'label': 'Token', 'type': 'string', 'secret': True, 'internal': True}, + {'id': 'public_field', 'label': 'Public', 'type': 'string'}, + ] + }, + ) + + url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk}) + response = get(url, admin) + assert response.status_code == 200 + + field_ids = [f['id'] for f in response.data['inputs']['fields']] + # Internal field should be filtered out + assert 'token' not in field_ids + assert 'url' in field_ids + assert 'public_field' in field_ids + + +@pytest.mark.django_db +def test_credential_type_list_filters_internal_fields(get, admin): + """Test that internal fields are filtered in list view.""" + ct = CredentialType.objects.create( + kind='cloud', + name='Test OIDC Type', + managed=False, + inputs={ + 'fields': [ + {'id': 'url', 'label': 'URL', 'type': 'string'}, + {'id': 'workload_identity_token', 'label': 'Token', 'type': 'string', 'secret': True, 'internal': True}, + ] + }, + ) + + url = reverse('api:credential_type_list') + response = get(url, admin) + assert response.status_code == 200 + + # Find our credential type in the results + test_ct = next((ct for ct in response.data['results'] if ct['name'] == 'Test OIDC Type'), None) + assert test_ct is not None + + field_ids = [f['id'] for f in test_ct['inputs']['fields']] + # Internal field should be filtered out + assert 'workload_identity_token' not in field_ids + assert 'url' in field_ids diff --git a/awx/main/tests/functional/api/test_oidc_credential_test.py b/awx/main/tests/functional/api/test_oidc_credential_test.py new file mode 100644 index 000000000000..1e879eb9b464 --- /dev/null +++ b/awx/main/tests/functional/api/test_oidc_credential_test.py @@ -0,0 +1,260 @@ +""" +Tests for OIDC workload identity credential test endpoints. + +Tests the /api/v2/credentials//test/ and /api/v2/credential_types//test/ +endpoints when used with OIDC-enabled credential types. +""" + +import pytest +from unittest import mock + +from django.test import override_settings +from rest_framework import status + +from awx.main.models import Credential, CredentialType, JobTemplate +from awx.api.versioning import reverse + + +@pytest.fixture +def job_template(organization, project): + """Job template with organization and project for OIDC JWT generation.""" + return JobTemplate.objects.create(name='test-jt', organization=organization, project=project, playbook='helloworld.yml') + + +@pytest.fixture +def oidc_credentialtype(): + """Create a credential type with workload_identity_token internal field.""" + oidc_type_inputs = { + 'fields': [ + {'id': 'url', 'label': 'Vault URL', 'type': 'string', 'help_text': 'The Vault server URL.'}, + {'id': 'auth_path', 'label': 'Auth Path', 'type': 'string', 'help_text': 'JWT auth mount path.'}, + {'id': 'role_id', 'label': 'Role ID', 'type': 'string', 'help_text': 'Vault role.'}, + {'id': 'jwt_aud', 'label': 'JWT Audience', 'type': 'string', 'help_text': 'Expected audience.'}, + {'id': 'workload_identity_token', 'label': 'Workload Identity Token', 'type': 'string', 'secret': True, 'internal': True}, + ], + 'metadata': [ + {'id': 'secret_path', 'label': 'Secret Path', 'type': 'string'}, + {'id': 'job_template_id', 'label': 'Job Template ID', 'type': 'string'}, + ], + 'required': ['url', 'auth_path', 'role_id'], + } + + class MockPlugin(object): + def backend(self, **kwargs): + # Simulate successful backend call + return 'secret' + + with mock.patch('awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock) as mock_plugin: + mock_plugin.return_value = MockPlugin() + oidc_type = CredentialType(kind='external', managed=True, namespace='hashivault-kv-oidc', name='HashiCorp Vault KV (OIDC)', inputs=oidc_type_inputs) + oidc_type.save() + yield oidc_type + + +@pytest.fixture +def oidc_credential(oidc_credentialtype): + """Create a credential using the OIDC credential type.""" + return Credential.objects.create( + credential_type=oidc_credentialtype, + name='oidc-vault-cred', + inputs={'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'}, + ) + + +@pytest.fixture +def mock_oidc_backend(): + """Fixture that mocks OIDC JWT generation and credential backend.""" + with mock.patch('awx.api.views.retrieve_workload_identity_jwt_with_claims') as mock_jwt, mock.patch('awx.api.views._jwt_decode') as mock_decode, mock.patch( + 'awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock + ) as mock_plugin: + + # Set default return values + mock_jwt.return_value = 'fake.jwt.token' + mock_decode.return_value = {'iss': 'http://gateway/o', 'aud': 'vault'} + + # Create mock backend + mock_backend = mock.MagicMock() + mock_backend.backend.return_value = 'secret' + mock_plugin.return_value = mock_backend + + # Yield all mocks for test customization + yield { + 'jwt': mock_jwt, + 'decode': mock_decode, + 'plugin': mock_plugin, + 'backend': mock_backend, + } + + +# --- Tests for CredentialExternalTest endpoint --- + + +@pytest.mark.django_db +@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=False) +def test_credential_test_without_oidc_feature_flag(post, admin, oidc_credential): + """Test that credential test works without OIDC feature flag enabled.""" + url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk}) + data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': '1'}} + + with mock.patch('awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock) as mock_plugin: + mock_backend = mock.MagicMock() + mock_backend.backend.return_value = 'secret' + mock_plugin.return_value = mock_backend + + response = post(url, data, admin) + assert response.status_code == 202 + # Should not contain JWT payload when feature flag is disabled + assert 'details' not in response.data or 'sent_jwt_payload' not in response.data.get('details', {}) + + +@pytest.mark.django_db +@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) +@pytest.mark.parametrize( + 'job_template_id, expected_error', + [ + (None, 'Job template ID is required'), + ('not-an-integer', 'must be an integer'), + ('99999', 'does not exist'), + ], + ids=['missing_job_template_id', 'invalid_job_template_id_type', 'nonexistent_job_template_id'], +) +def test_credential_test_job_template_validation(post, admin, oidc_credential, job_template_id, expected_error): + """Test that invalid job_template_id values return 400 with appropriate error messages.""" + url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk}) + data = {'metadata': {'secret_path': 'test/secret'}} + if job_template_id is not None: + data['metadata']['job_template_id'] = job_template_id + + response = post(url, data, admin) + assert response.status_code == 400 + assert 'details' in response.data + assert 'error_message' in response.data['details'] + assert expected_error in response.data['details']['error_message'] + + +@pytest.mark.django_db +@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) +def test_credential_test_no_access_to_job_template(post, alice, oidc_credential, job_template): + """Test that user without access to job template gets 403.""" + url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk}) + data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}} + + # Give alice use permission on credential but not on job template + oidc_credential.use_role.members.add(alice) + + response = post(url, data, alice) + assert response.status_code == 403 + assert 'You do not have access to job template' in str(response.data) + + +@pytest.mark.django_db +@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) +def test_credential_test_success_returns_jwt_payload(post, admin, oidc_credential, job_template, mock_oidc_backend): + """Test that successful test returns JWT payload in response.""" + url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk}) + data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}} + + # Customize mock for this test + mock_oidc_backend['decode'].return_value = { + 'iss': 'http://gateway/o', + 'sub': 'system:serviceaccount:default:awx-operator', + 'aud': 'vault', + 'job_template_id': job_template.id, + } + + response = post(url, data, admin) + assert response.status_code == 202 + assert 'details' in response.data + assert 'sent_jwt_payload' in response.data['details'] + assert response.data['details']['sent_jwt_payload']['job_template_id'] == job_template.id + + +@pytest.mark.django_db +@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) +def test_credential_test_backend_failure_returns_jwt_and_error(post, admin, oidc_credential, job_template, mock_oidc_backend): + """Test that backend failure still returns JWT payload along with error message.""" + url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk}) + data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}} + + # Make backend fail + mock_oidc_backend['backend'].backend.side_effect = RuntimeError('Connection failed') + + response = post(url, data, admin) + assert response.status_code == 400 + assert 'details' in response.data + # Both JWT payload and error message should be present + assert 'sent_jwt_payload' in response.data['details'] + assert 'error_message' in response.data['details'] + assert 'Connection failed' in response.data['details']['error_message'] + + +@pytest.mark.django_db +@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) +def test_credential_test_jwt_generation_failure(post, admin, oidc_credential, job_template): + """Test that JWT generation failure returns error without JWT payload.""" + url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk}) + data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}} + + with mock.patch('awx.api.views._get_workload_identity_token') as mock_jwt: + mock_jwt.side_effect = RuntimeError('Failed to generate JWT') + + response = post(url, data, admin) + assert response.status_code == 400 + assert 'details' in response.data + assert 'error_message' in response.data['details'] + assert 'Failed to generate JWT' in response.data['details']['error_message'] + # No JWT payload when generation fails + assert 'sent_jwt_payload' not in response.data['details'] + + +@pytest.mark.django_db +@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) +def test_credential_test_job_template_id_not_passed_to_backend(post, admin, oidc_credential, job_template, mock_oidc_backend): + """Test that job_template_id and jwt_aud are removed from backend_kwargs.""" + url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk}) + data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}} + + response = post(url, data, admin) + assert response.status_code == 202 + + # Check that backend was called without job_template_id or jwt_aud + call_kwargs = mock_oidc_backend['backend'].backend.call_args[1] + assert 'job_template_id' not in call_kwargs + assert 'jwt_aud' not in call_kwargs + assert 'workload_identity_token' in call_kwargs + + +# --- Tests for CredentialTypeExternalTest endpoint --- + + +@pytest.mark.django_db +@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) +def test_credential_type_test_missing_job_template_id(post, admin, oidc_credentialtype): + """Test that missing job_template_id returns 400 for credential type test endpoint.""" + url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk}) + data = { + 'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'}, + 'metadata': {'secret_path': 'test/secret'}, + } + + response = post(url, data, admin) + assert response.status_code == 400 + assert 'details' in response.data + assert 'error_message' in response.data['details'] + assert 'Job template ID is required' in response.data['details']['error_message'] + + +@pytest.mark.django_db +@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) +def test_credential_type_test_success_returns_jwt_payload(post, admin, oidc_credentialtype, job_template, mock_oidc_backend): + """Test that successful credential type test returns JWT payload.""" + url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk}) + data = { + 'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'}, + 'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}, + } + + response = post(url, data, admin) + assert response.status_code == 202 + assert 'details' in response.data + assert 'sent_jwt_payload' in response.data['details'] diff --git a/awx/main/utils/workload_identity.py b/awx/main/utils/workload_identity.py index 392613b8ec82..50582e224597 100644 --- a/awx/main/utils/workload_identity.py +++ b/awx/main/utils/workload_identity.py @@ -1,6 +1,5 @@ from ansible_base.resource_registry.workload_identity_client import get_workload_identity_client - __all__ = ['retrieve_workload_identity_jwt_with_claims'] From 91963111c5f6be9e12fa8c83b51ec6faa6e76aac Mon Sep 17 00:00:00 2001 From: melissalkelly Date: Thu, 2 Apr 2026 14:32:59 -0400 Subject: [PATCH 16/19] Fix api-lint errors --- awx/main/tests/functional/api/test_credential_type.py | 2 +- awx/main/tests/functional/api/test_oidc_credential_test.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/awx/main/tests/functional/api/test_credential_type.py b/awx/main/tests/functional/api/test_credential_type.py index 3a92ce41899d..7984fc3285a3 100644 --- a/awx/main/tests/functional/api/test_credential_type.py +++ b/awx/main/tests/functional/api/test_credential_type.py @@ -544,7 +544,7 @@ def test_credential_type_filters_internal_fields(get, admin): @pytest.mark.django_db def test_credential_type_list_filters_internal_fields(get, admin): """Test that internal fields are filtered in list view.""" - ct = CredentialType.objects.create( + CredentialType.objects.create( kind='cloud', name='Test OIDC Type', managed=False, diff --git a/awx/main/tests/functional/api/test_oidc_credential_test.py b/awx/main/tests/functional/api/test_oidc_credential_test.py index 1e879eb9b464..56bba91b872c 100644 --- a/awx/main/tests/functional/api/test_oidc_credential_test.py +++ b/awx/main/tests/functional/api/test_oidc_credential_test.py @@ -9,7 +9,6 @@ from unittest import mock from django.test import override_settings -from rest_framework import status from awx.main.models import Credential, CredentialType, JobTemplate from awx.api.versioning import reverse From 79cc79dfe23823c1c3db9bd19deec93c26211bd3 Mon Sep 17 00:00:00 2001 From: melissalkelly Date: Thu, 2 Apr 2026 14:54:42 -0400 Subject: [PATCH 17/19] Fix mock in tests --- .../functional/api/test_credential_type.py | 6 ++-- .../api/test_oidc_credential_test.py | 32 +++++++++---------- awx/main/tests/unit/tasks/test_jobs.py | 8 ++--- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/awx/main/tests/functional/api/test_credential_type.py b/awx/main/tests/functional/api/test_credential_type.py index 7984fc3285a3..29f1875f438e 100644 --- a/awx/main/tests/functional/api/test_credential_type.py +++ b/awx/main/tests/functional/api/test_credential_type.py @@ -482,13 +482,13 @@ def test_credential_type_rbac_external_test(post, alice, admin, credentialtype_e @pytest.mark.django_db def test_credential_type_with_none_inputs(get, admin): - """Test that credential type with None inputs doesn't cause NoneType error.""" - # Create a credential type with None inputs (edge case) + """Test that credential type with empty inputs dict works correctly.""" + # Create a credential type with empty dict ct = CredentialType.objects.create( kind='cloud', name='Test Type', managed=False, - inputs=None, # This could happen in edge cases + inputs={}, # Empty dict, not None (DB has NOT NULL constraint) ) url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk}) diff --git a/awx/main/tests/functional/api/test_oidc_credential_test.py b/awx/main/tests/functional/api/test_oidc_credential_test.py index 56bba91b872c..d7147a57fbb6 100644 --- a/awx/main/tests/functional/api/test_oidc_credential_test.py +++ b/awx/main/tests/functional/api/test_oidc_credential_test.py @@ -107,7 +107,7 @@ def test_credential_test_without_oidc_feature_flag(post, admin, oidc_credential) @pytest.mark.django_db -@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) +@mock.patch('awx.api.views.flag_enabled', return_value=True) @pytest.mark.parametrize( 'job_template_id, expected_error', [ @@ -117,7 +117,7 @@ def test_credential_test_without_oidc_feature_flag(post, admin, oidc_credential) ], ids=['missing_job_template_id', 'invalid_job_template_id_type', 'nonexistent_job_template_id'], ) -def test_credential_test_job_template_validation(post, admin, oidc_credential, job_template_id, expected_error): +def test_credential_test_job_template_validation(mock_flag, post, admin, oidc_credential, job_template_id, expected_error): """Test that invalid job_template_id values return 400 with appropriate error messages.""" url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk}) data = {'metadata': {'secret_path': 'test/secret'}} @@ -132,8 +132,8 @@ def test_credential_test_job_template_validation(post, admin, oidc_credential, j @pytest.mark.django_db -@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) -def test_credential_test_no_access_to_job_template(post, alice, oidc_credential, job_template): +@mock.patch('awx.api.views.flag_enabled', return_value=True) +def test_credential_test_no_access_to_job_template(mock_flag, post, alice, oidc_credential, job_template): """Test that user without access to job template gets 403.""" url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk}) data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}} @@ -147,8 +147,8 @@ def test_credential_test_no_access_to_job_template(post, alice, oidc_credential, @pytest.mark.django_db -@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) -def test_credential_test_success_returns_jwt_payload(post, admin, oidc_credential, job_template, mock_oidc_backend): +@mock.patch('awx.api.views.flag_enabled', return_value=True) +def test_credential_test_success_returns_jwt_payload(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend): """Test that successful test returns JWT payload in response.""" url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk}) data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}} @@ -169,8 +169,8 @@ def test_credential_test_success_returns_jwt_payload(post, admin, oidc_credentia @pytest.mark.django_db -@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) -def test_credential_test_backend_failure_returns_jwt_and_error(post, admin, oidc_credential, job_template, mock_oidc_backend): +@mock.patch('awx.api.views.flag_enabled', return_value=True) +def test_credential_test_backend_failure_returns_jwt_and_error(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend): """Test that backend failure still returns JWT payload along with error message.""" url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk}) data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}} @@ -188,8 +188,8 @@ def test_credential_test_backend_failure_returns_jwt_and_error(post, admin, oidc @pytest.mark.django_db -@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) -def test_credential_test_jwt_generation_failure(post, admin, oidc_credential, job_template): +@mock.patch('awx.api.views.flag_enabled', return_value=True) +def test_credential_test_jwt_generation_failure(mock_flag, post, admin, oidc_credential, job_template): """Test that JWT generation failure returns error without JWT payload.""" url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk}) data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}} @@ -207,8 +207,8 @@ def test_credential_test_jwt_generation_failure(post, admin, oidc_credential, jo @pytest.mark.django_db -@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) -def test_credential_test_job_template_id_not_passed_to_backend(post, admin, oidc_credential, job_template, mock_oidc_backend): +@mock.patch('awx.api.views.flag_enabled', return_value=True) +def test_credential_test_job_template_id_not_passed_to_backend(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend): """Test that job_template_id and jwt_aud are removed from backend_kwargs.""" url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk}) data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}} @@ -227,8 +227,8 @@ def test_credential_test_job_template_id_not_passed_to_backend(post, admin, oidc @pytest.mark.django_db -@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) -def test_credential_type_test_missing_job_template_id(post, admin, oidc_credentialtype): +@mock.patch('awx.api.views.flag_enabled', return_value=True) +def test_credential_type_test_missing_job_template_id(mock_flag, post, admin, oidc_credentialtype): """Test that missing job_template_id returns 400 for credential type test endpoint.""" url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk}) data = { @@ -244,8 +244,8 @@ def test_credential_type_test_missing_job_template_id(post, admin, oidc_credenti @pytest.mark.django_db -@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=True) -def test_credential_type_test_success_returns_jwt_payload(post, admin, oidc_credentialtype, job_template, mock_oidc_backend): +@mock.patch('awx.api.views.flag_enabled', return_value=True) +def test_credential_type_test_success_returns_jwt_payload(mock_flag, post, admin, oidc_credentialtype, job_template, mock_oidc_backend): """Test that successful credential type test returns JWT payload.""" url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk}) data = { diff --git a/awx/main/tests/unit/tasks/test_jobs.py b/awx/main/tests/unit/tasks/test_jobs.py index bcc6f4d0fd52..e4df52b63f16 100644 --- a/awx/main/tests/unit/tasks/test_jobs.py +++ b/awx/main/tests/unit/tasks/test_jobs.py @@ -473,7 +473,7 @@ def test_populate_claims_for_adhoc_command(workload_attrs, expected_claims): assert claims == expected_claims -@mock.patch('awx.main.tasks.jobs.get_workload_identity_client') +@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client') def test_retrieve_workload_identity_jwt_returns_jwt_from_client(mock_get_client): """retrieve_workload_identity_jwt returns the JWT string from the client.""" mock_client = mock.MagicMock() @@ -502,7 +502,7 @@ def test_retrieve_workload_identity_jwt_returns_jwt_from_client(mock_get_client) assert call_kwargs['claims'][AutomationControllerJobScope.CLAIM_JOB_NAME] == 'Test Job' -@mock.patch('awx.main.tasks.jobs.get_workload_identity_client') +@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client') def test_retrieve_workload_identity_jwt_passes_audience_and_scope(mock_get_client): """retrieve_workload_identity_jwt passes audience and scope to the client.""" mock_client = mock.MagicMock() @@ -518,7 +518,7 @@ def test_retrieve_workload_identity_jwt_passes_audience_and_scope(mock_get_clien mock_client.request_workload_jwt.assert_called_once_with(claims={'job_id': 1}, scope=scope, audience=audience) -@mock.patch('awx.main.tasks.jobs.get_workload_identity_client') +@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client') def test_retrieve_workload_identity_jwt_passes_workload_ttl(mock_get_client): """retrieve_workload_identity_jwt passes workload_ttl_seconds when provided.""" mock_client = mock.Mock() @@ -542,7 +542,7 @@ def test_retrieve_workload_identity_jwt_passes_workload_ttl(mock_get_client): ) -@mock.patch('awx.main.tasks.jobs.get_workload_identity_client') +@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client') def test_retrieve_workload_identity_jwt_raises_when_client_not_configured(mock_get_client): """retrieve_workload_identity_jwt raises RuntimeError when client is None.""" mock_get_client.return_value = None From d015226072bc4cadb5b2679e5d5a589b4e9e3c4c Mon Sep 17 00:00:00 2001 From: melissalkelly Date: Thu, 2 Apr 2026 15:24:15 -0400 Subject: [PATCH 18/19] Use exception message if available --- awx/api/views/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 0dc07f38cd37..728375e71a21 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -1709,7 +1709,8 @@ def post(self, request, *args, **kwargs): response_body.setdefault('details', {})['error_message'] = message return Response(response_body, status=status.HTTP_400_BAD_REQUEST) except Exception as exc: - message = exc.__class__.__name__ + # Use the exception message if available, otherwise fall back to the class name + message = str(exc) if str(exc) else exc.__class__.__name__ exc_args = getattr(exc, 'args', []) for a in exc_args: if isinstance(getattr(a, 'reason', None), ConnectTimeoutError): @@ -1776,7 +1777,8 @@ def post(self, request, *args, **kwargs): response_body.setdefault('details', {})['error_message'] = message return Response(response_body, status=status.HTTP_400_BAD_REQUEST) except Exception as exc: - message = exc.__class__.__name__ + # Use the exception message if available, otherwise fall back to the class name + message = str(exc) if str(exc) else exc.__class__.__name__ args_exc = getattr(exc, 'args', []) for a in args_exc: if isinstance(getattr(a, 'reason', None), ConnectTimeoutError): From c154378516e70e8f0e462e18c8bb7bb4c40a4a1a Mon Sep 17 00:00:00 2001 From: melissalkelly Date: Thu, 2 Apr 2026 15:55:05 -0400 Subject: [PATCH 19/19] Address failing SonarCloud --- awx/api/views/__init__.py | 79 +++++++++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 28 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 728375e71a21..b31888662350 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -184,6 +184,34 @@ def _get_workload_identity_token(job_template: models.JobTemplate, jwt_aud: str) ) +def _validate_and_get_job_template(job_template_id): + """Validate job template ID and return the JobTemplate instance. + + Returns: + tuple: (JobTemplate instance or None, error_message or None) + """ + if job_template_id is None: + return None, _('Job template ID is required.') + + try: + return models.JobTemplate.objects.get(id=int(job_template_id)), None + except ValueError: + return None, _('Job template ID must be an integer.') + except models.JobTemplate.DoesNotExist: + return None, _('Job template with ID %(id)s does not exist.') % {'id': job_template_id} + + +def _decode_jwt_payload_for_display(jwt_token): + """Decode JWT payload for display purposes only (signature not verified). + + This is safe because the JWT was just created by AWX and is only decoded + to show the user what claims are being sent to the external system. + The external system will perform proper signature verification. + """ + # NOSONAR - Signature verification intentionally disabled for display-only decoding + return _jwt_decode(jwt_token, algorithms=["RS256"], options={"verify_signature": False}) + + def _handle_oidc_credential_test(credential_type_inputs, backend_kwargs, request): """ Handle OIDC workload identity token generation for external credential test endpoints. @@ -197,37 +225,32 @@ def _handle_oidc_credential_test(credential_type_inputs, backend_kwargs, request if not flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'): return response_body, None - # Get a Workload Identity Token if credential contains an internal 'workload_identity_token' field + # Check if credential type has an internal workload_identity_token field fields = credential_type_inputs.get('fields', []) - for field in fields: - if field.get('internal') and field.get('id') == 'workload_identity_token': - # Make sure that the requesting user has access to the job template - job_template_id = backend_kwargs.pop('job_template_id', None) - if job_template_id is None: - response_body['details'] = {'error_message': _('Job template ID is required.')} - return response_body, Response(response_body, status=status.HTTP_400_BAD_REQUEST) - - try: - job_template = models.JobTemplate.objects.get(id=int(job_template_id)) - except ValueError: - response_body['details'] = {'error_message': _('Job template ID must be an integer.')} - return response_body, Response(response_body, status=status.HTTP_400_BAD_REQUEST) - except models.JobTemplate.DoesNotExist: - response_body['details'] = {'error_message': _('Job template with ID %(id)s does not exist.') % {'id': job_template_id}} - return response_body, Response(response_body, status=status.HTTP_400_BAD_REQUEST) + has_oidc_field = any(field.get('internal') and field.get('id') == 'workload_identity_token' for field in fields) - if not request.user.can_access(models.JobTemplate, 'read', job_template): - raise PermissionDenied(_('You do not have access to job template with id: %(id)s.') % {'id': job_template.id}) - - try: - backend_kwargs['workload_identity_token'] = _get_workload_identity_token(job_template, backend_kwargs.pop('jwt_aud', None)) - except RuntimeError as exc: - response_body['details'] = {'error_message': str(exc)} - return response_body, Response(response_body, status=status.HTTP_400_BAD_REQUEST) + if not has_oidc_field: + return response_body, None - response_body['details'] = { - 'sent_jwt_payload': _jwt_decode(backend_kwargs['workload_identity_token'], algorithms=["RS256"], options={"verify_signature": False}), - } + # Validate job template + job_template_id = backend_kwargs.pop('job_template_id', None) + job_template, error_msg = _validate_and_get_job_template(job_template_id) + if error_msg: + response_body['details'] = {'error_message': error_msg} + return response_body, Response(response_body, status=status.HTTP_400_BAD_REQUEST) + + # Check user access + if not request.user.can_access(models.JobTemplate, 'read', job_template): + raise PermissionDenied(_('You do not have access to job template with id: %(id)s.') % {'id': job_template.id}) + + # Generate workload identity token + try: + jwt_token = _get_workload_identity_token(job_template, backend_kwargs.pop('jwt_aud', None)) + backend_kwargs['workload_identity_token'] = jwt_token + response_body['details'] = {'sent_jwt_payload': _decode_jwt_payload_for_display(jwt_token)} + except RuntimeError as exc: + response_body['details'] = {'error_message': str(exc)} + return response_body, Response(response_body, status=status.HTTP_400_BAD_REQUEST) return response_body, None