From ca1edc27591716854aca754ded558130fd1c7112 Mon Sep 17 00:00:00 2001 From: Herbert Damker <52109189+hdamker@users.noreply.github.com> Date: Sun, 13 Jul 2025 19:53:11 +0200 Subject: [PATCH 1/3] Refinement of detect_api_type --- scripts/api_review_validator_v0_6.py | 49 +++++++++++++++++++++++----- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/scripts/api_review_validator_v0_6.py b/scripts/api_review_validator_v0_6.py index 0ef8641..5c85d08 100644 --- a/scripts/api_review_validator_v0_6.py +++ b/scripts/api_review_validator_v0_6.py @@ -147,6 +147,7 @@ def __init__(self, commonalities_version: str = "0.6", review_type: str = "relea self.implemented_version = "0.6" # This validator only implements v0.6 rules self.api_spec = None # Will store the API spec for reference resolution self.review_type = review_type # Store review type for validation behavior + self._current_api_name = None # Store current API name for type detection # Warn if requested version doesn't match implemented version if self.expected_commonalities_version != self.implemented_version: @@ -223,12 +224,12 @@ def validate_api_file(self, file_path: str) -> ValidationResult: "Ensure servers[*].url follows format: {apiRoot}//" )) - result.api_name = api_name + self._current_api_name = api_name result.version = info.get('version', 'unknown') # Detect API type first for targeted validation - result.api_type = self._detect_api_type(api_spec) + result.api_type = self._detect_api_type(api_spec, api_name) result.checks_performed.append(f"API type detection: {result.api_type.value}") # Check for Commonalities version mismatch @@ -304,10 +305,23 @@ def _get_manual_checks_for_type(self, api_type: APIType) -> List[str]: return common_checks - def _detect_api_type(self, api_spec: dict) -> APIType: + def _detect_api_type(self, api_spec: dict, api_name: str = None) -> APIType: """Enhanced API type detection with better subscription pattern recognition""" paths = api_spec.get('paths', {}) + # Use provided api_name or fall back to stored one + if api_name is None and hasattr(self, '_current_api_name'): + api_name = self._current_api_name + + # Check if API name follows subscription API naming convention + is_subscription_api_by_name = False + if api_name: + api_name_lower = api_name.lower() + is_subscription_api_by_name = ( + api_name_lower.endswith('-subscriptions') or + api_name_lower.endswith('_subscriptions') + ) + # Check for explicit subscription endpoints subscription_patterns = ['/subscriptions', '/subscription'] for path in paths.keys(): @@ -340,14 +354,31 @@ def _detect_api_type(self, api_spec: dict) -> APIType: # Check components for subscription-related schemas components = api_spec.get('components', {}) schemas = components.get('schemas', {}) + has_event_subscription_schema = False + has_event_schema = False + for schema_name, schema_def in schemas.items(): schema_name_lower = schema_name.lower() - if any(keyword in schema_name_lower for keyword in ['subscription', 'webhook', 'event', 'notification']): - if 'subscription' in schema_name_lower: - return APIType.EXPLICIT_SUBSCRIPTION - else: - return APIType.IMPLICIT_SUBSCRIPTION - + + # Check for event/notification schemas + if any(keyword in schema_name_lower for keyword in ['webhook', 'event', 'notification', 'cloudevent']): + has_event_schema = True + + # Check for subscription schemas with SubscriptionId property + elif 'subscription' in schema_name_lower: + if isinstance(schema_def, dict): + properties = schema_def.get('properties', {}) + # Event subscription schemas have SubscriptionId + if 'subscriptionId' in properties or 'SubscriptionId' in properties: + has_event_subscription_schema = True + + # Make decision based on multiple factors + if has_event_subscription_schema and (is_subscription_api_by_name or + any('/subscription' in path for path in paths.keys())): + return APIType.EXPLICIT_SUBSCRIPTION + elif has_event_schema: + return APIType.IMPLICIT_SUBSCRIPTION + return APIType.REGULAR def _validate_info_object(self, api_spec: dict, result: ValidationResult): From 3234ae65a7452bebd91bacb9c150dfda72a015db Mon Sep 17 00:00:00 2001 From: Herbert Damker <52109189+hdamker@users.noreply.github.com> Date: Sun, 13 Jul 2025 20:09:31 +0200 Subject: [PATCH 2/3] Update api_review_validator_v0_6.py Add accidentially removed line --- scripts/api_review_validator_v0_6.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/api_review_validator_v0_6.py b/scripts/api_review_validator_v0_6.py index 5c85d08..5fccf8e 100644 --- a/scripts/api_review_validator_v0_6.py +++ b/scripts/api_review_validator_v0_6.py @@ -225,6 +225,7 @@ def validate_api_file(self, file_path: str) -> ValidationResult: )) self._current_api_name = api_name + result.api_name = api_name result.version = info.get('version', 'unknown') From 248d5cd4d082746ff82bc10bc436d8b70f671ce4 Mon Sep 17 00:00:00 2001 From: Herbert Damker <52109189+hdamker@users.noreply.github.com> Date: Sun, 13 Jul 2025 21:45:36 +0200 Subject: [PATCH 3/3] Update api_review_validator_v0_6.py --- scripts/api_review_validator_v0_6.py | 124 ++++++++++++++++++++++++++- 1 file changed, 121 insertions(+), 3 deletions(-) diff --git a/scripts/api_review_validator_v0_6.py b/scripts/api_review_validator_v0_6.py index 5fccf8e..22745a1 100644 --- a/scripts/api_review_validator_v0_6.py +++ b/scripts/api_review_validator_v0_6.py @@ -998,9 +998,20 @@ def _validate_notifications_bearer_auth_scheme(self, scheme_def: dict, scheme_na f"components.securitySchemes.{scheme_name}.bearerFormat" )) - def _validate_operation_security(self, operation: dict, operation_name: str, result: ValidationResult): - """Validate operation-level security requirements for callbacks and OpenID Connect usage""" - security = operation.get('security') + def _validate_operation_security(self, operation: dict, path: str, method: str, + api_name: str, result: ValidationResult): + """Validate security settings for an operation""" + + # Detect API type first + api_type = self._detect_api_type(self.api_spec, api_name) + + # For explicit subscription APIs, use special validation + if api_type == APIType.EXPLICIT_SUBSCRIPTION: + self._validate_explicit_subscription_scopes(operation, path, method, api_name, result) + return + + # For other API types, continue with existing validation logic + security = operation.get('security', []) # Check if this is a callback operation (different security rules) is_callback = 'callbacks' in operation_name.lower() or 'notification' in operation_name.lower() @@ -1055,6 +1066,113 @@ def _validate_operation_security(self, operation: dict, operation_name: str, res f"{operation_name}.security" )) + def _validate_explicit_subscription_scopes(self, operation: dict, path: str, method: str, + api_name: str, result: ValidationResult): + """Validate scopes for explicit subscription APIs according to CAMARA guidelines + + For explicit subscription APIs: + - CREATE operations: api-name:event-type:create + - READ operations: api-name:read + - DELETE operations: api-name:delete + """ + security = operation.get('security', []) + + for security_item in security: + if isinstance(security_item, dict) and 'openId' in security_item: + scopes = security_item['openId'] + + if not isinstance(scopes, list): + continue + + for scope in scopes: + # Determine expected scope pattern based on operation + if method.lower() == 'post' and path.endswith('/subscriptions'): + # CREATE operation - should have event type in scope + if not self._is_valid_event_subscription_create_scope(scope, api_name): + result.issues.append(ValidationIssue( + Severity.MEDIUM, "Scope Naming", + f"Event subscription creation scope should follow pattern `api-name:event-type:create`: `{scope}`", + f"{method.upper()} {path}.security", + "Use format: api-name:org.camaraproject.api-name.version.event-name:create" + )) + + elif method.lower() == 'get': + # READ operation + expected_scope = f"{api_name}:read" + if scope != expected_scope: + result.issues.append(ValidationIssue( + Severity.MEDIUM, "Scope Naming", + f"Event subscription read scope should be `{expected_scope}`, found: `{scope}`", + f"{method.upper()} {path}.security" + )) + + elif method.lower() == 'delete': + # DELETE operation + expected_scope = f"{api_name}:delete" + if scope != expected_scope: + result.issues.append(ValidationIssue( + Severity.MEDIUM, "Scope Naming", + f"Event subscription delete scope should be `{expected_scope}`, found: `{scope}`", + f"{method.upper()} {path}.security" + )) + + def _is_valid_event_subscription_create_scope(self, scope: str, api_name: str) -> bool: + """Check if a scope follows the event subscription create pattern + + Pattern: api-name:event-type:create + Where event-type is like: org.camaraproject.api-name.version.event-name + """ + parts = scope.split(':') + + # Should have exactly 3 parts: api-name:event-type:create + if len(parts) != 3: + return False + + scope_api_name, event_type, action = parts + + # Check api name matches + if scope_api_name != api_name: + return False + + # Check action is 'create' + if action != 'create': + return False + + # Check event type format (org.camaraproject.api-name.version.event-name) + if not event_type.startswith('org.camaraproject.'): + return False + + # Optionally, verify the event type exists in the API spec + if hasattr(self, 'api_spec'): + # Look for the event type in the API's defined event types + if not self._event_type_exists_in_spec(event_type): + return False + + return True + + def _event_type_exists_in_spec(self, event_type: str) -> bool: + """Check if an event type is defined in the API specification""" + # Look in common places where event types are defined + paths_to_check = [ + ['components', 'schemas', 'SubscriptionEventType', 'enum'], + ['components', 'schemas', 'EventTypeNotification', 'enum'], + # Add more paths as needed based on API structure + ] + + for path in paths_to_check: + current = self.api_spec + for key in path: + if isinstance(current, dict) and key in current: + current = current[key] + else: + current = None + break + + if isinstance(current, list) and event_type in current: + return True + + return True # Default to true if we can't find the enum + def _validate_security_schemes(self, api_spec: dict, result: ValidationResult): """Validate top-level security configuration""" result.checks_performed.append("Security configuration validation")