From 631abc5ac76c31bfb2b1a7071743f3f802fb3855 Mon Sep 17 00:00:00 2001 From: Erwin Lejeune Date: Mon, 2 Mar 2026 01:02:15 +0400 Subject: [PATCH 01/11] Add sibling-container runtime manager and APIs Introduce a Bollard-backed runtime layer for workspace containers, expose runtime lifecycle/log/shell websocket endpoints, and route workspace command execution through runtime exec with persistent shell sessions. --- .env.example | 7 + Cargo.toml | 5 +- README.md | 11 + otter-core/Cargo.toml | 4 + otter-core/src/config.rs | 32 ++ otter-core/src/domain.rs | 26 ++ otter-core/src/lib.rs | 1 + otter-core/src/runtime/docker_manager.rs | 497 +++++++++++++++++++++++ otter-core/src/runtime/mod.rs | 2 + otter-core/src/runtime/shell_session.rs | 21 + otter-core/src/service.rs | 135 +++++- otter-core/src/vibe.rs | 2 - otter-server/src/main.rs | 264 ++++++++++-- 13 files changed, 976 insertions(+), 31 deletions(-) create mode 100644 otter-core/src/runtime/docker_manager.rs create mode 100644 otter-core/src/runtime/mod.rs create mode 100644 otter-core/src/runtime/shell_session.rs diff --git a/.env.example b/.env.example index 65b148c..3f65555 100644 --- a/.env.example +++ b/.env.example @@ -5,3 +5,10 @@ OTTER_VIBE_MODEL=mistral-large-3 OTTER_VIBE_PROVIDER=mistral # Optional extra env forwarded to vibe process (comma-separated KEY=VALUE pairs) # OTTER_VIBE_EXTRA_ENV=KEY1=VALUE1,KEY2=VALUE2 +OTTER_RUNTIME_ENABLED=true +OTTER_RUNTIME_DOCKER_SOCKET=unix:///var/run/docker.sock +OTTER_RUNTIME_NETWORK=kymatics_default +OTTER_RUNTIME_CONTAINER_PREFIX=otter-ws +OTTER_RUNTIME_IMAGE_PREFIX=otter/workspace +OTTER_RUNTIME_DEFAULT_HOST=http://localhost +OTTER_RUNTIME_MAX_LOG_LINES=2000 diff --git a/Cargo.toml b/Cargo.toml index 611096e..098a58e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,9 @@ license = "MIT" anyhow = "1" async-trait = "0.1" async-stream = "0.3" -axum = { version = "0.8", features = ["multipart"] } +axum = { version = "0.8", features = ["multipart", "ws"] } +bollard = 
"0.18" +bytes = "1" chrono = { version = "0.4", features = ["serde"] } dotenvy = "0.15" futures-util = "0.3" @@ -27,6 +29,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } uuid = { version = "1", features = ["serde", "v4"] } validator = { version = "0.20", features = ["derive"] } reqwest = { version = "0.12", default-features = false, features = ["json", "multipart", "rustls-tls"] } +tar = "0.4" [patch.crates-io] home = { path = "vendor/home" } diff --git a/README.md b/README.md index 0027961..6861920 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,13 @@ curl http://localhost:8080/healthz - `OTTER_MAX_ATTEMPTS` (default `5`) - `OTTER_WORKER_CONCURRENCY` (default `1`) - `OTTER_ALLOWED_ROOTS` (optional `:`-separated allowlist) +- `OTTER_RUNTIME_ENABLED` (default `false`, enables sibling container runtime) +- `OTTER_RUNTIME_DOCKER_SOCKET` (default `unix:///var/run/docker.sock`) +- `OTTER_RUNTIME_NETWORK` (default `kymatics_default`) +- `OTTER_RUNTIME_CONTAINER_PREFIX` (default `otter-ws`) +- `OTTER_RUNTIME_IMAGE_PREFIX` (default `otter/workspace`) +- `OTTER_RUNTIME_DEFAULT_HOST` (default `http://localhost`, used to compose preview URLs) +- `OTTER_RUNTIME_MAX_LOG_LINES` (default `2000`) - `MISTRAL_API_KEY` (read from `.env`, passed to server/worker/vibe process) ### Selecting a Vibe model @@ -98,6 +105,10 @@ These are forwarded as `VIBE_MODEL` / `MISTRAL_MODEL` and `VIBE_PROVIDER` for co - `GET /v1/queue` - `PATCH /v1/queue/{id}` (update queue position via priority) - `GET /v1/history` +- `GET /v1/runtime/workspaces/{id}` (runtime container status + preferred preview URL) +- `POST /v1/runtime/workspaces/{id}/start|stop|restart` +- `GET /v1/runtime/workspaces/{id}/logs?tail=200` +- `GET /v1/runtime/workspaces/{id}/shell/ws` (interactive command channel via websocket) ## Repository Structure diff --git a/otter-core/Cargo.toml b/otter-core/Cargo.toml index 9380086..90f9c5f 100644 --- a/otter-core/Cargo.toml +++ b/otter-core/Cargo.toml @@ 
-7,12 +7,16 @@ license.workspace = true [dependencies] anyhow.workspace = true async-trait.workspace = true +bollard.workspace = true +bytes.workspace = true chrono.workspace = true dotenvy.workspace = true +futures-util.workspace = true redis.workspace = true serde.workspace = true serde_json.workspace = true sqlx.workspace = true +tar.workspace = true thiserror.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/otter-core/src/config.rs b/otter-core/src/config.rs index 83b9bfd..3d580cc 100644 --- a/otter-core/src/config.rs +++ b/otter-core/src/config.rs @@ -19,6 +19,18 @@ pub struct AppConfig { pub lavoix_url: String, pub max_attempts: i32, pub worker_concurrency: usize, + pub runtime: RuntimeConfig, +} + +#[derive(Clone, Debug)] +pub struct RuntimeConfig { + pub enabled: bool, + pub docker_socket: String, + pub network_name: String, + pub container_name_prefix: String, + pub image_tag_prefix: String, + pub default_host: String, + pub max_log_lines: usize, } impl AppConfig { @@ -78,6 +90,26 @@ impl AppConfig { .ok() .and_then(|value| value.parse().ok()) .unwrap_or(1), + runtime: RuntimeConfig { + enabled: env::var("OTTER_RUNTIME_ENABLED") + .ok() + .map(|value| matches!(value.as_str(), "1" | "true" | "TRUE" | "yes" | "YES")) + .unwrap_or(false), + docker_socket: env::var("OTTER_RUNTIME_DOCKER_SOCKET") + .unwrap_or_else(|_| "unix:///var/run/docker.sock".to_string()), + network_name: env::var("OTTER_RUNTIME_NETWORK") + .unwrap_or_else(|_| "kymatics_default".to_string()), + container_name_prefix: env::var("OTTER_RUNTIME_CONTAINER_PREFIX") + .unwrap_or_else(|_| "otter-ws".to_string()), + image_tag_prefix: env::var("OTTER_RUNTIME_IMAGE_PREFIX") + .unwrap_or_else(|_| "otter/workspace".to_string()), + default_host: env::var("OTTER_RUNTIME_DEFAULT_HOST") + .unwrap_or_else(|_| "http://localhost".to_string()), + max_log_lines: env::var("OTTER_RUNTIME_MAX_LOG_LINES") + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or(2000), + }, }) } } 
diff --git a/otter-core/src/domain.rs b/otter-core/src/domain.rs index d5ec531..bea9e9d 100644 --- a/otter-core/src/domain.rs +++ b/otter-core/src/domain.rs @@ -117,3 +117,29 @@ pub struct QueueItem { pub queue_rank: i64, pub created_at: DateTime, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum RuntimeContainerStatus { + Running, + Stopped, + Missing, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RuntimePortBinding { + pub container_port: u16, + pub host_ip: String, + pub host_port: u16, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RuntimeContainerInfo { + pub workspace_id: Uuid, + pub container_name: String, + pub image_tag: String, + pub container_id: Option, + pub status: RuntimeContainerStatus, + pub ports: Vec, + pub preferred_url: Option, +} diff --git a/otter-core/src/lib.rs b/otter-core/src/lib.rs index ea658f5..937a227 100644 --- a/otter-core/src/lib.rs +++ b/otter-core/src/lib.rs @@ -2,6 +2,7 @@ pub mod config; pub mod db; pub mod domain; pub mod queue; +pub mod runtime; pub mod service; pub mod vibe; pub mod workspace; diff --git a/otter-core/src/runtime/docker_manager.rs b/otter-core/src/runtime/docker_manager.rs new file mode 100644 index 0000000..0bafff3 --- /dev/null +++ b/otter-core/src/runtime/docker_manager.rs @@ -0,0 +1,497 @@ +use std::collections::HashMap; +use std::path::Path; + +use anyhow::{anyhow, Context, Result}; +use bollard::container::{ + Config as ContainerConfig, CreateContainerOptions, InspectContainerOptions, + ListContainersOptions, LogOutput, LogsOptions, RemoveContainerOptions, RestartContainerOptions, + StartContainerOptions, StopContainerOptions, +}; +use bollard::exec::{CreateExecOptions, StartExecOptions, StartExecResults}; +use bollard::image::BuildImageOptions; +use bollard::models::{HostConfig, PortBinding}; +use bollard::{Docker, API_DEFAULT_VERSION}; +use bytes::Bytes; +use futures_util::stream::{self, StreamExt, TryStreamExt}; 
+use tar::Builder; +use tracing::debug; +use uuid::Uuid; + +use crate::config::RuntimeConfig; +use crate::domain::{RuntimeContainerInfo, RuntimeContainerStatus, RuntimePortBinding}; + +const CWD_MARKER: &str = "__OTTER_CWD__:"; + +#[derive(Debug, Clone)] +pub struct RuntimeExecResult { + pub stdout: String, + pub stderr: String, + pub exit_code: i64, + pub cwd: String, +} + +#[derive(Clone)] +pub struct DockerRuntimeManager { + docker: Docker, + config: RuntimeConfig, +} + +impl DockerRuntimeManager { + pub fn new(config: RuntimeConfig) -> Result { + let docker = connect_docker(&config.docker_socket)?; + Ok(Self { docker, config }) + } + + pub fn is_enabled(&self) -> bool { + self.config.enabled + } + + pub fn container_name_for(&self, workspace_id: Uuid) -> String { + format!( + "{}-{}", + self.config.container_name_prefix, + workspace_id.as_simple() + ) + } + + pub fn image_tag_for(&self, workspace_id: Uuid) -> String { + format!( + "{}:{}", + self.config.image_tag_prefix, + workspace_id.as_simple() + ) + } + + pub async fn ensure_workspace_container( + &self, + workspace_id: Uuid, + workspace_root: &Path, + ) -> Result { + let container_name = self.container_name_for(workspace_id); + let image_tag = self.image_tag_for(workspace_id); + self.build_workspace_image(workspace_root, &image_tag) + .await?; + + if self + .inspect_container_by_name(&container_name) + .await? 
+ .is_some() + { + self.docker + .remove_container( + &container_name, + Some(RemoveContainerOptions { + force: true, + v: true, + link: false, + }), + ) + .await + .with_context(|| format!("failed to remove previous container {container_name}"))?; + } + + self.docker + .create_container( + Some(CreateContainerOptions { + name: container_name.clone(), + platform: None, + }), + ContainerConfig { + image: Some(image_tag.clone()), + tty: Some(true), + host_config: Some(HostConfig { + publish_all_ports: Some(true), + network_mode: Some(self.config.network_name.clone()), + ..Default::default() + }), + ..Default::default() + }, + ) + .await + .with_context(|| format!("failed to create runtime container {container_name}"))?; + + self.docker + .start_container(&container_name, None::>) + .await + .with_context(|| format!("failed to start runtime container {container_name}"))?; + + self.runtime_status(workspace_id).await + } + + pub async fn runtime_status(&self, workspace_id: Uuid) -> Result { + let container_name = self.container_name_for(workspace_id); + let image_tag = self.image_tag_for(workspace_id); + let inspect = self.inspect_container_by_name(&container_name).await?; + + let Some(container) = inspect else { + return Ok(RuntimeContainerInfo { + workspace_id, + container_name, + image_tag, + container_id: None, + status: RuntimeContainerStatus::Missing, + ports: vec![], + preferred_url: None, + }); + }; + + let status = container + .state + .and_then(|state| state.status) + .map(|status| match status.as_str() { + "running" => RuntimeContainerStatus::Running, + _ => RuntimeContainerStatus::Stopped, + }) + .unwrap_or(RuntimeContainerStatus::Stopped); + + let ports = extract_port_bindings( + container + .network_settings + .and_then(|settings| settings.ports) + .unwrap_or_default(), + ); + let preferred_url = ports + .iter() + .find(|binding| binding.host_port > 0) + .map(|binding| { + format!( + "{}:{}", + self.config.default_host.trim_end_matches('/'), + 
binding.host_port + ) + }); + + Ok(RuntimeContainerInfo { + workspace_id, + container_name, + image_tag, + container_id: container.id, + status, + ports, + preferred_url, + }) + } + + pub async fn start_workspace_container( + &self, + workspace_id: Uuid, + ) -> Result { + let container_name = self.container_name_for(workspace_id); + self.docker + .start_container(&container_name, None::>) + .await + .with_context(|| format!("failed to start container {container_name}"))?; + self.runtime_status(workspace_id).await + } + + pub async fn stop_workspace_container( + &self, + workspace_id: Uuid, + ) -> Result { + let container_name = self.container_name_for(workspace_id); + self.docker + .stop_container(&container_name, Some(StopContainerOptions { t: 10 })) + .await + .with_context(|| format!("failed to stop container {container_name}"))?; + self.runtime_status(workspace_id).await + } + + pub async fn restart_workspace_container( + &self, + workspace_id: Uuid, + ) -> Result { + let container_name = self.container_name_for(workspace_id); + self.docker + .restart_container(&container_name, Some(RestartContainerOptions { t: 10 })) + .await + .with_context(|| format!("failed to restart container {container_name}"))?; + self.runtime_status(workspace_id).await + } + + pub async fn logs_for_workspace(&self, workspace_id: Uuid, tail: usize) -> Result { + let container_name = self.container_name_for(workspace_id); + let mut stream = self.docker.logs( + &container_name, + Some(LogsOptions:: { + stdout: true, + stderr: true, + follow: false, + tail: tail.to_string(), + ..Default::default() + }), + ); + + let mut chunks = Vec::new(); + while let Some(item) = stream.next().await { + let item = + item.with_context(|| format!("failed reading logs from {container_name}"))?; + let text = match item { + LogOutput::StdOut { message } => String::from_utf8_lossy(&message).to_string(), + LogOutput::StdErr { message } => String::from_utf8_lossy(&message).to_string(), + LogOutput::StdIn { 
message } => String::from_utf8_lossy(&message).to_string(), + LogOutput::Console { message } => String::from_utf8_lossy(&message).to_string(), + }; + chunks.push(text); + } + Ok(chunks.join("")) + } + + pub async fn exec_in_workspace( + &self, + workspace_id: Uuid, + command: &str, + cwd: Option<&str>, + ) -> Result { + let container_name = self.container_name_for(workspace_id); + let wrapped_command = format!( + "({command}); __otter_exit=$?; printf '\\n{CWD_MARKER}%s\\n' \"$PWD\"; exit $__otter_exit" + ); + let exec = self + .docker + .create_exec( + &container_name, + CreateExecOptions { + attach_stdout: Some(true), + attach_stderr: Some(true), + cmd: Some(vec!["bash", "-lc", wrapped_command.as_str()]), + working_dir: cwd.map(str::to_string), + ..Default::default() + }, + ) + .await + .with_context(|| format!("failed to create exec in container {container_name}"))?; + + let mut stdout = String::new(); + let mut stderr = String::new(); + if let StartExecResults::Attached { mut output, .. } = self + .docker + .start_exec( + &exec.id, + Some(StartExecOptions { + detach: false, + tty: false, + }), + ) + .await + .with_context(|| format!("failed to start exec for container {container_name}"))? + { + while let Some(chunk) = output.next().await { + let chunk = chunk.context("failed reading exec stream")?; + match chunk { + LogOutput::StdOut { message } | LogOutput::Console { message } => { + stdout.push_str(&String::from_utf8_lossy(&message)); + } + LogOutput::StdErr { message } => { + stderr.push_str(&String::from_utf8_lossy(&message)); + } + LogOutput::StdIn { .. 
} => {} + } + } + } + + let inspect = self + .docker + .inspect_exec(&exec.id) + .await + .context("failed to inspect exec")?; + let exit_code = inspect.exit_code.unwrap_or(-1); + let (stdout, resolved_cwd) = split_stdout_and_cwd(&stdout, cwd.unwrap_or("/workspace")); + + Ok(RuntimeExecResult { + stdout, + stderr, + exit_code, + cwd: resolved_cwd, + }) + } + + async fn build_workspace_image(&self, workspace_root: &Path, tag: &str) -> Result<()> { + let dockerfile = workspace_root.join("Dockerfile"); + if !dockerfile.exists() { + return Err(anyhow!( + "workspace {} does not include a Dockerfile for runtime build", + workspace_root.display() + )); + } + + let context = tar_workspace(workspace_root)?; + let options = BuildImageOptions { + dockerfile: "Dockerfile".to_string(), + t: tag.to_string(), + rm: true, + pull: false, + ..Default::default() + }; + let body = stream::once(async move { Ok::(Bytes::from(context)) }); + let mut stream = self.docker.build_image(options, None, Some(body)); + while let Some(chunk) = stream + .try_next() + .await + .context("failed docker image build stream")? + { + if let Some(error) = chunk.error { + return Err(anyhow!("docker image build failed: {error}")); + } + if let Some(message) = chunk.stream { + debug!(image = %tag, output = %message.trim_end(), "docker build output"); + } + } + Ok(()) + } + + async fn inspect_container_by_name( + &self, + container_name: &str, + ) -> Result> { + match self + .docker + .inspect_container(container_name, None::) + .await + { + Ok(response) => Ok(Some(response)), + Err(bollard::errors::Error::DockerResponseServerError { status_code, .. 
}) + if status_code == 404 => + { + Ok(None) + } + Err(error) => Err(error.into()), + } + } + + pub async fn prune_stale_containers(&self) -> Result<()> { + let containers = self + .docker + .list_containers(Some(ListContainersOptions:: { + all: true, + ..Default::default() + })) + .await?; + for container in containers { + let Some(names) = container.names else { + continue; + }; + let should_remove = names.iter().any(|name| { + name.trim_start_matches('/') + .starts_with(&self.config.container_name_prefix) + }); + if !should_remove { + continue; + } + if let Some(id) = container.id { + let _ = self + .docker + .remove_container( + &id, + Some(RemoveContainerOptions { + force: true, + v: true, + link: false, + }), + ) + .await; + } + } + Ok(()) + } +} + +fn connect_docker(socket: &str) -> Result { + if let Some(path) = socket.strip_prefix("unix://") { + Docker::connect_with_unix(path, 120, API_DEFAULT_VERSION) + .with_context(|| format!("failed to connect docker on unix socket {socket}")) + } else { + Docker::connect_with_local_defaults() + .context("failed to connect docker with local defaults") + } +} + +fn tar_workspace(workspace_root: &Path) -> Result> { + let mut archive = Vec::new(); + let mut builder = Builder::new(&mut archive); + builder.append_dir_all(".", workspace_root)?; + builder.finish()?; + drop(builder); + Ok(archive) +} + +fn extract_port_bindings( + raw_ports: HashMap>>, +) -> Vec { + let mut parsed = Vec::new(); + for (container_port_proto, host_bindings) in raw_ports { + let Some(container_port) = container_port_proto.split('/').next() else { + continue; + }; + let Ok(container_port_number) = container_port.parse::() else { + continue; + }; + let Some(host_bindings) = host_bindings else { + continue; + }; + for binding in host_bindings { + let host_ip = binding.host_ip.unwrap_or_else(|| "0.0.0.0".to_string()); + let host_port = binding + .host_port + .and_then(|value| value.parse::().ok()) + .unwrap_or_default(); + 
parsed.push(RuntimePortBinding { + container_port: container_port_number, + host_ip, + host_port, + }); + } + } + parsed.sort_by_key(|binding| (binding.container_port, binding.host_port)); + parsed +} + +pub fn split_stdout_and_cwd(stdout: &str, fallback_cwd: &str) -> (String, String) { + let mut cleaned_lines = Vec::new(); + let mut resolved_cwd = fallback_cwd.to_string(); + for raw_line in stdout.lines() { + if let Some((_, cwd)) = raw_line.split_once(CWD_MARKER) { + let candidate = cwd.trim(); + if !candidate.is_empty() { + resolved_cwd = candidate.to_string(); + } + continue; + } + cleaned_lines.push(raw_line); + } + let mut cleaned_stdout = cleaned_lines.join("\n"); + if stdout.ends_with('\n') && !cleaned_stdout.is_empty() { + cleaned_stdout.push('\n'); + } + (cleaned_stdout, resolved_cwd) +} + +#[cfg(test)] +mod tests { + use super::{extract_port_bindings, split_stdout_and_cwd}; + use bollard::models::PortBinding; + use std::collections::HashMap; + + #[test] + fn split_stdout_extracts_cwd() { + let raw = "line one\n__OTTER_CWD__:/workspace/demo\n"; + let (stdout, cwd) = split_stdout_and_cwd(raw, "/workspace"); + assert_eq!(stdout, "line one\n"); + assert_eq!(cwd, "/workspace/demo".to_string()); + } + + #[test] + fn extract_ports_parses_bindings() { + let mut raw = HashMap::new(); + raw.insert( + "3000/tcp".to_string(), + Some(vec![PortBinding { + host_ip: Some("0.0.0.0".to_string()), + host_port: Some("49162".to_string()), + }]), + ); + let parsed = extract_port_bindings(raw); + assert_eq!(parsed.len(), 1); + assert_eq!(parsed[0].container_port, 3000); + assert_eq!(parsed[0].host_port, 49162); + } +} diff --git a/otter-core/src/runtime/mod.rs b/otter-core/src/runtime/mod.rs new file mode 100644 index 0000000..c8a3cc0 --- /dev/null +++ b/otter-core/src/runtime/mod.rs @@ -0,0 +1,2 @@ +pub mod docker_manager; +pub mod shell_session; diff --git a/otter-core/src/runtime/shell_session.rs b/otter-core/src/runtime/shell_session.rs new file mode 100644 index 
0000000..634b76e --- /dev/null +++ b/otter-core/src/runtime/shell_session.rs @@ -0,0 +1,21 @@ +use uuid::Uuid; + +pub fn build_shell_session_key(workspace_id: Uuid, session_id: &str) -> String { + format!("{workspace_id}:{session_id}") +} + +#[cfg(test)] +mod tests { + use super::build_shell_session_key; + use uuid::Uuid; + + #[test] + fn shell_session_key_is_stable() { + let workspace_id = Uuid::parse_str("aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa").unwrap(); + let key = build_shell_session_key(workspace_id, "terminal"); + assert_eq!( + key, + "aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa:terminal".to_string() + ); + } +} diff --git a/otter-core/src/service.rs b/otter-core/src/service.rs index 3ffcaef..ffebb67 100644 --- a/otter-core/src/service.rs +++ b/otter-core/src/service.rs @@ -9,6 +9,7 @@ use anyhow::{anyhow, Result}; use chrono::Utc; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; +use tokio::sync::Mutex; use tokio::time::sleep; use tracing::{info, warn}; use uuid::Uuid; @@ -18,9 +19,11 @@ use crate::config::AppConfig; use crate::db::Database; use crate::domain::{ CreateProjectRequest, CreateWorkspaceRequest, EnqueuePromptRequest, Job, JobEvent, JobStatus, - Project, QueueItem, UpdateQueuePositionRequest, Workspace, + Project, QueueItem, RuntimeContainerInfo, UpdateQueuePositionRequest, Workspace, }; use crate::queue::{Queue, QueueMessage}; +use crate::runtime::docker_manager::{DockerRuntimeManager, RuntimeExecResult}; +use crate::runtime::shell_session::build_shell_session_key; use crate::vibe::{VibeExecutor, VibeOutputChunk}; use crate::workspace::WorkspaceManager; @@ -35,6 +38,8 @@ pub struct OtterService { pub max_attempts: i32, pub default_workspace_path: Option, pub default_workspace_subdir: String, + pub runtime_manager: Option>, + pub runtime_shell_cwds: Arc>>, } impl OtterService { @@ -62,6 +67,18 @@ impl OtterService { max_attempts: config.max_attempts, default_workspace_path: config.default_workspace_path.clone(), 
default_workspace_subdir: config.default_workspace_subdir.clone(), + runtime_manager: if config.runtime.enabled { + match DockerRuntimeManager::new(config.runtime.clone()) { + Ok(manager) => Some(Arc::new(manager)), + Err(error) => { + warn!(error = %error, "runtime enabled but docker manager could not initialize"); + None + } + } + } else { + None + }, + runtime_shell_cwds: Arc::new(Mutex::new(std::collections::HashMap::new())), } } @@ -459,6 +476,37 @@ impl OtterService { "setup.sh execution finished" ); + if let Some(runtime) = &self.runtime_manager { + let runtime_info = runtime + .ensure_workspace_container(workspace.id, &workspace_path) + .await?; + self.db + .insert_job_event( + job.id, + "runtime_container_ready", + serde_json::json!({ + "container_name": runtime_info.container_name, + "container_id": runtime_info.container_id, + "status": runtime_info.status, + "preferred_url": runtime_info.preferred_url, + "ports": runtime_info.ports, + }), + ) + .await?; + if let Some(preview_url) = runtime_info.preferred_url { + self.db + .insert_job_event( + job.id, + "output_chunk", + serde_json::json!({ + "stream": "stdout", + "line": format!("[runtime] preview available at {preview_url}") + }), + ) + .await?; + } + } + let completed = self .db .complete_job( @@ -680,6 +728,91 @@ impl OtterService { self.db.list_job_events_since(since, limit).await } + pub fn runtime_enabled(&self) -> bool { + self.runtime_manager.is_some() + } + + pub async fn runtime_status_for_workspace( + &self, + workspace_id: Uuid, + ) -> Result { + let runtime = self + .runtime_manager + .as_ref() + .ok_or_else(|| anyhow!("runtime is not enabled"))?; + runtime.runtime_status(workspace_id).await + } + + pub async fn runtime_start_for_workspace( + &self, + workspace_id: Uuid, + ) -> Result { + let runtime = self + .runtime_manager + .as_ref() + .ok_or_else(|| anyhow!("runtime is not enabled"))?; + runtime.start_workspace_container(workspace_id).await + } + + pub async fn 
runtime_stop_for_workspace( + &self, + workspace_id: Uuid, + ) -> Result { + let runtime = self + .runtime_manager + .as_ref() + .ok_or_else(|| anyhow!("runtime is not enabled"))?; + runtime.stop_workspace_container(workspace_id).await + } + + pub async fn runtime_restart_for_workspace( + &self, + workspace_id: Uuid, + ) -> Result { + let runtime = self + .runtime_manager + .as_ref() + .ok_or_else(|| anyhow!("runtime is not enabled"))?; + runtime.restart_workspace_container(workspace_id).await + } + + pub async fn runtime_logs_for_workspace( + &self, + workspace_id: Uuid, + tail: usize, + ) -> Result { + let runtime = self + .runtime_manager + .as_ref() + .ok_or_else(|| anyhow!("runtime is not enabled"))?; + runtime.logs_for_workspace(workspace_id, tail).await + } + + pub async fn runtime_exec_for_workspace( + &self, + workspace_id: Uuid, + session_id: &str, + command: &str, + ) -> Result { + let runtime = self + .runtime_manager + .as_ref() + .ok_or_else(|| anyhow!("runtime is not enabled"))?; + let key = build_shell_session_key(workspace_id, session_id); + let cwd = { + let sessions = self.runtime_shell_cwds.lock().await; + sessions.get(&key).cloned() + }; + let result = runtime + .exec_in_workspace(workspace_id, command, cwd.as_deref()) + .await?; + { + let mut sessions = self.runtime_shell_cwds.lock().await; + sessions.insert(key, result.cwd.clone()); + } + Ok(result) + } + pub async fn cancel_job(&self, job_id: Uuid) -> Result { let result = self.db.mark_job_cancelled(job_id).await?; if result.rows_affected() > 0 { diff --git a/otter-core/src/vibe.rs b/otter-core/src/vibe.rs index 4deacc6..739f1b8 100644 --- a/otter-core/src/vibe.rs +++ b/otter-core/src/vibe.rs @@ -223,8 +223,6 @@ fn compose_vibe_prompt(user_prompt: &str) -> String { - Always create a production-ready Dockerfile at the project root and use it as the primary run path. - Build and run the generated app/service inside Docker (do not run directly on host process as the main path). 
- Verify the container is running and expose the app on a reachable port from the current environment. -- Always create a setup script named `setup.sh` at the project root that installs and configures everything needed to run the project. -- Ensure `setup.sh` is executable (`chmod +x setup.sh`) and deterministic/idempotent (safe to run multiple times). - When implementation is complete, start the app/service in background and verify it runs. - At the end, print clear run instructions: docker build command, docker run command, docker stop/remove command, and where the project lives. - Always include where to access the running app (URL/host port) in the final output. diff --git a/otter-server/src/main.rs b/otter-server/src/main.rs index db64217..0839fe1 100644 --- a/otter-server/src/main.rs +++ b/otter-server/src/main.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::convert::Infallible; use std::fs; use std::path::{Path as FsPath, PathBuf}; @@ -6,6 +5,7 @@ use std::process::Stdio; use std::sync::Arc; use anyhow::Result; +use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; use axum::extract::{Multipart, Path, Query, State}; use axum::http::HeaderValue; use axum::http::StatusCode; @@ -14,6 +14,7 @@ use axum::response::IntoResponse; use axum::routing::{get, post}; use axum::{Json, Router}; use chrono::{Duration, Utc}; +use futures_util::{SinkExt, StreamExt}; use otter_core::config::AppConfig; use otter_core::db::Database; use otter_core::domain::{ @@ -21,6 +22,7 @@ use otter_core::domain::{ QueueItem, UpdateQueuePositionRequest, Workspace, }; use otter_core::queue::RedisQueue; +use otter_core::runtime::docker_manager::split_stdout_and_cwd; use otter_core::service::OtterService; use tokio::process::Command; use tokio::time::Duration as TokioDuration; @@ -33,7 +35,7 @@ use uuid::Uuid; struct AppState { service: Arc>, lavoix_url: String, - shell_sessions: Arc>>, + shell_sessions: Arc>>, } #[derive(serde::Deserialize)] @@ -111,6 +113,34 @@ struct 
WorkspaceCommandResponse { timed_out: bool, } +#[derive(serde::Deserialize)] +struct RuntimeLogsQuery { + tail: Option, +} + +#[derive(serde::Serialize)] +struct RuntimeLogsResponse { + workspace_id: Uuid, + logs: String, +} + +#[derive(serde::Deserialize)] +struct ShellWsClientMessage { + command: String, + shell_session_id: Option, +} + +#[derive(serde::Serialize)] +struct ShellWsServerMessage { + event: String, + command: Option, + stdout: Option, + stderr: Option, + exit_code: Option, + working_directory: Option, + error: Option, +} + #[derive(serde::Serialize)] struct JobResponse { job: Job, @@ -143,7 +173,7 @@ async fn main() -> Result<()> { let state = AppState { service, lavoix_url: config.lavoix_url.clone(), - shell_sessions: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + shell_sessions: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), }; let allow_any_origin = config @@ -179,6 +209,30 @@ async fn main() -> Result<()> { .route("/v1/workspaces/{id}/file", get(get_workspace_file)) .route("/v1/workspaces/{id}/command", post(run_workspace_command)) .route("/v1/workspaces/command", post(run_workspace_command_any)) + .route( + "/v1/runtime/workspaces/{id}", + get(get_runtime_workspace_status), + ) + .route( + "/v1/runtime/workspaces/{id}/start", + post(start_runtime_workspace_container), + ) + .route( + "/v1/runtime/workspaces/{id}/stop", + post(stop_runtime_workspace_container), + ) + .route( + "/v1/runtime/workspaces/{id}/restart", + post(restart_runtime_workspace_container), + ) + .route( + "/v1/runtime/workspaces/{id}/logs", + get(get_runtime_workspace_logs), + ) + .route( + "/v1/runtime/workspaces/{id}/shell/ws", + get(runtime_workspace_shell_ws), + ) .route("/v1/prompts", post(enqueue_prompt)) .route("/v1/voice/prompts", post(enqueue_voice_prompt)) .route("/v1/jobs/{id}", get(get_job)) @@ -390,6 +444,164 @@ async fn run_workspace_command_any( .map(Json) } +async fn get_runtime_workspace_status( + State(state): State, + Path(id): 
Path, +) -> Result { + state + .service + .runtime_status_for_workspace(id) + .await + .map(Json) + .map_err(internal_error) +} + +async fn start_runtime_workspace_container( + State(state): State, + Path(id): Path, +) -> Result { + state + .service + .runtime_start_for_workspace(id) + .await + .map(Json) + .map_err(internal_error) +} + +async fn stop_runtime_workspace_container( + State(state): State, + Path(id): Path, +) -> Result { + state + .service + .runtime_stop_for_workspace(id) + .await + .map(Json) + .map_err(internal_error) +} + +async fn restart_runtime_workspace_container( + State(state): State, + Path(id): Path, +) -> Result { + state + .service + .runtime_restart_for_workspace(id) + .await + .map(Json) + .map_err(internal_error) +} + +async fn get_runtime_workspace_logs( + State(state): State, + Path(id): Path, + Query(query): Query, +) -> Result { + let tail = query.tail.unwrap_or(500).clamp(1, 10_000); + let logs = state + .service + .runtime_logs_for_workspace(id, tail) + .await + .map_err(internal_error)?; + Ok(Json(RuntimeLogsResponse { + workspace_id: id, + logs, + })) +} + +async fn runtime_workspace_shell_ws( + State(state): State, + Path(id): Path, + ws: WebSocketUpgrade, +) -> Result { + if !state.service.runtime_enabled() { + return Err(( + StatusCode::BAD_REQUEST, + "runtime is not enabled".to_string(), + )); + } + Ok(ws.on_upgrade(move |socket| handle_runtime_shell_ws(socket, state, id))) +} + +async fn handle_runtime_shell_ws(mut socket: WebSocket, state: AppState, workspace_id: Uuid) { + while let Some(message) = socket.next().await { + let Ok(message) = message else { + break; + }; + match message { + Message::Text(text) => { + let payload = match serde_json::from_str::(&text) { + Ok(payload) => payload, + Err(error) => { + let _ = socket + .send(Message::Text( + serde_json::to_string(&ShellWsServerMessage { + event: "error".to_string(), + command: None, + stdout: None, + stderr: None, + exit_code: None, + working_directory: None, + 
error: Some(format!("invalid payload: {error}")), + }) + .unwrap_or_else(|_| "{\"event\":\"error\"}".to_string()), + )) + .await; + continue; + } + }; + let session_id = payload + .shell_session_id + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + .unwrap_or("ws"); + let result = state + .service + .runtime_exec_for_workspace(workspace_id, session_id, &payload.command) + .await; + let response = match result { + Ok(out) => ShellWsServerMessage { + event: "result".to_string(), + command: Some(payload.command), + stdout: Some(out.stdout), + stderr: Some(out.stderr), + exit_code: Some(out.exit_code), + working_directory: Some(out.cwd), + error: None, + }, + Err(error) => ShellWsServerMessage { + event: "error".to_string(), + command: Some(payload.command), + stdout: None, + stderr: None, + exit_code: None, + working_directory: None, + error: Some(error.to_string()), + }, + }; + if socket + .send(Message::Text( + serde_json::to_string(&response) + .unwrap_or_else(|_| "{\"event\":\"error\"}".to_string()), + )) + .await + .is_err() + { + break; + } + } + Message::Close(_) => break, + Message::Ping(payload) => { + if socket.send(Message::Pong(payload)).await.is_err() { + break; + } + } + Message::Pong(_) | Message::Binary(_) => {} + } + } +} + async fn execute_workspace_command( state: AppState, workspace_id: Option, @@ -422,6 +634,25 @@ async fn execute_workspace_command( .map(|value| value.trim().to_string()) .filter(|value| !value.is_empty()) .map(|value| value.chars().take(120).collect::()); + if state.service.runtime_enabled() { + let session_id = shell_session_id.as_deref().unwrap_or("workspace-shell"); + let result = state + .service + .runtime_exec_for_workspace(resolved_workspace_id, session_id, &body.command) + .await + .map_err(internal_error)?; + return Ok(WorkspaceCommandResponse { + workspace_id: resolved_workspace_id, + command: body.command, + working_directory: result.cwd, + shell_session_id, + exit_code: Some(result.exit_code as 
i32), + stdout: result.stdout, + stderr: result.stderr, + timed_out: false, + }); + } + let session_key = shell_session_id .as_ref() .map(|session_id| format!("{resolved_workspace_id}:{session_id}")); @@ -475,7 +706,8 @@ async fn execute_workspace_command( } let stdout_with_marker = String::from_utf8_lossy(&output.stdout).to_string(); - let (stdout, resolved_cwd) = split_stdout_and_cwd(&stdout_with_marker, &cwd); + let (stdout, resolved_cwd) = + split_stdout_and_cwd(&stdout_with_marker, &cwd.display().to_string()); if let Some(key) = session_key.as_ref() { state .shell_sessions @@ -496,27 +728,6 @@ async fn execute_workspace_command( }) } -fn split_stdout_and_cwd(stdout: &str, fallback_cwd: &FsPath) -> (String, String) { - const CWD_MARKER: &str = "__OTTER_CWD__:"; - let mut cleaned_lines = Vec::new(); - let mut resolved_cwd = fallback_cwd.display().to_string(); - for raw_line in stdout.lines() { - if let Some((_, cwd)) = raw_line.split_once(CWD_MARKER) { - let candidate = cwd.trim(); - if !candidate.is_empty() { - resolved_cwd = candidate.to_string(); - } - continue; - } - cleaned_lines.push(raw_line); - } - let mut cleaned_stdout = cleaned_lines.join("\n"); - if stdout.ends_with('\n') && !cleaned_stdout.is_empty() { - cleaned_stdout.push('\n'); - } - (cleaned_stdout, resolved_cwd) -} - async fn enqueue_prompt( State(state): State, Json(body): Json, @@ -873,12 +1084,11 @@ fn list_workspace_entries( #[cfg(test)] mod tests { use super::split_stdout_and_cwd; - use std::path::Path; #[test] fn strips_cwd_marker_from_stdout() { let stdout = "hello\n__OTTER_CWD__:/tmp/demo\n"; - let (cleaned, cwd) = split_stdout_and_cwd(stdout, Path::new("/fallback")); + let (cleaned, cwd) = split_stdout_and_cwd(stdout, "/fallback"); assert_eq!(cleaned, "hello\n"); assert_eq!(cwd, "/tmp/demo"); } From 13131e88c694855a1d86dbde38d06c88e14ee506 Mon Sep 17 00:00:00 2001 From: Erwin Lejeune Date: Mon, 2 Mar 2026 01:07:01 +0400 Subject: [PATCH 02/11] Fix Bollard 0.18 API compatibility for 
runtime manager - use ContainerStateStatusEnum enum match instead of as_str() - pass working_dir as Option<&str> directly to CreateExecOptions - add output_capacity field to StartExecOptions - convert body stream to hyper::Body via .into() for build_image --- otter-core/src/runtime/docker_manager.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/otter-core/src/runtime/docker_manager.rs b/otter-core/src/runtime/docker_manager.rs index 0bafff3..709f97d 100644 --- a/otter-core/src/runtime/docker_manager.rs +++ b/otter-core/src/runtime/docker_manager.rs @@ -138,8 +138,10 @@ impl DockerRuntimeManager { let status = container .state .and_then(|state| state.status) - .map(|status| match status.as_str() { - "running" => RuntimeContainerStatus::Running, + .map(|status| match status { + bollard::models::ContainerStateStatusEnum::RUNNING => { + RuntimeContainerStatus::Running + } _ => RuntimeContainerStatus::Stopped, }) .unwrap_or(RuntimeContainerStatus::Stopped); @@ -254,7 +256,7 @@ impl DockerRuntimeManager { attach_stdout: Some(true), attach_stderr: Some(true), cmd: Some(vec!["bash", "-lc", wrapped_command.as_str()]), - working_dir: cwd.map(str::to_string), + working_dir: cwd, ..Default::default() }, ) @@ -270,6 +272,7 @@ impl DockerRuntimeManager { Some(StartExecOptions { detach: false, tty: false, + output_capacity: None, }), ) .await @@ -322,8 +325,11 @@ impl DockerRuntimeManager { pull: false, ..Default::default() }; - let body = stream::once(async move { Ok::(Bytes::from(context)) }); - let mut stream = self.docker.build_image(options, None, Some(body)); + let body_stream = + stream::once(async move { Ok::(Bytes::from(context)) }); + let mut stream = self + .docker + .build_image(options, None, Some(body_stream.into())); while let Some(chunk) = stream .try_next() .await From 3bd547dd3dfdf5a27f154df833a5a90162d189ea Mon Sep 17 00:00:00 2001 From: Erwin Lejeune Date: Mon, 2 Mar 2026 01:10:56 +0400 Subject: [PATCH 03/11] Use 
hyper::Body for Docker build image stream - add hyper workspace dependency - replace stream::once + into() with direct hyper::Body::from() - simplifies tar context body construction for Bollard build_image API --- Cargo.toml | 3 ++- otter-core/Cargo.toml | 1 + otter-core/src/runtime/docker_manager.rs | 10 ++++------ 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 098a58e..fcd8236 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,10 +17,12 @@ bytes = "1" chrono = { version = "0.4", features = ["serde"] } dotenvy = "0.15" futures-util = "0.3" +hyper = "1" redis = { version = "0.32", features = ["tokio-comp"] } serde = { version = "1", features = ["derive"] } serde_json = "1" sqlx = { version = "0.8.1", default-features = false, features = ["runtime-tokio-rustls", "postgres", "uuid", "chrono", "json", "migrate", "derive"] } +tar = "0.4" thiserror = "2" tokio = { version = "1", features = ["macros", "rt-multi-thread", "process", "fs", "signal", "time", "net", "sync"] } tower-http = { version = "0.6", features = ["cors", "trace"] } @@ -29,7 +31,6 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } uuid = { version = "1", features = ["serde", "v4"] } validator = { version = "0.20", features = ["derive"] } reqwest = { version = "0.12", default-features = false, features = ["json", "multipart", "rustls-tls"] } -tar = "0.4" [patch.crates-io] home = { path = "vendor/home" } diff --git a/otter-core/Cargo.toml b/otter-core/Cargo.toml index 90f9c5f..7fea7b8 100644 --- a/otter-core/Cargo.toml +++ b/otter-core/Cargo.toml @@ -12,6 +12,7 @@ bytes.workspace = true chrono.workspace = true dotenvy.workspace = true futures-util.workspace = true +hyper.workspace = true redis.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/otter-core/src/runtime/docker_manager.rs b/otter-core/src/runtime/docker_manager.rs index 709f97d..6440017 100644 --- a/otter-core/src/runtime/docker_manager.rs +++ 
b/otter-core/src/runtime/docker_manager.rs @@ -12,7 +12,7 @@ use bollard::image::BuildImageOptions; use bollard::models::{HostConfig, PortBinding}; use bollard::{Docker, API_DEFAULT_VERSION}; use bytes::Bytes; -use futures_util::stream::{self, StreamExt, TryStreamExt}; +use futures_util::stream::{StreamExt, TryStreamExt}; use tar::Builder; use tracing::debug; use uuid::Uuid; @@ -325,11 +325,9 @@ impl DockerRuntimeManager { pull: false, ..Default::default() }; - let body_stream = - stream::once(async move { Ok::(Bytes::from(context)) }); - let mut stream = self - .docker - .build_image(options, None, Some(body_stream.into())); + + let body = hyper::Body::from(context); + let mut stream = self.docker.build_image(options, None, Some(body)); while let Some(chunk) = stream .try_next() .await From 295f87c6e8a4fad142537cfdedf52ec72d444dc1 Mon Sep 17 00:00:00 2001 From: Erwin Lejeune Date: Mon, 2 Mar 2026 01:13:55 +0400 Subject: [PATCH 04/11] Fix hyper 1.x Body import path Import Body from hyper::body::Body instead of hyper::Body for hyper 1.x compatibility. 
--- otter-core/src/runtime/docker_manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/otter-core/src/runtime/docker_manager.rs b/otter-core/src/runtime/docker_manager.rs index 6440017..3b870e0 100644 --- a/otter-core/src/runtime/docker_manager.rs +++ b/otter-core/src/runtime/docker_manager.rs @@ -11,8 +11,8 @@ use bollard::exec::{CreateExecOptions, StartExecOptions, StartExecResults}; use bollard::image::BuildImageOptions; use bollard::models::{HostConfig, PortBinding}; use bollard::{Docker, API_DEFAULT_VERSION}; -use bytes::Bytes; use futures_util::stream::{StreamExt, TryStreamExt}; +use hyper::body::Body; use tar::Builder; use tracing::debug; use uuid::Uuid; @@ -326,7 +326,7 @@ impl DockerRuntimeManager { ..Default::default() }; - let body = hyper::Body::from(context); + let body = Body::from(context); let mut stream = self.docker.build_image(options, None, Some(body)); while let Some(chunk) = stream .try_next() From 913eac4160b6c866fe298f5ac43affadc3e64efd Mon Sep 17 00:00:00 2001 From: Erwin Lejeune Date: Mon, 2 Mar 2026 01:20:05 +0400 Subject: [PATCH 05/11] Use http-body-util::Full for Docker build body Replace hyper Body trait with concrete Full type for Bollard build_image compatibility with hyper 1.x ecosystem. 
--- Cargo.toml | 1 + otter-core/Cargo.toml | 1 + otter-core/src/runtime/docker_manager.rs | 5 +++-- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fcd8236..eb8a985 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ bytes = "1" chrono = { version = "0.4", features = ["serde"] } dotenvy = "0.15" futures-util = "0.3" +http-body-util = "0.1" hyper = "1" redis = { version = "0.32", features = ["tokio-comp"] } serde = { version = "1", features = ["derive"] } diff --git a/otter-core/Cargo.toml b/otter-core/Cargo.toml index 7fea7b8..6c13053 100644 --- a/otter-core/Cargo.toml +++ b/otter-core/Cargo.toml @@ -12,6 +12,7 @@ bytes.workspace = true chrono.workspace = true dotenvy.workspace = true futures-util.workspace = true +http-body-util.workspace = true hyper.workspace = true redis.workspace = true serde.workspace = true diff --git a/otter-core/src/runtime/docker_manager.rs b/otter-core/src/runtime/docker_manager.rs index 3b870e0..d42b33e 100644 --- a/otter-core/src/runtime/docker_manager.rs +++ b/otter-core/src/runtime/docker_manager.rs @@ -11,8 +11,9 @@ use bollard::exec::{CreateExecOptions, StartExecOptions, StartExecResults}; use bollard::image::BuildImageOptions; use bollard::models::{HostConfig, PortBinding}; use bollard::{Docker, API_DEFAULT_VERSION}; +use bytes::Bytes; use futures_util::stream::{StreamExt, TryStreamExt}; -use hyper::body::Body; +use http_body_util::Full; use tar::Builder; use tracing::debug; use uuid::Uuid; @@ -326,7 +327,7 @@ impl DockerRuntimeManager { ..Default::default() }; - let body = Body::from(context); + let body = Full::new(Bytes::from(context)); let mut stream = self.docker.build_image(options, None, Some(body)); while let Some(chunk) = stream .try_next() From 6429c9b217bf397abca9e2f4bdb0b4a603edb20b Mon Sep 17 00:00:00 2001 From: Erwin Lejeune Date: Mon, 2 Mar 2026 01:27:39 +0400 Subject: [PATCH 06/11] chore(deps): upgrade bollard to 0.20.1 and migrate docker_manager API MIME-Version: 
1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Bump bollard 0.18 → 0.20.1 in workspace Cargo.toml - Drop now-unnecessary http-body-util and hyper workspace deps - Migrate docker_manager.rs to 0.20.1 API surface: - ContainerConfig → ContainerCreateBody (bollard::models) - All option structs use query_parameters::*OptionsBuilder - StartContainerOptions<String> → StartContainerOptions (no generic) - StartExecOptions struct literal → None::<StartExecOptions> (default non-detach) - build_image body uses bollard::body_full(Bytes) instead of http_body_util::Full - BuildImageOptions struct literal → BuildImageOptionsBuilder - Remove platform field from CreateContainerOptions (dropped in 0.20) --- Cargo.toml | 4 +- otter-core/Cargo.toml | 2 - otter-core/src/runtime/docker_manager.rs | 116 ++++++++++------------- 3 files changed, 50 insertions(+), 72 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index eb8a985..67ca347 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,13 +12,11 @@ anyhow = "1" async-trait = "0.1" async-stream = "0.3" axum = { version = "0.8", features = ["multipart", "ws"] } -bollard = "0.18" +bollard = "0.20.1" bytes = "1" chrono = { version = "0.4", features = ["serde"] } dotenvy = "0.15" futures-util = "0.3" -http-body-util = "0.1" -hyper = "1" redis = { version = "0.32", features = ["tokio-comp"] } serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/otter-core/Cargo.toml b/otter-core/Cargo.toml index 6c13053..90f9c5f 100644 --- a/otter-core/Cargo.toml +++ b/otter-core/Cargo.toml @@ -12,8 +12,6 @@ bytes.workspace = true chrono.workspace = true dotenvy.workspace = true futures-util.workspace = true -http-body-util.workspace = true -hyper.workspace = true redis.workspace = true serde.workspace = true diff --git a/otter-core/src/runtime/docker_manager.rs b/otter-core/src/runtime/docker_manager.rs index d42b33e..8b428b5 100644 --- a/otter-core/src/runtime/docker_manager.rs +++ 
b/otter-core/src/runtime/docker_manager.rs @@ -2,18 +2,18 @@ use std::collections::HashMap; use std::path::Path; use anyhow::{anyhow, Context, Result}; -use bollard::container::{ - Config as ContainerConfig, CreateContainerOptions, InspectContainerOptions, - ListContainersOptions, LogOutput, LogsOptions, RemoveContainerOptions, RestartContainerOptions, - StartContainerOptions, StopContainerOptions, -}; +use bollard::body_full; +use bollard::container::{InspectContainerOptions, LogOutput, StartContainerOptions}; use bollard::exec::{CreateExecOptions, StartExecOptions, StartExecResults}; -use bollard::image::BuildImageOptions; -use bollard::models::{HostConfig, PortBinding}; +use bollard::models::{ContainerCreateBody, HostConfig, PortBinding}; +use bollard::query_parameters::{ + BuildImageOptionsBuilder, CreateContainerOptionsBuilder, ListContainersOptionsBuilder, + LogsOptionsBuilder, RemoveContainerOptionsBuilder, RestartContainerOptionsBuilder, + StopContainerOptionsBuilder, +}; use bollard::{Docker, API_DEFAULT_VERSION}; use bytes::Bytes; use futures_util::stream::{StreamExt, TryStreamExt}; -use http_body_util::Full; use tar::Builder; use tracing::debug; use uuid::Uuid; @@ -78,26 +78,23 @@ impl DockerRuntimeManager { .await? 
.is_some() { + let remove_opts = RemoveContainerOptionsBuilder::default() + .force(true) + .v(true) + .build(); self.docker - .remove_container( - &container_name, - Some(RemoveContainerOptions { - force: true, - v: true, - link: false, - }), - ) + .remove_container(&container_name, Some(remove_opts)) .await .with_context(|| format!("failed to remove previous container {container_name}"))?; } + let create_opts = CreateContainerOptionsBuilder::default() + .name(&container_name) + .build(); self.docker .create_container( - Some(CreateContainerOptions { - name: container_name.clone(), - platform: None, - }), - ContainerConfig { + Some(create_opts), + ContainerCreateBody { image: Some(image_tag.clone()), tty: Some(true), host_config: Some(HostConfig { @@ -112,7 +109,7 @@ .with_context(|| format!("failed to create runtime container {container_name}"))?; self.docker - .start_container(&container_name, None::<StartContainerOptions<String>>) + .start_container(&container_name, None::<StartContainerOptions>) .await .with_context(|| format!("failed to start runtime container {container_name}"))?; @@ -181,7 +178,7 @@ ) -> Result<RuntimeContainerInfo> { let container_name = self.container_name_for(workspace_id); self.docker - .start_container(&container_name, None::<StartContainerOptions<String>>) + .start_container(&container_name, None::<StartContainerOptions>) .await .with_context(|| format!("failed to start container {container_name}"))?; self.runtime_status(workspace_id).await @@ -192,8 +189,9 @@ workspace_id: Uuid, ) -> Result<RuntimeContainerInfo> { let container_name = self.container_name_for(workspace_id); + let stop_opts = StopContainerOptionsBuilder::default().t(10).build(); self.docker - .stop_container(&container_name, Some(StopContainerOptions { t: 10 })) + .stop_container(&container_name, Some(stop_opts)) .await .with_context(|| format!("failed to stop container {container_name}"))?; self.runtime_status(workspace_id).await @@ -204,8 +202,9 @@ workspace_id: Uuid, ) -> Result<RuntimeContainerInfo> { let container_name = 
self.container_name_for(workspace_id); + let restart_opts = RestartContainerOptionsBuilder::default().t(10).build(); self.docker - .restart_container(&container_name, Some(RestartContainerOptions { t: 10 })) + .restart_container(&container_name, Some(restart_opts)) .await .with_context(|| format!("failed to restart container {container_name}"))?; self.runtime_status(workspace_id).await @@ -213,16 +212,13 @@ impl DockerRuntimeManager { pub async fn logs_for_workspace(&self, workspace_id: Uuid, tail: usize) -> Result<String> { let container_name = self.container_name_for(workspace_id); - let mut stream = self.docker.logs( - &container_name, - Some(LogsOptions::<String> { - stdout: true, - stderr: true, - follow: false, - tail: tail.to_string(), - ..Default::default() - }), - ); + let logs_opts = LogsOptionsBuilder::default() + .stdout(true) + .stderr(true) + .follow(false) + .tail(tail.to_string()) + .build(); + let mut stream = self.docker.logs(&container_name, Some(logs_opts)); let mut chunks = Vec::new(); while let Some(item) = stream.next().await { @@ -268,14 +264,7 @@ impl DockerRuntimeManager { let mut stderr = String::new(); if let StartExecResults::Attached { mut output, .. } = self .docker - .start_exec( - &exec.id, - Some(StartExecOptions { - detach: false, - tty: false, - output_capacity: None, - }), - ) + .start_exec(&exec.id, None::<StartExecOptions>) .await .with_context(|| format!("failed to start exec for container {container_name}"))? 
{ @@ -319,16 +308,15 @@ impl DockerRuntimeManager { } let context = tar_workspace(workspace_root)?; - let options = BuildImageOptions { - dockerfile: "Dockerfile".to_string(), - t: tag.to_string(), - rm: true, - pull: false, - ..Default::default() - }; + let options = BuildImageOptionsBuilder::default() + .dockerfile("Dockerfile") + .t(tag) + .rm(true) + .build(); - let body = Full::new(Bytes::from(context)); - let mut stream = self.docker.build_image(options, None, Some(body)); + let mut stream = self + .docker + .build_image(options, None, Some(body_full(Bytes::from(context)))); while let Some(chunk) = stream .try_next() .await @@ -364,13 +352,10 @@ impl DockerRuntimeManager { } pub async fn prune_stale_containers(&self) -> Result<()> { - let containers = self - .docker - .list_containers(Some(ListContainersOptions:: { - all: true, - ..Default::default() - })) - .await?; + let list_opts = ListContainersOptionsBuilder::default() + .all(true) + .build(); + let containers = self.docker.list_containers(Some(list_opts)).await?; for container in containers { let Some(names) = container.names else { continue; @@ -383,16 +368,13 @@ impl DockerRuntimeManager { continue; } if let Some(id) = container.id { + let remove_opts = RemoveContainerOptionsBuilder::default() + .force(true) + .v(true) + .build(); let _ = self .docker - .remove_container( - &id, - Some(RemoveContainerOptions { - force: true, - v: true, - link: false, - }), - ) + .remove_container(&id, Some(remove_opts)) .await; } } From beaa86a6feb282843afb7959a8520696655af4c2 Mon Sep 17 00:00:00 2001 From: Erwin Lejeune Date: Mon, 2 Mar 2026 01:34:54 +0400 Subject: [PATCH 07/11] fix(runtime): resolve bollard 0.20.1 API breakage in docker_manager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - InspectContainerOptions and StartContainerOptions moved to bollard::query_parameters — update import paths - LogsOptionsBuilder::tail() now takes &str — borrow the String - 
BuildInfo::error field renamed to error_detail (ErrorDetail struct) — unwrap message from error_detail for human-readable error output --- otter-core/src/runtime/docker_manager.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/otter-core/src/runtime/docker_manager.rs b/otter-core/src/runtime/docker_manager.rs index 8b428b5..863a0ee 100644 --- a/otter-core/src/runtime/docker_manager.rs +++ b/otter-core/src/runtime/docker_manager.rs @@ -3,13 +3,13 @@ use std::path::Path; use anyhow::{anyhow, Context, Result}; use bollard::body_full; -use bollard::container::{InspectContainerOptions, LogOutput, StartContainerOptions}; +use bollard::container::LogOutput; use bollard::exec::{CreateExecOptions, StartExecOptions, StartExecResults}; use bollard::models::{ContainerCreateBody, HostConfig, PortBinding}; use bollard::query_parameters::{ - BuildImageOptionsBuilder, CreateContainerOptionsBuilder, ListContainersOptionsBuilder, - LogsOptionsBuilder, RemoveContainerOptionsBuilder, RestartContainerOptionsBuilder, - StopContainerOptionsBuilder, + BuildImageOptionsBuilder, CreateContainerOptionsBuilder, InspectContainerOptions, + ListContainersOptionsBuilder, LogsOptionsBuilder, RemoveContainerOptionsBuilder, + RestartContainerOptionsBuilder, StartContainerOptions, StopContainerOptionsBuilder, }; use bollard::{Docker, API_DEFAULT_VERSION}; use bytes::Bytes; @@ -216,7 +216,7 @@ impl DockerRuntimeManager { .stdout(true) .stderr(true) .follow(false) - .tail(tail.to_string()) + .tail(&tail.to_string()) .build(); let mut stream = self.docker.logs(&container_name, Some(logs_opts)); @@ -322,8 +322,11 @@ impl DockerRuntimeManager { .await .context("failed docker image build stream")? 
{ - if let Some(error) = chunk.error { - return Err(anyhow!("docker image build failed: {error}")); + if let Some(error_detail) = chunk.error_detail { + let msg = error_detail + .message + .unwrap_or_else(|| "unknown build error".to_string()); + return Err(anyhow!("docker image build failed: {msg}")); } if let Some(message) = chunk.stream { debug!(image = %tag, output = %message.trim_end(), "docker build output"); From eff8d2afc843bda4fc4bdc4c088f22dfc0ef568d Mon Sep 17 00:00:00 2001 From: Erwin Lejeune Date: Mon, 2 Mar 2026 01:39:22 +0400 Subject: [PATCH 08/11] fix(server): adapt WebSocket Message::Text to axum 0.8 Utf8Bytes axum 0.8 changed Message::Text to wrap Utf8Bytes instead of String. Add .into() conversions at both send sites and remove the now-unused SinkExt import. --- otter-server/src/main.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/otter-server/src/main.rs b/otter-server/src/main.rs index 0839fe1..c9a1ecd 100644 --- a/otter-server/src/main.rs +++ b/otter-server/src/main.rs @@ -14,7 +14,7 @@ use axum::response::IntoResponse; use axum::routing::{get, post}; use axum::{Json, Router}; use chrono::{Duration, Utc}; -use futures_util::{SinkExt, StreamExt}; +use futures_util::StreamExt; use otter_core::config::AppConfig; use otter_core::db::Database; use otter_core::domain::{ @@ -544,7 +544,8 @@ async fn handle_runtime_shell_ws(mut socket: WebSocket, state: AppState, workspa working_directory: None, error: Some(format!("invalid payload: {error}")), }) - .unwrap_or_else(|_| "{\"event\":\"error\"}".to_string()), + .unwrap_or_else(|_| "{\"event\":\"error\"}".to_string()) + .into(), )) .await; continue; @@ -583,7 +584,8 @@ async fn handle_runtime_shell_ws(mut socket: WebSocket, state: AppState, workspa if socket .send(Message::Text( serde_json::to_string(&response) - .unwrap_or_else(|_| "{\"event\":\"error\"}".to_string()), + .unwrap_or_else(|_| "{\"event\":\"error\"}".to_string()) + .into(), )) .await .is_err() From 
2809c322188e90533a7f4358f24a2ae687ad6331 Mon Sep 17 00:00:00 2001 From: Erwin Lejeune Date: Mon, 2 Mar 2026 02:38:49 +0400 Subject: [PATCH 09/11] feat(runtime): add preview URL API and queue pause/resume controls Add per-job preview URL registration endpoint and include preview_url/is_paused in job models. Introduce queue pause/resume semantics so paused queued jobs stay in queue but are not claimable until resumed, with resume re-enqueue behavior. Also improve runtime reliability by auto-recovering shell exec on missing/stopped containers and using a Docker client API version floor compatible with newer daemons. Update Otter runtime image to install modern Docker CLI/buildx/compose tooling from Docker's official apt repository. --- .env.example | 1 + Dockerfile | 44 ++++++- .../20260302120000_add_jobs_preview_url.sql | 3 + .../20260302133000_add_jobs_is_paused.sql | 3 + otter-core/src/config.rs | 3 + otter-core/src/db.rs | 56 ++++++++- otter-core/src/domain.rs | 3 + otter-core/src/runtime/docker_manager.rs | 12 +- otter-core/src/service.rs | 95 +++++++++++++-- otter-core/src/vibe.rs | 59 +++++++-- otter-server/src/main.rs | 112 ++++++++++++++++-- 11 files changed, 349 insertions(+), 42 deletions(-) create mode 100644 otter-core/migrations/20260302120000_add_jobs_preview_url.sql create mode 100644 otter-core/migrations/20260302133000_add_jobs_is_paused.sql diff --git a/.env.example b/.env.example index 3f65555..09f1111 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,7 @@ MISTRAL_API_KEY=replace-me OTTER_CORS_ALLOWED_ORIGINS=http://localhost:5173 OTTER_DEFAULT_WORKSPACE_PATH=/workspaces +OTTER_API_BASE_URL=http://otter-server:8080 OTTER_VIBE_MODEL=mistral-large-3 OTTER_VIBE_PROVIDER=mistral # Optional extra env forwarded to vibe process (comma-separated KEY=VALUE pairs) diff --git a/Dockerfile b/Dockerfile index 97df080..9df50ce 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,7 +16,49 @@ RUN cargo build --release --bin otter-server --bin otter-worker FROM 
debian:bookworm-slim RUN apt-get update && \ - apt-get install -y --no-install-recommends ca-certificates curl bash && \ + apt-get install -y --no-install-recommends \ + bash \ + ca-certificates \ + coreutils \ + curl \ + dnsutils \ + findutils \ + gawk \ + git \ + gnupg \ + iproute2 \ + iputils-ping \ + jq \ + less \ + lsb-release \ + lsof \ + make \ + nano \ + net-tools \ + openssh-client \ + procps \ + psmisc \ + python3 \ + python3-pip \ + python3-venv \ + rsync \ + tar \ + tree \ + unzip \ + vim-tiny \ + wget \ + zip && \ + install -m 0755 -d /etc/apt/keyrings && \ + curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc && \ + chmod a+r /etc/apt/keyrings/docker.asc && \ + echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian \ + $(. /etc/os-release && echo \"$VERSION_CODENAME\") stable" > /etc/apt/sources.list.d/docker.list && \ + apt-get update && \ + apt-get install -y --no-install-recommends \ + docker-ce-cli \ + docker-buildx-plugin \ + docker-compose-plugin && \ + python3 -m pip install --no-cache-dir --break-system-packages psutil && \ rm -rf /var/lib/apt/lists/* RUN curl -LsSf https://mistral.ai/vibe/install.sh | bash && \ diff --git a/otter-core/migrations/20260302120000_add_jobs_preview_url.sql b/otter-core/migrations/20260302120000_add_jobs_preview_url.sql new file mode 100644 index 0000000..bfb3439 --- /dev/null +++ b/otter-core/migrations/20260302120000_add_jobs_preview_url.sql @@ -0,0 +1,3 @@ +ALTER TABLE jobs +ADD COLUMN IF NOT EXISTS preview_url TEXT; + diff --git a/otter-core/migrations/20260302133000_add_jobs_is_paused.sql b/otter-core/migrations/20260302133000_add_jobs_is_paused.sql new file mode 100644 index 0000000..2108ce3 --- /dev/null +++ b/otter-core/migrations/20260302133000_add_jobs_is_paused.sql @@ -0,0 +1,3 @@ +ALTER TABLE jobs +ADD COLUMN IF NOT EXISTS is_paused BOOLEAN NOT NULL DEFAULT FALSE; + diff --git 
a/otter-core/src/config.rs b/otter-core/src/config.rs index 3d580cc..532d485 100644 --- a/otter-core/src/config.rs +++ b/otter-core/src/config.rs @@ -17,6 +17,7 @@ pub struct AppConfig { pub default_workspace_path: Option, pub default_workspace_subdir: String, pub lavoix_url: String, + pub otter_api_base_url: String, pub max_attempts: i32, pub worker_concurrency: usize, pub runtime: RuntimeConfig, @@ -82,6 +83,8 @@ impl AppConfig { .unwrap_or_else(|_| "auto".to_string()), lavoix_url: env::var("OTTER_LAVOIX_URL") .unwrap_or_else(|_| "http://lavoix:8090".to_string()), + otter_api_base_url: env::var("OTTER_API_BASE_URL") + .unwrap_or_else(|_| "http://otter-server:8080".to_string()), max_attempts: env::var("OTTER_MAX_ATTEMPTS") .ok() .and_then(|value| value.parse().ok()) diff --git a/otter-core/src/db.rs b/otter-core/src/db.rs index d749ecc..eb28644 100644 --- a/otter-core/src/db.rs +++ b/otter-core/src/db.rs @@ -143,7 +143,7 @@ impl Database { r#" INSERT INTO jobs (workspace_id, prompt, status, priority, schedule_at, attempts, max_attempts) VALUES ($1, $2, 'queued', $3, $4, 0, $5) - RETURNING id, workspace_id, prompt, status, priority, schedule_at, attempts, max_attempts, error, created_at, updated_at + RETURNING id, workspace_id, prompt, preview_url, is_paused, status, priority, schedule_at, attempts, max_attempts, error, created_at, updated_at "#, ) .bind(workspace_id) @@ -159,7 +159,7 @@ impl Database { pub async fn fetch_job(&self, job_id: Uuid) -> Result> { let job = sqlx::query_as::<_, Job>( r#" - SELECT id, workspace_id, prompt, status, priority, schedule_at, attempts, max_attempts, error, created_at, updated_at + SELECT id, workspace_id, prompt, preview_url, is_paused, status, priority, schedule_at, attempts, max_attempts, error, created_at, updated_at FROM jobs WHERE id = $1 "#, ) @@ -294,8 +294,9 @@ impl Database { updated_at = now() WHERE j.id = $1 AND j.status = 'queued' + AND j.is_paused = false AND (j.schedule_at IS NULL OR j.schedule_at <= now()) - 
RETURNING j.id, j.workspace_id, j.prompt, j.status, j.priority, j.schedule_at, j.attempts, j.max_attempts, j.error, j.created_at, j.updated_at + RETURNING j.id, j.workspace_id, j.prompt, j.preview_url, j.is_paused, j.status, j.priority, j.schedule_at, j.attempts, j.max_attempts, j.error, j.created_at, j.updated_at "#, ) .bind(job_id) @@ -418,6 +419,7 @@ impl Database { id AS job_id, workspace_id, prompt, + is_paused, priority, schedule_at, queue_rank, @@ -427,6 +429,7 @@ impl Database { id, workspace_id, prompt, + is_paused, priority, schedule_at, created_at, @@ -435,7 +438,7 @@ impl Database { ) AS queue_rank FROM jobs WHERE status = 'queued' - AND (schedule_at IS NULL OR schedule_at <= now()) + AND (schedule_at IS NULL OR schedule_at <= now() OR is_paused = true) ) queued ORDER BY queue_rank ASC LIMIT $1 OFFSET $2 @@ -463,4 +466,49 @@ impl Database { .await?; Ok(result.rows_affected() > 0) } + + pub async fn pause_queued_job(&self, job_id: Uuid) -> Result { + let result = sqlx::query( + r#" + UPDATE jobs + SET is_paused = true, updated_at = now() + WHERE id = $1 + AND status = 'queued' + "#, + ) + .bind(job_id) + .execute(&self.pool) + .await?; + Ok(result.rows_affected() > 0) + } + + pub async fn resume_queued_job(&self, job_id: Uuid) -> Result { + let result = sqlx::query( + r#" + UPDATE jobs + SET is_paused = false, updated_at = now() + WHERE id = $1 + AND status = 'queued' + "#, + ) + .bind(job_id) + .execute(&self.pool) + .await?; + Ok(result.rows_affected() > 0) + } + + pub async fn set_job_preview_url(&self, job_id: Uuid, preview_url: &str) -> Result { + let result = sqlx::query( + r#" + UPDATE jobs + SET preview_url = $2, updated_at = now() + WHERE id = $1 + "#, + ) + .bind(job_id) + .bind(preview_url) + .execute(&self.pool) + .await?; + Ok(result.rows_affected() > 0) + } } diff --git a/otter-core/src/domain.rs b/otter-core/src/domain.rs index bea9e9d..accb7bb 100644 --- a/otter-core/src/domain.rs +++ b/otter-core/src/domain.rs @@ -37,6 +37,8 @@ pub 
struct Job { pub id: Uuid, pub workspace_id: Uuid, pub prompt: String, + pub preview_url: Option, + pub is_paused: bool, pub status: JobStatus, pub priority: i32, pub schedule_at: Option>, @@ -112,6 +114,7 @@ pub struct QueueItem { pub job_id: Uuid, pub workspace_id: Uuid, pub prompt: String, + pub is_paused: bool, pub priority: i32, pub schedule_at: Option>, pub queue_rank: i64, diff --git a/otter-core/src/runtime/docker_manager.rs b/otter-core/src/runtime/docker_manager.rs index 863a0ee..ceae79f 100644 --- a/otter-core/src/runtime/docker_manager.rs +++ b/otter-core/src/runtime/docker_manager.rs @@ -11,7 +11,7 @@ use bollard::query_parameters::{ ListContainersOptionsBuilder, LogsOptionsBuilder, RemoveContainerOptionsBuilder, RestartContainerOptionsBuilder, StartContainerOptions, StopContainerOptionsBuilder, }; -use bollard::{Docker, API_DEFAULT_VERSION}; +use bollard::{ClientVersion, Docker}; use bytes::Bytes; use futures_util::stream::{StreamExt, TryStreamExt}; use tar::Builder; @@ -386,11 +386,17 @@ impl DockerRuntimeManager { } fn connect_docker(socket: &str) -> Result { + // Some Docker daemons reject very old client API versions (e.g. 1.41). + // Pin to a modern baseline compatible with current daemon minimums. 
+ let client_version = &ClientVersion { + major_version: 1, + minor_version: 44, + }; if let Some(path) = socket.strip_prefix("unix://") { - Docker::connect_with_unix(path, 120, API_DEFAULT_VERSION) + Docker::connect_with_unix(path, 120, client_version) .with_context(|| format!("failed to connect docker on unix socket {socket}")) } else { - Docker::connect_with_local_defaults() + Docker::connect_with_unix("/var/run/docker.sock", 120, client_version) .context("failed to connect docker with local defaults") } } diff --git a/otter-core/src/service.rs b/otter-core/src/service.rs index ffebb67..23a6e7a 100644 --- a/otter-core/src/service.rs +++ b/otter-core/src/service.rs @@ -10,7 +10,7 @@ use chrono::Utc; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; use tokio::sync::Mutex; -use tokio::time::sleep; +use tokio::time::{interval, sleep, Duration as TokioDuration, MissedTickBehavior}; use tracing::{info, warn}; use uuid::Uuid; use validator::Validate; @@ -19,7 +19,8 @@ use crate::config::AppConfig; use crate::db::Database; use crate::domain::{ CreateProjectRequest, CreateWorkspaceRequest, EnqueuePromptRequest, Job, JobEvent, JobStatus, - Project, QueueItem, RuntimeContainerInfo, UpdateQueuePositionRequest, Workspace, + Project, QueueItem, RuntimeContainerInfo, RuntimeContainerStatus, UpdateQueuePositionRequest, + Workspace, }; use crate::queue::{Queue, QueueMessage}; use crate::runtime::docker_manager::{DockerRuntimeManager, RuntimeExecResult}; @@ -60,6 +61,7 @@ impl OtterService { )), vibe_executor: Arc::new(VibeExecutor::new( config.vibe_bin.clone(), + config.otter_api_base_url.clone(), config.vibe_model.clone(), config.vibe_provider.clone(), config.vibe_extra_env.clone(), @@ -315,8 +317,10 @@ impl OtterService { .schedule_at .map(|at| at > Utc::now()) .unwrap_or(false); - if should_requeue_for_schedule { - info!(job_id = %message.job_id, "job scheduled in future; requeued message"); + let should_requeue_for_pause = + 
matches!(job_state.status, JobStatus::Queued) && job_state.is_paused; + if should_requeue_for_schedule || should_requeue_for_pause { + info!(job_id = %message.job_id, "job not runnable yet; requeued message"); self.queue .enqueue( DEFAULT_QUEUE_NAME, @@ -328,7 +332,8 @@ impl OtterService { info!( job_id = %message.job_id, schedule_at = ?job_state.schedule_at, - "job is scheduled in the future; re-queued" + is_paused = job_state.is_paused, + "job is not runnable yet; re-queued" ); } } @@ -439,6 +444,7 @@ impl OtterService { .vibe_executor .run_prompt_streaming( &job.prompt, + job.id, &workspace_path, &isolated_vibe_home, |chunk: VibeOutputChunk| { @@ -458,6 +464,10 @@ impl OtterService { Ok(()) } }, + || async { + let status = self.db.fetch_job(job.id).await?.map(|value| value.status); + Ok(matches!(status, Some(JobStatus::Cancelled))) + }, ) .await?; info!( @@ -586,6 +596,8 @@ impl OtterService { let mut stdout_lines = BufReader::new(stdout).lines(); let mut stderr_lines = BufReader::new(stderr).lines(); let mut line_count: usize = 0; + let mut cancel_poll = interval(TokioDuration::from_millis(400)); + cancel_poll.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { tokio::select! 
{ @@ -625,6 +637,14 @@ impl OtterService { None => break, } } + _ = cancel_poll.tick() => { + let status = self.db.fetch_job(job_id).await?.map(|job| job.status); + if matches!(status, Some(JobStatus::Cancelled)) { + let _ = child.kill().await; + let _ = child.wait().await; + return Err(anyhow!("job cancelled while setup.sh was running")); + } + } } } @@ -799,13 +819,47 @@ impl OtterService { .as_ref() .ok_or_else(|| anyhow!("runtime is not enabled"))?; let key = build_shell_session_key(workspace_id, session_id); - let cwd = { + let mut retry_cwd = { let sessions = self.runtime_shell_cwds.lock().await; sessions.get(&key).cloned() }; - let result = runtime - .exec_in_workspace(workspace_id, command, cwd.as_deref()) - .await?; + let result = match runtime + .exec_in_workspace(workspace_id, command, retry_cwd.as_deref()) + .await + { + Ok(result) => result, + Err(initial_error) => { + let status = runtime.runtime_status(workspace_id).await?; + match status.status { + RuntimeContainerStatus::Running => return Err(initial_error), + RuntimeContainerStatus::Stopped => { + runtime.start_workspace_container(workspace_id).await?; + } + RuntimeContainerStatus::Missing => { + let workspace = self + .db + .fetch_workspace(workspace_id) + .await? + .ok_or_else(|| anyhow!("workspace not found: {workspace_id}"))?; + runtime + .ensure_workspace_container(workspace_id, Path::new(&workspace.root_path)) + .await?; + // New container starts from default cwd; clear any stale cwd from prior session. + let mut sessions = self.runtime_shell_cwds.lock().await; + sessions.remove(&key); + retry_cwd = None; + } + } + runtime + .exec_in_workspace(workspace_id, command, retry_cwd.as_deref()) + .await + .map_err(|retry_error| { + anyhow!( + "failed to execute command in runtime container after recovery (first error: {initial_error}; retry error: {retry_error})" + ) + })? 
+ } + }; { let mut sessions = self.runtime_shell_cwds.lock().await; sessions.insert(key, result.cwd.clone()); @@ -823,4 +877,27 @@ impl OtterService { } Ok(false) } + + pub async fn pause_job(&self, job_id: Uuid) -> Result { + let updated = self.db.pause_queued_job(job_id).await?; + if updated { + self.db + .insert_job_event(job_id, "paused", serde_json::json!({})) + .await?; + } + Ok(updated) + } + + pub async fn resume_job(&self, job_id: Uuid) -> Result { + let updated = self.db.resume_queued_job(job_id).await?; + if updated { + self.db + .insert_job_event(job_id, "resumed", serde_json::json!({})) + .await?; + self.queue + .enqueue(DEFAULT_QUEUE_NAME, &QueueMessage { job_id }) + .await?; + } + Ok(updated) + } } diff --git a/otter-core/src/vibe.rs b/otter-core/src/vibe.rs index 739f1b8..2476bc1 100644 --- a/otter-core/src/vibe.rs +++ b/otter-core/src/vibe.rs @@ -7,10 +7,12 @@ use serde::{Deserialize, Serialize}; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; use tokio::sync::mpsc; +use tokio::time::{interval, Duration, MissedTickBehavior}; #[derive(Debug, Clone)] pub struct VibeExecutor { pub vibe_bin: String, + pub otter_api_base_url: String, pub vibe_model: Option, pub vibe_provider: Option, pub vibe_extra_env: Vec<(String, String)>, @@ -40,12 +42,14 @@ pub struct VibeOutputChunk { impl VibeExecutor { pub fn new( vibe_bin: String, + otter_api_base_url: String, vibe_model: Option, vibe_provider: Option, vibe_extra_env: Vec<(String, String)>, ) -> Self { Self { vibe_bin, + otter_api_base_url, vibe_model, vibe_provider, vibe_extra_env, @@ -69,10 +73,11 @@ impl VibeExecutor { pub async fn run_prompt( &self, prompt: &str, + job_id: uuid::Uuid, workspace_path: &Path, isolated_vibe_home: &Path, ) -> Result { - let effective_prompt = compose_vibe_prompt(prompt); + let effective_prompt = compose_vibe_prompt(prompt, job_id, &self.otter_api_base_url); let mut cmd = Command::new(&self.vibe_bin); cmd.arg("--prompt") .arg(&effective_prompt) @@ 
-108,18 +113,22 @@ impl VibeExecutor { }) } - pub async fn run_prompt_streaming( + pub async fn run_prompt_streaming( &self, prompt: &str, + job_id: uuid::Uuid, workspace_path: &Path, isolated_vibe_home: &Path, mut on_chunk: F, + mut should_cancel: C, ) -> Result where F: FnMut(VibeOutputChunk) -> Fut, Fut: Future>, + C: FnMut() -> CFut, + CFut: Future>, { - let effective_prompt = compose_vibe_prompt(prompt); + let effective_prompt = compose_vibe_prompt(prompt, job_id, &self.otter_api_base_url); let mut cmd = Command::new(&self.vibe_bin); cmd.arg("--prompt") .arg(&effective_prompt) @@ -168,22 +177,42 @@ impl VibeExecutor { Ok::<(), anyhow::Error>(()) }); drop(tx); + let mut cancel_poll = interval(Duration::from_millis(400)); + cancel_poll.set_missed_tick_behavior(MissedTickBehavior::Skip); let mut stdout_buffer = String::new(); let mut stderr_buffer = String::new(); - while let Some(chunk) = rx.recv().await { - match chunk.stream { - VibeStream::Stdout => { - stdout_buffer.push_str(&chunk.line); - stdout_buffer.push('\n'); + loop { + tokio::select! { + maybe_chunk = rx.recv() => { + let Some(chunk) = maybe_chunk else { + break; + }; + match chunk.stream { + VibeStream::Stdout => { + stdout_buffer.push_str(&chunk.line); + stdout_buffer.push('\n'); + } + VibeStream::Stderr => { + stderr_buffer.push_str(&chunk.line); + stderr_buffer.push('\n'); + } + } + if let Err(error) = on_chunk(chunk).await { + let _ = child.kill().await; + let _ = child.wait().await; + return Err(error); + } } - VibeStream::Stderr => { - stderr_buffer.push_str(&chunk.line); - stderr_buffer.push('\n'); + _ = cancel_poll.tick() => { + if should_cancel().await? 
{ + let _ = child.kill().await; + let _ = child.wait().await; + bail!("job cancelled"); + } } } - on_chunk(chunk).await?; } stdout_task @@ -214,7 +243,7 @@ impl VibeExecutor { } } -fn compose_vibe_prompt(user_prompt: &str) -> String { +fn compose_vibe_prompt(user_prompt: &str, job_id: uuid::Uuid, otter_api_base_url: &str) -> String { format!( r#"SYSTEM REQUIREMENTS (ALWAYS APPLY): - Work in a project-specific subfolder under the current workspace. Never develop directly in workspace root. @@ -224,6 +253,10 @@ fn compose_vibe_prompt(user_prompt: &str) -> String { - Build and run the generated app/service inside Docker (do not run directly on host process as the main path). - Verify the container is running and expose the app on a reachable port from the current environment. - When implementation is complete, start the app/service in background and verify it runs. +- Use this exact Job ID when setting demo URL: `{job_id}`. +- ONLY AT THE END, AFTER EVERYTHING ELSE: after you determine the runnable app URL, call this API to set preview URL for this job: + `curl -sS -X POST "{otter_api_base_url}/v1/jobs/{job_id}/preview-url" -H "Content-Type: application/json" -d '{{"preview_url":"http://<host>:<port>"}}'` +- Only send a real reachable HTTP/HTTPS URL as preview_url. - At the end, print clear run instructions: docker build command, docker run command, docker stop/remove command, and where the project lives. - Always include where to access the running app (URL/host port) in the final output.
diff --git a/otter-server/src/main.rs b/otter-server/src/main.rs index c9a1ecd..335db2d 100644 --- a/otter-server/src/main.rs +++ b/otter-server/src/main.rs @@ -159,6 +159,11 @@ struct VoiceEnqueueResponse { job: Job, } +#[derive(serde::Deserialize)] +struct SetJobPreviewUrlRequest { + preview_url: String, +} + #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() @@ -237,6 +242,9 @@ async fn main() -> Result<()> { .route("/v1/voice/prompts", post(enqueue_voice_prompt)) .route("/v1/jobs/{id}", get(get_job)) .route("/v1/jobs/{id}/events", get(get_job_events)) + .route("/v1/jobs/{id}/preview-url", post(set_job_preview_url)) + .route("/v1/jobs/{id}/pause", post(pause_job)) + .route("/v1/jobs/{id}/resume", post(resume_job)) .route("/v1/events/stream", get(stream_job_events)) .route("/v1/jobs/{id}/cancel", post(cancel_job)) .route("/v1/queue", get(get_queue)) @@ -638,21 +646,32 @@ async fn execute_workspace_command( .map(|value| value.chars().take(120).collect::()); if state.service.runtime_enabled() { let session_id = shell_session_id.as_deref().unwrap_or("workspace-shell"); - let result = state + match state .service .runtime_exec_for_workspace(resolved_workspace_id, session_id, &body.command) .await - .map_err(internal_error)?; - return Ok(WorkspaceCommandResponse { - workspace_id: resolved_workspace_id, - command: body.command, - working_directory: result.cwd, - shell_session_id, - exit_code: Some(result.exit_code as i32), - stdout: result.stdout, - stderr: result.stderr, - timed_out: false, - }); + { + Ok(result) => { + return Ok(WorkspaceCommandResponse { + workspace_id: resolved_workspace_id, + command: body.command, + working_directory: result.cwd, + shell_session_id, + exit_code: Some(result.exit_code as i32), + stdout: result.stdout, + stderr: result.stderr, + timed_out: false, + }); + } + Err(error) => { + warn!( + workspace_id = %resolved_workspace_id, + command = %body.command, + error = %error, + "runtime command failed; falling back to 
host workspace shell" + ); + } + } } let session_key = shell_session_id @@ -942,6 +961,75 @@ async fn cancel_job( Ok(StatusCode::NO_CONTENT) } +async fn pause_job( + State(state): State, + Path(id): Path, +) -> Result { + let paused = state.service.pause_job(id).await.map_err(internal_error)?; + if !paused { + return Err((StatusCode::NOT_FOUND, "queued job not found".into())); + } + Ok(StatusCode::NO_CONTENT) +} + +async fn resume_job( + State(state): State, + Path(id): Path, +) -> Result { + let resumed = state.service.resume_job(id).await.map_err(internal_error)?; + if !resumed { + return Err((StatusCode::NOT_FOUND, "paused queued job not found".into())); + } + Ok(StatusCode::NO_CONTENT) +} + +async fn set_job_preview_url( + State(state): State, + Path(id): Path, + Json(body): Json, +) -> Result { + let preview_url = body.preview_url.trim(); + if preview_url.is_empty() { + return Err(( + StatusCode::BAD_REQUEST, + "preview_url must not be empty".to_string(), + )); + } + let parsed = reqwest::Url::parse(preview_url).map_err(|error| { + ( + StatusCode::BAD_REQUEST, + format!("preview_url must be a valid absolute URL: {error}"), + ) + })?; + if parsed.scheme() != "http" && parsed.scheme() != "https" { + return Err(( + StatusCode::BAD_REQUEST, + "preview_url must use http or https".to_string(), + )); + } + + let updated = state + .service + .db + .set_job_preview_url(id, preview_url) + .await + .map_err(internal_error)?; + if !updated { + return Err((StatusCode::NOT_FOUND, "job not found".to_string())); + } + state + .service + .db + .insert_job_event( + id, + "preview_url_set", + serde_json::json!({ "preview_url": preview_url }), + ) + .await + .map_err(internal_error)?; + Ok(StatusCode::NO_CONTENT) +} + async fn get_history( State(state): State, Query(query): Query, From d69f53e2a72c1f2a278c99351d505fb38fca474e Mon Sep 17 00:00:00 2001 From: Erwin Lejeune Date: Mon, 2 Mar 2026 02:38:58 +0400 Subject: [PATCH 10/11] docs: refresh API and runtime flow documentation 
Document preview URL registration, pause/resume queue controls, and updated execution flow including job callback contract and runtime shell behavior. --- README.md | 12 ++++++++++++ docs/api.md | 14 +++++++++++++- docs/prompt-to-result-flow.md | 8 ++++++-- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 6861920..3a072e3 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,7 @@ curl http://localhost:8080/healthz - `OTTER_VIBE_MODEL` (optional, example `mistral-large-3`) - `OTTER_VIBE_PROVIDER` (optional, example `mistral`) - `OTTER_VIBE_EXTRA_ENV` (optional, comma-separated `KEY=VALUE` pairs forwarded to vibe process) +- `OTTER_API_BASE_URL` (default `http://otter-server:8080`, used in system prompt instructions so Vibe can post job preview URLs) - `OTTER_DEFAULT_WORKSPACE_PATH` (optional fallback workspace root when enqueue omits `workspace_id`) - `OTTER_MAX_ATTEMPTS` (default `5`) - `OTTER_WORKER_CONCURRENCY` (default `1`) @@ -102,6 +103,9 @@ These are forwarded as `VIBE_MODEL` / `MISTRAL_MODEL` and `VIBE_PROVIDER` for co - `GET /v1/jobs/{id}/events` - `GET /v1/events/stream` (includes `output_chunk` events) - `POST /v1/jobs/{id}/cancel` +- `POST /v1/jobs/{id}/pause` +- `POST /v1/jobs/{id}/resume` +- `POST /v1/jobs/{id}/preview-url` (set demo URL for browser preview, body: `{ "preview_url": "http://host:port" }`) - `GET /v1/queue` - `PATCH /v1/queue/{id}` (update queue position via priority) - `GET /v1/history` @@ -171,7 +175,15 @@ Otter composes a system prompt that enforces safe and repeatable project executi - Build and run the generated app inside Docker as the primary execution path. - Generate an executable `setup.sh` at project root. - Start the generated app/service in background. +- Use the provided Job ID and call Otter preview URL API when runtime URL is known: + - `POST /v1/jobs/{job_id}/preview-url` - Print clear run/stop instructions and project location. 
- Include app access information (URL and host port) in the final output. After a successful vibe execution, Otter attempts to run `setup.sh` and streams its output back as `output_chunk` events. + +## Queue Pause/Resume + +- Queued jobs can be paused without changing status from `queued`. +- Paused queued jobs are not claimable by workers until resumed. +- Resume re-enqueues the job message so it can run immediately when capacity is available. diff --git a/docs/api.md b/docs/api.md index ca76551..43777dc 100644 --- a/docs/api.md +++ b/docs/api.md @@ -75,9 +75,20 @@ Base URL: `http://:8080` ## Jobs - `GET /v1/jobs/{id}` - - Returns job metadata, latest output payload if available, and `queue_rank` while queued. + - Returns job metadata (including `is_paused` and optional `preview_url`), latest output payload if available, and `queue_rank` while queued. - `POST /v1/jobs/{id}/cancel` - Cancels queued/running jobs. +- `POST /v1/jobs/{id}/pause` + - Pauses a queued job so it remains queued but not runnable by workers. +- `POST /v1/jobs/{id}/resume` + - Resumes a paused queued job and re-enqueues it for execution. +- `POST /v1/jobs/{id}/preview-url` + - Sets a demo URL for job browser preview. + - Body: + ```json + { "preview_url": "http://host:port" } + ``` + - URL must be absolute and use `http` or `https`. - `GET /v1/jobs/{id}/events` - Returns ordered lifecycle events. - `GET /v1/events/stream` @@ -104,3 +115,4 @@ Base URL: `http://:8080` - HTTP request tracing is enabled server-side (status + latency). - Job lifecycle logs are emitted by worker/service layers for enqueue/claim/retry/fail/complete states. +- Runtime shell commands auto-recover when workspace container is missing/stopped before falling back to host workspace shell execution. diff --git a/docs/prompt-to-result-flow.md b/docs/prompt-to-result-flow.md index 7ab1dfa..d068dda 100644 --- a/docs/prompt-to-result-flow.md +++ b/docs/prompt-to-result-flow.md @@ -12,6 +12,7 @@ 3. 
Otter inserts a `queued` job in PostgreSQL and records `accepted` + `queued` events. 4. Otter enqueues job id in Redis. 5. Worker dequeues job id from Redis, atomically claims that queued DB row, and marks it `running`. + - Paused queued jobs (`is_paused = true`) are skipped and requeued. 6. Worker executes: - `vibe --prompt "" --output json --workdir ` - with `VIBE_HOME=` @@ -20,7 +21,10 @@ - success -> `succeeded` + `job_outputs` row + `completed` event - failure with retries left -> back to `queued` + `retry_queued` event - terminal failure -> `failed` + `failed` event -9. Clients observe state via: +9. Optional preview URL callback: + - Vibe prompt includes the active `job_id` and API callback format. + - Agent can call `POST /v1/jobs/{id}/preview-url` to register runnable demo URL. +10. Clients observe state via: - `GET /v1/jobs/{id}` - `GET /v1/jobs/{id}/events` - `GET /v1/queue` @@ -38,7 +42,7 @@ - API -> Redis: - queue wake-up message containing job id - Worker -> Vibe: - - prompt text + - prompt text (with system requirements including `job_id` + preview URL callback contract) - execution directory (`--workdir`) - isolated trust context (`VIBE_HOME`) - Vibe -> Worker: From 059455f2e3e729bbb6bf550e2b63c5e105021e4d Mon Sep 17 00:00:00 2001 From: Erwin Lejeune Date: Mon, 2 Mar 2026 02:52:35 +0400 Subject: [PATCH 11/11] chore(runtime): refresh lockfile and satisfy clippy cleanups Update Cargo.lock to capture the current runtime dependency graph after Bollard upgrade work, and apply clippy/format-driven cleanups in runtime service and docker manager code paths. 
--- Cargo.lock | 205 +++++++++++++++++++++++ otter-core/src/runtime/docker_manager.rs | 23 +-- otter-core/src/service.rs | 5 +- 3 files changed, 217 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d1a6b0e..4f7365e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,6 +93,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" dependencies = [ "axum-core", + "base64", "bytes", "form_urlencoded", "futures-util", @@ -112,8 +113,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -169,6 +172,49 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bollard" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "227aa051deec8d16bd9c34605e7aaf153f240e35483dd42f6f78903847934738" +dependencies = [ + "base64", + "bollard-stubs", + "bytes", + "futures-core", + "futures-util", + "hex", + "http", + "http-body-util", + "hyper", + "hyper-named-pipe", + "hyper-util", + "hyperlocal", + "log", + "pin-project-lite", + "serde", + "serde_derive", + "serde_json", + "serde_urlencoded", + "thiserror", + "tokio", + "tokio-util", + "tower-service", + "url", + "winapi", +] + +[[package]] +name = "bollard-stubs" +version = "1.52.1-rc.29.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f0a8ca8799131c1837d1282c3f81f31e76ceb0ce426e04a7fe1ccee3287c066" +dependencies = [ + "serde", + "serde_json", + "serde_repr", +] + [[package]] name = "bumpalo" version = "3.20.2" @@ -342,6 +388,12 @@ dependencies = [ "syn", ] +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "der" version = "0.7.10" @@ -438,6 +490,17 
@@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "filetime" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f98844151eee8917efc50bd9e8318cb963ae8b297431495d3f758616ea5c57db" +dependencies = [ + "cfg-if", + "libc", + "libredox", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -739,6 +802,21 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-named-pipe" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" +dependencies = [ + "hex", + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", + "winapi", +] + [[package]] name = "hyper-rustls" version = "0.27.7" @@ -779,6 +857,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "hyperlocal" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7" +dependencies = [ + "hex", + "http-body-util", + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "iana-time-zone" version = "0.1.65" @@ -1010,6 +1103,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + [[package]] name = "litemap" version = "0.8.1" @@ -1189,12 +1288,16 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "bollard", + "bytes", "chrono", "dotenvy", + "futures-util", "redis", "serde", "serde_json", "sqlx", + "tar", "thiserror", "tokio", "tracing", @@ -1663,6 +1766,19 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" +[[package]] +name = "rustix" +version = "1.1.4" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.23.37" @@ -1776,6 +1892,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2153,6 +2280,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tar" +version = "0.4.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "thiserror" version = "2.0.18" @@ -2255,6 +2393,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -2383,6 +2533,23 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.2", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.19.0" @@ -2446,6 +2613,12 @@ dependencies = [ 
"serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -2692,6 +2865,28 @@ dependencies = [ "wasite", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.62.2" @@ -3076,6 +3271,16 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +[[package]] +name = "xattr" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" +dependencies = [ + "libc", + "rustix", +] + [[package]] name = "yoke" version = "0.8.1" diff --git a/otter-core/src/runtime/docker_manager.rs b/otter-core/src/runtime/docker_manager.rs index ceae79f..8aaae25 100644 --- a/otter-core/src/runtime/docker_manager.rs +++ b/otter-core/src/runtime/docker_manager.rs @@ -314,9 +314,9 @@ impl DockerRuntimeManager { .rm(true) .build(); - let mut stream = self - .docker - .build_image(options, None, Some(body_full(Bytes::from(context)))); + let mut stream = + self.docker + .build_image(options, None, 
Some(body_full(Bytes::from(context)))); while let Some(chunk) = stream .try_next() .await @@ -345,19 +345,15 @@ impl DockerRuntimeManager { .await { Ok(response) => Ok(Some(response)), - Err(bollard::errors::Error::DockerResponseServerError { status_code, .. }) - if status_code == 404 => - { - Ok(None) - } + Err(bollard::errors::Error::DockerResponseServerError { + status_code: 404, .. + }) => Ok(None), Err(error) => Err(error.into()), } } pub async fn prune_stale_containers(&self) -> Result<()> { - let list_opts = ListContainersOptionsBuilder::default() - .all(true) - .build(); + let list_opts = ListContainersOptionsBuilder::default().all(true).build(); let containers = self.docker.list_containers(Some(list_opts)).await?; for container in containers { let Some(names) = container.names else { @@ -375,10 +371,7 @@ impl DockerRuntimeManager { .force(true) .v(true) .build(); - let _ = self - .docker - .remove_container(&id, Some(remove_opts)) - .await; + let _ = self.docker.remove_container(&id, Some(remove_opts)).await; } } Ok(()) diff --git a/otter-core/src/service.rs b/otter-core/src/service.rs index 23a6e7a..4739593 100644 --- a/otter-core/src/service.rs +++ b/otter-core/src/service.rs @@ -842,7 +842,10 @@ impl OtterService { .await? .ok_or_else(|| anyhow!("workspace not found: {workspace_id}"))?; runtime - .ensure_workspace_container(workspace_id, Path::new(&workspace.root_path)) + .ensure_workspace_container( + workspace_id, + Path::new(&workspace.root_path), + ) .await?; // New container starts from default cwd; clear any stale cwd from prior session. let mut sessions = self.runtime_shell_cwds.lock().await;