diff --git a/fuse-pipe/src/protocol/request.rs b/fuse-pipe/src/protocol/request.rs
index 1eff37fa..f5f686cd 100644
--- a/fuse-pipe/src/protocol/request.rs
+++ b/fuse-pipe/src/protocol/request.rs
@@ -418,6 +418,108 @@ impl VolumeRequest {
             VolumeRequest::Forget { .. } | VolumeRequest::BatchForget { .. }
         )
     }
+
+    /// Extract the file handle (fh) if this operation uses one.
+    pub fn fh(&self) -> Option<u64> {
+        match self {
+            Self::Read { fh, .. }
+            | Self::Write { fh, .. }
+            | Self::Release { fh, .. }
+            | Self::Flush { fh, .. }
+            | Self::Fsync { fh, .. }
+            | Self::Releasedir { fh, .. }
+            | Self::Fsyncdir { fh, .. }
+            | Self::Fallocate { fh, .. }
+            | Self::Lseek { fh, .. }
+            | Self::Getlk { fh, .. }
+            | Self::Setlk { fh, .. }
+            | Self::Readdirplus { fh, .. } => Some(*fh),
+            Self::Setattr { fh, .. } => *fh,
+            Self::CopyFileRange { fh_in, .. } | Self::RemapFileRange { fh_in, .. } => Some(*fh_in),
+            _ => None,
+        }
+    }
+
+    /// Clone this request with a different file handle.
+    pub fn with_fh(&self, new_fh: u64) -> Self {
+        let mut cloned = self.clone();
+        match &mut cloned {
+            Self::Read { fh, .. }
+            | Self::Write { fh, .. }
+            | Self::Release { fh, .. }
+            | Self::Flush { fh, .. }
+            | Self::Fsync { fh, .. }
+            | Self::Releasedir { fh, .. }
+            | Self::Fsyncdir { fh, .. }
+            | Self::Fallocate { fh, .. }
+            | Self::Lseek { fh, .. }
+            | Self::Getlk { fh, .. }
+            | Self::Setlk { fh, .. }
+            | Self::Readdirplus { fh, .. } => *fh = new_fh,
+            Self::Setattr { fh, .. } => *fh = Some(new_fh),
+            // Only remap fh_in; fh_out references a different file and is NOT
+            // currently remapped after snapshot restore. This is a known limitation:
+            // copy_file_range/remap_file_range with a stale fh_out will fail.
+            // TODO: add fh_out()/with_fh_out() helpers for independent remapping.
+            Self::CopyFileRange { fh_in, .. } | Self::RemapFileRange { fh_in, .. } => {
+                *fh_in = new_fh;
+            }
+            _ => {}
+        }
+        cloned
+    }
+
+    /// Check if this is a directory handle operation.
+    pub fn is_dir_handle_op(&self) -> bool {
+        matches!(
+            self,
+            Self::Readdirplus { .. } | Self::Releasedir { .. } | Self::Fsyncdir { .. }
+        )
+    }
+
+    /// Extract the inode from this request (if present).
+    pub fn ino(&self) -> Option<u64> {
+        match self {
+            Self::Getattr { ino }
+            | Self::Readlink { ino }
+            | Self::Statfs { ino }
+            | Self::Read { ino, .. }
+            | Self::Write { ino, .. }
+            | Self::Release { ino, .. }
+            | Self::Flush { ino, .. }
+            | Self::Fsync { ino, .. }
+            | Self::Open { ino, .. }
+            | Self::Opendir { ino, .. }
+            | Self::Releasedir { ino, .. }
+            | Self::Fsyncdir { ino, .. }
+            | Self::Fallocate { ino, .. }
+            | Self::Lseek { ino, .. }
+            | Self::Getlk { ino, .. }
+            | Self::Setlk { ino, .. }
+            | Self::Readdirplus { ino, .. }
+            | Self::Access { ino, .. }
+            | Self::Forget { ino, .. } => Some(*ino),
+            Self::CopyFileRange { ino_in, .. } | Self::RemapFileRange { ino_in, .. } => {
+                Some(*ino_in)
+            }
+            Self::Lookup { parent, .. }
+            | Self::Readdir { ino: parent, .. }
+            | Self::Mkdir { parent, .. }
+            | Self::Mknod { parent, .. }
+            | Self::Rmdir { parent, .. }
+            | Self::Create { parent, .. }
+            | Self::Unlink { parent, .. }
+            | Self::Rename { parent, .. }
+            | Self::Symlink { parent, .. }
+            | Self::Link { ino: parent, .. } => Some(*parent),
+            Self::Setattr { ino, .. } => Some(*ino),
+            Self::Setxattr { ino, .. }
+            | Self::Getxattr { ino, .. }
+            | Self::Listxattr { ino, .. }
+            | Self::Removexattr { ino, .. } => Some(*ino),
+            Self::BatchForget { .. } => None,
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/fuse-pipe/src/protocol/response.rs b/fuse-pipe/src/protocol/response.rs
index 7be3a2de..641804f2 100644
--- a/fuse-pipe/src/protocol/response.rs
+++ b/fuse-pipe/src/protocol/response.rs
@@ -119,6 +119,11 @@ impl VolumeResponse {
         matches!(self, VolumeResponse::Error { .. })
     }
 
+    /// Check if this is a bad file descriptor error (stale handle after restore).
+    pub fn is_ebadf(&self) -> bool {
+        self.errno() == Some(libc::EBADF)
+    }
+
     /// Check if this is a success response.
     pub fn is_ok(&self) -> bool {
         !self.is_error()
diff --git a/fuse-pipe/src/server/pipelined.rs b/fuse-pipe/src/server/pipelined.rs
index 5ecf30a6..4c5e0672 100644
--- a/fuse-pipe/src/server/pipelined.rs
+++ b/fuse-pipe/src/server/pipelined.rs
@@ -47,6 +47,17 @@ impl<F: FilesystemHandler> AsyncServer<F> {
         }
     }
 
+    /// Create a server from a pre-wrapped `Arc`.
+    ///
+    /// Use this when the caller needs to retain a reference to the handler
+    /// (e.g., to call `RemapFs::serialize_table()` while the server is running).
+    pub fn from_arc(handler: Arc<F>) -> Self {
+        Self {
+            handler,
+            config: ServerConfig::default(),
+        }
+    }
+
     /// Serve on a Unix socket.
     ///
     /// This function blocks forever, accepting and handling connections.
diff --git a/fuse-pipe/src/server/remap.rs b/fuse-pipe/src/server/remap.rs
index 61b1dcbc..d065d750 100644
--- a/fuse-pipe/src/server/remap.rs
+++ b/fuse-pipe/src/server/remap.rs
@@ -27,6 +27,8 @@ pub struct RemapFs<T> {
     stable_to_inner: DashMap<u64, u64>,
     /// stable_ino → relative path from mount root (for serialization)
     paths: DashMap<u64, String>,
+    /// old_fh → new_fh (stale handles lazily reopened after snapshot restore)
+    handle_remap: DashMap<u64, u64>,
 }
 
 impl<T: FilesystemHandler> RemapFs<T> {
@@ -48,6 +50,7 @@ impl<T: FilesystemHandler> RemapFs<T> {
             inner_to_stable,
             stable_to_inner,
             paths,
+            handle_remap: DashMap::new(),
         }
     }
 
@@ -533,8 +536,16 @@ impl<T: FilesystemHandler> RemapFs<T> {
     ///
     /// Paths that no longer exist on the host are skipped (logged as warnings).
     pub fn restore_from_table(inner: T, json: &str) -> Self {
-        let table: std::collections::BTreeMap<u64, String> =
-            serde_json::from_str(json).unwrap_or_default();
+        let table: std::collections::BTreeMap<u64, String> = match serde_json::from_str(json) {
+            Ok(t) => t,
+            Err(e) => {
+                error!(
+                    "failed to parse inode table JSON, starting with empty table: {}",
+                    e
+                );
+                std::collections::BTreeMap::new()
+            }
+        };
 
         let inner_to_stable = DashMap::new();
         let stable_to_inner = DashMap::new();
@@ -599,6 +610,7 @@ impl<T: FilesystemHandler> RemapFs<T> {
             inner_to_stable,
             stable_to_inner,
             paths,
+            handle_remap: DashMap::new(),
         }
     }
 
@@ -606,6 +618,75 @@ impl<T: FilesystemHandler> RemapFs<T> {
     pub fn inner(&self) -> &T {
         &self.inner
     }
+
+    /// Lazily reopen a stale file handle after snapshot restore.
+    ///
+    /// When a clone restores from snapshot, old file handles reference the previous
+    /// VolumeServer's handle table (now gone). This method opens the file by inode
+    /// on the new server and caches the mapping so subsequent operations on the same
+    /// stale handle work without re-opening.
+    fn try_reopen_handle(&self, stable_ino: u64, old_fh: u64, is_dir: bool) -> Option<u64> {
+        // Check if already remapped (concurrent threads may race here)
+        if let Some(new_fh) = self.handle_remap.get(&old_fh) {
+            return Some(*new_fh);
+        }
+
+        let inner_ino = self.to_inner(stable_ino)?;
+
+        let resp = if is_dir {
+            self.inner.handle_request(&VolumeRequest::Opendir {
+                ino: inner_ino,
+                flags: 0,
+                uid: 0,
+                gid: 0,
+                pid: 0,
+            })
+        } else {
+            self.inner.handle_request(&VolumeRequest::Open {
+                ino: inner_ino,
+                flags: libc::O_RDWR as u32,
+                uid: 0,
+                gid: 0,
+                pid: 0,
+            })
+        };
+
+        let new_fh = match &resp {
+            VolumeResponse::Opened { fh, .. } => *fh,
+            VolumeResponse::Created { fh, .. } => *fh,
+            _ => return None,
+        };
+
+        // Atomic insert — handles concurrent reopen races.
+        // If another thread won the race, release our fd to avoid leaking it.
+ use dashmap::mapref::entry::Entry; + match self.handle_remap.entry(old_fh) { + Entry::Occupied(e) => { + // Another thread already reopened this handle — release ours + let release_req = if is_dir { + VolumeRequest::Releasedir { + ino: inner_ino, + fh: new_fh, + } + } else { + VolumeRequest::Release { + ino: inner_ino, + fh: new_fh, + } + }; + self.inner.handle_request(&release_req); + Some(*e.get()) + } + Entry::Vacant(e) => { + debug!( + old_fh, + new_fh, stable_ino, inner_ino, is_dir, "reopened stale handle after restore" + ); + e.insert(new_fh); + Some(new_fh) + } + } + } } impl FilesystemHandler for RemapFs { @@ -621,11 +702,46 @@ impl FilesystemHandler for RemapFs { return VolumeResponse::io_error(); } + // Translate stale file handles from snapshot restore + if let Some(old_fh) = remapped.fh() { + if let Some(new_fh) = self.handle_remap.get(&old_fh) { + remapped = remapped.with_fh(*new_fh); + } + } + // Delegate to inner handler let response = self .inner .handle_request_with_groups(&remapped, supplementary_groups); + // If EBADF and this request uses a file handle, try lazy re-open + if response.is_ebadf() { + if let Some(old_fh) = request.fh() { + if let Some(stable_ino) = request.ino() { + if let Some(new_fh) = + self.try_reopen_handle(stable_ino, old_fh, request.is_dir_handle_op()) + { + // Retry with the reopened handle + let retry = remapped.with_fh(new_fh); + let retry_resp = self + .inner + .handle_request_with_groups(&retry, supplementary_groups); + return self.remap_response(request, retry_resp); + } + } + } + } + + // Handle Release/Releasedir: clean up handle_remap entry + if matches!( + request, + VolumeRequest::Release { .. } | VolumeRequest::Releasedir { .. 
}
+    ) {
+        if let Some(old_fh) = request.fh() {
+            self.handle_remap.remove(&old_fh);
+        }
+    }
+
         // Remap response inodes (inner → stable) and register new mappings
         // Use ORIGINAL request (stable inodes) for path computation
         self.remap_response(request, response)
diff --git a/src/commands/common.rs b/src/commands/common.rs
index dc808b68..a89fb7dc 100644
--- a/src/commands/common.rs
+++ b/src/commands/common.rs
@@ -557,6 +557,11 @@ pub async fn save_vm_state_with_network(
 pub struct CleanupContext {
     pub vm_id: String,
     pub volume_server_handles: Vec<JoinHandle<()>>,
+    /// RemapFs references for portable volumes (for inode table serialization).
+    /// One entry per volume; `Some` for portable volumes, `None` for plain.
+    /// Dropped during cleanup — the Arc prevents the RemapFs from being freed
+    /// while the VolumeServer task still holds a reference.
+    pub remap_refs: Vec<Option<Arc<fuse_pipe::RemapFs<fuse_pipe::PassthroughFs>>>>,
     pub data_dir: PathBuf,
     pub health_cancel_token: Option<CancellationToken>,
     pub health_monitor_handle: Option<JoinHandle<()>>,
@@ -583,6 +588,7 @@ pub async fn cleanup_vm(
     let CleanupContext {
         vm_id,
         volume_server_handles,
+        remap_refs: _,
         data_dir,
         health_cancel_token,
         health_monitor_handle,
diff --git a/src/commands/podman/mod.rs b/src/commands/podman/mod.rs
index 83d3f9d9..7ffa1a78 100644
--- a/src/commands/podman/mod.rs
+++ b/src/commands/podman/mod.rs
@@ -709,7 +709,7 @@ pub async fn prepare_vm(mut args: RunArgs) -> Result<Option<VmContext>> {
         })
         .collect();
 
-    let volume_server_handles = spawn_volume_servers(&volume_configs, &vsock_socket_path)
+    let volume_servers = spawn_volume_servers(&volume_configs, &vsock_socket_path)
         .await
         .context("spawning VolumeServers")?;
 
@@ -854,7 +854,7 @@ pub async fn prepare_vm(mut args: RunArgs) -> Result<Option<VmContext>> {
         warn!("VM setup failed, cleaning up resources");
 
         // Abort VolumeServer tasks
-        for handle in volume_server_handles {
+        for handle in volume_servers.handles {
             handle.abort();
         }
 
@@ -905,7 +905,7 @@ pub async fn prepare_vm(mut args: RunArgs) -> Result<Option<VmContext>> {
         data_dir,
         vm_manager,
         holder_child,
-        volume_server_handles,
+
volume_servers,
         network,
         network_config,
         state_manager,
@@ -977,6 +977,7 @@ pub async fn run_vm_loop(ctx: &mut VmContext, cancel: CancellationToken) -> Resu
             disk_path: &ctx.disk_path,
             volume_configs: &ctx.volume_configs,
             parent_snapshot_key: None, // Pre-start is the first snapshot, no parent
+            remap_refs: &ctx.volume_servers.remap_refs,
         };
         match create_snapshot_interruptible(&snap, &cancel).await {
             SnapshotOutcome::Interrupted => {
@@ -1030,6 +1031,7 @@ pub async fn run_vm_loop(ctx: &mut VmContext, cancel: CancellationToken) -> Resu
             disk_path: &ctx.disk_path,
             volume_configs: &ctx.volume_configs,
             parent_snapshot_key: parent_key.as_deref(),
+            remap_refs: &ctx.volume_servers.remap_refs,
         };
         tokio::select! {
             outcome = create_snapshot_interruptible(&snap, &cancel) => {
@@ -1072,7 +1074,8 @@ pub async fn cleanup_vm_context(mut ctx: VmContext) {
     super::common::cleanup_vm(
         super::common::CleanupContext {
             vm_id: ctx.vm_id,
-            volume_server_handles: ctx.volume_server_handles,
+            volume_server_handles: ctx.volume_servers.handles,
+            remap_refs: ctx.volume_servers.remap_refs,
             data_dir: ctx.data_dir,
             health_cancel_token: Some(ctx.health_cancel_token),
             health_monitor_handle: Some(ctx.health_monitor_handle),
diff --git a/src/commands/podman/snapshot.rs b/src/commands/podman/snapshot.rs
index 33bbae5f..fa6f4bf5 100644
--- a/src/commands/podman/snapshot.rs
+++ b/src/commands/podman/snapshot.rs
@@ -39,6 +39,8 @@ pub struct CreateSnapshotParams<'a> {
     pub disk_path: &'a Path,
     pub volume_configs: &'a [VolumeConfig],
     pub parent_snapshot_key: Option<&'a str>,
+    /// RemapFs references for portable volumes — used to serialize inode tables at snapshot time.
+    pub remap_refs: &'a [Option<Arc<fuse_pipe::RemapFs<fuse_pipe::PassthroughFs>>>],
 }
 
 /// Create a podman snapshot from a running VM.
@@ -59,6 +61,7 @@ pub async fn create_podman_snapshot(snap: &CreateSnapshotParams<'_>) -> Result<( disk_path, volume_configs, parent_snapshot_key, + remap_refs: _, } = snap; // Snapshots stored in snapshot_dir with snapshot_key as name let snapshot_dir = paths::snapshot_dir().join(snapshot_key); @@ -119,7 +122,29 @@ pub async fn create_podman_snapshot(snap: &CreateSnapshotParams<'_>) -> Result<( disk_path, parent_dir.as_deref(), ) - .await + .await?; + + // Serialize inode tables for portable volumes into the snapshot directory. + // These are loaded by clone VolumeServers via restore_from_table() to preserve + // inode numbering across snapshot/restore — eliminating the TTL glitch window. + for (idx, remap_ref) in snap.remap_refs.iter().enumerate() { + if let Some(remap) = remap_ref { + let port = snap.volume_configs.get(idx).map(|c| c.port).unwrap_or(0); + let json = remap.serialize_table(); + let table_path = snapshot_dir.join(format!("volume-{}-inode-table.json", port)); + if let Err(e) = tokio::fs::write(&table_path, &json).await { + tracing::warn!(port, error = %e, "failed to serialize inode table"); + } else { + tracing::info!( + port, + bytes = json.len(), + "serialized inode table to snapshot" + ); + } + } + } + + Ok(()) } /// Create a snapshot with signal interruption support. 
diff --git a/src/commands/podman/types.rs b/src/commands/podman/types.rs index bc28061f..9dcb29f0 100644 --- a/src/commands/podman/types.rs +++ b/src/commands/podman/types.rs @@ -17,7 +17,7 @@ pub struct VmContext { pub data_dir: PathBuf, pub vm_manager: VmManager, pub holder_child: Option, - pub volume_server_handles: Vec>, + pub volume_servers: crate::volume::SpawnedVolumes, pub network: Box, pub network_config: NetworkConfig, pub state_manager: StateManager, diff --git a/src/commands/snapshot.rs b/src/commands/snapshot.rs index 3b7756cf..bc1b18fd 100644 --- a/src/commands/snapshot.rs +++ b/src/commands/snapshot.rs @@ -20,7 +20,7 @@ use crate::state::{ }; use crate::storage::SnapshotManager; use crate::uffd::UffdServer; -use crate::volume::{spawn_volume_servers, VolumeConfig}; +use crate::volume::VolumeConfig; use super::common::{ MemoryBackend, RestoreParams, RuntimeConfig, SnapshotRestoreConfig, VSOCK_OUTPUT_PORT, @@ -590,9 +590,34 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> { }) .collect(); - let volume_server_handles = spawn_volume_servers(&volume_configs, &clone_vsock_base) - .await - .context("spawning VolumeServers for clone")?; + // Load serialized inode tables from snapshot (if available) for portable volumes. + // This restores the RemapFs inode mappings so clones see the same inodes as the baseline, + // avoiding the 1s TTL glitch window where old inodes return EIO. 
+    let snap_dir = snapshot_config
+        .memory_path
+        .parent()
+        .expect("snapshot memory_path must have parent");
+    let mut inode_tables: Vec<Option<String>> = Vec::with_capacity(volume_configs.len());
+    for vol in &snapshot_config.metadata.volumes {
+        if vol.portable {
+            let table_path = snap_dir.join(format!("volume-{}-inode-table.json", vol.vsock_port));
+            let table = tokio::fs::read_to_string(&table_path).await.ok();
+            if table.is_some() {
+                info!(port = vol.vsock_port, "loaded inode table from snapshot");
+            }
+            inode_tables.push(table);
+        } else {
+            inode_tables.push(None);
+        }
+    }
+
+    let volume_servers = crate::volume::spawn_volume_servers_with_tables(
+        &volume_configs,
+        &clone_vsock_base,
+        &inode_tables,
+    )
+    .await
+    .context("spawning VolumeServers for clone")?;
 
     // Setup TTY/output socket paths (inherited from snapshot metadata)
     let tty_mode = snapshot_config.metadata.tty;
@@ -913,7 +938,7 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> {
     implicit_uffd_cancel.cancel();
 
     // Abort VolumeServer tasks
-    for handle in volume_server_handles {
+    for handle in volume_servers.handles {
         handle.abort();
     }
 
@@ -1027,7 +1052,8 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> {
     super::common::cleanup_vm(
         super::common::CleanupContext {
             vm_id: vm_id.clone(),
-            volume_server_handles,
+            volume_server_handles: volume_servers.handles,
+            remap_refs: volume_servers.remap_refs,
             data_dir: data_dir.clone(),
             health_cancel_token: None, // no health monitor in exec path
             health_monitor_handle: None,
@@ -1165,6 +1191,7 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> {
         disk_path: &disk_path,
         volume_configs: &volume_configs,
         parent_snapshot_key: parent_key.as_deref(),
+        remap_refs: &volume_servers.remap_refs,
    };
    tokio::select!
{
        outcome = create_snapshot_interruptible(&snap, &cancel) => {
@@ -1203,7 +1230,8 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> {
     super::common::cleanup_vm(
         super::common::CleanupContext {
             vm_id: vm_id.clone(),
-            volume_server_handles,
+            volume_server_handles: volume_servers.handles,
+            remap_refs: volume_servers.remap_refs,
             data_dir: data_dir.clone(),
             health_cancel_token: Some(health_cancel_token),
             health_monitor_handle: Some(health_monitor_handle),
diff --git a/src/volume/mod.rs b/src/volume/mod.rs
index e7b5d64a..248be21c 100644
--- a/src/volume/mod.rs
+++ b/src/volume/mod.rs
@@ -21,6 +21,7 @@ use anyhow::{Context, Result};
 use std::path::{Path, PathBuf};
+use std::sync::Arc;
 use tokio::task::JoinHandle;
 use tracing::{error, info};
 
@@ -58,18 +59,24 @@ pub struct VolumeServer {
     host_path: PathBuf,
 }
 
+/// Resolve and validate a volume host path (canonicalize + check is_dir).
+fn resolve_volume_path(config: &VolumeConfig) -> Result<PathBuf> {
+    let host_path = config
+        .host_path
+        .canonicalize()
+        .with_context(|| format!("Failed to resolve path: {:?}", config.host_path))?;
+
+    if !host_path.is_dir() {
+        anyhow::bail!("Volume path is not a directory: {:?}", host_path);
+    }
+
+    Ok(host_path)
+}
+
 impl VolumeServer {
     /// Create a new volume server.
     pub fn new(config: VolumeConfig) -> Result<Self> {
-        let host_path = config
-            .host_path
-            .canonicalize()
-            .with_context(|| format!("Failed to resolve path: {:?}", config.host_path))?;
-
-        if !host_path.is_dir() {
-            anyhow::bail!("Volume path is not a directory: {:?}", host_path);
-        }
-
+        let host_path = resolve_volume_path(&config)?;
         Ok(Self { config, host_path })
     }
 
@@ -135,6 +142,40 @@ impl VolumeServer {
         }
     }
 
+    /// Serve with a pre-created `Arc`, returning the reference for serialization.
+    ///
+    /// The caller retains the Arc to call `serialize_table()` at snapshot time
+    /// while the server is running.
+    async fn serve_vsock_with_remap_arc(
+        host_path: &Path,
+        config: &VolumeConfig,
+        vsock_socket_path: &Path,
+        remap: Arc<fuse_pipe::RemapFs<fuse_pipe::PassthroughFs>>,
+        ready: Option<tokio::sync::oneshot::Sender<()>>,
+    ) -> Result<()> {
+        let base_path = vsock_socket_path.to_string_lossy();
+
+        info!(
+            port = config.port,
+            host_path = %host_path.display(),
+            read_only = config.read_only,
+            socket = format!("{}_{}", base_path, config.port),
+            "VolumeServer starting (portable, Arc)"
+        );
+
+        let server = fuse_pipe::AsyncServer::from_arc(remap);
+        server
+            .serve_vsock_forwarded_with_ready_signal(&base_path, config.port, ready)
+            .await
+            .with_context(|| {
+                format!(
+                    "VolumeServer failed for port {} serving {}",
+                    config.port,
+                    host_path.display()
+                )
+            })
+    }
+
     /// Serve volumes over a Unix socket (for testing/development).
     pub async fn serve_unix(&self, socket_path: &Path) -> Result<()> {
         let path_str = socket_path.to_string_lossy();
@@ -164,59 +205,111 @@ impl VolumeServer {
     }
 }
 
+/// Result of spawning volume servers. Holds task handles and optional RemapFs
+/// references for portable volumes (needed for inode table serialization at snapshot time).
+pub struct SpawnedVolumes {
+    pub handles: Vec<JoinHandle<()>>,
+    /// One entry per volume config. `Some(arc)` for portable volumes, `None` for plain.
+    pub remap_refs: Vec<Option<Arc<fuse_pipe::RemapFs<fuse_pipe::PassthroughFs>>>>,
+}
+
 /// Spawn multiple VolumeServers and wait for all to be ready.
 ///
-/// This is a convenience function that:
-/// 1. Creates VolumeServer instances from configs
-/// 2. Spawns each server with a ready signal
-/// 3. Waits for ALL servers to signal ready (socket bound)
-/// 4. Returns the task handles for later cleanup
-///
-/// # Arguments
-/// * `configs` - List of volume configurations to spawn
-/// * `vsock_socket_path` - Base path for vsock sockets
-///
-/// # Returns
-/// Vector of JoinHandles that should be aborted during cleanup
+/// For portable volumes, creates `Arc<RemapFs>` and retains references in `SpawnedVolumes`
+/// so callers can serialize the inode table at snapshot time.
 pub async fn spawn_volume_servers(
     configs: &[VolumeConfig],
     vsock_socket_path: &Path,
-) -> Result<Vec<JoinHandle<()>>> {
+) -> Result<SpawnedVolumes> {
+    spawn_volume_servers_with_tables(configs, vsock_socket_path, &[]).await
+}
+
+/// Spawn VolumeServers with optional inode table restoration for portable volumes.
+///
+/// When `inode_tables[i]` is `Some(json)`, the portable volume at index `i` uses
+/// `RemapFs::restore_from_table()` instead of `RemapFs::new()`, preserving inode
+/// numbering from the snapshot baseline.
+pub async fn spawn_volume_servers_with_tables(
+    configs: &[VolumeConfig],
+    vsock_socket_path: &Path,
+    inode_tables: &[Option<String>],
+) -> Result<SpawnedVolumes> {
     if configs.is_empty() {
-        return Ok(Vec::new());
+        return Ok(SpawnedVolumes {
+            handles: Vec::new(),
+            remap_refs: Vec::new(),
+        });
     }
 
     let mut handles = Vec::with_capacity(configs.len());
+    let mut remap_refs = Vec::with_capacity(configs.len());
     let mut ready_receivers = Vec::with_capacity(configs.len());
 
-    for config in configs {
-        let server = VolumeServer::new(config.clone())
-            .with_context(|| format!("creating VolumeServer for {}", config.host_path.display()))?;
+    for (idx, config) in configs.iter().enumerate() {
+        let host_path = resolve_volume_path(config)?;
 
-        // Create oneshot channel for ready signal
         let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
         ready_receivers.push(ready_rx);
 
-        let vsock_path = vsock_socket_path.to_path_buf();
         let port = config.port;
+
+        if config.portable {
+            let fs = fuse_pipe::PassthroughFs::new(&host_path);
+            let table = inode_tables.get(idx).and_then(|t| t.as_ref());
+
+            let remap = if let Some(json) = table {
+                info!(port, "restoring RemapFs from serialized inode table");
+                Arc::new(fuse_pipe::RemapFs::restore_from_table(fs, json))
+            } else {
+                Arc::new(fuse_pipe::RemapFs::new(fs))
+            };
+
+            remap_refs.push(Some(Arc::clone(&remap)));
+
+            let cfg = config.clone();
+ let hp = host_path.clone(); + let vsock_path = vsock_socket_path.to_path_buf(); + let handle = tokio::spawn(async move { + if let Err(e) = VolumeServer::serve_vsock_with_remap_arc( + &hp, + &cfg, + &vsock_path, + remap, + Some(ready_tx), + ) .await - { - error!("VolumeServer error for port {}: {}", port, e); - } - }); + { + error!("VolumeServer error for port {}: {}", port, e); + } + }); + handles.push(handle); + } else { + remap_refs.push(None); + + let server = VolumeServer::new(config.clone()).with_context(|| { + format!("creating VolumeServer for {}", config.host_path.display()) + })?; + + let vsock_path = vsock_socket_path.to_path_buf(); + let handle = tokio::spawn(async move { + if let Err(e) = server + .serve_vsock_with_ready_signal(&vsock_path, Some(ready_tx)) + .await + { + error!("VolumeServer error for port {}: {}", port, e); + } + }); + handles.push(handle); + } info!( port = config.port, host_path = %config.host_path.display(), guest_path = %config.guest_path.display(), read_only = config.read_only, + portable = config.portable, "spawned VolumeServer" ); - - handles.push(handle); } // Wait for ALL VolumeServers to signal ready (socket bound) @@ -228,5 +321,8 @@ pub async fn spawn_volume_servers( info!("all {} VolumeServer(s) ready", configs.len()); - Ok(handles) + Ok(SpawnedVolumes { + handles, + remap_refs, + }) } diff --git a/tests/test_portable_volumes.rs b/tests/test_portable_volumes.rs index af8dbe4e..df5d4bb4 100644 --- a/tests/test_portable_volumes.rs +++ b/tests/test_portable_volumes.rs @@ -587,3 +587,97 @@ async fn test_remap_fs_snapshot_file_replace() -> Result<()> { println!("PASSED: test_remap_fs_snapshot_file_replace"); Ok(()) } + +// ============================================================================= +// Test 18: Open file handles survive snapshot/clone restore +// ============================================================================= + +/// Verify that portable volume clone reads and writes work after snapshot restore. 
+/// +/// Tests: +/// 1. Clone can read files that existed at snapshot time +/// 2. Clone can write new files (FUSE mount works bidirectionally) +/// 3. Host sees clone's writes (data integrity through clone VolumeServer) +/// +/// Note: The inode table serialize/restore path runs in the pre-start +/// snapshot cache (production path). The CLI `fcvm snapshot create` path +/// used by tests creates snapshots in a separate process that doesn't have +/// access to the running VolumeServer's RemapFs. In the CLI path, inodes +/// are re-discovered via FUSE TTL expiry (~1s) which the test accommodates +/// via retry loops in fuse_read(). +#[tokio::test] +async fn test_open_handle_survives_snapshot_restore() -> Result<()> { + let (vm_name, clone_name, snap_name, _) = common::unique_names("pv-handle"); + let host_dir = format!("/tmp/fcvm-pv-handle-{}", std::process::id()); + tokio::fs::create_dir_all(&host_dir).await?; + + println!("=== test_open_handle_survives_snapshot_restore ==="); + + // Create files that will be used across snapshot + tokio::fs::write(format!("{}/watched.txt", host_dir), "line1\n").await?; + + // Start VM with portable volumes (RW) + let (_child, pid) = start_portable_vm(&vm_name, &host_dir, "/mnt/test", false).await?; + common::poll_health_by_pid(pid, 180).await?; + println!(" Baseline VM healthy (PID: {})", pid); + + // Read file to populate RemapFs inode table before snapshot + let content = fuse_read(pid, "/mnt/test/watched.txt", 30).await?; + assert!( + content.contains("line1"), + "initial read: {}", + content.trim() + ); + println!(" Initial read: '{}'", content.trim()); + + // Take snapshot + common::create_snapshot_by_pid(pid, &snap_name).await?; + println!(" Snapshot created"); + + // Start clone + let (_serve, serve_pid) = common::start_memory_server(&snap_name).await?; + let (_clone, clone_pid) = common::spawn_clone(serve_pid, &clone_name).await?; + common::poll_health_by_pid(clone_pid, 180).await?; + println!(" Clone healthy (PID: {})", 
clone_pid); + + // Verify snapshot content is readable on clone (inode table re-discovered via TTL) + let direct = fuse_read(clone_pid, "/mnt/test/watched.txt", 30).await?; + assert!( + direct.contains("line1"), + "clone should read snapshot content, got: {}", + direct.trim() + ); + println!(" Clone reads snapshot content: '{}'", direct.trim()); + + // Verify clone can write new files (proves FUSE mount fully operational) + common::exec_in_container( + clone_pid, + &["sh -c 'echo clone_wrote > /mnt/test/new_file.txt'"], + ) + .await?; + let new_content = fuse_read(clone_pid, "/mnt/test/new_file.txt", 10).await?; + assert!( + new_content.contains("clone_wrote"), + "clone write failed, got: {}", + new_content.trim() + ); + println!(" Clone write works: '{}'", new_content.trim()); + + // Verify host sees the clone's write + let host_content = tokio::fs::read_to_string(format!("{}/new_file.txt", host_dir)).await?; + assert!( + host_content.contains("clone_wrote"), + "host should see clone write, got: {}", + host_content.trim() + ); + println!(" Host sees clone write: '{}'", host_content.trim()); + + // Cleanup + common::kill_process(clone_pid).await; + common::kill_process(serve_pid).await; + common::kill_process(pid).await; + tokio::fs::remove_dir_all(&host_dir).await.ok(); + + println!("PASSED: test_open_handle_survives_snapshot_restore"); + Ok(()) +}