diff --git a/orb-jobs-agent/Cargo.toml b/orb-jobs-agent/Cargo.toml
index 5b36833fa..42ef97399 100644
--- a/orb-jobs-agent/Cargo.toml
+++ b/orb-jobs-agent/Cargo.toml
@@ -64,6 +64,11 @@ features = ["client"]
 git = "https://github.com/worldcoin/orb-relay-messages.git"
 rev = "301889d17fa0c283bdbad8eb2f5659ee5effb40f"
 
+[features]
+default = []
+# Allows fsck to run on arbitrary file paths (for integration tests in containers)
+integration-test = []
+
 [build-dependencies]
 orb-build-info = { workspace = true, features = ["build-script"] }
 
diff --git a/orb-jobs-agent/src/handlers/fsck.rs b/orb-jobs-agent/src/handlers/fsck.rs
index 988a5efbd..bc780f275 100644
--- a/orb-jobs-agent/src/handlers/fsck.rs
+++ b/orb-jobs-agent/src/handlers/fsck.rs
@@ -1,7 +1,15 @@
 use crate::job_system::ctx::{Ctx, JobExecutionUpdateExt};
 use color_eyre::{eyre::Context, Result};
 use orb_relay_messages::jobs::v1::JobExecutionUpdate;
-use tracing::info;
+use tracing::{info, warn};
+
+/// Mountpoints whose backing filesystem the `fsck` job is allowed to check.
+const ALLOWED_MOUNTPOINTS: &[&str] =
+    &["/usr/persistent", "/mnt/updates", "/mnt/scratch"];
 
-/// command format: `fsck ${device_path}`
+/// command format: `fsck ${mountpoint_or_device}`
+///
+/// Accepts an allowed mountpoint, or a `/dev/` path that is mounted at one. The
+/// filesystem is unmounted, checked with `fsck -y -f` and mounted again; the job output
+/// carries the fsck output and the remount result.
 #[tracing::instrument(skip(ctx))]
@@ -15,15 +23,98 @@ pub async fn handler(ctx: Ctx) -> Result<JobExecutionUpdate> {
 
     info!("Running fsck on {} for job {}", device, ctx.execution_id());
 
-    if let Ok(mut child) = ctx.deps().shell.exec(&["umount", device]).await {
-        let _ =
-            tokio::time::timeout(std::time::Duration::from_secs(5), child.wait()).await;
+    let input = match validate_fsck_arg(device) {
+        Ok(input) => input,
+        Err(e) => return Ok(ctx.failure().stderr(e.to_string())),
+    };
+
+    // For test files, skip all mount/unmount logic and run fsck directly
+    #[cfg(any(test, feature = "integration-test"))]
+    if let FsckArg::TestFile(path) = &input {
+        return run_fsck_on_test_file(&ctx, path).await;
+    }
+
+    let mount_target = findmnt_target_for_source(&ctx, device).await;
+    let mount_target_opt = mount_target.ok();
+    let (fsck_target, remount_target) =
+        match findmnt_source_for_target(&ctx, device).await {
+            // User passed a mountpoint (e.g. /usr/persistent). Fsck the backing block
+            // device and remount the mountpoint afterwards.
+            Ok(source) => (source, Some(device.to_string())),
+            // Not a mountpoint. Fsck the provided arg (file/device), but remount if we
+            // discovered it's mounted somewhere.
+            Err(_) => (device.to_string(), mount_target_opt.clone()),
+        };
+
+    // Enforce allowlist after we learned whether this is a mountpoint or a device.
+    // - Mountpoints: only a small allowlist
+    // - Devices: only if they resolve to an allowed mountpoint
+    match input {
+        FsckArg::Mountpoint(target) => {
+            if !ALLOWED_MOUNTPOINTS.contains(&target.as_str()) {
+                return Ok(ctx.failure().stderr(format!(
+                    "Refusing to run fsck on mountpoint {target}; allowed mountpoints: {}",
+                    ALLOWED_MOUNTPOINTS.join(", ")
+                )));
+            }
+        }
+        FsckArg::Device(path) => {
+            if let Some(target) = mount_target_opt.clone()
+                && ALLOWED_MOUNTPOINTS.contains(&target.as_str())
+            {
+                // Allowed.
+            } else {
+                return Ok(ctx.failure().stderr(format!(
+                    "Refusing to run fsck on device {path}. Please pass an allowed mountpoint instead: {}",
+                    ALLOWED_MOUNTPOINTS.join(", ")
+                )));
+            }
+        }
+        #[cfg(any(test, feature = "integration-test"))]
+        FsckArg::TestFile(_path) => unreachable!("handled above"),
+    }
+
+    // Verify the filesystem type while the target is still mounted, so that a refusal
+    // never leaves it unmounted.
+    let fs_type = match blkid_fs_type(&ctx, &fsck_target).await {
+        Ok(t) => t,
+        Err(e) => {
+            return Ok(ctx.failure().stderr(format!(
+                "Refusing to run fsck on {fsck_target}: could not determine filesystem type via blkid: {e}"
+            )));
+        }
+    };
+    if !is_allowed_fs_type(&fs_type) {
+        return Ok(ctx.failure().stderr(format!(
+            "Refusing to run fsck on {fsck_target}: filesystem type {fs_type} is not allowed"
+        )));
+    }
+
+    if let Some(target) = &remount_target {
+        let unmount = ctx
+            .deps()
+            .shell
+            .exec(&["umount", target])
+            .await
+            .context("failed to spawn umount")?
+            .wait_with_output()
+            .await
+            .context("failed to wait for umount")?;
+
+        if !unmount.status.success() {
+            let stdout = String::from_utf8_lossy(&unmount.stdout);
+            let stderr = String::from_utf8_lossy(&unmount.stderr);
+            let message = format!(
+                "Refusing to run fsck: target appears mounted at {target} and unmount failed.\n\nUMOUNT STDOUT:\n{stdout}\nUMOUNT STDERR:\n{stderr}"
+            );
+            return Ok(ctx.failure().stderr(message));
+        }
     }
 
     let output = ctx
         .deps()
         .shell
-        .exec(&["fsck", "-y", "-f", device])
+        .exec(&["fsck", "-y", "-f", &fsck_target])
         .await
         .context("failed to spawn fsck")?
         .wait_with_output()
@@ -32,13 +123,228 @@ pub async fn handler(ctx: Ctx) -> Result<JobExecutionUpdate> {
 
     let stdout = String::from_utf8_lossy(&output.stdout);
     let stderr = String::from_utf8_lossy(&output.stderr);
-    let message = format!("STDOUT:\n{stdout}\nSTDERR:\n{stderr}");
+    let fsck_message = format!("STDOUT:\n{stdout}\nSTDERR:\n{stderr}");
+
+    let mut remount_message = String::new();
+    let mut remount_ok = true;
+    if let Some(target) = &remount_target {
+        // Best-effort remount: prefer fstab-based `mount <target>`, then fall back to
+        // `mount <source> <target>`.
+        let mount1 = ctx
+            .deps()
+            .shell
+            .exec(&["mount", target])
+            .await
+            .context("failed to spawn mount")?
+            .wait_with_output()
+            .await
+            .context("failed to wait for mount")?;
+
+        let mut ok = mount1.status.success();
+        if !ok {
+            let mount2 = ctx
+                .deps()
+                .shell
+                .exec(&["mount", &fsck_target, target])
+                .await
+                .context("failed to spawn mount (fallback)")?
+                .wait_with_output()
+                .await
+                .context("failed to wait for mount (fallback)")?;
+            ok = mount2.status.success();
+        }
+        remount_ok = ok;
+
+        // Even if remount fails, surface that in job output
+        match findmnt_source_for_target(&ctx, target).await {
+            Ok(source) => {
+                remount_message = format!("\n\nRemount: OK ({target} -> {source})");
+            }
+            Err(e) => {
+                warn!("failed to confirm remount of {target}: {e:?}");
+                remount_message = format!("\n\nRemount: FAILED ({target})");
+            }
+        }
+    }
+
+    let message = format!("{fsck_message}{remount_message}");
 
     // fsck exit codes:
     // 0 - No errors
     // 1 - File system errors corrected
     // 2 - System should be rebooted
     // ...
+    if let Some(code) = output.status.code()
+        && (code == 0 || code == 1)
+    {
+        if remount_ok {
+            return Ok(ctx.success().stdout(message));
+        }
+
+        return Ok(ctx.failure().stdout(message).stderr("Remount failed"));
+    }
+
+    Ok(ctx.failure().stdout(message))
+}
+
+/// Kind of argument accepted by the `fsck` job, as classified by [`validate_fsck_arg`].
+#[derive(Debug, Clone, PartialEq, Eq)]
+enum FsckArg {
+    /// One of [`ALLOWED_MOUNTPOINTS`].
+    Mountpoint(String),
+    /// A path under `/dev/`.
+    Device(String),
+    /// Any other path; only exists in test / `integration-test` builds.
+    #[cfg(any(test, feature = "integration-test"))]
+    TestFile(String),
+}
+
+/// Classifies the job argument.
+///
+/// Outside test / `integration-test` builds, anything that is neither an allowed
+/// mountpoint nor a `/dev/` path is rejected.
+fn validate_fsck_arg(arg: &str) -> Result<FsckArg> {
+    if ALLOWED_MOUNTPOINTS.contains(&arg) {
+        return Ok(FsckArg::Mountpoint(arg.to_string()));
+    }
+
+    if arg.starts_with("/dev/") {
+        return Ok(FsckArg::Device(arg.to_string()));
+    }
+
+    #[cfg(any(test, feature = "integration-test"))]
+    {
+        Ok(FsckArg::TestFile(arg.to_string()))
+    }
+
+    #[cfg(not(any(test, feature = "integration-test")))]
+    {
+        Err(color_eyre::eyre::eyre!(
+            "Refusing to run fsck on {arg}; allowed mountpoints: {}",
+            ALLOWED_MOUNTPOINTS.join(", ")
+        ))
+    }
+}
+
+/// Returns `true` for the filesystem types this job is willing to run fsck on.
+fn is_allowed_fs_type(fs_type: &str) -> bool {
+    matches!(fs_type, "ext2" | "ext3" | "ext4" | "f2fs")
+}
+
+/// Returns the filesystem type `blkid` reports for `device`.
+///
+/// Fails if `blkid` cannot be run, exits unsuccessfully, or prints an empty type.
+async fn blkid_fs_type(ctx: &Ctx, device: &str) -> Result<String> {
+    let output = ctx
+        .deps()
+        .shell
+        .exec(&["blkid", "-o", "value", "-s", "TYPE", device])
+        .await
+        .context("failed to spawn blkid")?
+        .wait_with_output()
+        .await
+        .context("failed to wait for blkid")?;
+
+    if !output.status.success() {
+        return Err(color_eyre::eyre::eyre!(
+            "blkid failed with status {}",
+            output.status
+        ));
+    }
+
+    let t = String::from_utf8_lossy(&output.stdout).trim().to_string();
+    if t.is_empty() {
+        return Err(color_eyre::eyre::eyre!("blkid returned empty TYPE"));
+    }
+
+    Ok(t)
+}
+
+/// Returns the source mounted exactly at `target`, as reported by `findmnt`.
+///
+/// `-M` (`--mountpoint`) is used instead of `--target` because `--target` falls back to
+/// the mount that *contains* the path (e.g. devtmpfs for `/dev/...`) when the path is
+/// not itself a mountpoint.
+///
+/// Fails if `findmnt` cannot be run, exits unsuccessfully, or prints nothing.
+async fn findmnt_source_for_target(ctx: &Ctx, target: &str) -> Result<String> {
+    let output = ctx
+        .deps()
+        .shell
+        .exec(&["findmnt", "-n", "-o", "SOURCE", "-M", target])
+        .await
+        .context("failed to spawn findmnt")?
+        .wait_with_output()
+        .await
+        .context("failed to wait for findmnt")?;
+
+    if !output.status.success() {
+        return Err(color_eyre::eyre::eyre!(
+            "findmnt failed with status {}",
+            output.status
+        ));
+    }
+
+    let source = String::from_utf8_lossy(&output.stdout).trim().to_string();
+    if source.is_empty() {
+        return Err(color_eyre::eyre::eyre!("findmnt returned empty SOURCE"));
+    }
+
+    Ok(source)
+}
+
+/// Returns the mountpoint `findmnt` reports for `source`.
+///
+/// Fails if `findmnt` cannot be run, exits unsuccessfully, or prints nothing.
+async fn findmnt_target_for_source(ctx: &Ctx, source: &str) -> Result<String> {
+    let output = ctx
+        .deps()
+        .shell
+        .exec(&["findmnt", "-n", "-o", "TARGET", "--source", source])
+        .await
+        .context("failed to spawn findmnt")?
+        .wait_with_output()
+        .await
+        .context("failed to wait for findmnt")?;
+
+    if !output.status.success() {
+        return Err(color_eyre::eyre::eyre!(
+            "findmnt failed with status {}",
+            output.status
+        ));
+    }
+
+    let target = String::from_utf8_lossy(&output.stdout).trim().to_string();
+    if target.is_empty() {
+        return Err(color_eyre::eyre::eyre!("findmnt returned empty TARGET"));
+    }
+
+    Ok(target)
+}
+
+/// Runs `fsck -y -f` directly on `path`, skipping all mount handling (test builds only).
+#[cfg(any(test, feature = "integration-test"))]
+async fn run_fsck_on_test_file(
+    ctx: &Ctx,
+    path: &str,
+) -> Result<JobExecutionUpdate> {
+    use crate::job_system::ctx::JobExecutionUpdateExt;
+
+    let output = ctx
+        .deps()
+        .shell
+        .exec(&["fsck", "-y", "-f", path])
+        .await
+        .context("failed to spawn fsck")?
+        .wait_with_output()
+        .await
+        .context("failed to wait for fsck")?;
+
+    let stdout = String::from_utf8_lossy(&output.stdout);
+    let stderr = String::from_utf8_lossy(&output.stderr);
+    let message = format!("STDOUT:\n{stdout}\nSTDERR:\n{stderr}");
+
+    // fsck exit codes: 0 = no errors, 1 = errors corrected
     if let Some(code) = output.status.code()
         && (code == 0 || code == 1)
     {
diff --git a/orb-jobs-agent/tests/fsck.rs b/orb-jobs-agent/tests/fsck.rs
index f3f506a22..e9955c8e7 100644
--- a/orb-jobs-agent/tests/fsck.rs
+++ b/orb-jobs-agent/tests/fsck.rs
@@ -2,6 +2,7 @@ use color_eyre::Result;
 use common::{fake_orb::FakeOrb, fixture::JobAgentFixture};
 use orb_jobs_agent::shell::Shell;
 use orb_relay_messages::jobs::v1::JobExecutionStatus;
+use std::sync::{Arc, Mutex};
 
 mod common;
 
@@ -114,7 +115,7 @@ async fn fsck_real_corrupted_image() {
         .wait_for_completion()
         .await;
 
-    // 6. Verify result
+    // Verify result
     let jobs = fx.execution_updates.read().await;
     let result = jobs.last().unwrap();
 
@@ -151,3 +152,123 @@ async fn fsck_fails_missing_arg_unit() {
     assert_eq!(result.status, JobExecutionStatus::Failed as i32);
     assert!(result.std_err.contains("Missing device argument"));
 }
+
+#[tokio::test]
+async fn fsck_remounts_mountpoint_unit() {
+    #[derive(Clone, Debug)]
+    struct RecordingShell {
+        calls: Arc<Mutex<Vec<Vec<String>>>>,
+    }
+
+    impl RecordingShell {
+        fn new() -> Self {
+            Self {
+                calls: Arc::new(Mutex::new(Vec::new())),
+            }
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl Shell for RecordingShell {
+        async fn exec(&self, cmd: &[&str]) -> Result<tokio::process::Child> {
+            self.calls
+                .lock()
+                .unwrap()
+                .push(cmd.iter().map(|s| s.to_string()).collect::<Vec<_>>());
+
+            let mut c = tokio::process::Command::new("sh");
+            c.arg("-c");
+
+            match cmd {
+                // findmnt -n -o TARGET --source /usr/persistent -> fail (not a SOURCE)
+                ["findmnt", "-n", "-o", "TARGET", "--source", "/usr/persistent"] => {
+                    c.arg("exit 1");
+                }
+                // findmnt -n -o SOURCE -M /usr/persistent -> /dev/loop0
+                ["findmnt", "-n", "-o", "SOURCE", "-M", "/usr/persistent"] => {
+                    c.arg("printf '/dev/loop0\\n'");
+                }
+                // blkid -o value -s TYPE /dev/loop0 -> ext4
+                ["blkid", "-o", "value", "-s", "TYPE", "/dev/loop0"] => {
+                    c.arg("printf 'ext4\\n'");
+                }
+                // umount /usr/persistent -> ok
+                ["umount", "/usr/persistent"] => {
+                    c.arg("exit 0");
+                }
+                // fsck -y -f /dev/loop0 -> ok with some output
+                ["fsck", "-y", "-f", "/dev/loop0"] => {
+                    c.arg("echo 'clean'; exit 0");
+                }
+                // mount /usr/persistent -> ok
+                ["mount", "/usr/persistent"] => {
+                    c.arg("exit 0");
+                }
+                // default: succeed
+                _ => {
+                    c.arg("exit 0");
+                }
+            }
+
+            Ok(c.stdout(std::process::Stdio::piped())
+                .stderr(std::process::Stdio::piped())
+                .spawn()?)
+        }
+    }
+
+    let shell = RecordingShell::new();
+    let calls = shell.calls.clone();
+
+    let fx = JobAgentFixture::new().await;
+    fx.program().shell(shell).spawn().await;
+
+    fx.enqueue_job("fsck /usr/persistent")
+        .await
+        .wait_for_completion()
+        .await;
+
+    let jobs = fx.execution_updates.read().await;
+    let result = jobs.last().unwrap();
+    assert_eq!(
+        result.status,
+        JobExecutionStatus::Succeeded as i32,
+        "expected fsck job to succeed; stdout: {} stderr: {}",
+        result.std_out,
+        result.std_err
+    );
+
+    let calls = calls.lock().unwrap();
+    let called = calls
+        .iter()
+        .map(|v| v.join(" "))
+        .collect::<Vec<_>>()
+        .join("\n");
+
+    assert!(
+        called.contains("findmnt -n -o SOURCE -M /usr/persistent"),
+        "expected mountpoint->source resolution via findmnt. got:\n{called}"
+    );
+    assert!(
+        called.contains("umount /usr/persistent"),
+        "expected unmount of mountpoint. got:\n{called}"
+    );
+    assert!(
+        called.contains("fsck -y -f /dev/loop0"),
+        "expected fsck on SOURCE, not on mountpoint. got:\n{called}"
+    );
+    assert!(
+        called.lines().any(|call| call == "mount /usr/persistent"),
+        "expected remount of mountpoint. got:\n{called}"
+    );
+
+    // The filesystem type must be checked while the mountpoint is still mounted, so that
+    // a refusal never leaves it unmounted.
+    let position = |needle: &str| calls.iter().position(|v| v.join(" ") == needle);
+    let blkid_at = position("blkid -o value -s TYPE /dev/loop0")
+        .expect("blkid was not called");
+    let umount_at = position("umount /usr/persistent").expect("umount not called");
+    assert!(
+        blkid_at < umount_at,
+        "expected blkid to run before umount. got:\n{called}"
+    );
+}