Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions orb-jobs-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ features = ["client"]
git = "https://github.com/worldcoin/orb-relay-messages.git"
rev = "301889d17fa0c283bdbad8eb2f5659ee5effb40f"

[features]
default = []
# Allows fsck to run on arbitrary file paths (for integration tests in containers)
integration-test = []

[build-dependencies]
orb-build-info = { workspace = true, features = ["build-script"] }

Expand Down
290 changes: 284 additions & 6 deletions orb-jobs-agent/src/handlers/fsck.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::job_system::ctx::{Ctx, JobExecutionUpdateExt};
use color_eyre::{eyre::Context, Result};
use orb_relay_messages::jobs::v1::JobExecutionUpdate;
use tracing::info;
use tracing::{info, warn};

const ALLOWED_MOUNTPOINTS: &[&str] =
&["/usr/persistent", "/mnt/updates", "/mnt/scratch"];

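/// command format: `fsck ${target}`, where `${target}` is either one of the
/// allowed mountpoints (see `ALLOWED_MOUNTPOINTS`) or a `/dev/...` device path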
#[tracing::instrument(skip(ctx))]
Expand All @@ -15,15 +18,98 @@ pub async fn handler(ctx: Ctx) -> Result<JobExecutionUpdate> {

info!("Running fsck on {} for job {}", device, ctx.execution_id());

if let Ok(mut child) = ctx.deps().shell.exec(&["umount", device]).await {
let _ =
tokio::time::timeout(std::time::Duration::from_secs(5), child.wait()).await;
let input = match validate_fsck_arg(device) {
Ok(input) => input,
Err(e) => return Ok(ctx.failure().stderr(format!("{e}"))),
};

// For test files, skip all mount/unmount logic and run fsck directly
#[cfg(any(test, feature = "integration-test"))]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is clanker poopo

if let FsckArg::TestFile(path) = &input {
return run_fsck_on_test_file(&ctx, path).await;
}

let mount_target = findmnt_target_for_source(&ctx, device).await;
let mount_target_opt = mount_target.ok();
let (fsck_target, remount_target) =
match findmnt_source_for_target(&ctx, device).await {
// User passed a mountpoint (e.g. /usr/persistent). Fsck the backing block
// device and remount the mountpoint afterwards.
Ok(source) => (source, Some(device.to_string())),
// Not a mountpoint. Fsck the provided arg (file/device), but remount if we
// discovered it's mounted somewhere.
Err(_) => (device.to_string(), mount_target_opt.clone()),
};

// Enforce allowlist after we learned whether this is a mountpoint or a device.
// - Mountpoints: only a small allowlist
// - Devices: only if they resolve to an allowed mountpoint
match input {
FsckArg::Mountpoint(target) => {
if !ALLOWED_MOUNTPOINTS.contains(&target.as_str()) {
return Ok(ctx.failure().stderr(format!(
"Refusing to run fsck on mountpoint {target}; allowed mountpoints: {}",
ALLOWED_MOUNTPOINTS.join(", ")
)));
}
}
FsckArg::Device(path) => {
if let Some(target) = mount_target_opt.clone()
&& ALLOWED_MOUNTPOINTS.contains(&target.as_str())
{
// Allowed.
} else {
return Ok(ctx.failure().stderr(format!(
"Refusing to run fsck on device {path}. Please pass an allowed mountpoint instead: {}",
ALLOWED_MOUNTPOINTS.join(", ")
)));
}
}
#[cfg(any(test, feature = "integration-test"))]
FsckArg::TestFile(_path) => unreachable!("handled above"),
}

if let Some(target) = &remount_target {
let unmount = ctx
.deps()
.shell
.exec(&["umount", target])
.await
.context("failed to spawn umount")?
.wait_with_output()
.await
.context("failed to wait for umount")?;

if !unmount.status.success() {
let stdout = String::from_utf8_lossy(&unmount.stdout);
let stderr = String::from_utf8_lossy(&unmount.stderr);
let message = format!(
"Refusing to run fsck: target appears mounted at {target} and unmount failed.\n\nUMOUNT STDOUT:\n{stdout}\nUMOUNT STDERR:\n{stderr}"
);
return Ok(ctx.failure().stderr(message));
}
}

// Verify filesystem type before running fsck.
let fs_type = blkid_fs_type(&ctx, &fsck_target).await;
let fs_type = match fs_type {
Ok(t) => t,
Err(e) => {
return Ok(ctx.failure().stderr(format!(
"Refusing to run fsck on {fsck_target}: could not determine filesystem type via blkid: {e}"
)));
}
};
if !is_allowed_fs_type(&fs_type) {
return Ok(ctx.failure().stderr(format!(
"Refusing to run fsck on {fsck_target}: filesystem type {fs_type} is not allowed"
)));
}

let output = ctx
.deps()
.shell
.exec(&["fsck", "-y", "-f", device])
.exec(&["fsck", "-y", "-f", &fsck_target])
.await
.context("failed to spawn fsck")?
.wait_with_output()
Expand All @@ -32,13 +118,205 @@ pub async fn handler(ctx: Ctx) -> Result<JobExecutionUpdate> {

let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
let message = format!("STDOUT:\n{stdout}\nSTDERR:\n{stderr}");
let fsck_message = format!("STDOUT:\n{stdout}\nSTDERR:\n{stderr}");

let mut remount_message = String::new();
let mut remount_ok = true;
if let Some(target) = &remount_target {
// Best-effort remount: prefer fstab-based `mount <target>`, then fall back to
// `mount <source> <target>`.
let mount1 = ctx
.deps()
.shell
.exec(&["mount", target])
.await
.context("failed to spawn mount")?
.wait_with_output()
.await
.context("failed to wait for mount")?;

let mut ok = mount1.status.success();
if !ok {
let mount2 = ctx
.deps()
.shell
.exec(&["mount", &fsck_target, target])
.await
.context("failed to spawn mount (fallback)")?
.wait_with_output()
.await
.context("failed to wait for mount (fallback)")?;
ok = mount2.status.success();
}
remount_ok = ok;

// Even if remount fails, surface that in job output
match findmnt_source_for_target(&ctx, target).await {
Ok(source) => {
remount_message = format!("\n\nRemount: OK ({target} -> {source})");
}
Err(e) => {
warn!("failed to confirm remount of {target}: {e:?}");
remount_message = format!("\n\nRemount: FAILED ({target})");
}
}
}

let message = format!("{fsck_message}{remount_message}");

// fsck exit codes:
// 0 - No errors
// 1 - File system errors corrected
// 2 - System should be rebooted
// ...
if let Some(code) = output.status.code()
&& (code == 0 || code == 1)
{
if remount_ok {
return Ok(ctx.success().stdout(message));
}

return Ok(ctx.failure().stdout(message).stderr("Remount failed"));
}

Ok(ctx.failure().stdout(message))
}

/// Classification of the raw argument given to the `fsck` job, as produced by
/// `validate_fsck_arg`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum FsckArg {
/// The argument is exactly one of the entries in `ALLOWED_MOUNTPOINTS`.
Mountpoint(String),
/// The argument starts with `/dev/`; the payload is the argument as given.
Device(String),
/// Any other argument. Only exists in test builds or when the
/// `integration-test` feature is enabled; the handler runs fsck on it
/// directly and skips the mount/unmount logic.
#[cfg(any(test, feature = "integration-test"))]
TestFile(String),
}

/// Classifies the raw job argument into an [`FsckArg`].
///
/// Rules, checked in order:
/// 1. An exact match against `ALLOWED_MOUNTPOINTS` yields `FsckArg::Mountpoint`.
/// 2. Anything starting with `/dev/` yields `FsckArg::Device`.
/// 3. Everything else yields `FsckArg::TestFile` in test builds or with the
///    `integration-test` feature enabled, and an error otherwise.
///
/// # Errors
///
/// In non-test builds without the `integration-test` feature, returns an error
/// listing the allowed mountpoints when `arg` matches neither rule 1 nor rule 2.
fn validate_fsck_arg(arg: &str) -> Result<FsckArg> {
    let is_allowed_mountpoint = ALLOWED_MOUNTPOINTS.iter().any(|allowed| *allowed == arg);
    if is_allowed_mountpoint {
        return Ok(FsckArg::Mountpoint(arg.to_owned()));
    }

    let looks_like_device = arg.starts_with("/dev/");
    if looks_like_device {
        return Ok(FsckArg::Device(arg.to_owned()));
    }

    // Exactly one of the two blocks below survives conditional compilation and
    // becomes the tail expression of the function.
    #[cfg(any(test, feature = "integration-test"))]
    {
        Ok(FsckArg::TestFile(arg.to_owned()))
    }

    #[cfg(not(any(test, feature = "integration-test")))]
    {
        let allowed = ALLOWED_MOUNTPOINTS.join(", ");
        Err(color_eyre::eyre::eyre!(
            "Refusing to run fsck on {arg}; allowed mountpoints: {}",
            allowed
        ))
    }
}

/// Returns `true` when `fs_type` is a filesystem type fsck is permitted to run on.
///
/// The comparison is exact (case-sensitive, no trimming); the accepted values
/// are `ext2`, `ext3`, `ext4` and `f2fs`.
fn is_allowed_fs_type(fs_type: &str) -> bool {
    const PERMITTED: [&str; 4] = ["ext2", "ext3", "ext4", "f2fs"];
    PERMITTED.iter().any(|permitted| *permitted == fs_type)
}

/// Runs `blkid -o value -s TYPE <device>` through the job's shell and returns
/// its trimmed standard output.
///
/// # Errors
///
/// Fails when the command cannot be spawned or awaited, when it exits with a
/// non-success status, or when its trimmed output is empty.
async fn blkid_fs_type(ctx: &Ctx, device: &str) -> Result<String> {
    let child = ctx
        .deps()
        .shell
        .exec(&["blkid", "-o", "value", "-s", "TYPE", device])
        .await
        .context("failed to spawn blkid")?;
    let finished = child
        .wait_with_output()
        .await
        .context("failed to wait for blkid")?;

    let exit_status = finished.status;
    if !exit_status.success() {
        return Err(color_eyre::eyre::eyre!(
            "blkid failed with status {}",
            exit_status
        ));
    }

    let raw = String::from_utf8_lossy(&finished.stdout);
    let fs_type = raw.trim();
    if fs_type.is_empty() {
        Err(color_eyre::eyre::eyre!("blkid returned empty TYPE"))
    } else {
        Ok(fs_type.to_string())
    }
}

/// Runs `findmnt -n -o SOURCE --target <target>` through the job's shell and
/// returns its trimmed standard output.
///
/// # Errors
///
/// Fails when the command cannot be spawned or awaited, when it exits with a
/// non-success status, or when its trimmed output is empty.
async fn findmnt_source_for_target(ctx: &Ctx, target: &str) -> Result<String> {
    let child = ctx
        .deps()
        .shell
        .exec(&["findmnt", "-n", "-o", "SOURCE", "--target", target])
        .await
        .context("failed to spawn findmnt")?;
    let finished = child
        .wait_with_output()
        .await
        .context("failed to wait for findmnt")?;

    let exit_status = finished.status;
    if !exit_status.success() {
        return Err(color_eyre::eyre::eyre!(
            "findmnt failed with status {}",
            exit_status
        ));
    }

    let raw = String::from_utf8_lossy(&finished.stdout);
    let backing_source = raw.trim();
    if backing_source.is_empty() {
        Err(color_eyre::eyre::eyre!("findmnt returned empty SOURCE"))
    } else {
        Ok(backing_source.to_string())
    }
}

/// Runs `findmnt -n -o TARGET --source <source>` through the job's shell and
/// returns its trimmed standard output.
///
/// # Errors
///
/// Fails when the command cannot be spawned or awaited, when it exits with a
/// non-success status, or when its trimmed output is empty.
async fn findmnt_target_for_source(ctx: &Ctx, source: &str) -> Result<String> {
    let child = ctx
        .deps()
        .shell
        .exec(&["findmnt", "-n", "-o", "TARGET", "--source", source])
        .await
        .context("failed to spawn findmnt")?;
    let finished = child
        .wait_with_output()
        .await
        .context("failed to wait for findmnt")?;

    let exit_status = finished.status;
    if !exit_status.success() {
        return Err(color_eyre::eyre::eyre!(
            "findmnt failed with status {}",
            exit_status
        ));
    }

    let raw = String::from_utf8_lossy(&finished.stdout);
    let mounted_at = raw.trim();
    if mounted_at.is_empty() {
        Err(color_eyre::eyre::eyre!("findmnt returned empty TARGET"))
    } else {
        Ok(mounted_at.to_string())
    }
}

#[cfg(any(test, feature = "integration-test"))]
async fn run_fsck_on_test_file(
ctx: &Ctx,
path: &str,
) -> Result<orb_relay_messages::jobs::v1::JobExecutionUpdate> {
use crate::job_system::ctx::JobExecutionUpdateExt;

let output = ctx
.deps()
.shell
.exec(&["fsck", "-y", "-f", path])
.await
.context("failed to spawn fsck")?
.wait_with_output()
.await
.context("failed to wait for fsck")?;

let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
let message = format!("STDOUT:\n{stdout}\nSTDERR:\n{stderr}");

// fsck exit codes: 0 = no errors, 1 = errors corrected
if let Some(code) = output.status.code()
&& (code == 0 || code == 1)
{
Expand Down
Loading