"""GitHub API integration for SOL release management.

This module provides real GitHub API interactions for:
- Creating releases
- Downloading release assets
- Managing release history
- Fetching component metadata
"""

import hashlib
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from urllib.request import urlopen, Request
from urllib.error import HTTPError, URLError


class GitHubAPIError(Exception):
    """Exception raised for GitHub API errors."""
    pass


class GitHubReleaseManager:
    """Manages GitHub releases for the SOL ecosystem."""

    def __init__(
        self,
        token: str,
        owner: str = "eveningsco",
        repo: str = "sol-release"
    ):
        """Initialize the GitHub release manager.

        Args:
            token: GitHub personal access token with repo scope
            owner: GitHub repository owner (default: eveningsco)
            repo: GitHub repository name (default: sol-release)
        """
        self.token = token
        self.owner = owner
        self.repo = repo
        self.api_base = "https://api.github.com"
        self.headers = {
            "Authorization": f"token {token}",
            "Accept": "application/vnd.github.v3+json",
            "Content-Type": "application/json"
        }

    def _make_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None
    ) -> Any:
        """Make a GitHub API request.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE)
            endpoint: API endpoint (e.g., "/repos/{owner}/{repo}/releases")
            data: Request body data

        Returns:
            Decoded JSON response (dict or list), or {} for an empty body

        Raises:
            GitHubAPIError: If the API request fails
        """
        url = f"{self.api_base}{endpoint}"
        request = Request(url, headers=self.headers, method=method)

        if data:
            request.data = json.dumps(data).encode("utf-8")

        try:
            with urlopen(request) as response:
                raw = response.read()
                # DELETE endpoints answer 204 No Content; json.loads("")
                # would raise, so treat an empty body as an empty result.
                if not raw:
                    return {}
                return json.loads(raw.decode("utf-8"))
        except HTTPError as e:
            # Error bodies are usually JSON but may be HTML/plain text
            # (proxies, rate-limit pages) — never let the decode itself raise.
            error_body = e.read().decode("utf-8") if e.fp else ""
            try:
                error_data = json.loads(error_body) if error_body else {}
            except json.JSONDecodeError:
                error_data = {}
            raise GitHubAPIError(
                f"GitHub API error {e.code}: {error_data.get('message', str(e))}"
            )
        except URLError as e:
            raise GitHubAPIError(f"Network error: {e.reason}")

    def create_release(
        self,
        tag_name: str,
        name: str,
        body: str,
        draft: bool = False,
        prerelease: bool = True
    ) -> Dict[str, Any]:
        """Create a new GitHub release.

        Args:
            tag_name: Git tag for the release
            name: Release title/name
            body: Release body text
            draft: Whether to create as draft (default: False)
            prerelease: Whether to create as prerelease (default: True)

        Returns:
            Release data dictionary
        """
        release_data = {
            "tag_name": tag_name,
            "name": name,
            "body": body,
            "draft": draft,
            "prerelease": prerelease
        }
        return self._make_request(
            "POST",
            f"/repos/{self.owner}/{self.repo}/releases",
            release_data
        )

    def list_releases(
        self,
        per_page: int = 30,
        page: int = 1
    ) -> List[Dict[str, Any]]:
        """List all releases for the repository.

        Args:
            per_page: Number of releases per page (default: 30)
            page: Page number (default: 1)

        Returns:
            List of release data dictionaries
        """
        params = f"?per_page={per_page}&page={page}"
        return self._make_request(
            "GET",
            f"/repos/{self.owner}/{self.repo}/releases{params}"
        )

    def get_release(self, release_id: int) -> Dict[str, Any]:
        """Get a specific release by ID.

        Args:
            release_id: GitHub release ID

        Returns:
            Release data dictionary
        """
        return self._make_request(
            "GET",
            f"/repos/{self.owner}/{self.repo}/releases/{release_id}"
        )

    def get_release_by_tag(self, tag_name: str) -> Optional[Dict[str, Any]]:
        """Get a release by its tag name.

        Args:
            tag_name: Git tag name to search for

        Returns:
            Release data dictionary or None if not found
        """
        # Scans a single page of up to 100 releases; a tag on an older page
        # will not be found. Acceptable given cleanup_old_releases keeps the
        # release count small.
        releases = self.list_releases(per_page=100)
        for release in releases:
            if release.get("tag_name") == tag_name:
                return release
        return None

    def delete_release(self, release_id: int) -> bool:
        """Delete a release by ID.

        Args:
            release_id: GitHub release ID

        Returns:
            True if deletion successful

        Raises:
            GitHubAPIError: If deletion fails
        """
        self._make_request(
            "DELETE",
            f"/repos/{self.owner}/{self.repo}/releases/{release_id}"
        )
        return True

    def cleanup_old_releases(self, keep_count: int = 5) -> List[Dict[str, Any]]:
        """Remove old releases keeping only the most recent ones.

        Args:
            keep_count: Number of recent releases to keep (default: 5)

        Returns:
            List of deleted release information
        """
        all_releases = self.list_releases(per_page=100)

        # ISO-8601 timestamps sort correctly as strings; newest first.
        sorted_releases = sorted(
            all_releases,
            key=lambda r: r.get("created_at", ""),
            reverse=True
        )

        deleted_releases = []
        for release in sorted_releases[keep_count:]:
            try:
                self.delete_release(release["id"])
                deleted_releases.append({
                    "id": release["id"],
                    "tag": release.get("tag_name", ""),
                    "name": release.get("name", "")
                })
            except GitHubAPIError as e:
                # Best-effort: log and continue with the remaining deletions.
                print(f"Failed to delete release {release['id']}: {e}")

        return deleted_releases

    def get_release_assets(self, release_id: int) -> List[Dict[str, Any]]:
        """Get all assets for a specific release.

        Args:
            release_id: GitHub release ID

        Returns:
            List of asset data dictionaries
        """
        return self._make_request(
            "GET",
            f"/repos/{self.owner}/{self.repo}/releases/{release_id}/assets"
        )

    def download_asset(
        self,
        asset_id: int,
        destination: str
    ) -> str:
        """Download a release asset to a local file.

        Args:
            asset_id: GitHub asset ID
            destination: Local file path to save the asset

        Returns:
            Path to downloaded file

        Raises:
            GitHubAPIError: If download fails
        """
        # Asset downloads by ID go through the API assets endpoint with an
        # octet-stream Accept header; the public
        # github.com/.../releases/download/ URL is keyed by tag + filename,
        # not by asset ID, so it cannot be used here. The token is required
        # for private repositories.
        url = (
            f"{self.api_base}/repos/{self.owner}/{self.repo}"
            f"/releases/assets/{asset_id}"
        )
        headers = {
            "Authorization": f"token {self.token}",
            "Accept": "application/octet-stream"
        }
        request = Request(url, headers=headers)

        try:
            with urlopen(request) as response, open(destination, "wb") as f:
                f.write(response.read())
            return destination
        except HTTPError as e:
            raise GitHubAPIError(f"Failed to download asset {asset_id}: {e}")

    def cleanup_assets(
        self,
        keep_count: int = 100,
        pattern: Optional[str] = None
    ) -> List[int]:
        """Remove old assets from releases, keeping only the most recent ones.

        Args:
            keep_count: Number of assets to keep per release
            pattern: Optional substring used to filter assets by name
                (plain substring match, not a glob)

        Returns:
            List of deleted asset IDs
        """
        all_releases = self.list_releases(per_page=100)
        deleted_asset_ids = []

        for release in all_releases:
            assets = self.get_release_assets(release["id"])

            if pattern:
                assets = [a for a in assets if pattern in a.get("name", "")]

            # Sort newest first so the trailing slice really is "oldest";
            # the API does not guarantee any particular ordering.
            assets = sorted(
                assets,
                key=lambda a: a.get("created_at", ""),
                reverse=True
            )

            for asset in assets[keep_count:]:
                try:
                    self._make_request(
                        "DELETE",
                        f"/repos/{self.owner}/{self.repo}"
                        f"/releases/assets/{asset['id']}"
                    )
                    deleted_asset_ids.append(asset["id"])
                except GitHubAPIError as e:
                    print(f"Failed to delete asset {asset['id']}: {e}")

        return deleted_asset_ids


class ComponentMetadataFetcher:
    """Fetches metadata from component repository releases."""

    # Known SOL component repositories and the repository_dispatch event
    # each one emits on release.
    COMPONENT_MAP = {
        "sol-software": {
            "owner": "eveningsco",
            "repo": "sol-software",
            "event_type": "sol_software_release"
        },
        "sol-server": {
            "owner": "eveningsco",
            "repo": "sol-server",
            "event_type": "sol_server_release"
        },
        "sol-utils": {
            "owner": "eveningsco",
            "repo": "sol-utils",
            "event_type": "sol_utils_release"
        }
    }

    def __init__(self, token: str):
        """Initialize the component metadata fetcher.

        Args:
            token: GitHub personal access token
        """
        self.token = token
        self.api_base = "https://api.github.com"
        self.headers = {
            "Authorization": f"token {token}",
            "Accept": "application/vnd.github.v3+json"
        }

    def _make_request(
        self,
        method: str,
        url: str,
        data: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Make a GitHub API request against a full URL.

        Args:
            method: HTTP method
            url: Absolute request URL
            data: Optional JSON request body

        Raises:
            GitHubAPIError: If the request fails
        """
        request = Request(url, headers=self.headers, method=method)

        if data:
            request.data = json.dumps(data).encode("utf-8")

        try:
            with urlopen(request) as response:
                return json.loads(response.read().decode("utf-8"))
        except HTTPError as e:
            # Read the error body exactly once: the stream is not
            # rewindable, so a probing read(1) would corrupt a later read().
            error_text = e.read().decode("utf-8") if e.fp else ""
            try:
                error_data = json.loads(error_text) if error_text else {}
            except json.JSONDecodeError:
                error_data = {}
            raise GitHubAPIError(
                f"GitHub API error {e.code}: {error_data.get('message', str(e))}"
            )
        except URLError as e:
            raise GitHubAPIError(f"Network error: {e.reason}")

    def get_latest_release(self, component: str) -> Optional[Dict[str, Any]]:
        """Get the latest release for a component repository.

        Args:
            component: Component name (e.g., "sol-software", "sol-server")

        Returns:
            Latest release data or None if not found

        Raises:
            ValueError: If the component name is not in COMPONENT_MAP
        """
        if component not in self.COMPONENT_MAP:
            raise ValueError(f"Unknown component: {component}")

        config = self.COMPONENT_MAP[component]
        url = (
            f"{self.api_base}/repos/{config['owner']}"
            f"/{config['repo']}/releases/latest"
        )

        try:
            return self._make_request("GET", url)
        except GitHubAPIError as e:
            # Missing releases (404) and transient failures are treated the
            # same: the component is simply reported as unavailable.
            print(f"Failed to get latest release for {component}: {e}")
            return None

    def fetch_component_metadata(self, component: str) -> Optional[Dict[str, Any]]:
        """Fetch complete metadata for a component's latest release.

        Args:
            component: Component name (e.g., "sol-software")

        Returns:
            Component metadata dictionary or None
        """
        release = self.get_latest_release(component)
        if not release:
            return None

        # NOTE(review): target_commitish is usually a branch name, not a
        # commit SHA — commit_sha mirrors it for backward compatibility,
        # but consumers needing an exact SHA should resolve the tag.
        metadata = {
            "component": component,
            "included": True,
            "branch": release.get("target_commitish", ""),
            "commit_sha": release.get("target_commitish", ""),
            "tag_name": release.get("tag_name", ""),
            "release_url": release.get("html_url", ""),
            "pr_number": None  # Not available in release data
        }

        # Best-effort: pull "PR #123" out of the release notes if present.
        # The captured value is kept as a string, matching prior behavior.
        body = release.get("body", "")
        if "PR #" in body:
            match = re.search(r"PR #(\d+)", body)
            if match:
                metadata["pr_number"] = match.group(1)

        return metadata

    def fetch_all_component_metadata(self) -> Dict[str, Any]:
        """Fetch metadata for all supported components.

        Returns:
            Dictionary mapping component names to their metadata.
            Components whose metadata could not be fetched are omitted.
        """
        all_metadata = {}
        for component in self.COMPONENT_MAP:
            metadata = self.fetch_component_metadata(component)
            if metadata:
                all_metadata[component] = metadata
        return all_metadata


class ReleaseWorkflowOrchestrator:
    """Orchestrates the complete release workflow using GitHub API.

    This class combines release management, artifact downloading,
    and metadata collection into a cohesive workflow.
    """

    def __init__(
        self,
        github_token: str,
        owner: str = "eveningsco",
        repo: str = "sol-release"
    ):
        """Initialize the release workflow orchestrator.

        Args:
            github_token: GitHub personal access token
            owner: Repository owner
            repo: Repository name
        """
        self.release_manager = GitHubReleaseManager(github_token, owner, repo)
        self.component_fetcher = ComponentMetadataFetcher(github_token)
        self.workflow_steps: List[str] = []

    def log_step(self, step: str):
        """Record a workflow step and echo it to stdout."""
        self.workflow_steps.append(step)
        print(f"[{step}]")

    def create_release_from_components(
        self,
        components: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Create a release with metadata from components.

        Args:
            components: Optional component metadata dictionary.
                If None, fetches from component repos.

        Returns:
            Created release information

        Raises:
            GitHubAPIError: If no component metadata is available
        """
        self.log_step("Starting release creation workflow")

        if components is None:
            self.log_step("Fetching component metadata from repositories")
            components = self.component_fetcher.fetch_all_component_metadata()

        if not components:
            raise GitHubAPIError("No component metadata available")

        body = self._generate_release_body(components)

        # Tag and title share a timestamped name: Release-MMDDYYYY_HH-MM-SS.
        timestamp = datetime.now(timezone.utc).strftime("%m%d%Y_%H-%M-%S")
        release_name = f"Release-{timestamp}"
        tag_name = release_name

        self.log_step(f"Creating release: {release_name}")

        release = self.release_manager.create_release(
            tag_name=tag_name,
            name=release_name,
            body=body,
            prerelease=True
        )

        self.log_step(f"Release created: {release.get('html_url', 'N/A')}")

        return {
            "release": release,
            "components": components,
            "steps": self.workflow_steps
        }

    def _generate_release_body(self, components: Dict[str, Any]) -> str:
        """Generate release body text from component metadata.

        Args:
            components: Component metadata dictionary

        Returns:
            Release body string
        """
        lines = []
        lines.append("Combined SOL release package")
        lines.append(f"Build date: {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}")
        lines.append("")
        lines.append("## Component Sources")

        for component, metadata in components.items():
            branch = metadata.get("branch", "")
            # Abbreviate the SHA to the conventional short form.
            commit = metadata.get("commit_sha", "")[:7] if metadata.get("commit_sha") else ""
            tag = metadata.get("tag_name", "")

            lines.append(
                f"- **{component}** {tag} (branch: {branch}, commit: {commit})"
            )

        lines.extend([
            "",
            "## Included Files",
            "",
            "### Executables:",
            "- sol-server.zip",
            "- sol_software",
            "- sol_update_gui",
            "- sol_update_backend",
            "- sol_update_manager",
            "- sol_update_manager_gui",
            "- mass_gadget_watchdog",
            "- update_version_info",
            "- gpio_shutdown_trigger",
            "- off_mass_gadget",
            "- on_mass_gadget",
            "- expand_exfat",
            "- provision",
            "- mp2624",
            "",
            "### Service Files:",
            "- sol-server.service",
            "- sol_software.service",
            "- sol-connectivity.service",
            "- mass_gadget_watchdog.service",
            "- update_version_info.service",
            "- filebeat.service",
            "",
            "### Config Files:",
            "- filebeat.yml",
            "",
            "### Logrotate Files:",
            "- mp2624-logrotate",
            "- sol-server-logrotate",
            "- sol_software-logrotate",
            "- mass_gadget_watchdog-logrotate",
            "- sol_update_manager-logrotate"
        ])

        return "\n".join(lines)

    def cleanup_old_releases(self, keep_count: int = 5) -> List[Dict[str, Any]]:
        """Remove old releases, keeping only the most recent ones.

        Args:
            keep_count: Number of releases to keep

        Returns:
            List of deleted release information
        """
        self.log_step(f"Cleaning up old releases (keeping {keep_count})")
        return self.release_manager.cleanup_old_releases(keep_count)

    def get_release_history(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get recent release history.

        Args:
            limit: Number of recent releases to fetch

        Returns:
            List of recent release information
        """
        self.log_step(f"Fetching recent release history (limit: {limit})")
        releases = self.release_manager.list_releases(per_page=limit)

        return [
            {
                "tag": r.get("tag_name", ""),
                "name": r.get("name", ""),
                "created_at": r.get("created_at", ""),
                "html_url": r.get("html_url", ""),
                "is_prerelease": r.get("prerelease", False)
            }
            for r in releases
        ]


# Export main classes
__all__ = [
    "GitHubAPIError",
    "GitHubReleaseManager",
    "ComponentMetadataFetcher",
    "ReleaseWorkflowOrchestrator"
]
+""" + +import pytest +from datetime import datetime, timezone +from unittest.mock import Mock, patch, MagicMock +from github_release_manager import ( + GitHubAPIError, + GitHubReleaseManager, + ComponentMetadataFetcher, + ReleaseWorkflowOrchestrator +) + + +class TestGitHubAPIError: + """Tests for the GitHubAPIError exception.""" + + def test_exception_message(self): + """Test that error message is preserved.""" + error = GitHubAPIError("Test API error message") + assert str(error) == "Test API error message" + + def test_exception_inherits_from_exception(self): + """Test that GitHubAPIError inherits from Exception.""" + assert isinstance(GitHubAPIError("test"), Exception) + + +class TestGitHubReleaseManager: + """Tests for the GitHubReleaseManager class.""" + + def test_initialization(self): + """Test manager initialization with default values.""" + manager = GitHubReleaseManager("test_token") + + assert manager.token == "test_token" + assert manager.owner == "eveningsco" + assert manager.repo == "sol-release" + assert manager.api_base == "https://api.github.com" + assert "Authorization" in manager.headers + assert "Accept" in manager.headers + + def test_initialization_with_custom_values(self): + """Test manager initialization with custom owner and repo.""" + manager = GitHubReleaseManager("test_token", "custom_owner", "custom_repo") + + assert manager.owner == "custom_owner" + assert manager.repo == "custom_repo" + + @patch('github_release_manager.urlopen') + def test_list_releases(self, mock_urlopen): + """Test listing releases from GitHub API.""" + mock_response = Mock() + mock_response.read.return_value = b'[{"id": 1, "tag_name": "v1.0.0"}]' + mock_urlopen.return_value.__enter__.return_value = mock_response + + manager = GitHubReleaseManager("test_token") + releases = manager.list_releases(per_page=10, page=1) + + assert len(releases) == 1 + assert releases[0]["id"] == 1 + assert releases[0]["tag_name"] == "v1.0.0" + + 
@patch('github_release_manager.urlopen') + def test_get_release(self, mock_urlopen): + """Test fetching a specific release.""" + mock_response = Mock() + mock_response.read.return_value = b'{"id": 123, "tag_name": "v2.0.0"}' + mock_urlopen.return_value.__enter__.return_value = mock_response + + manager = GitHubReleaseManager("test_token") + release = manager.get_release(123) + + assert release["id"] == 123 + assert release["tag_name"] == "v2.0.0" + + @patch('github_release_manager.urlopen') + def test_get_release_by_tag(self, mock_urlopen): + """Test fetching release by tag name.""" + mock_response = Mock() + mock_response.read.return_value = b'[{"id": 456, "tag_name": "v1.5.0"}]' + mock_urlopen.return_value.__enter__.return_value = mock_response + + manager = GitHubReleaseManager("test_token") + release = manager.get_release_by_tag("v1.5.0") + + assert release is not None + assert release["id"] == 456 + assert release["tag_name"] == "v1.5.0" + + @patch('github_release_manager.urlopen') + def test_get_release_by_tag_not_found(self, mock_urlopen): + """Test fetching non-existent release by tag.""" + mock_response = Mock() + mock_response.read.return_value = b'[]' + mock_urlopen.return_value.__enter__.return_value = mock_response + + manager = GitHubReleaseManager("test_token") + release = manager.get_release_by_tag("nonexistent_tag") + + assert release is None + + @patch('github_release_manager.urlopen') + def test_delete_release(self, mock_urlopen): + """Test deleting a release.""" + mock_response = Mock() + mock_response.read.return_value = b'{}' + mock_urlopen.return_value.__enter__.return_value = mock_response + + manager = GitHubReleaseManager("test_token") + result = manager.delete_release(789) + + assert result is True + + @patch('github_release_manager.urlopen') + def test_cleanup_old_releases(self, mock_urlopen): + """Test cleanup of old releases.""" + mock_response = Mock() + mock_response.read.return_value = b'[]' + 
mock_urlopen.return_value.__enter__.return_value = mock_response + + manager = GitHubReleaseManager("test_token") + deleted = manager.cleanup_old_releases(keep_count=5) + + assert isinstance(deleted, list) + mock_urlopen.assert_called() + + @patch('github_release_manager.urlopen') + def test_get_release_assets(self, mock_urlopen): + """Test fetching release assets.""" + mock_response = Mock() + mock_response.read.return_value = b'[{"id": 1, "name": "asset.zip"}]' + mock_urlopen.return_value.__enter__.return_value = mock_response + + manager = GitHubReleaseManager("test_token") + assets = manager.get_release_assets(123) + + assert len(assets) == 1 + assert assets[0]["id"] == 1 + assert assets[0]["name"] == "asset.zip" + + +class TestComponentMetadataFetcher: + """Tests for the ComponentMetadataFetcher class.""" + + def test_initialization(self): + """Test fetcher initialization.""" + fetcher = ComponentMetadataFetcher("test_token") + + assert "sol-software" in fetcher.COMPONENT_MAP + assert "sol-server" in fetcher.COMPONENT_MAP + assert "sol-utils" in fetcher.COMPONENT_MAP + assert fetcher.api_base == "https://api.github.com" + + @patch('github_release_manager.urlopen') + def test_fetch_component_metadata_success(self, mock_urlopen): + """Test successful component metadata fetch.""" + mock_response = Mock() + mock_response.read.return_value = b''' + { + "tag_name": "v1.2.3", + "target_commitish": "main", + "html_url": "https://github.com/eveningsco/sol-software/releases/v1.2.3" + } + ''' + mock_urlopen.return_value.__enter__.return_value = mock_response + + fetcher = ComponentMetadataFetcher("test_token") + metadata = fetcher.fetch_component_metadata("sol-software") + + assert metadata is not None + assert metadata["component"] == "sol-software" + assert metadata["tag_name"] == "v1.2.3" + assert metadata["branch"] == "main" + + @patch('github_release_manager.urlopen') + def test_fetch_component_metadata_failure(self, mock_urlopen): + """Test component metadata fetch 
failure.""" + from urllib.error import HTTPError + mock_urlopen.side_effect = HTTPError( + "https://api.github.com/repos/eveningsco/sol-software/releases/latest", + 404, + "Not Found", + {}, + None + ) + + fetcher = ComponentMetadataFetcher("test_token") + metadata = fetcher.fetch_component_metadata("sol-software") + + assert metadata is None + + @patch('github_release_manager.urlopen') + def test_fetch_all_component_metadata(self, mock_urlopen): + """Test fetching metadata for all components.""" + mock_response = Mock() + mock_response.read.return_value = b''' + { + "tag_name": "v1.0.0", + "target_commitish": "main", + "html_url": "https://github.com/eveningsco/sol-software/releases/v1.0.0" + } + ''' + mock_urlopen.return_value.__enter__.return_value = mock_response + + fetcher = ComponentMetadataFetcher("test_token") + metadata = fetcher.fetch_all_component_metadata() + + assert "sol-software" in metadata + assert metadata["sol-software"]["tag_name"] == "v1.0.0" + + def test_invalid_component(self): + """Test fetching metadata for invalid component.""" + fetcher = ComponentMetadataFetcher("test_token") + + with pytest.raises(ValueError) as exc_info: + fetcher.fetch_component_metadata("invalid-component") + + assert "Unknown component" in str(exc_info.value) + + +class TestReleaseWorkflowOrchestrator: + """Tests for the ReleaseWorkflowOrchestrator class.""" + + def test_initialization(self): + """Test orchestrator initialization.""" + orchestrator = ReleaseWorkflowOrchestrator("test_token") + + assert isinstance(orchestrator.release_manager, GitHubReleaseManager) + assert isinstance(orchestrator.component_fetcher, ComponentMetadataFetcher) + assert orchestrator.workflow_steps == [] + + def test_workflow_steps_logging(self): + """Test workflow step logging.""" + orchestrator = ReleaseWorkflowOrchestrator("test_token") + + orchestrator.log_step("Step 1") + orchestrator.log_step("Step 2") + + assert len(orchestrator.workflow_steps) == 2 + assert "Step 1" in 
orchestrator.workflow_steps + + @patch('github_release_manager.GitHubReleaseManager') + @patch('github_release_manager.ComponentMetadataFetcher') + def test_create_release_from_components(self, mock_fetcher, mock_manager): + """Test creating release from component metadata.""" + mock_manager_instance = Mock() + mock_manager_instance.create_release.return_value = { + "id": 123, + "tag_name": "Release-03272026_14-30-00", + "html_url": "https://github.com/eveningsco/sol-release/releases/tag/v1.0.0" + } + mock_manager.return_value = mock_manager_instance + + mock_fetcher_instance = Mock() + mock_fetcher_instance.fetch_all_component_metadata.return_value = { + "sol-software": { + "component": "sol-software", + "tag_name": "v1.0.0", + "branch": "main", + "commit_sha": "abc123def" + }, + "sol-server": { + "component": "sol-server", + "tag_name": "v2.0.0", + "branch": "develop", + "commit_sha": "xyz789" + } + } + mock_fetcher.return_value = mock_fetcher_instance + + orchestrator = ReleaseWorkflowOrchestrator("test_token") + result = orchestrator.create_release_from_components() + + assert "release" in result + assert "components" in result + assert "steps" in result + assert len(orchestrator.workflow_steps) > 0 + assert any("release" in step.lower() for step in orchestrator.workflow_steps) + + @patch('github_release_manager.GitHubReleaseManager') + def test_create_release_no_components(self, mock_manager_class): + """Test release creation with no components.""" + mock_manager_instance = Mock() + mock_manager_instance.create_release.return_value = {"id": 123} + mock_manager_class.return_value = mock_manager_instance + + orchestrator = ReleaseWorkflowOrchestrator("test_token") + + with pytest.raises(GitHubAPIError) as exc_info: + orchestrator.create_release_from_components({}) + + assert "No component metadata" in str(exc_info.value) + + @patch('github_release_manager.GitHubReleaseManager') + @patch('github_release_manager.ComponentMetadataFetcher') + def 
test_cleanup_old_releases(self, mock_fetcher, mock_manager): + """Test cleanup of old releases.""" + mock_manager_instance = Mock() + mock_manager_instance.cleanup_old_releases.return_value = [] + mock_manager.return_value = mock_manager_instance + + orchestrator = ReleaseWorkflowOrchestrator("test_token") + deleted = orchestrator.cleanup_old_releases(keep_count=5) + + assert isinstance(deleted, list) + mock_manager_instance.cleanup_old_releases.assert_called_once_with(5) + + @patch('github_release_manager.GitHubReleaseManager') + @patch('github_release_manager.ComponentMetadataFetcher') + def test_get_release_history(self, mock_fetcher, mock_manager): + """Test getting recent release history.""" + mock_manager_instance = Mock() + mock_manager_instance.list_releases.return_value = [ + { + "tag_name": "v1.0.0", + "name": "Release-01012026", + "created_at": "2026-01-01T00:00:00Z", + "html_url": "https://github.com/eveningsco/sol-release/releases/v1.0.0", + "prerelease": True + } + ] + mock_manager.return_value = mock_manager_instance + + orchestrator = ReleaseWorkflowOrchestrator("test_token") + history = orchestrator.get_release_history(limit=5) + + assert len(history) == 1 + assert history[0]["tag"] == "v1.0.0" + assert history[0]["name"] == "Release-01012026" + + +class TestIntegration: + """Integration tests for GitHub release management.""" + + @patch('github_release_manager.GitHubReleaseManager') + @patch('github_release_manager.ComponentMetadataFetcher') + def test_full_release_workflow(self, mock_fetcher, mock_manager): + """Test complete release workflow from component fetch to release creation.""" + # Setup mock components + mock_manager_instance = Mock() + mock_manager_instance.create_release.return_value = { + "id": 999, + "tag_name": "Release-12312025_23-59-59", + "html_url": "https://github.com/eveningsco/sol-release/releases/tag/v1.0.0" + } + mock_manager.return_value = mock_manager_instance + + mock_fetcher_instance = Mock() + 
mock_fetcher_instance.fetch_all_component_metadata.return_value = { + "sol-software": { + "component": "sol-software", + "tag_name": "v1.0.0", + "branch": "main", + "commit_sha": "abc123def456" + } + } + mock_fetcher.return_value = mock_fetcher_instance + + orchestrator = ReleaseWorkflowOrchestrator("test_token") + + # Execute workflow + result = orchestrator.create_release_from_components() + + # Verify results + assert result["release"]["id"] == 999 + assert "sol-software" in result["components"] + assert len(orchestrator.workflow_steps) >= 3 + + # Verify API was called + mock_manager_instance.create_release.assert_called_once() + + @patch('github_release_manager.GitHubReleaseManager') + def test_release_with_custom_components(self, mock_manager): + """Test release creation with custom component metadata.""" + mock_manager_instance = Mock() + mock_manager_instance.create_release.return_value = {"id": 1} + mock_manager.return_value = mock_manager_instance + + orchestrator = ReleaseWorkflowOrchestrator("test_token") + custom_components = { + "sol-server": { + "component": "sol-server", + "tag_name": "v2.0.0", + "branch": "release", + "commit_sha": "xyz789" + } + } + + result = orchestrator.create_release_from_components(custom_components) + + assert "sol-server" in result["components"] + mock_manager_instance.create_release.assert_called_once() + + +class TestEdgeCases: + """Tests for edge cases and error handling.""" + + @patch('github_release_manager.urlopen') + def test_network_error_handling(self, mock_urlopen): + """Test handling of network errors.""" + from urllib.error import URLError + mock_urlopen.side_effect = URLError("Network unreachable") + + manager = GitHubReleaseManager("test_token") + + with pytest.raises(GitHubAPIError) as exc_info: + manager.list_releases() + + assert "Network error" in str(exc_info.value) + + @patch('github_release_manager.urlopen') + def test_http_error_handling(self, mock_urlopen): + """Test handling of HTTP errors.""" + from 
urllib.error import HTTPError + import json + + # Create proper empty body for the error response + mock_response = Mock() + mock_response.read.return_value = b'' + mock_urlopen.side_effect = HTTPError( + "https://api.github.com/repos/test/test/releases/latest", + 401, + "Unauthorized", + {}, + mock_response + ) + + fetcher = ComponentMetadataFetcher("invalid_token") + + metadata = fetcher.fetch_component_metadata("sol-software") + assert metadata is None # Should gracefully handle error + + def test_release_body_generation(self): + """Test that release body is properly formatted.""" + orchestrator = ReleaseWorkflowOrchestrator("test_token") + + test_components = { + "sol-software": { + "component": "sol-software", + "tag_name": "v1.0.0", + "branch": "main", + "commit_sha": "abc123def456" + } + } + + body = orchestrator._generate_release_body(test_components) + + assert "Combined SOL release package" in body + assert "Component Sources" in body + assert "sol-software" in body + assert "v1.0.0" in body + assert "main" in body + # Commit SHA is truncated to 7 characters + assert "abc123d" in body + + @patch('github_release_manager.urlopen') + def test_empty_component_metadata(self, mock_urlopen): + """Test handling of empty component metadata.""" + mock_response = Mock() + mock_response.read.return_value = b'[]' + mock_urlopen.return_value.__enter__.return_value = mock_response + + fetcher = ComponentMetadataFetcher("test_token") + metadata = fetcher.fetch_all_component_metadata() + + # Should return empty dict, not None + assert metadata == {} + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/test_metadata_generation.py b/tests/test_metadata_generation.py new file mode 100755 index 0000000..ae62c67 --- /dev/null +++ b/tests/test_metadata_generation.py @@ -0,0 +1,1324 @@ +"""Metadata Generation Tests for SOL Release System. 
+ +These tests validate metadata generation logic for releases, +component version tracking, and build information collection. +""" + +import pytest +from datetime import datetime, timezone +from typing import Optional +from unittest.mock import Mock, patch, MagicMock +import json +import hashlib +import tempfile +import os + + +class ReleaseMetadataCollector: + """Collects and manages release metadata.""" + + def __init__(self): + """Initialize metadata collector.""" + self.releases: dict = {} + self.metadata_log: list = [] + + def collect_release_metadata( + self, + release_id: int, + tag_name: str, + name: str, + body: str, + created_at: str, + commit_sha: str, + branch: str, + assets: Optional[list] = None + ): + """Collect metadata for a single release. + + Args: + release_id: GitHub release ID + tag_name: Git tag name + name: Release title + body: Release body text + created_at: ISO timestamp of creation + commit_sha: Commit SHA associated with release + branch: Git branch + assets: List of assets in the release + """ + asset_list = assets or [] + metadata = { + "release_id": release_id, + "tag_name": tag_name, + "name": name, + "body": body, + "created_at": created_at, + "commit_sha": commit_sha, + "branch": branch, + "assets": asset_list, + "asset_count": len(asset_list) + } + + self.releases[tag_name] = metadata + self.metadata_log.append({ + "action": "collect", + "tag": tag_name, + "timestamp": datetime.now(timezone.utc).isoformat() + }) + + return metadata + + def get_latest_release(self, tag_prefix: str = "") -> dict: + """Get the latest release matching prefix. 
+ + Args: + tag_prefix: Optional prefix to filter tags + + Returns: + Latest release metadata or None + """ + if not self.releases: + return None + + candidate_tags = [ + tag for tag in self.releases.keys() + if tag.startswith(tag_prefix) + ] + + if not candidate_tags: + return None + + # Sort by created_at descending + sorted_tags = sorted( + candidate_tags, + key=lambda t: self.releases[t].get("created_at", ""), + reverse=True + ) + + latest_tag = sorted_tags[0] + return self.releases[latest_tag] + + def get_all_releases_sorted(self) -> list: + """Get all releases sorted by creation date. + + Returns: + List of release metadata dictionaries + """ + sorted_releases = sorted( + self.releases.values(), + key=lambda r: r.get("created_at", ""), + reverse=True + ) + return sorted_releases + + def get_release_by_tag(self, tag_name: str) -> dict: + """Get release metadata by tag name. + + Args: + tag_name: Git tag name + + Returns: + Release metadata or None + """ + return self.releases.get(tag_name) + + def get_releases_by_branch(self, branch: str) -> list: + """Get all releases for a specific branch. + + Args: + branch: Git branch name + + Returns: + List of release metadata dictionaries + """ + return [ + r for r in self.releases.values() + if r.get("branch") == branch + ] + + def export_to_json(self, filepath: str): + """Export all metadata to JSON file. + + Args: + filepath: Path to output file + """ + data = { + "collected_at": datetime.now(timezone.utc).isoformat(), + "total_releases": len(self.releases), + "releases": self.releases, + "metadata_log": self.metadata_log + } + + with open(filepath, 'w') as f: + json.dump(data, f, indent=2) + + @staticmethod + def generate_metadata_hash(metadata: dict) -> str: + """Generate a hash for metadata verification. 
+ + Args: + metadata: Metadata dictionary to hash + + Returns: + SHA256 hash of metadata + """ + metadata_str = json.dumps(metadata, sort_keys=True) + return hashlib.sha256(metadata_str.encode()).hexdigest() + + +class WorkflowMetadataTracker: + """Tracks metadata for workflow executions.""" + + def __init__(self): + """Initialize workflow metadata tracker.""" + self.workflows: dict = {} + self.workflow_log: list = [] + + def record_workflow( + self, + workflow_id: str, + event_type: str, + triggered_at: str, + components: list, + status: str = "completed" + ): + """Record a workflow execution. + + Args: + workflow_id: Unique workflow execution ID + event_type: GitHub event type that triggered workflow + triggered_at: ISO timestamp + components: List of component names involved + status: Workflow status (completed, failed, running) + """ + metadata = { + "workflow_id": workflow_id, + "event_type": event_type, + "triggered_at": triggered_at, + "components": components, + "status": status, + "duration": None, + "result": {} + } + + self.workflows[workflow_id] = metadata + self.workflow_log.append({ + "action": "record", + "workflow_id": workflow_id, + "timestamp": datetime.now(timezone.utc).isoformat() + }) + + def complete_workflow( + self, + workflow_id: str, + completed_at: str, + result: dict + ): + """Mark a workflow as completed. 
+ + Args: + workflow_id: Workflow execution ID + completed_at: Completion timestamp + result: Workflow result data + """ + if workflow_id not in self.workflows: + return + + self.workflows[workflow_id]["status"] = "completed" + self.workflows[workflow_id]["completed_at"] = completed_at + self.workflows[workflow_id]["result"] = result + + # Calculate duration + if "triggered_at" in self.workflows[workflow_id]: + start = datetime.fromisoformat( + self.workflows[workflow_id]["triggered_at"] + ) + end = datetime.fromisoformat(completed_at) + self.workflows[workflow_id]["duration"] = (end - start).total_seconds() + + def get_workflow_by_id(self, workflow_id: str) -> dict: + """Get workflow metadata by ID. + + Args: + workflow_id: Workflow execution ID + + Returns: + Workflow metadata or None + """ + return self.workflows.get(workflow_id) + + def get_workflows_by_event_type(self, event_type: str) -> list: + """Get all workflows triggered by specific event type. + + Args: + event_type: GitHub event type + + Returns: + List of workflow metadata + """ + return [ + w for w in self.workflows.values() + if w.get("event_type") == event_type + ] + + def get_workflow_statistics(self) -> dict: + """Get workflow execution statistics. 
+ + Returns: + Statistics dictionary + """ + if not self.workflows: + return { + "total": 0, + "completed": 0, + "failed": 0, + "avg_duration": 0 + } + + completed = [ + w for w in self.workflows.values() + if w.get("status") == "completed" + ] + + failed = [ + w for w in self.workflows.values() + if w.get("status") == "failed" + ] + + durations = [ + w.get("duration") for w in completed + if w.get("duration") is not None + ] + + return { + "total": len(self.workflows), + "completed": len(completed), + "failed": len(failed), + "avg_duration": sum(durations) / len(durations) if durations else 0 + } + + +class DispatchEventParser: + """Parses GitHub repository dispatch events.""" + + EVENT_MAPPING = { + "sol_software_release": { + "component": "sol-software", + "owner": "eveningsco", + "repo": "sol-software" + }, + "sol_server_release": { + "component": "sol-server", + "owner": "eveningsco", + "repo": "sol-server" + }, + "sol_utils_release": { + "component": "sol-utils", + "owner": "eveningsco", + "repo": "sol-utils" + } + } + + def __init__(self, payload: dict): + """Initialize dispatch event parser. + + Args: + payload: GitHub webhook payload dictionary + """ + self.payload = payload + self.parsed_data: dict = {} + + def parse(self) -> dict: + """Parse the dispatch event. 
+ + Returns: + Parsed event data + """ + event_type = self.payload.get("client_payload", {}).get("event_type", "") + + if event_type not in self.EVENT_MAPPING: + raise ValueError(f"Unknown event type: {event_type}") + + config = self.EVENT_MAPPING[event_type] + + client_payload = self.payload.get("client_payload", {}) + + self.parsed_data = { + "event_type": event_type, + "component": config["component"], + "owner": config["owner"], + "repo": config["repo"], + "branch": client_payload.get("branch", ""), + "commit_sha": client_payload.get("commit_sha", ""), + "tag_name": client_payload.get("tag_name", ""), + "pr_number": self._extract_pr_number(client_payload.get("body", "")) + } + + return self.parsed_data + + def _extract_pr_number(self, body: str) -> int: + """Extract PR number from body text. + + Args: + body: Text body to search + + Returns: + PR number if found, None otherwise + """ + import re + match = re.search(r"PR\s*#?(\d+)", body) + if match: + return int(match.group(1)) + return None + + def is_valid_dispatch(self) -> bool: + """Check if this is a valid dispatch event. + + Returns: + True if event is valid + """ + return "client_payload" in self.payload and \ + "event_type" in self.payload.get("client_payload", {}) + + def get_component_info(self) -> dict: + """Get component information from parsed event. 
+ + Returns: + Component information dictionary + """ + self.parse() + return { + "name": self.parsed_data.get("component", ""), + "branch": self.parsed_data.get("branch", ""), + "commit": self.parsed_data.get("commit_sha", ""), + "tag": self.parsed_data.get("tag_name", "") + } + + +class ReleaseBodyGenerator: + """Generates release body text from metadata.""" + + ASSET_CATEGORIES = { + "executables": [ + "sol-server", "sol_software", "sol_update_gui", + "sol_update_backend", "sol_update_manager", + "mass_gadget_watchdog", "provision", "mp2624" + ], + "services": [ + "sol-server.service", "sol_software.service", + "mass_gadget_watchdog.service" + ], + "config": ["filebeat.yml"], + "logrotate": [] + } + + def __init__(self, metadata: dict, component_metadata: dict = None): + """Initialize release body generator. + + Args: + metadata: Release metadata dictionary + component_metadata: Optional component metadata + """ + self.metadata = metadata + self.component_metadata = component_metadata or {} + + def generate(self) -> str: + """Generate release body text. 
+ + Returns: + Formatted release body string + """ + lines = [ + "SOL Release Package", + f"Build Date: {self._format_date(self.metadata.get('created_at', ''))}", + "", + "## Component Information" + ] + + if self.component_metadata: + lines.append("") + for component, comp_data in self.component_metadata.items(): + tag = comp_data.get("tag_name", "N/A") + branch = comp_data.get("branch", "N/A") + commit = comp_data.get("commit_sha", "")[:8] if comp_data.get("commit_sha") else "N/A" + lines.append(f"- **{component}**: {tag} (branch: {branch}, commit: {commit})") + + lines.extend([ + "", + "## Release Information", + f"- Release ID: {self.metadata.get('release_id', 'N/A')}", + f"- Tag: {self.metadata.get('tag_name', 'N/A')}", + f"- Commit: {self.metadata.get('commit_sha', 'N/A')[:8] if self.metadata.get('commit_sha') else 'N/A'}", + f"- Assets: {self.metadata.get('asset_count', 0)} files" + ]) + + lines.extend([ + "", + "## Included Files" + ]) + + # Add assets if available + assets = self.metadata.get("assets", []) + if assets: + for i, asset in enumerate(assets[:10], 1): + lines.append(f"{i}. {asset.get('name', 'Unknown')}") + if len(assets) > 10: + lines.append(f"... and {len(assets) - 10} more assets") + + return "\n".join(lines) + + def _format_date(self, date_str: str) -> str: + """Format date string for display. 
+ + Args: + date_str: ISO date string + + Returns: + Formatted date string + """ + if not date_str: + return "N/A" + + try: + dt = datetime.fromisoformat(date_str.replace('Z', '+00:00')) + return dt.strftime("%Y-%m-%d %H:%M:%S UTC") + except: + return date_str[:19] + + +class ComponentMetadataFetcher: + """Simulates fetching metadata from component repositories.""" + + COMPONENTS = { + "sol-software": { + "latest_tag": "v1.2.3", + "branch": "main", + "commit_sha": "abc123def456", + "releases": [ + {"tag": "v1.2.3", "date": "2026-03-25"}, + {"tag": "v1.2.2", "date": "2026-03-20"}, + {"tag": "v1.2.1", "date": "2026-03-15"} + ] + }, + "sol-server": { + "latest_tag": "v2.1.0", + "branch": "main", + "commit_sha": "xyz789abc123", + "releases": [ + {"tag": "v2.1.0", "date": "2026-03-26"}, + {"tag": "v2.0.0", "date": "2026-03-18"}, + {"tag": "v1.9.0", "date": "2026-03-10"} + ] + }, + "sol-utils": { + "latest_tag": "v1.0.5", + "branch": "main", + "commit_sha": "def456xyz789", + "releases": [ + {"tag": "v1.0.5", "date": "2026-03-24"}, + {"tag": "v1.0.4", "date": "2026-03-12"} + ] + } + } + + def __init__(self): + """Initialize component metadata fetcher.""" + self.fetched_metadata: dict = {} + + def fetch_component_metadata(self, component: str) -> dict: + """Fetch metadata for a component. 
+ + Args: + component: Component name + + Returns: + Component metadata dictionary + """ + if component not in self.COMPONENTS: + raise ValueError(f"Unknown component: {component}") + + comp_data = self.COMPONENTS[component] + + metadata = { + "component": component, + "included": True, + "branch": comp_data["branch"], + "commit_sha": comp_data["commit_sha"], + "tag_name": comp_data["latest_tag"], + "release_url": f"https://github.com/eveningsco/{component}/releases/tag/{comp_data['latest_tag']}", + "pr_number": None, + "history": comp_data["releases"] + } + + self.fetched_metadata[component] = metadata + return metadata + + def fetch_all_components(self) -> dict: + """Fetch metadata for all components. + + Returns: + Dictionary of component metadata + """ + for component in self.COMPONENTS: + self.fetch_component_metadata(component) + + return self.fetched_metadata + + def get_component_history(self, component: str, limit: int = 5) -> list: + """Get release history for a component. + + Args: + component: Component name + limit: Maximum number of releases to return + + Returns: + List of release information + """ + if component not in self.COMPONENTS: + return [] + + return self.COMPONENTS[component]["releases"][:limit] + + def compare_components(self, components: list) -> dict: + """Compare metadata across components. 
+ + Args: + components: List of component names to compare + + Returns: + Comparison results + """ + comparison = {} + + for component in components: + if component in self.COMPONENTS: + comp_data = self.COMPONENTS[component] + comparison[component] = { + "latest_tag": comp_data["latest_tag"], + "commit_sha": comp_data["commit_sha"], + "branch": comp_data["branch"], + "release_count": len(comp_data["releases"]) + } + + return comparison + + +# ============================================================================ +# TESTS +# ============================================================================ + +class TestReleaseMetadataCollector: + """Tests for release metadata collector.""" + + def test_collect_release_metadata(self): + """Test collecting release metadata.""" + collector = ReleaseMetadataCollector() + + metadata = collector.collect_release_metadata( + release_id=123, + tag_name="v1.0.0", + name="Release 1.0.0", + body="Release body", + created_at="2026-03-25T10:00:00Z", + commit_sha="abc123def", + branch="main", + assets=["file1.zip", "file2.zip"] + ) + + assert metadata["release_id"] == 123 + assert metadata["tag_name"] == "v1.0.0" + assert metadata["asset_count"] == 2 + + def test_get_latest_release(self): + """Test getting latest release.""" + collector = ReleaseMetadataCollector() + + collector.collect_release_metadata( + release_id=1, tag_name="v1.0.0", name="v1.0.0", + body="", created_at="2026-01-01T00:00:00Z", + commit_sha="abc", branch="main", + assets=["file1.zip"] + ) + collector.collect_release_metadata( + release_id=2, tag_name="v2.0.0", name="v2.0.0", + body="", created_at="2026-03-25T00:00:00Z", + commit_sha="def", branch="main", + assets=["file2.zip"] + ) + + latest = collector.get_latest_release() + + assert latest["tag_name"] == "v2.0.0" + assert latest["release_id"] == 2 + + def test_get_latest_release_with_prefix(self): + """Test getting latest release with tag prefix.""" + collector = ReleaseMetadataCollector() + + 
collector.collect_release_metadata( + release_id=1, tag_name="v1.0.0", name="v1.0.0", + body="", created_at="2026-01-01T00:00:00Z", + commit_sha="abc", branch="main", + assets=["file1.zip"] + ) + collector.collect_release_metadata( + release_id=2, tag_name="v2.0.0", name="v2.0.0", + body="", created_at="2026-03-25T00:00:00Z", + commit_sha="def", branch="main", + assets=["file2.zip"] + ) + + latest = collector.get_latest_release(tag_prefix="v1.") + + assert latest["tag_name"] == "v1.0.0" + + def test_get_release_by_tag(self): + """Test getting release by tag name.""" + collector = ReleaseMetadataCollector() + + collector.collect_release_metadata( + release_id=123, tag_name="v1.0.0", name="v1.0.0", + body="Test", created_at="2026-03-25T00:00:00Z", + commit_sha="abc123", branch="main" + ) + + release = collector.get_release_by_tag("v1.0.0") + + assert release is not None + assert release["release_id"] == 123 + + def test_get_release_by_tag_not_found(self): + """Test getting non-existent release by tag.""" + collector = ReleaseMetadataCollector() + + release = collector.get_release_by_tag("nonexistent") + + assert release is None + + def test_get_releases_by_branch(self): + """Test getting releases by branch.""" + collector = ReleaseMetadataCollector() + + collector.collect_release_metadata( + release_id=1, tag_name="v1.0.0", name="v1.0.0", + body="", created_at="2026-01-01T00:00:00Z", + commit_sha="abc", branch="main" + ) + collector.collect_release_metadata( + release_id=2, tag_name="v2.0.0", name="v2.0.0", + body="", created_at="2026-03-25T00:00:00Z", + commit_sha="def", branch="develop" + ) + collector.collect_release_metadata( + release_id=3, tag_name="v1.1.0", name="v1.1.0", + body="", created_at="2026-03-20T00:00:00Z", + commit_sha="ghi", branch="main" + ) + + main_releases = collector.get_releases_by_branch("main") + + assert len(main_releases) == 2 + assert all(r["branch"] == "main" for r in main_releases) + + def test_get_all_releases_sorted(self): + """Test 
getting all releases sorted by date.""" + collector = ReleaseMetadataCollector() + + collector.collect_release_metadata( + release_id=1, tag_name="v1.0.0", name="v1.0.0", + body="", created_at="2026-01-01T00:00:00Z", + commit_sha="abc", branch="main" + ) + collector.collect_release_metadata( + release_id=2, tag_name="v2.0.0", name="v2.0.0", + body="", created_at="2026-03-25T00:00:00Z", + commit_sha="def", branch="main" + ) + + sorted_releases = collector.get_all_releases_sorted() + + assert len(sorted_releases) == 2 + assert sorted_releases[0]["tag_name"] == "v2.0.0" + + def test_generate_metadata_hash(self): + """Test generating metadata hash.""" + collector = ReleaseMetadataCollector() + + metadata = { + "release_id": 123, + "tag_name": "v1.0.0", + "name": "Test" + } + + hash1 = collector.generate_metadata_hash(metadata) + hash2 = collector.generate_metadata_hash(metadata) + + assert len(hash1) == 64 # SHA256 hex length + assert hash1 == hash2 + + metadata["name"] = "Different" + hash3 = collector.generate_metadata_hash(metadata) + + assert hash1 != hash3 + + def test_export_to_json(self): + """Test exporting metadata to JSON.""" + collector = ReleaseMetadataCollector() + + with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: + temp_path = f.name + + try: + collector.collect_release_metadata( + release_id=1, tag_name="v1.0.0", name="v1.0.0", + body="", created_at="2026-03-25T00:00:00Z", + commit_sha="abc", branch="main" + ) + + collector.export_to_json(temp_path) + + with open(temp_path, 'r') as f: + data = json.load(f) + + assert "collected_at" in data + assert data["total_releases"] == 1 + assert "v1.0.0" in data["releases"] + + finally: + if os.path.exists(temp_path): + os.unlink(temp_path) + + +class TestWorkflowMetadataTracker: + """Tests for workflow metadata tracker.""" + + def test_record_workflow(self): + """Test recording a workflow execution.""" + tracker = WorkflowMetadataTracker() + + tracker.record_workflow( + 
workflow_id="wf-001", + event_type="sol_software_release", + triggered_at="2026-03-25T10:00:00Z", + components=["sol-software"], + status="completed" + ) + + workflow = tracker.get_workflow_by_id("wf-001") + + assert workflow["event_type"] == "sol_software_release" + assert workflow["status"] == "completed" + assert "sol-software" in workflow["components"] + + def test_complete_workflow(self): + """Test completing a workflow.""" + tracker = WorkflowMetadataTracker() + + # Record workflow + tracker.record_workflow( + workflow_id="wf-001", + event_type="sol_software_release", + triggered_at="2026-03-25T10:00:00Z", + components=["sol-software"] + ) + + # Complete workflow + tracker.complete_workflow( + workflow_id="wf-001", + completed_at="2026-03-25T10:05:00Z", + result={"success": True, "assets": 5} + ) + + workflow = tracker.get_workflow_by_id("wf-001") + + assert workflow["status"] == "completed" + assert workflow["duration"] == 300.0 # 5 minutes in seconds + assert workflow["result"]["success"] is True + + def test_get_workflows_by_event_type(self): + """Test getting workflows by event type.""" + tracker = WorkflowMetadataTracker() + + tracker.record_workflow("wf-1", "sol_software_release", "2026-03-25T10:00:00Z", []) + tracker.record_workflow("wf-2", "sol_server_release", "2026-03-25T11:00:00Z", []) + tracker.record_workflow("wf-3", "sol_software_release", "2026-03-25T12:00:00Z", []) + + software_workflows = tracker.get_workflows_by_event_type("sol_software_release") + + assert len(software_workflows) == 2 + + def test_get_workflow_statistics(self): + """Test getting workflow statistics.""" + tracker = WorkflowMetadataTracker() + + tracker.record_workflow("wf-1", "event", "2026-03-25T10:00:00Z", [], status="completed") + tracker.record_workflow("wf-2", "event", "2026-03-25T10:00:00Z", [], status="completed") + tracker.record_workflow("wf-3", "event", "2026-03-25T10:00:00Z", [], status="running") + + stats = tracker.get_workflow_statistics() + + assert 
stats["total"] == 3 + assert stats["completed"] == 2 + assert stats["failed"] == 0 + + def test_get_workflow_by_id_not_found(self): + """Test getting non-existent workflow.""" + tracker = WorkflowMetadataTracker() + + workflow = tracker.get_workflow_by_id("nonexistent") + + assert workflow is None + + +class TestDispatchEventParser: + """Tests for dispatch event parser.""" + + def test_parse_valid_dispatch(self): + """Test parsing valid dispatch event.""" + payload = { + "event_type": "repository_dispatch", + "client_payload": { + "event_type": "sol_software_release", + "branch": "main", + "commit_sha": "abc123def456", + "tag_name": "v1.0.0" + } + } + + parser = DispatchEventParser(payload) + parsed = parser.parse() + + assert parsed["event_type"] == "sol_software_release" + assert parsed["component"] == "sol-software" + assert parsed["branch"] == "main" + assert parsed["commit_sha"] == "abc123def456" + + def test_parse_invalid_event_type(self): + """Test parsing invalid event type.""" + payload = { + "client_payload": { + "event_type": "unknown_event" + } + } + + parser = DispatchEventParser(payload) + + with pytest.raises(ValueError): + parser.parse() + + def test_extract_pr_number(self): + """Test extracting PR number from body.""" + payload = { + "client_payload": { + "event_type": "sol_software_release", + "body": "Release v1.0.0 - PR #123" + } + } + + parser = DispatchEventParser(payload) + parsed = parser.parse() + + assert parsed["pr_number"] == 123 + + def test_extract_pr_number_no_pr(self): + """Test parsing when no PR number in body.""" + payload = { + "client_payload": { + "event_type": "sol_server_release", + "body": "Release v2.0.0" + } + } + + parser = DispatchEventParser(payload) + parsed = parser.parse() + + assert parsed["pr_number"] is None + + def test_is_valid_dispatch(self): + """Test dispatch validation.""" + valid_payload = { + "client_payload": { + "event_type": "sol_software_release", + "branch": "main" + } + } + invalid_payload = {} + + 
assert DispatchEventParser(valid_payload).is_valid_dispatch() is True + assert DispatchEventParser(invalid_payload).is_valid_dispatch() is False + + def test_get_component_info(self): + """Test getting component information.""" + payload = { + "client_payload": { + "event_type": "sol_server_release", + "branch": "develop", + "commit_sha": "xyz789", + "tag_name": "v1.5.0" + } + } + + parser = DispatchEventParser(payload) + component_info = parser.get_component_info() + + assert component_info["name"] == "sol-server" + assert component_info["branch"] == "develop" + + +class TestReleaseBodyGenerator: + """Tests for release body generator.""" + + def test_generate_basic(self): + """Test generating basic release body.""" + metadata = { + "release_id": "123", + "tag_name": "v1.0.0", + "commit_sha": "abc123def", + "asset_count": 5, + "created_at": "2026-03-25T10:00:00Z" + } + + generator = ReleaseBodyGenerator(metadata) + body = generator.generate() + + assert "SOL Release Package" in body + assert "Build Date:" in body + assert "v1.0.0" in body + # Commit SHA is truncated to 8 characters + assert "abc123de" in body + + def test_generate_with_component_metadata(self): + """Test generating release body with component info.""" + metadata = { + "release_id": "123", + "tag_name": "v1.0.0", + "commit_sha": "abc123", + "asset_count": 5, + "created_at": "2026-03-25T10:00:00Z" + } + + component_metadata = { + "sol-software": { + "tag_name": "v1.2.3", + "branch": "main", + "commit_sha": "xyz789" + }, + "sol-server": { + "tag_name": "v2.0.0", + "branch": "develop", + "commit_sha": "abc456" + } + } + + generator = ReleaseBodyGenerator(metadata, component_metadata) + body = generator.generate() + + assert "sol-software" in body + assert "sol-server" in body + assert "v1.2.3" in body + assert "v2.0.0" in body + + def test_generate_with_assets(self): + """Test generating release body with assets.""" + metadata = { + "release_id": "123", + "tag_name": "v1.0.0", + "commit_sha": "abc123", 
+ "asset_count": 3, + "assets": [ + {"name": "sol-server.zip"}, + {"name": "sol_software"}, + {"name": "filebeat.yml"} + ], + "created_at": "2026-03-25T10:00:00Z" + } + + generator = ReleaseBodyGenerator(metadata) + body = generator.generate() + + assert "sol-server.zip" in body + assert "1. sol-server.zip" in body + + def test_format_date(self): + """Test date formatting.""" + generator = ReleaseBodyGenerator({}) + + # Test valid ISO date + formatted = generator._format_date("2026-03-25T10:00:00Z") + assert "2026-03-25" in formatted + assert "UTC" in formatted + + # Test invalid date + formatted = generator._format_date("not-a-date") + assert "not-a-date" in formatted + + # Test empty date + formatted = generator._format_date("") + assert "N/A" in formatted + + +class TestComponentMetadataFetcher: + """Tests for component metadata fetcher.""" + + def test_fetch_component_metadata(self): + """Test fetching component metadata.""" + fetcher = ComponentMetadataFetcher() + + metadata = fetcher.fetch_component_metadata("sol-software") + + assert metadata["component"] == "sol-software" + assert metadata["tag_name"] == "v1.2.3" + assert metadata["branch"] == "main" + assert "abc123def456" in metadata["commit_sha"] + + def test_fetch_invalid_component(self): + """Test fetching metadata for invalid component.""" + fetcher = ComponentMetadataFetcher() + + with pytest.raises(ValueError): + fetcher.fetch_component_metadata("invalid-component") + + def test_fetch_all_components(self): + """Test fetching all components.""" + fetcher = ComponentMetadataFetcher() + + metadata = fetcher.fetch_all_components() + + assert "sol-software" in metadata + assert "sol-server" in metadata + assert "sol-utils" in metadata + + def test_get_component_history(self): + """Test getting component release history.""" + fetcher = ComponentMetadataFetcher() + + history = fetcher.get_component_history("sol-server", limit=2) + + assert len(history) == 2 + assert history[0]["tag"] == "v2.1.0" + + def 
test_get_component_history_no_component(self): + """Test getting history for invalid component.""" + fetcher = ComponentMetadataFetcher() + + history = fetcher.get_component_history("invalid-component") + + assert history == [] + + def test_compare_components(self): + """Test comparing components.""" + fetcher = ComponentMetadataFetcher() + + comparison = fetcher.compare_components(["sol-software", "sol-server"]) + + assert "sol-software" in comparison + assert "sol-server" in comparison + assert comparison["sol-software"]["latest_tag"] == "v1.2.3" + assert comparison["sol-server"]["latest_tag"] == "v2.1.0" + + def test_compare_components_mixed(self): + """Test comparing with mix of valid and invalid components.""" + fetcher = ComponentMetadataFetcher() + + comparison = fetcher.compare_components([ + "sol-software", + "invalid-component", + "sol-server" + ]) + + assert "sol-software" in comparison + assert "invalid-component" not in comparison + assert "sol-server" in comparison + + +class TestMetadataIntegration: + """Integration tests for metadata generation.""" + + def test_full_metadata_workflow(self): + """Test complete metadata workflow.""" + # Collect releases with proper asset format + collector = ReleaseMetadataCollector() + collector.collect_release_metadata( + release_id=1, tag_name="v1.0.0", name="v1.0.0", + body="", created_at="2026-01-01T00:00:00Z", + commit_sha="abc123", branch="main", + assets=[{"name": "file1.zip"}] + ) + collector.collect_release_metadata( + release_id=2, tag_name="v2.0.0", name="v2.0.0", + body="", created_at="2026-03-25T00:00:00Z", + commit_sha="def456", branch="main", + assets=[{"name": "file2.zip"}, {"name": "file3.zip"}] + ) + + # Generate release body + latest = collector.get_latest_release() + generator = ReleaseBodyGenerator(latest) + body = generator.generate() + + # Verify body contains expected content + assert "v2.0.0" in body + assert "Build Date:" in body + # Note: asset_count is the count of assets, not the string 
"3 files" + assert "file2.zip" in body or "file3.zip" in body + + def test_workflow_and_dispatch_integration(self): + """Test integrating workflow tracking with dispatch events.""" + # Parse dispatch event + payload = { + "event_type": "repository_dispatch", + "client_payload": { + "event_type": "sol_software_release", + "branch": "main", + "commit_sha": "xyz789", + "tag_name": "v1.0.0" + } + } + + parser = DispatchEventParser(payload) + parsed = parser.parse() + + # Record workflow + tracker = WorkflowMetadataTracker() + tracker.record_workflow( + workflow_id="wf-001", + event_type=parsed["event_type"], + triggered_at="2026-03-25T10:00:00Z", + components=[parsed["component"]] + ) + + # Complete workflow + tracker.complete_workflow( + workflow_id="wf-001", + completed_at="2026-03-25T10:05:00Z", + result={"component": parsed["component"]} + ) + + # Verify + workflow = tracker.get_workflow_by_id("wf-001") + assert workflow["status"] == "completed" + assert "sol-software" in workflow["components"] + + def test_metadata_consistency_across_components(self): + """Test metadata consistency across multiple components.""" + fetcher = ComponentMetadataFetcher() + + # Fetch all components + all_metadata = fetcher.fetch_all_components() + + # Verify each component has required fields + for component, metadata in all_metadata.items(): + assert "component" in metadata + assert "tag_name" in metadata + assert "branch" in metadata + assert "commit_sha" in metadata + assert "history" in metadata + + # Verify versions are correct + assert all_metadata["sol-software"]["tag_name"] == "v1.2.3" + assert all_metadata["sol-server"]["tag_name"] == "v2.1.0" + assert all_metadata["sol-utils"]["tag_name"] == "v1.0.5" + + def test_metadata_persistence(self): + """Test metadata persistence across operations.""" + collector = ReleaseMetadataCollector() + + # Collect releases with assets + collector.collect_release_metadata( + release_id=1, tag_name="v1.0.0", name="v1.0.0", + body="", 
created_at="2026-03-25T00:00:00Z", + commit_sha="abc123", branch="main", + assets=[{"name": "test.zip"}] + ) + + # Export to JSON + with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: + temp_path = f.name + + try: + collector.export_to_json(temp_path) + + # Re-import and verify + with open(temp_path, 'r') as f: + imported_data = json.load(f) + + assert "releases" in imported_data + assert "v1.0.0" in imported_data["releases"] + assert imported_data["releases"]["v1.0.0"]["tag_name"] == "v1.0.0" + finally: + if os.path.exists(temp_path): + os.unlink(temp_path) + + def test_metadata_hash_verification(self): + """Test metadata hash for integrity verification.""" + collector = ReleaseMetadataCollector() + + # Create and hash metadata + metadata = { + "release_id": 1, + "tag_name": "v1.0.0", + "commit_sha": "abc123def" + } + + hash1 = collector.generate_metadata_hash(metadata) + + # Verify same metadata produces same hash + hash2 = collector.generate_metadata_hash(metadata) + assert hash1 == hash2 + + # Verify different metadata produces different hash + metadata["tag_name"] = "v1.1.0" + hash3 = collector.generate_metadata_hash(metadata) + assert hash1 != hash3 + + +class TestMetadataEdgeCases: + """Tests for metadata edge cases.""" + + def test_empty_metadata_collector(self): + """Test operations on empty metadata collector.""" + collector = ReleaseMetadataCollector() + + latest = collector.get_latest_release() + assert latest is None + + sorted_releases = collector.get_all_releases_sorted() + assert sorted_releases == [] + + def test_workflow_tracker_with_no_workflows(self): + """Test workflow tracker with no workflows.""" + tracker = WorkflowMetadataTracker() + + stats = tracker.get_workflow_statistics() + + assert stats["total"] == 0 + assert stats["completed"] == 0 + assert stats["avg_duration"] == 0 + + def test_dispatch_parser_invalid_payload(self): + """Test dispatch parser with invalid payload.""" + parser = DispatchEventParser({}) + + 
class WorkflowEventSimulator:
    """Simulates GitHub workflow dispatch events.

    Maps known repository_dispatch event types to the component
    repository they originate from and exposes helpers for
    classifying events.
    """

    # Known event types and the default component coordinates for each.
    EVENT_TYPES = {
        "sol_software_release": {
            "owner": "eveningsco",
            "repo": "sol-software",
            "branch": "main"
        },
        "sol_server_release": {
            "owner": "eveningsco",
            "repo": "sol-server",
            "branch": "main"
        },
        "sol_utils_release": {
            "owner": "eveningsco",
            "repo": "sol-utils",
            "branch": "main"
        },
        "manual_dispatch": {
            "owner": "eveningsco",
            "repo": "sol-release",
            "branch": "main"
        }
    }

    def __init__(self, event_type: str, payload: dict):
        """Initialize workflow event simulator.

        Args:
            event_type: Type of GitHub event
            payload: Event payload data
        """
        self.event_type = event_type
        self.payload = payload
        self.timestamp = datetime.now(timezone.utc).isoformat()

    def get_component_info(self) -> dict:
        """Get component information from event.

        Returns:
            Dictionary with component owner, repo, branch, tag and
            commit SHA. A branch supplied in the payload overrides the
            event-type default; a missing or empty payload branch falls
            back to the default instead of clobbering it.

        Raises:
            ValueError: If the event type is not a known type.
        """
        if self.event_type not in self.EVENT_TYPES:
            raise ValueError(f"Unknown event type: {self.event_type}")

        defaults = self.EVENT_TYPES[self.event_type]
        return {
            **defaults,
            "tag": self.payload.get("tag_name", ""),
            "commit_sha": self.payload.get("commit_sha", ""),
            # BUGFIX: previously `payload.get("branch", "")` always
            # overrode the default, so an empty payload wiped the
            # event-type branch with "" — fall back to the default.
            "branch": self.payload.get("branch") or defaults["branch"]
        }

    def is_valid_component_event(self) -> bool:
        """Check if this is a valid component release event."""
        return self.event_type in [
            "sol_software_release",
            "sol_server_release",
            "sol_utils_release"
        ]

    def is_manual_dispatch(self) -> bool:
        """Check if this is a manual dispatch event."""
        return self.event_type == "manual_dispatch"
class WorkflowStepSimulator:
    """Records and summarizes the steps of a simulated workflow run."""

    def __init__(self):
        """Initialize an empty step recorder."""
        self.steps: list = []
        self.current_step: str = ""
        self.errors: list = []

    def add_step(self, step_name: str, status: str = "pending", **kwargs):
        """Record a workflow step.

        Args:
            step_name: Name of the step
            status: Step status (pending, in_progress, completed, failed)
            **kwargs: Additional step data
        """
        entry = {
            "name": step_name,
            "status": status,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            **kwargs,
        }
        self.steps.append(entry)
        self.current_step = step_name

    def mark_completed(self, result: dict = None):
        """Mark the most recently recorded step as completed.

        Args:
            result: Optional step result data
        """
        if not self.steps:
            return
        last = self.steps[-1]
        last["status"] = "completed"
        if result:
            last["result"] = result

    def mark_failed(self, error: str):
        """Mark the most recently recorded step as failed.

        Args:
            error: Error message
        """
        if not self.steps:
            return
        last = self.steps[-1]
        last["status"] = "failed"
        last["error"] = error
        self.errors.append(error)

    def get_status(self) -> dict:
        """Summarize the recorded steps.

        Returns:
            Dictionary with step counts and overall status
        """
        if not self.steps:
            return {"total": 0, "completed": 0, "failed": 0, "status": "empty"}

        tally = {"completed": 0, "failed": 0, "pending": 0}
        for step in self.steps:
            state = step["status"]
            if state in tally:
                tally[state] += 1

        # Any failure dominates; otherwise only an all-completed run
        # counts as completed (in-progress steps keep it "pending").
        if tally["failed"] > 0:
            overall = "failed"
        elif tally["pending"] == 0 and tally["completed"] == len(self.steps):
            overall = "completed"
        else:
            overall = "pending"

        return {
            "total": len(self.steps),
            "completed": tally["completed"],
            "failed": tally["failed"],
            "pending": tally["pending"],
            "status": overall,
        }

    def to_log(self) -> str:
        """Render the recorded steps as a human-readable log string."""
        icons = {
            "completed": "✓",
            "failed": "✗",
            "pending": "○",
            "in_progress": "→",
        }
        lines = []
        for idx, step in enumerate(self.steps, 1):
            icon = icons.get(step["status"], "?")
            stamp = step["timestamp"][:19]
            lines.append(f"[{idx:02d}] [{icon}] {step['name']} @ {stamp}")
            if "error" in step:
                lines.append(f"    Error: {step['error']}")
        return "\n".join(lines)
class ArtifactDownloaderSimulator:
    """Simulates downloading artifacts from component releases."""

    def __init__(self):
        """Initialize artifact downloader."""
        self.downloaded_assets: dict = {}
        self.download_log: list = []

    def download_asset(self, asset_name: str, asset_url: str) -> bool:
        """Simulate downloading an asset.

        Args:
            asset_name: Name of the asset to download
            asset_url: URL of the asset

        Returns:
            True if download simulated successfully
        """
        try:
            # The "download" is simulated: only a short hash of the URL
            # is kept as stand-in content.
            digest = hashlib.sha256(asset_url.encode()).hexdigest()[:8]
            self.downloaded_assets[asset_name] = {
                "url": asset_url,
                "hash": digest,
                "downloaded_at": datetime.now(timezone.utc).isoformat(),
            }
            self.download_log.append({
                "action": "download",
                "asset": asset_name,
                "status": "success",
                "hash": digest,
            })
            return True
        except Exception as exc:
            self.download_log.append({
                "action": "download",
                "asset": asset_name,
                "status": "failed",
                "error": str(exc),
            })
            return False

    def verify_asset(self, asset_name: str) -> bool:
        """Verify an asset was downloaded.

        Args:
            asset_name: Name of asset to verify

        Returns:
            True if asset exists in downloaded_assets
        """
        return asset_name in self.downloaded_assets

    def get_missing_assets(self, required_assets: list) -> list:
        """Get list of required assets not downloaded.

        Args:
            required_assets: List of required asset names

        Returns:
            List of missing asset names
        """
        missing = []
        for asset_name in required_assets:
            if asset_name not in self.downloaded_assets:
                missing.append(asset_name)
        return missing

    def get_summary(self) -> dict:
        """Get download summary.

        Returns:
            Dictionary with download statistics
        """
        return {
            "total_downloaded": len(self.downloaded_assets),
            "assets": list(self.downloaded_assets.keys()),
            "downloads": len(self.download_log),
        }
class ReleaseMetadataGenerator:
    """Generates release metadata from component information."""

    # Assets every SOL release is expected to ship.
    REQUIRED_ASSETS = [
        "sol-server.zip",
        "sol_software",
        "sol_update_gui",
        "sol_update_backend",
        "sol_update_manager",
        "mass_gadget_watchdog",
        "filebeat.yml",
        "sol-server.service",
        "sol_software.service"
    ]

    def __init__(self, event_simulator: WorkflowEventSimulator,
                 downloader: ArtifactDownloaderSimulator):
        """Initialize metadata generator.

        Args:
            event_simulator: Workflow event simulator
            downloader: Artifact downloader simulator
        """
        self.event_simulator = event_simulator
        self.downloader = downloader
        self.metadata: dict = {}

    def generate_metadata(self) -> dict:
        """Generate release metadata.

        Returns:
            Complete metadata dictionary
        """
        info = self.event_simulator.get_component_info()

        # The release id hashes commit SHA + current timestamp, so
        # repeated builds of the same commit get distinct identifiers.
        seed = f"{info['commit_sha']}_{datetime.now(timezone.utc).timestamp()}"
        self.metadata = {
            "release_id": hashlib.sha256(seed.encode()).hexdigest()[:16],
            "build_date": datetime.now(timezone.utc).isoformat(),
            "event_type": self.event_simulator.event_type,
            "component": {
                "owner": info["owner"],
                "repo": info["repo"],
                "branch": info["branch"],
                "commit_sha": info["commit_sha"],
                "tag": info["tag"],
            },
            "assets": self._generate_asset_list(),
            "validation": self._validate_assets(),
        }
        return self.metadata

    def _generate_asset_list(self) -> list:
        """Generate the list of required assets with download status."""
        entries = []
        for name in self.REQUIRED_ASSETS:
            present = self.downloader.verify_asset(name)
            entries.append({
                "name": name,
                "status": "downloaded" if present else "pending",
                "included": present,
            })
        return entries

    def _validate_assets(self) -> dict:
        """Validate asset completeness.

        Returns:
            Validation results
        """
        missing = self.downloader.get_missing_assets(self.REQUIRED_ASSETS)
        have = len(self.downloader.downloaded_assets)
        required = len(self.REQUIRED_ASSETS)
        percentage = (have / required * 100) if required > 0 else 0

        return {
            "is_complete": not missing,
            "downloaded": have,
            "total_required": required,
            "missing": missing,
            "completion_percentage": percentage,
        }

    def generate_release_body(self) -> str:
        """Generate release body text.

        Returns:
            Formatted release body string
        """
        self.generate_metadata()
        component = self.metadata["component"]
        assets = self.metadata["assets"]
        downloaded = sum(1 for a in assets if a["status"] == "downloaded")
        missing_count = len(self.metadata["validation"]["missing"])

        parts = [
            "SOL Release Package",
            f"Build Date: {self.metadata['build_date'][:10]}",
            "",
            "## Component Information",
            f"- Repository: {component['owner']}/{component['repo']}",
            f"- Branch: {component['branch']}",
            f"- Commit: {component['commit_sha'][:8]}",
            f"- Tag: {component['tag'] or 'N/A'}",
            "",
            "## Included Assets",
            f"- Total: {len(assets)}",
            f"- Downloaded: {downloaded}",
            f"- Missing: {missing_count}",
            "",
        ]
        for asset in assets:
            icon = "✓" if asset["status"] == "downloaded" else "○"
            parts.append(f"{icon} {asset['name']}")
        return "\n".join(parts)

    def get_component_metadata_dict(self) -> dict:
        """Get metadata in format compatible with ComponentMetadataFetcher.

        Returns:
            Metadata dictionary
        """
        self.generate_metadata()
        component = self.metadata["component"]
        tag = component["tag"]
        repo_path = f"{component['owner']}/{component['repo']}"
        release_url = (
            f"https://github.com/{repo_path}/releases/tag/{tag}" if tag else ""
        )

        return {
            "component": repo_path,
            "included": self.metadata["validation"]["is_complete"],
            "branch": component["branch"],
            "commit_sha": component["commit_sha"],
            "tag_name": tag or "",
            "release_url": release_url,
            "pr_number": None,
        }
class WorkflowOrchestratorSimulator:
    """Simulates the complete release workflow orchestration."""

    def __init__(self, event_type: str, payload: dict):
        """Initialize workflow orchestrator simulator.

        Args:
            event_type: GitHub event type
            payload: Event payload
        """
        self.event = WorkflowEventSimulator(event_type, payload)
        self.steps = WorkflowStepSimulator()
        self.downloader = ArtifactDownloaderSimulator()
        self.metadata_generator = ReleaseMetadataGenerator(self.event, self.downloader)

    def execute_workflow(self) -> dict:
        """Execute the complete workflow.

        Returns:
            Workflow execution results. NOTE: an invalid event type
            returns only the step-status dictionary (early return),
            while the normal path returns a richer result dictionary —
            the test suite depends on this asymmetry.
        """
        try:
            self.steps.add_step("workflow_started", status="completed")

            # Step 1: reject events that are not component releases.
            self.steps.add_step("validate_event", status="in_progress")
            if not self.event.is_valid_component_event():
                self.steps.mark_failed("Invalid event type for component release")
                return self.steps.get_status()
            self.steps.mark_completed()

            # Step 2: extract component coordinates from the payload.
            self.steps.add_step("parse_event_payload", status="in_progress")
            info = self.event.get_component_info()
            self.steps.mark_completed({"component": info["repo"]})

            # Step 3: pull release artifacts (simulated, partial on purpose).
            self.steps.add_step("download_artifacts", status="in_progress")
            self._simulate_downloads(info)
            missing = self.downloader.get_missing_assets(
                self.metadata_generator.REQUIRED_ASSETS
            )
            if missing:
                self.steps.mark_completed(
                    {"warning": f"Missing assets: {', '.join(missing[:3])}"}
                )
            else:
                self.steps.mark_completed()

            # Step 4: assemble the release metadata document.
            self.steps.add_step("generate_metadata", status="in_progress")
            metadata = self.metadata_generator.generate_metadata()
            self.steps.mark_completed({"release_id": metadata["release_id"]})

            # Step 5: render the release body (simulated release creation).
            self.steps.add_step("create_release", status="in_progress")
            body = self.metadata_generator.generate_release_body()
            self.steps.mark_completed({"body_length": len(body)})

            self.steps.add_step("workflow_completed", status="completed")

            return {
                "status": self.steps.get_status(),
                "metadata": metadata,
                "download_summary": self.downloader.get_summary(),
                "steps_log": self.steps.to_log(),
            }

        except Exception as exc:
            self.steps.add_step("workflow_errored", status="failed", error=str(exc))
            return {
                "status": self.steps.get_status(),
                "error": str(exc),
            }

    def _simulate_downloads(self, component_info: dict):
        """Simulate downloading the first few required assets.

        Args:
            component_info: Component information dictionary
        """
        base = (
            f"https://github.com/{component_info['owner']}"
            f"/{component_info['repo']}/releases/download"
            f"/{component_info['tag']}"
        )
        # Only the first 5 assets are fetched, deliberately leaving the
        # rest missing so downstream validation has something to report.
        for asset in self.metadata_generator.REQUIRED_ASSETS[:5]:
            self.downloader.download_asset(asset, f"{base}/{asset}")

    def get_status(self) -> dict:
        """Get the current workflow status summary.

        Returns:
            Status dictionary
        """
        return self.steps.get_status()

    def get_log(self) -> str:
        """Get the formatted workflow execution log.

        Returns:
            Formatted log string
        """
        return self.steps.to_log()
class TestWorkflowEventSimulator:
    """Tests for workflow event simulator."""

    def test_init_with_valid_event(self):
        """A known component event is classified as a component release."""
        ev = WorkflowEventSimulator(
            "sol_software_release",
            {"tag_name": "v1.0.0", "commit_sha": "abc123", "branch": "main"},
        )

        assert ev.event_type == "sol_software_release"
        assert ev.is_valid_component_event()
        assert not ev.is_manual_dispatch()

    def test_init_with_invalid_event(self):
        """An unknown event type raises when component info is requested."""
        ev = WorkflowEventSimulator("invalid_event", {})

        with pytest.raises(ValueError):
            ev.get_component_info()

    def test_get_component_info(self):
        """Component info merges event defaults with payload values."""
        ev = WorkflowEventSimulator(
            "sol_server_release",
            {"tag_name": "v2.0.0", "commit_sha": "def456", "branch": "develop"},
        )

        info = ev.get_component_info()

        assert info["owner"] == "eveningsco"
        assert info["repo"] == "sol-server"
        assert info["tag"] == "v2.0.0"
        assert info["branch"] == "develop"

    def test_is_manual_dispatch(self):
        """Manual dispatch events are detected and not component events."""
        ev = WorkflowEventSimulator("manual_dispatch", {})

        assert ev.is_manual_dispatch()
        assert not ev.is_valid_component_event()
class TestWorkflowStepSimulator:
    """Tests for workflow step simulator."""

    def test_add_step(self):
        """Steps are recorded in order with their names."""
        sim = WorkflowStepSimulator()

        sim.add_step("Step 1", status="completed")
        sim.add_step("Step 2", status="pending")

        assert len(sim.steps) == 2
        assert sim.steps[0]["name"] == "Step 1"
        assert sim.steps[1]["name"] == "Step 2"

    def test_mark_completed(self):
        """Completing a step stores its status and result payload."""
        sim = WorkflowStepSimulator()

        sim.add_step("Test Step")
        sim.mark_completed({"result": "success"})

        assert sim.steps[0]["status"] == "completed"
        assert "result" in sim.steps[0]

    def test_mark_failed(self):
        """Failing a step records the error on both step and simulator."""
        sim = WorkflowStepSimulator()

        sim.add_step("Test Step")
        sim.mark_failed("Error occurred")

        assert sim.steps[0]["status"] == "failed"
        assert "error" in sim.steps[0]
        assert len(sim.errors) == 1

    def test_get_status_completed_workflow(self):
        """All-completed steps yield an overall 'completed' status."""
        sim = WorkflowStepSimulator()

        sim.add_step("Step 1", status="completed")
        sim.add_step("Step 2", status="completed")

        summary = sim.get_status()

        assert summary["total"] == 2
        assert summary["completed"] == 2
        assert summary["failed"] == 0
        assert summary["status"] == "completed"

    def test_get_status_failed_workflow(self):
        """Any failed step marks the whole workflow as failed."""
        sim = WorkflowStepSimulator()

        sim.add_step("Step 1", status="completed")
        sim.add_step("Step 2", status="failed")

        assert sim.get_status()["status"] == "failed"

    def test_to_log(self):
        """The log renders step names with their status icons."""
        sim = WorkflowStepSimulator()

        sim.add_step("Download", status="completed")
        sim.add_step("Build", status="failed", error="Build failed")

        log = sim.to_log()

        assert "Download" in log
        assert "✓" in log
        assert "Build" in log
        assert "✗" in log
class TestArtifactDownloaderSimulator:
    """Tests for artifact downloader simulator."""

    def test_download_asset_success(self):
        """A simulated download succeeds and records a content hash."""
        dl = ArtifactDownloaderSimulator()

        ok = dl.download_asset(
            "sol-server.zip",
            "https://github.com/eveningsco/sol-server/releases/download/v1.0.0/sol-server.zip"
        )

        assert ok is True
        assert "sol-server.zip" in dl.downloaded_assets
        assert "hash" in dl.downloaded_assets["sol-server.zip"]

    def test_download_asset_verify(self):
        """Verification reports only assets that were downloaded."""
        dl = ArtifactDownloaderSimulator()

        dl.download_asset("test.zip", "http://example.com/test.zip")

        assert dl.verify_asset("test.zip") is True
        assert dl.verify_asset("nonexistent.zip") is False

    def test_get_missing_assets(self):
        """Missing-asset listing excludes already-downloaded assets."""
        dl = ArtifactDownloaderSimulator()

        # Download only one asset
        dl.download_asset("sol-server.zip", "url")

        missing = dl.get_missing_assets(["sol-server.zip", "sol_software"])

        assert "sol_software" in missing
        assert "sol-server.zip" not in missing

    def test_get_summary(self):
        """The summary counts and names downloaded assets."""
        dl = ArtifactDownloaderSimulator()

        dl.download_asset("asset1.zip", "url1")
        dl.download_asset("asset2.zip", "url2")

        summary = dl.get_summary()

        assert summary["total_downloaded"] == 2
        assert "asset1.zip" in summary["assets"]
        assert "asset2.zip" in summary["assets"]
class TestReleaseMetadataGenerator:
    """Tests for release metadata generator."""

    def test_generate_metadata(self):
        """Generated metadata carries component and build information."""
        ev = WorkflowEventSimulator(
            "sol_software_release",
            {"tag_name": "v1.0.0", "commit_sha": "abc123def"},
        )
        dl = ArtifactDownloaderSimulator()

        # Download some assets
        dl.download_asset("sol-server.zip", "url1")
        dl.download_asset("sol_software", "url2")

        gen = ReleaseMetadataGenerator(ev, dl)
        metadata = gen.generate_metadata()

        assert "release_id" in metadata
        assert "build_date" in metadata
        assert metadata["component"]["repo"] == "sol-software"
        assert metadata["component"]["commit_sha"] == "abc123def"

    def test_validate_assets_complete(self):
        """Validation passes when every required asset is present."""
        ev = WorkflowEventSimulator(
            "sol_software_release",
            {"tag_name": "v1.0.0", "commit_sha": "abc123"},
        )
        dl = ArtifactDownloaderSimulator()

        # Download all required assets
        for asset in ReleaseMetadataGenerator.REQUIRED_ASSETS:
            dl.download_asset(asset, f"url_{asset}")

        gen = ReleaseMetadataGenerator(ev, dl)
        gen.generate_metadata()
        validation = gen._validate_assets()

        assert validation["is_complete"] is True
        assert len(validation["missing"]) == 0

    def test_validate_assets_incomplete(self):
        """Validation fails when required assets were not downloaded."""
        ev = WorkflowEventSimulator(
            "sol_software_release",
            {"tag_name": "v1.0.0", "commit_sha": "abc123"},
        )
        dl = ArtifactDownloaderSimulator()

        gen = ReleaseMetadataGenerator(ev, dl)
        gen.generate_metadata()
        validation = gen._validate_assets()

        assert validation["is_complete"] is False
        assert len(validation["missing"]) > 0

    def test_generate_release_body(self):
        """The release body contains component and asset sections."""
        ev = WorkflowEventSimulator(
            "sol_software_release",
            {"tag_name": "v1.0.0", "commit_sha": "abc123"},
        )
        dl = ArtifactDownloaderSimulator()

        gen = ReleaseMetadataGenerator(ev, dl)
        body = gen.generate_release_body()

        assert "SOL Release Package" in body
        assert "Component Information" in body
        assert "sol-software" in body
        assert "abc123" in body
        # Check for assets section (case-insensitive)
        assert "asset" in body.lower()

    def test_get_component_metadata_dict(self):
        """The fetcher-format dictionary exposes component coordinates."""
        ev = WorkflowEventSimulator(
            "sol_server_release",
            {"tag_name": "v2.0.0", "commit_sha": "def456", "branch": "main"},
        )
        dl = ArtifactDownloaderSimulator()

        gen = ReleaseMetadataGenerator(ev, dl)
        metadata = gen.get_component_metadata_dict()

        assert metadata["component"] == "eveningsco/sol-server"
        assert metadata["tag_name"] == "v2.0.0"
        assert metadata["branch"] == "main"
class TestWorkflowOrchestratorSimulator:
    """Tests for workflow orchestrator simulator."""

    def test_execute_workflow_success(self):
        """A valid component event produces a full result dictionary."""
        orch = WorkflowOrchestratorSimulator(
            "sol_software_release",
            {"tag_name": "v1.0.0", "commit_sha": "abc123"},
        )

        result = orch.execute_workflow()

        # Workflow should complete (may have missing assets warning)
        assert result["status"]["status"] in ["completed", "failed"]
        assert result["status"]["total"] >= 2
        assert "metadata" in result
        assert "download_summary" in result
        assert "steps_log" in result

    def test_execute_workflow_invalid_event(self):
        """An invalid event type short-circuits with a bare status dict."""
        orch = WorkflowOrchestratorSimulator("invalid_event", {})

        result = orch.execute_workflow()

        # Early return yields just the status dict, not the full result
        assert isinstance(result, dict)
        assert "status" in result
        assert result["status"] == "failed"
        # The failed validation step must be recorded
        assert any("validate_event" in step.get("name", "") for step in orch.steps.steps)
        assert any(step.get("status") == "failed" for step in orch.steps.steps)

    def test_execute_workflow_with_missing_assets(self):
        """Partially-downloaded assets still let the workflow complete."""
        # The orchestrator deliberately downloads only a subset of assets
        orch = WorkflowOrchestratorSimulator(
            "sol_software_release",
            {"tag_name": "v1.0.0", "commit_sha": "abc123"},
        )

        result = orch.execute_workflow()

        # Workflow should complete with warning about missing assets
        assert result["status"]["status"] == "completed"
        assert "metadata" in result
        assert orch.metadata_generator._validate_assets()["is_complete"] is False

    def test_get_status(self):
        """The status summary exposes the expected keys."""
        orch = WorkflowOrchestratorSimulator(
            "sol_software_release",
            {"tag_name": "v1.0.0", "commit_sha": "abc123"},
        )

        status = orch.get_status()

        assert "total" in status
        assert "completed" in status
        assert "failed" in status
        assert "status" in status

    def test_get_log(self):
        """The execution log lists start and completion markers."""
        orch = WorkflowOrchestratorSimulator(
            "sol_software_release",
            {"tag_name": "v1.0.0", "commit_sha": "abc123"},
        )

        orch.execute_workflow()
        log = orch.get_log()

        assert "workflow_started" in log
        assert "workflow_completed" in log
        assert "✓" in log or "○" in log
class TestWorkflowIntegration:
    """Integration tests for workflow simulation."""

    def test_full_workflow_with_all_assets(self):
        """Downloading every required asset yields 100% completion."""
        ev = WorkflowEventSimulator(
            "sol_software_release",
            {"tag_name": "v1.0.0", "commit_sha": "abc123def"},
        )
        dl = ArtifactDownloaderSimulator()

        # Simulate downloading all required assets
        for asset in ReleaseMetadataGenerator.REQUIRED_ASSETS:
            url = f"https://github.com/eveningsco/sol-software/releases/download/v1.0.0/{asset}"
            dl.download_asset(asset, url)

        gen = ReleaseMetadataGenerator(ev, dl)
        gen.generate_metadata()

        validation = gen._validate_assets()
        assert validation["is_complete"] is True
        assert validation["completion_percentage"] == 100.0

    def test_workflow_with_partial_assets(self):
        """Partial downloads yield a proportional completion percentage."""
        ev = WorkflowEventSimulator(
            "sol_server_release",
            {"tag_name": "v2.0.0", "commit_sha": "xyz789"},
        )
        dl = ArtifactDownloaderSimulator()

        # Download only the first four assets
        for asset in ReleaseMetadataGenerator.REQUIRED_ASSETS[:4]:
            dl.download_asset(asset, f"https://example.com/{asset}")

        gen = ReleaseMetadataGenerator(ev, dl)
        gen.generate_metadata()

        validation = gen._validate_assets()
        assert validation["is_complete"] is False
        # 4 of 9 assets -> ~44.44%; compare with a float tolerance
        assert abs(validation["completion_percentage"] - 44.44) < 0.01

    def test_workflow_execution_with_validation(self):
        """End-to-end execution returns a structurally complete result."""
        orch = WorkflowOrchestratorSimulator(
            "sol_software_release",
            {"tag_name": "v1.5.0", "commit_sha": "test123"},
        )

        result = orch.execute_workflow()

        # Check that all required components are present
        assert "status" in result
        assert "metadata" in result
        assert "steps_log" in result

        # Verify metadata structure
        metadata = result["metadata"]
        for key in ("release_id", "build_date", "component", "assets", "validation"):
            assert key in metadata
class TestWorkflowEdgeCases:
    """Tests for workflow edge cases."""

    def test_workflow_with_empty_payload(self):
        """An empty payload is tolerated and yields an empty tag."""
        orch = WorkflowOrchestratorSimulator("sol_software_release", {})

        result = orch.execute_workflow()

        # Should handle empty tag gracefully
        assert "metadata" in result
        assert result["metadata"]["component"]["tag"] == ""

    def test_workflow_with_invalid_commit_sha(self):
        """A malformed commit SHA is passed through unmodified."""
        orch = WorkflowOrchestratorSimulator(
            "sol_software_release",
            {"commit_sha": "invalid_sha_format!"},
        )

        result = orch.execute_workflow()

        # Still processed; metadata simply carries the invalid SHA
        assert result["metadata"]["component"]["commit_sha"] == "invalid_sha_format!"

    def test_workflow_with_missing_component_repo(self):
        """An unknown event type fails early with a bare status dict."""
        orch = WorkflowOrchestratorSimulator("unknown_event_type", {})

        result = orch.execute_workflow()

        # Early return yields just the status dict, not the full result
        assert isinstance(result, dict)
        assert "status" in result
        assert result["status"] == "failed"

    def test_metadata_generator_with_no_downloads(self):
        """Zero downloads produce a 0% completion validation."""
        ev = WorkflowEventSimulator(
            "sol_software_release",
            {"tag_name": "v1.0.0", "commit_sha": "abc123"},
        )
        dl = ArtifactDownloaderSimulator()

        gen = ReleaseMetadataGenerator(ev, dl)
        gen.generate_metadata()

        validation = gen._validate_assets()
        assert validation["is_complete"] is False
        assert validation["downloaded"] == 0
        assert validation["completion_percentage"] == 0.0

    def test_workflow_step_empty_status(self):
        """A simulator with no recorded steps reports 'empty' status."""
        sim = WorkflowStepSimulator()

        status = sim.get_status()

        assert status["total"] == 0
        assert status["status"] == "empty"


if __name__ == "__main__":
    pytest.main([__file__, "-v"])