diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..02b58dc2 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,241 @@ +# AGENTS.md + +This file provides guidance for AI agents working with the Stakpak codebase. + +## Project Overview + +Stakpak is an open-source AI DevOps Agent that runs in the terminal. It enables developers to generate infrastructure code, debug Kubernetes, configure CI/CD, and automate deployments with security-first design principles. + +**Key differentiators:** +- **Secret Substitution** - LLMs work with credentials without seeing actual values +- **Warden Guardrails** - Network-level policies block destructive operations +- **DevOps Playbooks** - Curated library of DevOps knowledge via Rulebooks + +## Architecture + +### Workspace Structure + +``` +stakpak/agent (Rust workspace) +├── cli/ # CLI binary crate (stakpak-cli) +│ ├── src/commands/ # CLI commands (agent, mcp, auth, acp) +│ ├── src/config/ # Configuration handling +│ └── src/onboarding/ # First-run setup flows +├── tui/ # Terminal UI crate (stakpak-tui) +│ └── src/services/ # TUI services and handlers +├── libs/ +│ ├── ai/ # AI provider abstraction (stakai) +│ │ └── src/providers/ # Anthropic, OpenAI, Gemini implementations +│ ├── api/ # Stakpak API client (stakpak-api) +│ ├── mcp/ +│ │ ├── client/ # MCP client (stakpak-mcp-client) +│ │ ├── server/ # MCP server & tools (stakpak-mcp-server) +│ │ └── proxy/ # MCP proxy (stakpak-mcp-proxy) +│ └── shared/ # Shared utilities (stakpak-shared) +│ ├── src/secrets/ # Secret detection (gitleaks-based) +│ ├── src/models/ # Data models +│ └── src/oauth/ # OAuth providers +``` + +### Key Crates + +| Crate | Purpose | +|-------|---------| +| `stakpak-cli` | Main CLI binary, entry point | +| `stakpak-tui` | Ratatui-based terminal UI | +| `stakai` | Multi-provider AI client (Anthropic, OpenAI, Gemini) | +| `stakpak-mcp-server` | MCP tools (local & remote) | +| `stakpak-shared` | Secret management, file ops, task management | + +## Development Guidelines + +### Build Commands + +```bash +# Development build +cargo build + +# Release build +cargo build --release + +# Build specific crate +cargo build -p stakpak-cli + +# Run locally +cargo run -- --help +``` + +### Testing + +```bash +# All tests +cargo test --workspace + +# Specific crate +cargo test -p stakpak-shared + +# With output +cargo test --workspace -- --nocapture +``` + +### Code Quality + +```bash +# Format check (required for CI) +cargo fmt --check + +# Lint (required for CI) +cargo clippy --all-targets + +# Full check +cargo check --all-targets +``` + +### Critical Lints + +The workspace enforces strict linting via `clippy.toml`: + +```rust +// ❌ DENIED - will fail CI +let value = option.unwrap(); +let value = result.expect("msg"); + +// ✅ REQUIRED - proper error handling +let value = option.ok_or_else(|| anyhow::anyhow!("missing value"))?; +let value = match option { + Some(v) => v, + None => return Err(anyhow::anyhow!("missing value")), +}; +``` + +## Code Patterns + +### Error Handling + +Use `anyhow` for application errors with context: + +```rust +use anyhow::{Result, Context}; + +fn read_config(path: &Path) -> Result { + let contents = std::fs::read_to_string(path) + .context("Failed to read config file")?; + toml::from_str(&contents) + .context("Failed to parse config") +} +``` + +### Async Code + +The project uses `tokio` runtime: + +```rust +#[tokio::main] +async fn main() -> Result<()> { + // ... +} + +// In tests +#[tokio::test] +async fn test_async_feature() { + // ... +} +``` + +### Secret Handling + +Never log or expose secrets. Use the secret manager: + +```rust +// Redact secrets before displaying +let redacted = secret_manager.redact_and_store_secrets(&content, Some(&path)); + +// Restore secrets before execution +let actual = secret_manager.restore_secrets_in_string(&redacted); +``` + +### MCP Tools + +Tools are defined in `libs/mcp/server/src/`: +- `local_tools.rs` - File ops, command execution, task management +- `remote_tools.rs` - API-backed tools (docs search, memory, rulebooks) +- `subagent_tools.rs` - Subagent spawning tools + +Tool pattern: + +```rust +#[tool(description = "Tool description here")] +pub async fn tool_name( + &self, + ctx: RequestContext, + Parameters(request): Parameters, +) -> Result { + // Implementation +} +``` + +## File Conventions + +### Configuration Files + +- `~/.stakpak/config.toml` - User configuration +- `.stakpak/` - Project-level session data +- `.warden/` - Security guardrail configs + +### Important Files + +| File | Purpose | +|------|---------| +| `Cargo.toml` | Workspace definition, shared dependencies | +| `clippy.toml` | Clippy configuration (denies unwrap/expect) | +| `cliff.toml` | Changelog generation config | +| `release.sh` | Release automation script | + +## CI/CD + +GitHub Actions workflow (`.github/workflows/ci.yml`): + +1. `cargo check --all-targets` +2. `cargo fmt -- --check` +3. `cargo clippy` +4. `cargo build` +5. `cargo test --workspace` + +All checks must pass before merge. + +## Common Tasks + +### Adding a New MCP Tool + +1. Define request struct in appropriate tools file +2. Implement tool method with `#[tool]` attribute +3. Add to tool router macro +4. Test locally with `cargo run -- mcp start --tool-mode local` + +### Adding a New AI Provider + +1. Create provider module in `libs/ai/src/providers/` +2. Implement `Provider` trait from `libs/ai/src/provider/trait_def.rs` +3. Register in provider dispatcher +4. Add configuration support in `libs/shared/src/models/integrations/` + +### Modifying TUI + +1. Services in `tui/src/services/` +2. Event handlers in `tui/src/services/handlers/` +3. View rendering in `tui/src/view.rs` +4. Uses Ratatui framework + +## Security Considerations + +- **Never** commit secrets or credentials +- Use secret redaction for all user-facing output +- Validate all external input +- Be cautious with file system operations +- Test security features with `.warden/test-security.sh` + +## Resources + +- [Documentation](https://stakpak.gitbook.io/docs) +- [Contributing Guide](./CONTRIBUTING.md) +- [Getting Started](./GETTING-STARTED.md) diff --git a/Cargo.lock b/Cargo.lock index f5f8a99d..995ae73d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5601,12 +5601,14 @@ dependencies = [ "axum-server", "base64 0.22.1", "chrono", + "console", "dirs", "futures", "futures-util", "hyper 1.8.1", "itertools 0.14.0", "notify", + "portable-pty", "rand 0.9.2", "rcgen", "regex", diff --git a/cli/src/config/app.rs b/cli/src/config/app.rs index 05fca58b..b98d6802 100644 --- a/cli/src/config/app.rs +++ b/cli/src/config/app.rs @@ -793,6 +793,7 @@ impl From for Settings { anonymous_id: config.anonymous_id, collect_telemetry: config.collect_telemetry, editor: config.editor, + shell_sessions: None, } } } diff --git a/cli/src/config/file.rs b/cli/src/config/file.rs index dff62178..c2567d63 100644 --- a/cli/src/config/file.rs +++ b/cli/src/config/file.rs @@ -29,6 +29,7 @@ impl Default for ConfigFile { anonymous_id: Some(uuid::Uuid::new_v4().to_string()), collect_telemetry: Some(true), editor: Some("nano".to_string()), + shell_sessions: None, }, } } @@ -48,6 +49,7 @@ impl ConfigFile { anonymous_id: Some(uuid::Uuid::new_v4().to_string()), collect_telemetry: Some(true), editor: Some("nano".to_string()), + shell_sessions: None, }, } } @@ -91,6 +93,7 @@ impl ConfigFile { let existing_anonymous_id = self.settings.anonymous_id.clone(); let existing_collect_telemetry = self.settings.collect_telemetry; let existing_editor = self.settings.editor.clone(); + let existing_shell_sessions = self.settings.shell_sessions.clone(); self.settings = Settings { machine_name: config.machine_name, @@ -98,6 +101,7 @@ impl ConfigFile { anonymous_id: config.anonymous_id.or(existing_anonymous_id), collect_telemetry: config.collect_telemetry.or(existing_collect_telemetry), editor: config.editor.or(existing_editor), + shell_sessions: existing_shell_sessions, }; } diff --git a/cli/src/config/tests.rs b/cli/src/config/tests.rs index f6dd7f25..bca11f5e 100644 --- a/cli/src/config/tests.rs +++ b/cli/src/config/tests.rs @@ -200,6 +200,7 @@ fn resolved_profile_config_merges_all_profile_defaults() { anonymous_id: Some("test-user-id".into()), collect_telemetry: Some(true), editor: Some("nano".into()), + shell_sessions: None, }, }; diff --git a/cli/src/config/types.rs b/cli/src/config/types.rs index 0a4d66e3..70328566 100644 --- a/cli/src/config/types.rs +++ b/cli/src/config/types.rs @@ -1,6 +1,7 @@ //! Basic configuration types. use serde::{Deserialize, Serialize}; +use std::time::Duration; /// Provider type selection for the CLI. #[derive(Default, Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -13,6 +14,70 @@ pub enum ProviderType { Local, } +/// Configuration for persistent shell sessions. +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct ShellSessionsConfig { + /// Enable persistent shell sessions (default: true) + #[serde(default = "default_shell_sessions_enabled")] + pub enabled: bool, + + /// Default shell for local sessions (auto-detect if not set) + pub default_shell: Option, + + /// Session timeout in seconds (0 = no timeout, default: 3600) + #[serde(default = "default_session_timeout")] + pub session_timeout: u64, + + /// Maximum concurrent sessions (default: 10) + #[serde(default = "default_max_sessions")] + pub max_sessions: usize, + + /// Command completion timeout in seconds (default: 300) + #[serde(default = "default_command_timeout")] + pub command_timeout: u64, +} + +fn default_shell_sessions_enabled() -> bool { + true +} + +fn default_session_timeout() -> u64 { + 3600 +} + +fn default_max_sessions() -> usize { + 10 +} + +fn default_command_timeout() -> u64 { + 300 +} + +impl Default for ShellSessionsConfig { + fn default() -> Self { + Self { + enabled: true, + default_shell: None, + session_timeout: 3600, + max_sessions: 10, + command_timeout: 300, + } + } +} + +impl ShellSessionsConfig { + /// Convert to the shared library's ShellSessionConfig + pub fn to_shared_config(&self) -> stakpak_shared::shell_session::ShellSessionConfig { + stakpak_shared::shell_session::ShellSessionConfig { + enabled: self.enabled, + default_shell: self.default_shell.clone(), + session_timeout: Duration::from_secs(self.session_timeout), + max_sessions: self.max_sessions, + command_timeout: Duration::from_secs(self.command_timeout), + } + } +} + /// Global settings that apply across all profiles. #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Settings { @@ -27,6 +92,9 @@ pub struct Settings { pub collect_telemetry: Option, /// Preferred external editor (e.g. vim, nano, code) pub editor: Option, + /// Shell session configuration + #[serde(default)] + pub shell_sessions: Option, } /// Legacy configuration format for migration purposes. @@ -46,6 +114,7 @@ impl From for Settings { anonymous_id: Some(uuid::Uuid::new_v4().to_string()), collect_telemetry: Some(true), editor: Some("nano".to_string()), + shell_sessions: None, } } } diff --git a/libs/mcp/server/src/local_tools.rs b/libs/mcp/server/src/local_tools.rs index d2bc1b1b..12fd629a 100644 --- a/libs/mcp/server/src/local_tools.rs +++ b/libs/mcp/server/src/local_tools.rs @@ -180,6 +180,10 @@ impl ToolContainer { #[tool( description = "A system command execution tool that allows running shell commands with full system access on local or remote systems via SSH. +PERSISTENT SHELL SESSIONS: +- Commands run in persistent shell sessions where environment variables, working directory, and shell state persist across commands +- Local commands use a default local session; remote commands use a default session per connection + REMOTE EXECUTION: - Set 'remote' parameter to 'user@host' or 'user@host:port' for SSH execution - Use 'password' for password authentication or 'private_key_path' for key-based auth @@ -208,11 +212,83 @@ If the command's output exceeds 300 lines the result will be truncated and the f private_key_path, }): Parameters, ) -> Result { - // Use unified command execution helper - match self - .execute_command_unified(&command, timeout, remote, password, private_key_path, &ctx) - .await - { + // Always use persistent shell sessions + let timeout_duration = timeout.map(std::time::Duration::from_secs); + + // Determine if this is a remote or local command and get/create appropriate session + let result = if let Some(ref remote_str) = remote { + // Remote command - get or create default remote session + match self + .get_shell_session_manager() + .get_or_create_default_remote_session( + remote_str, + password.clone(), + private_key_path.clone(), + ) + .await + { + Ok(session_id) => { + self.execute_in_persistent_session( + &session_id, + &command, + timeout_duration, + &ctx, + ) + .await + } + Err(e) => { + // Fall back to non-persistent execution if session creation fails + tracing::warn!( + "Failed to create remote session, falling back to non-persistent: {}", + e + ); + self.execute_command_unified( + &command, + timeout, + remote, + password, + private_key_path, + &ctx, + ) + .await + } + } + } else { + // Local command - get or create default local session + match self + .get_shell_session_manager() + .get_or_create_default_local_session() + .await + { + Ok(session_id) => { + self.execute_in_persistent_session( + &session_id, + &command, + timeout_duration, + &ctx, + ) + .await + } + Err(e) => { + // Fall back to non-persistent execution if session creation fails + tracing::warn!( + "Failed to create local session, falling back to non-persistent: {}", + e + ); + self.execute_command_unified( + &command, + timeout, + remote, + password, + private_key_path, + &ctx, + ) + .await + } + } + }; + + match result { Ok(mut command_result) => { command_result.output = match handle_large_output(&command_result.output, "command.output", 300, false) @@ -244,6 +320,116 @@ If the command's output exceeds 300 lines the result will be truncated and the f } } + /// Execute a command in a persistent shell session with streaming output + async fn execute_in_persistent_session( + &self, + session_id: &str, + command: &str, + timeout: Option, + ctx: &RequestContext, + ) -> Result { + use rmcp::model::{NumberOrString, ProgressNotificationParam, ProgressToken}; + use uuid::Uuid; + + // Restore secrets in command before execution + let actual_command = self.get_secret_manager().restore_secrets_in_string(command); + + // Try streaming execution first + match self + .get_shell_session_manager() + .execute_in_session_streaming(session_id, &actual_command, timeout) + .await + { + Ok((mut rx, handle)) => { + let progress_id = Uuid::new_v4(); + let mut accumulated_output = String::new(); + + // Stream chunks as they arrive + while let Some(chunk) = rx.recv().await { + if !chunk.text.is_empty() { + // Redact secrets in chunk before streaming + let redacted_chunk = self + .get_secret_manager() + .redact_and_store_secrets(&chunk.text, None); + + accumulated_output.push_str(&redacted_chunk); + + // Send progress notification for real-time streaming + let _ = ctx + .peer + .notify_progress(ProgressNotificationParam { + progress_token: ProgressToken(NumberOrString::Number(0)), + progress: 50.0, + total: Some(100.0), + message: Some( + serde_json::to_string(&ToolCallResultProgress { + id: progress_id, + message: redacted_chunk, + }) + .unwrap_or_default(), + ), + }) + .await; + } + + if chunk.is_final { + break; + } + } + + // Wait for final result + match handle.await { + Ok(Ok(output)) => { + // Use the cleaned output from the session + let redacted_output = self + .get_secret_manager() + .redact_and_store_secrets(&output.output, None); + + Ok(CommandResult { + output: redacted_output, + exit_code: output.exit_code.unwrap_or(0), + }) + } + Ok(Err(e)) => Err(CallToolResult::error(vec![ + Content::text("SESSION_ERROR"), + Content::text(format!("Command execution failed: {}", e)), + ])), + Err(e) => Err(CallToolResult::error(vec![ + Content::text("SESSION_ERROR"), + Content::text(format!("Task join error: {}", e)), + ])), + } + } + Err(e) => { + // Fall back to non-streaming execution + tracing::warn!( + "Streaming execution failed, falling back to non-streaming: {}", + e + ); + match self + .get_shell_session_manager() + .execute_in_session(session_id, &actual_command, timeout) + .await + { + Ok(output) => { + let redacted_output = self + .get_secret_manager() + .redact_and_store_secrets(&output.output, None); + + Ok(CommandResult { + output: redacted_output, + exit_code: output.exit_code.unwrap_or(0), + }) + } + Err(e) => Err(CallToolResult::error(vec![ + Content::text("SESSION_ERROR"), + Content::text(format!("Failed to execute in session: {}", e)), + ])), + } + } + } + } + #[tool( description = "Execute a shell command asynchronously in the background on LOCAL OR REMOTE systems and return immediately with task information without waiting for completion. diff --git a/libs/mcp/server/src/tool_container.rs b/libs/mcp/server/src/tool_container.rs index 3be45fbb..72880eca 100644 --- a/libs/mcp/server/src/tool_container.rs +++ b/libs/mcp/server/src/tool_container.rs @@ -8,6 +8,7 @@ use stakpak_api::AgentProvider; use stakpak_shared::models::subagent::SubagentConfigs; use stakpak_shared::remote_connection::RemoteConnectionManager; use stakpak_shared::secret_manager::SecretManager; +use stakpak_shared::shell_session::ShellSessionManager; use stakpak_shared::task_manager::TaskManagerHandle; use std::sync::Arc; @@ -17,6 +18,7 @@ pub struct ToolContainer { pub secret_manager: SecretManager, pub task_manager: Arc, pub remote_connection_manager: Arc, + pub shell_session_manager: Arc, pub subagent_configs: Option, pub enabled_tools: EnabledToolsConfig, pub tool_router: ToolRouter, @@ -38,6 +40,7 @@ impl ToolContainer { secret_manager: SecretManager::new(redact_secrets, privacy_mode), task_manager, remote_connection_manager: Arc::new(RemoteConnectionManager::new()), + shell_session_manager: Arc::new(ShellSessionManager::with_defaults()), subagent_configs, enabled_tools, tool_router, @@ -60,6 +63,10 @@ impl ToolContainer { &self.remote_connection_manager } + pub fn get_shell_session_manager(&self) -> &Arc { + &self.shell_session_manager + } + pub fn get_subagent_configs(&self) -> &Option { &self.subagent_configs } diff --git a/libs/shared/Cargo.toml b/libs/shared/Cargo.toml index 7b33962c..e185d1f1 100644 --- a/libs/shared/Cargo.toml +++ b/libs/shared/Cargo.toml @@ -37,11 +37,15 @@ stakai = { workspace = true } reqwest-middleware = { version = "0.4.2", features = ["json"] } reqwest-retry = "0.8.0" itertools = "0.14.0" +console = "0.15" # OAuth dependencies sha2 = "0.10" base64 = "0.22" urlencoding = "2.1" +[target.'cfg(any(unix, windows))'.dependencies] +portable-pty = "0.8" + [dev-dependencies] tempfile = { workspace = true} diff --git a/libs/shared/src/lib.rs b/libs/shared/src/lib.rs index ff5fa209..293518cc 100644 --- a/libs/shared/src/lib.rs +++ b/libs/shared/src/lib.rs @@ -12,6 +12,7 @@ pub mod remote_connection; pub mod remote_store; pub mod secret_manager; pub mod secrets; +pub mod shell_session; pub mod task_manager; pub mod telemetry; pub mod tls_client; diff --git a/libs/shared/src/shell_session/local.rs b/libs/shared/src/shell_session/local.rs new file mode 100644 index 00000000..fe3997f8 --- /dev/null +++ b/libs/shared/src/shell_session/local.rs @@ -0,0 +1,1093 @@ +//! Local Shell Session Implementation +//! +//! Provides persistent local shell sessions using `portable-pty` for PTY-based +//! command execution with state persistence across commands. + +use super::session::{CommandOutput, ShellSession, ShellSessionError}; +use super::{MARKER_PREFIX, MARKER_SUFFIX, ShellSessionManager, clean_shell_output}; +use async_trait::async_trait; +use std::io::{Read, Write}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::Mutex; +use tracing::{debug, trace}; + +#[cfg(any(unix, windows))] +use portable_pty::{CommandBuilder, PtySize, native_pty_system}; + +/// Default PTY size +const DEFAULT_ROWS: u16 = 24; +const DEFAULT_COLS: u16 = 80; + +/// Local shell session using PTY for persistent state +pub struct LocalShellSession { + session_id: String, + #[cfg(any(unix, windows))] + inner: Arc>, + #[cfg(not(any(unix, windows)))] + _phantom: std::marker::PhantomData<()>, +} + +#[cfg(any(unix, windows))] +struct LocalShellSessionInner { + writer: Box, + reader: Box, + #[allow(dead_code)] + child: Box, + #[allow(dead_code)] + shell_path: String, + closed: bool, +} + +impl LocalShellSession { + /// Create a new local shell session + /// + /// # Arguments + /// * `shell` - Optional shell path (auto-detects if None) + /// * `cwd` - Optional working directory (uses current dir if None) + /// * `rows` - PTY rows (default: 24) + /// * `cols` - PTY columns (default: 80) + #[cfg(any(unix, windows))] + pub fn new( + shell: Option<&str>, + cwd: Option<&std::path::Path>, + rows: Option, + cols: Option, + ) -> Result { + let session_id = ShellSessionManager::generate_session_id("local"); + let shell_path = shell.map(String::from).unwrap_or_else(Self::detect_shell); + + debug!( + session_id = %session_id, + shell = %shell_path, + "Creating local shell session" + ); + + let pty_system = native_pty_system(); + let pair = pty_system + .openpty(PtySize { + rows: rows.unwrap_or(DEFAULT_ROWS), + cols: cols.unwrap_or(DEFAULT_COLS), + pixel_width: 0, + pixel_height: 0, + }) + .map_err(|e| ShellSessionError::PtyError(format!("Failed to open PTY: {}", e)))?; + + let current_dir = cwd.map(|p| p.to_path_buf()).unwrap_or_else(|| { + std::env::current_dir().unwrap_or_else(|_| { + std::env::var("HOME") + .map(std::path::PathBuf::from) + .unwrap_or_else(|_| std::path::PathBuf::from("/")) + }) + }); + + let mut cmd = CommandBuilder::new(&shell_path); + cmd.cwd(¤t_dir); + + // Start interactive shell + #[cfg(not(windows))] + cmd.args(["-i"]); + + let child = pair + .slave + .spawn_command(cmd) + .map_err(|e| ShellSessionError::SpawnFailed(format!("Failed to spawn shell: {}", e)))?; + + let writer = pair + .master + .take_writer() + .map_err(|e| ShellSessionError::PtyError(format!("Failed to get PTY writer: {}", e)))?; + + let reader = pair + .master + .try_clone_reader() + .map_err(|e| ShellSessionError::PtyError(format!("Failed to get PTY reader: {}", e)))?; + + let inner = LocalShellSessionInner { + writer, + reader, + child, + shell_path, + closed: false, + }; + + Ok(Self { + session_id, + inner: Arc::new(Mutex::new(inner)), + }) + } + + #[cfg(not(any(unix, windows)))] + pub fn new( + _shell: Option<&str>, + _cwd: Option<&std::path::Path>, + _rows: Option, + _cols: Option, + ) -> Result { + Err(ShellSessionError::PtyError( + "PTY not supported on this platform".to_string(), + )) + } + + /// Detect the default shell for the current platform + fn detect_shell() -> String { + #[cfg(windows)] + { + std::env::var("COMSPEC").unwrap_or_else(|_| "cmd.exe".to_string()) + } + #[cfg(not(windows))] + { + std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string()) + } + } + + /// Generate a unique marker for command completion detection + fn generate_marker() -> String { + let uuid = uuid::Uuid::new_v4().to_string().replace("-", ""); + format!("{}{}{}", MARKER_PREFIX, &uuid[..16], MARKER_SUFFIX) + } + + /// Execute command with marker-based completion detection + #[cfg(any(unix, windows))] + async fn execute_with_marker( + &self, + command: &str, + timeout: Option, + ) -> Result { + let marker = Self::generate_marker(); + let start = Instant::now(); + + let mut inner = self.inner.lock().await; + + if inner.closed { + return Err(ShellSessionError::SessionClosed); + } + + // Note: We don't drain pending output here because the PTY reader is blocking. + // Instead, we rely on the marker-based detection to find our command's output. + + // Send command followed by marker echo + // Use a subshell to capture exit code + let full_command = format!("{}\necho \"{}\"\n", command.trim(), marker); + + inner + .writer + .write_all(full_command.as_bytes()) + .map_err(ShellSessionError::IoError)?; + inner.writer.flush().map_err(ShellSessionError::IoError)?; + + trace!(command = %command, marker = %marker, "Sent command to PTY"); + + // Read output until marker is found + // Note: PTY reader is blocking, so we use spawn_blocking to avoid blocking the async runtime + let timeout_duration = timeout.unwrap_or(Duration::from_secs(300)); + let mut output = String::new(); + + // Set a read timeout on the reader if possible (platform-specific) + // For now, we'll use a polling approach with small reads + + // Read output until marker is found + // Use polling with small sleeps to avoid blocking the async runtime + // Drop the lock and use spawn_blocking for the blocking PTY read + drop(inner); + + loop { + if start.elapsed() > timeout_duration { + return Err(ShellSessionError::Timeout(timeout_duration)); + } + + // Clone Arc for the blocking task + let inner_arc = self.inner.clone(); + let _marker_clone = marker.clone(); + + // Perform blocking read in a separate thread + let read_result = tokio::task::spawn_blocking(move || { + // Use tokio's block_in_place alternative - just block on the lock + let rt = tokio::runtime::Handle::current(); + let mut inner_guard = rt.block_on(inner_arc.lock()); + + if inner_guard.closed { + return Ok((Vec::new(), true, false)); // (data, is_closed, found_marker) + } + + let mut buf = [0u8; 4096]; + match inner_guard.reader.read(&mut buf) { + Ok(0) => Ok((Vec::new(), false, false)), // EOF + Ok(n) => { + let data = buf[..n].to_vec(); + Ok((data, false, false)) + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + Ok((Vec::new(), false, false)) // No data, but not EOF + } + Err(e) => Err(e), + } + }) + .await; + + match read_result { + Ok(Ok((data, is_closed, _))) => { + if is_closed { + return Err(ShellSessionError::SessionClosed); + } + if data.is_empty() { + // No data yet or WouldBlock, sleep and retry + tokio::time::sleep(Duration::from_millis(50)).await; + continue; + } + + let chunk = String::from_utf8_lossy(&data); + output.push_str(&chunk); + + // Count occurrences of the marker in output + // First occurrence is the echo of our command: echo "__MARKER__" + // Second occurrence is the actual output of that echo command + let marker_count = output.matches(&marker).count(); + if marker_count >= 2 { + // We've seen both the command echo and the actual marker output + break; + } + } + Ok(Err(e)) => { + return Err(ShellSessionError::IoError(e)); + } + Err(e) => { + return Err(ShellSessionError::ExecutionFailed(format!( + "Read task failed: {}", + e + ))); + } + } + } + + let duration = start.elapsed(); + + // Clean up output: remove the command echo, marker, and prompt artifacts + trace!(raw_output = %output, "Raw output before cleaning"); + let cleaned_output = clean_shell_output(&output, command, &marker); + + debug!( + session_id = %self.session_id, + duration_ms = duration.as_millis(), + output_len = cleaned_output.len(), + raw_len = output.len(), + "Command completed" + ); + + Ok(CommandOutput { + output: cleaned_output, + exit_code: None, // PTY doesn't easily give us exit codes + duration, + }) + } +} + +#[async_trait] +impl ShellSession for LocalShellSession { + #[cfg(any(unix, windows))] + async fn execute( + &self, + command: &str, + timeout: Option, + ) -> Result { + self.execute_with_marker(command, timeout).await + } + + #[cfg(not(any(unix, windows)))] + async fn execute( + &self, + _command: &str, + _timeout: Option, + ) -> Result { + Err(ShellSessionError::PtyError( + "PTY not supported on this platform".to_string(), + )) + } + + #[cfg(any(unix, windows))] + async fn execute_streaming( + &self, + command: &str, + timeout: Option, + ) -> Result< + ( + super::session::OutputReceiver, + tokio::task::JoinHandle>, + ), + ShellSessionError, + > { + use super::session::{OutputChunk, OutputSender}; + use tokio::sync::mpsc; + + let marker = Self::generate_marker(); + let start = Instant::now(); + let timeout_duration = timeout.unwrap_or(Duration::from_secs(300)); + + // Create channel for streaming output + let (tx, rx): (OutputSender, _) = mpsc::channel(100); + + // Check if session is closed before starting + { + let inner = self.inner.lock().await; + if inner.closed { + return Err(ShellSessionError::SessionClosed); + } + } + + // Send command to PTY + { + let mut inner = self.inner.lock().await; + let full_command = format!("{}\necho \"{}\"\n", command.trim(), marker); + inner + .writer + .write_all(full_command.as_bytes()) + .map_err(ShellSessionError::IoError)?; + inner.writer.flush().map_err(ShellSessionError::IoError)?; + } + + trace!(command = %command, marker = %marker, "Sent command to PTY for streaming"); + + // Clone what we need for the spawned task + let inner_arc = self.inner.clone(); + let session_id = self.session_id.clone(); + let command_owned = command.to_string(); + let marker_clone = marker.clone(); + + // Spawn task to read output and stream it + let handle = tokio::spawn(async move { + let mut output = String::new(); + let mut last_streamed_len = 0usize; // Track what we've already streamed + let mut pending_partial = String::new(); // Buffer for incomplete lines + let mut last_send = Instant::now(); + + // Debounce: send partial content after this delay if no newline arrives + // This makes progress bars, prompts, etc. appear naturally + const PARTIAL_LINE_DELAY: Duration = Duration::from_millis(100); + // Minimum time between sends to avoid overwhelming the TUI (~60fps) + const MIN_SEND_INTERVAL: Duration = Duration::from_millis(16); + + loop { + if start.elapsed() > timeout_duration { + let _ = tx + .send(OutputChunk { + text: "\n[TIMEOUT]\n".to_string(), + is_final: true, + }) + .await; + return Err(ShellSessionError::Timeout(timeout_duration)); + } + + // Clone Arc for the blocking task + let inner_arc_clone = inner_arc.clone(); + + // Perform blocking read in a separate thread + let read_result = tokio::task::spawn_blocking(move || { + let rt = tokio::runtime::Handle::current(); + let mut inner_guard = rt.block_on(inner_arc_clone.lock()); + + if inner_guard.closed { + return Ok((Vec::new(), true)); // (data, is_closed) + } + + let mut buf = [0u8; 4096]; + match inner_guard.reader.read(&mut buf) { + Ok(0) => Ok((Vec::new(), false)), // EOF + Ok(n) => { + let data = buf[..n].to_vec(); + Ok((data, false)) + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + Ok((Vec::new(), false)) + } + Err(e) => Err(e), + } + }) + .await; + + match read_result { + Ok(Ok((data, is_closed))) => { + if is_closed { + let _ = tx + .send(OutputChunk { + text: "\n[SESSION CLOSED]\n".to_string(), + is_final: true, + }) + .await; + return Err(ShellSessionError::SessionClosed); + } + + let has_new_data = !data.is_empty(); + + if has_new_data { + let chunk = String::from_utf8_lossy(&data); + output.push_str(&chunk); + } + + // Clean accumulated output + let cleaned_so_far = + clean_shell_output(&output, &command_owned, &marker_clone); + + if cleaned_so_far.len() > last_streamed_len { + let new_content = &cleaned_so_far[last_streamed_len..]; + + // Check for complete lines + if let Some(last_newline) = new_content.rfind('\n') { + let complete_lines = &new_content[..=last_newline]; + let remainder = &new_content[last_newline + 1..]; + + // Send complete lines immediately (respecting min interval) + if !complete_lines.trim().is_empty() + && last_send.elapsed() >= MIN_SEND_INTERVAL + { + let _ = tx + .send(OutputChunk { + text: complete_lines.to_string(), + is_final: false, + }) + .await; + last_send = Instant::now(); + } + + last_streamed_len += last_newline + 1; + pending_partial = remainder.to_string(); + } else { + // No newline - accumulate as pending partial + pending_partial = new_content.to_string(); + } + } + + // Send partial lines after debounce delay + // This makes progress bars, prompts, etc. appear naturally + if !pending_partial.is_empty() && last_send.elapsed() >= PARTIAL_LINE_DELAY + { + let _ = tx + .send(OutputChunk { + text: std::mem::take(&mut pending_partial), + is_final: false, + }) + .await; + last_streamed_len = cleaned_so_far.len(); + last_send = Instant::now(); + } + + // Check for marker completion + let marker_count = output.matches(&marker_clone).count(); + if marker_count >= 2 { + // Send any remaining partial content + if !pending_partial.is_empty() { + let _ = tx + .send(OutputChunk { + text: pending_partial, + is_final: false, + }) + .await; + } + break; + } + + // Adaptive sleep: shorter when actively receiving data + if !has_new_data { + tokio::time::sleep(Duration::from_millis(10)).await; + } + } + Ok(Err(e)) => { + let _ = tx + .send(OutputChunk { + text: format!("\n[IO ERROR: {}]\n", e), + is_final: true, + }) + .await; + return Err(ShellSessionError::IoError(e)); + } + Err(e) => { + let _ = tx + .send(OutputChunk { + text: format!("\n[TASK ERROR: {}]\n", e), + is_final: true, + }) + .await; + return Err(ShellSessionError::ExecutionFailed(format!( + "Read task failed: {}", + e + ))); + } + } + } + + let duration = start.elapsed(); + + // Clean up output + let cleaned_output = clean_shell_output(&output, &command_owned, &marker_clone); + + // Send final chunk + let _ = tx + .send(OutputChunk { + text: String::new(), + is_final: true, + }) + .await; + + debug!( + session_id = %session_id, + duration_ms = duration.as_millis(), + output_len = cleaned_output.len(), + "Streaming command completed" + ); + + Ok(CommandOutput { + output: cleaned_output, + exit_code: None, + duration, + }) + }); + + Ok((rx, handle)) + } + + #[cfg(not(any(unix, windows)))] + async fn execute_streaming( + &self, + _command: &str, + _timeout: Option, + ) -> Result< + ( + super::session::OutputReceiver, + tokio::task::JoinHandle>, + ), + ShellSessionError, + > { + Err(ShellSessionError::PtyError( + "PTY not supported on this platform".to_string(), + )) + } + + #[cfg(any(unix, windows))] + async fn is_alive(&self) -> bool { + let inner = self.inner.lock().await; + !inner.closed + } + + #[cfg(not(any(unix, windows)))] + async fn is_alive(&self) -> bool { + false + } + + fn session_id(&self) -> &str { + &self.session_id + } + + fn description(&self) -> String { + format!("local:{}", self.session_id) + } + + #[cfg(any(unix, windows))] + async fn close(&mut self) -> Result<(), ShellSessionError> { + let mut inner = self.inner.lock().await; + if inner.closed { + return Ok(()); + } + + // Send exit command + let _ = inner.writer.write_all(b"exit\n"); + let _ = inner.writer.flush(); + + inner.closed = true; + debug!(session_id = %self.session_id, "Local shell session closed"); + Ok(()) + } + + #[cfg(not(any(unix, windows)))] + async fn close(&mut self) -> Result<(), ShellSessionError> { + Ok(()) + } +} + +#[cfg(test)] +#[cfg(any(unix, windows))] +mod tests { + use super::*; + + #[tokio::test] + async fn test_local_session_creation() { + let session = LocalShellSession::new(None, None, None, None); + assert!(session.is_ok(), "Should create local session"); + + let session = session.unwrap(); + assert!(session.session_id().starts_with("local-")); + assert!(session.is_alive().await); + } + + #[tokio::test] + async fn test_marker_generation() { + let marker1 = LocalShellSession::generate_marker(); + let marker2 = LocalShellSession::generate_marker(); + + assert!(marker1.starts_with(MARKER_PREFIX)); + assert!(marker1.ends_with(MARKER_SUFFIX)); + assert_ne!(marker1, marker2, "Markers should be unique"); + } + + #[tokio::test] + async fn test_clean_output() { + use super::clean_shell_output; + let raw = "echo hello\nhello\n__STAKPAK_CMD_END_abc123__\n$ "; + let cleaned = clean_shell_output(raw, "echo hello", "__STAKPAK_CMD_END_abc123__"); + assert_eq!(cleaned.trim(), "hello"); + } + + #[tokio::test] + async fn test_execute_simple_command() { + let session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + // No artificial delays - shell should be ready immediately + let result = session.execute("echo hello", None).await; + + assert!(result.is_ok(), "Command should succeed: {:?}", result.err()); + let output = result.unwrap(); + assert!( + output.output.contains("hello"), + "Output should contain 'hello', got: '{}'", + output.output + ); + // Verify command completed quickly (under 5 seconds for simple echo) + assert!( + output.duration.as_secs() < 5, + "Simple echo should complete quickly, took: {:?}", + output.duration + ); + } + + #[tokio::test] + async fn test_environment_persistence() { + let session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + // Set an environment variable - no timeout needed for fast commands + let set_result = session + .execute("export TEST_VAR=persistent_value", None) + .await; + assert!(set_result.is_ok(), "Setting env var should succeed"); + + // Read it back in a subsequent command + let get_result = session.execute("echo $TEST_VAR", None).await; + + assert!(get_result.is_ok(), "Reading env var should succeed"); + let output = get_result.unwrap(); + assert!( + output.output.contains("persistent_value"), + "Environment variable should persist, got: '{}'", + output.output + ); + } + + #[tokio::test] + async fn test_working_directory_persistence() { + let session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + // Change directory - no timeout needed + let cd_result = session.execute("cd /tmp", None).await; + assert!(cd_result.is_ok(), "cd should succeed"); + + // Check we're still in /tmp + let pwd_result = session.execute("pwd", None).await; + + assert!(pwd_result.is_ok(), "pwd should succeed"); + let output = pwd_result.unwrap(); + assert!( + output.output.contains("/tmp") || output.output.contains("/private/tmp"), + "Working directory should persist to /tmp, got: '{}'", + output.output + ); + } + + #[tokio::test] + async fn test_multiple_commands_in_sequence() { + let session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + // Run multiple commands in sequence to verify session stays responsive + for i in 1..=5 { + let result = session + .execute(&format!("echo iteration_{}", i), None) + .await; + assert!( + result.is_ok(), + "Command {} should succeed: {:?}", + i, + result.err() + ); + let output = result.unwrap(); + assert!( + output.output.contains(&format!("iteration_{}", i)), + "Output should contain 'iteration_{}', got: '{}'", + i, + output.output + ); + } + } + + #[tokio::test] + async fn test_markers_are_transparent_across_commands() { + let session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + // Run multiple commands and verify NO markers leak into output + let commands = vec![ + "echo first_command", + "echo second_command", + "ls -la /tmp | head -3", + "echo third_command", + "pwd", + ]; + + for (i, cmd) in commands.iter().enumerate() { + let result = session.execute(cmd, None).await; + assert!( + result.is_ok(), + "Command {} '{}' should succeed: {:?}", + i, + cmd, + result.err() + ); + + let output = result.unwrap(); + + // CRITICAL: Verify no markers leak into output + assert!( + !output.output.contains("__STAKPAK_CMD_END_"), + "Marker should NOT appear in output for command '{}', got: '{}'", + cmd, + output.output + ); + assert!( + !output.output.contains(MARKER_PREFIX), + "Marker prefix should NOT appear in output for command '{}', got: '{}'", + cmd, + output.output + ); + assert!( + !output.output.contains(MARKER_SUFFIX), + "Marker suffix should NOT appear in output for command '{}', got: '{}'", + cmd, + output.output + ); + } + } + + #[tokio::test] + async fn test_output_clean_no_marker_contamination() { + let session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + // Set env var, change dir, then verify output is clean + let _ = session.execute("export CLEAN_TEST=value123", None).await; + let _ = session.execute("cd /tmp", None).await; + + // Now run a command that produces output + let result = session.execute("echo $CLEAN_TEST && pwd", None).await; + assert!(result.is_ok(), "Command should succeed"); + + let output = result.unwrap(); + + // Verify expected content is present + assert!( + output.output.contains("value123"), + "Should contain env var value" + ); + assert!( + output.output.contains("/tmp") || output.output.contains("/private/tmp"), + "Should contain pwd output" + ); + + // Verify NO implementation details leak + assert!( + !output.output.contains("__STAKPAK"), + "No internal markers should leak" + ); + assert!( + !output.output.contains("STAKPAK_CMD"), + "No internal markers should leak" + ); + + // Verify output doesn't contain the echo command for the marker + assert!( + !output.output.contains("echo \"__STAKPAK"), + "Marker echo command should not appear in output" + ); + } + + #[tokio::test] + async fn test_command_with_multiline_output() { + let session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + let result = session + .execute("echo 'line1'; echo 'line2'; echo 'line3'", None) + .await; + + assert!(result.is_ok(), "Command should succeed: {:?}", result.err()); + let output = result.unwrap(); + assert!(output.output.contains("line1"), "Should contain line1"); + assert!(output.output.contains("line2"), "Should contain line2"); + assert!(output.output.contains("line3"), "Should contain line3"); + } + + #[tokio::test] + async fn test_command_with_special_characters() { + let session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + // Test with quotes and special chars + let result = session.execute("echo 'hello \"world\"'", None).await; + + assert!(result.is_ok(), "Command should succeed: {:?}", result.err()); + let output = result.unwrap(); + assert!( + output.output.contains("hello") && output.output.contains("world"), + "Output should contain quoted text, got: '{}'", + output.output + ); + } + + #[tokio::test] + async fn test_shell_aliases_and_functions() { + let session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + // Define a function + let define_result = session + .execute("myfunc() { echo \"function output: $1\"; }", None) + .await; + assert!(define_result.is_ok(), "Function definition should succeed"); + + // Call the function + let call_result = session.execute("myfunc test_arg", None).await; + + assert!( + call_result.is_ok(), + "Function call should succeed: {:?}", + call_result.err() + ); + let output = call_result.unwrap(); + assert!( + output.output.contains("function output: test_arg"), + "Function should execute correctly, got: '{}'", + output.output + ); + } + + #[tokio::test] + async fn test_session_close_and_reopen() { + let mut session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + // Execute a command + let result = session.execute("echo before_close", None).await; + assert!(result.is_ok(), "Command before close should succeed"); + + // Close the session + let close_result = session.close().await; + assert!(close_result.is_ok(), "Close should succeed"); + + // Session should report as not alive + assert!( + !session.is_alive().await, + "Session should not be alive after close" + ); + + // Commands should fail on closed session + let result = session.execute("echo after_close", None).await; + assert!(result.is_err(), "Command on closed session should fail"); + } + + #[tokio::test] + async fn test_command_with_exit_code() { + let session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + // Run a command that succeeds + let result = session.execute("true", None).await; + assert!(result.is_ok(), "true command should succeed"); + + // Run a command that fails (false returns exit code 1) + let result = session.execute("false", None).await; + // Note: PTY doesn't easily give us exit codes, so this should still "succeed" + // in terms of execution, but the command itself returns non-zero + assert!( + result.is_ok(), + "false command should execute (even if exit code is non-zero)" + ); + } + + #[tokio::test] + async fn test_empty_command() { + let session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + // Empty command should still work (just produces empty output) + let result = session.execute("", None).await; + assert!(result.is_ok(), "Empty command should succeed"); + } + + #[tokio::test] + async fn test_command_with_pipes() { + let session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + let result = session + .execute("echo 'hello world' | tr 'a-z' 'A-Z'", None) + .await; + + assert!( + result.is_ok(), + "Piped command should succeed: {:?}", + result.err() + ); + let output = result.unwrap(); + assert!( + output.output.contains("HELLO WORLD"), + "Pipe should transform output, got: '{}'", + output.output + ); + } + + #[tokio::test] + async fn test_command_with_subshell() { + let session = + LocalShellSession::new(None, None, None, None).expect("Should create session"); + + let result = session.execute("echo $(echo nested)", None).await; + + assert!( + result.is_ok(), + "Subshell command should succeed: {:?}", + result.err() + ); + let output = result.unwrap(); + assert!( + output.output.contains("nested"), + "Subshell should execute, got: '{}'", + output.output + ); + } + + #[tokio::test] + async fn test_clean_output_removes_prompts() { + use super::clean_shell_output; + // Test standalone prompt patterns are removed + let raw = "echo test\ntest\n__MARKER__\n$ "; + let cleaned = clean_shell_output(raw, "echo test", "__MARKER__"); + assert!( + !cleaned.ends_with("$ "), + "Should remove standalone prompt, got: '{}'", + cleaned + ); + assert!(cleaned.contains("test"), "Should keep output"); + + // Test zsh-style prompt + let raw2 = "echo test\ntest\n__MARKER__\n% "; + let cleaned2 = clean_shell_output(raw2, "echo test", "__MARKER__"); + assert!( + !cleaned2.ends_with("% "), + "Should remove zsh prompt, got: '{}'", + cleaned2 + ); + } +} + +#[cfg(test)] +mod streaming_tests { + use super::*; + + #[tokio::test] + async fn test_execute_streaming_basic() { + let session = + LocalShellSession::new(None, None, None, None).expect("Failed to create session"); + + let (mut rx, handle) = session + .execute_streaming("echo 'hello streaming'", Some(Duration::from_secs(10))) + .await + .expect("Failed to start streaming"); + + // Collect all chunks + let mut chunks = Vec::new(); + while let Some(chunk) = rx.recv().await { + println!("Received chunk: {:?}", chunk); + chunks.push(chunk.clone()); + if chunk.is_final { + break; + } + } + + // Wait for completion + let result = handle.await.expect("Task panicked"); + let output = result.expect("Command failed"); + + println!("Final cleaned output: {:?}", output.output); + + // Verify the final output is clean (no ANSI codes, no prompts) + assert!( + output.output.contains("hello streaming"), + "Output should contain 'hello streaming', got: {}", + output.output + ); + assert!( + !output.output.contains("\x1b["), + "Output should not contain ANSI codes" + ); + assert!( + !output.output.contains("@"), + "Output should not contain prompt (user@host)" + ); + assert!( + !chunks.is_empty(), + "Should have received at least one chunk" + ); + + // Verify streamed chunks are also clean + for chunk in &chunks { + if !chunk.is_final && !chunk.text.is_empty() { + assert!( + !chunk.text.contains("\x1b["), + "Streamed chunk should not contain ANSI codes: {:?}", + chunk.text + ); + } + } + } + + #[tokio::test] + async fn test_execute_streaming_multiline() { + let session = + LocalShellSession::new(None, None, None, None).expect("Failed to create session"); + + let (mut rx, handle) = session + .execute_streaming( + "for i in 1 2 3; do echo \"line $i\"; done", + Some(Duration::from_secs(10)), + ) + .await + .expect("Failed to start streaming"); + + // Collect chunks + let mut all_text = String::new(); + while let Some(chunk) = rx.recv().await { + all_text.push_str(&chunk.text); + if chunk.is_final { + break; + } + } + + let result = handle.await.expect("Task panicked"); + let output = result.expect("Command failed"); + + println!("Streamed text: {:?}", all_text); + println!("Final output: {:?}", output.output); + + assert!(output.output.contains("line 1"), "Should contain line 1"); + assert!(output.output.contains("line 2"), "Should contain line 2"); + assert!(output.output.contains("line 3"), "Should contain line 3"); + } +} diff --git a/libs/shared/src/shell_session/manager.rs b/libs/shared/src/shell_session/manager.rs new file mode 100644 index 00000000..736bbf87 --- /dev/null +++ b/libs/shared/src/shell_session/manager.rs @@ -0,0 +1,691 @@ +//! Shell Session Manager +//! +//! Central manager for creating, tracking, and destroying persistent shell sessions. + +use super::session::{ShellSession, ShellSessionError}; +use crate::helper::generate_simple_id; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::RwLock; +use tracing::{debug, info, warn}; + +/// Configuration for shell sessions +#[derive(Debug, Clone)] +pub struct ShellSessionConfig { + /// Enable persistent shell sessions + pub enabled: bool, + + /// Default shell for local sessions (auto-detect if None) + pub default_shell: Option, + + /// Session timeout in seconds (0 = no timeout) + pub session_timeout: Duration, + + /// Maximum concurrent sessions + pub max_sessions: usize, + + /// Default command timeout in seconds + pub command_timeout: Duration, +} + +impl Default for ShellSessionConfig { + fn default() -> Self { + Self { + enabled: true, + default_shell: None, + session_timeout: Duration::from_secs(3600), // 1 hour + max_sessions: 10, + command_timeout: Duration::from_secs(300), // 5 minutes + } + } +} + +/// Information about a managed session +#[derive(Debug, Clone)] +pub struct SessionInfo { + pub id: String, + pub description: String, + pub is_remote: bool, + pub created_at: std::time::Instant, + pub last_used: std::time::Instant, +} + +/// Central manager for shell sessions +/// +/// Handles session lifecycle including creation, lookup, and cleanup. +/// Thread-safe and can be shared across multiple tool invocations. +pub struct ShellSessionManager { + /// Active sessions indexed by session ID + sessions: RwLock>>>>, + + /// Session metadata for quick lookups without locking sessions + session_info: RwLock>, + + /// Configuration + config: ShellSessionConfig, +} + +impl ShellSessionManager { + /// Create a new session manager with the given configuration + pub fn new(config: ShellSessionConfig) -> Self { + Self { + sessions: RwLock::new(HashMap::new()), + session_info: RwLock::new(HashMap::new()), + config, + } + } + + /// Create a new session manager with default configuration + pub fn with_defaults() -> Self { + Self::new(ShellSessionConfig::default()) + } + + /// Get the current configuration + pub fn config(&self) -> &ShellSessionConfig { + &self.config + } + + /// Check if sessions are enabled + pub fn is_enabled(&self) -> bool { + self.config.enabled + } + + /// Get the number of active sessions + pub async fn session_count(&self) -> usize { + self.sessions.read().await.len() + } + + /// Default session ID for local commands + const DEFAULT_LOCAL_SESSION_ID: &'static str = "default-local"; + + /// Default session ID prefix for remote commands + const DEFAULT_REMOTE_SESSION_PREFIX: &'static str = "default-remote-"; + + /// Get or create the default local session + /// + /// This provides automatic persistent shell behavior without explicit session management. + pub async fn get_or_create_default_local_session(&self) -> Result { + // Check if default session already exists + if self + .get_session(Self::DEFAULT_LOCAL_SESSION_ID) + .await + .is_some() + { + return Ok(Self::DEFAULT_LOCAL_SESSION_ID.to_string()); + } + + // Create new default local session + let session = super::LocalShellSession::new( + self.config.default_shell.as_deref(), + None, // Use current directory + None, // Default rows + None, // Default cols + )?; + + // Override the session ID to use our default ID + let session_with_id = DefaultIdLocalSession { + inner: session, + id: Self::DEFAULT_LOCAL_SESSION_ID.to_string(), + }; + + self.register_session(Box::new(session_with_id), false) + .await?; + Ok(Self::DEFAULT_LOCAL_SESSION_ID.to_string()) + } + + /// Get or create a default remote session for a given connection string + /// + /// Each unique remote connection gets its own default session. + pub async fn get_or_create_default_remote_session( + &self, + connection_string: &str, + password: Option, + private_key_path: Option, + ) -> Result { + // Create a deterministic session ID based on connection string + let session_id = format!( + "{}{}", + Self::DEFAULT_REMOTE_SESSION_PREFIX, + connection_string.replace(['@', ':', '.'], "-") + ); + + // Check if session already exists + if self.get_session(&session_id).await.is_some() { + return Ok(session_id); + } + + // Create new remote session + let connection_info = crate::remote_connection::RemoteConnectionInfo { + connection_string: connection_string.to_string(), + password, + private_key_path, + }; + + let session = super::RemoteShellSession::new(connection_info, None, None).await?; + + // Override the session ID to use our default ID + let session_with_id = DefaultIdRemoteSession { + inner: session, + id: session_id.clone(), + }; + + self.register_session(Box::new(session_with_id), true) + .await?; + Ok(session_id) + } + + /// Register a new session with the manager + /// + /// # Arguments + /// * `session` - The session to register + /// * `is_remote` - Whether this is a remote (SSH) session + /// + /// # Returns + /// * `Ok(session_id)` - The ID assigned to the session + /// * `Err` - If max sessions reached or registration fails + pub async fn register_session( + &self, + session: Box, + is_remote: bool, + ) -> Result { + // Check session limit + let current_count = self.session_count().await; + if current_count >= self.config.max_sessions { + return Err(ShellSessionError::ExecutionFailed(format!( + "Maximum session limit ({}) reached. Close unused sessions first.", + self.config.max_sessions + ))); + } + + let session_id = session.session_id().to_string(); + let description = session.description(); + let now = std::time::Instant::now(); + + let info = SessionInfo { + id: session_id.clone(), + description, + is_remote, + created_at: now, + last_used: now, + }; + + // Store session and info + { + let mut sessions = self.sessions.write().await; + let mut session_info = self.session_info.write().await; + + sessions.insert(session_id.clone(), Arc::new(RwLock::new(session))); + session_info.insert(session_id.clone(), info); + } + + info!(session_id = %session_id, is_remote = is_remote, "Registered new shell session"); + Ok(session_id) + } + + /// Get a session by ID + /// + /// Returns None if session doesn't exist or has been closed. + pub async fn get_session( + &self, + session_id: &str, + ) -> Option>>> { + let sessions = self.sessions.read().await; + sessions.get(session_id).cloned() + } + + /// Execute a command in a specific session + /// + /// Updates the last_used timestamp on successful execution. + pub async fn execute_in_session( + &self, + session_id: &str, + command: &str, + timeout: Option, + ) -> Result { + let session = self.get_session(session_id).await.ok_or_else(|| { + ShellSessionError::SessionDead(format!("Session {} not found", session_id)) + })?; + + let timeout = timeout.unwrap_or(self.config.command_timeout); + + // Execute command + let result = { + let session_guard = session.read().await; + session_guard.execute(command, Some(timeout)).await + }; + + // Update last_used on success + if result.is_ok() { + let mut session_info = self.session_info.write().await; + if let Some(info) = session_info.get_mut(session_id) { + info.last_used = std::time::Instant::now(); + } + } + + result + } + + /// Execute a command in a session with streaming output + /// + /// Returns a receiver for output chunks and a join handle for the final result. + pub async fn execute_in_session_streaming( + &self, + session_id: &str, + command: &str, + timeout: Option, + ) -> Result< + ( + super::session::OutputReceiver, + tokio::task::JoinHandle>, + ), + ShellSessionError, + > { + let session = self.get_session(session_id).await.ok_or_else(|| { + ShellSessionError::SessionDead(format!("Session {} not found", session_id)) + })?; + + let timeout = timeout.unwrap_or(self.config.command_timeout); + + // Start streaming execution + let (rx, handle) = { + let session_guard = session.read().await; + session_guard + .execute_streaming(command, Some(timeout)) + .await? + }; + + Ok((rx, handle)) + } + + /// Close and remove a session + pub async fn close_session(&self, session_id: &str) -> Result<(), ShellSessionError> { + // Remove from maps + let session = { + let mut sessions = self.sessions.write().await; + let mut session_info = self.session_info.write().await; + + session_info.remove(session_id); + sessions.remove(session_id) + }; + + // Close the session if it existed + if let Some(session) = session { + let mut session_guard = session.write().await; + session_guard.close().await?; + info!(session_id = %session_id, "Closed shell session"); + Ok(()) + } else { + Err(ShellSessionError::SessionDead(format!( + "Session {} not found", + session_id + ))) + } + } + + /// List all active sessions + pub async fn list_sessions(&self) -> Vec { + let session_info = self.session_info.read().await; + session_info.values().cloned().collect() + } + + /// Close all sessions + pub async fn close_all_sessions(&self) { + let session_ids: Vec = { + let sessions = self.sessions.read().await; + sessions.keys().cloned().collect() + }; + + for session_id in session_ids { + if let Err(e) = self.close_session(&session_id).await { + warn!(session_id = %session_id, error = %e, "Failed to close session during cleanup"); + } + } + + info!("Closed all shell sessions"); + } + + /// Clean up timed-out sessions + /// + /// Should be called periodically to remove stale sessions. + pub async fn cleanup_timed_out_sessions(&self) { + if self.config.session_timeout.is_zero() { + return; // No timeout configured + } + + let now = std::time::Instant::now(); + let timeout = self.config.session_timeout; + + let timed_out: Vec = { + let session_info = self.session_info.read().await; + session_info + .iter() + .filter(|(_, info)| now.duration_since(info.last_used) > timeout) + .map(|(id, _)| id.clone()) + .collect() + }; + + for session_id in timed_out { + debug!(session_id = %session_id, "Closing timed-out session"); + if let Err(e) = self.close_session(&session_id).await { + warn!(session_id = %session_id, error = %e, "Failed to close timed-out session"); + } + } + } + + /// Generate a unique session ID + pub fn generate_session_id(prefix: &str) -> String { + format!("{}-{}", prefix, generate_simple_id(8)) + } +} + +impl Drop for ShellSessionManager { + fn drop(&mut self) { + // Note: We can't do async cleanup in Drop, but sessions will be cleaned up + // when their Arc references are dropped. For proper cleanup, call close_all_sessions() + // before dropping the manager. + debug!("ShellSessionManager dropped"); + } +} + +/// Wrapper to override session ID for default local sessions +struct DefaultIdLocalSession { + inner: super::LocalShellSession, + id: String, +} + +#[async_trait::async_trait] +impl ShellSession for DefaultIdLocalSession { + async fn execute( + &self, + command: &str, + timeout: Option, + ) -> Result { + self.inner.execute(command, timeout).await + } + + async fn is_alive(&self) -> bool { + self.inner.is_alive().await + } + + fn session_id(&self) -> &str { + &self.id + } + + fn description(&self) -> String { + format!("Default local shell ({})", self.inner.description()) + } + + async fn close(&mut self) -> Result<(), ShellSessionError> { + self.inner.close().await + } + + async fn execute_streaming( + &self, + command: &str, + timeout: Option, + ) -> Result< + ( + super::session::OutputReceiver, + tokio::task::JoinHandle>, + ), + ShellSessionError, + > { + self.inner.execute_streaming(command, timeout).await + } +} + +/// Wrapper to override session ID for default remote sessions +struct DefaultIdRemoteSession { + inner: super::RemoteShellSession, + id: String, +} + +#[async_trait::async_trait] +impl ShellSession for DefaultIdRemoteSession { + async fn execute( + &self, + command: &str, + timeout: Option, + ) -> Result { + self.inner.execute(command, timeout).await + } + + async fn is_alive(&self) -> bool { + self.inner.is_alive().await + } + + fn session_id(&self) -> &str { + &self.id + } + + fn description(&self) -> String { + format!("Default remote shell ({})", self.inner.description()) + } + + async fn close(&mut self) -> Result<(), ShellSessionError> { + self.inner.close().await + } + + async fn execute_streaming( + &self, + command: &str, + timeout: Option, + ) -> Result< + ( + super::session::OutputReceiver, + tokio::task::JoinHandle>, + ), + ShellSessionError, + > { + self.inner.execute_streaming(command, timeout).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_config() { + let config = ShellSessionConfig::default(); + assert!(config.enabled); + assert_eq!(config.max_sessions, 10); + assert_eq!(config.session_timeout, Duration::from_secs(3600)); + assert_eq!(config.command_timeout, Duration::from_secs(300)); + } + + #[test] + fn test_generate_session_id() { + let id1 = ShellSessionManager::generate_session_id("local"); + let id2 = ShellSessionManager::generate_session_id("local"); + + assert!(id1.starts_with("local-")); + assert!(id2.starts_with("local-")); + assert_ne!(id1, id2); // Should be unique + } + + #[tokio::test] + async fn test_manager_creation() { + let manager = ShellSessionManager::with_defaults(); + assert!(manager.is_enabled()); + assert_eq!(manager.session_count().await, 0); + } + + #[tokio::test] + async fn test_default_local_session_creation() { + let manager = ShellSessionManager::with_defaults(); + + // Create default local session + let session_id = manager.get_or_create_default_local_session().await; + assert!( + session_id.is_ok(), + "Should create default local session: {:?}", + session_id.err() + ); + + let session_id = session_id.unwrap(); + assert_eq!( + session_id, "default-local", + "Should use default local session ID" + ); + assert_eq!(manager.session_count().await, 1); + + // Getting again should return same session + let session_id2 = manager.get_or_create_default_local_session().await; + assert!(session_id2.is_ok()); + assert_eq!(session_id2.unwrap(), "default-local"); + assert_eq!( + manager.session_count().await, + 1, + "Should not create duplicate session" + ); + } + + #[tokio::test] + async fn test_execute_in_session() { + let manager = ShellSessionManager::with_defaults(); + + let session_id = manager + .get_or_create_default_local_session() + .await + .expect("Should create session"); + + // Execute a command + let result = manager + .execute_in_session(&session_id, "echo test_output", None) + .await; + assert!(result.is_ok(), "Command should succeed: {:?}", result.err()); + + let output = result.unwrap(); + assert!( + output.output.contains("test_output"), + "Output should contain test_output, got: '{}'", + output.output + ); + } + + #[tokio::test] + async fn test_session_state_persistence_via_manager() { + let manager = ShellSessionManager::with_defaults(); + + let session_id = manager + .get_or_create_default_local_session() + .await + .expect("Should create session"); + + // Set environment variable + let set_result = manager + .execute_in_session(&session_id, "export MANAGER_TEST_VAR=manager_value", None) + .await; + assert!(set_result.is_ok(), "Setting env var should succeed"); + + // Read it back + let get_result = manager + .execute_in_session(&session_id, "echo $MANAGER_TEST_VAR", None) + .await; + assert!(get_result.is_ok(), "Reading env var should succeed"); + + let output = get_result.unwrap(); + assert!( + output.output.contains("manager_value"), + "Environment variable should persist through manager, got: '{}'", + output.output + ); + } + + #[tokio::test] + async fn test_close_session() { + let manager = ShellSessionManager::with_defaults(); + + let session_id = manager + .get_or_create_default_local_session() + .await + .expect("Should create session"); + + assert_eq!(manager.session_count().await, 1); + + // Close the session + let close_result = manager.close_session(&session_id).await; + assert!(close_result.is_ok(), "Close should succeed"); + assert_eq!(manager.session_count().await, 0); + + // Executing on closed session should fail + let result = manager + .execute_in_session(&session_id, "echo test", None) + .await; + assert!(result.is_err(), "Should fail on closed session"); + } + + #[tokio::test] + async fn test_list_sessions() { + let manager = ShellSessionManager::with_defaults(); + + // Initially empty + let sessions = manager.list_sessions().await; + assert!(sessions.is_empty()); + + // Create a session + let _ = manager + .get_or_create_default_local_session() + .await + .expect("Should create session"); + + let sessions = manager.list_sessions().await; + assert_eq!(sessions.len(), 1); + assert_eq!(sessions[0].id, "default-local"); + assert!(!sessions[0].is_remote); + } + + #[tokio::test] + async fn test_max_sessions_limit() { + let config = ShellSessionConfig { + max_sessions: 2, + ..Default::default() + }; + let manager = ShellSessionManager::new(config); + + // Create first session + let _ = manager + .get_or_create_default_local_session() + .await + .expect("Should create first session"); + + // Create second session by registering directly + let session2 = super::super::LocalShellSession::new(None, None, None, None) + .expect("Should create local session"); + let result = manager.register_session(Box::new(session2), false).await; + assert!(result.is_ok(), "Should create second session"); + + // Third session should fail + let session3 = super::super::LocalShellSession::new(None, None, None, None) + .expect("Should create local session"); + let result = manager.register_session(Box::new(session3), false).await; + assert!(result.is_err(), "Should fail when max sessions reached"); + } + + #[tokio::test] + async fn test_close_all_sessions() { + let manager = ShellSessionManager::with_defaults(); + + // Create default session + let _ = manager + .get_or_create_default_local_session() + .await + .expect("Should create session"); + + // Create another session + let session2 = super::super::LocalShellSession::new(None, None, None, None) + .expect("Should create local session"); + let _ = manager + .register_session(Box::new(session2), false) + .await + .expect("Should register session"); + + assert_eq!(manager.session_count().await, 2); + + // Close all + manager.close_all_sessions().await; + assert_eq!(manager.session_count().await, 0); + } +} diff --git a/libs/shared/src/shell_session/mod.rs b/libs/shared/src/shell_session/mod.rs new file mode 100644 index 00000000..1c562b59 --- /dev/null +++ b/libs/shared/src/shell_session/mod.rs @@ -0,0 +1,126 @@ +//! Persistent Shell Session Management +//! +//! This module provides persistent shell sessions that maintain state (environment variables, +//! working directory, aliases) across multiple command executions. +//! +//! # Architecture +//! +//! - `ShellSession` trait: Common interface for local and remote sessions +//! - `LocalShellSession`: PTY-based local shell using `portable-pty` +//! - `RemoteShellSession`: SSH-based remote shell using `russh` with PTY allocation +//! - `ShellSessionManager`: Central manager for session lifecycle + +mod local; +mod manager; +mod remote; +mod session; + +pub use local::LocalShellSession; +pub use manager::{SessionInfo, ShellSessionConfig, ShellSessionManager}; +pub use remote::RemoteShellSession; +pub use session::{CommandOutput, ShellSession, ShellSessionError}; + +/// Marker prefix for command completion detection (shared between local and remote) +pub(crate) const MARKER_PREFIX: &str = "__STAKPAK_CMD_END_"; +pub(crate) const MARKER_SUFFIX: &str = "__"; + +/// Clean shell output by removing command echo, markers, and shell artifacts +/// +/// This is shared logic used by both local and remote sessions +pub(crate) fn clean_shell_output(raw_output: &str, command: &str, marker: &str) -> String { + // First, strip ANSI escape codes + let stripped = console::strip_ansi_codes(raw_output); + + // Remove control characters that PTY emits + let cleaned = stripped.replace('\r', "").replace('\x08', ""); // backspace + + let mut lines: Vec<&str> = cleaned.lines().collect(); + + // Remove lines containing ANY marker (current or leftover from previous commands) + // This ensures our implementation is transparent even if buffer has leftover data + lines.retain(|line| { + !line.contains(marker) + && !line.contains(MARKER_PREFIX) + && !line.contains("__STAKPAK_CMD_END_") + }); + + // Remove the echoed command (first line often contains it) + if let Some(first) = lines.first() + && (first.trim() == command.trim() || first.contains(command.trim())) + { + lines.remove(0); + } + + // Remove empty lines at start and end + while lines.first().map(|l| l.trim().is_empty()).unwrap_or(false) { + lines.remove(0); + } + while lines.last().map(|l| l.trim().is_empty()).unwrap_or(false) { + lines.pop(); + } + + // Remove shell prompt lines (common patterns) + lines.retain(|line| { + let trimmed = line.trim(); + // Skip typical prompt patterns + !is_prompt_line(trimmed) + }); + + lines.join("\n") +} + +/// Check if a line looks like a shell prompt or contains prompt + echoed command +fn is_prompt_line(trimmed: &str) -> bool { + // Empty or whitespace-only + if trimmed.is_empty() { + return false; // Let caller decide on empty lines + } + + // Lines that are just whitespace/control chars (leftover from ANSI stripping) + if trimmed.chars().all(|c| c.is_whitespace() || c == '\r') { + return true; + } + + // Standalone prompt characters + if trimmed == "$" || trimmed == "#" || trimmed == ">" || trimmed == "%" { + return true; + } + + // Check for user@host pattern which indicates a prompt line + // This catches: "user@host dir % command" or "user@host:dir$ command" + if trimmed.contains('@') { + // Look for prompt endings: % $ # > + // The prompt is typically: user@host path % or user@host:path$ + for prompt_char in ['%', '$', '#'] { + if let Some(pos) = trimmed.rfind(prompt_char) { + // Check if this looks like a prompt (has @ before the prompt char) + let before_prompt = &trimmed[..pos]; + if before_prompt.contains('@') { + return true; + } + } + } + } + + // Lines ending with prompt patterns (standalone prompts) + if trimmed.ends_with("$ ") + || trimmed.ends_with("# ") + || trimmed.ends_with("> ") + || trimmed.ends_with("% ") + || trimmed.ends_with('$') + || trimmed.ends_with('#') + || trimmed.ends_with('%') + { + // Short lines ending with prompt chars are likely prompts + if trimmed.len() < 50 || trimmed.starts_with('[') { + return true; + } + } + + // Bash-style [user@host dir]$ pattern + if trimmed.starts_with('[') && (trimmed.contains("]$") || trimmed.contains("]#")) { + return true; + } + + false +} diff --git a/libs/shared/src/shell_session/remote.rs b/libs/shared/src/shell_session/remote.rs new file mode 100644 index 00000000..69f1424f --- /dev/null +++ b/libs/shared/src/shell_session/remote.rs @@ -0,0 +1,731 @@ +//! Remote Shell Session Implementation +//! +//! Provides persistent remote shell sessions over SSH using `russh` with PTY allocation +//! for state persistence across commands. + +use super::ShellSessionManager; +use super::clean_shell_output; +use super::session::{CommandOutput, ShellSession, ShellSessionError}; +use crate::remote_connection::RemoteConnectionInfo; +use async_trait::async_trait; +use russh::client::{self, Handler}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use tokio::sync::Mutex; +use tracing::{debug, trace}; + +/// Marker prefix for command completion detection +const MARKER_PREFIX: &str = "__STAKPAK_CMD_END_"; +const MARKER_SUFFIX: &str = "__"; + +/// Default PTY size for remote sessions +const DEFAULT_ROWS: u32 = 24; +const DEFAULT_COLS: u32 = 80; + +/// SSH client handler for remote shell sessions +struct RemoteShellHandler; + +impl Handler for RemoteShellHandler { + type Error = russh::Error; + + async fn check_server_key( + &mut self, + _server_public_key: &russh::keys::PublicKey, + ) -> Result { + // Accept all keys (same as RemoteConnection behavior) + Ok(true) + } +} + +/// Remote shell session using SSH with PTY allocation +pub struct RemoteShellSession { + session_id: String, + connection_info: RemoteConnectionInfo, + inner: Arc>, +} + +struct RemoteShellSessionInner { + channel: russh::Channel, + #[allow(dead_code)] + session: client::Handle, + closed: bool, +} + +impl RemoteShellSession { + /// Create a new remote shell session + /// + /// # Arguments + /// * `connection_info` - SSH connection details + /// * `rows` - PTY rows (default: 24) + /// * `cols` - PTY columns (default: 80) + pub async fn new( + connection_info: RemoteConnectionInfo, + rows: Option, + cols: Option, + ) -> Result { + let session_id = ShellSessionManager::generate_session_id("remote"); + + debug!( + session_id = %session_id, + connection = %connection_info.connection_string, + "Creating remote shell session" + ); + + // Parse connection string + let parsed = Self::parse_connection_string(&connection_info.connection_string)?; + + // Create SSH session + let config = client::Config::default(); + let mut session = client::connect( + config.into(), + (parsed.hostname.as_str(), parsed.port), + RemoteShellHandler {}, + ) + .await + .map_err(|e| ShellSessionError::SshError(format!("Connection failed: {}", e)))?; + + // Authenticate + Self::authenticate(&mut session, &parsed.username, &connection_info).await?; + + // Open channel with PTY + let channel = session + .channel_open_session() + .await + .map_err(|e| ShellSessionError::SshError(format!("Failed to open channel: {}", e)))?; + + // Request PTY + channel + .request_pty( + true, + "xterm-256color", + cols.unwrap_or(DEFAULT_COLS), + rows.unwrap_or(DEFAULT_ROWS), + 0, + 0, + &[], + ) + .await + .map_err(|e| ShellSessionError::SshError(format!("Failed to request PTY: {}", e)))?; + + // Request shell + channel + .request_shell(true) + .await + .map_err(|e| ShellSessionError::SshError(format!("Failed to request shell: {}", e)))?; + + // Wait for initial prompt + tokio::time::sleep(Duration::from_millis(500)).await; + + let inner = RemoteShellSessionInner { + channel, + session, + closed: false, + }; + + debug!(session_id = %session_id, "Remote shell session created"); + + Ok(Self { + session_id, + connection_info, + inner: Arc::new(Mutex::new(inner)), + }) + } + + /// Parse connection string into components + fn parse_connection_string( + connection_string: &str, + ) -> Result { + let (username, host_port) = connection_string.split_once('@').ok_or_else(|| { + ShellSessionError::SshError( + "Invalid connection string format. Expected: user@host or user@host:port" + .to_string(), + ) + })?; + + let (hostname, port) = if let Some((host, port_str)) = host_port.split_once(':') { + let port = port_str + .parse::() + .map_err(|_| ShellSessionError::SshError(format!("Invalid port: {}", port_str)))?; + (host.to_string(), port) + } else { + (host_port.to_string(), 22) + }; + + Ok(ParsedConnection { + username: username.to_string(), + hostname, + port, + }) + } + + /// Authenticate the SSH session + async fn authenticate( + session: &mut client::Handle, + username: &str, + connection_info: &RemoteConnectionInfo, + ) -> Result<(), ShellSessionError> { + if let Some(password) = &connection_info.password { + let auth_result = session + .authenticate_password(username, password) + .await + .map_err(|e| ShellSessionError::SshError(format!("Password auth failed: {}", e)))?; + + match auth_result { + russh::client::AuthResult::Success => Ok(()), + _ => Err(ShellSessionError::SshError( + "Password authentication failed".to_string(), + )), + } + } else { + // Use public key authentication + let private_key_path = if let Some(path) = &connection_info.private_key_path { + crate::remote_connection::RemoteConnection::canonicalize_key_path(path) + .map_err(|e| ShellSessionError::SshError(format!("Key path error: {}", e)))? + } else { + crate::remote_connection::RemoteConnection::get_default_key_files() + .map_err(|e| ShellSessionError::SshError(format!("No SSH key found: {}", e)))? + .0 + }; + + let keypair = russh::keys::load_secret_key(&private_key_path, None) + .map_err(|e| ShellSessionError::SshError(format!("Failed to load key: {}", e)))?; + + let auth_result = session + .authenticate_publickey( + username, + russh::keys::PrivateKeyWithHashAlg::new( + Arc::new(keypair), + Some(russh::keys::HashAlg::Sha256), + ), + ) + .await + .map_err(|e| { + ShellSessionError::SshError(format!("Public key auth failed: {}", e)) + })?; + + match auth_result { + russh::client::AuthResult::Success => Ok(()), + _ => Err(ShellSessionError::SshError( + "Public key authentication failed".to_string(), + )), + } + } + } + + /// Generate a unique marker for command completion detection + fn generate_marker() -> String { + let uuid = uuid::Uuid::new_v4().to_string().replace("-", ""); + format!("{}{}{}", MARKER_PREFIX, &uuid[..16], MARKER_SUFFIX) + } + + /// Execute command with marker-based completion detection + async fn execute_with_marker( + &self, + command: &str, + timeout: Option, + ) -> Result { + let marker = Self::generate_marker(); + let start = Instant::now(); + + let mut inner = self.inner.lock().await; + + if inner.closed { + return Err(ShellSessionError::SessionClosed); + } + + // Send command followed by marker echo + let full_command = format!("{}\necho \"{}\"\n", command.trim(), marker); + + inner + .channel + .data(full_command.as_bytes()) + .await + .map_err(|e| ShellSessionError::SshError(format!("Failed to send command: {}", e)))?; + + trace!(command = %command, marker = %marker, "Sent command to remote PTY"); + + // Read output until marker is found + let timeout_duration = timeout.unwrap_or(Duration::from_secs(300)); + let mut output = String::new(); + + loop { + if start.elapsed() > timeout_duration { + return Err(ShellSessionError::Timeout(timeout_duration)); + } + + // Wait for channel messages with timeout + let wait_result = + tokio::time::timeout(Duration::from_millis(100), inner.channel.wait()).await; + + match wait_result { + Ok(Some(msg)) => { + match msg { + russh::ChannelMsg::Data { data } => { + let chunk = String::from_utf8_lossy(&data); + output.push_str(&chunk); + + // Check if marker is in output + if output.contains(&marker) { + break; + } + } + russh::ChannelMsg::ExtendedData { data, .. } => { + let chunk = String::from_utf8_lossy(&data); + output.push_str(&chunk); + + if output.contains(&marker) { + break; + } + } + russh::ChannelMsg::Eof => { + return Err(ShellSessionError::SessionDead( + "Remote shell closed".to_string(), + )); + } + russh::ChannelMsg::Close => { + inner.closed = true; + return Err(ShellSessionError::SessionDead( + "Remote channel closed".to_string(), + )); + } + _ => {} + } + } + Ok(None) => { + // Channel closed + inner.closed = true; + return Err(ShellSessionError::SessionDead( + "Remote channel closed unexpectedly".to_string(), + )); + } + Err(_) => { + // Timeout on wait, continue loop + continue; + } + } + } + + let duration = start.elapsed(); + + // Clean up output + let cleaned_output = Self::clean_output(&output, command, &marker); + + debug!( + session_id = %self.session_id, + duration_ms = duration.as_millis(), + output_len = cleaned_output.len(), + "Remote command completed" + ); + + Ok(CommandOutput { + output: cleaned_output, + exit_code: None, // PTY doesn't easily give us exit codes + duration, + }) + } + + /// Clean output by removing command echo, marker, and shell artifacts + fn clean_output(raw_output: &str, command: &str, marker: &str) -> String { + // First, strip ANSI escape codes + let stripped = console::strip_ansi_codes(raw_output); + + let mut lines: Vec<&str> = stripped.lines().collect(); + + // Remove lines containing ANY marker (current or leftover from previous commands) + // This ensures our implementation is transparent even if buffer has leftover data + lines.retain(|line| { + !line.contains(marker) + && !line.contains(MARKER_PREFIX) + && !line.contains("__STAKPAK_CMD_END_") + }); + + // Remove the echoed command (first line often contains it) + if let Some(first) = lines.first() + && (first.trim() == command.trim() || first.contains(command.trim())) + { + lines.remove(0); + } + + // Remove empty lines at start and end + while lines.first().map(|l| l.trim().is_empty()).unwrap_or(false) { + lines.remove(0); + } + while lines.last().map(|l| l.trim().is_empty()).unwrap_or(false) { + lines.pop(); + } + + // Remove shell prompt lines (common patterns) + lines.retain(|line| { + let trimmed = line.trim(); + !(trimmed == "$" + || trimmed == "#" + || trimmed == ">" + || trimmed.ends_with("$ ") + || trimmed.ends_with("# ") + || trimmed.ends_with("> ") + || (trimmed.starts_with("[") && trimmed.contains("]$"))) + }); + + lines.join("\n") + } +} + +struct ParsedConnection { + username: String, + hostname: String, + port: u16, +} + +#[async_trait] +impl ShellSession for RemoteShellSession { + async fn execute( + &self, + command: &str, + timeout: Option, + ) -> Result { + self.execute_with_marker(command, timeout).await + } + + async fn execute_streaming( + &self, + command: &str, + timeout: Option, + ) -> Result< + ( + super::session::OutputReceiver, + tokio::task::JoinHandle>, + ), + ShellSessionError, + > { + use super::session::{OutputChunk, OutputSender}; + use tokio::sync::mpsc; + + let marker = Self::generate_marker(); + let start = Instant::now(); + let timeout_duration = timeout.unwrap_or(Duration::from_secs(300)); + + // Create channel for streaming output + let (tx, rx): (OutputSender, _) = mpsc::channel(100); + + // Check if session is closed and send command + { + let inner = self.inner.lock().await; + if inner.closed { + return Err(ShellSessionError::SessionClosed); + } + + // Send command followed by marker echo + let full_command = format!("{}\necho \"{}\"\n", command.trim(), marker); + inner + .channel + .data(full_command.as_bytes()) + .await + .map_err(|e| { + ShellSessionError::SshError(format!("Failed to send command: {}", e)) + })?; + } + + trace!(command = %command, marker = %marker, "Sent command to remote for streaming"); + + // Clone what we need for the spawned task + let inner_arc = self.inner.clone(); + let session_id = self.session_id.clone(); + let command_owned = command.to_string(); + let marker_clone = marker.clone(); + + // Spawn task to read output and stream it + let handle = tokio::spawn(async move { + let mut output = String::new(); + let mut last_streamed_len = 0usize; // Track what we've already streamed + + loop { + if start.elapsed() > timeout_duration { + let _ = tx + .send(OutputChunk { + text: "\n[TIMEOUT]\n".to_string(), + is_final: true, + }) + .await; + return Err(ShellSessionError::Timeout(timeout_duration)); + } + + let mut inner = inner_arc.lock().await; + + if inner.closed { + let _ = tx + .send(OutputChunk { + text: "\n[SESSION CLOSED]\n".to_string(), + is_final: true, + }) + .await; + return Err(ShellSessionError::SessionClosed); + } + + // Wait for data with timeout + match tokio::time::timeout(Duration::from_millis(100), inner.channel.wait()).await { + Ok(Some(msg)) => { + match msg { + russh::ChannelMsg::Data { data } => { + let chunk = String::from_utf8_lossy(&data); + output.push_str(&chunk); + + // Clean the entire accumulated output and stream only new complete lines + let cleaned_so_far = + clean_shell_output(&output, &command_owned, &marker_clone); + if cleaned_so_far.len() > last_streamed_len { + let new_content = &cleaned_so_far[last_streamed_len..]; + // Only stream complete lines to avoid flicker + if let Some(last_newline) = new_content.rfind('\n') { + let complete_lines = &new_content[..=last_newline]; + if !complete_lines.trim().is_empty() { + let _ = tx + .send(OutputChunk { + text: complete_lines.to_string(), + is_final: false, + }) + .await; + } + last_streamed_len += last_newline + 1; + } + } + + // Check for marker completion + let marker_count = output.matches(&marker_clone).count(); + if marker_count >= 2 { + break; + } + } + russh::ChannelMsg::ExtendedData { data, .. } => { + let chunk = String::from_utf8_lossy(&data); + output.push_str(&chunk); + + // Clean and stream stderr using same approach + let cleaned_so_far = + clean_shell_output(&output, &command_owned, &marker_clone); + if cleaned_so_far.len() > last_streamed_len { + let new_content = &cleaned_so_far[last_streamed_len..]; + // Only stream complete lines to avoid flicker + if let Some(last_newline) = new_content.rfind('\n') { + let complete_lines = &new_content[..=last_newline]; + if !complete_lines.trim().is_empty() { + let _ = tx + .send(OutputChunk { + text: complete_lines.to_string(), + is_final: false, + }) + .await; + } + last_streamed_len += last_newline + 1; + } + } + } + russh::ChannelMsg::Eof => { + let _ = tx + .send(OutputChunk { + text: "\n[EOF]\n".to_string(), + is_final: true, + }) + .await; + return Err(ShellSessionError::SessionDead( + "Remote shell closed".to_string(), + )); + } + russh::ChannelMsg::Close => { + inner.closed = true; + let _ = tx + .send(OutputChunk { + text: "\n[CHANNEL CLOSED]\n".to_string(), + is_final: true, + }) + .await; + return Err(ShellSessionError::SessionDead( + "Remote channel closed".to_string(), + )); + } + _ => {} + } + } + Ok(None) => { + inner.closed = true; + let _ = tx + .send(OutputChunk { + text: "\n[CHANNEL CLOSED]\n".to_string(), + is_final: true, + }) + .await; + return Err(ShellSessionError::SessionDead( + "Remote channel closed unexpectedly".to_string(), + )); + } + Err(_) => { + // Timeout on wait, continue loop + continue; + } + } + } + + let duration = start.elapsed(); + + // Clean up output + let cleaned_output = + RemoteShellSession::clean_output(&output, &command_owned, &marker_clone); + + // Send final chunk + let _ = tx + .send(OutputChunk { + text: String::new(), + is_final: true, + }) + .await; + + debug!( + session_id = %session_id, + duration_ms = duration.as_millis(), + output_len = cleaned_output.len(), + "Remote streaming command completed" + ); + + Ok(CommandOutput { + output: cleaned_output, + exit_code: None, + duration, + }) + }); + + Ok((rx, handle)) + } + + async fn is_alive(&self) -> bool { + let inner = self.inner.lock().await; + !inner.closed + } + + fn session_id(&self) -> &str { + &self.session_id + } + + fn description(&self) -> String { + format!( + "remote:{}:{}", + self.connection_info.connection_string, self.session_id + ) + } + + async fn close(&mut self) -> Result<(), ShellSessionError> { + let mut inner = self.inner.lock().await; + if inner.closed { + return Ok(()); + } + + // Send exit command + let _ = inner.channel.data(&b"exit\n"[..]).await; + + // Close channel + let _ = inner.channel.close().await; + + inner.closed = true; + debug!(session_id = %self.session_id, "Remote shell session closed"); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_marker_generation() { + let marker1 = RemoteShellSession::generate_marker(); + let marker2 = RemoteShellSession::generate_marker(); + + assert!(marker1.starts_with(MARKER_PREFIX)); + assert!(marker1.ends_with(MARKER_SUFFIX)); + assert_ne!(marker1, marker2, "Markers should be unique"); + } + + #[test] + fn test_clean_output() { + let raw = "echo hello\nhello\n__STAKPAK_CMD_END_abc123__\n$ "; + let cleaned = + RemoteShellSession::clean_output(raw, "echo hello", "__STAKPAK_CMD_END_abc123__"); + assert_eq!(cleaned.trim(), "hello"); + } + + #[test] + fn test_clean_output_with_ansi_codes() { + // Test output with ANSI escape codes (common in remote shells) + let raw = "\x1b[32mecho hello\x1b[0m\nhello\n__STAKPAK_CMD_END_abc123__\n\x1b[1m$ \x1b[0m"; + let cleaned = + RemoteShellSession::clean_output(raw, "echo hello", "__STAKPAK_CMD_END_abc123__"); + assert!( + cleaned.contains("hello"), + "Should extract hello from ANSI-coded output, got: '{}'", + cleaned + ); + } + + #[test] + fn test_clean_output_multiline() { + let raw = "ls -la\nfile1.txt\nfile2.txt\nfile3.txt\n__STAKPAK_CMD_END_xyz789__\n$ "; + let cleaned = RemoteShellSession::clean_output(raw, "ls -la", "__STAKPAK_CMD_END_xyz789__"); + assert!(cleaned.contains("file1.txt"), "Should contain file1.txt"); + assert!(cleaned.contains("file2.txt"), "Should contain file2.txt"); + assert!(cleaned.contains("file3.txt"), "Should contain file3.txt"); + assert!( + !cleaned.contains("__STAKPAK_CMD_END"), + "Should not contain marker" + ); + } + + #[test] + fn test_parse_connection_string() { + let result = RemoteShellSession::parse_connection_string("user@host.com"); + assert!(result.is_ok()); + let parsed = result.unwrap(); + assert_eq!(parsed.username, "user"); + assert_eq!(parsed.hostname, "host.com"); + assert_eq!(parsed.port, 22); + + let result = RemoteShellSession::parse_connection_string("admin@server.io:2222"); + assert!(result.is_ok()); + let parsed = result.unwrap(); + assert_eq!(parsed.username, "admin"); + assert_eq!(parsed.hostname, "server.io"); + assert_eq!(parsed.port, 2222); + } + + #[test] + fn test_parse_connection_string_invalid() { + // Missing @ symbol + let result = RemoteShellSession::parse_connection_string("userhost.com"); + assert!(result.is_err(), "Should fail without @ symbol"); + + // Invalid port + let result = RemoteShellSession::parse_connection_string("user@host:notaport"); + assert!(result.is_err(), "Should fail with invalid port"); + } + + #[test] + fn test_parse_connection_string_edge_cases() { + // Username with dots + let result = RemoteShellSession::parse_connection_string("user.name@host.com"); + assert!(result.is_ok()); + let parsed = result.unwrap(); + assert_eq!(parsed.username, "user.name"); + + // Hostname with subdomain + let result = RemoteShellSession::parse_connection_string("admin@sub.domain.example.com:22"); + assert!(result.is_ok()); + let parsed = result.unwrap(); + assert_eq!(parsed.hostname, "sub.domain.example.com"); + assert_eq!(parsed.port, 22); + + // IP address + let result = RemoteShellSession::parse_connection_string("root@192.168.1.100:2222"); + assert!(result.is_ok()); + let parsed = result.unwrap(); + assert_eq!(parsed.hostname, "192.168.1.100"); + assert_eq!(parsed.port, 2222); + } +} diff --git a/libs/shared/src/shell_session/session.rs b/libs/shared/src/shell_session/session.rs new file mode 100644 index 00000000..e6c12560 --- /dev/null +++ b/libs/shared/src/shell_session/session.rs @@ -0,0 +1,158 @@ +//! Shell Session Trait Definition +//! +//! Defines the common interface for both local and remote persistent shell sessions. + +use async_trait::async_trait; +use std::time::Duration; +use thiserror::Error; +use tokio::sync::mpsc; + +/// Errors that can occur during shell session operations +#[derive(Debug, Error)] +pub enum ShellSessionError { + #[error("Session not alive: {0}")] + SessionDead(String), + + #[error("Command execution failed: {0}")] + ExecutionFailed(String), + + #[error("Command timed out after {0:?}")] + Timeout(Duration), + + #[error("Failed to spawn shell: {0}")] + SpawnFailed(String), + + #[error("PTY error: {0}")] + PtyError(String), + + #[error("SSH error: {0}")] + SshError(String), + + #[error("IO error: {0}")] + IoError(#[from] std::io::Error), + + #[error("Session closed")] + SessionClosed, + + #[error("Marker detection failed: {0}")] + MarkerDetectionFailed(String), + + #[error("Channel send error: {0}")] + ChannelError(String), +} + +/// A chunk of output from streaming command execution +#[derive(Debug, Clone)] +pub struct OutputChunk { + /// The text content of this chunk + pub text: String, + /// Whether this is the final chunk (command completed) + pub is_final: bool, +} + +/// Type alias for the streaming output receiver +pub type OutputReceiver = mpsc::Receiver; + +/// Type alias for the streaming output sender +pub type OutputSender = mpsc::Sender; + +/// Output from a command execution +#[derive(Debug, Clone)] +pub struct CommandOutput { + /// The stdout/stderr output from the command + pub output: String, + + /// Exit code if available (may not be available for all session types) + pub exit_code: Option, + + /// Duration the command took to execute + pub duration: Duration, +} + +impl CommandOutput { + /// Check if the command succeeded (exit code 0 or not available) + pub fn success(&self) -> bool { + self.exit_code.map(|c| c == 0).unwrap_or(true) + } +} + +/// Common interface for persistent shell sessions +/// +/// Both local (PTY-based) and remote (SSH-based) sessions implement this trait, +/// allowing uniform handling of shell commands regardless of execution location. +#[async_trait] +pub trait ShellSession: Send + Sync { + /// Execute a command in the persistent shell session + /// + /// The command is executed in the context of the persistent shell, meaning: + /// - Environment variables set by previous commands are available + /// - Working directory changes persist + /// - Shell aliases and functions are available + /// + /// # Arguments + /// * `command` - The shell command to execute + /// * `timeout` - Optional timeout for command execution + /// + /// # Returns + /// * `Ok(CommandOutput)` - Command output and metadata + /// * `Err(ShellSessionError)` - If execution fails + async fn execute( + &self, + command: &str, + timeout: Option, + ) -> Result; + + /// Execute a command with streaming output + /// + /// Similar to `execute`, but returns a channel receiver that yields output chunks + /// as they become available. This enables real-time output streaming to the UI. + /// + /// # Arguments + /// * `command` - The shell command to execute + /// * `timeout` - Optional timeout for command execution + /// + /// # Returns + /// * `Ok((OutputReceiver, JoinHandle))` - Receiver for output chunks and task handle + /// * `Err(ShellSessionError)` - If execution fails to start + async fn execute_streaming( + &self, + command: &str, + timeout: Option, + ) -> Result< + ( + OutputReceiver, + tokio::task::JoinHandle>, + ), + ShellSessionError, + >; + + /// Check if the session is still alive and responsive + /// + /// This performs a lightweight check (e.g., sending a simple echo command) + /// to verify the shell process is still running. + async fn is_alive(&self) -> bool; + + /// Get the unique identifier for this session + fn session_id(&self) -> &str; + + /// Get a human-readable description of the session + /// + /// For local sessions: "local:shell-abc123" + /// For remote sessions: "remote:user@host:shell-def456" + fn description(&self) -> String; + + /// Close the session and clean up resources + /// + /// After calling this, the session should not be used again. + async fn close(&mut self) -> Result<(), ShellSessionError>; + + /// Get the current working directory of the shell session + /// + /// Returns None if unable to determine (e.g., session dead) + async fn get_cwd(&self) -> Option { + match self.execute("pwd", Some(Duration::from_secs(5))).await { + Ok(output) => Some(output.output.trim().to_string()), + Err(_) => None, + } + } +} diff --git a/libs/shared/tests/_ssh_integration_test.rs.disabled b/libs/shared/tests/_ssh_integration_test.rs.disabled new file mode 100644 index 00000000..887219e6 --- /dev/null +++ b/libs/shared/tests/_ssh_integration_test.rs.disabled @@ -0,0 +1,332 @@ +//! SSH Integration Tests for Persistent Remote Shell Sessions +//! +//! These tests require a Docker-based SSH server to be running. +//! Run with: cargo test -p stakpak-shared --test ssh_integration_test -- --ignored +//! +//! To start the test SSH server: +//! docker run -d --name test-ssh -p 2222:22 \ +//! -e SSH_ENABLE_ROOT=true \ +//! -e ROOT_PASSWORD=testpass123 \ +//! panubo/sshd:latest +//! +//! Or use the provided docker-compose in platform-testing/ + +use stakpak_shared::remote_connection::RemoteConnectionInfo; +use stakpak_shared::shell_session::{RemoteShellSession, ShellSession, ShellSessionManager}; +use std::time::Duration; + +const TEST_SSH_HOST: &str = "127.0.0.1"; +const TEST_SSH_PORT: u16 = 2222; +const TEST_SSH_USER: &str = "root"; +const TEST_SSH_PASS: &str = "testpass123"; + +fn connection_string() -> String { + format!("{}@{}:{}", TEST_SSH_USER, TEST_SSH_HOST, TEST_SSH_PORT) +} + +fn connection_info() -> RemoteConnectionInfo { + RemoteConnectionInfo { + connection_string: connection_string(), + password: Some(TEST_SSH_PASS.to_string()), + private_key_path: None, + } +} + +/// Helper to check if SSH server is available +async fn ssh_server_available() -> bool { + use std::net::TcpStream; + TcpStream::connect(format!("{}:{}", TEST_SSH_HOST, TEST_SSH_PORT)).is_ok() +} + +#[tokio::test] +#[ignore] // Run with --ignored flag when SSH server is available +async fn test_ssh_connection_establishment() { + if !ssh_server_available().await { + eprintln!( + "SSH server not available at {}:{}, skipping test", + TEST_SSH_HOST, TEST_SSH_PORT + ); + return; + } + + let result = RemoteShellSession::new(connection_info(), None, None).await; + + assert!( + result.is_ok(), + "Should establish SSH connection: {:?}", + result.err() + ); + + let session = result.unwrap(); + assert!( + session.is_alive().await, + "Session should be alive after connection" + ); + assert!( + session.session_id().starts_with("remote-"), + "Session ID should have remote prefix" + ); +} + +#[tokio::test] +#[ignore] +async fn test_ssh_simple_command() { + if !ssh_server_available().await { + eprintln!("SSH server not available, skipping test"); + return; + } + + let session = RemoteShellSession::new(connection_info(), None, None) + .await + .expect("Should connect"); + + let result = session.execute("echo hello_ssh", None).await; + + assert!(result.is_ok(), "Command should succeed: {:?}", result.err()); + let output = result.unwrap(); + assert!( + output.output.contains("hello_ssh"), + "Output should contain 'hello_ssh', got: '{}'", + output.output + ); +} + +#[tokio::test] +#[ignore] +async fn test_ssh_env_var_persistence() { + if !ssh_server_available().await { + eprintln!("SSH server not available, skipping test"); + return; + } + + let session = RemoteShellSession::new(connection_info(), None, None) + .await + .expect("Should connect"); + + // Set environment variable + let set_result = session + .execute("export SSH_TEST_VAR=persistent_value_123", None) + .await; + assert!(set_result.is_ok(), "Setting env var should succeed"); + + // Read it back in subsequent command + let get_result = session.execute("echo $SSH_TEST_VAR", None).await; + assert!(get_result.is_ok(), "Reading env var should succeed"); + + let output = get_result.unwrap(); + assert!( + output.output.contains("persistent_value_123"), + "Environment variable should persist across commands, got: '{}'", + output.output + ); +} + +#[tokio::test] +#[ignore] +async fn test_ssh_cwd_persistence() { + if !ssh_server_available().await { + eprintln!("SSH server not available, skipping test"); + return; + } + + let session = RemoteShellSession::new(connection_info(), None, None) + .await + .expect("Should connect"); + + // Change directory + let cd_result = session.execute("cd /tmp", None).await; + assert!(cd_result.is_ok(), "cd should succeed"); + + // Verify we're still in /tmp + let pwd_result = session.execute("pwd", None).await; + assert!(pwd_result.is_ok(), "pwd should succeed"); + + let output = pwd_result.unwrap(); + assert!( + output.output.contains("/tmp"), + "Working directory should persist to /tmp, got: '{}'", + output.output + ); +} + +#[tokio::test] +#[ignore] +async fn test_ssh_marker_transparency() { + if !ssh_server_available().await { + eprintln!("SSH server not available, skipping test"); + return; + } + + let session = RemoteShellSession::new(connection_info(), None, None) + .await + .expect("Should connect"); + + // Run multiple commands and verify NO markers leak + let commands = vec![ + "echo first_command", + "echo second_command", + "ls /tmp | head -3", + "echo third_command", + ]; + + for (i, cmd) in commands.iter().enumerate() { + let result = session.execute(cmd, None).await; + assert!( + result.is_ok(), + "Command {} '{}' should succeed: {:?}", + i, + cmd, + result.err() + ); + + let output = result.unwrap(); + + // CRITICAL: Verify no markers leak into output + assert!( + !output.output.contains("__STAKPAK_CMD_END_"), + "Marker should NOT appear in output for command '{}', got: '{}'", + cmd, + output.output + ); + assert!( + !output.output.contains("__STAKPAK_CMD_"), + "Marker prefix should NOT appear in output for command '{}', got: '{}'", + cmd, + output.output + ); + } +} + +#[tokio::test] +#[ignore] +async fn test_ssh_multiple_sequential_commands() { + if !ssh_server_available().await { + eprintln!("SSH server not available, skipping test"); + return; + } + + let session = RemoteShellSession::new(connection_info(), None, None) + .await + .expect("Should connect"); + + // Run 10 commands in sequence + for i in 1..=10 { + let result = session + .execute(&format!("echo iteration_{}", i), None) + .await; + assert!( + result.is_ok(), + "Command {} should succeed: {:?}", + i, + result.err() + ); + + let output = result.unwrap(); + assert!( + output.output.contains(&format!("iteration_{}", i)), + "Output should contain 'iteration_{}', got: '{}'", + i, + output.output + ); + } +} + +#[tokio::test] +#[ignore] +async fn test_ssh_session_close() { + if !ssh_server_available().await { + eprintln!("SSH server not available, skipping test"); + return; + } + + let mut session = RemoteShellSession::new(connection_info(), None, None) + .await + .expect("Should connect"); + + // Execute a command + let result = session.execute("echo before_close", None).await; + assert!(result.is_ok(), "Command before close should succeed"); + + // Close the session + let close_result = session.close().await; + assert!(close_result.is_ok(), "Close should succeed"); + + // Session should report as not alive + assert!( + !session.is_alive().await, + "Session should not be alive after close" + ); +} + +#[tokio::test] +#[ignore] +async fn test_ssh_via_manager() { + if !ssh_server_available().await { + eprintln!("SSH server not available, skipping test"); + return; + } + + let manager = ShellSessionManager::with_defaults(); + + // Create remote session via manager + let session_id = manager + .get_or_create_default_remote_session( + &connection_string(), + Some(TEST_SSH_PASS.to_string()), + None, + ) + .await; + + assert!( + session_id.is_ok(), + "Should create remote session via manager: {:?}", + session_id.err() + ); + let session_id = session_id.unwrap(); + + // Execute command via manager + let result = manager + .execute_in_session(&session_id, "echo manager_test", None) + .await; + assert!( + result.is_ok(), + "Command via manager should succeed: {:?}", + result.err() + ); + + let output = result.unwrap(); + assert!( + output.output.contains("manager_test"), + "Output should contain 'manager_test', got: '{}'", + output.output + ); + + // Verify session is listed + let sessions = manager.list_sessions().await; + assert!(!sessions.is_empty(), "Should have at least one session"); + assert!( + sessions.iter().any(|s| s.is_remote), + "Should have a remote session" + ); +} + +#[tokio::test] +#[ignore] +async fn test_ssh_command_timeout() { + if !ssh_server_available().await { + eprintln!("SSH server not available, skipping test"); + return; + } + + let session = RemoteShellSession::new(connection_info(), None, None) + .await + .expect("Should connect"); + + // Run a command with a very short timeout + let result = session + .execute("sleep 10", Some(Duration::from_millis(500))) + .await; + + // Should timeout + assert!(result.is_err(), "Long-running command should timeout"); +} diff --git a/test-reports/shell-persistence-test-report.md b/test-reports/shell-persistence-test-report.md new file mode 100644 index 00000000..535d286f --- /dev/null +++ b/test-reports/shell-persistence-test-report.md @@ -0,0 +1,377 @@ +# Shell Persistence Test Report + +**Date:** 2026-01-15 +**Branch:** `feat/persistent-shell-sessions` +**Test Environment:** macOS with Docker SSH server (`rastasheep/ubuntu-sshd:18.04`) + +--- + +## Overview + +This report documents comprehensive testing of shell session persistence across all command execution modes in Stakpak: + +- **Local Foreground** - `run_command` without remote parameter +- **Remote Foreground** - `run_command` with SSH remote connection +- **Local Background** - `run_command_task` without remote parameter +- **Remote Background** - `run_command_task` with SSH remote connection + +## Test Setup + +```bash +# SSH server container for remote testing +docker run -d --name ssh-test-server -p 2222:22 rastasheep/ubuntu-sshd:18.04 +# Credentials: root/root +``` + +--- + +## Reproduction Guide + +Follow these steps to reproduce all tests manually: + +### 1. Start SSH Test Server + +```bash +# Start container +docker run -d --name ssh-test-server -p 2222:22 rastasheep/ubuntu-sshd:18.04 + +# Verify it's running +docker ps | grep ssh-test-server + +# Test SSH connectivity (password: root) +ssh -o StrictHostKeyChecking=no -p 2222 root@localhost echo "SSH works" +``` + +### 2. Test Local Foreground Persistence + +Run these commands sequentially in Stakpak using `run_command`: + +```bash +# Command 1: Set state +export LOCAL_VAR="test123" && cd /tmp && my_func() { echo "arg: $1"; } + +# Command 2: Verify persistence +echo "VAR=$LOCAL_VAR CWD=$(pwd)" && my_func "hello" +# Expected: VAR=test123 CWD=/tmp +# arg: hello +``` + +### 3. Test Remote Foreground Persistence + +Run these commands with `run_command` using `remote: root@localhost:2222` and `password: root`: + +```bash +# Command 1: Set state +export REMOTE_VAR="remote456" && cd /tmp && alias ll='ls -la' + +# Command 2: Verify persistence +echo "VAR=$REMOTE_VAR CWD=$(pwd)" && alias ll +# Expected: VAR=remote456 CWD=/tmp +# alias ll='ls -la' +``` + +### 4. Test Background Task Isolation + +Run these using `run_command_task`: + +```bash +# Background Task 1: Set variable +export BG_VAR="background789" && echo "Set: $BG_VAR" +# Expected output: Set: background789 + +# Background Task 2: Try to read Task 1's variable +echo "Read: $BG_VAR" +# Expected output: Read: (empty - isolated) +``` + +Then verify foreground can't see background state using `run_command`: + +```bash +echo "Foreground sees: $BG_VAR" +# Expected: Foreground sees: (empty - isolated) +``` + +### 5. Test Remote Background Isolation + +Run with `run_command_task` using `remote: root@localhost:2222` and `password: root`: + +```bash +# Remote Background: Set and echo +export REMOTE_BG="remotebg" && echo "VAR=$REMOTE_BG CWD=$(pwd)" +# Expected: VAR= CWD=/root (variable doesn't expand, starts in /root) +``` + +### 6. Cleanup + +```bash +docker rm -f ssh-test-server +``` + +### Quick Validation Script + +Save this as `test-persistence.sh` and run sections manually: + +```bash +#!/bin/bash +# This is a reference - run commands through Stakpak, not directly + +echo "=== LOCAL FOREGROUND ===" +# run_command: export TEST=1 && cd /tmp +# run_command: echo "TEST=$TEST CWD=$(pwd)" +# Expected: TEST=1 CWD=/tmp + +echo "=== REMOTE FOREGROUND ===" +# run_command (remote): export TEST=2 && cd /var +# run_command (remote): echo "TEST=$TEST CWD=$(pwd)" +# Expected: TEST=2 CWD=/var + +echo "=== LOCAL BACKGROUND ===" +# run_command_task: export BG=3 && echo $BG +# run_command: echo "BG=$BG" +# Expected: BG= (empty) + +echo "=== REMOTE BACKGROUND ===" +# run_command_task (remote): export RBG=4 && echo $RBG +# run_command (remote): echo "RBG=$RBG" +# Expected: RBG= (empty) +``` + +--- + +## Results + +### Environment Variables + +| Mode | Set Variable | Persists Across Commands | Notes | +|------|-------------|-------------------------|-------| +| **Local Foreground** | ✓ | ✓ | Full persistence within session | +| **Remote Foreground** | ✓ | ✓ | Full persistence within session | +| **Local Background** | ✓ | ✗ | Isolated - doesn't persist to foreground | +| **Remote Background** | ✗ | ✗ | Doesn't even set properly (runs in fresh shell) | + +**Test Commands:** +```bash +# Local Foreground - SET +export LOCAL_TEST_VAR="local_value_123" && echo "Set: $LOCAL_TEST_VAR" +# Output: Set: local_value_123 + +# Local Foreground - CHECK (subsequent command) +echo "Check: $LOCAL_TEST_VAR" +# Output: Check: local_value_123 ✓ PERSISTED + +# Remote Foreground - SET +export REMOTE_TEST_VAR="remote_value_456" && echo "Set: $REMOTE_TEST_VAR" +# Output: Set: remote_value_456 + +# Remote Foreground - CHECK (subsequent command) +echo "Check: $REMOTE_TEST_VAR" +# Output: Check: remote_value_456 ✓ PERSISTED + +# Local Background - SET +export BG_LOCAL_VAR="bg_local_value_789" && echo "Set: $BG_LOCAL_VAR" +# Output: Set: bg_local_value_789 + +# Local Foreground - CHECK BG VAR +echo "Check BG var: $BG_LOCAL_VAR" +# Output: Check BG var: ✗ NOT PERSISTED +``` + +--- + +### Working Directory (CWD) + +| Mode | Change CWD | Persists Across Commands | Notes | +|------|-----------|-------------------------|-------| +| **Local Foreground** | ✓ | ✓ | `cd /tmp` persisted | +| **Remote Foreground** | ✓ | ✓ | `cd /tmp` persisted | +| **Local Background** | ✓ | ✗ | CWD change isolated to task | +| **Remote Background** | ✗ | ✗ | Always starts in `/root` | + +**Test Commands:** +```bash +# Local Foreground - CHANGE +cd /tmp && echo "CWD: $(pwd)" +# Output: CWD: /tmp + +# Local Foreground - CHECK (subsequent command) +echo "CWD: $(pwd)" +# Output: CWD: /tmp ✓ PERSISTED + +# Remote Foreground - CHANGE +cd /tmp && echo "CWD: $(pwd)" +# Output: CWD: /tmp + +# Remote Foreground - CHECK (subsequent command) +echo "CWD: $(pwd)" +# Output: CWD: /tmp ✓ PERSISTED + +# Local Background - CHANGE +cd /var && echo "CWD: $(pwd)" +# Output: CWD: /var + +# Local Foreground - CHECK +echo "CWD: $(pwd)" +# Output: CWD: /tmp ✗ BG change NOT visible (still /tmp from foreground) +``` + +--- + +### Shell Functions & Aliases + +| Mode | Define | Persists Across Commands | Notes | +|------|--------|-------------------------|-------| +| **Local Foreground** | ✓ | ✓ | Functions and aliases persist | +| **Remote Foreground** | ✓ | ✓ | Functions and aliases persist | +| **Local Background** | N/A | N/A | Isolated - not practical to test | +| **Remote Background** | N/A | N/A | Isolated - not practical to test | + +**Test Commands:** +```bash +# Local Foreground - DEFINE +my_func() { echo "Function called with: $1"; } && alias ll='ls -la' + +# Local Foreground - CHECK (subsequent command) +my_func "test_arg" && alias ll +# Output: Function called with: test_arg +# ll='ls -la' ✓ BOTH PERSISTED + +# Remote Foreground - DEFINE +my_remote_func() { echo "Remote function: $1"; } && alias rll='ls -la' + +# Remote Foreground - CHECK (subsequent command) +my_remote_func "test_arg" && alias rll +# Output: Remote function: test_arg +# alias rll='ls -la' ✓ BOTH PERSISTED +``` + +--- + +### Shell History + +| Mode | History Available | Persists | +|------|------------------|----------| +| **Local Foreground** | ✓ | ✓ | +| **Remote Foreground** | ✓ | ✓ | +| **Background (both)** | N/A | N/A | + +**Test Output:** +```bash +# Local Foreground +history | tail -5 +# Shows previous commands from session ✓ + +# Remote Foreground +history | tail -5 +# Shows previous commands from remote session ✓ +``` + +--- + +### Cross-Session Isolation + +| Test | Result | Details | +|------|--------|---------| +| BG task 1 → BG task 2 (local) | ✗ Isolated | BG2 cannot see BG1's variables | +| BG task → Foreground (local) | ✗ Isolated | Foreground cannot see BG variables | +| BG task → Foreground (remote) | ✗ Isolated | Foreground cannot see BG variables | +| Subshell → Parent shell | ✗ Isolated | Expected POSIX behavior | + +**Test Commands:** +```bash +# Background Task 1 +export BG_SESSION_VAR="bg_session_123" && echo "BG1: $BG_SESSION_VAR" +# Output: BG1: bg_session_123 + +# Background Task 2 (started immediately after) +echo "BG2: checking BG1 var=$BG_SESSION_VAR" +# Output: BG2: checking BG1 var= ✗ ISOLATED + +# Subshell test (expected behavior) +(export SUBSHELL_VAR="val" && cd /usr && echo "In: $SUBSHELL_VAR $(pwd)") +echo "After: $SUBSHELL_VAR $(pwd)" +# Output: In: val /usr +# After: /tmp ✗ ISOLATED (correct POSIX behavior) +``` + +--- + +### Special Cases + +| Feature | Local FG | Remote FG | Local BG | Remote BG | +|---------|----------|-----------|----------|-----------| +| Exported vars | ✓ Persist | ✓ Persist | ✗ | ✗ | +| Non-exported vars | ✓ Persist | ✓ Persist | ✗ | ✗ | +| Special characters | ✓ | ✓ | ✓ | ✓ | +| Multi-line commands | ✓ | ✓ | ✓ | ✓ | + +**Exported vs Non-Exported Test:** +```bash +# Set both types +NON_EXPORTED_VAR="not_exported" && export EXPORTED_VAR="is_exported" + +# Check persistence (subsequent command) +echo "NON_EXPORTED: $NON_EXPORTED_VAR, EXPORTED: $EXPORTED_VAR" +# Output: NON_EXPORTED: not_exported, EXPORTED: is_exported +# ✓ BOTH persist in foreground sessions +``` + +**Special Characters Test:** +```bash +echo "Testing 'quotes' and \"double quotes\" and \$dollar and spaces here" +# Output: Testing 'quotes' and "double quotes" and $dollar and spaces here +# ✓ All special characters handled correctly +``` + +--- + +## Summary Table + +| Execution Mode | Persistent Shell Session | Use Case | +|----------------|-------------------------|----------| +| **Local Foreground** (`run_command`) | ✓ **YES** | Interactive work, stateful operations | +| **Remote Foreground** (`run_command` + remote) | ✓ **YES** | Remote server management | +| **Local Background** (`run_command_task`) | ✗ **NO** | Long-running isolated tasks | +| **Remote Background** (`run_command_task` + remote) | ✗ **NO** | Remote long-running tasks | + +--- + +## Key Findings + +### ✓ What Works + +1. **Foreground sessions are fully persistent** - Environment variables, working directory, functions, aliases, and history all persist across commands +2. **Local and remote foreground behave identically** - Both maintain persistent shell sessions +3. **Special characters and quoting work correctly** - No escaping issues observed + +### ✗ What Doesn't Work + +1. **Background tasks are completely isolated** - Each task runs in a fresh shell +2. **Background → Foreground state transfer** - Variables set in background tasks are not visible in foreground +3. **Background → Background state transfer** - Background tasks cannot share state with each other +4. **Remote background variable expansion** - Variables set in the same command don't expand properly (non-interactive shell) + +--- + +## Recommendations + +### For Users + +1. **Use foreground commands for stateful workflows** - When you need to set variables, change directories, or define functions that subsequent commands will use +2. **Use background tasks for isolated long-running operations** - Port forwarding, log tailing, servers - where state isolation is acceptable or desired +3. **Pass all required context in a single background command** - Don't rely on state from previous commands + +### For Developers + +1. **Background tasks should document isolation behavior** - Users should understand that each task is independent +2. **Consider adding session-based background tasks** - Option to run background tasks in the same session as foreground (if needed) +3. **Remote background could benefit from PTY allocation** - Would fix variable expansion issues + +--- + +## Test Environment Details + +- **Host OS:** macOS (Apple Silicon) +- **Docker Image:** `rastasheep/ubuntu-sshd:18.04` (amd64 via emulation) +- **SSH Port:** 2222 +- **Credentials:** root/root +- **Shell:** bash (remote), zsh (local) diff --git a/platform-testing/windows-testing-report.md b/test-reports/windows-testing-report.md similarity index 100% rename from platform-testing/windows-testing-report.md rename to test-reports/windows-testing-report.md