diff --git a/Containerfile b/Containerfile
index 798d77ea..d813c543 100644
--- a/Containerfile
+++ b/Containerfile
@@ -15,7 +15,7 @@ RUN cargo install cargo-nextest cargo-audit cargo-deny --locked
 RUN apt-get update && apt-get install -y \
     fuse3 libfuse3-dev autoconf automake libtool perl libclang-dev clang cmake \
     musl-tools iproute2 iptables passt dnsmasq qemu-utils e2fsprogs btrfs-progs \
-    parted fdisk podman skopeo git curl sudo procps zstd busybox-static cpio uidmap \
+    parted fdisk podman skopeo git curl sudo procps zstd busybox-static cpio uidmap iputils-ping \
     flex bison bc libelf-dev libssl-dev libseccomp-dev \
     && rm -rf /var/lib/apt/lists/*

diff --git a/Containerfile.nested b/Containerfile.nested
index 79213d99..4f5084d0 100644
--- a/Containerfile.nested
+++ b/Containerfile.nested
@@ -17,7 +17,7 @@ FROM ubuntu:24.04
 RUN apt-get update && apt-get install -y --no-install-recommends \
     iproute2 iptables podman python3 kmod procps fuse3 curl nginx \
-    fuse-overlayfs sudo iperf3 rsync btrfs-progs \
+    fuse-overlayfs sudo iperf3 rsync btrfs-progs iputils-ping \
     && rm -rf /var/lib/apt/lists/*

 # Configure podman to use fuse-overlayfs (required for nested containers)
diff --git a/benches/exec.rs b/benches/exec.rs
index 549751a4..79ab4d4b 100644
--- a/benches/exec.rs
+++ b/benches/exec.rs
@@ -501,20 +501,18 @@ impl CloneFixture {
         if !output.status.success() {
             let stderr = String::from_utf8_lossy(&output.stderr);
             let stdout = String::from_utf8_lossy(&output.stdout);
-            // Dump serve log for diagnostics
-            let serve_log = "/tmp/fcvm-bench-serve-clone-exec.log";
-            if let Ok(logs) = std::fs::read_to_string(serve_log) {
-                let tail: Vec<&str> = logs.lines().rev().take(30).collect();
-                eprintln!("=== Last 30 lines of serve log ===");
-                for line in tail.into_iter().rev() {
-                    eprintln!("{}", line);
-                }
-            }
+            let serve_log_path = "/tmp/fcvm-bench-serve-clone-exec.log";
+            let serve_log_content = std::fs::read_to_string(serve_log_path).unwrap_or_default();
             panic!(
-                "clone exec failed after {:.1}s:\nstderr: {}\nstdout: {}",
+                "clone exec failed after {:.1}s:\n\
+                 stderr: {}\n\
+                 stdout: {}\n\
+                 \n=== full serve log ({}) ===\n{}",
                 elapsed.as_secs_f64(),
                 stderr,
-                stdout
+                stdout,
+                serve_log_path,
+                serve_log_content,
             );
         }

@@ -637,16 +635,69 @@ impl CloneFixture {
             .map(|o| String::from_utf8_lossy(&o.stdout).to_string())
             .unwrap_or_default();

-        // Last 30 lines of clone log (full, not filtered)
-        let log_tail: String = clone_log
-            .lines()
-            .rev()
-            .take(30)
-            .collect::<Vec<&str>>()
-            .into_iter()
-            .rev()
-            .collect::<Vec<&str>>()
-            .join("\n");
+        // Get holder PID for namespace diagnostics
+        let holder_diag = Command::new(&fcvm)
+            .args(["ls", "--json", "--pid", &clone_pid.to_string()])
+            .output()
+            .ok()
+            .and_then(|o| {
+                let stdout = String::from_utf8_lossy(&o.stdout);
+                serde_json::from_str::<Vec<serde_json::Value>>(&stdout).ok()
+            })
+            .and_then(|vms| {
+                vms.first()
+                    .and_then(|v| v["holder_pid"].as_u64())
+                    .map(|hp| {
+                        let hp_str = hp.to_string();
+                        let mut diag = String::new();
+
+                        // ARP cache in namespace
+                        if let Ok(o) = Command::new("nsenter")
+                            .args(["-t", &hp_str, "-n", "ip", "neigh", "show"])
+                            .output()
+                        {
+                            diag.push_str(&format!(
+                                "\n=== ARP cache (ns {}) ===\n{}",
+                                hp,
+                                String::from_utf8_lossy(&o.stdout)
+                            ));
+                        }
+
+                        // Namespace sockets
+                        if let Ok(o) = Command::new("nsenter")
+                            .args(["-t", &hp_str, "-n", "ss", "-tnp"])
+                            .output()
+                        {
+                            diag.push_str(&format!(
+                                "\n=== namespace sockets (ns {}) ===\n{}",
+                                hp,
+                                String::from_utf8_lossy(&o.stdout)
+                            ));
+                        }
+
+                        // Bridge links
+                        if let Ok(o) = Command::new("nsenter")
+                            .args(["-t", &hp_str, "-n", "bridge", "link"])
+                            .output()
+                        {
+                            diag.push_str(&format!(
+                                "\n=== bridge links (ns {}) ===\n{}",
+                                hp,
+                                String::from_utf8_lossy(&o.stdout)
+                            ));
+                        }
+
+                        diag
+                    })
+            })
+            .unwrap_or_default();
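+
+        // The holder PID pins the clone's network namespace open; the nsenter
+        // probes above enter that namespace via `-t <holder_pid> -n`.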
+
+        // VM listening sockets
+        let vm_ss = Command::new(&fcvm)
+            .args(["exec", "--pid", &clone_pid.to_string(), "--", "ss", "-tnl"])
+            .output()
+            .map(|o| String::from_utf8_lossy(&o.stdout).to_string())
+            .unwrap_or_else(|e| format!("exec ss failed: {}", e));

         panic!(
             "clone HTTP failed after 10 attempts\n\
@@ -657,7 +708,9 @@ impl CloneFixture {
              \n=== listening sockets on {} ===\n{}\
              \n=== pasta processes ===\n{}\
              \n=== stale process counts ===\n{}\
-             \n=== clone log (last 30 lines) ===\n{}",
+             {}\
+             \n=== VM listening sockets ===\n{}\
+             \n=== full clone log ({}) ===\n{}",
             loopback_ip,
             health_port,
             last_response.len(),
@@ -667,7 +720,10 @@ impl CloneFixture {
             ss_check,
             pasta_check,
             stale_check,
-            log_tail,
+            holder_diag,
+            vm_ss,
+            clone_log_path,
+            clone_log,
         );
     }

diff --git a/src/commands/snapshot.rs b/src/commands/snapshot.rs
index 3b7756cf..d12cb817 100644
--- a/src/commands/snapshot.rs
+++ b/src/commands/snapshot.rs
@@ -724,7 +724,8 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> {
             // With bridge mode, guest IP is always 10.0.2.100 on pasta network
             // Each clone runs in its own namespace, so no IP conflict
             let net = PastaNetwork::new(vm_id.clone(), tap_device.clone(), port_mappings.clone())
-                .with_loopback_ip(loopback_ip);
+                .with_loopback_ip(loopback_ip)
+                .with_restore_mode();
             Box::new(net)
         }
     };
@@ -1081,13 +1082,14 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> {
         }
     }

-    // Verify pasta's L2 forwarding path has ARP resolved before starting health monitor.
+    // Verify pasta's L2 forwarding path is ready before starting health monitor.
     // After snapshot restore, pasta may not have learned the guest's MAC yet.
-    // This probes each forwarded port to trigger and verify ARP resolution —
-    // no guest service needs to be running, just the guest's kernel.
-    if let Err(e) = network.verify_port_forwarding().await {
-        warn!(vm_id = %vm_id, error = %e, "port forwarding verification failed");
-    }
+    // This pings the guest to trigger ARP resolution, then probes each forwarded
+    // port to confirm end-to-end forwarding works.
+    network
+        .verify_port_forwarding()
+        .await
+        .context("port forwarding verification failed after snapshot restore")?;

     // Spawn health monitor task with startup snapshot trigger support
     let health_monitor_handle = crate::health::spawn_health_monitor_full(
diff --git a/src/network/pasta.rs b/src/network/pasta.rs
index b246e141..a3ed3fdb 100644
--- a/src/network/pasta.rs
+++ b/src/network/pasta.rs
@@ -75,6 +75,7 @@ pub struct PastaNetwork {
     pid_file: Option<PathBuf>,
     loopback_ip: Option<String>, // Unique loopback IP for port forwarding (127.x.y.z)
     holder_pid: Option<u32>,     // Namespace PID (set in post_start)
+    restore_mode: bool,          // Skip port probe in post_start (VM not loaded yet)
 }

 impl PastaNetwork {
@@ -90,6 +91,7 @@ impl PastaNetwork {
             pid_file: None,
             loopback_ip: None,
             holder_pid: None,
+            restore_mode: false,
         }
     }

@@ -106,6 +108,19 @@ impl PastaNetwork {
         self
     }

+    /// Skip port forwarding probe in post_start() for snapshot restore.
+    ///
+    /// During snapshot restore, post_start() runs BEFORE the VM snapshot is loaded
+    /// into Firecracker. Probing ports at that point forces pasta to attempt L2
+    /// forwarding to a non-existent guest, which can poison pasta's internal
+    /// connection tracking and cause subsequent connections to return 0 bytes.
+    /// The proper verification happens later via verify_port_forwarding() after
+    /// the VM is resumed and fc-agent has sent its gratuitous ARP.
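+    ///
+    /// Builder-style, chained after new() like the other with_* setters, e.g.
+    /// `PastaNetwork::new(..).with_loopback_ip(ip).with_restore_mode()`.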
+    pub fn with_restore_mode(mut self) -> Self {
+        self.restore_mode = true;
+        self
+    }
+
     /// Get the loopback IP assigned to this VM for port forwarding
     pub fn loopback_ip(&self) -> Option<&str> {
         self.loopback_ip.as_deref()
@@ -604,7 +619,13 @@ impl NetworkManager for PastaNetwork {
         // The PID file only means pasta spawned, not that ports are bound.
         // Health checks use nsenter (bridge path), so without this check
         // "healthy" doesn't mean port forwarding works.
-        if !self.port_mappings.is_empty() {
+        //
+        // Skip in restore mode: during snapshot restore, post_start() runs BEFORE
+        // the VM snapshot is loaded. Probing ports now forces pasta to attempt L2
+        // forwarding to a non-existent guest, poisoning its connection state and
+        // causing subsequent connections to return 0 bytes. The port check happens
+        // later via verify_port_forwarding() after the VM is actually running.
+        if !self.restore_mode && !self.port_mappings.is_empty() {
             self.wait_for_port_forwarding().await?;
         }

@@ -641,14 +662,14 @@ impl NetworkManager for PastaNetwork {
     /// Verify pasta's L2 forwarding path is ready after snapshot restore.
     ///
     /// After snapshot restore, pasta needs the guest's MAC address to forward
-    /// L2 frames. fc-agent sends a gratuitous ARP (ping to gateway) during
-    /// restore, which broadcasts the guest's MAC to all bridge ports including
-    /// pasta0. We verify this by checking the namespace's ARP table — if the
-    /// namespace kernel learned the guest's MAC, pasta received the same
-    /// broadcast frame.
+    /// L2 frames. We actively ping the guest from the namespace to trigger a
+    /// normal ARP exchange. With arp_accept=0 (Linux default), the guest's
+    /// gratuitous arping does NOT create neighbor entries — only updates
+    /// existing ones. The active ping forces the namespace kernel to send an
+    /// ARP request that the guest replies to, creating a REACHABLE entry.
     ///
-    /// This runs after fc-agent's output vsock reconnects, so the gratuitous
-    /// ARP has already been sent. Typically resolves on the first check.
+    /// Once ARP is resolved, we probe each forwarded port to confirm pasta's
+    /// loopback port forwarding is end-to-end functional.
     async fn verify_port_forwarding(&self) -> Result<()> {
         if self.port_mappings.is_empty() {
             return Ok(());
         }
@@ -662,38 +683,40 @@ impl NetworkManager for PastaNetwork {
         let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
         let nsenter_prefix = self.build_nsenter_prefix(holder_pid);

+        // Ping the guest from inside the namespace to trigger ARP resolution.
+        // A successful ping proves ARP resolved AND the guest is reachable.
+        // Use 200ms timeout for ~16 retries within the 5s deadline.
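+        // ping exits 0 only when a reply arrived, so status.success() below
+        // doubles as the reachability signal; stderr is kept for the timeout
+        // diagnostic further down.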
         loop {
             let output = Command::new(&nsenter_prefix[0])
                 .args(&nsenter_prefix[1..])
-                .args(["ip", "neigh", "show", GUEST_IP, "dev", BRIDGE_DEVICE])
-                .stdout(Stdio::piped())
-                .stderr(Stdio::null())
+                .args(["ping", "-c", "1", "-W", "0.2", GUEST_IP])
+                .stdout(Stdio::null())
+                .stderr(Stdio::piped())
                 .output()
                 .await
-                .context("checking ARP table in namespace")?;
-
-            let stdout = String::from_utf8_lossy(&output.stdout);
-            // Entry looks like: "10.0.2.100 lladdr aa:bb:cc:dd:ee:ff REACHABLE"
-            // If lladdr is present, the guest's MAC is known.
-            if stdout.contains("lladdr") {
-                info!(guest_ip = GUEST_IP, arp = %stdout.trim(), "ARP resolved");
-                // ARP is resolved but pasta's loopback port forwarding may not be
-                // ready yet. Probe each mapped port on the loopback IP to confirm
-                // end-to-end forwarding works before declaring ready.
+                .context("running ping via nsenter in namespace")?;
+
+            if output.status.success() {
+                info!(
+                    guest_ip = GUEST_IP,
+                    "guest reachable via ping, ARP resolved"
+                );
                 self.wait_for_port_forwarding().await?;
                 return Ok(());
             }

             if std::time::Instant::now() > deadline {
+                let stderr = String::from_utf8_lossy(&output.stderr);
+                let stderr = stderr.trim();
                 anyhow::bail!(
-                    "ARP for guest {} not resolved within 5s on {}",
+                    "ARP for guest {} not resolved within 5s on {}: ping stderr: {}",
                     GUEST_IP,
-                    BRIDGE_DEVICE
+                    BRIDGE_DEVICE,
+                    if stderr.is_empty() { "(empty)" } else { stderr }
                 );
             }

-            debug!(guest_ip = GUEST_IP, "ARP not yet resolved, waiting");
-            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
+            debug!(guest_ip = GUEST_IP, "ping to guest failed, retrying");
         }
     }
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index f1bc76ec..da6ed882 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -1526,6 +1526,188 @@ pub fn find_available_high_port() -> anyhow::Result<u16> {
     find_available_port(10000 + offset, 50000 - offset)
 }

+/// Get the loopback IP for a VM by PID (from `fcvm ls --json`).
+pub async fn get_loopback_ip(pid: u32) -> anyhow::Result<String> {
+    let fcvm_path = find_fcvm_binary()?;
+    let output = tokio::process::Command::new(&fcvm_path)
+        .args(["ls", "--json", "--pid", &pid.to_string()])
+        .output()
+        .await
+        .context("getting VM state for loopback IP")?;
+
+    let stdout = String::from_utf8_lossy(&output.stdout);
+    serde_json::from_str::<Vec<serde_json::Value>>(&stdout)
+        .ok()
+        .and_then(|v| v.first().cloned())
+        .and_then(|v| {
+            v.get("config")?
+                .get("network")?
+                .get("loopback_ip")?
+                .as_str()
+                .map(|s| s.to_string())
+        })
+        .ok_or_else(|| anyhow::anyhow!("loopback_ip not found for VM PID {}", pid))
+}
+
+/// Result of a single HTTP check via curl.
+pub struct CurlResult {
+    pub success: bool,
+    pub body_len: usize,
+    pub error: String,
+}
+
+/// Make a single HTTP request via curl and return the result.
+pub async fn curl_check(ip: &str, port: u16, timeout_secs: u32) -> CurlResult {
+    let url = format!("http://{}:{}", ip, port);
+    match tokio::process::Command::new("curl")
+        .args(["-sS", "--max-time", &timeout_secs.to_string(), &url])
+        .output()
+        .await
+    {
+        Ok(output) => CurlResult {
+            success: output.status.success(),
+            body_len: output.stdout.len(),
+            error: if output.status.success() {
+                String::new()
+            } else {
+                String::from_utf8_lossy(&output.stderr).to_string()
+            },
+        },
+        Err(e) => CurlResult {
+            success: false,
+            body_len: 0,
+            error: format!("curl failed: {}", e),
+        },
+    }
+}
+
+/// Make HTTP requests via curl with retries until success or deadline.
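+/// Returns the last `CurlResult` observed, so on failure the caller can still
+/// inspect `body_len` and `error`.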
+/// +/// After snapshot restore, the network channel (L2 + pasta) is ready but the +/// guest application may need a moment. This retries with 500ms backoff +/// and prints extensive diagnostics on failure. +pub async fn curl_check_retry( + ip: &str, + port: u16, + timeout_secs: u32, + clone_pid: Option, +) -> CurlResult { + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs as u64); + let mut attempt = 0u32; + let mut last_result = CurlResult { + success: false, + body_len: 0, + error: "no attempt made".to_string(), + }; + + while std::time::Instant::now() < deadline { + attempt += 1; + last_result = curl_check(ip, port, 1).await; + if last_result.success && last_result.body_len > 0 { + if attempt > 1 { + println!( + " curl_check_retry: succeeded on attempt {} after retries", + attempt + ); + } + return last_result; + } + println!( + " curl_check_retry attempt {}: {}:{} -> success={} body={} err={}", + attempt, ip, port, last_result.success, last_result.body_len, last_result.error + ); + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + + // All retries exhausted — dump diagnostics + println!( + "\n curl_check_retry FAILED after {} attempts to {}:{}", + attempt, ip, port + ); + println!(" last error: {}", last_result.error); + + // Verbose curl to see exactly what happens at TCP level + let url = format!("http://{}:{}", ip, port); + if let Ok(verbose) = tokio::process::Command::new("curl") + .args(["-v", "--max-time", "2", &url]) + .output() + .await + { + let stderr = String::from_utf8_lossy(&verbose.stderr); + println!(" curl -v output:"); + for line in stderr.lines() { + println!(" {}", line); + } + } + + if let Some(pid) = clone_pid { + dump_clone_network_diagnostics(pid).await; + } + + last_result +} + +/// Dump network diagnostics for a clone VM: pasta processes, ARP cache, +/// namespace sockets, bridge state, and VM listening sockets. 
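+///
+/// Best-effort: each probe is skipped silently if the tool or the VM state it
+/// needs is unavailable.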
+pub async fn dump_clone_network_diagnostics(pid: u32) {
+    println!("  --- Network diagnostics for clone PID {} ---", pid);
+
+    // Check if pasta is still running
+    run_cmd_diagnostic("pasta processes", "pgrep", &["-a", "pasta"]).await;
+
+    // Get holder PID from state for nsenter
+    let fcvm_path = find_fcvm_binary().unwrap_or_default();
+    let holder_pid = get_holder_pid(&fcvm_path, pid).await;
+
+    if let Some(hp) = holder_pid {
+        let hp_str = hp.to_string();
+        run_nsenter_diagnostic(&hp_str, &["ip", "neigh", "show"], "ARP cache").await;
+        run_nsenter_diagnostic(&hp_str, &["ss", "-tnp"], "namespace sockets").await;
+        run_nsenter_diagnostic(&hp_str, &["bridge", "link"], "bridge links").await;
+    }
+
+    // Check what the VM sees (listening sockets)
+    let pid_str = pid.to_string();
+    run_cmd_diagnostic(
+        "VM listening sockets",
+        &fcvm_path.to_string_lossy(),
+        &["exec", "--pid", &pid_str, "--", "ss", "-tnl"],
+    )
+    .await;
+}
+
+async fn get_holder_pid(fcvm_path: &std::path::Path, pid: u32) -> Option<u64> {
+    let out = tokio::process::Command::new(fcvm_path)
+        .args(["ls", "--json"])
+        .output()
+        .await
+        .ok()?;
+    let json_str = String::from_utf8_lossy(&out.stdout);
+    let vms: Vec<serde_json::Value> = serde_json::from_str(&json_str).ok()?;
+    let vm = vms.iter().find(|v| v["pid"].as_u64() == Some(pid as u64))?;
+    vm["holder_pid"].as_u64()
+}
+
+async fn run_nsenter_diagnostic(holder_pid: &str, cmd: &[&str], label: &str) {
+    let mut args = vec!["-t", holder_pid, "--net"];
+    args.extend(cmd);
+    run_cmd_diagnostic(label, "nsenter", &args).await;
+}
+
+async fn run_cmd_diagnostic(label: &str, program: &str, args: &[&str]) {
+    if let Ok(out) = tokio::process::Command::new(program)
+        .args(args)
+        .output()
+        .await
+    {
+        let stdout = String::from_utf8_lossy(&out.stdout);
+        println!("  {}:", label);
+        for line in stdout.lines() {
+            println!("    {}", line);
+        }
+    }
+}
+
 /// Wait for a TCP server to be ready by attempting to connect.
 ///
 /// # Arguments
diff --git a/tests/test_clone_port_forward_stress.rs b/tests/test_clone_port_forward_stress.rs
new file mode 100644
index 00000000..383bb4a5
--- /dev/null
+++ b/tests/test_clone_port_forward_stress.rs
@@ -0,0 +1,405 @@
+//! Stress test for clone port forwarding.
+//!
+//! This test exercises the pasta port forwarding path under stress:
+//! - Multiple clones spawned from the same snapshot
+//! - Concurrent rapid HTTP requests to all clones simultaneously
+//! - Checks for 0-byte responses (pasta connection tracking poisoning)
+//!
+//! Background: During snapshot restore, `post_start()` calls
+//! `wait_for_port_forwarding()` BEFORE the VM snapshot is loaded.
+//! pasta accepts the TCP connection (it's listening on loopback),
+//! but can't forward to the non-existent guest. This may poison
+//! pasta's internal forwarding state, causing subsequent connections
+//! to return 0 bytes instead of actual data.
+
+#![cfg(feature = "integration-slow")]
+
+mod common;
+
+use anyhow::{Context, Result};
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::Arc;
+use std::time::Instant;
+
+/// Number of clones to spawn
+const NUM_CLONES: usize = 3;
+
+/// Number of HTTP requests per clone
+const REQUESTS_PER_CLONE: usize = 20;
+
+/// Stress test: multiple clones with port forwarding, concurrent HTTP requests
+///
+/// Reproduces the "connect succeeded but 0 bytes" pattern seen in CI bench-vm
+/// failures. The hypothesis: pasta's `wait_for_port_forwarding()` probe during
+/// restore (before guest exists) poisons pasta's internal forwarding state.
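+///
+/// With restore mode now skipping that early probe (see `with_restore_mode()`
+/// in src/network/pasta.rs), any zero-byte response here indicates a regression.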
+#[tokio::test] +async fn test_clone_port_forward_stress_rootless() -> Result<()> { + let (baseline_name, _, snapshot_name, _) = common::unique_names("pf-stress"); + + println!("\n╔═══════════════════════════════════════════════════════════════╗"); + println!("║ Clone Port Forward Stress Test (rootless) ║"); + println!( + "║ {} clones × {} requests each (concurrent) ║", + NUM_CLONES, REQUESTS_PER_CLONE + ); + println!("╚═══════════════════════════════════════════════════════════════╝\n"); + + let fcvm_path = common::find_fcvm_binary()?; + + // Allocate port before baseline so it's baked into the snapshot + let host_port = common::find_available_high_port().context("finding available port")?; + let publish_arg = format!("{}:80", host_port); + + // Step 1: Start baseline VM with nginx + port forwarding + println!( + "Step 1: Starting baseline VM (nginx, --publish {})...", + publish_arg + ); + let (_baseline_child, baseline_pid) = common::spawn_fcvm_with_logs( + &[ + "podman", + "run", + "--name", + &baseline_name, + "--network", + "rootless", + "--publish", + &publish_arg, + "--health-check", + "http://localhost:80", + common::TEST_IMAGE, + ], + &baseline_name, + ) + .await + .context("spawning baseline VM")?; + + println!(" Waiting for baseline VM to become healthy..."); + common::poll_health_by_pid(baseline_pid, 90).await?; + println!(" ✓ Baseline VM healthy (PID: {})", baseline_pid); + + // Verify baseline port forwarding works before snapshotting + let baseline_ip = common::get_loopback_ip(baseline_pid).await?; + println!(" Baseline loopback IP: {}", baseline_ip); + + let baseline_check = common::curl_check(&baseline_ip, host_port, 5).await; + println!( + " Baseline HTTP check: {} ({} bytes)", + if baseline_check.success { + "✓" + } else { + "✗ FAIL" + }, + baseline_check.body_len + ); + assert!( + baseline_check.success && baseline_check.body_len > 0, + "Baseline nginx must respond with data before snapshot" + ); + + // Step 2: Create snapshot + println!("\nStep 2: Creating snapshot..."); + let output = tokio::process::Command::new(&fcvm_path) + .args([ + "snapshot", + "create", + "--pid", + &baseline_pid.to_string(), + "--tag", + &snapshot_name, + ]) + .output() + .await + .context("running snapshot create")?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("Snapshot creation failed: {}", stderr); + } + println!(" ✓ Snapshot created"); + + // Kill baseline - only need snapshot for clones + common::kill_process(baseline_pid).await; + println!(" Killed baseline VM"); + + // Step 3: Start memory server + println!("\nStep 3: Starting memory server..."); + let (_serve_child, serve_pid) = + common::spawn_fcvm_with_logs(&["snapshot", "serve", &snapshot_name], "uffd-pf-stress") + .await + .context("spawning memory server")?; + + common::poll_serve_ready(&snapshot_name, serve_pid, 30).await?; + println!(" ✓ Memory server ready (PID: {})", serve_pid); + + // Step 4: Spawn all clones concurrently + println!("\nStep 4: Spawning {} clones concurrently...", NUM_CLONES); + let serve_pid_str = serve_pid.to_string(); + + struct CloneInfo { + name: String, + pid: u32, + loopback_ip: String, + _child: tokio::process::Child, + } + + // Spawn all clones concurrently using JoinSet + let mut spawn_set = tokio::task::JoinSet::new(); + + for i in 0..NUM_CLONES { + let clone_name = format!("pf-stress-clone-{}-{}", i, std::process::id()); + println!(" Spawning clone {} ({})...", i, clone_name); + let pid_str = serve_pid_str.clone(); + 
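+        // The spawned future must be 'static, so it captures an owned copy of
+        // the serve PID string rather than borrowing serve_pid_str.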
+        spawn_set.spawn(async move {
+            let (child, clone_pid) = common::spawn_fcvm_with_logs(
+                &["snapshot", "run", "--pid", &pid_str, "--name", &clone_name],
+                &clone_name,
+            )
+            .await
+            .context(format!("spawning clone {}", clone_name))?;
+
+            common::poll_health_by_pid(clone_pid, 120).await?;
+            let loopback_ip = common::get_loopback_ip(clone_pid).await?;
+            Ok::<_, anyhow::Error>((clone_name, clone_pid, loopback_ip, child))
+        });
+    }
+
+    let mut clones: Vec<CloneInfo> = Vec::new();
+    while let Some(result) = spawn_set.join_next().await {
+        let (name, pid, loopback_ip, child) = result??;
+        println!(
+            "  ✓ Clone {} healthy (PID: {}, IP: {})",
+            name, pid, loopback_ip
+        );
+        clones.push(CloneInfo {
+            name,
+            pid,
+            loopback_ip,
+            _child: child,
+        });
+    }
+
+    // Step 5: Verify each clone's port forwarding before the stress storm
+    // Use curl_check_retry: the L2 channel is ready (verify_port_forwarding passed)
+    // but the guest application may need a moment after snapshot restore.
+    println!("\nStep 5: Pre-storm verification of each clone (with retries)...");
+    for clone in &clones {
+        let check =
+            common::curl_check_retry(&clone.loopback_ip, host_port, 10, Some(clone.pid)).await;
+        println!(
+            "  Clone {} ({}:{}): {} ({} bytes)",
+            clone.name,
+            clone.loopback_ip,
+            host_port,
+            if check.success { "OK" } else { "FAIL" },
+            check.body_len,
+        );
+        assert!(
+            check.success && check.body_len > 0,
+            "Pre-storm curl to clone {} failed after retries: {}",
+            clone.name,
+            check.error
+        );
+    }
+
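+    // A request that connects but returns 0 bytes is the poisoning signature
+    // this test hunts for; it is counted separately from outright connect errors.
+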
+    // Step 6: Concurrent HTTP requests to all clones simultaneously
+    println!(
+        "\nStep 6: Sending {} HTTP requests to each of {} clones (concurrently)...",
+        REQUESTS_PER_CLONE, NUM_CLONES
+    );
+
+    let total_success = Arc::new(AtomicU32::new(0));
+    let total_zero_bytes = Arc::new(AtomicU32::new(0));
+    let total_errors = Arc::new(AtomicU32::new(0));
+
+    let start = Instant::now();
+
+    // Spawn concurrent tasks for all clones
+    let mut handles = Vec::new();
+    for clone in &clones {
+        let ip = clone.loopback_ip.clone();
+        let name = clone.name.clone();
+        let success = Arc::clone(&total_success);
+        let zero = Arc::clone(&total_zero_bytes);
+        let errors = Arc::clone(&total_errors);
+
+        let handle = tokio::spawn(async move {
+            let mut clone_success = 0u32;
+            let mut clone_zero = 0u32;
+            let mut clone_error = 0u32;
+
+            for req in 0..REQUESTS_PER_CLONE {
+                let result = common::curl_check(&ip, host_port, 5).await;
+
+                if result.success && result.body_len > 0 {
+                    clone_success += 1;
+                    success.fetch_add(1, Ordering::Relaxed);
+                } else if result.success && result.body_len == 0 {
+                    clone_zero += 1;
+                    zero.fetch_add(1, Ordering::Relaxed);
+                    println!("  ⚠ Clone {} request {}: 0-byte response!", name, req);
+                } else {
+                    clone_error += 1;
+                    errors.fetch_add(1, Ordering::Relaxed);
+                    if clone_error <= 3 {
+                        println!(
+                            "  ✗ Clone {} request {} to {}:{}: error ({})",
+                            name, req, ip, host_port, result.error
+                        );
+                    }
+                    // On first error, dump diagnostics
+                    if clone_error == 1 {
+                        let port_str = host_port.to_string();
+
+                        // 1. Check listening sockets for our port
+                        let ss = tokio::process::Command::new("ss")
+                            .args(["-tlnp"])
+                            .output()
+                            .await;
+                        if let Ok(out) = ss {
+                            let stdout = String::from_utf8_lossy(&out.stdout);
+                            let matching: Vec<&str> = stdout
+                                .lines()
+                                .filter(|l| l.contains(&port_str) || l.starts_with("State"))
+                                .collect();
+                            println!(
+                                "  DIAG clone {} port {} ss output: {:?}",
+                                name, host_port, matching
+                            );
+                        }
+
+                        // 2. Verbose curl to see connection details
+                        let verbose = tokio::process::Command::new("curl")
+                            .args([
+                                "-v",
+                                "--max-time",
+                                "2",
+                                &format!("http://{}:{}", ip, host_port),
+                            ])
+                            .output()
+                            .await;
+                        if let Ok(out) = verbose {
+                            let stderr = String::from_utf8_lossy(&out.stderr);
+                            println!(
+                                "  DIAG clone {} verbose curl stderr: {}",
+                                name,
+                                stderr.chars().take(500).collect::<String>()
+                            );
+                        }
+
+                        // 3. Check if pasta is still alive (look for pasta process)
+                        let pgrep = tokio::process::Command::new("pgrep")
+                            .args(["-a", "pasta"])
+                            .output()
+                            .await;
+                        if let Ok(out) = pgrep {
+                            let stdout = String::from_utf8_lossy(&out.stdout);
+                            let matching: Vec<&str> = stdout
+                                .lines()
+                                .filter(|l| l.contains(&ip) || l.contains(&port_str))
+                                .collect();
+                            println!(
+                                "  DIAG clone {} pasta processes matching {}:{}: {:?}",
+                                name, ip, host_port, matching
+                            );
+                            // Also show all pasta processes for context
+                            let all: Vec<&str> = stdout.lines().collect();
+                            println!("  DIAG clone {} all pasta processes: {:?}", name, all);
+                        }
+
+                        // 4. Check connections (not just listening)
+                        let ss_all = tokio::process::Command::new("ss")
+                            .args(["-tnp"])
+                            .output()
+                            .await;
+                        if let Ok(out) = ss_all {
+                            let stdout = String::from_utf8_lossy(&out.stdout);
+                            let matching: Vec<&str> = stdout
+                                .lines()
+                                .filter(|l| l.contains(&ip) || l.starts_with("State"))
+                                .collect();
+                            println!(
+                                "  DIAG clone {} connections to {}: {:?}",
+                                name, ip, matching
+                            );
+                        }
+                    }
+                }
+            }
+
+            (name, clone_success, clone_zero, clone_error)
+        });
+        handles.push(handle);
+    }
+
+    // Wait for all concurrent tasks to complete
+    for handle in handles {
+        let (name, ok, zero, err) = handle.await?;
+        println!(
+            "  Clone {}: {}/{} OK, {} zero-byte, {} errors",
+            name, ok, REQUESTS_PER_CLONE, zero, err
+        );
+    }
+
+    let elapsed = start.elapsed();
+    let total_requests = (NUM_CLONES * REQUESTS_PER_CLONE) as u32;
+    let success = total_success.load(Ordering::Relaxed);
+    let zero_bytes = total_zero_bytes.load(Ordering::Relaxed);
+    let errs = total_errors.load(Ordering::Relaxed);
+
+    // Cleanup
+    println!("\nCleaning up...");
+    for clone in &clones {
+        common::kill_process(clone.pid).await;
+    }
+    common::kill_process(serve_pid).await;
+    println!("  Killed all clones and memory server");
+
+    // Results
+    println!("\n╔═══════════════════════════════════════════════════════════════╗");
+    println!("║                            RESULTS                              ║");
+    println!("╠═══════════════════════════════════════════════════════════════╣");
+    println!(
+        "║  Total requests: {:4} ({:.1}s concurrent)                        ║",
+        total_requests,
+        elapsed.as_secs_f64()
+    );
+    println!(
+        "║  Successful:     {:4}                                            ║",
+        success
+    );
+    println!(
+        "║  Zero-byte:      {:4} (pasta poisoning pattern)                  ║",
+        zero_bytes
+    );
+    println!(
+        "║  Errors:         {:4}                                            ║",
+        errs
+    );
+    println!("╚═══════════════════════════════════════════════════════════════╝");
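+
+    // Zero-byte responses fail the test even though curl reported success: they
+    // indicate corrupted forwarding state, not a transient connect error.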
+    if zero_bytes > 0 {
+        anyhow::bail!(
+            "PASTA POISONING DETECTED: {} out of {} requests returned 0 bytes. \
+             This confirms that wait_for_port_forwarding() during restore \
+             (before guest exists) corrupts pasta's forwarding state.",
+            zero_bytes,
+            total_requests
+        );
+    }
+
+    if errs > 0 {
+        anyhow::bail!(
+            "Port forwarding errors: {} out of {} requests failed",
+            errs,
+            total_requests
+        );
+    }
+
+    println!("\n✅ CLONE PORT FORWARD STRESS TEST PASSED!");
+    println!(
+        "   All {} requests across {} clones returned valid data",
+        total_requests, NUM_CLONES
+    );
+    Ok(())
+}
diff --git a/tests/test_snapshot_clone.rs b/tests/test_snapshot_clone.rs
index e2bfb6fe..b31ed4ee 100644
--- a/tests/test_snapshot_clone.rs
+++ b/tests/test_snapshot_clone.rs
@@ -1503,55 +1503,27 @@ async fn test_clone_port_forward_rootless() -> Result<()> {
     println!("\nStep 5: Testing port forwarding...");

     // Get clone's loopback IP from state (rootless uses 127.x.y.z)
-    let output = tokio::process::Command::new(&fcvm_path)
-        .args(["ls", "--json", "--pid", &clone_pid.to_string()])
-        .output()
-        .await
-        .context("getting clone state")?;
-
-    let stdout = String::from_utf8_lossy(&output.stdout);
-    let loopback_ip: String = serde_json::from_str::<Vec<serde_json::Value>>(&stdout)
-        .ok()
-        .and_then(|v| v.first().cloned())
-        .and_then(|v| {
-            v.get("config")?
-                .get("network")?
-                .get("loopback_ip")?
-                .as_str()
-                .map(|s| s.to_string())
-        })
-        .unwrap_or_default();
+    let loopback_ip = common::get_loopback_ip(clone_pid).await?;

     println!("  Clone loopback IP: {}", loopback_ip);

     // Test: Access via loopback IP and forwarded port
-    // verify_port_forwarding() runs after snapshot restore and confirms end-to-end
-    // data flow through pasta's loopback → bridge → guest path before health monitor.
+    // verify_port_forwarding() confirmed the L2 channel is ready (ping + TCP connect).
+    // Use retry because the guest application may need a moment after restore.
     println!(
-        "  Testing access via loopback {}:{}...",
+        "  Testing access via loopback {}:{} (with retries)...",
         loopback_ip, host_port
     );
-    let loopback_result = tokio::process::Command::new("curl")
-        .args([
-            "-s",
-            "--max-time",
-            "5",
-            &format!("http://{}:{}", loopback_ip, host_port),
-        ])
-        .output()
-        .await
-        .context("curl loopback port forward")?;
-
-    let loopback_works = loopback_result.status.success() && !loopback_result.stdout.is_empty();
+    let loopback_check =
+        common::curl_check_retry(&loopback_ip, host_port, 10, Some(clone_pid)).await;
+    let loopback_works = loopback_check.success && loopback_check.body_len > 0;

     if loopback_works {
-        let response = String::from_utf8_lossy(&loopback_result.stdout);
-        println!("  Loopback access: ✓ OK ({} bytes)", response.len());
-    } else {
-        println!("  Loopback access: ✗ FAIL");
         println!(
-            "    stderr: {}",
-            String::from_utf8_lossy(&loopback_result.stderr)
+            "  Loopback access: ✓ OK ({} bytes)",
+            loopback_check.body_len
         );
+    } else {
+        println!("  Loopback access: ✗ FAIL ({})", loopback_check.error);
     }

     // Cleanup
@@ -1691,53 +1663,26 @@ async fn test_clone_port_forward_routed() -> Result<()> {
     println!("\nStep 5: Testing port forwarding...");

     // Get clone's loopback IP from state (routed uses TCP proxy + loopback like rootless)
-    let output = tokio::process::Command::new(&fcvm_path)
-        .args(["ls", "--json", "--pid", &clone_pid.to_string()])
-        .output()
-        .await
-        .context("getting clone state")?;
-
-    let stdout = String::from_utf8_lossy(&output.stdout);
-    let loopback_ip: String = serde_json::from_str::<Vec<serde_json::Value>>(&stdout)
-        .ok()
-        .and_then(|v| v.first().cloned())
-        .and_then(|v| {
-            v.get("config")?
-                .get("network")?
-                .get("loopback_ip")?
- .as_str() - .map(|s| s.to_string()) - }) - .unwrap_or_default(); + let loopback_ip = common::get_loopback_ip(clone_pid).await?; println!(" Clone loopback IP: {}", loopback_ip); // Test: Access via loopback IP and forwarded port + // Use retry because the guest application may need a moment after restore. println!( - " Testing access via loopback {}:{}...", + " Testing access via loopback {}:{} (with retries)...", loopback_ip, host_port ); - let loopback_result = tokio::process::Command::new("curl") - .args([ - "-s", - "--max-time", - "5", - &format!("http://{}:{}", loopback_ip, host_port), - ]) - .output() - .await - .context("curl loopback port forward")?; - - let loopback_works = loopback_result.status.success() && !loopback_result.stdout.is_empty(); + let loopback_check = + common::curl_check_retry(&loopback_ip, host_port, 10, Some(clone_pid)).await; + let loopback_works = loopback_check.success && loopback_check.body_len > 0; if loopback_works { - let response = String::from_utf8_lossy(&loopback_result.stdout); - println!(" Loopback access: ✓ OK ({} bytes)", response.len()); - } else { - println!(" Loopback access: ✗ FAIL"); println!( - " stderr: {}", - String::from_utf8_lossy(&loopback_result.stderr) + " Loopback access: ✓ OK ({} bytes)", + loopback_check.body_len ); + } else { + println!(" Loopback access: ✗ FAIL ({})", loopback_check.error); } // Cleanup