Skip to content
102 changes: 102 additions & 0 deletions fuse-pipe/src/protocol/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,108 @@ impl VolumeRequest {
VolumeRequest::Forget { .. } | VolumeRequest::BatchForget { .. }
)
}

/// Extract the file handle (fh) if this operation uses one.
pub fn fh(&self) -> Option<u64> {
match self {
Self::Read { fh, .. }
| Self::Write { fh, .. }
| Self::Release { fh, .. }
Comment on lines +423 to +427
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Include Setattr fh in stale-handle remapping

The new stale-handle recovery path depends on VolumeRequest::fh()/with_fh(), but this helper set does not handle Setattr { fh: Some(...) } requests. Setattr is used for fd-based truncate operations, so after snapshot restore an ftruncate() on a pre-snapshot descriptor can return EBADF permanently because RemapFs never remaps or reopens that handle.

Useful? React with 👍 / 👎.

| Self::Flush { fh, .. }
| Self::Fsync { fh, .. }
| Self::Releasedir { fh, .. }
| Self::Fsyncdir { fh, .. }
| Self::Fallocate { fh, .. }
| Self::Lseek { fh, .. }
| Self::Getlk { fh, .. }
| Self::Setlk { fh, .. }
| Self::Readdirplus { fh, .. } => Some(*fh),
Self::Setattr { fh, .. } => *fh,
Self::CopyFileRange { fh_in, .. } | Self::RemapFileRange { fh_in, .. } => Some(*fh_in),
_ => None,
}
}

/// Clone this request, substituting `new_fh` as its file handle.
///
/// Requests that carry no handle are returned as plain clones.
/// Only fh_in is remapped for `CopyFileRange`/`RemapFileRange`; fh_out
/// references a different file and is NOT currently remapped after
/// snapshot restore. This is a known limitation: copy_file_range/
/// remap_file_range with a stale fh_out will fail.
/// TODO: add fh_out()/with_fh_out() helpers for independent remapping.
pub fn with_fh(&self, new_fh: u64) -> Self {
    let mut updated = self.clone();
    match &mut updated {
        // Setattr's handle is optional, so wrap the replacement in Some.
        Self::Setattr { fh, .. } => *fh = Some(new_fh),
        // Ranged copy ops: substitute the source handle only (see above).
        Self::CopyFileRange { fh_in, .. } | Self::RemapFileRange { fh_in, .. } => {
            *fh_in = new_fh;
        }
        Self::Read { fh, .. }
        | Self::Write { fh, .. }
        | Self::Release { fh, .. }
        | Self::Flush { fh, .. }
        | Self::Fsync { fh, .. }
        | Self::Releasedir { fh, .. }
        | Self::Fsyncdir { fh, .. }
        | Self::Fallocate { fh, .. }
        | Self::Lseek { fh, .. }
        | Self::Getlk { fh, .. }
        | Self::Setlk { fh, .. }
        | Self::Readdirplus { fh, .. } => *fh = new_fh,
        _ => {}
    }
    updated
}

/// Check if this operation acts on a directory handle (one obtained via
/// `Opendir`) rather than a regular file handle.
pub fn is_dir_handle_op(&self) -> bool {
    matches!(
        self,
        Self::Releasedir { .. } | Self::Fsyncdir { .. } | Self::Readdirplus { .. }
    )
}

/// Extract the inode from this request (if present).
///
/// For most operations this is the target inode; for name-based
/// operations it is the parent directory's inode, and for ranged copy
/// operations it is the source inode. `BatchForget` carries multiple
/// inodes and yields `None`.
pub fn ino(&self) -> Option<u64> {
    match self {
        // Operations addressed by the target inode directly.
        Self::Getattr { ino }
        | Self::Readlink { ino }
        | Self::Statfs { ino }
        | Self::Read { ino, .. }
        | Self::Write { ino, .. }
        | Self::Release { ino, .. }
        | Self::Flush { ino, .. }
        | Self::Fsync { ino, .. }
        | Self::Open { ino, .. }
        | Self::Opendir { ino, .. }
        | Self::Releasedir { ino, .. }
        | Self::Fsyncdir { ino, .. }
        | Self::Fallocate { ino, .. }
        | Self::Lseek { ino, .. }
        | Self::Getlk { ino, .. }
        | Self::Setlk { ino, .. }
        | Self::Readdir { ino, .. }
        | Self::Readdirplus { ino, .. }
        | Self::Access { ino, .. }
        | Self::Forget { ino, .. }
        | Self::Link { ino, .. }
        | Self::Setattr { ino, .. }
        | Self::Setxattr { ino, .. }
        | Self::Getxattr { ino, .. }
        | Self::Listxattr { ino, .. }
        | Self::Removexattr { ino, .. } => Some(*ino),
        // Ranged copy ops: report the source inode.
        Self::CopyFileRange { ino_in, .. } | Self::RemapFileRange { ino_in, .. } => {
            Some(*ino_in)
        }
        // Name-based ops: report the parent directory inode.
        Self::Lookup { parent, .. }
        | Self::Mkdir { parent, .. }
        | Self::Mknod { parent, .. }
        | Self::Rmdir { parent, .. }
        | Self::Create { parent, .. }
        | Self::Unlink { parent, .. }
        | Self::Rename { parent, .. }
        | Self::Symlink { parent, .. } => Some(*parent),
        // Multiple inodes in one request — no single answer.
        Self::BatchForget { .. } => None,
    }
}
}

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions fuse-pipe/src/protocol/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ impl VolumeResponse {
matches!(self, VolumeResponse::Error { .. })
}

/// Check if this is a bad file descriptor error (stale handle after restore).
pub fn is_ebadf(&self) -> bool {
    matches!(self.errno(), Some(code) if code == libc::EBADF)
}

/// Check if this is a success response.
pub fn is_ok(&self) -> bool {
!self.is_error()
Expand Down
11 changes: 11 additions & 0 deletions fuse-pipe/src/server/pipelined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ impl<H: FilesystemHandler + 'static> AsyncServer<H> {
}
}

/// Create a server from a pre-wrapped `Arc<H>`, using the default config.
///
/// Use this when the caller needs to retain a reference to the handler
/// (e.g., to call `RemapFs::serialize_table()` while the server is running).
pub fn from_arc(handler: Arc<H>) -> Self {
    let config = ServerConfig::default();
    Self { handler, config }
}

/// Serve on a Unix socket.
///
/// This function blocks forever, accepting and handling connections.
Expand Down
120 changes: 118 additions & 2 deletions fuse-pipe/src/server/remap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct RemapFs<T: FilesystemHandler> {
stable_to_inner: DashMap<u64, u64>,
/// stable_ino → relative path from mount root (for serialization)
paths: DashMap<u64, String>,
/// old_fh → new_fh (stale handles lazily reopened after snapshot restore)
handle_remap: DashMap<u64, u64>,
}

impl<T: FilesystemHandler> RemapFs<T> {
Expand All @@ -48,6 +50,7 @@ impl<T: FilesystemHandler> RemapFs<T> {
inner_to_stable,
stable_to_inner,
paths,
handle_remap: DashMap::new(),
}
}

Expand Down Expand Up @@ -533,8 +536,16 @@ impl<T: FilesystemHandler> RemapFs<T> {
///
/// Paths that no longer exist on the host are skipped (logged as warnings).
pub fn restore_from_table(inner: T, json: &str) -> Self {
let table: std::collections::BTreeMap<u64, String> =
serde_json::from_str(json).unwrap_or_default();
let table: std::collections::BTreeMap<u64, String> = match serde_json::from_str(json) {
Ok(t) => t,
Err(e) => {
error!(
"failed to parse inode table JSON, starting with empty table: {}",
e
);
std::collections::BTreeMap::new()
}
};

let inner_to_stable = DashMap::new();
let stable_to_inner = DashMap::new();
Expand Down Expand Up @@ -599,13 +610,83 @@ impl<T: FilesystemHandler> RemapFs<T> {
inner_to_stable,
stable_to_inner,
paths,
handle_remap: DashMap::new(),
}
}

/// Get a reference to the inner handler.
pub fn inner(&self) -> &T {
&self.inner
}

/// Lazily reopen a stale file handle after snapshot restore.
///
/// When a clone restores from snapshot, old file handles reference the previous
/// VolumeServer's handle table (now gone). This method opens the file by inode
/// on the new server and caches the mapping so subsequent operations on the same
/// stale handle work without re-opening.
///
/// Returns `None` if the stable inode has no inner mapping or the reopen
/// request itself fails.
fn try_reopen_handle(&self, stable_ino: u64, old_fh: u64, is_dir: bool) -> Option<u64> {
    // Check if already remapped (concurrent threads may race here)
    if let Some(new_fh) = self.handle_remap.get(&old_fh) {
        return Some(*new_fh);
    }

    // Translate the stable inode into the inner handler's inode space.
    let inner_ino = self.to_inner(stable_ino)?;

    // NOTE(review): regular files are reopened with O_RDWR and root
    // credentials (uid/gid/pid = 0) regardless of the original open flags
    // or caller — confirm this is acceptable (e.g. for files that can only
    // be opened read-only).
    let resp = if is_dir {
        self.inner.handle_request(&VolumeRequest::Opendir {
            ino: inner_ino,
            flags: 0,
            uid: 0,
            gid: 0,
            pid: 0,
        })
    } else {
        self.inner.handle_request(&VolumeRequest::Open {
            ino: inner_ino,
            flags: libc::O_RDWR as u32,
            uid: 0,
            gid: 0,
            pid: 0,
        })
    };

    // Any response other than Opened/Created means the reopen failed.
    let new_fh = match &resp {
        VolumeResponse::Opened { fh, .. } => *fh,
        VolumeResponse::Created { fh, .. } => *fh,
        _ => return None,
    };

    // Atomic insert — handles concurrent reopen races.
    // If another thread won the race, release our fd to avoid leaking it.
    use dashmap::mapref::entry::Entry;
    match self.handle_remap.entry(old_fh) {
        Entry::Occupied(e) => {
            // Another thread already reopened this handle — release ours
            let release_req = if is_dir {
                VolumeRequest::Releasedir {
                    ino: inner_ino,
                    fh: new_fh,
                }
            } else {
                VolumeRequest::Release {
                    ino: inner_ino,
                    fh: new_fh,
                }
            };
            // Best-effort cleanup: the release response is intentionally ignored.
            self.inner.handle_request(&release_req);
            Some(*e.get())
        }
        Entry::Vacant(e) => {
            debug!(
                old_fh,
                new_fh, stable_ino, inner_ino, is_dir, "reopened stale handle after restore"
            );
            e.insert(new_fh);
            Some(new_fh)
        }
    }
}
}

impl<T: FilesystemHandler> FilesystemHandler for RemapFs<T> {
Expand All @@ -621,11 +702,46 @@ impl<T: FilesystemHandler> FilesystemHandler for RemapFs<T> {
return VolumeResponse::io_error();
}

// Translate stale file handles from snapshot restore
if let Some(old_fh) = remapped.fh() {
if let Some(new_fh) = self.handle_remap.get(&old_fh) {
remapped = remapped.with_fh(*new_fh);
}
}

// Delegate to inner handler
let response = self
.inner
.handle_request_with_groups(&remapped, supplementary_groups);

// If EBADF and this request uses a file handle, try lazy re-open
if response.is_ebadf() {
if let Some(old_fh) = request.fh() {
if let Some(stable_ino) = request.ino() {
if let Some(new_fh) =
self.try_reopen_handle(stable_ino, old_fh, request.is_dir_handle_op())
{
// Retry with the reopened handle
let retry = remapped.with_fh(new_fh);
let retry_resp = self
.inner
.handle_request_with_groups(&retry, supplementary_groups);
return self.remap_response(request, retry_resp);
}
}
}
}

// Handle Release/Releasedir: clean up handle_remap entry
if matches!(
request,
VolumeRequest::Release { .. } | VolumeRequest::Releasedir { .. }
) {
if let Some(old_fh) = request.fh() {
self.handle_remap.remove(&old_fh);
}
}

// Remap response inodes (inner → stable) and register new mappings
// Use ORIGINAL request (stable inodes) for path computation
self.remap_response(request, response)
Expand Down
6 changes: 6 additions & 0 deletions src/commands/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,11 @@ pub async fn save_vm_state_with_network(
pub struct CleanupContext {
pub vm_id: String,
pub volume_server_handles: Vec<JoinHandle<()>>,
/// RemapFs references for portable volumes (for inode table serialization).
/// One entry per volume; `Some` for portable volumes, `None` for plain.
/// Dropped during cleanup — the Arc prevents the RemapFs from being freed
/// while the VolumeServer task still holds a reference.
pub remap_refs: Vec<Option<std::sync::Arc<fuse_pipe::RemapFs<fuse_pipe::PassthroughFs>>>>,
pub data_dir: PathBuf,
pub health_cancel_token: Option<tokio_util::sync::CancellationToken>,
pub health_monitor_handle: Option<JoinHandle<()>>,
Expand All @@ -583,6 +588,7 @@ pub async fn cleanup_vm(
let CleanupContext {
vm_id,
volume_server_handles,
remap_refs: _,
data_dir,
health_cancel_token,
health_monitor_handle,
Expand Down
11 changes: 7 additions & 4 deletions src/commands/podman/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ pub async fn prepare_vm(mut args: RunArgs) -> Result<Option<VmContext>> {
})
.collect();

let volume_server_handles = spawn_volume_servers(&volume_configs, &vsock_socket_path)
let volume_servers = spawn_volume_servers(&volume_configs, &vsock_socket_path)
.await
.context("spawning VolumeServers")?;

Expand Down Expand Up @@ -854,7 +854,7 @@ pub async fn prepare_vm(mut args: RunArgs) -> Result<Option<VmContext>> {
warn!("VM setup failed, cleaning up resources");

// Abort VolumeServer tasks
for handle in volume_server_handles {
for handle in volume_servers.handles {
handle.abort();
}

Expand Down Expand Up @@ -905,7 +905,7 @@ pub async fn prepare_vm(mut args: RunArgs) -> Result<Option<VmContext>> {
data_dir,
vm_manager,
holder_child,
volume_server_handles,
volume_servers,
network,
network_config,
state_manager,
Expand Down Expand Up @@ -977,6 +977,7 @@ pub async fn run_vm_loop(ctx: &mut VmContext, cancel: CancellationToken) -> Resu
disk_path: &ctx.disk_path,
volume_configs: &ctx.volume_configs,
parent_snapshot_key: None, // Pre-start is the first snapshot, no parent
remap_refs: &ctx.volume_servers.remap_refs,
};
match create_snapshot_interruptible(&snap, &cancel).await {
SnapshotOutcome::Interrupted => {
Expand Down Expand Up @@ -1030,6 +1031,7 @@ pub async fn run_vm_loop(ctx: &mut VmContext, cancel: CancellationToken) -> Resu
disk_path: &ctx.disk_path,
volume_configs: &ctx.volume_configs,
parent_snapshot_key: parent_key.as_deref(),
remap_refs: &ctx.volume_servers.remap_refs,
};
tokio::select! {
outcome = create_snapshot_interruptible(&snap, &cancel) => {
Expand Down Expand Up @@ -1072,7 +1074,8 @@ pub async fn cleanup_vm_context(mut ctx: VmContext) {
super::common::cleanup_vm(
super::common::CleanupContext {
vm_id: ctx.vm_id,
volume_server_handles: ctx.volume_server_handles,
volume_server_handles: ctx.volume_servers.handles,
remap_refs: ctx.volume_servers.remap_refs,
data_dir: ctx.data_dir,
health_cancel_token: Some(ctx.health_cancel_token),
health_monitor_handle: Some(ctx.health_monitor_handle),
Expand Down
Loading