diff --git a/coast-daemon/src/handlers/run/provision.rs b/coast-daemon/src/handlers/run/provision.rs index dc2cc00..fb0e8d0 100644 --- a/coast-daemon/src/handlers/run/provision.rs +++ b/coast-daemon/src/handlers/run/provision.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use tracing::{debug, info, warn}; @@ -19,13 +19,20 @@ use super::{ secrets, shared_services_setup, }; +const MAX_CONTAINER_PORT_RETRY_ATTEMPTS: usize = 3; + pub(super) struct ProvisionResult { pub container_id: String, pub pre_allocated_ports: Vec<(String, u16, u16)>, } +type PreAllocatedPort = (String, u16, u16); +type ExternalWorktreeDir = (usize, std::path::PathBuf); +type DindContainerManager = + coast_docker::container::ContainerManager; + struct CoastfileResources { - pre_allocated_ports: Vec<(String, u16, u16)>, + pre_allocated_ports: Vec, volume_mounts: Vec, mcp_servers: Vec, mcp_clients: Vec, @@ -56,14 +63,14 @@ pub(super) async fn provision_instance( let secret_files_for_exec = secret_plan.files_for_exec.clone(); let has_volume_mounts = !resources.volume_mounts.is_empty(); - let container_id = create_container( + let (container_id, pre_allocated_ports) = create_container( docker, validated, req, &code_path, &resources, secret_plan, - &resources.pre_allocated_ports, + resources.pre_allocated_ports.clone(), progress, ) .await?; @@ -99,7 +106,7 @@ pub(super) async fn provision_instance( Ok(ProvisionResult { container_id, - pre_allocated_ports: resources.pre_allocated_ports, + pre_allocated_ports, }) } @@ -119,6 +126,25 @@ struct InstanceConfig<'a> { progress: &'a tokio::sync::mpsc::Sender, } +struct ContainerCreateContext<'a> { + req: &'a RunRequest, + code_path: &'a std::path::Path, + resources: &'a CoastfileResources, + secret_plan: &'a secrets::SecretInjectionPlan, + image_cache_path: Option<&'a std::path::Path>, + artifact_dir: Option<&'a std::path::Path>, + coast_image: Option<&'a str>, + override_dir: Option<&'a std::path::Path>, + dind_extra_hosts: &'a [String], + shared_caddy_pki_host_dir: &'a std::path::Path, + external_worktree_dirs: &'a [ExternalWorktreeDir], +} + +enum ContainerCreateAttempt { + Ready(String), + Retry(CoastError), +} + async fn setup_shared_services(ctx: &InstanceConfig<'_>) -> Result<()> { info!(instance = %ctx.req.name, "provision: setting up shared services"); let shared_service_routing = if ctx.resources.shared_services.is_empty() { @@ -335,12 +361,12 @@ async fn load_coastfile_resources( "loaded artifact Coastfile for run resources" ); - for (port_name, port_num) in &coastfile.ports { - let dynamic_port = crate::port_manager::allocate_dynamic_port()?; - result - .pre_allocated_ports - .push((port_name.clone(), *port_num, dynamic_port)); - } + let logical_ports = coastfile + .ports + .iter() + .map(|(port_name, port_num)| (port_name.clone(), *port_num)) + .collect::>(); + result.pre_allocated_ports = allocate_pre_allocated_ports(&logical_ports)?; for vol_config in &coastfile.volumes { let resolved_name = @@ -436,9 +462,9 @@ async fn create_container( code_path: &std::path::Path, resources: &CoastfileResources, secret_plan: secrets::SecretInjectionPlan, - pre_allocated_ports: &[(String, u16, u16)], + initial_pre_allocated_ports: Vec, progress: &tokio::sync::mpsc::Sender, -) -> Result { +) -> Result<(String, Vec)> { let image_cache_dir = paths::image_cache_dir(); let image_cache_path = if image_cache_dir.exists() { Some(image_cache_dir.as_path()) @@ -472,35 +498,108 @@ async fn create_container( let dind_extra_hosts = vec!["host.docker.internal:host-gateway".to_string()]; - let mut env_vars = secret_plan.env_vars; - merge_dynamic_port_env_vars(&mut env_vars, pre_allocated_ports); + let creating_step = if validated.has_compose { 3 } else { 2 }; + emit( + progress, + BuildProgressEvent::started("Creating container", creating_step, validated.total_steps), + ); - let mut config = coast_docker::dind::build_dind_config(coast_docker::dind::DindConfigParams { - env_vars, - bind_mounts: secret_plan.bind_mounts, - volume_mounts: resources.volume_mounts.clone(), + let runtime = coast_docker::dind::DindRuntime::with_client(docker.clone()); + let manager = coast_docker::container::ContainerManager::new(runtime); + let mut pre_allocated_ports = initial_pre_allocated_ports; + let external_worktree_dirs = if let Ok(cf) = + coast_core::coastfile::Coastfile::from_file(&artifact_dir_path.join("coastfile.toml")) + { + cf.external_worktree_dirs() + } else { + Vec::new() + }; + let create_ctx = ContainerCreateContext { + req, + code_path, + resources, + secret_plan: &secret_plan, image_cache_path, artifact_dir: artifact_dir_opt, coast_image: coast_image.as_deref(), override_dir: override_dir_opt, - extra_hosts: dind_extra_hosts, - ..coast_docker::dind::DindConfigParams::new(&req.project, &req.name, code_path) - }); - append_shared_caddy_pki_bind_mount(&mut config, &shared_caddy_pki_host_dir); + dind_extra_hosts: &dind_extra_hosts, + shared_caddy_pki_host_dir: &shared_caddy_pki_host_dir, + external_worktree_dirs: &external_worktree_dirs, + }; - if let Ok(cf) = - coast_core::coastfile::Coastfile::from_file(&artifact_dir_path.join("coastfile.toml")) - { - for (idx, resolved) in cf.external_worktree_dirs() { - config.bind_mounts.push(coast_docker::runtime::BindMount { - host_path: resolved, - container_path: coast_core::coastfile::Coastfile::external_mount_path(idx), - read_only: false, - propagation: None, - }); + let mut last_error: Option = None; + + for attempt in 1..=MAX_CONTAINER_PORT_RETRY_ATTEMPTS { + let container_id = match create_and_start_container_attempt( + &manager, + &create_ctx, + &pre_allocated_ports, + attempt, + ) + .await? + { + ContainerCreateAttempt::Ready(container_id) => container_id, + ContainerCreateAttempt::Retry(error) => { + pre_allocated_ports = reallocate_pre_allocated_ports(&pre_allocated_ports)?; + last_error = Some(error); + continue; + } + }; + + match manager.wait_for_inner_daemon(&container_id).await { + Ok(()) => { + emit( + progress, + BuildProgressEvent::done("Creating container", "ok"), + ); + return Ok((container_id, pre_allocated_ports)); + } + Err(error) => { + return Err(wrap_container_create_error(req, &error)); + } } } + let error = last_error.unwrap_or_else(|| { + CoastError::docker(format!( + "Failed to create coast container for instance '{}' after retrying dynamic port allocation.", + req.name + )) + }); + + Err(wrap_container_create_error(req, &error)) +} + +fn build_container_config( + ctx: &ContainerCreateContext<'_>, + pre_allocated_ports: &[PreAllocatedPort], +) -> coast_docker::runtime::ContainerConfig { + let mut env_vars = ctx.secret_plan.env_vars.clone(); + merge_dynamic_port_env_vars(&mut env_vars, pre_allocated_ports); + + let mut config = coast_docker::dind::build_dind_config(coast_docker::dind::DindConfigParams { + env_vars, + bind_mounts: ctx.secret_plan.bind_mounts.clone(), + volume_mounts: ctx.resources.volume_mounts.clone(), + image_cache_path: ctx.image_cache_path, + artifact_dir: ctx.artifact_dir, + coast_image: ctx.coast_image, + override_dir: ctx.override_dir, + extra_hosts: ctx.dind_extra_hosts.to_vec(), + ..coast_docker::dind::DindConfigParams::new(&ctx.req.project, &ctx.req.name, ctx.code_path) + }); + append_shared_caddy_pki_bind_mount(&mut config, ctx.shared_caddy_pki_host_dir); + + for (idx, resolved) in ctx.external_worktree_dirs { + config.bind_mounts.push(coast_docker::runtime::BindMount { + host_path: resolved.clone(), + container_path: coast_core::coastfile::Coastfile::external_mount_path(*idx), + read_only: false, + propagation: None, + }); + } + for (_name, canonical, dynamic) in pre_allocated_ports { config .published_ports @@ -510,27 +609,121 @@ async fn create_container( }); } - let creating_step = if validated.has_compose { 3 } else { 2 }; - emit( - progress, - BuildProgressEvent::started("Creating container", creating_step, validated.total_steps), - ); + config +} - let runtime = coast_docker::dind::DindRuntime::with_client(docker.clone()); - let manager = coast_docker::container::ContainerManager::new(runtime); - let container_id = manager.create_and_start(&config).await.map_err(|e| { - CoastError::docker(format!( - "Failed to create coast container for instance '{}': {}. \ - Ensure Docker is running and the docker:dind image is available.", - req.name, e - )) - })?; +async fn create_and_start_container_attempt( + manager: &DindContainerManager, + ctx: &ContainerCreateContext<'_>, + pre_allocated_ports: &[PreAllocatedPort], + attempt: usize, +) -> Result { + let config = build_container_config(ctx, pre_allocated_ports); + let container_id = match manager.create(&config).await { + Ok(container_id) => container_id, + Err(error) if should_retry_container_create_error(&error, attempt) => { + warn!( + instance = %ctx.req.name, + attempt, + error = %error, + "docker rejected published ports during container creation; reallocating and retrying" + ); + return Ok(ContainerCreateAttempt::Retry(error)); + } + Err(error) => { + return Err(wrap_container_create_error(ctx.req, &error)); + } + }; - emit( - progress, - BuildProgressEvent::done("Creating container", "ok"), - ); - Ok(container_id) + match manager.start(&container_id).await { + Ok(()) => Ok(ContainerCreateAttempt::Ready(container_id)), + Err(error) => { + cleanup_failed_container_start(manager, ctx.req, &container_id).await; + if should_retry_container_create_error(&error, attempt) { + warn!( + instance = %ctx.req.name, + attempt, + error = %error, + "docker rejected published ports during container startup; reallocating and retrying" + ); + return Ok(ContainerCreateAttempt::Retry(error)); + } + + Err(wrap_container_create_error(ctx.req, &error)) + } + } +} + +fn should_retry_container_create_error(error: &CoastError, attempt: usize) -> bool { + attempt < MAX_CONTAINER_PORT_RETRY_ATTEMPTS && is_retryable_port_publish_error(error) +} + +async fn cleanup_failed_container_start( + manager: &DindContainerManager, + req: &RunRequest, + container_id: &str, +) { + if let Err(cleanup_error) = manager.remove(container_id).await { + warn!( + instance = %req.name, + container_id, + error = %cleanup_error, + "failed to remove partially created container after startup failure" + ); + } +} + +fn allocate_pre_allocated_ports(logical_ports: &[(String, u16)]) -> Result> { + allocate_pre_allocated_ports_excluding(logical_ports, &HashSet::new()) +} + +fn allocate_pre_allocated_ports_excluding( + logical_ports: &[(String, u16)], + excluded_dynamic_ports: &HashSet, +) -> Result> { + let mut used_dynamic_ports = excluded_dynamic_ports.clone(); + let mut pre_allocated_ports = Vec::with_capacity(logical_ports.len()); + + for (logical_name, canonical_port) in logical_ports { + let dynamic_port = + crate::port_manager::allocate_dynamic_port_excluding(&used_dynamic_ports)?; + used_dynamic_ports.insert(dynamic_port); + pre_allocated_ports.push((logical_name.clone(), *canonical_port, dynamic_port)); + } + + Ok(pre_allocated_ports) +} + +fn reallocate_pre_allocated_ports( + pre_allocated_ports: &[PreAllocatedPort], +) -> Result> { + let logical_ports = pre_allocated_ports + .iter() + .map(|(logical_name, canonical_port, _dynamic_port)| { + (logical_name.clone(), *canonical_port) + }) + .collect::>(); + let excluded_dynamic_ports = pre_allocated_ports + .iter() + .map(|(_, _, dynamic_port)| *dynamic_port) + .collect::>(); + allocate_pre_allocated_ports_excluding(&logical_ports, &excluded_dynamic_ports) +} + +fn is_retryable_port_publish_error(error: &CoastError) -> bool { + let message = error.to_string(); + message.contains("ports are not available") + || message.contains("/forwards/expose") + || message.contains("port is already allocated") + || message.contains("bind: address already in use") +} + +fn wrap_container_create_error(req: &RunRequest, error: &CoastError) -> CoastError { + CoastError::docker(format!( + "Failed to create coast container for instance '{}': {}. \ + Ensure Docker is running and the docker:dind image is available.", + req.name, error + )) } fn append_shared_caddy_pki_bind_mount( @@ -941,6 +1134,69 @@ mod tests { assert!(script.contains("chmod 600")); } + #[test] + fn test_allocate_pre_allocated_ports_uses_unique_dynamic_ports() { + let mappings = allocate_pre_allocated_ports(&[ + ("web".to_string(), 3000), + ("postgres".to_string(), 5432), + ("redis".to_string(), 6379), + ("mailpit".to_string(), 8025), + ]) + .unwrap(); + + assert_eq!(mappings.len(), 4); + + let unique_dynamic_ports = mappings + .iter() + .map(|(_, _, dynamic)| *dynamic) + .collect::>(); + + assert_eq!(unique_dynamic_ports.len(), mappings.len()); + } + + #[test] + fn test_reallocate_pre_allocated_ports_excludes_previous_attempt_ports() { + let initial = vec![ + ( + "web".to_string(), + 3000, + crate::port_manager::allocate_dynamic_port().unwrap(), + ), + ( + "postgres".to_string(), + 5432, + crate::port_manager::allocate_dynamic_port().unwrap(), + ), + ]; + + let remapped = reallocate_pre_allocated_ports(&initial).unwrap(); + let initial_ports = initial + .iter() + .map(|(_, _, dynamic_port)| *dynamic_port) + .collect::>(); + + assert!(remapped + .iter() + .all(|(_, _, dynamic_port)| !initial_ports.contains(dynamic_port))); + } + + #[test] + fn test_retryable_port_publish_error_matches_wsl_forwarding_failure() { + let error = CoastError::docker( + "Failed to start coast container 'abc'. Is Docker running? Error: Docker responded \ + with status code 500: ports are not available: exposing port TCP \ + 127.0.0.1:53987 -> 127.0.0.1:0: /forwards/expose returned unexpected status: 500", + ); + + assert!(is_retryable_port_publish_error(&error)); + } + + #[test] + fn test_retryable_port_publish_error_ignores_unrelated_docker_failures() { + let error = CoastError::docker("Failed to pull image 'docker:dind': unauthorized"); + assert!(!is_retryable_port_publish_error(&error)); + } + #[tokio::test] async fn test_load_coastfile_resources_reads_ports_and_volume_mounts() { let dir = tempfile::tempdir().unwrap(); diff --git a/coast-daemon/src/port_manager.rs b/coast-daemon/src/port_manager.rs index 2569cc4..df7e1ea 100644 --- a/coast-daemon/src/port_manager.rs +++ b/coast-daemon/src/port_manager.rs @@ -8,7 +8,7 @@ /// Port forwarding is the mechanism behind `coast checkout` (instant port swap) /// and always-on dynamic ports. The daemon spawns socat processes that forward /// traffic from host ports to coast container ports. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::hash::Hasher; use std::net::{SocketAddr, TcpListener, TcpStream}; use std::os::unix::process::CommandExt; @@ -262,15 +262,33 @@ impl Default for PortForwarder { /// Returns `CoastError::Port` if no free port can be found after /// `MAX_ALLOCATION_ATTEMPTS` attempts. pub fn allocate_dynamic_port() -> Result { + allocate_dynamic_port_excluding(&HashSet::new()) +} + +/// Allocate a dynamic port while excluding a known set of unusable ports. +/// +/// This is used by provisioning retries so we don't immediately hand back a +/// host port that Docker has already rejected. +pub fn allocate_dynamic_port_excluding(excluded_ports: &HashSet) -> Result { // Start from a pseudo-random offset to reduce collisions when multiple // allocations happen in quick succession. let range_size = (PORT_RANGE_END - PORT_RANGE_START + 1) as u32; let start_offset = (std::process::id() ^ (timestamp_nanos() as u32)) % range_size; - for i in 0..MAX_ALLOCATION_ATTEMPTS { + let mut inspected_candidates = 0u32; + for i in 0..range_size { let offset = (start_offset + i) % range_size; let port = PORT_RANGE_START + offset as u16; + if excluded_ports.contains(&port) { + continue; + } + + inspected_candidates += 1; + if inspected_candidates > MAX_ALLOCATION_ATTEMPTS { + break; + } + if is_port_available(port) { debug!(port = port, "Allocated dynamic port"); return Ok(port); @@ -1006,6 +1024,19 @@ mod tests { ); } + #[test] + fn test_allocate_dynamic_port_excluding_skips_previously_rejected_port() { + let first_port = allocate_dynamic_port().unwrap(); + let excluded_ports = HashSet::from([first_port]); + + let second_port = allocate_dynamic_port_excluding(&excluded_ports).unwrap(); + + assert_ne!( + first_port, second_port, + "allocator should not return an excluded port" + ); + } + #[test] fn test_is_port_available_on_free_port() { // Binding to port 0 asks the OS to assign any free port. This always