diff --git a/Cargo.lock b/Cargo.lock index ce779a4..3f2f7c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -177,9 +177,9 @@ checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" [[package]] name = "blkpg" -version = "0.1.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21aa4a3bddef07013ae260fc8c7188f84bdbf14b166dab645390c10214f8fff6" +checksum = "2805ecc65cf36103b63db643d30171de035761a9aa0b3a982b753f67dfa6496f" dependencies = [ "rustix", ] @@ -393,7 +393,7 @@ dependencies = [ [[package]] name = "easyto-init" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "base64", @@ -413,7 +413,6 @@ dependencies = [ "serde_yml", "signal-hook", "simple_logger", - "ureq", ] [[package]] @@ -430,19 +429,19 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.9" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] name = "fastrand" -version = "2.1.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "flate2" @@ -462,9 +461,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "form_urlencoded" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" dependencies = [ "percent-encoding", ] @@ -511,7 +510,19 @@ checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", +] + +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.7+wasi-0.2.4", ] [[package]] @@ -626,9 +637,9 @@ checksum = "4240c3e8edeb2b2d0c939d6bf4bb09683eb40fb69ccfba11909b623e8dc16ec0" [[package]] name = "libc" -version = "0.2.161" +version = "0.2.177" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" +checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" [[package]] name = "libyml" @@ -638,9 +649,9 @@ checksum = "64804cc6a5042d4f05379909ba25b503ec04e2c082151d62122d5dcaa274b961" [[package]] name = "linux-raw-sys" -version = "0.4.14" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" [[package]] name = "log" @@ -656,14 +667,15 @@ checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "minaws" -version = "0.1.0" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b341908e5aea14b5af3f5dd4b5a862facccb5409f2c9d20bdc606e475feee8b9" +checksum = "7b123d65d6982687151ddc170249e3aca8a16877ac6d1a6f16d0c234f6364d51" dependencies = [ "aws-credential-types", "aws-sigv4", "aws-smithy-runtime-api", "chrono", + "form_urlencoded", "once_cell", "serde", "serde-xml-rs", @@ -725,9 +737,9 @@ dependencies = [ [[package]] name = "nvme-amz" -version = "0.2.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01f09775dbc0b567e6cd35ce486577c7f7cfa04567ab1004c8f70c35d1e1ee65" +checksum = "e86f8fc89295a1f669fd46e10e4fc34963875af2404b8ff2a717e4515c5834ff" dependencies = [ "rustix", ] @@ -805,6 +817,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + [[package]] name = "ring" version = "0.17.8" @@ -813,7 +831,7 @@ checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" dependencies = [ "cc", "cfg-if", - "getrandom", + "getrandom 0.2.15", "libc", "spin", "untrusted", @@ -828,15 +846,15 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustix" -version = "0.38.36" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f55e80d50763938498dd5ebb18647174e0c76dc38c5505294bb224624f30f36" +checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" dependencies = [ "bitflags", "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1039,12 +1057,12 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.12.0" +version = "3.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64" +checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" dependencies = [ - "cfg-if", "fastrand", + "getrandom 0.3.3", "once_cell", "rustix", "windows-sys 0.59.0", @@ -1225,7 +1243,7 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" dependencies = [ - "getrandom", + "getrandom 0.2.15", ] [[package]] @@ -1246,6 +1264,24 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.14.7+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "883478de20367e224c0090af9cf5f9fa85bed63a95c1abf3afc5c083ebc06e8c" +dependencies = [ + "wasip2", +] + +[[package]] +name = "wasip2" +version = "1.0.1+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "webpki-roots" version = "0.26.5" @@ -1403,6 +1439,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "wit-bindgen" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" + [[package]] name = "xml-rs" version = "0.8.22" diff --git a/Cargo.toml b/Cargo.toml index 4b1c61e..cf409b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,26 +1,25 @@ [package] name = "easyto-init" -version = "0.1.0" +version = "0.2.0" edition = "2021" [dependencies] anyhow = "1.0.87" base64 = "0.22.1" -blkpg = "0.1.1" +blkpg = "0.2.0" chrono = { default-features = false, version = "0.4.38", features = ["serde", "std"] } crossbeam = "0.8.4" gpt = "4.0.0" log = "0.4.22" -nvme-amz = { version = "0.2.0", features = ["ioctl-rustix"] } -rustix = { default-features = false, version = "0.38.34", features = ["fs", "process", "mount", "runtime", "system", "thread"] } +nvme-amz = { version = "0.4.0", features = ["ioctl-rustix"] } +rustix = { default-features = false, version = "1.1.2", features = ["fs", "process", "mount", "net", "runtime", "system", "thread"] } serde = { default-features = false, version = "1.0.205" } serde_json = { default-features = false, version = "1.0.122" } serde-xml-rs = "0.6.0" serde_yml = "0.0.11" signal-hook = "0.3.17" simple_logger = { default-features = false, version = "5.0.0", features = ["timestamps"] } -ureq = "2.10.1" -minaws = { version = "0.1.0" } +minaws = { version = "0.2.2" } k8s-expand = { version = "0.1.0" } [dev-dependencies] diff --git a/src/aws/asm.rs b/src/aws/asm.rs index f2b7fe8..e40e9ad 100644 --- a/src/aws/asm.rs +++ b/src/aws/asm.rs @@ -14,9 +14,9 @@ pub struct AsmClient { } impl AsmClient { - pub fn new(credentials: Credentials, region: &str) -> Result { + pub fn new(credentials: Credentials, region: &str) -> Self { let api = secretsmanager::Api::new(region, credentials); - Ok(Self { api: api.into() }) + Self { api: api.into() } } pub fn from_imds(imds: &Imds, region: &str) -> Result { diff --git a/src/aws/ec2.rs b/src/aws/ec2.rs new file mode 100644 index 0000000..a41af61 --- /dev/null +++ b/src/aws/ec2.rs @@ -0,0 +1,107 @@ +use std::{sync::Arc, time::Duration}; + +use anyhow::{anyhow, Result}; +use crossbeam::utils::Backoff; +use log::debug; +use minaws::{ + ec2::{self, AttachVolumeInput, Filter}, + imds::{Credentials, Imds}, +}; + +use crate::vmspec::EbsVolumeAttachment; + +#[derive(Debug)] +pub struct Ec2Client { + api: Arc, +} + +impl Ec2Client { + pub fn new(credentials: Credentials, region: &str) -> Self { + let api = ec2::Api::new(region, credentials); + Self { api: api.into() } + } + + pub fn from_imds(imds: &Imds, region: &str) -> Result { + let credentials = imds.get_credentials()?; + let api = ec2::Api::new(region, credentials); + Ok(Self { api: api.into() }) + } + + pub fn ensure_ebs_volume_attached( + &self, + attachment: &EbsVolumeAttachment, + device: &str, + availability_zone: &str, + instance_id: &str, + ) -> Result<()> { + let volume_id = self.wait_for_ebs_volume(attachment, availability_zone)?; + self.api + .attach_volume(AttachVolumeInput { + device: device.into(), + instance_id: instance_id.into(), + volume_id: volume_id.clone(), + }) + .map_err(|e| anyhow!("unable to attach EBS volume: {}", e))?; + Ok(()) + } + + fn wait_for_ebs_volume( + &self, + attachment: &EbsVolumeAttachment, + availability_zone: &str, + ) -> Result { + let mut filters: Vec = vec![ + Filter { + name: "status".into(), + values: vec!["available".into()], + }, + Filter { + name: "availability-zone".into(), + values: vec![availability_zone.into()], + }, + ]; + for tag in &attachment.tags { + if tag.value.is_none() { + filters.push(Filter { + name: "tag-key".into(), + values: vec![tag.key.clone()], + }); + } else { + filters.push(Filter { + name: format!("tag:{}", tag.key.clone()), + values: vec![tag.value.clone().unwrap()], + }); + } + } + let start = std::time::Instant::now(); + let timeout = Duration::from_secs(attachment.timeout.unwrap_or(300)); + let backoff = Backoff::new(); + loop { + let result = self.api.describe_volumes(ec2::DescribeVolumesInput { + filters: Some(filters.clone()), + ..Default::default() + }); + match result { + Err(e) => debug!("error describing EBS volumes: {}", e), + Ok(vol_out) => { + if let Some(ref volume_set) = vol_out.volumes { + if let Some(items) = &volume_set.items { + if let Some(volume) = items.first() { + if let Some(volume_id) = &volume.volume_id { + debug!("found matching EBS volume: {:?}", volume); + return Ok(volume_id.clone()); + } + } + } + } + debug!("no EBS volume found matching filters: {:?}", filters); + } + } + if start.elapsed() > timeout { + return Err(anyhow!("timeout waiting for EBS volume to be available")); + } + debug!("waiting for EBS volume to be available"); + backoff.snooze(); + } + } +} diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 283468e..5c128b0 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -1,3 +1,4 @@ pub mod asm; +pub mod ec2; pub mod s3; pub mod ssm; diff --git a/src/aws/s3.rs b/src/aws/s3.rs index 3aeeb78..acf8d0f 100644 --- a/src/aws/s3.rs +++ b/src/aws/s3.rs @@ -18,9 +18,9 @@ pub struct S3Client { } impl S3Client { - pub fn new(credentials: Credentials, region: &str) -> Result { + pub fn new(credentials: Credentials, region: &str) -> Self { let api = s3::Api::new(region, credentials); - Ok(Self { api: api.into() }) + Self { api: api.into() } } pub fn from_imds(imds: &Imds, region: &str) -> Result { @@ -146,10 +146,7 @@ impl Read for S3Object { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.download().map_err(|e| { let s3_url = format!("s3://{}/{}", self.bucket, self.key); - io::Error::new( - io::ErrorKind::Other, - format!("unable to download S3 object {}: {}", s3_url, e), - ) + io::Error::other(format!("unable to download S3 object {}: {}", s3_url, e)) })?; debug!("reading from S3 object s3://{}/{}", self.bucket, self.key); self.object.as_mut().unwrap().body.read(buf) diff --git a/src/aws/ssm.rs b/src/aws/ssm.rs index fdfc7c3..4758b69 100644 --- a/src/aws/ssm.rs +++ b/src/aws/ssm.rs @@ -14,9 +14,9 @@ pub struct SsmClient { } impl SsmClient { - pub fn new(credentials: Credentials, region: &str) -> Result { + pub fn new(credentials: Credentials, region: &str) -> Self { let api = ssm::Api::new(region, credentials); - Ok(Self { api: api.into() }) + Self { api: api.into() } } pub fn from_imds(imds: &Imds, region: &str) -> Result { diff --git a/src/fs.rs b/src/fs.rs index 8861f91..b8dd380 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -1,4 +1,5 @@ use std::{ + ffi::CString, fs::create_dir, path::{Path, PathBuf, MAIN_SEPARATOR_STR}, }; @@ -6,8 +7,8 @@ use std::{ use anyhow::{anyhow, Result}; use log::debug; use rustix::{ - fs::{chmod, chown, Gid, Mode, MountFlags, Uid}, - mount::mount, + fs::{chmod, chown, Gid, Mode, Uid}, + mount::{mount, MountFlags}, }; #[derive(Debug)] @@ -30,14 +31,10 @@ impl<'a> Mount<'a> { pub fn execute(&self) -> Result<()> { let path = Path::new(&self.target); mkdir_p(path, self.mode)?; - mount( - self.source, - path, - self.fs_type, - self.flags, - self.options.unwrap_or_default(), - ) - .map_err(|e| anyhow!("unable to mount {} on {:?}: {}", self.source, path, e))?; + let options_cstring = self.options.map(|s| CString::new(s).unwrap()); + let options_cstr = options_cstring.as_deref(); + mount(self.source, path, self.fs_type, self.flags, options_cstr) + .map_err(|e| anyhow!("unable to mount {} on {:?}: {}", self.source, path, e))?; Ok(()) } } diff --git a/src/init.rs b/src/init.rs index a56bedd..a39d10a 100644 --- a/src/init.rs +++ b/src/init.rs @@ -12,22 +12,25 @@ use anyhow::{anyhow, Result}; use base64::prelude::*; use crossbeam::channel::{bounded, Select}; use crossbeam::sync::WaitGroup; +use crossbeam::utils::Backoff; use k8s_expand::{expand, mapping_func_for}; use log::{debug, error, info, Level}; use minaws::imds::{Credentials, Imds}; -use rustix::fs::{chown, remount, stat, symlink, unmount, Gid, Mode, Uid, UnmountFlags}; +use rustix::fs::{chown, stat, symlink, Gid, Mode, Uid}; use rustix::io::Errno; -use rustix::mount::{mount, MountFlags}; +use rustix::mount::{mount, mount_remount, unmount, MountFlags, UnmountFlags}; use rustix::process::{chdir, umask}; use rustix::runtime::execve; use rustix::thread::{set_thread_gid, set_thread_uid}; use crate::aws::asm::AsmClient; +use crate::aws::ec2::Ec2Client; use crate::aws::s3::S3Client; use crate::aws::ssm::SsmClient; use crate::fs::{mkdir_p, Link, Mount}; use crate::service::Supervisor; use crate::system::{device_has_fs, link_nvme_devices, resize_root_volume}; +use crate::uevent::start_uevent_listener; use crate::vmspec::{ EbsVolumeSource, EnvFromSources, ImdsEnvSource, NameValue, NameValues, NameValuesExt, S3EnvSource, S3VolumeSource, SecretsManagerEnvSource, SecretsManagerVolumeSource, SsmEnvSource, @@ -53,6 +56,10 @@ pub fn initialize() -> Result<()> { base_mounts()?; base_links()?; + + // Start listener to link newly attached NVMe devices. + start_uevent_listener()?; + // Run initial scan and link of existing NVMe devices. link_nvme_devices()?; let config_file_path = Path::new(constants::DIR_ET).join(constants::FILE_METADATA); @@ -79,34 +86,28 @@ pub fn initialize() -> Result<()> { let credentials = imds_client .get_credentials() .map_err(|e| anyhow!("unable to get AWS credentials from IMDS: {}", e))?; + + let ec2_client = try_get_ec2_client(&vmspec, credentials.clone(), &aws_region); + let s3_client = try_get_s3_client(&vmspec, credentials.clone(), &aws_region); + let asm_client = try_get_asm_client(&vmspec, credentials.clone(), &aws_region); + let ssm_client = try_get_ssm_client(&vmspec, credentials.clone(), &aws_region); for volume in &vmspec.volumes { debug!("Processing volume {:?}", volume); if let Some(source) = &volume.ebs { - handle_volume_ebs(source)?; + handle_volume_ebs(ec2_client.as_ref().unwrap(), &imds_client, source)?; } if let Some(source) = &volume.s3 { - handle_volume_s3( - Path::new(base_dir), - source, - credentials.clone(), - &aws_region, - )?; + handle_volume_s3(s3_client.as_ref().unwrap(), Path::new(base_dir), source)?; } if let Some(source) = &volume.secrets_manager { handle_volume_secretsmanager( + asm_client.as_ref().unwrap(), Path::new(base_dir), source, - credentials.clone(), - &aws_region, )?; } if let Some(source) = &volume.ssm { - handle_volume_ssm( - Path::new(base_dir), - source, - credentials.clone(), - &aws_region, - )?; + handle_volume_ssm(ssm_client.as_ref().unwrap(), Path::new(base_dir), source)?; } } @@ -139,6 +140,70 @@ pub fn initialize() -> Result<()> { Ok(()) } +fn try_get_ec2_client( + vmspec: &VmSpec, + credentials: Credentials, + aws_region: &str, +) -> Option { + let ebs_count = vmspec + .volumes + .iter() + .filter(|vol| vol.ebs.is_some()) + .count(); + if ebs_count > 0 { + Some(Ec2Client::new(credentials.clone(), aws_region)) + } else { + None + } +} + +fn try_get_s3_client( + vmspec: &VmSpec, + credentials: Credentials, + aws_region: &str, +) -> Option { + let s3_count = vmspec.volumes.iter().filter(|vol| vol.s3.is_some()).count(); + if s3_count > 0 { + Some(S3Client::new(credentials.clone(), aws_region)) + } else { + None + } +} + +fn try_get_asm_client( + vmspec: &VmSpec, + credentials: Credentials, + aws_region: &str, +) -> Option { + let asm_count = vmspec + .volumes + .iter() + .filter(|vol| vol.secrets_manager.is_some()) + .count(); + if asm_count > 0 { + Some(AsmClient::new(credentials.clone(), aws_region)) + } else { + None + } +} + +fn try_get_ssm_client( + vmspec: &VmSpec, + credentials: Credentials, + aws_region: &str, +) -> Option { + let ssm_count = vmspec + .volumes + .iter() + .filter(|vol| vol.ssm.is_some()) + .count(); + if ssm_count > 0 { + Some(SsmClient::new(credentials.clone(), aws_region)) + } else { + None + } +} + fn base_links() -> Result<()> { let ls = vec![ Link { @@ -278,66 +343,113 @@ fn parse_mode(mode: &str) -> Result { Ok(Mode::from(m)) } -fn handle_volume_ebs(volume: &EbsVolumeSource) -> Result<()> { +fn wait_for_device(device: &str, timeout: Duration) -> Result<()> { + let start = std::time::Instant::now(); + let path = Path::new(device); + let backoff = Backoff::new(); + loop { + match path.try_exists() { + Ok(true) => break, + _ => backoff.snooze(), + } + if start.elapsed() > timeout { + return Err(anyhow!("timeout waiting for device {} to exist", device)); + } + } + Ok(()) +} + +fn handle_volume_ebs( + ec2_client: &Ec2Client, + imds_client: &Imds, + volume: &EbsVolumeSource, +) -> Result<()> { info!("Handling volume {:?}", volume); if volume.device.is_empty() { - return Err(anyhow!("volume must have a device")); + return Err(anyhow!("EBS volume must have a device")); } - if volume.fs_type.is_none() { - return Err(anyhow!("volume must have a filesystem type")); + if let Some(mnt) = &volume.mount { + if mnt.destination.is_empty() { + return Err(anyhow!("EBS volume mount must have a destination")); + } + if mnt + .fs_type + .as_ref() + .is_none_or(|fs_type| fs_type.is_empty()) + { + return Err(anyhow!("EBS volume mount must have a filesystem type")); + } + } + + if let Some(ref attachment) = volume.attachment { + let az_path = Path::new("placement/availability-zone"); + let availability_zone = imds_client.get_metadata(az_path)?; + let id_path = Path::new("instance-id"); + let instance_id = imds_client.get_metadata(id_path)?; + ec2_client + .ensure_ebs_volume_attached( + attachment, + &volume.device, + &availability_zone, + &instance_id, + ) + .map_err(|e| { + anyhow!( + "unable to ensure EBS volume {} is attached: {}", + &volume.device, + e + ) + })?; + info!("EBS volume {} is attached", &volume.device); + // Wait for uevent listener to create the device link. + wait_for_device( + &volume.device, + Duration::from_secs(attachment.timeout.unwrap_or(300)), + )?; + info!("EBS volume device {} is available", &volume.device); } - if volume.mount.destination.is_empty() { - return Err(anyhow!("volume must have a mount point")); + if volume.mount.is_none() { + return Ok(()); } - let mode = parse_mode(volume.mount.mode.as_ref().unwrap())?; + let mnt = volume.mount.as_ref().unwrap(); + + let mode = parse_mode(mnt.mode.as_ref().unwrap())?; debug!("Parsed mode, before: {:?}, after: {:?}", volume, mode); - mkdir_p(&volume.mount.destination, mode)?; - debug!("Created mount point {:?}", volume.mount.destination); + mkdir_p(&mnt.destination, mode)?; + debug!("Created mount point {:?}", mnt.destination); - let (owner, group) = unsafe { - ( - volume.mount.user_id.map(|u| Uid::from_raw(u)), - volume.mount.group_id.map(|g| Gid::from_raw(g)), - ) - }; - chown(&volume.mount.destination, owner, group).map_err(|e| { - anyhow!( - "unable to change ownership of {}: {}", - &volume.mount.destination, - e - ) - })?; - debug!( - "Changed ownership of mount point {:?}", - volume.mount.destination + let (owner, group) = ( + mnt.user_id.map(Uid::from_raw), + mnt.group_id.map(Gid::from_raw), ); + chown(&mnt.destination, owner, group) + .map_err(|e| anyhow!("unable to change ownership of {}: {}", &mnt.destination, e))?; + debug!("Changed ownership of mount point {:?}", mnt.destination); - try_mkfs(&volume.device, volume.fs_type.as_ref().unwrap())?; + let fs_type = mnt.fs_type.as_ref().unwrap(); + try_mkfs(&volume.device, fs_type)?; mount( &volume.device, - &volume.mount.destination, - volume.fs_type.as_ref().unwrap(), + &mnt.destination, + fs_type, MountFlags::empty(), - "", + None, ) .map_err(|e| { anyhow!( "unable to mount {} on {}: {}", &volume.device, - &volume.mount.destination, + &mnt.destination, e ) })?; - info!( - "Mounted volume {} on {}", - &volume.device, &volume.mount.destination - ); + info!("Mounted volume {} on {}", &volume.device, &mnt.destination); Ok(()) } @@ -367,13 +479,11 @@ fn try_mkfs(device: &str, fs_type: &str) -> Result<()> { } fn handle_volume_ssm( + ssm_client: &SsmClient, base_dir: &Path, volume: &SsmVolumeSource, - credentials: Credentials, - region: &str, ) -> Result<()> { - let client = SsmClient::new(credentials, region)?; - match client.get_parameter_list(&volume.path) { + match ssm_client.get_parameter_list(&volume.path) { Ok(mut parameters) => { debug!("SSM parameters: {:?}", parameters); for parameter in parameters.iter_mut() { @@ -395,13 +505,11 @@ fn handle_volume_ssm( } fn handle_volume_secretsmanager( + asm_client: &AsmClient, base_dir: &Path, volume: &SecretsManagerVolumeSource, - credentials: Credentials, - region: &str, ) -> Result<()> { - let client = AsmClient::new(credentials, region)?; - match client.get_secret_list(&volume.secret_id) { + match asm_client.get_secret_list(&volume.secret_id) { Ok(mut secrets) => { debug!("Secrets Manager secrets: {:?}", secrets); for secret in secrets.iter_mut() { @@ -422,16 +530,9 @@ fn handle_volume_secretsmanager( } } -fn handle_volume_s3( - base_dir: &Path, - volume: &S3VolumeSource, - credentials: Credentials, - region: &str, -) -> Result<()> { - let client = S3Client::new(credentials, region) - .map_err(|e| anyhow!("unable to create S3 client: {}", e))?; +fn handle_volume_s3(s3_client: &S3Client, base_dir: &Path, volume: &S3VolumeSource) -> Result<()> { let s3_url = format!("s3://{}/{}", volume.bucket, volume.key_prefix); - match client.get_object_list(&volume.bucket, &volume.key_prefix) { + match s3_client.get_object_list(&volume.bucket, &volume.key_prefix) { Ok(mut objects) => { debug!("S3 objects: {:?}", objects); for object in objects.iter_mut() { @@ -512,11 +613,11 @@ fn resolve_env_from_s3( region: &str, ) -> Result { let get_bytes = || { - let client = S3Client::new(credentials.clone(), region)?; + let client = S3Client::new(credentials.clone(), region); client.get_object_bytes(&source.bucket, &source.key) }; let get_map = || { - let client = S3Client::new(credentials.clone(), region)?; + let client = S3Client::new(credentials.clone(), region); client.get_object_map(&source.bucket, &source.key) }; resolve_env_from( @@ -532,7 +633,7 @@ fn resolve_env_from_secretsmanager( credentials: Credentials, region: &str, ) -> Result { - let client = &AsmClient::new(credentials, region)?; + let client = &AsmClient::new(credentials, region); let get_bytes = || client.get_secret_value(&source.secret_id); let get_map = || client.get_secret_map(&source.secret_id); resolve_env_from( @@ -548,7 +649,7 @@ fn resolve_env_from_ssm( credentials: Credentials, region: &str, ) -> Result { - let client = &SsmClient::new(credentials, region)?; + let client = &SsmClient::new(credentials, region); let get_bytes = || client.get_parameter_value(&source.path); let get_map = || client.get_parameter_map(&source.path); resolve_env_from( @@ -636,19 +737,17 @@ fn replace_init(vmspec: VmSpec, command: Vec, env: NameValues) -> Result } if let Some(true) = vmspec.security.readonly_root_fs { - remount(constants::DIR_ROOT, MountFlags::RDONLY, "") + mount_remount(constants::DIR_ROOT, MountFlags::RDONLY, "") .map_err(|e| anyhow!("unable to remount root filesystem as readonly: {}", e))?; } chdir(&vmspec.working_dir) .map_err(|e| anyhow!("unable to chdir to {}: {}", &vmspec.working_dir, e))?; - let (uid, gid) = unsafe { - ( - Uid::from_raw(vmspec.security.run_as_user_id.unwrap()), - Gid::from_raw(vmspec.security.run_as_group_id.unwrap()), - ) - }; + let (uid, gid) = ( + Uid::from_raw(vmspec.security.run_as_user_id.unwrap()), + Gid::from_raw(vmspec.security.run_as_group_id.unwrap()), + ); // This calls setgid and setuid only for the current thread, but since this thread // is calling execve(), the new process will inherit the new user and group. set_thread_gid(gid).map_err(|e| { @@ -703,8 +802,17 @@ fn supervise(vmspec: VmSpec, command: Vec, env: NameValues) -> Result<() let mount_points: Vec = vmspec .volumes .iter() - .filter(|v| v.ebs.is_some()) - .map(|v| v.ebs.as_ref().unwrap().mount.destination.clone()) + .filter(|v| v.ebs.is_some() && v.ebs.as_ref().unwrap().mount.is_some()) + .map(|v| { + v.ebs + .as_ref() + .unwrap() + .mount + .as_ref() + .unwrap() + .destination + .clone() + }) .collect(); let mut supervisor = Supervisor::new(vmspec, command, env)?; @@ -722,7 +830,7 @@ fn supervise(vmspec: VmSpec, command: Vec, env: NameValues) -> Result<() fn unmount_all(mount_points: &[String]) -> Result<()> { let mut error_count = 0; - if let Err(e) = remount(constants::DIR_ROOT, MountFlags::RDONLY, "") { + if let Err(e) = mount_remount(constants::DIR_ROOT, MountFlags::RDONLY, "") { error_count += 1; error!( "unable to remount {} as read-only: {}", diff --git a/src/lib.rs b/src/lib.rs index dda6fad..bfcbdf4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,5 +7,6 @@ pub mod login; pub mod rdev; pub mod service; pub mod system; +pub mod uevent; pub mod vmspec; pub mod writable; diff --git a/src/login.rs b/src/login.rs index 0e26707..c1d39fd 100644 --- a/src/login.rs +++ b/src/login.rs @@ -136,7 +136,7 @@ pub fn create_home_dir(home_dir: &Path, uid: u32, gid: u32) -> Result<()> { mkdir(parent, Mode::from_bits(0o755).unwrap())?; mkdir(home_dir, Mode::from_bits(0o700).unwrap())?; mkdir(ssh_dir, Mode::from_bits(0o700).unwrap())?; - let (uid, gid) = unsafe { (Uid::from_raw(uid), Gid::from_raw(gid)) }; + let (uid, gid) = (Uid::from_raw(uid), Gid::from_raw(gid)); chown(home_dir, Some(uid), Some(gid))?; chown(ssh_dir, Some(uid), Some(gid))?; umask(old_mask); diff --git a/src/service.rs b/src/service.rs index d633938..46a033e 100644 --- a/src/service.rs +++ b/src/service.rs @@ -12,11 +12,12 @@ use std::{ use anyhow::{anyhow, Result}; use crossbeam::channel::{bounded, Receiver, Select, Sender}; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; use minaws::imds::Imds; use rustix::{ - fs::{chmod, chown, remount, stat, Dir, FileType, Gid, Mode, MountFlags, Uid}, + fs::{chmod, chown, stat, Dir, FileType, Gid, Mode, Uid}, io::Errno, + mount::{mount_remount, MountFlags}, process::{kill_process, wait, Signal, WaitOptions}, thread::Pid, }; @@ -79,8 +80,8 @@ impl Default for ServiceBase { args: Vec::new(), working_dir: "/".into(), env: Vec::new(), - gid: unsafe { Gid::from_raw(0) }, - uid: unsafe { Uid::from_raw(0) }, + gid: Gid::from_raw(0), + uid: Uid::from_raw(0), init: None, stop_rx: err_recv, stop_tx: err_send, @@ -230,7 +231,7 @@ impl Chrony { let chrony_run_path = Path::new(constants::DIR_ET_RUN).join("chrony"); mkdir_p(&chrony_run_path, Mode::from(0o750))?; - let (uid, gid) = unsafe { (Uid::from_raw(user.uid), (Gid::from_raw(user.gid))) }; + let (uid, gid) = (Uid::from_raw(user.uid), (Gid::from_raw(user.gid))); chown(&chrony_run_path, Some(uid), Some(gid))?; Ok(()) @@ -301,7 +302,7 @@ impl Ssh { .ok_or_else(|| anyhow!("user {} not found", login_user))?; let ssh_dir = Path::new(&user.home_dir).join(".ssh"); - let (uid, gid) = unsafe { (Uid::from_raw(user.uid), (Gid::from_raw(user.gid))) }; + let (uid, gid) = (Uid::from_raw(user.uid), (Gid::from_raw(user.gid))); Self::ssh_write_pub_key(&ssh_dir, uid, gid)?; let rsa_key_path = Path::new(constants::DIR_ET_ETC) @@ -402,7 +403,7 @@ impl SupervisorBase { } fn kill(&self) -> Result<()> { - self.signal(Signal::Kill) + self.signal(Signal::KILL) } // Return the PIDs of all current non-kernel processes excluding init. @@ -462,7 +463,7 @@ impl SupervisorBase { let init_rx = service_ref.lock().unwrap().init_rx().clone(); let _ = init_rx.recv(); } - remount(constants::DIR_ROOT, MountFlags::RDONLY, "")?; + mount_remount(constants::DIR_ROOT, MountFlags::RDONLY, "")?; } start_main(self.main_ref.clone()) @@ -500,7 +501,7 @@ impl SupervisorBase { } info!("Shutting down all processes"); - if let Err(e) = self.signal(Signal::Term) { + if let Err(e) = self.signal(Signal::TERM) { error!("Error sending TERM signal: {}", e); } @@ -538,12 +539,10 @@ pub struct Supervisor { impl Supervisor { pub fn new(vmspec: VmSpec, command: Vec, env: NameValues) -> Result { - let (uid, gid) = unsafe { - ( - Uid::from_raw(vmspec.security.run_as_user_id.unwrap()), - Gid::from_raw(vmspec.security.run_as_group_id.unwrap()), - ) - }; + let (uid, gid) = ( + Uid::from_raw(vmspec.security.run_as_user_id.unwrap()), + Gid::from_raw(vmspec.security.run_as_group_id.unwrap()), + ); let working_dir = vmspec.working_dir.clone(); let main = Main::new(command, working_dir, env, gid, uid); @@ -702,7 +701,7 @@ fn start_main(service_ref: Arc>) -> Result<()> { .lock() .unwrap() .stop_tx() - .send(wait_result.map_err(Into::into)); + .send(wait_result); } } }); @@ -750,7 +749,7 @@ fn start_service(service_ref: Arc>) -> Result<()> { .lock() .unwrap() .stop_tx() - .send(wait_result.map_err(Into::into)); + .send(wait_result); return; } wait_result @@ -786,7 +785,7 @@ fn find_enabled_services( } else if entry_name == "ssh" { services.push(Arc::new(Mutex::new(Ssh::new()))); } else { - info!("Unknown service {}", entry_name); + warn!("Unknown service {}", entry_name); } } Ok(services) diff --git a/src/system.rs b/src/system.rs index 5b7cfe8..b823aa0 100644 --- a/src/system.rs +++ b/src/system.rs @@ -77,45 +77,48 @@ pub fn link_nvme_devices() -> Result<()> { ) })?; let device_name = entry.file_name().to_string_lossy().to_string(); - let device_path = Path::new("/dev").join(&device_name); - let device_fd = File::open(&device_path) - .map_err(|e| anyhow!("unable to open {:?}: {}", &device_path, e))?; - if let Ok(nvme) = Nvme::try_from(device_fd) { - debug!("nvme device: {:?}", nvme); - let ec2_device_name = nvme.name(); - let device_link_path = Path::new("/dev").join(ec2_device_name); - debug!("linking {} to {:?}", &device_name, &device_link_path); - symlink(&device_name, &device_link_path).map_err(|e| { - anyhow!( + let disk_device = DeviceInfo { + name: device_name.clone(), + part_num: None, + }; + link_nvme_device(&disk_device)?; + let partition_devices = disk_partitions(&device_name) + .map_err(|e| anyhow!("unable to get partitions of {:?}: {}", &device_name, e))?; + for partition_device in partition_devices { + link_nvme_device(&partition_device)?; + } + } + Ok(()) +} + +pub fn link_nvme_device(device: &DeviceInfo) -> Result<()> { + let device_path = Path::new("/dev").join(&device.name); + let device_fd = File::open(&device_path) + .map_err(|e| anyhow!("unable to open {:?}: {}", &device_path, e))?; + if let Ok(nvme) = Nvme::try_from(device_fd) { + debug!("nvme device: {:?}", nvme); + let ec2_device_name = nvme.name(); + let link_device_name = &device + .part_num + .as_ref() + .map(|n| { + if has_digit_suffix(ec2_device_name) { + format!("{}p{}", ec2_device_name, n) + } else { + format!("{}{}", ec2_device_name, n) + } + }) + .unwrap_or(ec2_device_name.into()); + let link_path = Path::new("/dev").join(link_device_name); + debug!("linking {} to {:?}", &device.name, &link_path); + if let Err(e) = symlink(&device.name, &link_path) { + if e.kind() != ErrorKind::AlreadyExists { + return Err(anyhow!( "unable to link {} to {:?}: {}", - &device_name, - &device_link_path, + &device.name, + &link_path, e - ) - })?; - - // Link partitions too if they exist. - let partitions = disk_partitions(&device_name) - .map_err(|e| anyhow!("unable to get partitions of {:?}: {}", &device_name, e))?; - for partition in partitions { - let partition_name = if has_digit_suffix(ec2_device_name) { - format!("{}p{}", &ec2_device_name, &partition.partition) - } else { - format!("{}{}", &ec2_device_name, &partition.partition) - }; - let partition_link_path = Path::new("/dev").join(&partition_name); - debug!( - "linking {} to {:?}", - &partition.device, &partition_link_path - ); - symlink(&partition.device, &partition_link_path).map_err(|e| { - anyhow!( - "unable to link {} to {:?}: {}", - &partition.device, - &partition_link_path, - e - ) - })?; + )); } } } @@ -295,12 +298,12 @@ fn grow_filesystem(path: &PathBuf) -> Result<()> { } #[derive(Debug, Eq, Ord, PartialEq, PartialOrd)] -struct PartitionInfo { - device: String, - partition: String, +pub struct DeviceInfo { + pub name: String, + pub part_num: Option, } -fn disk_partitions(device: &str) -> Result> { +fn disk_partitions(device: &str) -> Result> { let mut partitions = Vec::new(); let sys_device_dir = Path::new(SYS_BLOCK_PATH).join(device); let dir_fd = File::open(&sys_device_dir)?; @@ -320,9 +323,9 @@ fn disk_partitions(device: &str) -> Result> { let mut contents = String::new(); f.read_to_string(&mut contents)?; contents.truncate(contents.trim_end().len()); - partitions.push(PartitionInfo { - device: name, - partition: contents, + partitions.push(DeviceInfo { + name, + part_num: Some(contents), }); } }; @@ -331,7 +334,7 @@ fn disk_partitions(device: &str) -> Result> { } fn has_digit_suffix(string: &str) -> bool { - string.chars().last().map_or(false, |c| c.is_ascii_digit()) + string.chars().last().is_some_and(|c| c.is_ascii_digit()) } #[cfg(test)] diff --git a/src/uevent.rs b/src/uevent.rs new file mode 100644 index 0000000..97ecf0d --- /dev/null +++ b/src/uevent.rs @@ -0,0 +1,93 @@ +use std::thread; + +use anyhow::{anyhow, Result}; +use log::{debug, error}; +use rustix::fd::AsFd; +use rustix::net::netlink::{SocketAddrNetlink, KOBJECT_UEVENT}; +use rustix::net::{bind, recv, socket, AddressFamily, RecvFlags, SocketType}; + +use crate::system::{link_nvme_device, DeviceInfo}; + +const DELIM: &str = "="; +const DEVNAME: &str = "DEVNAME"; +const PARTN: &str = "PARTN"; +const SUBSYSTEM: &str = "SUBSYSTEM"; +const SUBSYSTEM_BLOCK: &str = "block"; + +pub fn start_uevent_listener() -> Result<()> { + let fd = socket( + AddressFamily::NETLINK, + SocketType::DGRAM, + Some(KOBJECT_UEVENT), + )?; + let addr = SocketAddrNetlink::new(0, 1); + bind(fd.as_fd(), &addr)?; + thread::spawn(|| { + debug!("starting uevent listener"); + recv_messages(fd); + }); + Ok(()) +} + +fn recv_messages(fd: Fd) { + let mut buf = [0u8; 4096]; + loop { + match recv(fd.as_fd(), &mut buf, RecvFlags::empty()) { + Ok((len, _)) => match handle_message(&buf, len) { + Ok(Some(dev)) => { + if let Err(e) = link_nvme_device(&dev) { + error!("error linking device {:?}: {}", &dev, e); + } + } + Ok(None) => (), + Err(e) => error!("error handling netlink message: {}", e), + }, + Err(e) => error!("error receiving netlink message: {}", e), + } + } +} + +fn handle_message(buf: &[u8], len: usize) -> Result> { + let mut devname = String::new(); + let mut partn = String::new(); + + // Only handle "add@" messages. + if len < 4 { + return Err(anyhow!("unexpected length of netlink message: {}", len)); + } + if buf[..4] != [b'a', b'd', b'd', b'@'] { + return Ok(None); + } + + for var in buf[..len].split(|&b| b == 0) { + if var.is_empty() { + continue; + } + let message = String::from_utf8_lossy(var); + debug!("uevent message: {}", message); + let fields: Vec<&str> = message.split(DELIM).collect(); + if fields.len() != 2 { + continue; + } + if fields[0] == SUBSYSTEM { + if fields[1] != SUBSYSTEM_BLOCK { + return Ok(None); + } + continue; + } + if fields[0] == DEVNAME { + devname = fields[1].into(); + continue; + } + if fields[0] == PARTN { + partn = fields[1].into(); + } + } + if devname.is_empty() { + return Ok(None); + } + Ok(Some(DeviceInfo { + name: devname, + part_num: if !partn.is_empty() { Some(partn) } else { None }, + })) +} diff --git a/src/vmspec.rs b/src/vmspec.rs index a76806b..c902350 100644 --- a/src/vmspec.rs +++ b/src/vmspec.rs @@ -175,14 +175,16 @@ impl VmSpec { fn update_defaults(&mut self) { for volume in &mut self.volumes { if let Some(ebs) = &mut volume.ebs { - if ebs.mount.group_id.is_none() { - ebs.mount.group_id = self.security.run_as_group_id; - } - if ebs.mount.user_id.is_none() { - ebs.mount.user_id = self.security.run_as_user_id; - } - if ebs.mount.mode.is_none() { - ebs.mount.mode = Some("0755".into()); + if let Some(mount) = &mut ebs.mount { + if mount.group_id.is_none() { + mount.group_id = self.security.run_as_group_id; + } + if mount.user_id.is_none() { + mount.user_id = self.security.run_as_user_id; + } + if mount.mode.is_none() { + mount.mode = Some("0755".into()); + } } } if let Some(s3) = &mut volume.s3 { @@ -417,12 +419,21 @@ pub type Volumes = Vec; #[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct EbsVolumeSource { + pub attachment: Option, pub device: String, - #[serde(rename = "fs-type")] - pub fs_type: Option, - #[serde(rename = "make-fs")] - pub make_fs: Option, - pub mount: Mount, + pub mount: Option, +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct EbsVolumeAttachment { + pub tags: Vec, + pub timeout: Option, +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct AwsTag { + pub key: String, + pub value: Option, } #[derive(Clone, Debug, Default, Deserialize, Serialize)] @@ -452,6 +463,8 @@ pub struct SsmVolumeSource { #[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct Mount { pub destination: String, + #[serde(rename = "fs-type")] + pub fs_type: Option, #[serde(rename = "group-id")] pub group_id: Option, pub mode: Option, diff --git a/src/writable.rs b/src/writable.rs index b4add71..92c42e0 100644 --- a/src/writable.rs +++ b/src/writable.rs @@ -27,7 +27,7 @@ where }; let dest_dir = final_dest.parent().ok_or(anyhow!("no parent directory"))?; - let (uid, gid) = unsafe { (Uid::from_raw(user_id), Gid::from_raw(group_id)) }; + let (uid, gid) = (Uid::from_raw(user_id), Gid::from_raw(group_id)); mkdir_p_own(dest_dir, mode_dir, Some(uid), Some(gid))?; let mut f = File::options()