Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 161 additions & 11 deletions scripts/api_review_validator_v0_6.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def __init__(self, commonalities_version: str = "0.6", review_type: str = "relea
self.implemented_version = "0.6" # This validator only implements v0.6 rules
self.api_spec = None # Will store the API spec for reference resolution
self.review_type = review_type # Store review type for validation behavior
self._current_api_name = None # Store current API name for type detection

# Warn if requested version doesn't match implemented version
if self.expected_commonalities_version != self.implemented_version:
Expand Down Expand Up @@ -223,12 +224,13 @@ def validate_api_file(self, file_path: str) -> ValidationResult:
"Ensure servers[*].url follows format: {apiRoot}/<api-name>/<api-version>"
))

self._current_api_name = api_name
result.api_name = api_name

result.version = info.get('version', 'unknown')

# Detect API type first for targeted validation
result.api_type = self._detect_api_type(api_spec)
result.api_type = self._detect_api_type(api_spec, api_name)
result.checks_performed.append(f"API type detection: {result.api_type.value}")

# Check for Commonalities version mismatch
Expand Down Expand Up @@ -304,10 +306,23 @@ def _get_manual_checks_for_type(self, api_type: APIType) -> List[str]:

return common_checks

def _detect_api_type(self, api_spec: dict) -> APIType:
def _detect_api_type(self, api_spec: dict, api_name: str = None) -> APIType:
"""Enhanced API type detection with better subscription pattern recognition"""
paths = api_spec.get('paths', {})

# Use provided api_name or fall back to stored one
if api_name is None and hasattr(self, '_current_api_name'):
api_name = self._current_api_name

# Check if API name follows subscription API naming convention
is_subscription_api_by_name = False
if api_name:
api_name_lower = api_name.lower()
is_subscription_api_by_name = (
api_name_lower.endswith('-subscriptions') or
api_name_lower.endswith('_subscriptions')
)

# Check for explicit subscription endpoints
subscription_patterns = ['/subscriptions', '/subscription']
for path in paths.keys():
Expand Down Expand Up @@ -340,14 +355,31 @@ def _detect_api_type(self, api_spec: dict) -> APIType:
# Check components for subscription-related schemas
components = api_spec.get('components', {})
schemas = components.get('schemas', {})
has_event_subscription_schema = False
has_event_schema = False

for schema_name, schema_def in schemas.items():
schema_name_lower = schema_name.lower()
if any(keyword in schema_name_lower for keyword in ['subscription', 'webhook', 'event', 'notification']):
if 'subscription' in schema_name_lower:
return APIType.EXPLICIT_SUBSCRIPTION
else:
return APIType.IMPLICIT_SUBSCRIPTION


# Check for event/notification schemas
if any(keyword in schema_name_lower for keyword in ['webhook', 'event', 'notification', 'cloudevent']):
has_event_schema = True

# Check for subscription schemas with SubscriptionId property
elif 'subscription' in schema_name_lower:
if isinstance(schema_def, dict):
properties = schema_def.get('properties', {})
# Event subscription schemas have SubscriptionId
if 'subscriptionId' in properties or 'SubscriptionId' in properties:
has_event_subscription_schema = True

# Make decision based on multiple factors
if has_event_subscription_schema and (is_subscription_api_by_name or
any('/subscription' in path for path in paths.keys())):
return APIType.EXPLICIT_SUBSCRIPTION
elif has_event_schema:
return APIType.IMPLICIT_SUBSCRIPTION

return APIType.REGULAR

def _validate_info_object(self, api_spec: dict, result: ValidationResult):
Expand Down Expand Up @@ -966,9 +998,20 @@ def _validate_notifications_bearer_auth_scheme(self, scheme_def: dict, scheme_na
f"components.securitySchemes.{scheme_name}.bearerFormat"
))

def _validate_operation_security(self, operation: dict, operation_name: str, result: ValidationResult):
"""Validate operation-level security requirements for callbacks and OpenID Connect usage"""
security = operation.get('security')
def _validate_operation_security(self, operation: dict, path: str, method: str,
api_name: str, result: ValidationResult):
"""Validate security settings for an operation"""

# Detect API type first
api_type = self._detect_api_type(self.api_spec, api_name)

# For explicit subscription APIs, use special validation
if api_type == APIType.EXPLICIT_SUBSCRIPTION:
self._validate_explicit_subscription_scopes(operation, path, method, api_name, result)
return

# For other API types, continue with existing validation logic
security = operation.get('security', [])

# Check if this is a callback operation (different security rules)
is_callback = 'callbacks' in operation_name.lower() or 'notification' in operation_name.lower()
Expand Down Expand Up @@ -1023,6 +1066,113 @@ def _validate_operation_security(self, operation: dict, operation_name: str, res
f"{operation_name}.security"
))

def _validate_explicit_subscription_scopes(self, operation: dict, path: str, method: str,
api_name: str, result: ValidationResult):
"""Validate scopes for explicit subscription APIs according to CAMARA guidelines

For explicit subscription APIs:
- CREATE operations: api-name:event-type:create
- READ operations: api-name:read
- DELETE operations: api-name:delete
"""
security = operation.get('security', [])

for security_item in security:
if isinstance(security_item, dict) and 'openId' in security_item:
scopes = security_item['openId']

if not isinstance(scopes, list):
continue

for scope in scopes:
# Determine expected scope pattern based on operation
if method.lower() == 'post' and path.endswith('/subscriptions'):
# CREATE operation - should have event type in scope
if not self._is_valid_event_subscription_create_scope(scope, api_name):
result.issues.append(ValidationIssue(
Severity.MEDIUM, "Scope Naming",
f"Event subscription creation scope should follow pattern `api-name:event-type:create`: `{scope}`",
f"{method.upper()} {path}.security",
"Use format: api-name:org.camaraproject.api-name.version.event-name:create"
))

elif method.lower() == 'get':
# READ operation
expected_scope = f"{api_name}:read"
if scope != expected_scope:
result.issues.append(ValidationIssue(
Severity.MEDIUM, "Scope Naming",
f"Event subscription read scope should be `{expected_scope}`, found: `{scope}`",
f"{method.upper()} {path}.security"
))

elif method.lower() == 'delete':
# DELETE operation
expected_scope = f"{api_name}:delete"
if scope != expected_scope:
result.issues.append(ValidationIssue(
Severity.MEDIUM, "Scope Naming",
f"Event subscription delete scope should be `{expected_scope}`, found: `{scope}`",
f"{method.upper()} {path}.security"
))

def _is_valid_event_subscription_create_scope(self, scope: str, api_name: str) -> bool:
"""Check if a scope follows the event subscription create pattern

Pattern: api-name:event-type:create
Where event-type is like: org.camaraproject.api-name.version.event-name
"""
parts = scope.split(':')

# Should have exactly 3 parts: api-name:event-type:create
if len(parts) != 3:
return False

scope_api_name, event_type, action = parts

# Check api name matches
if scope_api_name != api_name:
return False

# Check action is 'create'
if action != 'create':
return False

# Check event type format (org.camaraproject.api-name.version.event-name)
if not event_type.startswith('org.camaraproject.'):
return False

# Optionally, verify the event type exists in the API spec
if hasattr(self, 'api_spec'):
# Look for the event type in the API's defined event types
if not self._event_type_exists_in_spec(event_type):
return False

return True

def _event_type_exists_in_spec(self, event_type: str) -> bool:
"""Check if an event type is defined in the API specification"""
# Look in common places where event types are defined
paths_to_check = [
['components', 'schemas', 'SubscriptionEventType', 'enum'],
['components', 'schemas', 'EventTypeNotification', 'enum'],
# Add more paths as needed based on API structure
]

for path in paths_to_check:
current = self.api_spec
for key in path:
if isinstance(current, dict) and key in current:
current = current[key]
else:
current = None
break

if isinstance(current, list) and event_type in current:
return True

return True # Default to true if we can't find the enum

def _validate_security_schemes(self, api_spec: dict, result: ValidationResult):
"""Validate top-level security configuration"""
result.checks_performed.append("Security configuration validation")
Expand Down