From cbe1e73565d80656931cf7a89cd4fb6a3a262323 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 9 Dec 2025 05:11:54 +0000 Subject: [PATCH 1/6] Initial plan From a4cac82272f37cd9e2ac788017c10d1fe3f3b3ee Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 9 Dec 2025 05:16:17 +0000 Subject: [PATCH 2/6] Add new feature modules from PR #66 Co-authored-by: AdmGenSameer <154604600+AdmGenSameer@users.noreply.github.com> --- archpkg/config_manager.py | 135 +++++++++ archpkg/download_manager.py | 353 +++++++++++++++++++++++ archpkg/github_install.py | 511 ++++++++++++++++++++++++++++++++++ archpkg/installed_apps.py | 276 ++++++++++++++++++ archpkg/security.py | 264 ++++++++++++++++++ archpkg/templates/home.html | 332 ++++++++++++++++++++++ archpkg/templates/search.html | 218 +++++++++++++++ archpkg/update_manager.py | 225 +++++++++++++++ archpkg/web.py | 123 ++++++++ pyproject.toml | 8 +- 10 files changed, 2443 insertions(+), 2 deletions(-) create mode 100644 archpkg/config_manager.py create mode 100644 archpkg/download_manager.py create mode 100644 archpkg/github_install.py create mode 100644 archpkg/installed_apps.py create mode 100644 archpkg/security.py create mode 100644 archpkg/templates/home.html create mode 100644 archpkg/templates/search.html create mode 100644 archpkg/update_manager.py create mode 100644 archpkg/web.py diff --git a/archpkg/config_manager.py b/archpkg/config_manager.py new file mode 100644 index 0000000..2d12a0f --- /dev/null +++ b/archpkg/config_manager.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +""" +Configuration management for archpkg-helper +""" + +import json +import os +from pathlib import Path +from typing import Dict, Any, Optional +from dataclasses import dataclass, asdict +from archpkg.logging_config import get_logger + +logger = get_logger(__name__) + +@dataclass +class UserConfig: + """User configuration 
settings""" + auto_update_enabled: bool = False + auto_update_mode: str = "manual" # "automatic" or "manual" + update_check_interval_hours: int = 24 + background_download_enabled: bool = True + notification_enabled: bool = True + +class ConfigManager: + """Manages user configuration with atomic file operations""" + + def __init__(self): + self.config_dir = Path.home() / ".archpkg" + self.config_file = self.config_dir / "config.json" + self._ensure_config_dir() + + def _ensure_config_dir(self) -> None: + """Ensure configuration directory exists""" + self.config_dir.mkdir(parents=True, exist_ok=True) + + def _atomic_write(self, data: Dict[str, Any]) -> None: + """Atomically write configuration to file""" + temp_file = self.config_file.with_suffix('.tmp') + + try: + # Write to temporary file first + with open(temp_file, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + # Atomic move to final location + temp_file.replace(self.config_file) + logger.info(f"Configuration saved to {self.config_file}") + + except Exception as e: + # Clean up temp file on error + if temp_file.exists(): + temp_file.unlink() + logger.error(f"Failed to save configuration: {e}") + raise + + def load_config(self) -> UserConfig: + """Load configuration from file""" + if not self.config_file.exists(): + logger.info("No configuration file found, using defaults") + return UserConfig() + + try: + with open(self.config_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + # Create config object from loaded data + config = UserConfig() + for key, value in data.items(): + if hasattr(config, key): + setattr(config, key, value) + + logger.info("Configuration loaded successfully") + return config + + except Exception as e: + logger.warning(f"Failed to load configuration, using defaults: {e}") + return UserConfig() + + def save_config(self, config: UserConfig) -> None: + """Save configuration to file atomically""" + data = asdict(config) + self._atomic_write(data) + + 
def get_config_value(self, key: str) -> Any: + """Get a specific configuration value""" + config = self.load_config() + return getattr(config, key, None) + + def set_config_value(self, key: str, value: Any) -> None: + """Set a specific configuration value""" + config = self.load_config() + if hasattr(config, key): + setattr(config, key, value) + self.save_config(config) + logger.info(f"Configuration updated: {key} = {value}") + else: + raise ValueError(f"Unknown configuration key: {key}") + + def show_config(self) -> str: + """Get formatted configuration display""" + config = self.load_config() + lines = [ + f"Configuration file: {self.config_file}", + "", + "Current settings:", + f" Auto-update enabled: {config.auto_update_enabled}", + f" Auto-update mode: {config.auto_update_mode}", + f" Update check interval: {config.update_check_interval_hours} hours", + f" Background download: {config.background_download_enabled}", + f" Notifications: {config.notification_enabled}", + ] + return "\n".join(lines) + +# Global configuration manager instance +config_manager = ConfigManager() + +def get_user_config() -> UserConfig: + """Get current user configuration""" + return config_manager.load_config() + +def save_user_config(config: UserConfig) -> None: + """Save user configuration""" + config_manager.save_config(config) + +def set_config_option(key: str, value: Any) -> None: + """Set a configuration option""" + config_manager.set_config_value(key, value) + +def get_config_option(key: str) -> Any: + """Get a configuration option""" + return config_manager.get_config_value(key) + +def show_current_config() -> str: + """Show current configuration""" + return config_manager.show_config() \ No newline at end of file diff --git a/archpkg/download_manager.py b/archpkg/download_manager.py new file mode 100644 index 0000000..f618997 --- /dev/null +++ b/archpkg/download_manager.py @@ -0,0 +1,353 @@ +#!/usr/bin/env python3 +""" +Background downloading and installation for 
archpkg-helper +""" + +import os +import hashlib +import tempfile +import threading +import time +from typing import Dict, List, Optional, Any, Callable +from pathlib import Path +from datetime import datetime, timezone +from urllib.request import urlopen, Request +from urllib.error import URLError, HTTPError +from archpkg.logging_config import get_logger +from archpkg.installed_apps import ( + get_all_installed_packages, + update_package_info, + get_packages_with_updates +) +from archpkg.config_manager import get_user_config + +logger = get_logger(__name__) + +class DownloadManager: + """Manages background downloads with resumability""" + + def __init__(self): + self.active_downloads: Dict[str, Dict[str, Any]] = {} + self.download_dir = Path.home() / ".archpkg" / "downloads" + self.download_dir.mkdir(parents=True, exist_ok=True) + + def start_download(self, package_name: str, download_url: str, + callback: Optional[Callable] = None) -> str: + """Start a background download""" + if package_name in self.active_downloads: + logger.warning(f"Download already in progress for {package_name}") + return self.active_downloads[package_name]["download_id"] + + download_id = f"{package_name}_{int(datetime.now(timezone.utc).timestamp())}" + temp_file = self.download_dir / f"{download_id}.tmp" + + download_info = { + "download_id": download_id, + "package_name": package_name, + "url": download_url, + "temp_file": temp_file, + "status": "starting", + "progress": 0, + "total_size": 0, + "downloaded_size": 0, + "callback": callback, + "thread": None + } + + self.active_downloads[package_name] = download_info + + # Start download in background thread + thread = threading.Thread( + target=self._download_worker, + args=(download_info,), + daemon=True, + name=f"download-{package_name}" + ) + download_info["thread"] = thread + thread.start() + + logger.info(f"Started download for {package_name}: {download_id}") + return download_id + + def _download_worker(self, download_info: 
Dict[str, Any]) -> None: + """Background download worker""" + package_name = download_info["package_name"] + url = download_info["url"] + temp_file = download_info["temp_file"] + + try: + download_info["status"] = "downloading" + + # Create request with resume support + headers = {} + if temp_file.exists(): + # Resume download + downloaded_size = temp_file.stat().st_size + headers["Range"] = f"bytes={downloaded_size}-" + download_info["downloaded_size"] = downloaded_size + logger.info(f"Resuming download for {package_name} from {downloaded_size} bytes") + + req = Request(url, headers=headers) + + with urlopen(req) as response: + total_size = int(response.headers.get("content-length", 0)) + download_info["total_size"] = total_size + + if total_size > 0 and download_info["downloaded_size"] > 0: + # Verify we're resuming correctly + if response.code != 206: # 206 Partial Content + logger.warning(f"Server doesn't support resume for {package_name}, restarting") + download_info["downloaded_size"] = 0 + temp_file.unlink(missing_ok=True) + + mode = "ab" if download_info["downloaded_size"] > 0 else "wb" + + with open(temp_file, mode) as f: + downloaded = download_info["downloaded_size"] + + while True: + chunk = response.read(8192) + if not chunk: + break + + f.write(chunk) + downloaded += len(chunk) + download_info["downloaded_size"] = downloaded + + if total_size > 0: + progress = (downloaded / total_size) * 100 + download_info["progress"] = progress + + download_info["status"] = "completed" + logger.info(f"Download completed for {package_name}") + + # Notify callback if provided + if download_info["callback"]: + try: + download_info["callback"](download_info) + except Exception as e: + logger.error(f"Download callback error for {package_name}: {e}") + + except HTTPError as e: + download_info["status"] = "failed" + download_info["error"] = f"HTTP {e.code}: {e.reason}" + logger.error(f"Download failed for {package_name}: {e}") + except URLError as e: + 
download_info["status"] = "failed" + download_info["error"] = str(e.reason) + logger.error(f"Download failed for {package_name}: {e}") + except Exception as e: + download_info["status"] = "failed" + download_info["error"] = str(e) + logger.error(f"Download failed for {package_name}: {e}") + + def get_download_status(self, package_name: str) -> Optional[Dict[str, Any]]: + """Get status of a download""" + return self.active_downloads.get(package_name) + + def cancel_download(self, package_name: str) -> bool: + """Cancel a download""" + if package_name not in self.active_downloads: + return False + + download_info = self.active_downloads[package_name] + download_info["status"] = "cancelled" + + # Clean up temp file + temp_file = download_info["temp_file"] + if temp_file.exists(): + temp_file.unlink() + + del self.active_downloads[package_name] + logger.info(f"Download cancelled for {package_name}") + return True + + def get_completed_downloads(self) -> List[Dict[str, Any]]: + """Get list of completed downloads""" + return [ + info for info in self.active_downloads.values() + if info["status"] == "completed" + ] + + def cleanup_old_downloads(self, days_old: int = 7) -> None: + """Clean up old temporary download files""" + cutoff_time = datetime.now(timezone.utc).timestamp() - (days_old * 24 * 3600) + + for temp_file in self.download_dir.glob("*.tmp"): + if temp_file.stat().st_mtime < cutoff_time: + temp_file.unlink() + logger.debug(f"Cleaned up old download file: {temp_file}") + +class UpdateInstaller: + """Handles installation of downloaded updates""" + + def __init__(self): + self.download_manager = DownloadManager() + + def install_updates(self, package_names: Optional[List[str]] = None) -> Dict[str, Any]: + """Install updates for specified packages or all available updates""" + config = get_user_config() + + if package_names is None: + # Get all packages with available updates + packages_with_updates = get_packages_with_updates() + package_names = [p.name for p in 
packages_with_updates] + + if not package_names: + return {"status": "success", "installed": 0, "message": "No updates available"} + + installed_count = 0 + failed_count = 0 + results = [] + + for package_name in package_names: + try: + result = self._install_single_update(package_name) + results.append(result) + + if result["status"] == "success": + installed_count += 1 + else: + failed_count += 1 + + except Exception as e: + logger.error(f"Failed to install update for {package_name}: {e}") + results.append({ + "package": package_name, + "status": "error", + "error": str(e) + }) + failed_count += 1 + + return { + "status": "success", + "installed": installed_count, + "failed": failed_count, + "results": results + } + + def _install_single_update(self, package_name: str) -> Dict[str, Any]: + """Install update for a single package""" + logger.info(f"Installing update for {package_name}") + + # This is a placeholder - actual installation would depend on the package source + # For now, we'll just mark it as installed + + try: + # In a real implementation, this would: + # 1. Get the package info + # 2. Run the appropriate installation command based on source + # 3. Verify installation + # 4. 
Update the installed package info + + update_package_info( + package_name, + installed_version="latest", # Would be actual version + update_available=False, + last_updated=datetime.now(timezone.utc).isoformat() + ) + + return { + "package": package_name, + "status": "success", + "message": "Update installed successfully" + } + + except Exception as e: + return { + "package": package_name, + "status": "failed", + "error": str(e) + } + +class BackgroundUpdateService: + """Complete background update service""" + + def __init__(self): + self.download_manager = DownloadManager() + self.update_installer = UpdateInstaller() + self.is_running = False + self.background_thread = None + + def start_service(self) -> None: + """Start the background update service""" + if self.is_running: + return + + config = get_user_config() + if not config.auto_update_enabled: + logger.info("Auto-update not enabled") + return + + self.is_running = True + + self.background_thread = threading.Thread( + target=self._background_worker, + daemon=True, + name="archpkg-background-updates" + ) + self.background_thread.start() + + logger.info("Background update service started") + + def stop_service(self) -> None: + """Stop the background update service""" + self.is_running = False + if self.background_thread: + self.background_thread.join(timeout=5) + logger.info("Background update service stopped") + + def _background_worker(self) -> None: + """Background worker for automatic updates""" + logger.info("Background update worker started") + + while self.is_running: + try: + config = get_user_config() + + if config.auto_install_updates: + # Automatically install available updates + packages_with_updates = get_packages_with_updates() + if packages_with_updates: + logger.info(f"Auto-installing {len(packages_with_updates)} updates") + result = self.update_installer.install_updates() + logger.info(f"Auto-install result: {result}") + + # Clean up old downloads periodically + 
self.download_manager.cleanup_old_downloads() + + # Sleep for check interval + sleep_time = config.update_check_interval_hours * 3600 + time.sleep(min(sleep_time, 3600)) # Sleep in 1-hour chunks + + except Exception as e: + logger.error(f"Background update worker error: {e}") + time.sleep(3600) # Wait 1 hour before retrying + + logger.info("Background update worker stopped") + +# Global instances +download_manager = DownloadManager() +update_installer = UpdateInstaller() +background_update_service = BackgroundUpdateService() + +def start_download(package_name: str, download_url: str, + callback: Optional[Callable] = None) -> str: + """Start a background download""" + return download_manager.start_download(package_name, download_url, callback) + +def get_download_status(package_name: str) -> Optional[Dict[str, Any]]: + """Get download status""" + return download_manager.get_download_status(package_name) + +def install_updates(package_names: Optional[List[str]] = None) -> Dict[str, Any]: + """Install updates""" + return update_installer.install_updates(package_names) + +def start_background_update_service() -> None: + """Start background update service""" + background_update_service.start_service() + +def stop_background_update_service() -> None: + """Stop background update service""" + background_update_service.stop_service() \ No newline at end of file diff --git a/archpkg/github_install.py b/archpkg/github_install.py new file mode 100644 index 0000000..39ff12f --- /dev/null +++ b/archpkg/github_install.py @@ -0,0 +1,511 @@ +#!/usr/bin/env python3 +""" +GitHub repository installation module for archpkg-helper +""" + +import os +import sys +import shutil +import tempfile +import subprocess +import re +from typing import Optional, Dict, List, Tuple, Any +from pathlib import Path +from abc import ABC, abstractmethod + +from archpkg.logging_config import get_logger + +logger = get_logger(__name__) + +class ProjectTypeHandler(ABC): + """Abstract base class for project 
type handlers""" + + @property + @abstractmethod + def name(self) -> str: + """Name of the project type""" + pass + + @property + @abstractmethod + def indicators(self) -> List[str]: + """Files that indicate this project type""" + pass + + @abstractmethod + def can_handle(self, repo_path: Path) -> bool: + """Check if this handler can handle the project""" + pass + + @abstractmethod + def build_and_install(self, repo_path: Path, temp_dir: Path) -> bool: + """Build and install the project""" + pass + + def log_command(self, command: List[str], description: str) -> None: + """Log a command execution""" + logger.info(f"{description}: {' '.join(command)}") + print(f" → {description}: {' '.join(command)}") + +class PythonHandler(ProjectTypeHandler): + """Handler for Python projects""" + + @property + def name(self) -> str: + return "Python" + + @property + def indicators(self) -> List[str]: + return ["setup.py", "pyproject.toml", "requirements.txt", "Pipfile"] + + def can_handle(self, repo_path: Path) -> bool: + return any((repo_path / indicator).exists() for indicator in self.indicators) + + def build_and_install(self, repo_path: Path, temp_dir: Path) -> bool: + try: + # Change to repo directory + os.chdir(repo_path) + + # Check for setup.py or pyproject.toml + if (repo_path / "setup.py").exists(): + self.log_command(["pip", "install", "."], "Installing Python package") + result = subprocess.run(["pip", "install", "."], capture_output=True, text=True) + elif (repo_path / "pyproject.toml").exists(): + self.log_command(["pip", "install", "."], "Installing Python package (PEP 517)") + result = subprocess.run(["pip", "install", "."], capture_output=True, text=True) + else: + print(" ⚠ No setup.py or pyproject.toml found, installing requirements if present") + if (repo_path / "requirements.txt").exists(): + self.log_command(["pip", "install", "-r", "requirements.txt"], "Installing requirements") + result = subprocess.run(["pip", "install", "-r", "requirements.txt"], 
capture_output=True, text=True) + else: + print(" ✗ No installation method found for Python project") + return False + + if result.returncode == 0: + print(" ✓ Python package installed successfully") + return True + else: + print(f" ✗ Python installation failed: {result.stderr}") + return False + + except Exception as e: + print(f" ✗ Python installation error: {e}") + return False + +class NodeJSHandler(ProjectTypeHandler): + """Handler for Node.js projects""" + + @property + def name(self) -> str: + return "Node.js" + + @property + def indicators(self) -> List[str]: + return ["package.json", "yarn.lock", "package-lock.json"] + + def can_handle(self, repo_path: Path) -> bool: + return (repo_path / "package.json").exists() + + def build_and_install(self, repo_path: Path, temp_dir: Path) -> bool: + try: + os.chdir(repo_path) + + # Check if yarn or npm should be used + use_yarn = (repo_path / "yarn.lock").exists() + + if use_yarn: + # Install dependencies with yarn + self.log_command(["yarn", "install"], "Installing dependencies with yarn") + result = subprocess.run(["yarn", "install"], capture_output=True, text=True) + if result.returncode != 0: + print(f" ✗ Yarn install failed: {result.stderr}") + return False + + # Build if build script exists + if self._has_build_script(): + self.log_command(["yarn", "build"], "Building project with yarn") + result = subprocess.run(["yarn", "build"], capture_output=True, text=True) + if result.returncode != 0: + print(f" ✗ Yarn build failed: {result.stderr}") + return False + + # Install globally if it's a CLI tool + if self._is_cli_tool(): + self.log_command(["yarn", "global", "add", "."], "Installing CLI tool globally") + result = subprocess.run(["yarn", "global", "add", "."], capture_output=True, text=True) + else: + print(" ⚠ Not a CLI tool, skipping global installation") + return True + + else: + # Use npm + self.log_command(["npm", "install"], "Installing dependencies with npm") + result = subprocess.run(["npm", "install"], 
capture_output=True, text=True) + if result.returncode != 0: + print(f" ✗ NPM install failed: {result.stderr}") + return False + + # Build if build script exists + if self._has_build_script(): + self.log_command(["npm", "run", "build"], "Building project with npm") + result = subprocess.run(["npm", "run", "build"], capture_output=True, text=True) + if result.returncode != 0: + print(f" ✗ NPM build failed: {result.stderr}") + return False + + # Install globally if it's a CLI tool + if self._is_cli_tool(): + self.log_command(["npm", "install", "-g", "."], "Installing CLI tool globally") + result = subprocess.run(["npm", "install", "-g", "."], capture_output=True, text=True) + else: + print(" ⚠ Not a CLI tool, skipping global installation") + return True + + if result.returncode == 0: + print(" ✓ Node.js package installed successfully") + return True + else: + print(f" ✗ Node.js installation failed: {result.stderr}") + return False + + except Exception as e: + print(f" ✗ Node.js installation error: {e}") + return False + + def _has_build_script(self) -> bool: + """Check if package.json has a build script""" + try: + import json + with open("package.json", "r") as f: + data = json.load(f) + return "scripts" in data and "build" in data["scripts"] + except: + return False + + def _is_cli_tool(self) -> bool: + """Check if this is a CLI tool by looking for bin field""" + try: + import json + with open("package.json", "r") as f: + data = json.load(f) + return "bin" in data or ("name" in data and data["name"].startswith("@")) + except: + return False + +class CMakeHandler(ProjectTypeHandler): + """Handler for CMake projects""" + + @property + def name(self) -> str: + return "CMake" + + @property + def indicators(self) -> List[str]: + return ["CMakeLists.txt", "cmake"] + + def can_handle(self, repo_path: Path) -> bool: + return (repo_path / "CMakeLists.txt").exists() + + def build_and_install(self, repo_path: Path, temp_dir: Path) -> bool: + try: + os.chdir(repo_path) + + # 
Create build directory + build_dir = repo_path / "build" + build_dir.mkdir(exist_ok=True) + os.chdir(build_dir) + + # Configure with CMake + self.log_command(["cmake", ".."], "Configuring with CMake") + result = subprocess.run(["cmake", ".."], capture_output=True, text=True) + if result.returncode != 0: + print(f" ✗ CMake configure failed: {result.stderr}") + return False + + # Build + self.log_command(["make", "-j$(nproc)"], "Building with make") + result = subprocess.run(["make", f"-j{os.cpu_count() or 1}"], capture_output=True, text=True) + if result.returncode != 0: + print(f" ✗ Make build failed: {result.stderr}") + return False + + # Install + self.log_command(["sudo", "make", "install"], "Installing with make") + result = subprocess.run(["sudo", "make", "install"], capture_output=True, text=True) + if result.returncode == 0: + print(" ✓ CMake project installed successfully") + return True + else: + print(f" ✗ Make install failed: {result.stderr}") + return False + + except Exception as e: + print(f" ✗ CMake installation error: {e}") + return False + +class MakefileHandler(ProjectTypeHandler): + """Handler for Makefile projects""" + + @property + def name(self) -> str: + return "Makefile" + + @property + def indicators(self) -> List[str]: + return ["Makefile", "makefile"] + + def can_handle(self, repo_path: Path) -> bool: + return (repo_path / "Makefile").exists() or (repo_path / "makefile").exists() + + def build_and_install(self, repo_path: Path, temp_dir: Path) -> bool: + try: + os.chdir(repo_path) + + # Try make install first + self.log_command(["make", "install"], "Installing with make") + result = subprocess.run(["make", "install"], capture_output=True, text=True) + + if result.returncode == 0: + print(" ✓ Makefile project installed successfully") + return True + else: + # Try with sudo + self.log_command(["sudo", "make", "install"], "Installing with sudo make") + result = subprocess.run(["sudo", "make", "install"], capture_output=True, text=True) + if 
result.returncode == 0: + print(" ✓ Makefile project installed successfully") + return True + else: + print(f" ✗ Make install failed: {result.stderr}") + return False + + except Exception as e: + print(f" ✗ Makefile installation error: {e}") + return False + +class GoHandler(ProjectTypeHandler): + """Handler for Go projects""" + + @property + def name(self) -> str: + return "Go" + + @property + def indicators(self) -> List[str]: + return ["go.mod", "main.go", ".go"] + + def can_handle(self, repo_path: Path) -> bool: + return (repo_path / "go.mod").exists() or any(f.suffix == ".go" for f in repo_path.glob("*.go")) + + def build_and_install(self, repo_path: Path, temp_dir: Path) -> bool: + try: + os.chdir(repo_path) + + # Install with go install + self.log_command(["go", "install", "."], "Installing Go package") + result = subprocess.run(["go", "install", "."], capture_output=True, text=True) + + if result.returncode == 0: + print(" ✓ Go package installed successfully") + return True + else: + print(f" ✗ Go install failed: {result.stderr}") + return False + + except Exception as e: + print(f" ✗ Go installation error: {e}") + return False + +class RustHandler(ProjectTypeHandler): + """Handler for Rust projects""" + + @property + def name(self) -> str: + return "Rust" + + @property + def indicators(self) -> List[str]: + return ["Cargo.toml", "Cargo.lock"] + + def can_handle(self, repo_path: Path) -> bool: + return (repo_path / "Cargo.toml").exists() + + def build_and_install(self, repo_path: Path, temp_dir: Path) -> bool: + try: + os.chdir(repo_path) + + # Install with cargo + self.log_command(["cargo", "install", "--path", "."], "Installing Rust package") + result = subprocess.run(["cargo", "install", "--path", "."], capture_output=True, text=True) + + if result.returncode == 0: + print(" ✓ Rust package installed successfully") + return True + else: + print(f" ✗ Cargo install failed: {result.stderr}") + return False + + except Exception as e: + print(f" ✗ Rust 
installation error: {e}") + return False + +class ProjectTypeRegistry: + """Registry for project type handlers""" + + def __init__(self): + self.handlers: List[ProjectTypeHandler] = [ + PythonHandler(), + NodeJSHandler(), + CMakeHandler(), + MakefileHandler(), + GoHandler(), + RustHandler(), + ] + + def detect_project_type(self, repo_path: Path) -> Optional[ProjectTypeHandler]: + """Detect the project type and return appropriate handler""" + for handler in self.handlers: + if handler.can_handle(repo_path): + return handler + return None + + def get_supported_types(self) -> List[str]: + """Get list of supported project types""" + return [handler.name for handler in self.handlers] + +def clone_repository(repo_url: str, temp_dir: Path) -> Optional[Path]: + """Clone a GitHub repository to a temporary directory""" + try: + print(f"📥 Cloning repository: {repo_url}") + + # Use GitPython if available, otherwise use subprocess + try: + from git import Repo + logger.info(f"Cloning {repo_url} to {temp_dir}") + Repo.clone_from(repo_url, temp_dir) + print(" ✓ Repository cloned successfully") + return temp_dir + except ImportError: + # Fallback to subprocess + logger.info(f"Cloning {repo_url} to {temp_dir} using subprocess") + result = subprocess.run(["git", "clone", repo_url, str(temp_dir)], capture_output=True, text=True) + if result.returncode == 0: + print(" ✓ Repository cloned successfully") + return temp_dir + else: + print(f" ✗ Git clone failed: {result.stderr}") + return None + + except Exception as e: + print(f" ✗ Repository cloning error: {e}") + return None + +def validate_github_url(url_or_repo: str) -> Optional[str]: + """Validate and convert GitHub URL or user/repo format to full URL""" + # Handle github:user/repo format + if url_or_repo.startswith("github:"): + repo = url_or_repo[7:] # Remove "github:" prefix + if "/" not in repo: + print(" ✗ Invalid GitHub repo format. 
Use: github:user/repo") + return None + return f"https://github.com/{repo}.git" + + # Handle full GitHub URLs + if url_or_repo.startswith("https://github.com/"): + if not url_or_repo.endswith(".git"): + url_or_repo += ".git" + return url_or_repo + + # Handle git@github.com:user/repo.git format + if url_or_repo.startswith("git@github.com:"): + return url_or_repo + + print(" ✗ Invalid GitHub URL format. Use: github:user/repo or https://github.com/user/repo") + return None + +def install_from_github(repo_spec: str) -> bool: + """Main function to install from GitHub repository""" + print(f"\n🔧 Installing from GitHub: {repo_spec}") + + # Validate the repo specification + repo_url = validate_github_url(repo_spec) + if not repo_url: + return False + + # Create temporary directory + temp_dir = None + try: + temp_dir = Path(tempfile.mkdtemp(prefix="archpkg-github-")) + print(f"📁 Working in temporary directory: {temp_dir}") + + # Clone the repository + repo_path = clone_repository(repo_url, temp_dir) + if not repo_path: + return False + + # Detect project type + registry = ProjectTypeRegistry() + handler = registry.detect_project_type(repo_path) + + if not handler: + # List found files to help user + files = list(repo_path.glob("*")) + file_names = [f.name for f in files[:10]] # Show first 10 files + print(f" ✗ Unsupported project type") + print(f" 📄 Found files: {', '.join(file_names)}") + if len(files) > 10: + print(f" ... 
and {len(files) - 10} more files") + print(f" 💡 Supported types: {', '.join(registry.get_supported_types())}") + print(f" 💡 Try manual installation or request support for this project type") + return False + + print(f" 🔍 Detected project type: {handler.name}") + + # Build and install + success = handler.build_and_install(repo_path, temp_dir) + + if success: + print(f"🎉 Successfully installed {repo_spec}!") + return True + else: + print(f"❌ Failed to install {repo_spec}") + return False + + except Exception as e: + print(f" ✗ Unexpected error during installation: {e}") + return False + + finally: + # Clean up temporary directory + if temp_dir and temp_dir.exists(): + print(f"🧹 Cleaning up temporary directory: {temp_dir}") + try: + shutil.rmtree(temp_dir) + print(" ✓ Cleanup completed") + except Exception as e: + print(f" ⚠ Cleanup warning: {e}") + +def check_dependencies() -> Dict[str, bool]: + """Check if required dependencies are available""" + deps = { + "git": False, + "python": False, + "node": False, + "npm": False, + "yarn": False, + "cmake": False, + "make": False, + "go": False, + "cargo": False, + } + + # Check each dependency + for dep in deps.keys(): + try: + result = subprocess.run([dep, "--version"], capture_output=True, timeout=5) + deps[dep] = result.returncode == 0 + except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError): + deps[dep] = False + + return deps \ No newline at end of file diff --git a/archpkg/installed_apps.py b/archpkg/installed_apps.py new file mode 100644 index 0000000..37ec249 --- /dev/null +++ b/archpkg/installed_apps.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3 +""" +Installed applications tracking for archpkg-helper +""" + +import json +import os +from pathlib import Path +from typing import Dict, List, Optional, Any +from dataclasses import dataclass, asdict +from datetime import datetime, timezone +from archpkg.logging_config import get_logger + +logger = get_logger(__name__) + +@dataclass +class 
@dataclass
class InstalledPackage:
    """One package installed through archpkg, as persisted in installed.json."""
    name: str
    version: Optional[str] = None
    source: str = "unknown"
    # ISO-8601 UTC timestamp; filled in automatically when empty.
    install_date: str = ""
    last_update_check: Optional[str] = None
    available_version: Optional[str] = None
    update_available: bool = False
    install_method: str = "archpkg"  # "archpkg", "github", "manual"


class InstalledAppsManager:
    """Tracks archpkg-installed applications in ~/.archpkg/installed.json.

    All writes go through a temp-file-then-rename sequence so a crash can
    never leave a half-written tracking file on disk.
    """

    def __init__(self):
        self.config_dir = Path.home() / ".archpkg"
        self.installed_file = self.config_dir / "installed.json"
        self._ensure_config_dir()

    def _ensure_config_dir(self) -> None:
        """Create the configuration directory if it does not exist."""
        self.config_dir.mkdir(parents=True, exist_ok=True)

    def _atomic_write(self, data: Dict[str, Any]) -> None:
        """Atomically write the tracking data (temp file + rename)."""
        temp_file = self.installed_file.with_suffix('.tmp')

        try:
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)

            # replace() is an atomic rename, so readers never observe a
            # partially written file.
            temp_file.replace(self.installed_file)
            logger.debug(f"Installed apps data saved to {self.installed_file}")

        except Exception as e:
            # Remove the orphaned temp file before propagating the error.
            if temp_file.exists():
                temp_file.unlink()
            logger.error(f"Failed to save installed apps data: {e}")
            raise

    @staticmethod
    def _package_from_dict(pkg_data: Dict[str, Any]) -> InstalledPackage:
        """Rehydrate an InstalledPackage from stored JSON.

        ROBUSTNESS FIX: unknown keys (e.g. written by a newer archpkg
        version with extra fields) are ignored instead of letting
        InstalledPackage(**pkg_data) raise TypeError and make the whole
        tracking file unreadable.
        """
        known = InstalledPackage.__dataclass_fields__.keys()
        return InstalledPackage(**{k: v for k, v in pkg_data.items() if k in known})

    def _load_installed_data(self) -> Dict[str, Dict[str, Any]]:
        """Load the raw tracking data; fall back to empty on any problem."""
        if not self.installed_file.exists():
            logger.debug("No installed apps file found, starting fresh")
            return {}

        try:
            with open(self.installed_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            logger.debug(f"Loaded installed apps data with {len(data)} packages")
            return data

        except Exception as e:
            logger.warning(f"Failed to load installed apps data, starting fresh: {e}")
            return {}

    def _save_installed_data(self, data: Dict[str, Dict[str, Any]]) -> None:
        """Save installed packages data to file atomically."""
        self._atomic_write(data)

    def add_package(self, package: InstalledPackage) -> None:
        """Add (or overwrite) a package in the installed list."""
        data = self._load_installed_data()

        # Default the install date to "now" (UTC) when the caller did not set it.
        if not package.install_date:
            package.install_date = datetime.now(timezone.utc).isoformat()

        data[package.name] = asdict(package)
        self._save_installed_data(data)
        logger.info(f"Added package to tracking: {package.name} ({package.source})")

    def remove_package(self, package_name: str) -> bool:
        """Remove a package from the installed list; True if it was tracked."""
        data = self._load_installed_data()

        if package_name in data:
            del data[package_name]
            self._save_installed_data(data)
            logger.info(f"Removed package from tracking: {package_name}")
            return True

        logger.warning(f"Package not found in tracking: {package_name}")
        return False

    def get_package(self, package_name: str) -> Optional[InstalledPackage]:
        """Get information about an installed package, or None if untracked."""
        data = self._load_installed_data()
        if package_name in data:
            return self._package_from_dict(data[package_name])
        return None

    def get_all_packages(self) -> List[InstalledPackage]:
        """Get all installed packages."""
        data = self._load_installed_data()
        return [self._package_from_dict(pkg_data) for pkg_data in data.values()]

    def update_package_info(self, package_name: str, **updates) -> bool:
        """Merge keyword updates into a tracked package; True if it was found.

        Unless the caller supplies last_update_check explicitly, the
        timestamp is refreshed to "now" as a side effect of any update.
        """
        data = self._load_installed_data()

        if package_name in data:
            data[package_name].update(updates)

            if 'last_update_check' not in updates:
                data[package_name]['last_update_check'] = datetime.now(timezone.utc).isoformat()

            self._save_installed_data(data)
            logger.debug(f"Updated package info: {package_name}")
            return True

        logger.warning(f"Package not found for update: {package_name}")
        return False

    def get_packages_needing_update_check(self, max_age_hours: int = 24) -> List[InstalledPackage]:
        """Packages whose last check is older than max_age_hours (or never ran)."""
        needing_check = []
        now = datetime.now(timezone.utc)

        for pkg in self.get_all_packages():
            needs_check = True

            if pkg.last_update_check:
                try:
                    # Accept both "+00:00" and trailing-"Z" ISO timestamps.
                    last_check = datetime.fromisoformat(pkg.last_update_check.replace('Z', '+00:00'))
                    hours_since_check = (now - last_check).total_seconds() / 3600

                    if hours_since_check < max_age_hours:
                        needs_check = False
                except ValueError:
                    # Invalid timestamp: treat as "never checked".
                    pass

            if needs_check:
                needing_check.append(pkg)

        logger.debug(f"Found {len(needing_check)} packages needing update check")
        return needing_check

    def get_packages_with_updates(self) -> List[InstalledPackage]:
        """Packages currently flagged as having an available update."""
        with_updates = [pkg for pkg in self.get_all_packages() if pkg.update_available]
        logger.debug(f"Found {len(with_updates)} packages with available updates")
        return with_updates

    def mark_update_available(self, package_name: str, available_version: str) -> bool:
        """Record that an update to available_version exists for a package."""
        return self.update_package_info(
            package_name,
            available_version=available_version,
            update_available=True,
            last_update_check=datetime.now(timezone.utc).isoformat()
        )

    def mark_update_installed(self, package_name: str, new_version: str) -> bool:
        """Record that a package has been updated to new_version."""
        return self.update_package_info(
            package_name,
            version=new_version,
            available_version=None,
            update_available=False,
            last_update_check=datetime.now(timezone.utc).isoformat()
        )

    def get_stats(self) -> Dict[str, int]:
        """Counts of total / updatable / up-to-date tracked packages."""
        packages = self.get_all_packages()
        with_updates = len([p for p in packages if p.update_available])

        return {
            'total_packages': len(packages),
            'packages_with_updates': with_updates,
            'packages_up_to_date': len(packages) - with_updates,
        }

    def show_installed_packages(self) -> str:
        """Human-readable multi-line listing of all tracked packages."""
        packages = self.get_all_packages()

        if not packages:
            return "No packages installed via archpkg."

        lines = [f"Installed packages ({len(packages)}):", ""]

        for pkg in packages:
            status = "✓ Up to date"
            if pkg.update_available:
                status = f"⚠ Update available: {pkg.available_version}"

            lines.append(f" {pkg.name} ({pkg.version or 'unknown'}) - {pkg.source}")
            lines.append(f" Status: {status}")
            lines.append(f" Installed: {pkg.install_date}")
            if pkg.last_update_check:
                lines.append(f" Last checked: {pkg.last_update_check}")
            lines.append("")

        return "\n".join(lines)


# Global installed apps manager instance
installed_apps_manager = InstalledAppsManager()


def add_installed_package(package: InstalledPackage) -> None:
    """Add a package to the installed list."""
    installed_apps_manager.add_package(package)


def remove_installed_package(package_name: str) -> bool:
    """Remove a package from the installed list."""
    return installed_apps_manager.remove_package(package_name)


def get_installed_package(package_name: str) -> Optional[InstalledPackage]:
    """Get information about an installed package."""
    return installed_apps_manager.get_package(package_name)


def get_all_installed_packages() -> List[InstalledPackage]:
    """Get all installed packages."""
    return installed_apps_manager.get_all_packages()


def update_package_info(package_name: str, **updates) -> bool:
    """Update information for an installed package."""
    return installed_apps_manager.update_package_info(package_name, **updates)


def get_packages_needing_update_check(max_age_hours: int = 24) -> List[InstalledPackage]:
    """Get packages that need update checking."""
    return installed_apps_manager.get_packages_needing_update_check(max_age_hours)


def get_packages_with_updates() -> List[InstalledPackage]:
    """Get packages that have available updates."""
    return installed_apps_manager.get_packages_with_updates()
def get_installed_stats() -> Dict[str, int]:
    """Get statistics about installed packages."""
    return installed_apps_manager.get_stats()


def track_package(package_name: str, source: str, version: Optional[str] = None) -> None:
    """Convenience function to track a newly installed package."""
    package = InstalledPackage(
        name=package_name,
        source=source,
        version=version or "latest",
        install_date=datetime.now(timezone.utc).isoformat(),
        install_method="archpkg"
    )
    add_installed_package(package)


# ===== archpkg/security.py ==================================================
#!/usr/bin/env python3
"""
Security validations for archpkg-helper
"""

import hashlib
import re
from typing import Optional, Dict, Any
from pathlib import Path

try:
    from archpkg.logging_config import get_logger
    logger = get_logger(__name__)
except ImportError:
    # ROBUSTNESS: fall back to stdlib logging so this security module can
    # be imported and exercised standalone, outside the archpkg package.
    import logging
    logger = logging.getLogger(__name__)

# Literal fragments that must never appear in an install command.
_DANGEROUS_SUBSTRINGS = [
    'rm -rf /',
    'rm -rf /*',
    'dd if=',
    'mkfs',
    'fdisk',
    'format',
    'passwd',
    'shadow',
    'sudoers',
]

# BUG FIX: the original list mixed regex syntax ('chmod.*777', 'sudo.*su',
# 'wget.*|.*curl.*|.*bash') into a plain substring test, so those entries
# could never match any real command. They are now compiled regexes and
# checked with re.search.
_DANGEROUS_REGEXES = [
    re.compile(r'(wget|curl)[^|]*\|[^|]*\b(ba)?sh\b'),  # download piped into a shell
    re.compile(r'chmod\s+.*777'),                       # world-writable permissions
    re.compile(r'chown\s+.*root'),                      # ownership handed to root
    re.compile(r'sudo\s+.*\bsu\b'),                     # privilege-escalation chain
]


class SecurityValidator:
    """Computes and verifies file checksums for package updates."""

    def __init__(self):
        # Weak algorithms (md5/sha1) are kept only so legacy upstream
        # checksums can still be verified; a warning is logged when used.
        self.supported_hash_algorithms = {
            'md5': hashlib.md5,
            'sha1': hashlib.sha1,
            'sha256': hashlib.sha256,
            'sha512': hashlib.sha512
        }

    def generate_checksum(self, file_path: Path, algorithm: str = 'sha256') -> Optional[str]:
        """Generate the hex checksum for a file, or None on any error."""
        if algorithm not in self.supported_hash_algorithms:
            logger.error(f"Unsupported hash algorithm: {algorithm}")
            return None

        if not file_path.exists():
            logger.error(f"File does not exist: {file_path}")
            return None

        if algorithm in ('md5', 'sha1'):
            logger.warning(f"{algorithm} is cryptographically weak; prefer sha256 or sha512")

        try:
            hash_func = self.supported_hash_algorithms[algorithm]()

            # Stream in 4 KiB chunks so large downloads are never loaded
            # into memory at once.
            with open(file_path, 'rb') as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_func.update(chunk)

            return hash_func.hexdigest()

        except Exception as e:
            logger.error(f"Error generating checksum for {file_path}: {e}")
            return None

    def validate_checksum(self, file_path: Path, expected_hash: str,
                          algorithm: str = 'sha256') -> bool:
        """Validate a file's checksum against an expected hex digest.

        Comparison is case-insensitive. Returns False on any error
        (unsupported algorithm, missing file, read failure, mismatch).
        """
        # DRY: delegate hashing to generate_checksum instead of duplicating
        # the chunked-read loop.
        calculated_hash = self.generate_checksum(file_path, algorithm)
        if calculated_hash is None:
            return False

        is_valid = calculated_hash.lower() == expected_hash.lower()

        if is_valid:
            logger.info(f"Checksum validation passed for {file_path.name}")
        else:
            logger.error(f"Checksum validation failed for {file_path.name}")
            logger.debug(f"Expected: {expected_hash}")
            logger.debug(f"Calculated: {calculated_hash}")

        return is_valid


class PackageSecurityValidator:
    """Validates package security and integrity."""

    def __init__(self):
        self.security_validator = SecurityValidator()
        self.trusted_keys: Dict[str, str] = {}  # Package name -> expected public key

    def validate_package_source(self, package_name: str, source: str) -> Dict[str, Any]:
        """Validate that the package source is one of the trusted backends.

        Returns {"valid": bool, "reason": str, "warnings": [str]}.
        """
        result = {
            "valid": False,
            "reason": "",
            "warnings": []
        }

        trusted_sources = ['pacman', 'aur', 'flatpak', 'snap', 'apt', 'dnf']

        if source not in trusted_sources:
            result["reason"] = f"Unknown package source: {source}"
            logger.warning(f"Package {package_name} from unknown source: {source}")
            return result

        # AUR is user-contributed and therefore less trusted: allow but warn.
        if source == 'aur':
            result["warnings"].append(
                "AUR packages are user-contributed and may pose security risks"
            )
            logger.warning(f"AUR package {package_name} - additional caution advised")

        result["valid"] = True
        logger.info(f"Package source validation passed for {package_name} from {source}")
        return result

    def validate_download_integrity(self, download_path: Path,
                                    expected_checksums: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Validate a downloaded package against expected checksums.

        When no checksums are supplied the download is allowed with a
        warning (only existence can be verified).
        """
        result = {
            "valid": False,
            "checksums_validated": [],
            "errors": []
        }

        if not download_path.exists():
            result["errors"].append(f"Download file does not exist: {download_path}")
            return result

        if not expected_checksums:
            logger.warning("No expected checksums provided - limited validation possible")
            result["valid"] = True  # Allow but warn
            result["warnings"] = ["No checksum validation available"]
            return result

        # Every supplied algorithm must validate for the download to pass.
        all_valid = True
        for algorithm, expected_hash in expected_checksums.items():
            is_valid = self.security_validator.validate_checksum(
                download_path, expected_hash, algorithm
            )

            result["checksums_validated"].append({
                "algorithm": algorithm,
                "valid": is_valid
            })

            if not is_valid:
                all_valid = False
                result["errors"].append(f"{algorithm} checksum validation failed")

        result["valid"] = all_valid

        if all_valid:
            logger.info(f"Download integrity validation passed for {download_path.name}")
        else:
            logger.error(f"Download integrity validation failed for {download_path.name}")

        return result

    def validate_installation_safety(self, package_name: str, install_command: str) -> Dict[str, Any]:
        """Validate that the installation command is safe to execute.

        Returns {"safe": bool, "blocked": bool, "reason": str, "warnings": [str]}.
        Dangerous commands are blocked outright; sudo usage and network
        downloads only produce warnings.
        """
        result = {
            "safe": False,
            "warnings": [],
            "blocked": False,
            "reason": ""
        }

        command_lower = install_command.lower()

        for fragment in _DANGEROUS_SUBSTRINGS:
            if fragment in command_lower:
                result["blocked"] = True
                result["reason"] = f"Command contains dangerous pattern: {fragment}"
                logger.error(f"Blocked dangerous install command for {package_name}: {fragment}")
                return result

        for pattern in _DANGEROUS_REGEXES:
            if pattern.search(command_lower):
                result["blocked"] = True
                result["reason"] = f"Command contains dangerous pattern: {pattern.pattern}"
                logger.error(f"Blocked dangerous install command for {package_name}: {pattern.pattern}")
                return result

        # Check for sudo usage (warn but allow)
        if 'sudo' in command_lower:
            result["warnings"].append("Command uses sudo - ensure you have appropriate permissions")

        # Check for network downloads in install commands (warn)
        if 'wget' in command_lower or 'curl' in command_lower:
            result["warnings"].append("Command downloads from network - verify source trustworthiness")

        result["safe"] = True
        logger.info(f"Installation safety validation passed for {package_name}")
        return result


class UpdateSecurityManager:
    """Manages security for the update process."""

    def __init__(self):
        self.package_validator = PackageSecurityValidator()

    def pre_update_validation(self, package_name: str, source: str,
                              install_command: str) -> Dict[str, Any]:
        """Run all security validations before allowing an update.

        Returns a dict whose "approved" flag is True only when both the
        source and the install command pass; "errors" explains rejections
        and "warnings" carries non-fatal notes.
        """
        validation_result = {
            "approved": False,
            "source_valid": False,
            "command_safe": False,
            "warnings": [],
            "errors": []
        }

        # 1) Validate package source.
        source_validation = self.package_validator.validate_package_source(package_name, source)
        validation_result["source_valid"] = source_validation["valid"]
        validation_result["warnings"].extend(source_validation.get("warnings", []))

        if not source_validation["valid"]:
            validation_result["errors"].append(f"Source validation failed: {source_validation['reason']}")
            return validation_result

        # 2) Validate installation command safety.
        command_validation = self.package_validator.validate_installation_safety(package_name, install_command)
        validation_result["command_safe"] = command_validation["safe"]
        validation_result["warnings"].extend(command_validation.get("warnings", []))

        if command_validation["blocked"]:
            validation_result["errors"].append(f"Command blocked: {command_validation['reason']}")
            return validation_result

        # All validations passed.
        validation_result["approved"] = True
        logger.info(f"Pre-update security validation passed for {package_name}")

        return validation_result

    def validate_downloaded_package(self, package_name: str, download_path: Path,
                                    expected_checksums: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Validate a downloaded package before installation."""
        return self.package_validator.validate_download_integrity(download_path, expected_checksums)


# Global security manager instance
security_manager = UpdateSecurityManager()


def validate_update_security(package_name: str, source: str, install_command: str) -> Dict[str, Any]:
    """Validate security for a package update."""
    return security_manager.pre_update_validation(package_name, source, install_command)


def validate_download_integrity(download_path: Path, expected_checksums: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Validate download integrity."""
    return security_manager.validate_downloaded_package("unknown", download_path, expected_checksums)
+ + ++ A powerful cross-distro command-line utility that simplifies package discovery and installation + across Arch Linux, Ubuntu, Fedora, and more. No more guessing package names or commands! +
+ + Try Package Search + + + View on GitHub + ++ ArchPkg Helper is your universal companion for package management on Linux. + Whether you're using Arch Linux, Ubuntu, Fedora, or any other distribution, + our tool helps you find and install software with ease. +
++ Search across multiple package managers simultaneously. + Find packages from pacman, AUR, apt, dnf, flatpak, and snap in one place. +
++ Get ready-to-use installation commands tailored to your distribution. + No more manual command crafting or syntax errors. +
++ Works seamlessly across all major Linux distributions. + Automatically detects your system and provides appropriate commands. +
+Enter a package name or keyword in our search interface
+Browse results from multiple package sources with descriptions
+Copy and run the generated command for your distribution
++ Comprehensive support for all major Linux package management systems +
++ No more searching through multiple sources or remembering complex commands. + Get everything you need in seconds. +
++ Perfect for both Linux newcomers and experienced users. + Clear interface and helpful guidance. +
++ Uses official package repositories and APIs. + Always up-to-date with the latest package information. +
++ Free and open source software. Contribute to the project + and help make Linux package management even better. +
++ Search for packages across multiple Linux distributions and package managers +
+Searching packages...
+
+
+