diff --git a/README.md b/README.md index 2172ddc..d69aeaa 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Universal library for AI code execution sandboxes. `sandboxes` provides a unified interface for sandboxed code execution across multiple providers: -- **Current providers**: E2B, Modal, Daytona +- **Current providers**: E2B, Modal, Daytona, Hopx - **Experimental**: Cloudflare (requires self-hosted Worker deployment) Write your code once and switch between providers with a single line change, or let the library automatically select a provider. @@ -351,6 +351,7 @@ The library automatically detects available providers from environment variables export E2B_API_KEY="..." export MODAL_TOKEN_ID="..." # Or use `modal token set` export DAYTONA_API_KEY="..." +export HOPX_API_KEY="hopx_live_." export CLOUDFLARE_SANDBOX_BASE_URL="https://your-worker.workers.dev" export CLOUDFLARE_API_TOKEN="..." ``` @@ -372,8 +373,9 @@ When you call `Sandbox.create()` or `run()`, the library checks for providers in 1. **Daytona** - Looks for `DAYTONA_API_KEY` 2. **E2B** - Looks for `E2B_API_KEY` -3. **Modal** - Looks for `~/.modal.toml` or `MODAL_TOKEN_ID` -4. **Cloudflare** *(experimental)* - Looks for `CLOUDFLARE_SANDBOX_BASE_URL` + `CLOUDFLARE_API_TOKEN` +3. **Hopx** - Looks for `HOPX_API_KEY` +4. **Modal** - Looks for `~/.modal.toml` or `MODAL_TOKEN_ID` +5. **Cloudflare** *(experimental)* - Looks for `CLOUDFLARE_SANDBOX_BASE_URL` + `CLOUDFLARE_API_TOKEN` **The first provider with valid credentials becomes the default.** Cloudflare requires deploying your own Worker. @@ -412,11 +414,12 @@ from sandboxes import Sandbox # Configure providers programmatically Sandbox.configure( e2b_api_key="your-key", + hopx_api_key="hopx_live_.", cloudflare_config={ "base_url": "https://your-worker.workers.dev", "api_token": "your-token", }, - default_provider="e2b" + default_provider="hopx" ) ``` @@ -429,6 +432,7 @@ from sandboxes.providers import ( E2BProvider, ModalProvider, DaytonaProvider, + HopxProvider, CloudflareProvider, ) @@ -441,6 +445,9 @@ provider = ModalProvider() # Daytona - Uses DAYTONA_API_KEY env var provider = DaytonaProvider() +# Hopx - Uses HOPX_API_KEY env var +provider = HopxProvider() + # Cloudflare - Requires base_url and token provider = CloudflareProvider( base_url="https://your-worker.workers.dev", @@ -452,6 +459,7 @@ Each provider requires appropriate authentication: - **E2B**: Set `E2B_API_KEY` environment variable - **Modal**: Run `modal token set` to configure - **Daytona**: Set `DAYTONA_API_KEY` environment variable +- **Hopx**: Set `HOPX_API_KEY` environment variable (format: `hopx_live_.`) - **Cloudflare** *(experimental)*: Deploy the [Cloudflare sandbox Worker](https://github.com/cloudflare/sandbox-sdk) and set `CLOUDFLARE_SANDBOX_BASE_URL`, `CLOUDFLARE_API_TOKEN`, and (optionally) `CLOUDFLARE_ACCOUNT_ID` > **Cloudflare setup tips (experimental)** @@ -479,6 +487,7 @@ async def main(): manager.register_provider("e2b", E2BProvider, {}) manager.register_provider("modal", ModalProvider, {}) manager.register_provider("daytona", DaytonaProvider, {}) + manager.register_provider("hopx", HopxProvider, {}) manager.register_provider( "cloudflare", CloudflareProvider, diff --git a/docs/hopx-api-reference.md b/docs/hopx-api-reference.md new file mode 100644 index 0000000..5d950cc --- /dev/null +++ b/docs/hopx-api-reference.md @@ -0,0 +1,119 @@ +# Hopx API Reference + +This document contains the full API reference for implementing the Hopx provider. + +## Authentication + +Hopx uses **API keys** for request authentication. Keys follow the format `hopx_live_.` and are obtained from the dashboard. + +**Supported methods:** +- `X-API-Key` header (recommended) +- `Authorization: Bearer` header +- Environment variable (`HOPX_API_KEY`) + +Keys should never be hardcoded; use environment variables or secrets managers instead. + +## API Structure + +The platform provides two main API sections: + +**Lifecycle API** (`/v1/sandboxes`, `/v1/templates`): Manage sandbox creation, deletion, listing, and state transitions (start, stop, pause, resume). + +**VM Agent API** (`https://{sandbox_id}.hopx.dev`): Interact with running sandboxes for code execution, file operations, and system management. + +## Core Endpoints + +### Sandbox Management +- `POST /v1/sandboxes` - Create sandbox from template +- `GET /v1/sandboxes` - List all sandboxes (with filtering) +- `GET /v1/sandboxes/{id}` - Get sandbox details +- `DELETE /v1/sandboxes/{id}` - Delete sandbox +- `POST /v1/sandboxes/{id}/{action}` - Control operations (start, stop, pause, resume) + +### Template Operations +- `GET /v1/templates` - List templates +- `GET /v1/templates/{id}` - Get template details +- `POST /v1/templates/build` - Create custom template +- `DELETE /v1/templates/{id}` - Delete template + +### Code Execution +- `POST {sandbox_host}/execute` - Execute code +- `POST {sandbox_host}/execute/rich` - Execute with rich outputs (plots, DataFrames) +- `POST {sandbox_host}/commands/run` - Run shell commands +- `GET {sandbox_host}/execute/processes` - List processes +- `POST {sandbox_host}/execute/kill/{id}` - Terminate process + +### File Operations +- `GET /files/read` - Read file content +- `POST /files/write` - Create/update file +- `GET /files/list` - List directory contents +- `GET /files/download` - Download file +- `POST /files/upload` - Upload file (multipart/form-data) + +### Additional Features +- **Environment Variables**: GET, PUT, PATCH, DELETE operations on `/env` +- **Metrics**: `GET /metrics/snapshot` and health checks +- **Cache Management**: Get stats and clear cache +- **Desktop Automation**: VNC access, screenshots, mouse/keyboard control +- **WebSocket Support**: Real-time streaming for code execution, terminal, and file watching + +## Request/Response Format + +**Headers:** +``` +Content-Type: application/json +X-API-Key: your_api_key_here +``` + +**Success responses** return JSON with resource data; **error responses** include `error`, `code`, and optional `message` fields. + +## Supported Languages + +- Python +- JavaScript/Node.js +- Bash +- Go + +## Rate Limiting + +Rate limits vary by organization. Limits are communicated via headers: +- `X-RateLimit-Limit` +- `X-RateLimit-Remaining` +- `X-RateLimit-Reset` + +Template building is limited to 10 builds/hour and 50 builds/day by default. + +## Special Features + +**Memory Snapshots**: Templates use memory snapshots for sub-100ms boot times. + +**Sandbox States**: running, stopped, paused, creating. + +**Rich Output Support**: Captures plots, DataFrames, and other formatted outputs. + +**Real-time Streaming**: WebSocket endpoints enable live code execution and file system monitoring. + +**Environment Isolation**: Sandboxes support custom resource allocation and internet access control. + +## Implementation Notes for Provider + +### Base URL +The main API base URL should be configurable, likely: `https://api.hopx.dev` + +### Two-Level API Access +1. **Control Plane**: `https://api.hopx.dev/v1/*` - Lifecycle management +2. **Data Plane**: `https://{sandbox_id}.hopx.dev/*` - Code execution and file operations + +### Key Differences from Other Providers +- Uses HTTP REST API (like Cloudflare provider) +- Requires template selection for sandbox creation +- Supports multiple sandbox states (running, stopped, paused) +- Has separate endpoints for lifecycle vs execution +- Supports rich output formats (plots, DataFrames) + +### Recommended Implementation Approach +1. Use `aiohttp` for async HTTP requests (consistent with Cloudflare provider) +2. Store base URL and API key in config +3. Track sandbox state transitions (creating → running → stopped) +4. Implement streaming execution using WebSocket or SSE +5. Support template-based creation with default template fallback diff --git a/pyproject.toml b/pyproject.toml index 2a66f43..eae3480 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "modal>=1.1.4", "e2b>=2.0.0", "daytona>=0.103.0", + "hopx-ai>=0.1.19", "httpx>=0.27.0", ] @@ -42,6 +43,9 @@ e2b = [ modal = [ "modal==1.1.4", # Latest stable version ] +hopx = [ + "hopx-ai>=0.1.19", # Official Hopx SDK for secure cloud sandboxes +] # vercel = [ # "vercel-sdk>=0.1.0", # When available # ] @@ -52,6 +56,7 @@ all = [ "daytona==0.103.0", "e2b>=2.0.0", "modal==1.1.4", + "hopx-ai>=0.1.19", ] dev = [ "pytest>=7.4.0", diff --git a/sandboxes/providers/__init__.py b/sandboxes/providers/__init__.py index 6f2c05b..42c8594 100644 --- a/sandboxes/providers/__init__.py +++ b/sandboxes/providers/__init__.py @@ -33,6 +33,13 @@ except ImportError: pass +try: + from .hopx import HopxProvider + + _providers["hopx"] = HopxProvider +except ImportError: + pass + try: from .vercel import VercelProvider diff --git a/sandboxes/providers/hopx.py b/sandboxes/providers/hopx.py new file mode 100644 index 0000000..eb8f52b --- /dev/null +++ b/sandboxes/providers/hopx.py @@ -0,0 +1,668 @@ +"""Hopx sandbox provider using the official hopx-ai SDK.""" + +import asyncio +import logging +import os +import time +from collections.abc import AsyncIterator +from datetime import datetime +from typing import Any + +from ..base import ExecutionResult, Sandbox, SandboxConfig, SandboxProvider, SandboxState +from ..exceptions import ProviderError, SandboxError, SandboxNotFoundError +from ..security import validate_download_path, validate_upload_path + +logger = logging.getLogger(__name__) + +try: + from hopx_ai import AsyncSandbox as HopxSandbox + + HOPX_AVAILABLE = True +except ImportError: + HOPX_AVAILABLE = False + HopxSandbox = None + logger.warning("Hopx SDK not available - install with: pip install hopx-ai") + + +class HopxProvider(SandboxProvider): + """Hopx sandbox provider using the official hopx-ai SDK.""" + + def __init__(self, api_key: str | None = None, **config): + """ + Initialize Hopx provider. + + Args: + api_key: Hopx API key. If not provided, reads from HOPX_API_KEY environment variable. + **config: Additional configuration options + """ + super().__init__(**config) + + if not HOPX_AVAILABLE: + raise ProviderError("Hopx SDK not installed") + + self.api_key = api_key or os.getenv("HOPX_API_KEY") + if not self.api_key: + raise ProviderError("Hopx API key not provided") + + # Configuration + self.default_template = config.get("template", "code-interpreter") + self.timeout = config.get("timeout", 300) + self.base_url = config.get("base_url", "https://api.hopx.dev") + + # Track active sandboxes with metadata (like E2B pattern) + self._sandboxes: dict[str, dict[str, Any]] = {} + + # Lock for thread-safe operations + self._lock = asyncio.Lock() + + @property + def name(self) -> str: + """Provider name.""" + return "hopx" + + def _to_sandbox(self, hopx_sandbox, metadata: dict[str, Any]) -> Sandbox: + """Convert Hopx SDK sandbox to standard Sandbox.""" + return Sandbox( + id=hopx_sandbox.sandbox_id, + provider=self.name, + state=SandboxState.RUNNING, # Hopx sandboxes are running when created + labels=metadata.get("labels", {}), + created_at=metadata.get("created_at", datetime.now()), + metadata={ + "template": metadata.get("template", self.default_template), + "last_accessed": metadata.get("last_accessed", time.time()), + "public_host": metadata.get("public_host", ""), + }, + ) + + async def create_sandbox(self, config: SandboxConfig) -> Sandbox: + """Create a new sandbox using Hopx SDK.""" + try: + # Get template from config.image or provider_config, default to code-interpreter + template = ( + config.image + or (config.provider_config.get("template") if config.provider_config else None) + or self.default_template + ) + + # Get timeout configuration + timeout_seconds = config.timeout_seconds or self.timeout + + # Create sandbox using SDK + hopx_sandbox = await HopxSandbox.create( + template=template, + env_vars=config.env_vars, + timeout_seconds=timeout_seconds, + api_key=self.api_key, + base_url=self.base_url, + ) + + # Get sandbox info to retrieve public host + info = await hopx_sandbox.get_info() + + # Store metadata locally (following E2B pattern) + metadata = { + "hopx_sandbox": hopx_sandbox, + "labels": config.labels or {}, + "created_at": datetime.now(), + "last_accessed": time.time(), + "template": template, + "public_host": info.public_host, + "config": config, + } + + async with self._lock: + self._sandboxes[hopx_sandbox.sandbox_id] = metadata + + logger.info(f"Created Hopx sandbox {hopx_sandbox.sandbox_id} with template {template}") + + # Run setup commands if provided + if config.setup_commands: + for cmd in config.setup_commands: + await self.execute_command(hopx_sandbox.sandbox_id, cmd) + + return self._to_sandbox(hopx_sandbox, metadata) + + except Exception as e: + logger.error(f"Failed to create Hopx sandbox: {e}") + raise SandboxError(f"Failed to create sandbox: {e}") from e + + async def get_sandbox(self, sandbox_id: str) -> Sandbox | None: + """Get sandbox by ID.""" + if sandbox_id in self._sandboxes: + metadata = self._sandboxes[sandbox_id] + metadata["last_accessed"] = time.time() + return self._to_sandbox(metadata["hopx_sandbox"], metadata) + return None + + async def list_sandboxes(self, labels: dict[str, str] | None = None) -> list[Sandbox]: + """List active sandboxes, optionally filtered by labels.""" + sandboxes = [] + + # Try to get sandboxes from Hopx API + try: + # Use SDK to list sandboxes + hopx_sandboxes = await HopxSandbox.list(api_key=self.api_key, base_url=self.base_url) + + for hopx_sandbox in hopx_sandboxes: + # Check if we have it in local tracking + if hopx_sandbox.sandbox_id in self._sandboxes: + metadata = self._sandboxes[hopx_sandbox.sandbox_id] + else: + # Add untracked sandbox from API + info = await hopx_sandbox.get_info() + metadata = { + "hopx_sandbox": hopx_sandbox, + "labels": {}, + "created_at": info.created_at or datetime.now(), + "last_accessed": time.time(), + "template": info.template_name or self.default_template, + "public_host": info.public_host, + } + + # Filter by labels if provided + if labels: + sandbox_labels = metadata.get("labels", {}) + if not all(sandbox_labels.get(k) == v for k, v in labels.items()): + continue + + sandboxes.append(self._to_sandbox(hopx_sandbox, metadata)) + + except Exception as e: + logger.warning(f"Could not list Hopx sandboxes from API: {e}") + # Fallback to local tracking only + for _sandbox_id, metadata in self._sandboxes.items(): + if labels: + sandbox_labels = metadata.get("labels", {}) + if not all(sandbox_labels.get(k) == v for k, v in labels.items()): + continue + sandboxes.append(self._to_sandbox(metadata["hopx_sandbox"], metadata)) + + return sandboxes + + async def find_sandbox(self, labels: dict[str, str]) -> Sandbox | None: + """Find a running sandbox with matching labels for reuse.""" + sandboxes = await self.list_sandboxes(labels=labels) + if sandboxes: + # Return most recently accessed + sandboxes.sort( + key=lambda s: self._sandboxes.get(s.id, {}).get("last_accessed", 0), reverse=True + ) + logger.info(f"Found existing sandbox {sandboxes[0].id} with labels {labels}") + return sandboxes[0] + return None + + async def execute_command( + self, + sandbox_id: str, + command: str, + timeout: int | None = None, + env_vars: dict[str, str] | None = None, + ) -> ExecutionResult: + """Execute shell command in the sandbox.""" + if sandbox_id not in self._sandboxes: + raise SandboxNotFoundError(f"Sandbox {sandbox_id} not found") + + try: + metadata = self._sandboxes[sandbox_id] + hopx_sandbox = metadata["hopx_sandbox"] + metadata["last_accessed"] = time.time() + + start_time = time.time() + + # Execute command using SDK + result = await hopx_sandbox.commands.run( + command=command, + timeout_seconds=timeout or self.timeout, + env=env_vars, + ) + + duration_ms = int((time.time() - start_time) * 1000) + + return ExecutionResult( + exit_code=result.exit_code, + stdout=result.stdout, + stderr=result.stderr, + duration_ms=duration_ms, + truncated=False, + timed_out=False, + ) + + except Exception as e: + logger.error(f"Failed to execute command in sandbox {sandbox_id}: {e}") + raise SandboxError(f"Failed to execute command: {e}") from e + + async def run_code( + self, + sandbox_id: str, + code: str, + language: str = "python", + timeout: int | None = None, + env_vars: dict[str, str] | None = None, + ) -> dict[str, Any]: + """ + Execute code with rich output capture (plots, DataFrames, etc.). + + This method captures rich outputs like matplotlib plots, pandas DataFrames, + and other visualizations automatically. + + Args: + sandbox_id: Sandbox ID + code: Code to execute + language: Language (python, javascript, bash, go) + timeout: Execution timeout in seconds + env_vars: Optional environment variables + + Returns: + Dictionary with: + - success: bool + - stdout: str + - stderr: str + - exit_code: int + - execution_time: float + - rich_outputs: list of rich output objects (plots, dataframes, etc.) + + Example: + >>> result = await provider.run_code( + ... sandbox_id="sb-123", + ... code="import matplotlib.pyplot as plt\\nplt.plot([1,2,3])", + ... language="python" + ... ) + >>> print(result['rich_outputs']) # Contains plot data + """ + if sandbox_id not in self._sandboxes: + raise SandboxNotFoundError(f"Sandbox {sandbox_id} not found") + + try: + metadata = self._sandboxes[sandbox_id] + hopx_sandbox = metadata["hopx_sandbox"] + metadata["last_accessed"] = time.time() + + # Execute code with rich output capture using SDK + result = await hopx_sandbox.run_code( + code=code, + language=language, + timeout_seconds=timeout or self.timeout, + env=env_vars, + ) + + # Convert SDK ExecutionResult to dict with rich outputs + return { + "success": result.success, + "stdout": result.stdout, + "stderr": result.stderr, + "exit_code": result.exit_code, + "execution_time": result.execution_time or 0.0, + "rich_outputs": [ + { + "type": output.type, + "data": output.data, + "metadata": output.metadata, + } + for output in (result.rich_outputs or []) + ], + } + + except Exception as e: + logger.error(f"Failed to execute code in sandbox {sandbox_id}: {e}") + raise SandboxError(f"Failed to execute code: {e}") from e + + async def stream_execution( + self, + sandbox_id: str, + command: str, + timeout: int | None = None, + env_vars: dict[str, str] | None = None, + ) -> AsyncIterator[str]: + """ + Stream execution output in real-time using WebSocket. + + Falls back to simulated streaming if WebSocket is not available. + + Args: + sandbox_id: Sandbox ID + command: Command to execute + timeout: Execution timeout in seconds + env_vars: Optional environment variables + + Yields: + Output chunks as they are produced + """ + if sandbox_id not in self._sandboxes: + raise SandboxNotFoundError(f"Sandbox {sandbox_id} not found") + + try: + metadata = self._sandboxes[sandbox_id] + hopx_sandbox = metadata["hopx_sandbox"] + metadata["last_accessed"] = time.time() + + # Try to use SDK's streaming if available + if hasattr(hopx_sandbox, "run_code_stream"): + # Use real WebSocket streaming from SDK + async for chunk in hopx_sandbox.run_code_stream( + code=command, + language="bash", + timeout_seconds=timeout or self.timeout, + ): + yield chunk + else: + # Fallback to simulated streaming + result = await self.execute_command(sandbox_id, command, timeout, env_vars) + + # Yield output in chunks to simulate streaming + chunk_size = 256 + output = result.stdout + + for i in range(0, len(output), chunk_size): + yield output[i : i + chunk_size] + await asyncio.sleep(0.01) # Small delay to simulate streaming + + if result.stderr: + yield f"\n[Error]: {result.stderr}" + + except Exception as e: + logger.error(f"Failed to stream execution in sandbox {sandbox_id}: {e}") + raise SandboxError(f"Failed to stream execution: {e}") from e + + async def upload_file( + self, sandbox_id: str, local_path: str, remote_path: str, binary: bool = False + ) -> bool: + """ + Upload a file to the sandbox with security validation. + + Supports both text and binary files. + + Args: + sandbox_id: Sandbox ID + local_path: Path to local file + remote_path: Destination path in sandbox + binary: If True, upload as binary file (for images, PDFs, etc.) + + Returns: + True if successful + + Example: + >>> # Upload text file + >>> await provider.upload_file("sb-123", "/path/to/script.py", "/workspace/script.py") + >>> # Upload binary file (image, PDF, etc.) + >>> await provider.upload_file("sb-123", "/path/to/plot.png", "/workspace/plot.png", binary=True) + """ + if sandbox_id not in self._sandboxes: + raise SandboxNotFoundError(f"Sandbox {sandbox_id} not found") + + try: + # Validate local path to prevent path traversal attacks + validated_path = validate_upload_path(local_path) + + metadata = self._sandboxes[sandbox_id] + hopx_sandbox = metadata["hopx_sandbox"] + + # Read local file content from validated path + if binary: # noqa: SIM108 + # For binary files (images, PDFs, etc.) + content = validated_path.read_bytes() + else: + # For text files + content = validated_path.read_text() + + # Write to sandbox filesystem using SDK + await hopx_sandbox.files.write(path=remote_path, content=content) + + logger.info( + f"Uploaded {validated_path} to {remote_path} in sandbox {sandbox_id} " + f"(binary={binary})" + ) + metadata["last_accessed"] = time.time() + return True + + except Exception as e: + logger.error(f"Failed to upload file to sandbox {sandbox_id}: {e}") + raise SandboxError(f"Failed to upload file: {e}") from e + + async def download_file( + self, sandbox_id: str, remote_path: str, local_path: str, binary: bool = False + ) -> bool: + """ + Download a file from the sandbox with security validation. + + Supports both text and binary files. + + Args: + sandbox_id: Sandbox ID + remote_path: Path to file in sandbox + local_path: Destination path on local filesystem + binary: If True, download as binary file (for images, PDFs, etc.) + + Returns: + True if successful + + Example: + >>> # Download text file + >>> await provider.download_file("sb-123", "/workspace/output.txt", "/local/output.txt") + >>> # Download binary file (image, PDF, etc.) + >>> await provider.download_file("sb-123", "/workspace/plot.png", "/local/plot.png", binary=True) + """ + if sandbox_id not in self._sandboxes: + raise SandboxNotFoundError(f"Sandbox {sandbox_id} not found") + + try: + # Validate local path to prevent path traversal attacks + validated_path = validate_download_path(local_path) + + metadata = self._sandboxes[sandbox_id] + hopx_sandbox = metadata["hopx_sandbox"] + + # Read from sandbox filesystem using SDK + content = await hopx_sandbox.files.read(path=remote_path) + + # Write to local file at validated path + if binary: + # For binary files - SDK returns bytes + if isinstance(content, str): + # If SDK returned string, encode it + validated_path.write_bytes(content.encode("latin1")) + else: + validated_path.write_bytes(content) + else: + # For text files + if isinstance(content, bytes): + validated_path.write_text(content.decode("utf-8")) + else: + validated_path.write_text(content) + + logger.info( + f"Downloaded {remote_path} from sandbox {sandbox_id} to {validated_path} " + f"(binary={binary})" + ) + metadata["last_accessed"] = time.time() + return True + + except Exception as e: + logger.error(f"Failed to download file from sandbox {sandbox_id}: {e}") + raise SandboxError(f"Failed to download file: {e}") from e + + async def destroy_sandbox(self, sandbox_id: str) -> bool: + """Destroy a sandbox.""" + try: + # Check if we have it in local tracking + if sandbox_id in self._sandboxes: + metadata = self._sandboxes[sandbox_id] + hopx_sandbox = metadata["hopx_sandbox"] + else: + # Try to connect to it via API + hopx_sandbox = await HopxSandbox.connect( + sandbox_id, api_key=self.api_key, base_url=self.base_url + ) + + # Kill sandbox using SDK + await hopx_sandbox.kill() + + # Remove from tracking if present + if sandbox_id in self._sandboxes: + async with self._lock: + del self._sandboxes[sandbox_id] + + logger.info(f"Destroyed Hopx sandbox {sandbox_id}") + return True + + except Exception as e: + logger.error(f"Failed to destroy sandbox {sandbox_id}: {e}") + raise SandboxError(f"Failed to destroy sandbox: {e}") from e + + async def execute_commands( + self, + sandbox_id: str, + commands: list[str], + stop_on_error: bool = True, + timeout: int | None = None, + env_vars: dict[str, str] | None = None, + ) -> list[ExecutionResult]: + """Execute multiple commands in sequence.""" + results = [] + + for command in commands: + result = await self.execute_command(sandbox_id, command, timeout, env_vars) + results.append(result) + + if stop_on_error and not result.success: + logger.warning(f"Command failed, stopping sequence: {command}") + break + + return results + + async def get_or_create_sandbox(self, config: SandboxConfig) -> Sandbox: + """Get existing sandbox with matching labels or create new one.""" + # Try to find existing sandbox if labels provided + if config.labels: + existing = await self.find_sandbox(config.labels) + if existing: + return existing + + # Create new sandbox + return await self.create_sandbox(config) + + async def health_check(self) -> bool: + """Check if Hopx service is accessible.""" + try: + # Try to list sandboxes as a simple health check + await HopxSandbox.list(api_key=self.api_key, base_url=self.base_url) + return True + except Exception as e: + logger.error(f"Hopx health check failed: {e}") + return False + + async def cleanup_idle_sandboxes(self, idle_timeout: int = 600): + """Clean up sandboxes that have been idle.""" + current_time = time.time() + to_destroy = [] + + for sandbox_id, metadata in self._sandboxes.items(): + last_accessed = metadata.get("last_accessed", current_time) + if current_time - last_accessed > idle_timeout: + to_destroy.append(sandbox_id) + + for sandbox_id in to_destroy: + logger.info(f"Cleaning up idle sandbox {sandbox_id}") + await self.destroy_sandbox(sandbox_id) + + async def get_desktop_vnc_url(self, sandbox_id: str) -> str | None: + """ + Get VNC URL for desktop automation (if available). + + Desktop automation requires sandboxes created with desktop-enabled templates. + This feature allows GUI application testing, browser automation, and visual interactions. + + Args: + sandbox_id: Sandbox ID + + Returns: + VNC URL string if desktop is available, None otherwise + + Example: + >>> # Create sandbox with desktop support + >>> config = SandboxConfig(provider_config={"template": "desktop"}) + >>> sandbox = await provider.create_sandbox(config) + >>> + >>> # Get VNC URL + >>> vnc_url = await provider.get_desktop_vnc_url(sandbox.id) + >>> if vnc_url: + ... print(f"Connect to desktop at: {vnc_url}") + + Note: + Desktop automation is an advanced feature requiring specific templates. + Not all templates support desktop/VNC functionality. + """ + if sandbox_id not in self._sandboxes: + raise SandboxNotFoundError(f"Sandbox {sandbox_id} not found") + + try: + metadata = self._sandboxes[sandbox_id] + hopx_sandbox = metadata["hopx_sandbox"] + + # Check if SDK supports desktop (may not be in all versions) + if hasattr(hopx_sandbox, "desktop"): + # Try to start VNC and get URL + vnc_info = await hopx_sandbox.desktop.start_vnc() + return vnc_info.url if hasattr(vnc_info, "url") else None + else: + logger.warning( + f"Desktop automation not available for sandbox {sandbox_id}. " + "Requires desktop-enabled template and SDK support." + ) + return None + + except Exception as e: + logger.error(f"Failed to get VNC URL for sandbox {sandbox_id}: {e}") + # Don't raise, just return None - desktop might not be available + return None + + async def screenshot(self, sandbox_id: str, output_path: str | None = None) -> bytes | None: + """ + Capture screenshot from sandbox desktop (if available). + + Requires sandbox with desktop support. + + Args: + sandbox_id: Sandbox ID + output_path: Optional local path to save screenshot PNG + + Returns: + PNG image bytes if successful, None if desktop not available + + Example: + >>> # Capture and save screenshot + >>> img_bytes = await provider.screenshot("sb-123", "/local/screenshot.png") + >>> if img_bytes: + ... print(f"Screenshot saved: {len(img_bytes)} bytes") + """ + if sandbox_id not in self._sandboxes: + raise SandboxNotFoundError(f"Sandbox {sandbox_id} not found") + + try: + metadata = self._sandboxes[sandbox_id] + hopx_sandbox = metadata["hopx_sandbox"] + + # Check if SDK supports desktop + if not hasattr(hopx_sandbox, "desktop"): + logger.warning("Screenshot not available - desktop support not enabled") + return None + + # Capture screenshot + img_bytes = await hopx_sandbox.desktop.screenshot() + + # Optionally save to file + if output_path and img_bytes: + validated_path = validate_download_path(output_path) + validated_path.write_bytes(img_bytes) + logger.info(f"Screenshot saved to {validated_path}") + + return img_bytes + + except Exception as e: + logger.error(f"Failed to capture screenshot for sandbox {sandbox_id}: {e}") + return None + + def __del__(self): + """Cleanup on deletion.""" + # Any cleanup needed when provider is destroyed + pass diff --git a/sandboxes/sandbox.py b/sandboxes/sandbox.py index 8b3733d..41cd68f 100644 --- a/sandboxes/sandbox.py +++ b/sandboxes/sandbox.py @@ -69,13 +69,20 @@ def _auto_configure(cls) -> None: Providers are registered in priority order: 1. Daytona 2. E2B - 3. Modal - 4. Cloudflare (experimental) + 3. Hopx + 4. Modal + 5. Cloudflare (experimental) The first registered provider becomes the default unless explicitly set. Users can override with Sandbox.configure(default_provider="..."). """ - from .providers import CloudflareProvider, DaytonaProvider, E2BProvider, ModalProvider + from .providers import ( + CloudflareProvider, + DaytonaProvider, + E2BProvider, + HopxProvider, + ModalProvider, + ) manager = cls._manager @@ -95,7 +102,15 @@ def _auto_configure(cls) -> None: except Exception: pass - # Try to register Modal (priority 3) + # Try to register Hopx (priority 3) + if os.getenv("HOPX_API_KEY"): + try: + manager.register_provider("hopx", HopxProvider, {}) + print("✓ Registered Hopx provider") + except Exception: + pass + + # Try to register Modal (priority 4) if os.path.exists(os.path.expanduser("~/.modal.toml")) or os.getenv("MODAL_TOKEN_ID"): try: manager.register_provider("modal", ModalProvider, {}) @@ -103,7 +118,7 @@ def _auto_configure(cls) -> None: except Exception: pass - # Try to register Cloudflare (priority 4 - experimental) + # Try to register Cloudflare (priority 5 - experimental) base_url = os.getenv("CLOUDFLARE_SANDBOX_BASE_URL") api_token = os.getenv("CLOUDFLARE_API_TOKEN") if base_url and api_token: @@ -128,6 +143,7 @@ def configure( e2b_api_key: str | None = None, modal_token: str | None = None, daytona_api_key: str | None = None, + hopx_api_key: str | None = None, cloudflare_config: dict[str, str] | None = None, default_provider: str | None = None, ) -> None: @@ -137,10 +153,17 @@ def configure( Example: Sandbox.configure( e2b_api_key="...", - default_provider="e2b" + hopx_api_key="...", + default_provider="hopx" ) """ - from .providers import CloudflareProvider, DaytonaProvider, E2BProvider, ModalProvider + from .providers import ( + CloudflareProvider, + DaytonaProvider, + E2BProvider, + HopxProvider, + ModalProvider, + ) manager = cls._ensure_manager() @@ -154,6 +177,9 @@ def configure( if daytona_api_key: manager.register_provider("daytona", DaytonaProvider, {"api_key": daytona_api_key}) + if hopx_api_key: + manager.register_provider("hopx", HopxProvider, {"api_key": hopx_api_key}) + if cloudflare_config: manager.register_provider("cloudflare", CloudflareProvider, cloudflare_config) diff --git a/scripts/benchmark_hopx.py b/scripts/benchmark_hopx.py new file mode 100755 index 0000000..b4c1ec4 --- /dev/null +++ b/scripts/benchmark_hopx.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 +"""Benchmark script for Hopx provider performance testing.""" + +import asyncio +import os +import tempfile +import time +from pathlib import Path + +from sandboxes.base import SandboxConfig +from sandboxes.providers.hopx import HopxProvider + + +async def benchmark_hopx(): + """Run comprehensive benchmarks on Hopx provider.""" + api_key = os.getenv("HOPX_API_KEY") + + if not api_key: + print("❌ HOPX_API_KEY not set") + return + + print("=" * 80) + print("HOPX PROVIDER BENCHMARK") + print("=" * 80) + print() + + provider = HopxProvider(api_key=api_key) + + # Test 1: Health Check + print("📡 Test 1: Health Check") + start = time.time() + healthy = await provider.health_check() + duration = time.time() - start + print(f" Result: {'✅ PASS' if healthy else '❌ FAIL'}") + print(f" Duration: {duration:.3f}s") + print() + + if not healthy: + print("❌ Health check failed, aborting benchmark") + return + + # Test 2: Sandbox Creation + print("🚀 Test 2: Sandbox Creation (template: base)") + config = SandboxConfig( + labels={"benchmark": "hopx", "test": "performance"}, provider_config={"template": "base"} + ) + + start = time.time() + sandbox = await provider.create_sandbox(config) + creation_time = time.time() - start + + print(f" Sandbox ID: {sandbox.id}") + print(f" State: {sandbox.state}") + print(f" Duration: {creation_time:.3f}s") + print() + + # Debug: Check sandbox metadata + print("🔍 Debug: Sandbox Metadata") + print(f" Auth Token: {sandbox.metadata.get('auth_token', 'NOT FOUND')[:50]}...") + print(f" Public Host: {sandbox.metadata.get('public_host')}") + print() + + # Wait for VM agent to be ready (memory snapshot boot can take time) + print("⏳ Waiting for VM agent to be ready (10s)...") + await asyncio.sleep(10) + print(" Ready!") + print() + + try: + # Test 3: Simple Command Execution + print("⚡ Test 3: Simple Command Execution") + commands = [ + ("echo 'Hello Hopx'", "Echo test"), + ("python3 --version", "Python version"), + ("node --version", "Node version"), + ("go version", "Go version"), + ] + + for cmd, desc in commands: + start = time.time() + result = await provider.execute_command(sandbox.id, cmd) + duration = time.time() - start + + status = "✅" if result.success else "❌" + print(f" {status} {desc}: {duration:.3f}s") + if result.success: + print(f" Output: {result.stdout.strip()[:60]}") + print() + + # Test 4: Compute-intensive Command + print("🧮 Test 4: Compute-intensive Command") + compute_cmd = "python3 -c 'print(sum(range(1000000)))'" + + start = time.time() + result = await provider.execute_command(sandbox.id, compute_cmd) + duration = time.time() - start + + print(f" Result: {'✅ PASS' if result.success else '❌ FAIL'}") + print(f" Duration: {duration:.3f}s") + print(f" Output: {result.stdout.strip()}") + print() + + # Test 5: File Upload + print("📤 Test 5: File Upload") + + # Create a test file + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + test_content = "Hopx benchmark test file\n" * 100 + f.write(test_content) + local_path = f.name + + try: + start = time.time() + success = await provider.upload_file( + sandbox.id, local_path, "/workspace/benchmark_test.txt" + ) + duration = time.time() - start + + print(f" Result: {'✅ PASS' if success else '❌ FAIL'}") + print(f" Duration: {duration:.3f}s") + print(f" File size: {len(test_content)} bytes") + finally: + os.unlink(local_path) + print() + + # Test 6: File Download + print("📥 Test 6: File Download") + + with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as f: + download_path = f.name + + try: + start = time.time() + success = await provider.download_file( + sandbox.id, "/workspace/benchmark_test.txt", download_path + ) + duration = time.time() - start + + print(f" Result: {'✅ PASS' if success else '❌ FAIL'}") + print(f" Duration: {duration:.3f}s") + + if success: + downloaded_size = Path(download_path).stat().st_size + print(f" Downloaded size: {downloaded_size} bytes") + finally: + os.unlink(download_path) + print() + + # Test 7: Multiple Commands (Sequential) + print("🔄 Test 7: Sequential Commands") + sequential_cmds = [ + "echo 'Command 1'", + "echo 'Command 2'", + "echo 'Command 3'", + "echo 'Command 4'", + "echo 'Command 5'", + ] + + start = time.time() + for cmd in sequential_cmds: + await provider.execute_command(sandbox.id, cmd) + duration = time.time() - start + + print(f" Commands: {len(sequential_cmds)}") + print(f" Total Duration: {duration:.3f}s") + print(f" Avg per command: {duration/len(sequential_cmds):.3f}s") + print() + + # Test 8: List Sandboxes + print("📋 Test 8: List Sandboxes") + start = time.time() + sandboxes = await provider.list_sandboxes() + duration = time.time() - start + + print(" Result: ✅ PASS") + print(f" Duration: {duration:.3f}s") + print(f" Sandboxes found: {len(sandboxes)}") + print() + + # Test 9: Get Sandbox + print("🔍 Test 9: Get Sandbox Details") + start = time.time() + fetched = await provider.get_sandbox(sandbox.id) + duration = time.time() - start + + print(f" Result: {'✅ PASS' if fetched else '❌ FAIL'}") + print(f" Duration: {duration:.3f}s") + print() + + # Summary + print("=" * 80) + print("SUMMARY") + print("=" * 80) + print(f" Total Sandbox Lifetime: {time.time() - (start - creation_time):.3f}s") + print(f" Sandbox Creation Time: {creation_time:.3f}s") + print(" All tests completed successfully! ✅") + print() + + finally: + # Test 10: Sandbox Deletion + print("🗑️ Test 10: Sandbox Deletion") + start = time.time() + success = await provider.destroy_sandbox(sandbox.id) + duration = time.time() - start + + print(f" Result: {'✅ PASS' if success else '❌ FAIL'}") + print(f" Duration: {duration:.3f}s") + print() + + print("=" * 80) + print("BENCHMARK COMPLETE") + print("=" * 80) + + +if __name__ == "__main__": + asyncio.run(benchmark_hopx()) diff --git a/tests/test_hopx_provider.py b/tests/test_hopx_provider.py new file mode 100644 index 0000000..a296e41 --- /dev/null +++ b/tests/test_hopx_provider.py @@ -0,0 +1,710 @@ +"""Tests for the Hopx sandbox provider.""" + +import os +import tempfile +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from sandboxes.base import ExecutionResult, SandboxConfig +from sandboxes.exceptions import ProviderError, SandboxError, SandboxNotFoundError +from sandboxes.providers.hopx import HopxProvider + + +@pytest.mark.asyncio +async def test_hopx_happy_path(): + """Create, execute, list, destroy, and health-check a Hopx sandbox.""" + sandbox_id = "hopx-test-123" + provider = HopxProvider(api_key="test-key") + + # Mock the Hopx SDK + with patch("sandboxes.providers.hopx.HopxSandbox") as MockHopxSandbox: # noqa: N806 + # Create mock sandbox instance + mock_sandbox = AsyncMock() + mock_sandbox.sandbox_id = sandbox_id + mock_sandbox.get_info = AsyncMock( + return_value=MagicMock( + public_host="https://hopx-test-123.hopx.dev", + created_at=None, + template_name="code-interpreter", + ) + ) + mock_sandbox.commands.run = AsyncMock( + return_value=MagicMock(exit_code=0, stdout="hello\n", stderr="", execution_time=0.1) + ) + mock_sandbox.kill = AsyncMock() + + # Mock SDK class methods + MockHopxSandbox.create = AsyncMock(return_value=mock_sandbox) + MockHopxSandbox.list = AsyncMock(return_value=[mock_sandbox]) + + # Create sandbox + config = SandboxConfig(labels={"test": "hopx"}) + sandbox = await provider.create_sandbox(config) + assert sandbox.id == sandbox_id + assert sandbox.provider == "hopx" + + # Verify create was called with correct parameters + MockHopxSandbox.create.assert_called_once() + call_kwargs = MockHopxSandbox.create.call_args.kwargs + assert call_kwargs["template"] == "code-interpreter" + assert call_kwargs["api_key"] == "test-key" + + # List sandboxes + listed = await provider.list_sandboxes() + assert any(sb.id == sandbox_id for sb in listed) + + # Execute command + result = await provider.execute_command(sandbox_id, "echo hello") + assert result.success + assert result.stdout == "hello\n" + assert result.exit_code == 0 + + # Destroy sandbox + destroyed = await provider.destroy_sandbox(sandbox_id) + assert destroyed is True + mock_sandbox.kill.assert_called_once() + + +@pytest.mark.asyncio +async def test_hopx_missing_api_key(): + """Provider should raise ProviderError if API key is not provided.""" + with ( + patch.dict(os.environ, {}, clear=True), + pytest.raises(ProviderError, match="Hopx API key not provided"), + ): + HopxProvider() + + +@pytest.mark.asyncio +async def test_hopx_api_key_from_env(): + """Provider should use API key from environment variable.""" + with patch.dict(os.environ, {"HOPX_API_KEY": "env-key"}): + provider = HopxProvider() + assert provider.api_key == "env-key" + + +@pytest.mark.asyncio +async def test_hopx_missing_sandbox(): + """Executing against a missing sandbox should raise SandboxNotFoundError.""" + provider = HopxProvider(api_key="test-key") + + # Try to execute command on non-existent sandbox + with pytest.raises(SandboxNotFoundError, match="Sandbox .* not found"): + await provider.execute_command("unknown-id", "echo test") + + # get_sandbox should return None for non-existent sandbox + sandbox = await provider.get_sandbox("unknown-id") + assert sandbox is None + + +@pytest.mark.asyncio +async def test_hopx_http_error_raises_sandbox_error(): + """SDK errors should surface as SandboxError.""" + provider = HopxProvider(api_key="test-key") + + with patch("sandboxes.providers.hopx.HopxSandbox") as MockHopxSandbox: # noqa: N806 + # Mock SDK to raise error + MockHopxSandbox.list = AsyncMock(side_effect=Exception("API Error")) + + # health_check catches errors and returns False + result = await provider.health_check() + assert result is False + + +@pytest.mark.asyncio +async def test_hopx_stream_execution(): + """Test streaming execution with simulated chunking.""" + sandbox_id = "stream-test" + provider = HopxProvider(api_key="test-key") + + # Create mock sandbox without streaming support (fallback to simulated) + mock_sandbox = MagicMock() + mock_sandbox.sandbox_id = sandbox_id + # Explicitly set spec without run_code_stream to force fallback + mock_sandbox_spec = MagicMock(spec=["sandbox_id", "files", "commands"]) + + provider._sandboxes[sandbox_id] = { + "hopx_sandbox": mock_sandbox_spec, + "labels": {}, + "last_accessed": 0, + } + + with patch.object(provider, "execute_command") as mock_exec: + mock_exec.return_value = ExecutionResult( + exit_code=0, + stdout="streaming output test", + stderr="", + duration_ms=50, + truncated=False, + timed_out=False, + ) + + chunks = [] + async for chunk in provider.stream_execution(sandbox_id, "echo test"): + chunks.append(chunk) + + output = "".join(chunks) + assert "streaming output test" in output + + +@pytest.mark.asyncio +async def test_hopx_file_upload(): + """Test file upload with security validation.""" + sandbox_id = "file-upload-test" + provider = HopxProvider(api_key="test-key") + + # Create a temporary file + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("test file content") + temp_path = f.name + + try: + # Create mock sandbox + mock_sandbox = AsyncMock() + mock_sandbox.sandbox_id = sandbox_id + mock_sandbox.files.write = AsyncMock() + + # Add to tracking + provider._sandboxes[sandbox_id] = { + "hopx_sandbox": mock_sandbox, + "labels": {}, + "last_accessed": 0, + } + + success = await provider.upload_file(sandbox_id, temp_path, "/workspace/test.txt") + assert success + + # Verify SDK method was called + mock_sandbox.files.write.assert_called_once() + call_kwargs = mock_sandbox.files.write.call_args.kwargs + assert call_kwargs["path"] == "/workspace/test.txt" + assert "content" in call_kwargs + finally: + os.unlink(temp_path) + + +@pytest.mark.asyncio +async def test_hopx_file_upload_security_validation(): + """Test that file upload prevents path traversal attacks.""" + sandbox_id = "security-test" + provider = HopxProvider(api_key="test-key") + + mock_sandbox = AsyncMock() + provider._sandboxes[sandbox_id] = { + "hopx_sandbox": mock_sandbox, + "labels": {}, + } + + # Test path traversal attack + with pytest.raises(SandboxError, match="Path traversal"): + await provider.upload_file(sandbox_id, "../../../etc/passwd", "/workspace/test.txt") + + +@pytest.mark.asyncio +async def test_hopx_file_download(): + """Test file download with security validation.""" + sandbox_id = "file-download-test" + provider = HopxProvider(api_key="test-key") + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = os.path.join(tmpdir, "downloaded.txt") + + # Create mock sandbox + mock_sandbox = AsyncMock() + mock_sandbox.sandbox_id = sandbox_id + mock_sandbox.files.read = AsyncMock(return_value="downloaded file content") + + # Add to tracking + provider._sandboxes[sandbox_id] = { + "hopx_sandbox": mock_sandbox, + "labels": {}, + "last_accessed": 0, + } + + success = await provider.download_file(sandbox_id, "/workspace/test.txt", output_path) + assert success + + # Verify the content was written correctly + with open(output_path) as f: + content = f.read() + assert content == "downloaded file content" + + # Verify SDK method was called + mock_sandbox.files.read.assert_called_once_with(path="/workspace/test.txt") + + +@pytest.mark.asyncio +async def test_hopx_file_download_security_validation(): + """Test that file download prevents path traversal attacks.""" + sandbox_id = "security-test" + provider = HopxProvider(api_key="test-key") + + mock_sandbox = AsyncMock() + mock_sandbox.files.read = AsyncMock(return_value="content") + provider._sandboxes[sandbox_id] = { + "hopx_sandbox": mock_sandbox, + "labels": {}, + } + + # Test path traversal attack on destination + with pytest.raises(SandboxError, match="parent directory does not exist"): + await provider.download_file(sandbox_id, "/workspace/file.txt", "/nonexistent/path.txt") + + +@pytest.mark.asyncio +async def test_hopx_find_sandbox_with_labels(): + """Test finding a sandbox by labels.""" + provider = HopxProvider(api_key="test-key") + + # Create mock sandboxes + mock_sb1 = AsyncMock() + mock_sb1.sandbox_id = "sb-1" + mock_sb2 = AsyncMock() + mock_sb2.sandbox_id = "sb-2" + + # Add to tracking with labels + import time + + provider._sandboxes = { + "sb-1": { + "hopx_sandbox": mock_sb1, + "labels": {"env": "prod", "app": "web"}, + "last_accessed": time.time(), + "created_at": None, + }, + "sb-2": { + "hopx_sandbox": mock_sb2, + "labels": {"env": "dev", "app": "api"}, + "last_accessed": time.time() - 100, + "created_at": None, + }, + } + + # Find by matching labels + found = await provider.find_sandbox({"env": "prod"}) + assert found is not None + assert found.id == "sb-1" + + # No match + found_none = await provider.find_sandbox({"env": "staging"}) + assert found_none is None + + +@pytest.mark.asyncio +async def test_hopx_cleanup_idle_sandboxes(): + """Test cleanup of idle sandboxes.""" + provider = HopxProvider(api_key="test-key") + + import time + + # Create mock sandboxes + mock_old = AsyncMock() + mock_old.sandbox_id = "old-sandbox" + mock_old.kill = AsyncMock() + + mock_new = AsyncMock() + mock_new.sandbox_id = "new-sandbox" + mock_new.kill = AsyncMock() + + # Add to tracking with different access times + provider._sandboxes = { + "old-sandbox": { + "hopx_sandbox": mock_old, + "last_accessed": time.time() - 1000, + "labels": {}, + }, + "new-sandbox": { + "hopx_sandbox": mock_new, + "last_accessed": time.time(), + "labels": {}, + }, + } + + # Cleanup with 500 second timeout + await provider.cleanup_idle_sandboxes(idle_timeout=500) + + # Should only destroy old-sandbox + mock_old.kill.assert_called_once() + mock_new.kill.assert_not_called() + + # Old sandbox should be removed from tracking + assert "old-sandbox" not in provider._sandboxes + assert "new-sandbox" in provider._sandboxes + + +@pytest.mark.asyncio +async def test_hopx_template_selection(): + """Test that templates can be specified via config.""" + provider = HopxProvider(api_key="test-key") + + with patch("sandboxes.providers.hopx.HopxSandbox") as MockHopxSandbox: # noqa: N806 + mock_sandbox = AsyncMock() + mock_sandbox.sandbox_id = "template-test" + mock_sandbox.get_info = AsyncMock( + return_value=MagicMock( + public_host="https://template-test.hopx.dev", + created_at=None, + template_name="nodejs", + ) + ) + MockHopxSandbox.create = AsyncMock(return_value=mock_sandbox) + + # Create with custom template via provider_config + config = SandboxConfig(provider_config={"template": "nodejs"}) + sandbox = await provider.create_sandbox(config) + assert sandbox.id == "template-test" + + # Verify template was passed + call_kwargs = MockHopxSandbox.create.call_args.kwargs + assert call_kwargs["template"] == "nodejs" + + +@pytest.mark.asyncio +async def test_hopx_execute_commands_batch(): + """Test executing multiple commands in sequence.""" + provider = HopxProvider(api_key="test-key") + sandbox_id = "batch-test" + + mock_sandbox = AsyncMock() + mock_sandbox.sandbox_id = sandbox_id + mock_sandbox.commands.run = AsyncMock( + return_value=MagicMock(exit_code=0, stdout="output", stderr="", execution_time=0.1) + ) + + provider._sandboxes[sandbox_id] = { + "hopx_sandbox": mock_sandbox, + "labels": {}, + "last_accessed": 0, + } + + # Execute multiple commands + commands = ["echo 'test1'", "echo 'test2'", "echo 'test3'"] + results = await provider.execute_commands(sandbox_id, commands) + + assert len(results) == 3 + assert all(r.success for r in results) + assert mock_sandbox.commands.run.call_count == 3 + + +@pytest.mark.asyncio +async def test_hopx_execute_commands_stop_on_error(): + """Test that execute_commands stops on first error when stop_on_error=True.""" + provider = HopxProvider(api_key="test-key") + sandbox_id = "error-test" + + mock_sandbox = AsyncMock() + mock_sandbox.sandbox_id = sandbox_id + + # First command succeeds, second fails, third should not run + call_count = 0 + + async def mock_run(command, **kwargs): + nonlocal call_count + call_count += 1 + if call_count == 1: + return MagicMock(exit_code=0, stdout="ok", stderr="", execution_time=0.1) + else: + return MagicMock(exit_code=1, stdout="", stderr="error", execution_time=0.1) + + mock_sandbox.commands.run = mock_run + + provider._sandboxes[sandbox_id] = { + "hopx_sandbox": mock_sandbox, + "labels": {}, + "last_accessed": 0, + } + + commands = ["echo 'ok'", "exit 1", "echo 'should not run'"] + results = await provider.execute_commands(sandbox_id, commands, stop_on_error=True) + + # Only first two commands should run + assert len(results) == 2 + assert results[0].success + assert not results[1].success + assert call_count == 2 # Third command not executed + + +@pytest.mark.asyncio +async def test_hopx_get_or_create_sandbox(): + """Test get_or_create_sandbox reuses existing sandboxes.""" + provider = HopxProvider(api_key="test-key") + + # Add existing sandbox + mock_existing = AsyncMock() + mock_existing.sandbox_id = "existing-sb" + provider._sandboxes["existing-sb"] = { + "hopx_sandbox": mock_existing, + "labels": {"env": "test"}, + "last_accessed": 0, + "created_at": None, + } + + # Request sandbox with matching labels + config = SandboxConfig(labels={"env": "test"}) + sandbox = await provider.get_or_create_sandbox(config) + + # Should return existing sandbox + assert sandbox.id == "existing-sb" + + +@pytest.mark.asyncio +async def test_hopx_run_code_with_rich_outputs(): + """Test run_code method for capturing plots and rich outputs.""" + sandbox_id = "rich-output-test" + provider = HopxProvider(api_key="test-key") + + # Create mock sandbox with run_code support + mock_sandbox = AsyncMock() + mock_sandbox.sandbox_id = sandbox_id + + # Mock rich output result + from unittest.mock import MagicMock + + mock_result = MagicMock() + mock_result.success = True + mock_result.stdout = "Plot created\n" + mock_result.stderr = "" + mock_result.exit_code = 0 + mock_result.execution_time = 1.5 + mock_result.rich_outputs = [ + MagicMock( + type="image/png", + data="iVBORw0KGgoAAAANSUhEUg...", # Base64 PNG data + metadata={"width": 800, "height": 600}, + ) + ] + + mock_sandbox.run_code = AsyncMock(return_value=mock_result) + + provider._sandboxes[sandbox_id] = { + "hopx_sandbox": mock_sandbox, + "labels": {}, + "last_accessed": 0, + } + + # Execute code + result = await provider.run_code( + sandbox_id, + code="import matplotlib.pyplot as plt\nplt.plot([1,2,3])", + language="python", + ) + + # Verify result structure + assert result["success"] is True + assert result["stdout"] == "Plot created\n" + assert result["exit_code"] == 0 + assert result["execution_time"] == 1.5 + assert len(result["rich_outputs"]) == 1 + assert result["rich_outputs"][0]["type"] == "image/png" + assert "data" in result["rich_outputs"][0] + + # Verify SDK method was called + mock_sandbox.run_code.assert_called_once() + call_kwargs = mock_sandbox.run_code.call_args.kwargs + assert call_kwargs["code"] == "import matplotlib.pyplot as plt\nplt.plot([1,2,3])" + assert call_kwargs["language"] == "python" + + +@pytest.mark.asyncio +async def test_hopx_binary_file_upload(): + """Test binary file upload (images, PDFs, etc.).""" + sandbox_id = "binary-upload-test" + provider = HopxProvider(api_key="test-key") + + # Create a temporary binary file + import tempfile + + with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".png") as f: + # Write fake PNG header + f.write(b"\x89PNG\r\n\x1a\n") + temp_path = f.name + + try: + # Create mock sandbox + mock_sandbox = AsyncMock() + mock_sandbox.sandbox_id = sandbox_id + mock_sandbox.files.write = AsyncMock() + + provider._sandboxes[sandbox_id] = { + "hopx_sandbox": mock_sandbox, + "labels": {}, + "last_accessed": 0, + } + + # Upload binary file + success = await provider.upload_file( + sandbox_id, temp_path, "/workspace/image.png", binary=True + ) + assert success + + # Verify SDK was called with bytes + mock_sandbox.files.write.assert_called_once() + call_kwargs = mock_sandbox.files.write.call_args.kwargs + assert call_kwargs["path"] == "/workspace/image.png" + assert isinstance(call_kwargs["content"], bytes) + assert call_kwargs["content"].startswith(b"\x89PNG") + finally: + os.unlink(temp_path) + + +@pytest.mark.asyncio +async def test_hopx_binary_file_download(): + """Test binary file download (images, PDFs, etc.).""" + sandbox_id = "binary-download-test" + provider = HopxProvider(api_key="test-key") + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = os.path.join(tmpdir, "downloaded.png") + + # Create mock sandbox + mock_sandbox = AsyncMock() + mock_sandbox.sandbox_id = sandbox_id + # SDK returns bytes for binary files + mock_sandbox.files.read = AsyncMock(return_value=b"\x89PNG\r\n\x1a\n") + + provider._sandboxes[sandbox_id] = { + "hopx_sandbox": mock_sandbox, + "labels": {}, + "last_accessed": 0, + } + + # Download binary file + success = await provider.download_file( + sandbox_id, "/workspace/plot.png", output_path, binary=True + ) + assert success + + # Verify binary content + with open(output_path, "rb") as f: + content = f.read() + assert content == b"\x89PNG\r\n\x1a\n" + + +@pytest.mark.asyncio +async def test_hopx_screenshot(): + """Test desktop screenshot capture.""" + sandbox_id = "screenshot-test" + provider = HopxProvider(api_key="test-key") + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = os.path.join(tmpdir, "screen.png") + + # Create mock sandbox with desktop support + mock_sandbox = AsyncMock() + mock_sandbox.sandbox_id = sandbox_id + mock_desktop = AsyncMock() + mock_desktop.screenshot = AsyncMock(return_value=b"\x89PNG\r\n\x1a\nFAKE_SCREENSHOT") + mock_sandbox.desktop = mock_desktop + + provider._sandboxes[sandbox_id] = { + "hopx_sandbox": mock_sandbox, + "labels": {}, + "last_accessed": 0, + } + + # Capture screenshot + img_bytes = await provider.screenshot(sandbox_id, output_path) + + assert img_bytes is not None + assert img_bytes.startswith(b"\x89PNG") + assert os.path.exists(output_path) + + # Verify file was saved + with open(output_path, "rb") as f: + saved_content = f.read() + assert saved_content == img_bytes + + +@pytest.mark.asyncio +async def test_hopx_screenshot_no_desktop_support(): + """Test screenshot when desktop is not available.""" + sandbox_id = "no-desktop-test" + provider = HopxProvider(api_key="test-key") + + # Create mock sandbox WITHOUT desktop support + mock_sandbox = MagicMock() + mock_sandbox.sandbox_id = sandbox_id + # Explicitly remove desktop attribute using spec + mock_sandbox_spec = MagicMock(spec=["sandbox_id", "files", "commands"]) + + provider._sandboxes[sandbox_id] = { + "hopx_sandbox": mock_sandbox_spec, + "labels": {}, + "last_accessed": 0, + } + + # Try to capture screenshot (should return None gracefully) + img_bytes = await provider.screenshot(sandbox_id) + assert img_bytes is None + + +@pytest.mark.asyncio +async def test_hopx_get_desktop_vnc_url(): + """Test getting VNC URL for desktop automation.""" + sandbox_id = "vnc-test" + provider = HopxProvider(api_key="test-key") + + # Create mock sandbox with desktop support + mock_sandbox = AsyncMock() + mock_sandbox.sandbox_id = sandbox_id + mock_desktop = AsyncMock() + mock_vnc_info = MagicMock() + mock_vnc_info.url = "wss://hopx-vnc-123.hopx.dev/vnc" + mock_desktop.start_vnc = AsyncMock(return_value=mock_vnc_info) + mock_sandbox.desktop = mock_desktop + + provider._sandboxes[sandbox_id] = { + "hopx_sandbox": mock_sandbox, + "labels": {}, + "last_accessed": 0, + } + + # Get VNC URL + vnc_url = await provider.get_desktop_vnc_url(sandbox_id) + + assert vnc_url is not None + assert vnc_url == "wss://hopx-vnc-123.hopx.dev/vnc" + mock_desktop.start_vnc.assert_called_once() + + +@pytest.mark.asyncio +@pytest.mark.hopx +@pytest.mark.integration +async def test_hopx_live_integration(): + """Live integration test with real Hopx API. + + This test is skipped unless HOPX_API_KEY is set and pytest is run with -m hopx. + """ + api_key = os.getenv("HOPX_API_KEY") + + if not api_key: + pytest.skip("Hopx live credentials not configured") + + provider = HopxProvider(api_key=api_key) + + # Test health check first + assert await provider.health_check() is True + + # Create a sandbox + config = SandboxConfig(labels={"test": "pytest-live"}) + sandbox = await provider.create_sandbox(config) + + try: + # Execute a command + result = await provider.execute_command(sandbox.id, "echo 'hopx test'") + assert result.success + assert "hopx test" in result.stdout + + # List sandboxes + sandboxes = await provider.list_sandboxes() + assert any(sb.id == sandbox.id for sb in sandboxes) + + # Get sandbox details + fetched = await provider.get_sandbox(sandbox.id) + assert fetched is not None + assert fetched.id == sandbox.id + + finally: + # Clean up + await provider.destroy_sandbox(sandbox.id)