From 9ecdc1f26076b41140024e8d54bef774dfae7ada Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AC=E7=A5=89?= Date: Thu, 12 Mar 2026 18:02:52 +0800 Subject: [PATCH] feat(sdks): client pool --- sdks/sandbox/kotlin/README.md | 44 + sdks/sandbox/kotlin/README_zh.md | 44 + sdks/sandbox/kotlin/sandbox/Module.md | 228 ---- sdks/sandbox/kotlin/sandbox/build.gradle.kts | 9 - .../sandbox/config/ConnectionConfig.kt | 18 + .../domain/exceptions/SandboxException.kt | 60 ++ .../sandbox/domain/pool/AcquirePolicy.kt | 33 + .../sandbox/domain/pool/PoolConfig.kt | 169 +++ .../sandbox/domain/pool/PoolCreationSpec.kt | 136 +++ .../sandbox/domain/pool/PoolSnapshot.kt | 30 + .../sandbox/domain/pool/PoolState.kt | 40 + .../sandbox/domain/pool/PoolStateStore.kt | 116 +++ .../sandbox/domain/pool/StoreCounters.kt | 26 + .../pool/InMemoryPoolStateStore.kt | 133 +++ .../infrastructure/pool/PoolReconciler.kt | 153 +++ .../infrastructure/pool/ReconcileState.kt | 82 ++ .../opensandbox/sandbox/pool/SandboxPool.kt | 580 +++++++++++ .../pool/InMemoryPoolStateStoreTest.kt | 183 ++++ .../pool/PoolReconcilerStateTest.kt | 460 ++++++++ .../sandbox/pool/SandboxPoolTest.kt | 140 +++ .../e2e/CodeInterpreterE2ETest.java | 16 +- .../opensandbox/e2e/SandboxE2ETest.java | 42 +- .../SandboxPoolPseudoDistributedE2ETest.java | 662 ++++++++++++ .../e2e/SandboxPoolSingleNodeE2ETest.java | 986 ++++++++++++++++++ 24 files changed, 4116 insertions(+), 274 deletions(-) delete mode 100644 sdks/sandbox/kotlin/sandbox/Module.md create mode 100644 sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/AcquirePolicy.kt create mode 100644 sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolConfig.kt create mode 100644 sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolCreationSpec.kt create mode 100644 sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolSnapshot.kt create 
mode 100644 sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolState.kt create mode 100644 sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolStateStore.kt create mode 100644 sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/StoreCounters.kt create mode 100644 sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStore.kt create mode 100644 sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/PoolReconciler.kt create mode 100644 sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/ReconcileState.kt create mode 100644 sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPool.kt create mode 100644 sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStoreTest.kt create mode 100644 sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/PoolReconcilerStateTest.kt create mode 100644 sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPoolTest.kt create mode 100644 tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxPoolPseudoDistributedE2ETest.java create mode 100644 tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxPoolSingleNodeE2ETest.java diff --git a/sdks/sandbox/kotlin/README.md b/sdks/sandbox/kotlin/README.md index 2b492c8b..645337a2 100644 --- a/sdks/sandbox/kotlin/README.md +++ b/sdks/sandbox/kotlin/README.md @@ -203,6 +203,50 @@ sandboxes.getSandboxInfos().forEach(info -> { // manager.close(); ``` +### 6. Sandbox Pool (Client-Side) + +Use `SandboxPool` to keep an idle buffer of ready sandboxes and reduce acquire latency. 
+ +> ⚠ Experimental: `SandboxPool` is still evolving based on production feedback and may introduce breaking changes in future releases. + +```java +import com.alibaba.opensandbox.sandbox.pool.SandboxPool; +import com.alibaba.opensandbox.sandbox.domain.pool.PoolCreationSpec; +import com.alibaba.opensandbox.sandbox.domain.pool.AcquirePolicy; +import com.alibaba.opensandbox.sandbox.infrastructure.pool.InMemoryPoolStateStore; + +SandboxPool pool = SandboxPool.builder() + .poolName("demo-pool") + .ownerId("worker-1") + .maxIdle(3) + .stateStore(new InMemoryPoolStateStore()) // single-node store + .connectionConfig(config) + .creationSpec( + PoolCreationSpec.builder() + .image("ubuntu:22.04") + .entrypoint(java.util.List.of("tail", "-f", "/dev/null")) + .build() + ) + .build(); + +pool.start(); +Sandbox sb = pool.acquire(Duration.ofMinutes(10), AcquirePolicy.FAIL_FAST); +try { + sb.commands().run("echo pool-ok"); +} finally { + sb.kill(); + sb.close(); +} +pool.shutdown(true); +``` + +Pool lifecycle semantics: +- `acquire()` is only allowed when pool state is `RUNNING`. +- In `DRAINING` / `STOPPED`, `acquire()` throws `PoolNotRunningException`. + + +> For distributed deployment, your application must provide a `PoolStateStore` implementation and ensure it satisfies distributed semantics (atomic idle take, idempotent put/remove, lock ownership/renewal, pool isolation, and consistent counters). + ## Configuration ### 1. Connection Configuration diff --git a/sdks/sandbox/kotlin/README_zh.md b/sdks/sandbox/kotlin/README_zh.md index 0edfedf4..1f7f17bb 100644 --- a/sdks/sandbox/kotlin/README_zh.md +++ b/sdks/sandbox/kotlin/README_zh.md @@ -204,6 +204,50 @@ sandboxes.getSandboxInfos().forEach(info -> { // manager.close(); ``` +### 6. 
Sandbox Pool(客户端池化) + +你可以使用 `SandboxPool` 维护就绪沙箱的空闲缓冲区,以降低 `acquire` 延迟。 + +> ⚠ 实验性能力:`SandboxPool` 仍在根据真实生产场景持续演进,后续版本可能存在 breaking changes。 + +```java +import com.alibaba.opensandbox.sandbox.pool.SandboxPool; +import com.alibaba.opensandbox.sandbox.domain.pool.PoolCreationSpec; +import com.alibaba.opensandbox.sandbox.domain.pool.AcquirePolicy; +import com.alibaba.opensandbox.sandbox.infrastructure.pool.InMemoryPoolStateStore; + +SandboxPool pool = SandboxPool.builder() + .poolName("demo-pool") + .ownerId("worker-1") + .maxIdle(3) + .stateStore(new InMemoryPoolStateStore()) // 单机实现 + .connectionConfig(config) + .creationSpec( + PoolCreationSpec.builder() + .image("ubuntu:22.04") + .entrypoint(java.util.List.of("tail", "-f", "/dev/null")) + .build() + ) + .build(); + +pool.start(); +Sandbox sb = pool.acquire(Duration.ofMinutes(10), AcquirePolicy.FAIL_FAST); +try { + sb.commands().run("echo pool-ok"); +} finally { + sb.kill(); + sb.close(); +} +pool.shutdown(true); +``` + +池状态语义: +- `acquire()` 仅允许在 `RUNNING` 状态调用。 +- 在 `DRAINING` / `STOPPED` 状态,`acquire()` 会抛出 `PoolNotRunningException`。 + + +> 在分布式部署场景下,需要由业务方自行提供 `PoolStateStore` 实现,并保证其满足分布式语义:原子 `tryTakeIdle`、`put/remove` 幂等、主锁所有权与续约、不同 `poolName` 隔离、以及计数一致性。 + ## 配置说明 ### 1. 连接配置 (Connection Configuration) diff --git a/sdks/sandbox/kotlin/sandbox/Module.md b/sdks/sandbox/kotlin/sandbox/Module.md deleted file mode 100644 index 14e36358..00000000 --- a/sdks/sandbox/kotlin/sandbox/Module.md +++ /dev/null @@ -1,228 +0,0 @@ -# Module sandbox -The Open Sandbox SDK provides a comprehensive interface for creating and managing secure, isolated execution environments. Built with Kotlin and designed for both Kotlin and Java applications, it offers high-level abstractions for container-based sandboxing with advanced features like file system operations, command execution, and lifecycle management. 
- -## Features - -- **🔒 Secure Isolation**: Complete Linux OS access in isolated containers -- **📁 File System Operations**: Create, read, update, delete files and directories -- **⚡ Multi-language Execution**: Support for Python, Java, Bash, and other languages -- **🎛️ Real-time Command Execution**: Streaming output with timeout handling -- **📊 Resource Management**: CPU, memory, and storage constraints -- **🔄 Lifecycle Management**: Create, pause, resume, terminate operations -- **💚 Health Monitoring**: Automatic readiness detection and status tracking -- **🏗️ Fluent API**: Type-safe builder pattern with DSL support - -## Quick Start - -### Basic Usage - -```kotlin -// Create a simple Python sandbox -val sandbox = Sandbox.builder() - .image("python:3.11") - .build() - -// Write and execute code -sandbox.filesystem.writeFile("hello.py", "print('Hello, World!')") -val result = sandbox.commands.execute("python hello.py") -println(result.stdout) // Output: Hello, World! - -// Clean up -sandbox.terminate() -``` - -### Advanced Configuration - -```kotlin -val sandbox = Sandbox.builder() - .image("myregistry.com/app:latest") - .imageAuth("username", "password") - .resource { - put("cpu", "1000m") // 1 CPU core - put("memory", "2Gi") // 2 GB RAM - put("gpu", "1") // 1 GPU device - } - .environment { - put("DEBUG", "true") - put("LOG_LEVEL", "info") - } - .metadata { - put("project", "my-project") - put("team", "backend") - } - .timeout(Duration.ofMinutes(30)) - .readyTimeout(Duration.ofSeconds(120)) - .build() -``` - -### File System Operations - -```kotlin -// File operations -sandbox.filesystem.writeFile("config.json", """{"debug": true}""") -val content = sandbox.filesystem.readFile("config.json") -val exists = sandbox.filesystem.exists("config.json") - -// Directory operations -sandbox.filesystem.createDirectory("workspace") -val files = sandbox.filesystem.listDirectory("workspace") - -// Advanced operations -sandbox.filesystem.copy("source.txt", "backup.txt") 
-sandbox.filesystem.move("old.txt", "new.txt") -sandbox.filesystem.setPermissions("script.sh", "755") -``` - -### Command Execution - -```kotlin -// Synchronous execution -val result = sandbox.commands.execute("ls -la") -println("Exit code: ${result.exitCode}") -println("Output: ${result.stdout}") - -// With environment and working directory -val result = sandbox.commands.execute( - command = "npm install", - workingDirectory = "/app", - environment = mapOf("NODE_ENV" to "production"), - timeout = Duration.ofMinutes(5) -) - -// Streaming execution -sandbox.commands.executeStreaming("long-running-task").collect { event -> - when (event) { - is StreamEvent.Stdout -> print(event.data) - is StreamEvent.Stderr -> System.err.print(event.data) - is StreamEvent.Completed -> println("Exit code: ${event.exitCode}") - is StreamEvent.Error -> println("Error: ${event.message}") - } -} -``` - -## Key Components - -### Sandbox -The primary interface for interacting with sandbox environments. Provides methods for: -- Creating new sandbox instances with fluent configuration -- Connecting to existing sandboxes by ID -- Managing sandbox lifecycle (pause, resume, terminate) -- Accessing file system and command execution capabilities -- Health monitoring and status checking - -### SandboxBuilder -A fluent builder for configuring sandbox creation with: -- Container image specification with authentication -- Resource limits (CPU, memory, GPU) -- Environment variables and metadata -- Timeout and readiness configuration -- API client configuration - -### Operations Interfaces - -#### FileSystemOperations -- **File Operations**: Read, write, copy, move, delete files -- **Directory Operations**: Create, list, navigate directories -- **Metadata Operations**: Get file info, set permissions, check existence -- **Batch Operations**: Replace multiple files atomically - -#### CommandOperations -- **Synchronous Execution**: Run commands and wait for completion -- **Streaming Execution**: Real-time 
output streaming with Flow API -- **Background Execution**: Non-blocking command execution -- **Shell Scripts**: Execute multi-line shell scripts -- **Command Utilities**: Check command availability, get versions - -### Domain Models -- **SandboxState**: Lifecycle states (PROVISIONING, RUNNING, PAUSED, etc.) -- **ExecutionResult**: Command execution output with exit code and timing -- **FileInfo**: File system entry information with permissions and metadata -- **Resource Maps**: Kubernetes-style resource specifications as key-value pairs -- **StreamEvent**: Real-time command output events - -### Infrastructure Layer -- **ApiClientAdapter**: HTTP client management with authentication and retry logic -- **SandboxConfig**: Centralized configuration with environment variable support -- **ModelAdapter**: Translation between OpenAPI models and domain types -- **Exception Hierarchy**: Specific exceptions for different error scenarios - -## Architecture - -The SDK follows a clean architecture with clear separation of concerns: - -``` -┌─────────────────────────────────────────┐ -│ Public API │ -│ (Sandbox, SandboxBuilder) │ -├─────────────────────────────────────────┤ -│ Operations Layer │ -│ (FileSystem, Command, Lifecycle) │ -├─────────────────────────────────────────┤ -│ Infrastructure Layer │ -│ (API Clients, Configuration) │ -├─────────────────────────────────────────┤ -│ Domain Layer │ -│ (Types, Exceptions, Models) │ -└─────────────────────────────────────────┘ -``` - -## Java Interoperability - -The SDK is fully compatible with Java applications: - -```java -// Java usage example -Sandbox sandbox = Sandbox.builder() - .image("openjdk:11") - .resource(Map.of( - "cpu", "1000m", - "memory", "2Gi" - )) - .build(); - -ExecutionResult result = sandbox.getCommands().execute("java -version"); -System.out.println("Java version: " + result.getStdout()); - -sandbox.terminate(); -``` - -## Best Practices - -### Resource Management -Always use try-with-resources or explicit 
cleanup: - -```kotlin -// Using AutoCloseable -Sandbox.builder() - .image("python:3.11") - .build() - .use { sandbox -> - // Use sandbox - automatically terminated when exiting - sandbox.filesystem.writeFile("script.py", "print('Hello')") - sandbox.commands.execute("python script.py") - } -``` - -### Error Handling -Handle specific exception types: - -```kotlin -try { - val sandbox = Sandbox.builder().image("python:3.11").build() -} catch (e: AuthenticationException) { - // Handle auth errors -} catch (e: TimeoutException) { - // Handle timeouts -} catch (e: SandboxException) { - // Handle general sandbox errors -} -``` - -## Usage Examples - -See the [samples](../../samples/) directory for comprehensive usage examples including: -- Basic sandbox creation and usage -- Advanced configuration scenarios -- File system operations -- Command execution patterns -- Error handling strategies diff --git a/sdks/sandbox/kotlin/sandbox/build.gradle.kts b/sdks/sandbox/kotlin/sandbox/build.gradle.kts index 9343d5e2..05093390 100644 --- a/sdks/sandbox/kotlin/sandbox/build.gradle.kts +++ b/sdks/sandbox/kotlin/sandbox/build.gradle.kts @@ -58,12 +58,3 @@ tasks.withType { ) } } - -tasks.withType().configureEach { - dokkaSourceSets { - named("main") { - moduleName.set("Sandbox") - includes.from("Module.md") - } - } -} diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/config/ConnectionConfig.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/config/ConnectionConfig.kt index 86181c5a..df451ff5 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/config/ConnectionConfig.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/config/ConnectionConfig.kt @@ -47,6 +47,24 @@ class ConnectionConfig private constructor( */ val useServerProxy: Boolean = false, ) { + /** + * Creates a copy of this ConnectionConfig without copying the connectionPool. 
+ * The returned config will have connectionPool set to null and connectionPoolManagedByUser set to false. + */ + fun copyWithoutConnectionPool(): ConnectionConfig = + ConnectionConfig( + apiKey = this.apiKey, + domain = this.domain, + protocol = this.protocol, + requestTimeout = this.requestTimeout, + debug = this.debug, + userAgent = this.userAgent, + headers = this.headers, + connectionPool = null, + connectionPoolManagedByUser = false, + useServerProxy = this.useServerProxy, + ) + companion object { private const val DEFAULT_DOMAIN = "localhost:8080" private const val DEFAULT_PROTOCOL = "http" diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/exceptions/SandboxException.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/exceptions/SandboxException.kt index da033beb..9c8c2934 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/exceptions/SandboxException.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/exceptions/SandboxException.kt @@ -88,6 +88,54 @@ class InvalidArgumentException( error = SandboxError(SandboxError.INVALID_ARGUMENT, message), ) +/** + * Thrown when acquire is called with FAIL_FAST policy and no idle sandbox is available. + */ +class PoolEmptyException( + message: String? = "No idle sandbox available and policy is FAIL_FAST", + cause: Throwable? = null, +) : SandboxException( + message = message, + cause = cause, + error = SandboxError(SandboxError.POOL_EMPTY, message), + ) + +/** + * Thrown when the pool state store is unavailable during idle take/put/lock operations. + */ +class PoolStateStoreUnavailableException( + message: String? = null, + cause: Throwable? 
= null, +) : SandboxException( + message = message, + cause = cause, + error = SandboxError(SandboxError.POOL_STATE_STORE_UNAVAILABLE, message), + ) + +/** + * Thrown when atomic take or lock-update conflicts occur in the state store. + */ +class PoolStateStoreContentionException( + message: String? = null, + cause: Throwable? = null, +) : SandboxException( + message = message, + cause = cause, + error = SandboxError(SandboxError.POOL_STATE_STORE_CONTENTION, message), + ) + +/** + * Thrown when acquire is called while pool is not in RUNNING state. + */ +class PoolNotRunningException( + message: String? = "Pool is not running", + cause: Throwable? = null, +) : SandboxException( + message = message, + cause = cause, + error = SandboxError(SandboxError.POOL_NOT_RUNNING, message), + ) + /** * Defines standardized common error codes and messages for the Sandbox SDK. */ @@ -101,5 +149,17 @@ data class SandboxError( const val UNHEALTHY = "UNHEALTHY" const val INVALID_ARGUMENT = "INVALID_ARGUMENT" const val UNEXPECTED_RESPONSE = "UNEXPECTED_RESPONSE" + + /** Pool-specific: no idle sandbox and policy is FAIL_FAST. */ + const val POOL_EMPTY = "POOL_EMPTY" + + /** Pool state store unavailable during operations. */ + const val POOL_STATE_STORE_UNAVAILABLE = "POOL_STATE_STORE_UNAVAILABLE" + + /** Pool state store contention (atomic take or lock conflicts). */ + const val POOL_STATE_STORE_CONTENTION = "POOL_STATE_STORE_CONTENTION" + + /** Pool is not in RUNNING state when acquire is requested. 
*/ + const val POOL_NOT_RUNNING = "POOL_NOT_RUNNING" } } diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/AcquirePolicy.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/AcquirePolicy.kt new file mode 100644 index 00000000..3491610d --- /dev/null +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/AcquirePolicy.kt @@ -0,0 +1,33 @@ +/* + * Copyright 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.opensandbox.sandbox.domain.pool + +import com.alibaba.opensandbox.sandbox.domain.exceptions.PoolEmptyException + +/** + * Policy for acquire when the idle buffer is empty. + * + * - FAIL_FAST: throw [PoolEmptyException] (POOL_EMPTY). + * - DIRECT_CREATE: attempt direct create via lifecycle API, then connect and return. + */ +enum class AcquirePolicy { + /** When no idle sandbox is available, fail immediately with POOL_EMPTY. */ + FAIL_FAST, + + /** When no idle sandbox is available, create a new sandbox via lifecycle API. 
*/ + DIRECT_CREATE, +} diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolConfig.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolConfig.kt new file mode 100644 index 00000000..2d983630 --- /dev/null +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolConfig.kt @@ -0,0 +1,169 @@ +/* + * Copyright 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.opensandbox.sandbox.domain.pool + +import com.alibaba.opensandbox.sandbox.config.ConnectionConfig +import java.util.UUID +import kotlin.math.ceil + +/** + * Configuration for a client-side sandbox pool. + * + * @property poolName User-defined name and namespace for this logical pool (required). + * @property ownerId Unique process identity for primary lock ownership. If not provided, a UUID-based default is generated. + * @property maxIdle Standby idle target/cap (required). + * @property warmupConcurrency Max concurrent creation workers during replenish (default: max(1, ceil(maxIdle * 0.2))). + * @property primaryLockTtl Lock TTL for distributed primary ownership (default: 60s). + * @property stateStore Injected [PoolStateStore] implementation (required). + * @property connectionConfig Connection config for lifecycle API (required). 
+ * @property creationSpec Template for creating sandboxes (replenish and direct-create) (required). + * @property reconcileInterval Interval between reconcile ticks (default: 30s). + * @property degradedThreshold Consecutive create failures required to transition to DEGRADED (default: 3). + * @property drainTimeout Max wait during graceful shutdown for in-flight ops (default: 30s). + */ +data class PoolConfig( + val poolName: String, + val ownerId: String, + val maxIdle: Int, + val warmupConcurrency: Int, + val primaryLockTtl: java.time.Duration, + val stateStore: PoolStateStore, + val connectionConfig: ConnectionConfig, + val creationSpec: PoolCreationSpec, + val reconcileInterval: java.time.Duration, + val degradedThreshold: Int, + val drainTimeout: java.time.Duration, +) { + init { + require(poolName.isNotBlank()) { "poolName must not be blank" } + require(ownerId.isNotBlank()) { "ownerId must not be blank" } + require(maxIdle >= 0) { "maxIdle must be >= 0" } + require(warmupConcurrency > 0) { "warmupConcurrency must be positive" } + require(degradedThreshold > 0) { "degradedThreshold must be positive" } + require(!reconcileInterval.isNegative && !reconcileInterval.isZero) { "reconcileInterval must be positive" } + require(!primaryLockTtl.isNegative && !primaryLockTtl.isZero) { "primaryLockTtl must be positive" } + require(!drainTimeout.isNegative) { "drainTimeout must be non-negative" } + } + + companion object { + private val DEFAULT_RECONCILE_INTERVAL = java.time.Duration.ofSeconds(30) + private val DEFAULT_PRIMARY_LOCK_TTL = java.time.Duration.ofSeconds(60) + private const val DEFAULT_DEGRADED_THRESHOLD = 3 + private val DEFAULT_DRAIN_TIMEOUT = java.time.Duration.ofSeconds(30) + + @JvmStatic + fun builder(): Builder = Builder() + } + + class Builder { + private var poolName: String? = null + private var ownerId: String? = null + private var maxIdle: Int? = null + private var warmupConcurrency: Int? 
= null + private var primaryLockTtl: java.time.Duration = DEFAULT_PRIMARY_LOCK_TTL + private var stateStore: PoolStateStore? = null + private var connectionConfig: ConnectionConfig? = null + private var creationSpec: PoolCreationSpec? = null + private var reconcileInterval: java.time.Duration = DEFAULT_RECONCILE_INTERVAL + private var degradedThreshold: Int = DEFAULT_DEGRADED_THRESHOLD + private var drainTimeout: java.time.Duration = DEFAULT_DRAIN_TIMEOUT + + fun poolName(poolName: String): Builder { + this.poolName = poolName + return this + } + + fun ownerId(ownerId: String): Builder { + this.ownerId = ownerId + return this + } + + fun maxIdle(maxIdle: Int): Builder { + this.maxIdle = maxIdle + return this + } + + fun warmupConcurrency(warmupConcurrency: Int): Builder { + this.warmupConcurrency = warmupConcurrency + return this + } + + fun primaryLockTtl(primaryLockTtl: java.time.Duration): Builder { + this.primaryLockTtl = primaryLockTtl + return this + } + + fun stateStore(stateStore: PoolStateStore): Builder { + this.stateStore = stateStore + return this + } + + fun connectionConfig(connectionConfig: ConnectionConfig): Builder { + this.connectionConfig = connectionConfig + return this + } + + fun creationSpec(creationSpec: PoolCreationSpec): Builder { + this.creationSpec = creationSpec + return this + } + + fun reconcileInterval(reconcileInterval: java.time.Duration): Builder { + this.reconcileInterval = reconcileInterval + return this + } + + fun degradedThreshold(degradedThreshold: Int): Builder { + this.degradedThreshold = degradedThreshold + return this + } + + fun drainTimeout(drainTimeout: java.time.Duration): Builder { + this.drainTimeout = drainTimeout + return this + } + + private fun generateDefaultOwnerId(): String { + return "pool-owner-${UUID.randomUUID()}" + } + + fun build(): PoolConfig { + val name = poolName ?: throw IllegalArgumentException("poolName is required") + val owner = ownerId ?: generateDefaultOwnerId() + val max = maxIdle ?: throw 
IllegalArgumentException("maxIdle is required") + val store = stateStore ?: throw IllegalArgumentException("stateStore is required") + val conn = connectionConfig ?: throw IllegalArgumentException("connectionConfig is required") + val spec = creationSpec ?: throw IllegalArgumentException("creationSpec is required") + + val warmup = warmupConcurrency ?: ceil(max * 0.2).toInt().coerceAtLeast(1) + + return PoolConfig( + poolName = name, + ownerId = owner, + maxIdle = max, + warmupConcurrency = warmup, + primaryLockTtl = primaryLockTtl, + stateStore = store, + connectionConfig = conn, + creationSpec = spec, + reconcileInterval = reconcileInterval, + degradedThreshold = degradedThreshold, + drainTimeout = drainTimeout, + ) + } + } +} diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolCreationSpec.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolCreationSpec.kt new file mode 100644 index 00000000..7fc2b604 --- /dev/null +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolCreationSpec.kt @@ -0,0 +1,136 @@ +/* + * Copyright 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.alibaba.opensandbox.sandbox.domain.pool
+
+import com.alibaba.opensandbox.sandbox.Sandbox
+import com.alibaba.opensandbox.sandbox.domain.models.sandboxes.NetworkPolicy
+import com.alibaba.opensandbox.sandbox.domain.models.sandboxes.SandboxImageSpec
+import com.alibaba.opensandbox.sandbox.domain.models.sandboxes.Volume
+
+/**
+ * Template for creating sandboxes in the pool (replenish and direct-create).
+ *
+ * Pool always uses a fixed 24h timeout for created sandboxes; other parameters
+ * are taken from this spec. Defaults align with [Sandbox.Builder].
+ *
+ * @property imageSpec Container image specification (required).
+ * @property entrypoint Entrypoint command (default: tail -f /dev/null).
+ * @property resource Resource limits (default: cpu=1, memory=2Gi).
+ * @property env Environment variables (default: empty).
+ * @property metadata User-defined metadata (default: empty).
+ * @property networkPolicy Optional outbound network policy (default: null).
+ * @property volumes Optional volume mounts (default: null).
+ */
+data class PoolCreationSpec(
+    val imageSpec: SandboxImageSpec,
+    val entrypoint: List<String> = DEFAULT_ENTRYPOINT,
+    val resource: Map<String, String> = DEFAULT_RESOURCE,
+    val env: Map<String, String> = emptyMap(),
+    val metadata: Map<String, String> = emptyMap(),
+    val networkPolicy: NetworkPolicy? = null,
+    val volumes: List<Volume>? = null,
+) {
+    companion object {
+        /** Default entrypoint: keep container running. */
+        val DEFAULT_ENTRYPOINT: List<String> = listOf("tail", "-f", "/dev/null")
+
+        /** Default resource limits. */
+        val DEFAULT_RESOURCE: Map<String, String> =
+            mapOf(
+                "cpu" to "1",
+                "memory" to "2Gi",
+            )
+
+        @JvmStatic
+        fun builder(): Builder = Builder()
+    }
+
+    class Builder {
+        private var imageSpec: SandboxImageSpec? = null
+        private var entrypoint: List<String> = DEFAULT_ENTRYPOINT
+        private var resource: Map<String, String> = DEFAULT_RESOURCE
+        private var env: Map<String, String> = emptyMap()
+        private var metadata: Map<String, String> = emptyMap()
+        private var networkPolicy: NetworkPolicy? = null
+        private var volumes: List<Volume>? = null
+
+        fun imageSpec(imageSpec: SandboxImageSpec): Builder {
+            this.imageSpec = imageSpec
+            return this
+        }
+
+        fun image(image: String): Builder {
+            this.imageSpec = SandboxImageSpec.builder().image(image).build()
+            return this
+        }
+
+        fun entrypoint(entrypoint: List<String>): Builder {
+            this.entrypoint = entrypoint
+            return this
+        }
+
+        fun entrypoint(vararg entrypoint: String): Builder {
+            this.entrypoint = entrypoint.toList()
+            return this
+        }
+
+        fun resource(resource: Map<String, String>): Builder {
+            this.resource = resource
+            return this
+        }
+
+        fun resource(configure: MutableMap<String, String>.() -> Unit): Builder {
+            val map = DEFAULT_RESOURCE.toMutableMap() // seed with defaults; the block overrides or adds entries
+            map.configure()
+            this.resource = map
+            return this
+        }
+
+        fun env(env: Map<String, String>): Builder {
+            this.env = env
+            return this
+        }
+
+        fun metadata(metadata: Map<String, String>): Builder {
+            this.metadata = metadata
+            return this
+        }
+
+        fun networkPolicy(networkPolicy: NetworkPolicy?): Builder {
+            this.networkPolicy = networkPolicy
+            return this
+        }
+
+        fun volumes(volumes: List<Volume>?): Builder {
+            this.volumes = volumes
+            return this
+        }
+
+        fun build(): PoolCreationSpec {
+            val spec = imageSpec ?: throw IllegalArgumentException("PoolCreationSpec imageSpec (or image) must be specified")
+            return PoolCreationSpec(
+                imageSpec = spec,
+                entrypoint = entrypoint,
+                resource = resource,
+                env = env,
+                metadata = metadata,
+                networkPolicy = networkPolicy,
+                volumes = volumes,
+            )
+        }
+    }
+}
diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolSnapshot.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolSnapshot.kt
new file mode 100644
index 00000000..0bcccaf8
--- /dev/null
+++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolSnapshot.kt
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.opensandbox.sandbox.domain.pool
+
+/**
+ * Read-only, point-in-time snapshot of a pool's state, intended for observability.
+ *
+ * @property state Pool state when the snapshot was taken (one of the [PoolState] values).
+ * @property idleCount Number of idle sandboxes in the store when the snapshot was taken.
+ * @property lastError Last error message if pool is DEGRADED or after failure; null otherwise (the default).
+ */
+data class PoolSnapshot(
+    val state: PoolState,
+    val idleCount: Int,
+    val lastError: String? = null,
+)
diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolState.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolState.kt
new file mode 100644
index 00000000..22495c23
--- /dev/null
+++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolState.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.opensandbox.sandbox.domain.pool
+
+/**
+ * High-level state of the sandbox pool.
+ *
+ * Transitions:
+ * - HEALTHY -> DEGRADED: consecutive create failures reach threshold
+ * - DEGRADED -> HEALTHY: probe or create succeeds, failure counter resets
+ * - HEALTHY/DEGRADED -> DRAINING: shutdown(graceful=true) called; pool replenish stops
+ * - any -> STOPPED: shutdown(graceful=false) or drain completes (or drain times out / is interrupted)
+ */
+enum class PoolState {
+    /** Pool is operating normally. */
+    HEALTHY,
+
+    /** Replenish is failing; backoff applied; acquire is still served (existing idle, or direct create per policy). */
+    DEGRADED,
+
+    /** Graceful shutdown in progress; no new replenish, acquire() is rejected, waiting for in-flight ops. */
+    DRAINING,
+
+    /** Pool is stopped (also reported for a pool that was never started); no replenish and acquire() is rejected. */
+    STOPPED,
+}
diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolStateStore.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolStateStore.kt
new file mode 100644
index 00000000..90c6f2e9
--- /dev/null
+++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolStateStore.kt
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.opensandbox.sandbox.domain.pool
+
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * Abstraction for storing pool coordination state and idle sandbox membership.
+ *
+ * All operations are namespaced by the `poolName` argument. Implementations must ensure:
+ * - Atomic take: one idle sandbox can only be taken by one acquire.
+ * - Idempotent put/remove for idle membership.
+ * - tryTakeIdle should prefer FIFO (oldest idle first) as best-effort.
+ *
+ * In distributed mode, only the current primary lock holder may execute
+ * reconcile maintenance writes (putIdle, reapExpiredIdle). Foreground
+ * acquire-path writes (tryTakeIdle, removeIdle) are allowed on all nodes.
+ */
+interface PoolStateStore {
+    /**
+     * Atomically removes and returns one idle sandbox ID for the pool, or null if none.
+     * Best-effort FIFO (oldest first).
+     */
+    fun tryTakeIdle(poolName: String): String?
+
+    /**
+     * Adds a sandbox ID to the idle set for the pool.
+     * Idempotent: duplicate put for same sandboxId leaves membership single-copy.
+     */
+    fun putIdle(
+        poolName: String,
+        sandboxId: String,
+    )
+
+    /**
+     * Removes a sandbox ID from the idle set.
+     * Idempotent: duplicate remove is no-op.
+     */
+    fun removeIdle(
+        poolName: String,
+        sandboxId: String,
+    )
+
+    /**
+     * Tries to acquire the primary (leader) lock for this pool; the reconciler calls this at the start of every tick.
+     * Best-effort mutually exclusive by poolName. Returns true if this node is now (or already was) primary.
+     */
+    fun tryAcquirePrimaryLock(
+        poolName: String,
+        ownerId: String,
+        ttl: Duration,
+    ): Boolean
+
+    /**
+     * Renews the primary lock for the current owner. Non-owner renew is rejected (returns false).
+     */
+    fun renewPrimaryLock(
+        poolName: String,
+        ownerId: String,
+        ttl: Duration,
+    ): Boolean
+
+    /**
+     * Releases the primary lock for the given owner.
+     */
+    fun releasePrimaryLock(
+        poolName: String,
+        ownerId: String,
+    )
+
+    /**
+     * Removes idle entries that have expired as of [now]. In-memory store performs sweep; TTL-backed stores may no-op.
+     */
+    fun reapExpiredIdle(
+        poolName: String,
+        now: Instant,
+    )
+
+    /**
+     * Returns a snapshot of counters for the pool (at least idle count).
+     * Eventually consistent for distributed stores.
+     */
+    fun snapshotCounters(poolName: String): StoreCounters
+
+    /**
+     * Returns the cluster-wide max idle target for the pool, if set.
+     * Used in distributed mode so that `SandboxPool.resize` on any node is visible to the leader.
+     * Return null to use the calling node's local config (e.g. single-node InMemory returns null).
+     */
+    fun getMaxIdle(poolName: String): Int?
+
+    /**
+     * Sets the cluster-wide max idle target for the pool.
+     * In distributed mode, all nodes (including the leader) should use this for reconcile.
+     * Single-node implementations may no-op; the pool then falls back to its own local max idle value.
+     */
+    fun setMaxIdle(
+        poolName: String,
+        maxIdle: Int,
+    )
+}
diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/StoreCounters.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/StoreCounters.kt
new file mode 100644
index 00000000..d3e3c338
--- /dev/null
+++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/StoreCounters.kt
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.opensandbox.sandbox.domain.pool
+
+/**
+ * Snapshot of pool store counters.
+ *
+ * @property idleCount Number of sandbox IDs currently in the idle set.
+ */
+data class StoreCounters(
+    val idleCount: Int,
+)
diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStore.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStore.kt
new file mode 100644
index 00000000..9746bfbc
--- /dev/null
+++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStore.kt
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.opensandbox.sandbox.infrastructure.pool
+
+import com.alibaba.opensandbox.sandbox.domain.pool.PoolStateStore
+import com.alibaba.opensandbox.sandbox.domain.pool.StoreCounters
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.ConcurrentLinkedQueue
+
+/**
+ * In-memory implementation of [PoolStateStore] for single-node use.
+ *
+ * Concurrency is provided entirely by concurrent data structures:
+ * - Per-pool state uses [ConcurrentHashMap] (sandboxId -> [IdleEntry]) and
+ *   [ConcurrentLinkedQueue] (FIFO sandboxIds). No external lock.
+ * - Primary lock is a no-op: single-node mode always treats the caller as leader
+ *   ([tryAcquirePrimaryLock]/[renewPrimaryLock] return true, [releasePrimaryLock] is no-op).
+ *
+ * Idle entries use a fixed 24h TTL; expired entries are removed on take, reap, or snapshot ([putIdle] does not sweep).
+ * [tryTakeIdle] returns oldest (FIFO) non-expired idle sandbox ID.
+ */
+class InMemoryPoolStateStore : PoolStateStore {
+    /** Fixed idle TTL per OSEP (24h). */
+    private val idleTtl: Duration = Duration.ofHours(24)
+
+    /** Per pool: (map = sandboxId -> entry for idempotent put + expiry, queue = FIFO order for take). */
+    private val pools = ConcurrentHashMap<String, PoolIdleState>()
+
+    override fun tryTakeIdle(poolName: String): String? {
+        val state = pools[poolName] ?: return null
+        val now = Instant.now()
+        while (true) {
+            val sandboxId = state.queue.poll() ?: return null
+            val entry = state.map.remove(sandboxId) ?: continue // already removed (e.g. by removeIdle)
+            if (entry.expiresAt.isAfter(now)) return sandboxId
+            // expired, discard and poll next
+        }
+    }
+
+    override fun putIdle(
+        poolName: String,
+        sandboxId: String,
+    ) {
+        val state = pools.computeIfAbsent(poolName) { PoolIdleState() }
+        val expiresAt = Instant.now().plus(idleTtl)
+        val entry = IdleEntry(sandboxId, expiresAt)
+        if (state.map.putIfAbsent(sandboxId, entry) == null) { // map membership decides idempotency; enqueue only on first insert
+            state.queue.add(sandboxId)
+        }
+    }
+
+    override fun removeIdle(
+        poolName: String,
+        sandboxId: String,
+    ) {
+        pools[poolName]?.map?.remove(sandboxId)
+        // queue may still contain sandboxId; tryTakeIdle will skip it (map.remove returns null)
+    }
+
+    override fun tryAcquirePrimaryLock(
+        poolName: String,
+        ownerId: String,
+        ttl: Duration,
+    ): Boolean {
+        // Single-node: no real lock; always grant so reconcile runs.
+        return true
+    }
+
+    override fun renewPrimaryLock(
+        poolName: String,
+        ownerId: String,
+        ttl: Duration,
+    ): Boolean {
+        // Single-node: no real lock; always succeed.
+        return true
+    }
+
+    override fun releasePrimaryLock(
+        poolName: String,
+        ownerId: String,
+    ) {
+        // Single-node: no-op.
+    }
+
+    override fun reapExpiredIdle(
+        poolName: String,
+        now: Instant,
+    ) {
+        val state = pools[poolName] ?: return
+        state.map.entries.removeIf { it.value.expiresAt <= now }
+        state.queue.removeIf { sandboxId -> !state.map.containsKey(sandboxId) } // also drops ids left behind by removeIdle
+    }
+
+    override fun snapshotCounters(poolName: String): StoreCounters {
+        val state = pools[poolName] ?: return StoreCounters(idleCount = 0)
+        val now = Instant.now()
+        state.map.entries.removeIf { it.value.expiresAt <= now } // sweep first so the count excludes expired entries
+        state.queue.removeIf { sandboxId -> !state.map.containsKey(sandboxId) }
+        return StoreCounters(idleCount = state.map.size)
+    }
+
+    override fun getMaxIdle(poolName: String): Int? = null
+
+    override fun setMaxIdle(
+        poolName: String,
+        maxIdle: Int,
+    ) {
+        // Single-node: no shared state; pool uses local currentMaxIdle.
+    }
+
+    private data class IdleEntry(val sandboxId: String, val expiresAt: Instant)
+
+    private class PoolIdleState {
+        val map = ConcurrentHashMap<String, IdleEntry>()
+        val queue = ConcurrentLinkedQueue<String>()
+    }
+}
diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/PoolReconciler.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/PoolReconciler.kt
new file mode 100644
index 00000000..03607fb6
--- /dev/null
+++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/PoolReconciler.kt
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.opensandbox.sandbox.infrastructure.pool
+
+import com.alibaba.opensandbox.sandbox.domain.pool.PoolConfig
+import com.alibaba.opensandbox.sandbox.domain.pool.PoolStateStore
+import org.slf4j.LoggerFactory
+import java.time.Instant
+import java.util.concurrent.Callable
+import java.util.concurrent.ExecutorService
+
+/**
+ * Runs one reconcile tick: leader-gated replenish and TTL reap.
+ *
+ * Only the current primary lock holder performs create + putIdle.
+ * Leader does not voluntarily release the lock; it is only lost when renew fails or TTL expires.
+ * Call from a periodic scheduler; [createOne] should call lifecycle create and return the new sandbox ID or null on failure.
+ */
+internal object PoolReconciler {
+    private val logger = LoggerFactory.getLogger(PoolReconciler::class.java)
+
+    /**
+     * Runs a single reconcile tick. If this node does not hold the primary lock, returns immediately.
+     * Otherwise: reaps expired idle, snapshots counters, then creates up to min(deficit, warmupConcurrency)
+     * sandboxes via [createOne] concurrently using [warmupExecutor]; ids created after the lock is lost go to [onOrphanedCreated].
+     * Lock is not released at end of tick (distributed implementations rely on TTL or renew failure to release).
+     */
+    fun runReconcileTick(
+        config: PoolConfig,
+        stateStore: PoolStateStore,
+        createOne: () -> String?,
+        onOrphanedCreated: (String) -> Unit = {},
+        reconcileState: ReconcileState,
+        warmupExecutor: ExecutorService,
+    ) {
+        val poolName = config.poolName
+        val ownerId = config.ownerId
+        val ttl = config.primaryLockTtl
+
+        if (!stateStore.tryAcquirePrimaryLock(poolName, ownerId, ttl)) {
+            logger.trace("Reconcile skip (not primary): pool_name={}", poolName)
+            return
+        }
+        runPrimaryReplenishOnce(config, stateStore, createOne, onOrphanedCreated, reconcileState, warmupExecutor)
+        // Do not release primary lock here; leader holds until renew fails or TTL expires.
+    }
+
+    private fun runPrimaryReplenishOnce(
+        config: PoolConfig,
+        stateStore: PoolStateStore,
+        createOne: () -> String?,
+        onOrphanedCreated: (String) -> Unit,
+        reconcileState: ReconcileState,
+        warmupExecutor: ExecutorService,
+    ) {
+        val poolName = config.poolName
+        val ownerId = config.ownerId
+        val ttl = config.primaryLockTtl
+        val now = Instant.now()
+
+        stateStore.reapExpiredIdle(poolName, now)
+        val counters = stateStore.snapshotCounters(poolName)
+        val deficit = (config.maxIdle - counters.idleCount).coerceAtLeast(0)
+        val toCreate = minOf(deficit, config.warmupConcurrency)
+
+        if (toCreate == 0 || reconcileState.isBackoffActive(now)) {
+            stateStore.renewPrimaryLock(poolName, ownerId, ttl) // nothing to create this tick; just keep the lease alive
+            logger.debug(
+                "Reconcile tick: pool_name={} idle={} deficit={} toCreate=0 (backoff={})",
+                poolName,
+                counters.idleCount,
+                deficit,
+                reconcileState.isBackoffActive(now),
+            )
+            return
+        }
+
+        logger.debug(
+            "Reconcile tick: pool_name={} idle={} deficit={} toCreate={}",
+            poolName,
+            counters.idleCount,
+            deficit,
+            toCreate,
+        )
+
+        if (!stateStore.renewPrimaryLock(poolName, ownerId, ttl)) return // lost leadership before creating anything
+        val tasks = List(toCreate) { Callable<String?> { createOne() } }
+        val results: List<Pair<String?, String?>> = // (sandboxId, null) on success, (null, errorMessage) on failure
+            warmupExecutor.invokeAll(tasks).map { f ->
+                try {
+                    f.get() to null
+                } catch (e: Exception) {
+                    null to e.message
+                }
+            }
+
+        val createdSandboxIds = mutableListOf<String>()
+        for ((newId, errorMessage) in results) {
+            if (newId != null) {
+                createdSandboxIds += newId
+            } else {
+                reconcileState.recordFailure(errorMessage) // covers both a thrown exception and a null id
+            }
+        }
+
+        var created = 0
+        for (index in createdSandboxIds.indices) {
+            val sandboxId = createdSandboxIds[index]
+            if (!stateStore.renewPrimaryLock(poolName, ownerId, ttl)) { // re-check leadership before every idle write
+                val orphanedCount = createdSandboxIds.size - index
+                for (orphanedIndex in index until createdSandboxIds.size) {
+                    try {
+                        onOrphanedCreated(createdSandboxIds[orphanedIndex])
+                    } catch (e: Exception) {
+                        logger.warn(
+                            "Reconcile orphaned sandbox cleanup failed: pool_name={} sandbox_id={} error={}",
+                            poolName,
+                            createdSandboxIds[orphanedIndex],
+                            e.message,
+                        )
+                    }
+                }
+                logger.warn(
+                    "Reconcile lost primary lock before putIdle; dropped {} newly created sandbox(es): pool_name={}",
+                    orphanedCount,
+                    poolName,
+                )
+                return
+            }
+            stateStore.putIdle(poolName, sandboxId)
+            created++
+            reconcileState.recordSuccess()
+        }
+
+        if (created > 0) {
+            logger.debug("Reconcile created {} sandboxes: pool_name={}", created, poolName)
+        }
+    }
+}
diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/ReconcileState.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/ReconcileState.kt
new file mode 100644
index 00000000..1d7ffa80
--- /dev/null
+++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/ReconcileState.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.opensandbox.sandbox.infrastructure.pool
+
+import com.alibaba.opensandbox.sandbox.domain.pool.PoolState
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * Mutable state for reconcile loop: failure count, pool state, and exponential backoff.
+ * Backoff delay is backoffBase * 2^attempts (attempts capped at 10), clamped to backoffMax; computed in milliseconds.
+ * Thread-safe for use from reconcile worker and from pool snapshot (mutators are synchronized, read fields are volatile).
+ */
+internal class ReconcileState(
+    private val degradedThreshold: Int,
+    private val backoffBase: Duration = Duration.ofSeconds(1),
+    private val backoffMax: Duration = Duration.ofSeconds(60),
+) {
+    @Volatile
+    var failureCount: Int = 0
+        private set
+
+    @Volatile
+    var state: PoolState = PoolState.HEALTHY
+        private set
+
+    @Volatile
+    var lastError: String? = null
+        private set
+
+    @Volatile
+    private var backoffUntil: Instant? = null
+
+    private var backoffAttempts: Int = 0 // only read/written inside the synchronized mutators
+
+    @Synchronized
+    fun recordSuccess() { // clears failure/backoff bookkeeping; DEGRADED -> HEALTHY
+        failureCount = 0
+        if (state == PoolState.DEGRADED) state = PoolState.HEALTHY
+        backoffUntil = null
+        backoffAttempts = 0
+        lastError = null
+    }
+
+    @Synchronized
+    fun recordFailure(errorMessage: String?) { // counts a failure; at/over threshold enters DEGRADED and arms backoff
+        failureCount++
+        lastError = errorMessage
+        if (failureCount >= degradedThreshold) {
+            state = PoolState.DEGRADED
+            backoffAttempts++
+            val exponent = backoffAttempts.coerceAtMost(10) // cap the shift so the multiplier never exceeds 2^10
+            val rawDelayMs = backoffBase.toMillis() * (1L shl exponent) // millis: a sub-second base must not truncate to 0
+            val delayMs =
+                minOf(
+                    rawDelayMs,
+                    backoffMax.toMillis(),
+                )
+            backoffUntil = Instant.now().plusMillis(delayMs)
+        }
+    }
+
+    /** True if reconciler should skip create attempts this tick (in backoff window). */
+    fun isBackoffActive(now: Instant = Instant.now()): Boolean {
+        val until = backoffUntil ?: return false
+        return state == PoolState.DEGRADED && now.isBefore(until)
+    }
+}
diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPool.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPool.kt
new file mode 100644
index 00000000..6396576f
--- /dev/null
+++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPool.kt
@@ -0,0 +1,580 @@
+/*
+ * Copyright 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.alibaba.opensandbox.sandbox.pool + +import com.alibaba.opensandbox.sandbox.Sandbox +import com.alibaba.opensandbox.sandbox.SandboxManager +import com.alibaba.opensandbox.sandbox.config.ConnectionConfig +import com.alibaba.opensandbox.sandbox.domain.exceptions.PoolEmptyException +import com.alibaba.opensandbox.sandbox.domain.exceptions.PoolNotRunningException +import com.alibaba.opensandbox.sandbox.domain.pool.AcquirePolicy +import com.alibaba.opensandbox.sandbox.domain.pool.PoolConfig +import com.alibaba.opensandbox.sandbox.domain.pool.PoolCreationSpec +import com.alibaba.opensandbox.sandbox.domain.pool.PoolSnapshot +import com.alibaba.opensandbox.sandbox.domain.pool.PoolState +import com.alibaba.opensandbox.sandbox.domain.pool.PoolStateStore +import com.alibaba.opensandbox.sandbox.infrastructure.pool.PoolReconciler +import com.alibaba.opensandbox.sandbox.infrastructure.pool.ReconcileState +import org.slf4j.LoggerFactory +import java.time.Duration +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock + +/** + * Client-side sandbox pool for acquiring ready sandboxes with predictable latency. + * + * The pool maintains an idle buffer of clean, borrowable sandboxes. Callers [acquire] a sandbox, + * use it, and terminate it via [Sandbox.kill] when done. No return/finalize API; sandboxes are ephemeral. + * + * Uses [PoolStateStore] for idle membership and primary lock; runs a background reconcile loop + * when started. Replenish is leader-gated; acquire is allowed on all nodes. 
+ * + * ## Usage + * + * ```kotlin + * val pool = SandboxPool.builder() + * .poolName("my-pool") + * .ownerId("worker-1") + * .maxIdle(5) + * .stateStore(InMemoryPoolStateStore()) + * .connectionConfig(connectionConfig) + * .creationSpec(PoolCreationSpec.builder().image("ubuntu:22.04").build()) + * .build() + * pool.start() + * + * val sandbox = pool.acquire(sandboxTimeout = Duration.ofMinutes(30), policy = AcquirePolicy.DIRECT_CREATE) + * try { + * // use sandbox + * } finally { + * sandbox.kill() + * } + * + * pool.shutdown(graceful = true) + * ``` + * + * @see PoolConfig + */ +class SandboxPool internal constructor( + config: PoolConfig, +) { + private val logger = LoggerFactory.getLogger(SandboxPool::class.java) + + private val idleTtl = Duration.ofHours(24) + + private val config: PoolConfig = config + private val stateStore: PoolStateStore = config.stateStore + private val connectionConfig: ConnectionConfig = config.connectionConfig + private val creationSpec: PoolCreationSpec = config.creationSpec + private val reconcileState = ReconcileState(config.degradedThreshold) + + @Volatile + private var currentMaxIdle: Int = config.maxIdle + + private val lifecycleState = AtomicReference(LifecycleState.NOT_STARTED) + private var sandboxManager: SandboxManager? = null + private var scheduler: ScheduledExecutorService? = null + private var warmupExecutor: ExecutorService? = null + private var reconcileTask: ScheduledFuture<*>? = null + private val inFlightOperations = AtomicInteger(0) + private val inFlightLock = ReentrantLock() + private val inFlightZero: Condition = inFlightLock.newCondition() + + /** + * Starts the pool: begins the background reconcile loop and, if [PoolConfig.maxIdle] > 0, + * triggers an immediate warmup tick. 
+ */ + @Synchronized + fun start() { + if (lifecycleState.get() == LifecycleState.RUNNING || lifecycleState.get() == LifecycleState.STARTING) { + return + } + lifecycleState.set(LifecycleState.STARTING) + try { + sandboxManager = SandboxManager.builder().connectionConfig(connectionConfig.copyWithoutConnectionPool()).build() + if (stateStore.getMaxIdle(config.poolName) == null) { + stateStore.setMaxIdle(config.poolName, config.maxIdle) + } + warmupExecutor = + Executors.newFixedThreadPool(config.warmupConcurrency.coerceAtLeast(1)) { r -> + Thread(r, "sandbox-pool-warmup-${config.poolName}").apply { isDaemon = true } + } + val exec = + Executors.newSingleThreadScheduledExecutor { r -> + Thread(r, "sandbox-pool-reconcile-${config.poolName}").apply { isDaemon = true } + } + scheduler = exec + val reconcileIntervalMs = config.reconcileInterval.toMillis() + reconcileTask = + exec.scheduleAtFixedRate( + { + try { + runReconcileTick() + } catch (t: Throwable) { + // Keep periodic scheduling alive even if one tick fails unexpectedly. + logger.error("Pool reconcile tick failed unexpectedly: pool_name={}", config.poolName, t) + } + }, + if (config.maxIdle > 0) 0 else reconcileIntervalMs, + reconcileIntervalMs, + TimeUnit.MILLISECONDS, + ) + lifecycleState.set(LifecycleState.RUNNING) + logger.info( + "Pool started: pool_name={} state={} maxIdle={}", + config.poolName, + LifecycleState.RUNNING, + currentMaxIdle, + ) + } catch (e: Exception) { + stopReconcile() + closeProvider() + lifecycleState.set(LifecycleState.STOPPED) + logger.error("Pool start failed: pool_name={}", config.poolName, e) + throw e + } + } + + /** + * Acquires a sandbox from the pool or creates one directly per policy. + * + * 1. Tries to take an idle sandbox ID from the store and connect. + * 2. If connect fails (stale ID), removes the ID, best-effort kill, then falls back to direct create. + * 3. If no idle and [policy] is [AcquirePolicy.FAIL_FAST], throws [PoolEmptyException]. + * 4. 
If no idle and [policy] is [AcquirePolicy.DIRECT_CREATE], creates a new sandbox via lifecycle API and returns it. + * 5. If pool is not RUNNING (e.g. DRAINING/STOPPED), throws [PoolNotRunningException]. + * + * @param sandboxTimeout Optional duration to set on the acquired sandbox (applied via renew after connect). + * @param policy Behavior when idle buffer is empty (default: [AcquirePolicy.DIRECT_CREATE]). + * @return A connected [Sandbox] instance. Caller must call [Sandbox.kill] when done. + * @throws PoolNotRunningException when pool lifecycle state is not RUNNING. + * @throws PoolEmptyException when policy is FAIL_FAST and no idle is available. + * @throws SandboxException for lifecycle create/connect/renew errors. + */ + fun acquire( + sandboxTimeout: Duration? = null, + policy: AcquirePolicy = AcquirePolicy.DIRECT_CREATE, + ): Sandbox { + if (lifecycleState.get() != LifecycleState.RUNNING) { + val state = lifecycleState.get() + logger.info("Pool not running, acquire rejected: pool_name={} state={}", config.poolName, state) + throw PoolNotRunningException("Cannot acquire when pool state is $state") + } + beginOperation() + try { + if (lifecycleState.get() != LifecycleState.RUNNING) { + val state = lifecycleState.get() + logger.info("Pool not running after acquire started, rejected: pool_name={} state={}", config.poolName, state) + throw PoolNotRunningException("Cannot acquire when pool state is $state") + } + val poolName = config.poolName + val sandboxId = stateStore.tryTakeIdle(poolName) + var noIdleReason: String? 
= null // null = got a sandbox from idle; non-null = reason we have no usable idle + if (sandboxId != null) { + try { + val sandbox = + Sandbox.connector() + .sandboxId(sandboxId) + .connectionConfig(connectionConfig) + .connect() + sandboxTimeout?.let { sandbox.renew(it) } + logger.debug( + "Acquire from idle: pool_name={} sandbox_id={} policy={}", + poolName, + sandboxId, + policy, + ) + return sandbox + } catch (e: Exception) { + logger.warn( + "Idle connect failed (stale or unreachable), removed from pool and falling back: " + + "pool_name={} sandbox_id={} error={}", + poolName, + sandboxId, + e.message, + ) + stateStore.removeIdle(poolName, sandboxId) + try { + sandboxManager?.killSandbox(sandboxId) + } catch (_: Exception) { + // best-effort kill; do not replace original error + } + noIdleReason = "idle connect failed for sandbox_id=$sandboxId (stale or unreachable)" + } + } else { + noIdleReason = "idle buffer empty" + } + val reason = noIdleReason!! + if (policy == AcquirePolicy.FAIL_FAST) { + logger.debug("Acquire FAIL_FAST: pool_name={} reason={}", poolName, reason) + throw PoolEmptyException("Cannot acquire: $reason; policy is FAIL_FAST") + } + logger.debug("Acquire direct create: pool_name={} reason={} policy={}", poolName, reason, policy) + return directCreate(sandboxTimeout) + } finally { + endOperation() + } + } + + /** + * Updates the maximum idle target. In distributed mode the new value is written to the store + * so the whole cluster (including the leader) uses it; in single-node only this process sees it. + * Triggers a reconcile tick without blocking on convergence. 
+ */ + fun resize(maxIdle: Int) { + require(maxIdle >= 0) { "maxIdle must be >= 0" } + stateStore.setMaxIdle(config.poolName, maxIdle) + currentMaxIdle = maxIdle + if (lifecycleState.get() != LifecycleState.RUNNING) return + try { + scheduler?.execute { runReconcileTick() } + } catch (_: RejectedExecutionException) { + logger.debug( + "Resize reconcile trigger skipped because scheduler is shutting down: pool_name={} state={}", + config.poolName, + lifecycleState.get(), + ) + } + } + + /** + * Takes all idle sandbox IDs from the store and terminates each sandbox (best-effort). + * Use this to release held resources, e.g. before process exit on single-node, or to reset the idle buffer. + * In distributed mode this is best-effort: concurrent putIdle on other nodes may add new idle during the loop. + * If the pool is not running, [sandboxManager] may be null and kill is skipped; the store is still drained. + * + * @return Number of idle sandboxes that were taken and (when possible) killed. + */ + fun releaseAllIdle(): Int { + val poolName = config.poolName + var count = 0 + while (true) { + val sandboxId = stateStore.tryTakeIdle(poolName) ?: break + count++ + try { + sandboxManager?.killSandbox(sandboxId) + } catch (e: Exception) { + logger.warn( + "releaseAllIdle: failed to kill sandbox (best-effort): pool_name={} sandbox_id={} error={}", + poolName, + sandboxId, + e.message, + ) + } + } + if (count > 0) { + logger.info("releaseAllIdle: released {} idle sandbox(es): pool_name={}", count, poolName) + } + return count + } + + /** + * Returns a point-in-time snapshot of pool state for observability. 
+ */ + fun snapshot(): PoolSnapshot { + val state = + when (lifecycleState.get()) { + LifecycleState.NOT_STARTED, + LifecycleState.STOPPED, + -> PoolState.STOPPED + LifecycleState.DRAINING -> PoolState.DRAINING + else -> reconcileState.state + } + val counters = stateStore.snapshotCounters(config.poolName) + return PoolSnapshot( + state = state, + idleCount = counters.idleCount, + lastError = reconcileState.lastError, + ) + } + + /** + * Stops pool replenish workers. If [graceful] is true, transitions to DRAINING, stops reconcile worker, + * and waits until local in-flight operations complete or [PoolConfig.drainTimeout] elapses before STOPPED. + * acquire() is rejected while pool is not RUNNING. If [graceful] is false, stops immediately. + */ + @Synchronized + fun shutdown(graceful: Boolean = true) { + if (lifecycleState.get() == LifecycleState.STOPPED) return + if (!graceful) { + stopReconcile() + lifecycleState.set(LifecycleState.STOPPED) + closeProvider() + logger.info("Pool stopped (non-graceful): pool_name={} state={}", config.poolName, LifecycleState.STOPPED) + return + } + lifecycleState.set(LifecycleState.DRAINING) + stopReconcile() + try { + val drained = awaitInFlightDrain(config.drainTimeout) + if (!drained) { + logger.warn( + "Pool graceful shutdown timed out waiting in-flight operations: pool_name={} in_flight={} timeout_ms={}", + config.poolName, + inFlightOperations.get(), + config.drainTimeout.toMillis(), + ) + } + } catch (_: InterruptedException) { + Thread.currentThread().interrupt() + logger.warn("Pool graceful shutdown interrupted during drain: pool_name={}", config.poolName) + } finally { + lifecycleState.set(LifecycleState.STOPPED) + closeProvider() + logger.info("Pool stopped (graceful): pool_name={} state={}", config.poolName, LifecycleState.STOPPED) + } + } + + private fun resolveMaxIdle(): Int = stateStore.getMaxIdle(config.poolName) ?: currentMaxIdle + + private fun runReconcileTick() { + if (lifecycleState.get() != 
LifecycleState.RUNNING) return + val executor = warmupExecutor ?: return + val reconcileConfig = config.copy(maxIdle = resolveMaxIdle()) + PoolReconciler.runReconcileTick( + config = reconcileConfig, + stateStore = stateStore, + createOne = { createOneSandbox() }, + onOrphanedCreated = { sandboxId -> killSandboxBestEffort(sandboxId) }, + reconcileState = reconcileState, + warmupExecutor = executor, + ) + } + + /** + * Creates one sandbox via [Sandbox.builder], waits for readiness (no skipHealthCheck), + * then returns its id. Caller must put the id into the store; the created [Sandbox] + * is closed immediately so only the id is kept in the pool. + */ + private fun createOneSandbox(): String? { + beginOperation() + return try { + val sandbox = buildSandboxFromSpec() + val id = sandbox.id + sandbox.close() + id + } catch (e: Exception) { + logger.warn("Pool create sandbox failed: poolName={}", config.poolName, e) + throw e + } finally { + endOperation() + } + } + + private fun buildSandboxFromSpec(): Sandbox { + val b = + Sandbox.builder() + .imageSpec(creationSpec.imageSpec) + .entrypoint(creationSpec.entrypoint) + .resource(creationSpec.resource) + .env(creationSpec.env) + .metadata(creationSpec.metadata) + .volumes(creationSpec.volumes ?: emptyList()) + .timeout(idleTtl) + .connectionConfig(connectionConfig) + val builder = creationSpec.networkPolicy?.let { b.networkPolicy(it) } ?: b + return builder.build() + } + + private fun directCreate(sandboxTimeout: Duration?): Sandbox { + val sandbox = buildSandboxFromSpec() + sandboxTimeout?.let { sandbox.renew(it) } + return sandbox + } + + private fun killSandboxBestEffort(sandboxId: String) { + try { + sandboxManager?.killSandbox(sandboxId) + } catch (e: Exception) { + logger.warn( + "Pool orphaned sandbox cleanup failed (best-effort): pool_name={} sandbox_id={} error={}", + config.poolName, + sandboxId, + e.message, + ) + } + } + + private fun beginOperation() { + inFlightOperations.incrementAndGet() + } + + private 
fun endOperation() { + val remaining = inFlightOperations.decrementAndGet() + if (remaining < 0) { + inFlightOperations.set(0) + logger.warn("Pool in-flight counter underflow corrected: pool_name={}", config.poolName) + inFlightLock.lock() + try { + inFlightZero.signalAll() + } finally { + inFlightLock.unlock() + } + return + } + if (remaining == 0) { + inFlightLock.lock() + try { + inFlightZero.signalAll() + } finally { + inFlightLock.unlock() + } + } + } + + @Throws(InterruptedException::class) + private fun awaitInFlightDrain(timeout: Duration): Boolean { + val timeoutNanos = timeout.toNanos() + if (timeoutNanos <= 0) { + return inFlightOperations.get() == 0 + } + val deadline = System.nanoTime() + timeoutNanos + inFlightLock.lock() + try { + while (inFlightOperations.get() > 0) { + val remaining = deadline - System.nanoTime() + if (remaining <= 0) { + return false + } + inFlightZero.awaitNanos(remaining) + } + return true + } finally { + inFlightLock.unlock() + } + } + + private fun stopReconcile() { + reconcileTask?.cancel(false) + reconcileTask = null + scheduler?.shutdown() + try { + scheduler?.awaitTermination(5, TimeUnit.SECONDS) + } catch (_: InterruptedException) { + Thread.currentThread().interrupt() + } + scheduler = null + warmupExecutor?.shutdown() + try { + warmupExecutor?.awaitTermination(5, TimeUnit.SECONDS) + } catch (_: InterruptedException) { + Thread.currentThread().interrupt() + } + warmupExecutor = null + } + + private fun closeProvider() { + try { + sandboxManager?.close() + } catch (e: Exception) { + logger.warn("Error closing pool SandboxManager", e) + } + sandboxManager = null + } + + @Suppress("ktlint:standard:property-naming") + private enum class LifecycleState { + NOT_STARTED, + STARTING, + RUNNING, + DRAINING, + STOPPED, + } + + companion object { + @JvmStatic + fun builder(): Builder = Builder() + } + + class Builder internal constructor() { + private var config: PoolConfig? 
= null + + fun config(config: PoolConfig): Builder { + this.config = config + return this + } + + fun poolName(poolName: String): Builder { + configBuilder.poolName(poolName) + return this + } + + fun ownerId(ownerId: String): Builder { + configBuilder.ownerId(ownerId) + return this + } + + fun maxIdle(maxIdle: Int): Builder { + configBuilder.maxIdle(maxIdle) + return this + } + + fun stateStore(stateStore: PoolStateStore): Builder { + configBuilder.stateStore(stateStore) + return this + } + + fun connectionConfig(connectionConfig: ConnectionConfig): Builder { + configBuilder.connectionConfig(connectionConfig) + return this + } + + fun creationSpec(creationSpec: PoolCreationSpec): Builder { + configBuilder.creationSpec(creationSpec) + return this + } + + fun warmupConcurrency(warmupConcurrency: Int): Builder { + configBuilder.warmupConcurrency(warmupConcurrency) + return this + } + + fun primaryLockTtl(primaryLockTtl: Duration): Builder { + configBuilder.primaryLockTtl(primaryLockTtl) + return this + } + + fun reconcileInterval(reconcileInterval: Duration): Builder { + configBuilder.reconcileInterval(reconcileInterval) + return this + } + + fun degradedThreshold(degradedThreshold: Int): Builder { + configBuilder.degradedThreshold(degradedThreshold) + return this + } + + fun drainTimeout(drainTimeout: Duration): Builder { + configBuilder.drainTimeout(drainTimeout) + return this + } + + private val configBuilder = PoolConfig.builder() + + fun build(): SandboxPool { + val cfg = config ?: configBuilder.build() + return SandboxPool(cfg) + } + } +} diff --git a/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStoreTest.kt b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStoreTest.kt new file mode 100644 index 00000000..a2281311 --- /dev/null +++ 
b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStoreTest.kt @@ -0,0 +1,183 @@ +/* + * Copyright 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.opensandbox.sandbox.infrastructure.pool + +import com.alibaba.opensandbox.sandbox.domain.pool.PoolStateStore +import com.alibaba.opensandbox.sandbox.domain.pool.StoreCounters +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.time.Duration + +/** + * Contract and behavior tests for [InMemoryPoolStateStore]. 
+ */ +class InMemoryPoolStateStoreTest { + private lateinit var store: PoolStateStore + private val poolName = "test-pool" + + @BeforeEach + fun setUp() { + store = InMemoryPoolStateStore() + } + + @Test + fun `tryTakeIdle returns null when empty`() { + assertNull(store.tryTakeIdle(poolName)) + } + + @Test + fun `putIdle and tryTakeIdle round-trip`() { + store.putIdle(poolName, "id-1") + assertEquals("id-1", store.tryTakeIdle(poolName)) + assertNull(store.tryTakeIdle(poolName)) + } + + @Test + fun `tryTakeIdle prefers FIFO`() { + store.putIdle(poolName, "id-1") + store.putIdle(poolName, "id-2") + store.putIdle(poolName, "id-3") + assertEquals("id-1", store.tryTakeIdle(poolName)) + assertEquals("id-2", store.tryTakeIdle(poolName)) + assertEquals("id-3", store.tryTakeIdle(poolName)) + assertNull(store.tryTakeIdle(poolName)) + } + + @Test + fun `removeIdle removes entry`() { + store.putIdle(poolName, "id-1") + store.removeIdle(poolName, "id-1") + assertNull(store.tryTakeIdle(poolName)) + } + + @Test + fun `removeIdle is idempotent`() { + store.putIdle(poolName, "id-1") + store.removeIdle(poolName, "id-1") + store.removeIdle(poolName, "id-1") + assertNull(store.tryTakeIdle(poolName)) + } + + @Test + fun `putIdle is idempotent - single copy`() { + store.putIdle(poolName, "id-1") + store.putIdle(poolName, "id-1") + assertEquals("id-1", store.tryTakeIdle(poolName)) + assertNull(store.tryTakeIdle(poolName)) + } + + @Test + fun `snapshotCounters returns idle count`() { + assertEquals(0, store.snapshotCounters(poolName).idleCount) + store.putIdle(poolName, "id-1") + store.putIdle(poolName, "id-2") + assertEquals(2, store.snapshotCounters(poolName).idleCount) + store.tryTakeIdle(poolName) + assertEquals(1, store.snapshotCounters(poolName).idleCount) + } + + @Test + fun `tryAcquirePrimaryLock acquires when free`() { + val ok = store.tryAcquirePrimaryLock(poolName, "owner-1", Duration.ofSeconds(60)) + assertTrue(ok) + } + + @Test + fun `tryAcquirePrimaryLock always grants in 
single-node mode`() { + // InMemory is single-node: no real lock; every caller is treated as leader. + store.tryAcquirePrimaryLock(poolName, "owner-1", Duration.ofSeconds(60)) + val ok = store.tryAcquirePrimaryLock(poolName, "owner-2", Duration.ofSeconds(60)) + assertTrue(ok) + } + + @Test + fun `renewPrimaryLock succeeds for owner`() { + store.tryAcquirePrimaryLock(poolName, "owner-1", Duration.ofSeconds(60)) + assertTrue(store.renewPrimaryLock(poolName, "owner-1", Duration.ofSeconds(60))) + } + + @Test + fun `renewPrimaryLock always succeeds in single-node mode`() { + // InMemory is single-node: no real lock; renew always succeeds. + store.tryAcquirePrimaryLock(poolName, "owner-1", Duration.ofSeconds(60)) + assertTrue(store.renewPrimaryLock(poolName, "owner-2", Duration.ofSeconds(60))) + } + + @Test + fun `releasePrimaryLock allows new owner`() { + store.tryAcquirePrimaryLock(poolName, "owner-1", Duration.ofSeconds(60)) + store.releasePrimaryLock(poolName, "owner-1") + assertTrue(store.tryAcquirePrimaryLock(poolName, "owner-2", Duration.ofSeconds(60))) + } + + @Test + fun `pool isolation - different pools do not share idle`() { + store.putIdle("pool-a", "id-a") + store.putIdle("pool-b", "id-b") + assertEquals("id-a", store.tryTakeIdle("pool-a")) + assertEquals("id-b", store.tryTakeIdle("pool-b")) + assertNull(store.tryTakeIdle("pool-a")) + } + + @Test + fun `pool isolation - different pools do not share lock`() { + store.tryAcquirePrimaryLock("pool-a", "owner-a", Duration.ofSeconds(60)) + assertTrue(store.tryAcquirePrimaryLock("pool-b", "owner-b", Duration.ofSeconds(60))) + } + + @Test + fun `reapExpiredIdle removes expired entries`() { + store.putIdle(poolName, "id-1") + store.reapExpiredIdle(poolName, java.time.Instant.now().plus(java.time.Duration.ofHours(25))) + assertEquals(StoreCounters(0), store.snapshotCounters(poolName)) + } + + @Test + fun `getMaxIdle returns null and setMaxIdle is no-op in single-node`() { + assertNull(store.getMaxIdle(poolName)) + 
store.setMaxIdle(poolName, 10) + assertNull(store.getMaxIdle(poolName)) + } + + @Test + fun `snapshotCounters compacts queue tombstones`() { + store.putIdle(poolName, "id-1") + store.putIdle(poolName, "id-2") + store.removeIdle(poolName, "id-1") + store.snapshotCounters(poolName) + + val queueSize = extractQueueSize(store as InMemoryPoolStateStore, poolName) + assertEquals(1, queueSize) + } + + private fun extractQueueSize( + inMemoryStore: InMemoryPoolStateStore, + pool: String, + ): Int { + val poolsField = InMemoryPoolStateStore::class.java.getDeclaredField("pools") + poolsField.isAccessible = true + val pools = poolsField.get(inMemoryStore) as java.util.concurrent.ConcurrentHashMap<*, *> + val state = pools[pool] ?: return 0 + val queueField = state.javaClass.getDeclaredField("queue") + queueField.isAccessible = true + val queue = queueField.get(state) as java.util.concurrent.ConcurrentLinkedQueue<*> + return queue.size + } +} diff --git a/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/PoolReconcilerStateTest.kt b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/PoolReconcilerStateTest.kt new file mode 100644 index 00000000..88990ee4 --- /dev/null +++ b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/PoolReconcilerStateTest.kt @@ -0,0 +1,460 @@ +/* + * Copyright 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.opensandbox.sandbox.infrastructure.pool + +import com.alibaba.opensandbox.sandbox.config.ConnectionConfig +import com.alibaba.opensandbox.sandbox.domain.pool.PoolConfig +import com.alibaba.opensandbox.sandbox.domain.pool.PoolCreationSpec +import com.alibaba.opensandbox.sandbox.domain.pool.PoolState +import com.alibaba.opensandbox.sandbox.domain.pool.PoolStateStore +import com.alibaba.opensandbox.sandbox.domain.pool.StoreCounters +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Test +import java.time.Duration +import java.time.Instant +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicInteger + +class PoolReconcilerStateTest { + @Test + fun `recordFailure transitions to DEGRADED when failure count reaches threshold`() { + val state = ReconcileState(degradedThreshold = 3, backoffBase = Duration.ofMillis(10), backoffMax = Duration.ofSeconds(1)) + state.recordFailure("boom-1") + state.recordFailure("boom-2") + assertEquals(PoolState.HEALTHY, state.state) + assertFalse(state.isBackoffActive()) + + state.recordFailure("boom-3") + assertEquals(PoolState.DEGRADED, state.state) + assertEquals(3, state.failureCount) + } + + @Test + fun `reconcile create exception increments failure count once per task`() { + val stateStore = InMemoryPoolStateStore() + val config = + PoolConfig.builder() + .poolName("pool-reconcile-test") + .ownerId("owner-1") + .maxIdle(1) + .warmupConcurrency(1) + .stateStore(stateStore) + .connectionConfig(ConnectionConfig.builder().build()) + .creationSpec(PoolCreationSpec.builder().image("ubuntu:22.04").build()) + .build() + val state = ReconcileState(degradedThreshold = 10) + val warmupExecutor = Executors.newFixedThreadPool(1) + + try { + PoolReconciler.runReconcileTick( + config = config, + stateStore = 
stateStore, + createOne = { throw RuntimeException("boom") }, + reconcileState = state, + warmupExecutor = warmupExecutor, + ) + } finally { + warmupExecutor.shutdownNow() + } + + assertEquals(1, state.failureCount) + } + + @Test + fun `reconcile stops putIdle and cleans orphaned sandboxes after lock renew failure`() { + val stateStore = RenewFailAfterSecondPutStore() + val config = + PoolConfig.builder() + .poolName("pool-lock-window-test") + .ownerId("owner-1") + .maxIdle(2) + .warmupConcurrency(2) + .stateStore(stateStore) + .connectionConfig(ConnectionConfig.builder().build()) + .creationSpec(PoolCreationSpec.builder().image("ubuntu:22.04").build()) + .build() + val state = ReconcileState(degradedThreshold = 10) + val warmupExecutor = Executors.newFixedThreadPool(2) + val idGen = AtomicInteger(0) + val orphaned = mutableListOf() + + try { + PoolReconciler.runReconcileTick( + config = config, + stateStore = stateStore, + createOne = { "id-${idGen.incrementAndGet()}" }, + onOrphanedCreated = { orphaned += it }, + reconcileState = state, + warmupExecutor = warmupExecutor, + ) + } finally { + warmupExecutor.shutdownNow() + } + + assertEquals(listOf("id-1"), stateStore.putIdleIds) + assertEquals(listOf("id-2"), orphaned) + } + + @Test + fun `reconcile skips create when current node is not primary`() { + val stateStore = AlwaysSecondaryStore() + val config = + PoolConfig.builder() + .poolName("pool-not-primary") + .ownerId("owner-2") + .maxIdle(1) + .warmupConcurrency(1) + .stateStore(stateStore) + .connectionConfig(ConnectionConfig.builder().build()) + .creationSpec(PoolCreationSpec.builder().image("ubuntu:22.04").build()) + .build() + val state = ReconcileState(degradedThreshold = 10) + val warmupExecutor = Executors.newFixedThreadPool(1) + val createCalls = AtomicInteger(0) + + try { + PoolReconciler.runReconcileTick( + config = config, + stateStore = stateStore, + createOne = { + createCalls.incrementAndGet() + "id-1" + }, + reconcileState = state, + 
warmupExecutor = warmupExecutor, + ) + } finally { + warmupExecutor.shutdownNow() + } + + assertEquals(0, createCalls.get()) + assertEquals(emptyList(), stateStore.putIdleIds) + } + + @Test + fun `only primary owner can reconcile create for same pool`() { + val stateStore = OwnerLockingStore() + val primaryConfig = buildConfig(ownerId = "owner-primary", maxIdle = 1, stateStore = stateStore, poolName = "pool-owner-lock") + val secondaryConfig = buildConfig(ownerId = "owner-secondary", maxIdle = 1, stateStore = stateStore, poolName = "pool-owner-lock") + val state = ReconcileState(degradedThreshold = 10) + val warmupExecutor = Executors.newFixedThreadPool(1) + val secondaryCreateCalls = AtomicInteger(0) + + try { + PoolReconciler.runReconcileTick( + config = primaryConfig, + stateStore = stateStore, + createOne = { "id-primary-1" }, + reconcileState = state, + warmupExecutor = warmupExecutor, + ) + PoolReconciler.runReconcileTick( + config = secondaryConfig, + stateStore = stateStore, + createOne = { + secondaryCreateCalls.incrementAndGet() + "id-secondary-1" + }, + reconcileState = state, + warmupExecutor = warmupExecutor, + ) + } finally { + warmupExecutor.shutdownNow() + } + + assertEquals(0, secondaryCreateCalls.get()) + assertEquals(listOf("id-primary-1"), stateStore.putIdleIds) + } + + @Test + fun `reconcile does not create when initial renew fails`() { + val stateStore = RenewFailsOnFirstCallStore() + val config = buildConfig(ownerId = "owner-1", maxIdle = 1, stateStore = stateStore, poolName = "pool-renew-first-fail") + val state = ReconcileState(degradedThreshold = 10) + val warmupExecutor = Executors.newFixedThreadPool(1) + val createCalls = AtomicInteger(0) + + try { + PoolReconciler.runReconcileTick( + config = config, + stateStore = stateStore, + createOne = { + createCalls.incrementAndGet() + "id-1" + }, + reconcileState = state, + warmupExecutor = warmupExecutor, + ) + } finally { + warmupExecutor.shutdownNow() + } + + assertEquals(0, 
createCalls.get()) + assertEquals(emptyList(), stateStore.putIdleIds) + } + + private fun buildConfig( + ownerId: String, + maxIdle: Int, + stateStore: PoolStateStore, + poolName: String, + ): PoolConfig { + return PoolConfig.builder() + .poolName(poolName) + .ownerId(ownerId) + .maxIdle(maxIdle) + .warmupConcurrency(1) + .stateStore(stateStore) + .connectionConfig(ConnectionConfig.builder().build()) + .creationSpec(PoolCreationSpec.builder().image("ubuntu:22.04").build()) + .build() + } + + private class RenewFailAfterSecondPutStore : PoolStateStore { + private val renewCalls = AtomicInteger(0) + val putIdleIds = mutableListOf() + + override fun tryTakeIdle(poolName: String): String? = null + + override fun putIdle( + poolName: String, + sandboxId: String, + ) { + putIdleIds += sandboxId + } + + override fun removeIdle( + poolName: String, + sandboxId: String, + ) { + } + + override fun tryAcquirePrimaryLock( + poolName: String, + ownerId: String, + ttl: Duration, + ): Boolean = true + + override fun renewPrimaryLock( + poolName: String, + ownerId: String, + ttl: Duration, + ): Boolean { + val call = renewCalls.incrementAndGet() + return call < 3 + } + + override fun releasePrimaryLock( + poolName: String, + ownerId: String, + ) { + } + + override fun reapExpiredIdle( + poolName: String, + now: Instant, + ) { + } + + override fun snapshotCounters(poolName: String): StoreCounters = StoreCounters(idleCount = 0) + + override fun getMaxIdle(poolName: String): Int? = null + + override fun setMaxIdle( + poolName: String, + maxIdle: Int, + ) { + } + } + + private class AlwaysSecondaryStore : PoolStateStore { + val putIdleIds = mutableListOf() + + override fun tryTakeIdle(poolName: String): String? 
= null + + override fun putIdle( + poolName: String, + sandboxId: String, + ) { + putIdleIds += sandboxId + } + + override fun removeIdle( + poolName: String, + sandboxId: String, + ) { + } + + override fun tryAcquirePrimaryLock( + poolName: String, + ownerId: String, + ttl: Duration, + ): Boolean = false + + override fun renewPrimaryLock( + poolName: String, + ownerId: String, + ttl: Duration, + ): Boolean = false + + override fun releasePrimaryLock( + poolName: String, + ownerId: String, + ) { + } + + override fun reapExpiredIdle( + poolName: String, + now: Instant, + ) { + } + + override fun snapshotCounters(poolName: String): StoreCounters = StoreCounters(idleCount = 0) + + override fun getMaxIdle(poolName: String): Int? = null + + override fun setMaxIdle( + poolName: String, + maxIdle: Int, + ) { + } + } + + private class OwnerLockingStore : PoolStateStore { + @Volatile + private var lockOwner: String? = null + val putIdleIds = mutableListOf() + + override fun tryTakeIdle(poolName: String): String? = null + + override fun putIdle( + poolName: String, + sandboxId: String, + ) { + putIdleIds += sandboxId + } + + override fun removeIdle( + poolName: String, + sandboxId: String, + ) { + } + + override fun tryAcquirePrimaryLock( + poolName: String, + ownerId: String, + ttl: Duration, + ): Boolean { + val currentOwner = lockOwner + return if (currentOwner == null || currentOwner == ownerId) { + lockOwner = ownerId + true + } else { + false + } + } + + override fun renewPrimaryLock( + poolName: String, + ownerId: String, + ttl: Duration, + ): Boolean = lockOwner == ownerId + + override fun releasePrimaryLock( + poolName: String, + ownerId: String, + ) { + if (lockOwner == ownerId) { + lockOwner = null + } + } + + override fun reapExpiredIdle( + poolName: String, + now: Instant, + ) { + } + + override fun snapshotCounters(poolName: String): StoreCounters = StoreCounters(idleCount = putIdleIds.size) + + override fun getMaxIdle(poolName: String): Int? 
= null + + override fun setMaxIdle( + poolName: String, + maxIdle: Int, + ) { + } + } + + private class RenewFailsOnFirstCallStore : PoolStateStore { + private val renewCalls = AtomicInteger(0) + val putIdleIds = mutableListOf() + + override fun tryTakeIdle(poolName: String): String? = null + + override fun putIdle( + poolName: String, + sandboxId: String, + ) { + putIdleIds += sandboxId + } + + override fun removeIdle( + poolName: String, + sandboxId: String, + ) { + } + + override fun tryAcquirePrimaryLock( + poolName: String, + ownerId: String, + ttl: Duration, + ): Boolean = true + + override fun renewPrimaryLock( + poolName: String, + ownerId: String, + ttl: Duration, + ): Boolean { + val call = renewCalls.incrementAndGet() + return call > 1 + } + + override fun releasePrimaryLock( + poolName: String, + ownerId: String, + ) { + } + + override fun reapExpiredIdle( + poolName: String, + now: Instant, + ) { + } + + override fun snapshotCounters(poolName: String): StoreCounters = StoreCounters(idleCount = 0) + + override fun getMaxIdle(poolName: String): Int? = null + + override fun setMaxIdle( + poolName: String, + maxIdle: Int, + ) { + } + } +} diff --git a/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPoolTest.kt b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPoolTest.kt new file mode 100644 index 00000000..9b74c2a6 --- /dev/null +++ b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPoolTest.kt @@ -0,0 +1,140 @@ +/* + * Copyright 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.opensandbox.sandbox.pool + +import com.alibaba.opensandbox.sandbox.config.ConnectionConfig +import com.alibaba.opensandbox.sandbox.domain.exceptions.PoolEmptyException +import com.alibaba.opensandbox.sandbox.domain.exceptions.PoolNotRunningException +import com.alibaba.opensandbox.sandbox.domain.pool.AcquirePolicy +import com.alibaba.opensandbox.sandbox.domain.pool.PoolCreationSpec +import com.alibaba.opensandbox.sandbox.domain.pool.PoolState +import com.alibaba.opensandbox.sandbox.infrastructure.pool.InMemoryPoolStateStore +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertThrows +import org.junit.jupiter.api.Test +import java.time.Duration + +class SandboxPoolTest { + @Test + fun `snapshot before start returns STOPPED and zero idle`() { + val pool = buildPool() + val snap = pool.snapshot() + assertEquals(PoolState.STOPPED, snap.state) + assertEquals(0, snap.idleCount) + } + + @Test + fun `start then snapshot returns RUNNING`() { + val pool = buildPool() + pool.start() + try { + val snap = pool.snapshot() + assertEquals(PoolState.HEALTHY, snap.state) + } finally { + pool.shutdown(graceful = false) + } + } + + @Test + fun `resize updates maxIdle`() { + val pool = buildPool() + pool.start() + try { + pool.resize(10) + val snap = pool.snapshot() + assertEquals(PoolState.HEALTHY, snap.state) + } finally { + pool.shutdown(graceful = false) + } + } + + @Test + fun `shutdown graceful then snapshot returns STOPPED`() { + val pool = buildPool() + pool.start() + 
pool.shutdown(graceful = true) + val snap = pool.snapshot() + assertEquals(PoolState.STOPPED, snap.state) + } + + @Test + fun `shutdown non-graceful then snapshot returns STOPPED`() { + val pool = buildPool() + pool.start() + pool.shutdown(graceful = false) + val snap = pool.snapshot() + assertEquals(PoolState.STOPPED, snap.state) + } + + @Test + fun `acquire with FAIL_FAST and empty idle throws PoolEmptyException`() { + val pool = buildPool() + pool.start() + try { + assertThrows(PoolEmptyException::class.java) { + pool.acquire(policy = AcquirePolicy.FAIL_FAST) + } + } finally { + pool.shutdown(graceful = false) + } + } + + @Test + fun `acquire when pool is stopped throws PoolNotRunningException`() { + val pool = buildPool() + assertThrows(PoolNotRunningException::class.java) { + pool.acquire(policy = AcquirePolicy.DIRECT_CREATE) + } + } + + @Test + fun `releaseAllIdle drains store and returns count`() { + val store = InMemoryPoolStateStore() + val pool = + SandboxPool.builder() + .poolName("test-pool") + .ownerId("test-owner") + .maxIdle(2) + .stateStore(store) + .connectionConfig(ConnectionConfig.builder().build()) + .creationSpec(PoolCreationSpec.builder().image("ubuntu:22.04").build()) + .drainTimeout(Duration.ofMillis(50)) + .reconcileInterval(Duration.ofSeconds(30)) + .build() + store.putIdle("test-pool", "id-1") + store.putIdle("test-pool", "id-2") + assertEquals(2, store.snapshotCounters("test-pool").idleCount) + val released = pool.releaseAllIdle() + assertEquals(2, released) + assertEquals(0, store.snapshotCounters("test-pool").idleCount) + } + + private fun buildPool(): SandboxPool { + val config = ConnectionConfig.builder().build() + val spec = PoolCreationSpec.builder().image("ubuntu:22.04").build() + return SandboxPool.builder() + .poolName("test-pool") + .ownerId("test-owner") + .maxIdle(2) + .stateStore(InMemoryPoolStateStore()) + .connectionConfig(config) + .creationSpec(spec) + .drainTimeout(Duration.ofMillis(50)) + 
.reconcileInterval(Duration.ofSeconds(30)) + .build() + } +} diff --git a/tests/java/src/test/java/com/alibaba/opensandbox/e2e/CodeInterpreterE2ETest.java b/tests/java/src/test/java/com/alibaba/opensandbox/e2e/CodeInterpreterE2ETest.java index 1225608f..0ad86bf9 100644 --- a/tests/java/src/test/java/com/alibaba/opensandbox/e2e/CodeInterpreterE2ETest.java +++ b/tests/java/src/test/java/com/alibaba/opensandbox/e2e/CodeInterpreterE2ETest.java @@ -22,9 +22,9 @@ import com.alibaba.opensandbox.codeinterpreter.domain.models.execd.executions.CodeContext; import com.alibaba.opensandbox.codeinterpreter.domain.models.execd.executions.RunCodeRequest; import com.alibaba.opensandbox.codeinterpreter.domain.models.execd.executions.SupportedLanguage; -import com.alibaba.opensandbox.sandbox.domain.models.sandboxes.*; import com.alibaba.opensandbox.sandbox.Sandbox; import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.*; +import com.alibaba.opensandbox.sandbox.domain.models.sandboxes.*; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -591,16 +591,15 @@ void testTypeScriptCodeExecution() { } /** - * Run a code request with a per-execution timeout so that a single hanging - * SSE stream cannot block the entire test for the full JUnit timeout. + * Run a code request with a per-execution timeout so that a single hanging SSE stream cannot + * block the entire test for the full JUnit timeout. 
*/ private Execution runWithTimeout(RunCodeRequest request, Duration timeout) { try { return CompletableFuture.supplyAsync(() -> codeInterpreter.codes().run(request)) .get(timeout.toMillis(), TimeUnit.MILLISECONDS); } catch (TimeoutException e) { - throw new AssertionError( - "Code execution did not complete within " + timeout, e); + throw new AssertionError("Code execution did not complete within " + timeout, e); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof RuntimeException) { @@ -943,8 +942,11 @@ void testCodeExecutionInterrupt() throws InterruptedException, ExecutionExceptio // Verify the interrupt was effective: execution finished much faster // than the full 20 s run. Terminal events (complete/error) may or may // not arrive depending on how quickly the server closed the stream. - assertTrue(elapsed < 90_000, - "Execution should have finished promptly after interrupt (elapsed=" + elapsed + "ms)"); + assertTrue( + elapsed < 90_000, + "Execution should have finished promptly after interrupt (elapsed=" + + elapsed + + "ms)"); // Test 2: Java long-running execution with interrupt logger.info("Testing Java interrupt functionality"); diff --git a/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxE2ETest.java b/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxE2ETest.java index b2af7c42..4e53440d 100644 --- a/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxE2ETest.java +++ b/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxE2ETest.java @@ -315,8 +315,7 @@ void testSandboxCreateWithHostVolumeMount() { assertNull(readMarker.getError(), "Failed to read marker file"); assertEquals(1, readMarker.getLogs().getStdout().size()); assertEquals( - "opensandbox-e2e-marker", - readMarker.getLogs().getStdout().get(0).getText()); + "opensandbox-e2e-marker", readMarker.getLogs().getStdout().get(0).getText()); // Step 2: Write a file from inside the sandbox to the mounted path Execution writeResult = 
@@ -344,8 +343,7 @@ void testSandboxCreateWithHostVolumeMount() { .build()); assertNull(readBack.getError()); assertEquals(1, readBack.getLogs().getStdout().size()); - assertEquals( - "written-from-sandbox", readBack.getLogs().getStdout().get(0).getText()); + assertEquals("written-from-sandbox", readBack.getLogs().getStdout().get(0).getText()); // Step 4: Verify the mount path is a proper directory Execution dirCheck = @@ -407,8 +405,7 @@ void testSandboxCreateWithHostVolumeMountReadOnly() { assertNull(readMarker.getError(), "Failed to read marker file on read-only mount"); assertEquals(1, readMarker.getLogs().getStdout().size()); assertEquals( - "opensandbox-e2e-marker", - readMarker.getLogs().getStdout().get(0).getText()); + "opensandbox-e2e-marker", readMarker.getLogs().getStdout().get(0).getText()); // Step 2: Verify writing is denied on read-only mount Execution writeResult = @@ -421,8 +418,7 @@ void testSandboxCreateWithHostVolumeMountReadOnly() { + containerMountPath + "/should-fail.txt") .build()); - assertNotNull( - writeResult.getError(), "Write should fail on read-only mount"); + assertNotNull(writeResult.getError(), "Write should fail on read-only mount"); } finally { try { roSandbox.kill(); @@ -470,9 +466,7 @@ void testSandboxCreateWithPvcVolumeMount() { .build()); assertNull(readMarker.getError(), "Failed to read marker file from PVC volume"); assertEquals(1, readMarker.getLogs().getStdout().size()); - assertEquals( - "pvc-marker-data", - readMarker.getLogs().getStdout().get(0).getText()); + assertEquals("pvc-marker-data", readMarker.getLogs().getStdout().get(0).getText()); // Step 2: Write a file from inside the sandbox to the named volume Execution writeResult = @@ -494,14 +488,11 @@ void testSandboxCreateWithPvcVolumeMount() { .run( RunCommandRequest.builder() .command( - "cat " - + containerMountPath - + "/pvc-output.txt") + "cat " + containerMountPath + "/pvc-output.txt") .build()); assertNull(readBack.getError()); assertEquals(1, 
readBack.getLogs().getStdout().size()); - assertEquals( - "written-to-pvc", readBack.getLogs().getStdout().get(0).getText()); + assertEquals("written-to-pvc", readBack.getLogs().getStdout().get(0).getText()); // Step 4: Verify the mount path is a proper directory Execution dirCheck = @@ -562,9 +553,7 @@ void testSandboxCreateWithPvcVolumeMountReadOnly() { .build()); assertNull(readMarker.getError(), "Failed to read marker file on read-only PVC mount"); assertEquals(1, readMarker.getLogs().getStdout().size()); - assertEquals( - "pvc-marker-data", - readMarker.getLogs().getStdout().get(0).getText()); + assertEquals("pvc-marker-data", readMarker.getLogs().getStdout().get(0).getText()); // Step 2: Verify writing is denied on read-only mount Execution writeResult = @@ -577,8 +566,7 @@ void testSandboxCreateWithPvcVolumeMountReadOnly() { + containerMountPath + "/should-fail.txt") .build()); - assertNotNull( - writeResult.getError(), "Write should fail on read-only PVC mount"); + assertNotNull(writeResult.getError(), "Write should fail on read-only PVC mount"); } finally { try { roSandbox.kill(); @@ -627,9 +615,7 @@ void testSandboxCreateWithPvcVolumeMountSubPath() { .build()); assertNull(readMarker.getError(), "Failed to read subpath marker file"); assertEquals(1, readMarker.getLogs().getStdout().size()); - assertEquals( - "pvc-subpath-marker", - readMarker.getLogs().getStdout().get(0).getText()); + assertEquals("pvc-subpath-marker", readMarker.getLogs().getStdout().get(0).getText()); // Step 2: Verify only subPath contents are visible (not the full volume) Execution lsResult = @@ -665,15 +651,11 @@ void testSandboxCreateWithPvcVolumeMountSubPath() { .commands() .run( RunCommandRequest.builder() - .command( - "cat " - + containerMountPath - + "/output.txt") + .command("cat " + containerMountPath + "/output.txt") .build()); assertNull(readBack.getError()); assertEquals(1, readBack.getLogs().getStdout().size()); - assertEquals( - "subpath-write-test", 
readBack.getLogs().getStdout().get(0).getText()); + assertEquals("subpath-write-test", readBack.getLogs().getStdout().get(0).getText()); } finally { try { subpathSandbox.kill(); diff --git a/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxPoolPseudoDistributedE2ETest.java b/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxPoolPseudoDistributedE2ETest.java new file mode 100644 index 00000000..9586cc60 --- /dev/null +++ b/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxPoolPseudoDistributedE2ETest.java @@ -0,0 +1,662 @@ +/* + * Copyright 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.opensandbox.e2e; + +import static org.junit.jupiter.api.Assertions.*; + +import com.alibaba.opensandbox.sandbox.Sandbox; +import com.alibaba.opensandbox.sandbox.SandboxManager; +import com.alibaba.opensandbox.sandbox.domain.exceptions.PoolEmptyException; +import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.Execution; +import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.RunCommandRequest; +import com.alibaba.opensandbox.sandbox.domain.models.sandboxes.PagedSandboxInfos; +import com.alibaba.opensandbox.sandbox.domain.models.sandboxes.SandboxFilter; +import com.alibaba.opensandbox.sandbox.domain.pool.AcquirePolicy; +import com.alibaba.opensandbox.sandbox.domain.pool.PoolCreationSpec; +import com.alibaba.opensandbox.sandbox.domain.pool.PoolStateStore; +import com.alibaba.opensandbox.sandbox.domain.pool.StoreCounters; +import com.alibaba.opensandbox.sandbox.pool.SandboxPool; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BooleanSupplier; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +@Tag("e2e") +@DisplayName("SandboxPool E2E Tests (Pseudo-Distributed)") +public class SandboxPoolPseudoDistributedE2ETest extends BaseE2ETest { + private static final Duration RECONCILE_INTERVAL = Duration.ofSeconds(1); + private static final Duration PRIMARY_LOCK_TTL = Duration.ofSeconds(2); + private static final Duration DRAIN_TIMEOUT = Duration.ofMillis(200); + private static final Duration AWAIT_TIMEOUT = Duration.ofMinutes(2); + + private final List pools = new ArrayList<>(); + private final List borrowed 
= new CopyOnWriteArrayList<>(); + + private SandboxManager sandboxManager; + private String tag; + + @AfterEach + void teardown() { + for (Sandbox sandbox : borrowed) { + killAndCloseQuietly(sandbox); + } + borrowed.clear(); + + for (SandboxPool pool : pools) { + try { + pool.resize(0); + pool.releaseAllIdle(); + } catch (Exception ignored) { + } + try { + pool.shutdown(false); + } catch (Exception ignored) { + } + } + pools.clear(); + + if (sandboxManager != null && tag != null) { + cleanupTaggedSandboxes(tag); + } + if (sandboxManager != null) { + try { + sandboxManager.close(); + } catch (Exception ignored) { + } + } + } + + @Test + @DisplayName("shared distributed store supports cross-node acquire and resize propagation") + @Timeout(value = 6, unit = java.util.concurrent.TimeUnit.MINUTES) + void testCrossNodeAcquireAndResizePropagation() throws Exception { + tag = "e2e-pool-dist-" + UUID.randomUUID().toString().substring(0, 8); + String poolName = "pool-dist-" + tag; + String ownerA = "owner-a-" + tag; + String ownerB = "owner-b-" + tag; + sandboxManager = SandboxManager.builder().connectionConfig(sharedConnectionConfig).build(); + + PseudoDistributedPoolStateStore store = new PseudoDistributedPoolStateStore(); + SandboxPool poolA = createPool(poolName, ownerA, store, 2); + SandboxPool poolB = createPool(poolName, ownerB, store, 2); + pools.add(poolA); + pools.add(poolB); + + poolA.start(); + poolB.start(); + + eventually( + "distributed pool warmed idle", + AWAIT_TIMEOUT, + Duration.ofSeconds(2), + () -> poolA.snapshot().getIdleCount() >= 1); + + Sandbox sandbox = poolB.acquire(Duration.ofMinutes(5), AcquirePolicy.FAIL_FAST); + borrowed.add(sandbox); + assertTrue(sandbox.isHealthy(), "cross-node acquire should return healthy sandbox"); + Execution result = + sandbox.commands().run(RunCommandRequest.builder().command("echo dist-acquire-ok").build()); + assertNotNull(result); + assertNull(result.getError()); + + // Resize from one node should propagate through 
shared store and be honored by leader reconcile. + poolB.resize(0); + int released = poolA.releaseAllIdle(); + assertTrue(released >= 0, "releaseAllIdle should be non-negative"); + eventually( + "idle drained after resize to zero", + Duration.ofSeconds(30), + Duration.ofSeconds(1), + () -> poolA.snapshot().getIdleCount() == 0); + Thread.sleep(RECONCILE_INTERVAL.multipliedBy(3).toMillis()); + assertTrue( + poolA.snapshot().getIdleCount() <= 1, + "idle should stay bounded after resize(0) across distributed nodes"); + poolA.releaseAllIdle(); + poolB.releaseAllIdle(); + eventually( + "idle converges back to zero after best-effort drain", + Duration.ofSeconds(20), + Duration.ofSeconds(1), + () -> poolA.snapshot().getIdleCount() == 0); + + assertThrows( + PoolEmptyException.class, + () -> poolA.acquire(Duration.ofMinutes(2), AcquirePolicy.FAIL_FAST)); + } + + @Test + @DisplayName("primary lock failover occurs after leader shutdown") + @Timeout(value = 6, unit = java.util.concurrent.TimeUnit.MINUTES) + void testPrimaryFailoverAfterLeaderShutdown() throws Exception { + tag = "e2e-pool-failover-" + UUID.randomUUID().toString().substring(0, 8); + String poolName = "pool-failover-" + tag; + String ownerA = "owner-a-" + tag; + String ownerB = "owner-b-" + tag; + sandboxManager = SandboxManager.builder().connectionConfig(sharedConnectionConfig).build(); + + PseudoDistributedPoolStateStore store = new PseudoDistributedPoolStateStore(); + SandboxPool poolA = createPool(poolName, ownerA, store, 1); + SandboxPool poolB = createPool(poolName, ownerB, store, 1); + pools.add(poolA); + pools.add(poolB); + + poolA.start(); + poolB.start(); + + eventually( + "one owner becomes primary", + Duration.ofSeconds(30), + Duration.ofMillis(500), + () -> store.currentOwner(poolName) != null); + String firstOwner = store.currentOwner(poolName); + assertNotNull(firstOwner, "primary owner should be established"); + + SandboxPool leader = firstOwner.equals(ownerA) ? 
poolA : poolB; + String expectedNextOwner = firstOwner.equals(ownerA) ? ownerB : ownerA; + leader.shutdown(false); + + eventually( + "primary owner fails over after ttl", + Duration.ofSeconds(30), + Duration.ofMillis(500), + () -> expectedNextOwner.equals(store.currentOwner(poolName))); + } + + @Test + @DisplayName("only one owner writes idle in steady-state (dual-primary guard)") + @Timeout(value = 6, unit = java.util.concurrent.TimeUnit.MINUTES) + void testOnlyOneOwnerWritesIdleInSteadyState() throws Exception { + tag = "e2e-pool-single-writer-" + UUID.randomUUID().toString().substring(0, 8); + String poolName = "pool-single-writer-" + tag; + String ownerA = "owner-a-" + tag; + String ownerB = "owner-b-" + tag; + sandboxManager = SandboxManager.builder().connectionConfig(sharedConnectionConfig).build(); + + PseudoDistributedPoolStateStore store = new PseudoDistributedPoolStateStore(); + SandboxPool poolA = createPool(poolName, ownerA, store, 1); + SandboxPool poolB = createPool(poolName, ownerB, store, 1); + pools.add(poolA); + pools.add(poolB); + poolA.start(); + poolB.start(); + + eventually( + "pool warms and gets a primary owner", + AWAIT_TIMEOUT, + Duration.ofSeconds(1), + () -> poolA.snapshot().getIdleCount() >= 1 && store.currentOwner(poolName) != null); + Thread.sleep(Duration.ofSeconds(3).toMillis()); + + Map putCounts = store.putCountsByOwner(poolName); + assertEquals(1, putCounts.size(), "idle writes in steady-state should come from one owner only"); + assertTrue( + putCounts.containsKey(store.currentOwner(poolName)), + "steady-state writer should match current primary owner"); + } + + @Test + @DisplayName("renew failure window drops extra create and orphan cleanup keeps remote count bounded") + @Timeout(value = 6, unit = java.util.concurrent.TimeUnit.MINUTES) + void testRenewFailureWindowAndOrphanCleanupBoundedResources() throws Exception { + tag = "e2e-pool-renew-window-" + UUID.randomUUID().toString().substring(0, 8); + String poolName = 
"pool-renew-window-" + tag; + String ownerA = "owner-a-" + tag; + sandboxManager = SandboxManager.builder().connectionConfig(sharedConnectionConfig).build(); + + PseudoDistributedPoolStateStore store = new PseudoDistributedPoolStateStore(); + // Once one idle is put by ownerA, all subsequent renews for ownerA fail. + store.setFailRenewWhenPutCountAtLeast(poolName, ownerA, 1); + + SandboxPool poolA = createPool(poolName, ownerA, store, 2); + pools.add(poolA); + poolA.start(); + + eventually( + "renew failure window leaves only one idle", + Duration.ofSeconds(45), + Duration.ofMillis(500), + () -> poolA.snapshot().getIdleCount() == 1); + Thread.sleep(Duration.ofSeconds(3).toMillis()); + assertEquals(1, poolA.snapshot().getIdleCount(), "idle should remain bounded at 1"); + + eventually( + "tagged remote sandbox count stays bounded after orphan cleanup", + Duration.ofSeconds(45), + Duration.ofSeconds(1), + () -> countTaggedSandboxes(tag) <= 2); + } + + @Test + @DisplayName("concurrent maxIdle jitter converges without runaway over-allocation") + @Timeout(value = 6, unit = java.util.concurrent.TimeUnit.MINUTES) + void testMaxIdleJitterConvergesWithoutRunaway() throws Exception { + tag = "e2e-pool-jitter-" + UUID.randomUUID().toString().substring(0, 8); + String poolName = "pool-jitter-" + tag; + String ownerA = "owner-a-" + tag; + String ownerB = "owner-b-" + tag; + sandboxManager = SandboxManager.builder().connectionConfig(sharedConnectionConfig).build(); + + PseudoDistributedPoolStateStore store = new PseudoDistributedPoolStateStore(); + SandboxPool poolA = createPool(poolName, ownerA, store, 1); + SandboxPool poolB = createPool(poolName, ownerB, store, 1); + pools.add(poolA); + pools.add(poolB); + poolA.start(); + poolB.start(); + + eventually( + "initial warmup completes", + AWAIT_TIMEOUT, + Duration.ofSeconds(1), + () -> poolA.snapshot().getIdleCount() >= 1); + + for (int i = 0; i < 6; i++) { + poolA.resize(i % 3); + poolB.resize((i + 1) % 3); + Thread.sleep(200); + 
} + poolA.resize(1); + poolB.resize(1); + + eventually( + "idle remains bounded after maxIdle jitter", + Duration.ofSeconds(45), + Duration.ofSeconds(1), + () -> poolA.snapshot().getIdleCount() <= 2); + assertTrue( + countTaggedSandboxes(tag) <= 3, + "tagged sandbox count should stay bounded under jitter"); + } + + @Test + @DisplayName("non-primary acquire remains available while leadership changes") + @Timeout(value = 6, unit = java.util.concurrent.TimeUnit.MINUTES) + void testNonPrimaryAcquireDuringLeadershipChanges() throws Exception { + tag = "e2e-pool-follower-acquire-" + UUID.randomUUID().toString().substring(0, 8); + String poolName = "pool-follower-acquire-" + tag; + String ownerA = "owner-a-" + tag; + String ownerB = "owner-b-" + tag; + sandboxManager = SandboxManager.builder().connectionConfig(sharedConnectionConfig).build(); + + PseudoDistributedPoolStateStore store = new PseudoDistributedPoolStateStore(); + SandboxPool poolA = createPool(poolName, ownerA, store, 1); + SandboxPool poolB = createPool(poolName, ownerB, store, 1); + pools.add(poolA); + pools.add(poolB); + poolA.start(); + poolB.start(); + + eventually( + "leader is elected", + Duration.ofSeconds(30), + Duration.ofMillis(500), + () -> store.currentOwner(poolName) != null); + + for (int round = 0; round < 2; round++) { + String currentOwner = store.currentOwner(poolName); + assertNotNull(currentOwner, "leader should exist before follower acquire"); + + SandboxPool leader = currentOwner.equals(ownerA) ? poolA : poolB; + SandboxPool follower = currentOwner.equals(ownerA) ? poolB : poolA; + String expectedNextOwner = currentOwner.equals(ownerA) ? 
ownerB : ownerA; + + Sandbox sandbox = + follower.acquire(Duration.ofMinutes(3), AcquirePolicy.DIRECT_CREATE); + borrowed.add(sandbox); + Execution execution = + sandbox.commands().run(RunCommandRequest.builder().command("echo follower-acquire-ok").build()); + assertNotNull(execution); + assertNull(execution.getError()); + + leader.shutdown(false); + eventually( + "leadership transfers to follower", + Duration.ofSeconds(30), + Duration.ofMillis(500), + () -> expectedNextOwner.equals(store.currentOwner(poolName))); + leader.start(); + } + } + + @Test + @DisplayName("node restart re-joins cluster without idle pollution") + @Timeout(value = 6, unit = java.util.concurrent.TimeUnit.MINUTES) + void testNodeRestartRejoinsWithoutIdlePollution() throws Exception { + tag = "e2e-pool-restart-" + UUID.randomUUID().toString().substring(0, 8); + String poolName = "pool-restart-" + tag; + String ownerA = "owner-a-" + tag; + String ownerB = "owner-b-" + tag; + sandboxManager = SandboxManager.builder().connectionConfig(sharedConnectionConfig).build(); + + PseudoDistributedPoolStateStore store = new PseudoDistributedPoolStateStore(); + SandboxPool poolA = createPool(poolName, ownerA, store, 1); + SandboxPool poolB = createPool(poolName, ownerB, store, 1); + pools.add(poolA); + pools.add(poolB); + poolA.start(); + poolB.start(); + + eventually( + "pool warmed before restart", + AWAIT_TIMEOUT, + Duration.ofSeconds(1), + () -> poolA.snapshot().getIdleCount() >= 1); + + poolA.shutdown(false); + poolA.start(); + + eventually( + "restarted node reports healthy", + Duration.ofSeconds(45), + Duration.ofSeconds(1), + () -> poolA.snapshot().getState().name().equals("HEALTHY")); + Sandbox sandbox = poolA.acquire(Duration.ofMinutes(3), AcquirePolicy.DIRECT_CREATE); + borrowed.add(sandbox); + assertTrue(sandbox.isHealthy(), "restarted node should still serve acquire"); + + eventually( + "idle count stays bounded after restart", + Duration.ofSeconds(45), + Duration.ofSeconds(1), + () -> 
poolA.snapshot().getIdleCount() <= 1); + assertTrue( + countTaggedSandboxes(tag) <= 3, + "restart should not cause runaway idle pollution"); + } + + private SandboxPool createPool( + String poolName, String ownerId, PseudoDistributedPoolStateStore store, int maxIdle) { + PoolCreationSpec creationSpec = + PoolCreationSpec.builder() + .image(getSandboxImage()) + .entrypoint(List.of("tail -f /dev/null")) + .metadata(Map.of("tag", tag, "suite", "sandbox-pool-pseudo-dist-e2e")) + .env(Map.of("E2E_TEST", "true")) + .build(); + return SandboxPool.builder() + .poolName(poolName) + .ownerId(ownerId) + .maxIdle(maxIdle) + .warmupConcurrency(1) + .stateStore(store) + .connectionConfig(sharedConnectionConfig) + .creationSpec(creationSpec) + .reconcileInterval(RECONCILE_INTERVAL) + .primaryLockTtl(PRIMARY_LOCK_TTL) + .drainTimeout(DRAIN_TIMEOUT) + .build(); + } + + private void cleanupTaggedSandboxes(String cleanupTag) { + try { + PagedSandboxInfos infos = + sandboxManager.listSandboxInfos( + SandboxFilter.builder() + .metadata(Map.of("tag", cleanupTag)) + .pageSize(50) + .build()); + infos.getSandboxInfos() + .forEach( + info -> { + try { + sandboxManager.killSandbox(info.getId()); + } catch (Exception ignored) { + } + }); + } catch (Exception ignored) { + } + } + + private int countTaggedSandboxes(String queryTag) { + if (queryTag == null || queryTag.isBlank()) { + return 0; + } + PagedSandboxInfos infos = + sandboxManager.listSandboxInfos( + SandboxFilter.builder() + .metadata(Map.of("tag", queryTag)) + .pageSize(50) + .build()); + return infos.getSandboxInfos().size(); + } + + private void eventually( + String description, Duration timeout, Duration interval, BooleanSupplier condition) + throws InterruptedException { + long deadline = System.currentTimeMillis() + timeout.toMillis(); + Throwable lastError = null; + while (System.currentTimeMillis() < deadline) { + try { + if (condition.getAsBoolean()) { + return; + } + } catch (Throwable t) { + lastError = t; + } + 
Thread.sleep(interval.toMillis()); + } + if (lastError != null) { + fail( + "Timed out waiting for " + + description + + ", last error: " + + lastError.getMessage()); + } else { + fail("Timed out waiting for " + description); + } + } + + private static void killAndCloseQuietly(Sandbox sandbox) { + if (sandbox == null) { + return; + } + try { + sandbox.kill(); + } catch (Exception ignored) { + } + try { + sandbox.close(); + } catch (Exception ignored) { + } + } + + /** + * A thread-safe in-process store that mimics distributed semantics: + * - shared idle membership by poolName + * - owner-based primary lock with TTL + * - shared maxIdle propagation + */ + static class PseudoDistributedPoolStateStore implements PoolStateStore { + private static final Duration IDLE_TTL = Duration.ofHours(24); + + private final Map> idleByPool = new LinkedHashMap<>(); + private final Map locks = new LinkedHashMap<>(); + private final Map maxIdleByPool = new LinkedHashMap<>(); + private final Map> putCountByOwnerByPool = new HashMap<>(); + private final Map> renewCountByOwnerByPool = new HashMap<>(); + private final Map> failRenewAfterPutByOwnerByPool = new HashMap<>(); + + @Override + public synchronized String tryTakeIdle(String poolName) { + LinkedHashMap entries = idleByPool.get(poolName); + if (entries == null || entries.isEmpty()) { + return null; + } + reapExpiredIdle(poolName, Instant.now()); + entries = idleByPool.get(poolName); + if (entries == null || entries.isEmpty()) { + return null; + } + Iterator> it = entries.entrySet().iterator(); + if (!it.hasNext()) { + return null; + } + Map.Entry first = it.next(); + it.remove(); + return first.getKey(); + } + + @Override + public synchronized void putIdle(String poolName, String sandboxId) { + LinkedHashMap entries = + idleByPool.computeIfAbsent(poolName, ignored -> new LinkedHashMap<>()); + entries.putIfAbsent(sandboxId, Instant.now().plus(IDLE_TTL)); + String owner = currentOwner(poolName); + if (owner != null) { + 
incrementCounter(putCountByOwnerByPool, poolName, owner); + } + } + + @Override + public synchronized void removeIdle(String poolName, String sandboxId) { + LinkedHashMap entries = idleByPool.get(poolName); + if (entries != null) { + entries.remove(sandboxId); + } + } + + @Override + public synchronized boolean tryAcquirePrimaryLock( + String poolName, String ownerId, Duration ttl) { + Instant now = Instant.now(); + LockEntry lock = locks.get(poolName); + if (lock == null || !lock.expiresAt.isAfter(now) || lock.ownerId.equals(ownerId)) { + locks.put(poolName, new LockEntry(ownerId, now.plus(ttl))); + return true; + } + return false; + } + + @Override + public synchronized boolean renewPrimaryLock(String poolName, String ownerId, Duration ttl) { + Instant now = Instant.now(); + LockEntry lock = locks.get(poolName); + if (lock == null || !lock.ownerId.equals(ownerId) || !lock.expiresAt.isAfter(now)) { + return false; + } + incrementCounter(renewCountByOwnerByPool, poolName, ownerId); + if (shouldFailRenewByPutThreshold(poolName, ownerId)) { + return false; + } + locks.put(poolName, new LockEntry(ownerId, now.plus(ttl))); + return true; + } + + @Override + public synchronized void releasePrimaryLock(String poolName, String ownerId) { + LockEntry lock = locks.get(poolName); + if (lock != null && lock.ownerId.equals(ownerId)) { + locks.remove(poolName); + } + } + + @Override + public synchronized void reapExpiredIdle(String poolName, Instant now) { + LinkedHashMap entries = idleByPool.get(poolName); + if (entries == null || entries.isEmpty()) { + return; + } + entries.entrySet().removeIf(e -> !e.getValue().isAfter(now)); + } + + @Override + public synchronized StoreCounters snapshotCounters(String poolName) { + reapExpiredIdle(poolName, Instant.now()); + LinkedHashMap entries = idleByPool.get(poolName); + return new StoreCounters(entries == null ? 
0 : entries.size()); + } + + @Override + public synchronized Integer getMaxIdle(String poolName) { + return maxIdleByPool.get(poolName); + } + + @Override + public synchronized void setMaxIdle(String poolName, int maxIdle) { + maxIdleByPool.put(poolName, maxIdle); + } + + synchronized String currentOwner(String poolName) { + LockEntry lock = locks.get(poolName); + if (lock == null || !lock.expiresAt.isAfter(Instant.now())) { + return null; + } + return lock.ownerId; + } + + synchronized Map putCountsByOwner(String poolName) { + Map counters = putCountByOwnerByPool.get(poolName); + if (counters == null) { + return Map.of(); + } + return new HashMap<>(counters); + } + + synchronized void setFailRenewWhenPutCountAtLeast( + String poolName, String ownerId, int putThreshold) { + failRenewAfterPutByOwnerByPool + .computeIfAbsent(poolName, ignored -> new HashMap<>()) + .put(ownerId, putThreshold); + } + + private void incrementCounter( + Map> table, String poolName, String ownerId) { + Map ownerCounts = + table.computeIfAbsent(poolName, ignored -> new HashMap<>()); + ownerCounts.put(ownerId, ownerCounts.getOrDefault(ownerId, 0) + 1); + } + + private boolean shouldFailRenewByPutThreshold(String poolName, String ownerId) { + Map thresholds = failRenewAfterPutByOwnerByPool.get(poolName); + if (thresholds == null) { + return false; + } + Integer threshold = thresholds.get(ownerId); + if (threshold == null) { + return false; + } + int putCount = + putCountByOwnerByPool + .getOrDefault(poolName, Map.of()) + .getOrDefault(ownerId, 0); + return putCount >= threshold; + } + + private static class LockEntry { + private final String ownerId; + private final Instant expiresAt; + + private LockEntry(String ownerId, Instant expiresAt) { + this.ownerId = ownerId; + this.expiresAt = expiresAt; + } + } + } +} diff --git a/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxPoolSingleNodeE2ETest.java 
b/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxPoolSingleNodeE2ETest.java new file mode 100644 index 00000000..6a03b80b --- /dev/null +++ b/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxPoolSingleNodeE2ETest.java @@ -0,0 +1,986 @@ +/* + * Copyright 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.opensandbox.e2e; + +import static org.junit.jupiter.api.Assertions.*; + +import com.alibaba.opensandbox.sandbox.Sandbox; +import com.alibaba.opensandbox.sandbox.SandboxManager; +import com.alibaba.opensandbox.sandbox.config.ConnectionConfig; +import com.alibaba.opensandbox.sandbox.domain.exceptions.PoolEmptyException; +import com.alibaba.opensandbox.sandbox.domain.exceptions.SandboxException; +import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.Execution; +import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.RunCommandRequest; +import com.alibaba.opensandbox.sandbox.domain.models.sandboxes.PagedSandboxInfos; +import com.alibaba.opensandbox.sandbox.domain.models.sandboxes.SandboxFilter; +import com.alibaba.opensandbox.sandbox.domain.pool.AcquirePolicy; +import com.alibaba.opensandbox.sandbox.domain.pool.PoolCreationSpec; +import com.alibaba.opensandbox.sandbox.domain.pool.PoolState; +import com.alibaba.opensandbox.sandbox.infrastructure.pool.InMemoryPoolStateStore; +import com.alibaba.opensandbox.sandbox.pool.SandboxPool; +import 
java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * End-to-end tests for {@link SandboxPool} using a single, in-process
+ * {@link InMemoryPoolStateStore}.
+ *
+ * <p>Every test gets a fresh pool (see {@link #setup()}) whose sandboxes carry a unique
+ * {@code tag} metadata value, so that {@link #teardown()} can find and kill anything the
+ * test left behind. Sandboxes handed out by a pool are tracked in {@link #borrowed} and
+ * killed after each test.
+ */
+@Tag("e2e")
+@DisplayName("SandboxPool E2E Tests (Single-Node)")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class SandboxPoolSingleNodeE2ETest extends BaseE2ETest {
+    private static final int MAX_IDLE = 3;
+    private static final int CONCURRENT_BORROW = 3;
+    private static final int MAX_TOTAL_SANDBOX_TOLERANCE = 7;
+
+    private SandboxPool pool;
+    private InMemoryPoolStateStore stateStore;
+    private SandboxManager sandboxManager;
+    private String poolName;
+    private String tag;
+
+    /** Sandboxes acquired by the running test; worker threads add to it concurrently. */
+    private final List<Sandbox> borrowed = new CopyOnWriteArrayList<>();
+
+    /** Builds and starts the per-test pool with a unique tag and pool name. */
+    @BeforeEach
+    void setup() {
+        tag = "e2e-pool-" + UUID.randomUUID().toString().substring(0, 8);
+        poolName = "pool-" + tag;
+        stateStore = new InMemoryPoolStateStore();
+        sandboxManager = SandboxManager.builder().connectionConfig(sharedConnectionConfig).build();
+
+        pool =
+                SandboxPool.builder()
+                        .poolName(poolName)
+                        .ownerId("owner-" + tag)
+                        .maxIdle(MAX_IDLE)
+                        .warmupConcurrency(1)
+                        .stateStore(stateStore)
+                        .connectionConfig(sharedConnectionConfig)
+                        .creationSpec(creationSpecFor(tag))
+                        .reconcileInterval(Duration.ofSeconds(2))
+                        .drainTimeout(Duration.ofMillis(200))
+                        .build();
+        pool.start();
+    }
+
+    /**
+     * Best-effort cleanup: kills borrowed sandboxes, drains and stops the pool, kills any
+     * sandbox still carrying this test's tag, then closes the manager.
+     */
+    @AfterEach
+    void teardown() {
+        for (Sandbox sandbox : borrowed) {
+            killAndCloseQuietly(sandbox);
+        }
+        borrowed.clear();
+
+        // Target is set to zero first so the reconciler does not re-warm while draining.
+        drainAndShutdownQuietly(pool);
+        cleanupTaggedSandboxes();
+
+        if (sandboxManager != null) {
+            try {
+                sandboxManager.close();
+            } catch (Exception ignored) {
+                // Cleanup is best-effort; a close failure must not mask the test result.
+            }
+        }
+    }
+
+    @Test
+    @Order(1)
+    @DisplayName("warmup + acquire(FAIL_FAST) + basic command run")
+    @Timeout(value = 3, unit = TimeUnit.MINUTES)
+    void testBasicPoolFlow() throws InterruptedException {
+        eventually(
+                "pool becomes healthy with warm idle",
+                Duration.ofMinutes(2),
+                Duration.ofSeconds(2),
+                () -> isHealthyWithIdle(pool, 1));
+
+        Sandbox sandbox = pool.acquire(Duration.ofMinutes(5), AcquirePolicy.FAIL_FAST);
+        borrowed.add(sandbox);
+        assertTrue(sandbox.isHealthy(), "acquired sandbox should be healthy");
+
+        Execution result =
+                sandbox.commands()
+                        .run(RunCommandRequest.builder().command("echo pool-basic-ok").build());
+        assertNotNull(result);
+        assertNull(result.getError());
+        assertFalse(result.getLogs().getStdout().isEmpty());
+        assertEquals("pool-basic-ok", result.getLogs().getStdout().get(0).getText());
+    }
+
+    @Test
+    @Order(2)
+    @DisplayName("resize/release supports FAIL_FAST on empty and DIRECT_CREATE fallback")
+    @Timeout(value = 3, unit = TimeUnit.MINUTES)
+    void testResizeReleaseAndFallbackPolicy() throws InterruptedException {
+        eventually(
+                "pool has warmed idle sandboxes",
+                Duration.ofMinutes(2),
+                Duration.ofSeconds(2),
+                () -> pool.snapshot().getIdleCount() >= 1);
+
+        pool.resize(0);
+        int released = pool.releaseAllIdle();
+        assertTrue(released >= 0, "releaseAllIdle should return non-negative count");
+
+        eventually(
+                "idle is drained",
+                Duration.ofSeconds(30),
+                Duration.ofSeconds(1),
+                () -> pool.snapshot().getIdleCount() == 0);
+
+        assertThrows(
+                PoolEmptyException.class,
+                () -> pool.acquire(Duration.ofMinutes(5), AcquirePolicy.FAIL_FAST));
+
+        Sandbox direct = pool.acquire(Duration.ofMinutes(5), AcquirePolicy.DIRECT_CREATE);
+        borrowed.add(direct);
+        assertTrue(direct.isHealthy(), "direct-created sandbox should be healthy");
+    }
+
+    @Test
+    @Order(3)
+    @DisplayName("stale idle id fallback works and acquire is rejected after graceful shutdown")
+    @Timeout(value = 4, unit = TimeUnit.MINUTES)
+    void testStaleIdleAndShutdownAcquire() throws InterruptedException {
+        // Inject an idle id that does not correspond to any real sandbox.
+        String staleId = "non-existent-" + System.nanoTime();
+        stateStore.putIdle(poolName, staleId);
+
+        Sandbox staleFallback = pool.acquire(Duration.ofMinutes(5), AcquirePolicy.DIRECT_CREATE);
+        borrowed.add(staleFallback);
+        assertTrue(staleFallback.isHealthy(), "stale-id fallback should return healthy sandbox");
+
+        pool.shutdown(true);
+        assertAcquireRejectedAsNotRunning(AcquirePolicy.DIRECT_CREATE);
+    }
+
+    @Test
+    @Order(4)
+    @DisplayName("concurrent acquire keeps total pool sandboxes under tolerance")
+    @Timeout(value = 6, unit = TimeUnit.MINUTES)
+    void testConcurrentAcquireWithinTolerance() throws Exception {
+        eventually(
+                "pool reaches target idle",
+                Duration.ofMinutes(2),
+                Duration.ofSeconds(2),
+                () -> pool.snapshot().getIdleCount() >= MAX_IDLE);
+
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch acquiredLatch = new CountDownLatch(CONCURRENT_BORROW);
+        CountDownLatch releaseLatch = new CountDownLatch(1);
+        ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_BORROW);
+        try {
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (int i = 0; i < CONCURRENT_BORROW; i++) {
+                futures.add(
+                        CompletableFuture.runAsync(
+                                () -> {
+                                    try {
+                                        startLatch.await();
+                                        Sandbox sandbox =
+                                                pool.acquire(
+                                                        Duration.ofMinutes(5),
+                                                        AcquirePolicy.DIRECT_CREATE);
+                                        borrowed.add(sandbox);
+                                        acquiredLatch.countDown();
+                                        sandbox.commands()
+                                                .run(
+                                                        RunCommandRequest.builder()
+                                                                .command(
+                                                                        "sh -c 'sleep 2; echo pool-concurrency-ok'")
+                                                                .build());
+                                        // Hold the sandbox until the main thread has sampled.
+                                        releaseLatch.await();
+                                    } catch (InterruptedException e) {
+                                        Thread.currentThread().interrupt();
+                                        throw new CompletionException(e);
+                                    } catch (Exception e) {
+                                        throw new CompletionException(e);
+                                    }
+                                },
+                                executor));
+            }
+
+            startLatch.countDown();
+            assertTrue(
+                    acquiredLatch.await(2, TimeUnit.MINUTES),
+                    "concurrent acquires should complete in time");
+
+            // Sample the remote sandbox count for ~5s while all borrowers are still held.
+            int observedMax = 0;
+            for (int i = 0; i < 10; i++) {
+                observedMax = Math.max(observedMax, countTaggedSandboxes());
+                Thread.sleep(500);
+            }
+            assertTrue(
+                    observedMax <= MAX_TOTAL_SANDBOX_TOLERANCE,
+                    "observed tagged sandbox count exceeded tolerance, max=" + observedMax);
+
+            releaseLatch.countDown();
+            for (CompletableFuture<Void> future : futures) {
+                future.get(2, TimeUnit.MINUTES);
+            }
+        } finally {
+            // Unblock workers even when an assertion above failed.
+            releaseLatch.countDown();
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    @Order(5)
+    @DisplayName("pool start/shutdown is idempotent and acquire is rejected after stop")
+    @Timeout(value = 4, unit = TimeUnit.MINUTES)
+    void testLifecycleIdempotencyAndAcquireAfterStop() throws InterruptedException {
+        pool.start();
+        pool.start();
+
+        eventually(
+                "pool reaches healthy state after repeated start",
+                Duration.ofMinutes(2),
+                Duration.ofSeconds(2),
+                () -> pool.snapshot().getState() == PoolState.HEALTHY);
+
+        pool.shutdown(true);
+        pool.shutdown(true);
+        pool.shutdown(false);
+
+        assertEquals(PoolState.STOPPED, pool.snapshot().getState(), "pool should remain STOPPED");
+        assertAcquireRejectedAsNotRunning(AcquirePolicy.DIRECT_CREATE);
+    }
+
+    @Test
+    @Order(6)
+    @DisplayName("releaseAllIdle drains injected idle ids and stopped pool rejects acquire")
+    @Timeout(value = 3, unit = TimeUnit.MINUTES)
+    void testReleaseAllIdleAfterStopAndFailFastAcquireFallback() {
+        pool.shutdown(false);
+        assertEquals(PoolState.STOPPED, pool.snapshot().getState(), "pool should be STOPPED");
+        pool.releaseAllIdle();
+        assertEquals(
+                0,
+                stateStore.snapshotCounters(poolName).getIdleCount(),
+                "pre-existing idle should be drained before injection");
+
+        String fake1 = "fake-id-1-" + System.nanoTime();
+        String fake2 = "fake-id-2-" + System.nanoTime();
+        stateStore.putIdle(poolName, fake1);
+        stateStore.putIdle(poolName, fake2);
+        assertEquals(2, stateStore.snapshotCounters(poolName).getIdleCount());
+
+        int released = pool.releaseAllIdle();
+        assertEquals(2, released, "releaseAllIdle should drain all injected idle ids");
+        assertEquals(0, stateStore.snapshotCounters(poolName).getIdleCount());
+
+        assertAcquireRejectedAsNotRunning(AcquirePolicy.FAIL_FAST);
+    }
+
+    @Test
+    @Order(7)
+    @DisplayName("pool can restart after stop and rewarm idle sandboxes")
+    @Timeout(value = 4, unit = TimeUnit.MINUTES)
+    void testRestartAfterStopRewarmsIdle() throws InterruptedException {
+        pool.shutdown(false);
+        assertEquals(PoolState.STOPPED, pool.snapshot().getState(), "pool should be STOPPED");
+
+        pool.start();
+        eventually(
+                "pool restarts and rewarm idle sandboxes",
+                Duration.ofMinutes(2),
+                Duration.ofSeconds(2),
+                () -> isHealthyWithIdle(pool, 1));
+
+        Sandbox fromIdle = pool.acquire(Duration.ofMinutes(5), AcquirePolicy.FAIL_FAST);
+        borrowed.add(fromIdle);
+        assertTrue(fromIdle.isHealthy(), "acquire after restart should return healthy sandbox");
+    }
+
+    @Test
+    @Order(8)
+    @DisplayName("concurrent shutdown and acquire does not deadlock (POOL_NOT_RUNNING is allowed)")
+    @Timeout(value = 6, unit = TimeUnit.MINUTES)
+    void testConcurrentShutdownAndAcquireDoesNotDeadlock() throws Exception {
+        eventually(
+                "pool has warm idle before concurrency scenario",
+                Duration.ofMinutes(2),
+                Duration.ofSeconds(2),
+                () -> pool.snapshot().getIdleCount() >= 1);
+
+        int acquireWorkers = 4;
+        CountDownLatch startLatch = new CountDownLatch(1);
+        ExecutorService executor = Executors.newFixedThreadPool(acquireWorkers + 1);
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        List<Throwable> errors = new CopyOnWriteArrayList<>();
+        try {
+            for (int i = 0; i < acquireWorkers; i++) {
+                futures.add(
+                        CompletableFuture.runAsync(
+                                () -> {
+                                    try {
+                                        startLatch.await();
+                                        Sandbox sandbox =
+                                                pool.acquire(
+                                                        Duration.ofMinutes(5),
+                                                        AcquirePolicy.DIRECT_CREATE);
+                                        borrowed.add(sandbox);
+                                        Execution result =
+                                                sandbox.commands()
+                                                        .run(
+                                                                RunCommandRequest.builder()
+                                                                        .command(
+                                                                                "echo acquire-concurrent-ok")
+                                                                        .build());
+                                        assertNotNull(result);
+                                    } catch (Throwable t) {
+                                        // Losing the race against shutdown is an accepted outcome.
+                                        if (isPoolNotRunningError(t)) {
+                                            return;
+                                        }
+                                        errors.add(t);
+                                        throw new CompletionException(t);
+                                    }
+                                },
+                                executor));
+            }
+
+            futures.add(
+                    CompletableFuture.runAsync(
+                            () -> {
+                                try {
+                                    startLatch.await();
+                                    pool.shutdown(true);
+                                } catch (Throwable t) {
+                                    errors.add(t);
+                                    throw new CompletionException(t);
+                                }
+                            },
+                            executor));
+
+            startLatch.countDown();
+            for (CompletableFuture<Void> future : futures) {
+                future.get(2, TimeUnit.MINUTES);
+            }
+        } finally {
+            executor.shutdownNow();
+        }
+
+        assertTrue(
+                errors.isEmpty(), "concurrent shutdown/acquire should not raise errors: " + errors);
+    }
+
+    @Test
+    @Order(9)
+    @DisplayName("concurrent start/shutdown stress remains stable")
+    @Timeout(value = 6, unit = TimeUnit.MINUTES)
+    void testConcurrentStartShutdownStressSingleNode() throws Exception {
+        pool.resize(0);
+        pool.releaseAllIdle();
+        pool.shutdown(false);
+
+        int workers = 4;
+        int loopsPerWorker = 8;
+        ExecutorService executor = Executors.newFixedThreadPool(workers);
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        List<Throwable> errors = new CopyOnWriteArrayList<>();
+        try {
+            for (int i = 0; i < workers; i++) {
+                futures.add(
+                        CompletableFuture.runAsync(
+                                () -> {
+                                    for (int j = 0; j < loopsPerWorker; j++) {
+                                        try {
+                                            pool.start();
+                                            Thread.sleep(20);
+                                            pool.shutdown(false);
+                                        } catch (InterruptedException e) {
+                                            Thread.currentThread().interrupt();
+                                            errors.add(e);
+                                            throw new CompletionException(e);
+                                        } catch (Throwable t) {
+                                            errors.add(t);
+                                            throw new CompletionException(t);
+                                        }
+                                    }
+                                },
+                                executor));
+            }
+
+            for (CompletableFuture<Void> future : futures) {
+                future.get(2, TimeUnit.MINUTES);
+            }
+        } finally {
+            executor.shutdownNow();
+        }
+        assertTrue(errors.isEmpty(), "concurrent start/shutdown stress should not fail: " + errors);
+
+        pool.resize(2);
+        pool.start();
+        eventually(
+                "pool recovers to healthy after stress",
+                Duration.ofMinutes(2),
+                Duration.ofSeconds(2),
+                () -> pool.snapshot().getState() == PoolState.HEALTHY);
+    }
+
+    @Test
+    @Order(10)
+    @DisplayName("resize from zero to positive rewarm idle target")
+    @Timeout(value = 4, unit = TimeUnit.MINUTES)
+    void testResizeFromZeroToPositiveRewarmsIdleTarget() throws InterruptedException {
+        eventually(
+                "pool has warmed idle before resize test",
+                Duration.ofMinutes(2),
+                Duration.ofSeconds(2),
+                () -> pool.snapshot().getIdleCount() >= 1);
+
+        pool.resize(0);
+        pool.releaseAllIdle();
+        eventually(
+                "idle drained after resize to zero",
+                Duration.ofSeconds(30),
+                Duration.ofSeconds(1),
+                () -> pool.snapshot().getIdleCount() == 0);
+
+        pool.resize(2);
+        eventually(
+                "idle rewarmed to resized target",
+                Duration.ofMinutes(2),
+                Duration.ofSeconds(2),
+                () -> isHealthyWithIdle(pool, 2));
+    }
+
+    @Test
+    @Order(11)
+    @DisplayName("two pools stay isolated in serial low-footprint mode")
+    @Timeout(value = 6, unit = TimeUnit.MINUTES)
+    void testTwoPoolsIsolationSerialLowFootprint() throws Exception {
+        String tagA = "e2e-pool-a-" + UUID.randomUUID().toString().substring(0, 8);
+        String tagB = "e2e-pool-b-" + UUID.randomUUID().toString().substring(0, 8);
+
+        SandboxPool poolA = newSingleIdlePool("a", tagA, Duration.ofMillis(200));
+        SandboxPool poolB = null;
+        try {
+            poolA.start();
+            eventually(
+                    "poolA warmed to idle target",
+                    Duration.ofMinutes(2),
+                    Duration.ofSeconds(2),
+                    () -> isHealthyWithIdle(poolA, 1));
+
+            Sandbox a = poolA.acquire(Duration.ofMinutes(5), AcquirePolicy.FAIL_FAST);
+            borrowed.add(a);
+            assertTrue(a.isHealthy(), "poolA acquire should return healthy sandbox");
+            releaseAndRemoveBorrowed(a);
+
+            // poolB is only created once poolA's borrowed sandbox is gone (low footprint).
+            poolB = newSingleIdlePool("b", tagB, Duration.ofMillis(200));
+            poolB.start();
+            SandboxPool finalPoolB = poolB;
+            eventually(
+                    "poolB warmed to idle target",
+                    Duration.ofMinutes(2),
+                    Duration.ofSeconds(2),
+                    () -> isHealthyWithIdle(finalPoolB, 1));
+
+            int taggedA = countTaggedSandboxes(tagA);
+            int taggedB = countTaggedSandboxes(tagB);
+            assertTrue(taggedA >= 1, "poolA should have at least one tagged sandbox");
+            assertTrue(taggedB >= 1, "poolB should have at least one tagged sandbox");
+
+            poolA.resize(0);
+            poolA.releaseAllIdle();
+            eventually(
+                    "poolA idle drains to zero",
+                    Duration.ofSeconds(30),
+                    Duration.ofSeconds(1),
+                    () -> poolA.snapshot().getIdleCount() == 0);
+
+            assertTrue(
+                    poolB.snapshot().getIdleCount() >= 1,
+                    "poolB idle should remain unaffected by poolA operations");
+            assertTrue(
+                    countTaggedSandboxes(tagB) >= 1,
+                    "poolB tagged sandboxes should remain after poolA drain");
+        } finally {
+            drainAndShutdownQuietly(poolA);
+            drainAndShutdownQuietly(poolB);
+            cleanupTaggedSandboxes(tagA);
+            cleanupTaggedSandboxes(tagB);
+        }
+    }
+
+    @Test
+    @Order(12)
+    @DisplayName("releaseAllIdle drains store and remote tagged sandboxes in serial mode")
+    @Timeout(value = 4, unit = TimeUnit.MINUTES)
+    void testReleaseAllIdleReducesRemoteTaggedSandboxesSerial() throws InterruptedException {
+        eventually(
+                "pool has warmed idle before releaseAllIdle remote validation",
+                Duration.ofMinutes(2),
+                Duration.ofSeconds(2),
+                () -> pool.snapshot().getIdleCount() >= 1);
+
+        int before = countTaggedSandboxes();
+        assertTrue(before >= 1, "expected at least one tagged sandbox before releaseAllIdle");
+
+        pool.resize(0);
+        int released = pool.releaseAllIdle();
+        assertTrue(released >= 1, "releaseAllIdle should release at least one idle sandbox");
+
+        eventually(
+                "idle count reaches zero after releaseAllIdle",
+                Duration.ofSeconds(30),
+                Duration.ofSeconds(1),
+                () -> pool.snapshot().getIdleCount() == 0);
+
+        // The bound leaves a slack of one sandbox above (before - released).
+        eventually(
+                "remote tagged sandbox count decreases after releaseAllIdle",
+                Duration.ofSeconds(60),
+                Duration.ofSeconds(2),
+                () -> countTaggedSandboxes() <= Math.max(0, before - released + 1));
+    }
+
+    @Test
+    @Order(13)
+    @DisplayName("broken connection triggers DEGRADED state with low resource footprint")
+    @Timeout(value = 4, unit = TimeUnit.MINUTES)
+    void testBrokenConnectionTriggersDegradedState() throws InterruptedException {
+        String badTag = "e2e-pool-bad-" + UUID.randomUUID().toString().substring(0, 8);
+        SandboxPool badPool = newBrokenPool("bad", badTag);
+        try {
+            badPool.start();
+            eventually(
+                    "bad pool enters DEGRADED state",
+                    Duration.ofSeconds(90),
+                    Duration.ofSeconds(2),
+                    () -> badPool.snapshot().getState() == PoolState.DEGRADED);
+
+            assertNotNull(
+                    badPool.snapshot().getLastError(),
+                    "degraded snapshot should contain lastError");
+            assertEquals(
+                    0,
+                    badPool.snapshot().getIdleCount(),
+                    "broken pool should not create idle sandboxes");
+        } finally {
+            drainAndShutdownQuietly(badPool);
+            cleanupTaggedSandboxes(badTag);
+        }
+    }
+
+    @Test
+    @Order(14)
+    @DisplayName("broken pool enforces FAIL_FAST empty and direct-create failure semantics")
+    @Timeout(value = 4, unit = TimeUnit.MINUTES)
+    void testBrokenPoolAcquireSemantics() throws InterruptedException {
+        String badTag = "e2e-pool-bad-acquire-" + UUID.randomUUID().toString().substring(0, 8);
+        SandboxPool badPool = newBrokenPool("bad-acquire", badTag);
+        try {
+            badPool.start();
+            eventually(
+                    "broken pool enters DEGRADED before acquire semantic checks",
+                    Duration.ofSeconds(90),
+                    Duration.ofSeconds(2),
+                    () -> badPool.snapshot().getState() == PoolState.DEGRADED);
+
+            assertThrows(
+                    PoolEmptyException.class,
+                    () -> badPool.acquire(Duration.ofMinutes(1), AcquirePolicy.FAIL_FAST));
+            assertThrows(
+                    Exception.class,
+                    () -> badPool.acquire(Duration.ofMinutes(1), AcquirePolicy.DIRECT_CREATE));
+        } finally {
+            drainAndShutdownQuietly(badPool);
+            cleanupTaggedSandboxes(badTag);
+        }
+    }
+
+    @Test
+    @Order(15)
+    @DisplayName("healthy pool still works after broken pool failure path")
+    @Timeout(value = 4, unit = TimeUnit.MINUTES)
+    void testHealthyPoolWorksAfterBrokenPoolPath() throws InterruptedException {
+        String goodTag = "e2e-pool-good-after-bad-" + UUID.randomUUID().toString().substring(0, 8);
+        SandboxPool goodPool = newSingleIdlePool("good-after-bad", goodTag, Duration.ofMillis(100));
+        try {
+            goodPool.start();
+            eventually(
+                    "healthy pool reaches warm idle",
+                    Duration.ofMinutes(2),
+                    Duration.ofSeconds(2),
+                    () -> isHealthyWithIdle(goodPool, 1));
+
+            Sandbox sandbox = goodPool.acquire(Duration.ofMinutes(5), AcquirePolicy.FAIL_FAST);
+            borrowed.add(sandbox);
+            Execution execution =
+                    sandbox.commands()
+                            .run(RunCommandRequest.builder().command("echo recovery-ok").build());
+            assertNotNull(execution);
+            assertNull(execution.getError());
+            releaseAndRemoveBorrowed(sandbox);
+        } finally {
+            drainAndShutdownQuietly(goodPool);
+            cleanupTaggedSandboxes(goodTag);
+        }
+    }
+
+    /** Creation spec shared by every pool in this class; only the {@code tag} value varies. */
+    private PoolCreationSpec creationSpecFor(String sandboxTag) {
+        return PoolCreationSpec.builder()
+                .image(getSandboxImage())
+                .entrypoint(List.of("tail -f /dev/null"))
+                .metadata(Map.of("tag", sandboxTag, "suite", "sandbox-pool-e2e"))
+                .env(Map.of("E2E_TEST", "true"))
+                .build();
+    }
+
+    /**
+     * Builds (but does not start) a pool with {@code maxIdle = 1} and its own in-memory store
+     * against the shared connection. Names are {@code pool-<label>-<poolTag>} and
+     * {@code owner-<label>-<poolTag>}.
+     */
+    private SandboxPool newSingleIdlePool(String label, String poolTag, Duration drainTimeout) {
+        return SandboxPool.builder()
+                .poolName("pool-" + label + "-" + poolTag)
+                .ownerId("owner-" + label + "-" + poolTag)
+                .maxIdle(1)
+                .warmupConcurrency(1)
+                .stateStore(new InMemoryPoolStateStore())
+                .connectionConfig(sharedConnectionConfig)
+                .creationSpec(creationSpecFor(poolTag))
+                .reconcileInterval(Duration.ofSeconds(2))
+                .drainTimeout(drainTimeout)
+                .build();
+    }
+
+    /**
+     * Builds (but does not start) a pool pointed at {@link #buildBrokenConnectionConfig()} with
+     * {@code degradedThreshold = 1}.
+     */
+    private SandboxPool newBrokenPool(String label, String poolTag) {
+        return SandboxPool.builder()
+                .poolName("pool-" + label + "-" + poolTag)
+                .ownerId("owner-" + label + "-" + poolTag)
+                .maxIdle(1)
+                .warmupConcurrency(1)
+                .stateStore(new InMemoryPoolStateStore())
+                .connectionConfig(buildBrokenConnectionConfig())
+                .creationSpec(creationSpecFor(poolTag))
+                .degradedThreshold(1)
+                .reconcileInterval(Duration.ofSeconds(1))
+                .drainTimeout(Duration.ofMillis(100))
+                .build();
+    }
+
+    /** True when {@code candidate} reports HEALTHY and at least {@code minIdle} idle sandboxes. */
+    private static boolean isHealthyWithIdle(SandboxPool candidate, int minIdle) {
+        return candidate.snapshot().getState() == PoolState.HEALTHY
+                && candidate.snapshot().getIdleCount() >= minIdle;
+    }
+
+    /** Asserts that acquiring from the per-test pool fails with code {@code POOL_NOT_RUNNING}. */
+    private void assertAcquireRejectedAsNotRunning(AcquirePolicy policy) {
+        SandboxException rejected =
+                assertThrows(
+                        SandboxException.class,
+                        () -> pool.acquire(Duration.ofMinutes(5), policy));
+        assertEquals("POOL_NOT_RUNNING", rejected.getError().getCode());
+    }
+
+    /**
+     * Sets the idle target to zero, releases idle sandboxes and stops the pool without
+     * draining. Each step is attempted independently and failures are ignored.
+     */
+    private static void drainAndShutdownQuietly(SandboxPool target) {
+        if (target == null) {
+            return;
+        }
+        quietly(() -> target.resize(0));
+        quietly(target::releaseAllIdle);
+        quietly(() -> target.shutdown(false));
+    }
+
+    /** Runs a cleanup action, ignoring any exception it throws. */
+    private static void quietly(Runnable action) {
+        try {
+            action.run();
+        } catch (Exception ignored) {
+            // Cleanup is best-effort; a failure here must not mask the test result.
+        }
+    }
+
+    private void cleanupTaggedSandboxes() {
+        cleanupTaggedSandboxes(tag);
+    }
+
+    /**
+     * Kills the sandboxes whose {@code tag} metadata equals {@code cleanupTag}. Only the first
+     * page (up to 20 entries) is processed; listing and kill failures are ignored.
+     */
+    private void cleanupTaggedSandboxes(String cleanupTag) {
+        if (cleanupTag == null || cleanupTag.isBlank() || sandboxManager == null) {
+            return;
+        }
+        try {
+            listTaggedSandboxes(cleanupTag)
+                    .getSandboxInfos()
+                    .forEach(
+                            info -> {
+                                try {
+                                    sandboxManager.killSandbox(info.getId());
+                                } catch (Exception ignored) {
+                                    // Another actor may already have removed this sandbox.
+                                }
+                            });
+        } catch (Exception ignored) {
+            // Cleanup is best-effort; the listing call may fail if the service is unreachable.
+        }
+    }
+
+    private int countTaggedSandboxes() {
+        return countTaggedSandboxes(tag);
+    }
+
+    /** Number of sandboxes on the first result page (up to 20) tagged with {@code queryTag}. */
+    private int countTaggedSandboxes(String queryTag) {
+        if (queryTag == null || queryTag.isBlank()) {
+            return 0;
+        }
+        return listTaggedSandboxes(queryTag).getSandboxInfos().size();
+    }
+
+    /** Lists the first page (page size 20) of sandboxes whose {@code tag} metadata matches. */
+    private PagedSandboxInfos listTaggedSandboxes(String queryTag) {
+        return sandboxManager.listSandboxInfos(
+                SandboxFilter.builder().metadata(Map.of("tag", queryTag)).pageSize(20).build());
+    }
+
+    /**
+     * Polls {@code condition} every {@code interval} until it returns true or {@code timeout}
+     * elapses, then fails the test.
+     *
+     * <p>The condition is always evaluated at least once, and once more after the final sleep,
+     * so a state reached during the last interval is not missed. Exceptions thrown by the
+     * condition are treated as "not yet" and the most recent one is attached to the failure.
+     *
+     * @throws InterruptedException if the thread is interrupted while sleeping between polls
+     */
+    private void eventually(
+            String description, Duration timeout, Duration interval, BooleanSupplier condition)
+            throws InterruptedException {
+        // nanoTime is monotonic, so wall-clock adjustments cannot stretch or cut the wait.
+        long deadlineNanos = System.nanoTime() + timeout.toNanos();
+        Exception lastError = null;
+        while (true) {
+            try {
+                if (condition.getAsBoolean()) {
+                    return;
+                }
+            } catch (Exception e) {
+                lastError = e;
+            }
+            if (System.nanoTime() - deadlineNanos >= 0) {
+                break;
+            }
+            Thread.sleep(interval.toMillis());
+        }
+        if (lastError != null) {
+            fail(
+                    "Timed out waiting for "
+                            + description
+                            + ", last error: "
+                            + lastError.getMessage(),
+                    lastError);
+        } else {
+            fail("Timed out waiting for " + description);
+        }
+    }
+
+    /** Kills and then closes {@code sandbox}; both steps are attempted and failures ignored. */
+    private static void killAndCloseQuietly(Sandbox sandbox) {
+        if (sandbox == null) {
+            return;
+        }
+        try {
+            sandbox.kill();
+        } catch (Exception ignored) {
+            // The sandbox may already be gone; still try to close the client below.
+        }
+        try {
+            sandbox.close();
+        } catch (Exception ignored) {
+            // Cleanup is best-effort.
+        }
+    }
+
+    /** Kills {@code sandbox} immediately and stops tracking it for teardown. */
+    private void releaseAndRemoveBorrowed(Sandbox sandbox) {
+        if (sandbox == null) {
+            return;
+        }
+        killAndCloseQuietly(sandbox);
+        borrowed.remove(sandbox);
+    }
+
+    /**
+     * Connection config that reuses the shared API key but targets {@code 127.0.0.1:1} with a
+     * one-second request timeout, so that requests are expected to fail quickly.
+     */
+    private ConnectionConfig buildBrokenConnectionConfig() {
+        return ConnectionConfig.builder()
+                .apiKey(sharedConnectionConfig.getApiKey())
+                .domain("127.0.0.1:1")
+                .protocol("http")
+                .requestTimeout(Duration.ofSeconds(1))
+                .build();
+    }
+
+    /**
+     * Walks the cause chain of {@code throwable} and reports whether any link is a
+     * {@link SandboxException} with code {@code POOL_NOT_RUNNING}, or has a message
+     * containing that text.
+     */
+    private static boolean isPoolNotRunningError(Throwable throwable) {
+        for (Throwable current = throwable; current != null; current = current.getCause()) {
+            if (current instanceof SandboxException) {
+                SandboxException se = (SandboxException) current;
+                if (se.getError() != null && "POOL_NOT_RUNNING".equals(se.getError().getCode())) {
+                    return true;
+                }
+            }
+            String message = current.getMessage();
+            if (message != null && message.contains("POOL_NOT_RUNNING")) {
+                return true;
+            }
+        }
+        return false;
+    }
+}