From d123ca300c04b468e693ab1d2faaa98f27444e7e Mon Sep 17 00:00:00 2001
From: Changyuan Lyu
Date: Sun, 12 Jan 2025 15:57:22 -0800
Subject: [PATCH 1/4] fix(vm): set timeout on VCPU thread creation

Signed-off-by: Changyuan Lyu
---
 alioth/src/board/board.rs | 29 ++++++++++++++++++++---------
 alioth/src/vm.rs          |  6 +++++-
 2 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/alioth/src/board/board.rs b/alioth/src/board/board.rs
index 44663b59..5d00dad5 100644
--- a/alioth/src/board/board.rs
+++ b/alioth/src/board/board.rs
@@ -92,6 +92,8 @@ pub enum Error {
     VmExit { msg: String },
     #[snafu(display("Failed to configure firmware"))]
     Firmware { error: std::io::Error },
+    #[snafu(display("Failed to notify the VMM thread"))]
+    NotifyVmm,
 }
 
 type Result<T, E = Error> = std::result::Result<T, E>;
@@ -250,12 +252,10 @@ where
     fn run_vcpu_inner(
         &self,
         id: u32,
-        event_tx: &Sender<u32>,
+        vcpu: &mut V::Vcpu,
         boot_rx: &Receiver<()>,
     ) -> Result<(), Error> {
-        let mut vcpu = self.vm.create_vcpu(id).context(error::CreateVcpu { id })?;
-        event_tx.send(id).unwrap();
-        self.init_vcpu(id, &mut vcpu)?;
+        self.init_vcpu(id, vcpu)?;
         boot_rx.recv().unwrap();
         if self.state.load(Ordering::Acquire) != STATE_RUNNING {
             return Ok(());
         }
@@ -274,14 +274,14 @@ where
                 }
                 self.add_pci_devs()?;
                 let init_state = self.load_payload()?;
-                self.init_boot_vcpu(&mut vcpu, &init_state)?;
+                self.init_boot_vcpu(vcpu, &init_state)?;
                 self.create_firmware_data(&init_state)?;
             }
-            self.init_ap(id, &mut vcpu, &vcpus)?;
+            self.init_ap(id, vcpu, &vcpus)?;
             self.coco_finalize(id, &vcpus)?;
             drop(vcpus);
-            let reboot = self.vcpu_loop(&mut vcpu, id)?;
+            let reboot = self.vcpu_loop(vcpu, id)?;
 
             let new_state = if reboot {
                 STATE_REBOOT_PENDING
@@ -334,7 +334,16 @@ where
                 _ => break Ok(()),
             }
 
-            self.reset_vcpu(id, &mut vcpu)?;
+            self.reset_vcpu(id, vcpu)?;
+        }
+    }
+
+    fn create_vcpu(&self, id: u32, event_tx: &Sender<u32>) -> Result<V::Vcpu> {
+        let vcpu = self.vm.create_vcpu(id).context(error::CreateVcpu { id })?;
+        if event_tx.send(id).is_err() {
+            error::NotifyVmm.fail()
+        } else {
+            Ok(vcpu)
         }
     }
 
@@ -344,7 +353,9 @@ where
         event_tx: Sender<u32>,
         boot_rx: Receiver<()>,
     ) -> Result<(), Error> {
-        let ret = self.run_vcpu_inner(id, &event_tx, &boot_rx);
+        let mut vcpu = self.create_vcpu(id, &event_tx)?;
+
+        let ret = self.run_vcpu_inner(id, &mut vcpu, &boot_rx);
         self.state.store(STATE_SHUTDOWN, Ordering::Release);
         event_tx.send(id).unwrap();
         ret
diff --git a/alioth/src/vm.rs b/alioth/src/vm.rs
index 842b8e6e..ebaeb9ad 100644
--- a/alioth/src/vm.rs
+++ b/alioth/src/vm.rs
@@ -20,6 +20,7 @@ use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::mpsc::{self, Receiver, Sender};
 use std::sync::Arc;
 use std::thread;
+use std::time::Duration;
 
 use parking_lot::{Condvar, Mutex, RwLock};
 use snafu::{ResultExt, Snafu};
@@ -165,7 +166,10 @@ where
             .name(format!("vcpu_{}", vcpu_id))
             .spawn(move || board.run_vcpu(vcpu_id, event_tx, boot_rx))
             .context(error::VcpuThread { id: vcpu_id })?;
-        event_rx.recv().unwrap();
+        if event_rx.recv_timeout(Duration::from_secs(2)).is_err() {
+            let err = std::io::ErrorKind::TimedOut.into();
+            Err(err).context(error::VcpuThread { id: vcpu_id })?;
+        }
         vcpus.push((handle, boot_tx));
     }
     drop(vcpus);
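
Note on PATCH 1/4: the VMM thread used to block forever on `event_rx.recv()` if a vCPU
thread died before acknowledging that its vCPU was created; `recv_timeout` bounds that
wait to two seconds, and the vCPU side now reports `NotifyVmm` when the acknowledgement
channel is already closed. Below is a minimal, self-contained sketch of the same
acknowledge-with-timeout handshake, using only the standard library; the names are
illustrative and not part of the alioth API.

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    fn main() {
        let (ack_tx, ack_rx) = mpsc::channel::<u32>();

        // The worker acknowledges once its per-thread resource exists. If it
        // panics or returns before sending, the Sender is dropped and the
        // parent's recv_timeout returns an error instead of blocking forever.
        let worker = thread::spawn(move || {
            // ... create the per-thread resource here ...
            let _ = ack_tx.send(0);
        });

        match ack_rx.recv_timeout(Duration::from_secs(2)) {
            Ok(id) => println!("worker {id} is ready"),
            Err(e) => eprintln!("worker never became ready: {e}"),
        }
        worker.join().unwrap();
    }
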
From 7af3d30d163e1a3da3746302264dd5c4abfb6c6b Mon Sep 17 00:00:00 2001
From: Changyuan Lyu
Date: Sun, 12 Jan 2025 19:22:20 -0800
Subject: [PATCH 2/4] fix(board): avoid infinite wait in sync_vcpus()

Signed-off-by: Changyuan Lyu
---
 alioth/src/board/board.rs  | 162 ++++++++++++++++++++++++-------------
 alioth/src/board/x86_64.rs |   6 +-
 alioth/src/vm.rs           |  36 ++-------
 3 files changed, 113 insertions(+), 91 deletions(-)

diff --git a/alioth/src/board/board.rs b/alioth/src/board/board.rs
index 5d00dad5..19424930 100644
--- a/alioth/src/board/board.rs
+++ b/alioth/src/board/board.rs
@@ -20,7 +20,6 @@ mod x86_64;
 #[cfg(target_os = "linux")]
 use std::collections::HashMap;
 use std::ffi::CStr;
-use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::mpsc::{Receiver, Sender};
 use std::sync::Arc;
 use std::thread::JoinHandle;
@@ -94,14 +93,26 @@ pub enum Error {
     Firmware { error: std::io::Error },
     #[snafu(display("Failed to notify the VMM thread"))]
     NotifyVmm,
+    #[snafu(display("Another VCPU thread has signaled failure"))]
+    PeerFailure,
 }
 
 type Result<T, E = Error> = std::result::Result<T, E>;
 
-pub const STATE_CREATED: u8 = 0;
-pub const STATE_RUNNING: u8 = 1;
-pub const STATE_SHUTDOWN: u8 = 2;
-pub const STATE_REBOOT_PENDING: u8 = 3;
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum BoardState {
+    Created,
+    Running,
+    Shutdown,
+    RebootPending,
+}
+
+#[derive(Debug)]
+struct MpSync {
+    state: BoardState,
+    fatal: bool,
+    count: u32,
+}
 
 pub const PCIE_MMIO_64_SIZE: u64 = 1 << 40;
 
@@ -129,9 +140,7 @@ where
     pub vcpus: Arc>>,
     pub arch: ArchBoard,
     pub config: BoardConfig,
-    pub state: AtomicU8,
     pub payload: RwLock<Option<Payload>>,
-    pub mp_sync: Arc<(Mutex<u32>, Condvar)>,
     pub io_devs: RwLock)>>,
     #[cfg(target_arch = "aarch64")]
     pub mmio_devs: RwLock)>>,
@@ -142,12 +151,53 @@ where
     pub vfio_ioases: Mutex, Arc>>,
     #[cfg(target_os = "linux")]
     pub vfio_containers: Mutex, Arc>>,
+
+    mp_sync: Mutex<MpSync>,
+    cond_var: Condvar,
 }
 
 impl<V> Board<V>
 where
     V: Vm,
 {
+    pub fn new(vm: V, memory: Memory, arch: ArchBoard, config: BoardConfig) -> Self {
+        Board {
+            vm,
+            memory,
+            arch,
+            config,
+            payload: RwLock::new(None),
+            vcpus: Arc::new(RwLock::new(Vec::new())),
+            io_devs: RwLock::new(Vec::new()),
+            #[cfg(target_arch = "aarch64")]
+            mmio_devs: RwLock::new(Vec::new()),
+            pci_bus: PciBus::new(),
+            #[cfg(target_arch = "x86_64")]
+            fw_cfg: Mutex::new(None),
+            #[cfg(target_os = "linux")]
+            vfio_ioases: Mutex::new(HashMap::new()),
+            #[cfg(target_os = "linux")]
+            vfio_containers: Mutex::new(HashMap::new()),
+
+            mp_sync: Mutex::new(MpSync {
+                state: BoardState::Created,
+                count: 0,
+                fatal: false,
+            }),
+            cond_var: Condvar::new(),
+        }
+    }
+
+    pub fn boot(&self) -> Result<()> {
+        let vcpus = self.vcpus.read();
+        let mut mp_sync = self.mp_sync.lock();
+        mp_sync.state = BoardState::Running;
+        for (_, boot_tx) in vcpus.iter() {
+            boot_tx.send(()).unwrap();
+        }
+        Ok(())
+    }
+
     fn load_payload(&self) -> Result {
         let payload = self.payload.read();
         let Some(payload) = payload.as_ref() else {
@@ -221,10 +271,10 @@ where
                 break Ok(true);
             }
             VmExit::Interrupted => {
-                let state = self.state.load(Ordering::Acquire);
-                match state {
-                    STATE_SHUTDOWN => VmEntry::Shutdown,
-                    STATE_REBOOT_PENDING => VmEntry::Reboot,
+                let mp_sync = self.mp_sync.lock();
+                match mp_sync.state {
+                    BoardState::Shutdown => VmEntry::Shutdown,
+                    BoardState::RebootPending => VmEntry::Reboot,
                     _ => VmEntry::None,
                 }
             }
@@ -237,16 +287,25 @@ where
         }
     }
 
-    fn sync_vcpus(&self, vcpus: &VcpuGuard) {
-        let (lock, cvar) = &*self.mp_sync;
-        let mut count = lock.lock();
-        *count += 1;
-        if *count == vcpus.len() as u32 {
-            *count = 0;
-            cvar.notify_all();
+    fn sync_vcpus(&self, vcpus: &VcpuGuard) -> Result<()> {
+        let mut mp_sync = self.mp_sync.lock();
+        if mp_sync.fatal {
+            return error::PeerFailure.fail();
+        }
+
+        mp_sync.count += 1;
+        if mp_sync.count == vcpus.len() as u32 {
+            mp_sync.count = 0;
+            self.cond_var.notify_all();
         } else {
-            cvar.wait(&mut count)
+            self.cond_var.wait(&mut mp_sync)
+        }
+
+        if mp_sync.fatal {
+            return error::PeerFailure.fail();
         }
+
+        Ok(())
     }
 
     fn run_vcpu_inner(
@@ -257,7 +316,7 @@ where
     ) -> Result<(), Error> {
         self.init_vcpu(id, vcpu)?;
         boot_rx.recv().unwrap();
-        if self.state.load(Ordering::Acquire) != STATE_RUNNING {
+        if self.mp_sync.lock().state != BoardState::Running {
             return Ok(());
         }
         loop {
@@ -279,37 +338,28 @@ where
             }
             self.init_ap(id, vcpu, &vcpus)?;
             self.coco_finalize(id, &vcpus)?;
+            self.sync_vcpus(&vcpus)?;
             drop(vcpus);
 
             let reboot = self.vcpu_loop(vcpu, id)?;
 
-            let new_state = if reboot {
-                STATE_REBOOT_PENDING
-            } else {
-                STATE_SHUTDOWN
-            };
             let vcpus = self.vcpus.read();
-            match self.state.compare_exchange(
-                STATE_RUNNING,
-                new_state,
-                Ordering::AcqRel,
-                Ordering::Acquire,
-            ) {
-                Ok(STATE_RUNNING) => {
-                    for (vcpu_id, (handle, _)) in vcpus.iter().enumerate() {
-                        if id != vcpu_id as u32 {
-                            log::info!("vcpu{id} to kill {vcpu_id}");
-                            V::stop_vcpu(vcpu_id as u32, handle).context(error::StopVcpu { id })?;
-                        }
+            let mut mp_sync = self.mp_sync.lock();
+            if mp_sync.state == BoardState::Running {
+                mp_sync.state = if reboot {
+                    BoardState::RebootPending
+                } else {
+                    BoardState::Shutdown
+                };
+                for (vcpu_id, (handle, _)) in vcpus.iter().enumerate() {
+                    if id != vcpu_id as u32 {
+                        log::info!("VCPU-{id}: stopping VCPU-{vcpu_id}");
+                        V::stop_vcpu(vcpu_id as u32, handle).context(error::StopVcpu { id })?;
                     }
                 }
-                Err(s) if s == new_state => {}
-                Ok(s) | Err(s) => {
-                    log::error!("unexpected state: {s}");
-                }
             }
-
-            self.sync_vcpus(&vcpus);
+            drop(mp_sync);
+            self.sync_vcpus(&vcpus)?;
 
             if id == 0 {
                 let devices = self.pci_bus.segment.devices.read();
@@ -319,22 +369,13 @@ where
                 }
                 self.memory.reset()?;
             }
+            self.reset_vcpu(id, vcpu)?;
 
-            if new_state == STATE_SHUTDOWN {
+            let mut mp_sync = self.mp_sync.lock();
+            if mp_sync.state == BoardState::Shutdown {
                 break Ok(());
            }
-
-            match self.state.compare_exchange(
-                STATE_REBOOT_PENDING,
-                STATE_RUNNING,
-                Ordering::AcqRel,
-                Ordering::Acquire,
-            ) {
-                Ok(STATE_REBOOT_PENDING) | Err(STATE_RUNNING) => {}
-                _ => break Ok(()),
-            }
-
-            self.reset_vcpu(id, vcpu)?;
+            mp_sync.state = BoardState::Running;
         }
     }
 
@@ -356,7 +397,14 @@ where
         let mut vcpu = self.create_vcpu(id, &event_tx)?;
 
         let ret = self.run_vcpu_inner(id, &mut vcpu, &boot_rx);
-        self.state.store(STATE_SHUTDOWN, Ordering::Release);
+        if ret.is_err() && !matches!(ret, Err(Error::PeerFailure { .. })) {
+            log::warn!("VCPU-{id} reported error, unblocking other VCPUs...");
+            let mut mp_sync = self.mp_sync.lock();
+            mp_sync.fatal = true;
+            if mp_sync.count > 0 {
+                self.cond_var.notify_all();
+            }
+        }
         event_tx.send(id).unwrap();
         ret
     }
diff --git a/alioth/src/board/x86_64.rs b/alioth/src/board/x86_64.rs
index 062a4fee..33604405 100644
--- a/alioth/src/board/x86_64.rs
+++ b/alioth/src/board/x86_64.rs
@@ -207,7 +207,7 @@ where
             Some(Coco::AmdSnp { .. }) => {}
             _ => return Ok(()),
         }
-        self.sync_vcpus(vcpus);
+        self.sync_vcpus(vcpus)?;
         if id == 0 {
             return Ok(());
         }
@@ -319,7 +319,7 @@ where
     pub fn coco_finalize(&self, id: u32, vcpus: &VcpuGuard) -> Result<()> {
         if let Some(coco) = &self.config.coco {
-            self.sync_vcpus(vcpus);
+            self.sync_vcpus(vcpus)?;
             if id == 0 {
                 match coco {
                     Coco::AmdSev { policy } => {
@@ -334,7 +334,7 @@ where
                     }
                 }
             }
-            self.sync_vcpus(vcpus);
+            self.sync_vcpus(vcpus)?;
         }
         Ok(())
     }
diff --git a/alioth/src/vm.rs b/alioth/src/vm.rs
index ebaeb9ad..cff20c8e 100644
--- a/alioth/src/vm.rs
+++ b/alioth/src/vm.rs
@@ -12,24 +12,22 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#[cfg(target_os = "linux")]
-use std::collections::HashMap;
 #[cfg(target_os = "linux")]
 use std::path::Path;
-use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::mpsc::{self, Receiver, Sender};
 use std::sync::Arc;
 use std::thread;
 use std::time::Duration;
 
-use parking_lot::{Condvar, Mutex, RwLock};
+#[cfg(target_os = "linux")]
+use parking_lot::Mutex;
 use snafu::{ResultExt, Snafu};
 
 #[cfg(target_arch = "aarch64")]
 use crate::arch::layout::PL011_START;
 #[cfg(target_arch = "x86_64")]
 use crate::arch::layout::{PORT_COM1, PORT_FW_CFG_SELECTOR};
-use crate::board::{ArchBoard, Board, BoardConfig, STATE_CREATED, STATE_RUNNING};
+use crate::board::{ArchBoard, Board, BoardConfig};
 #[cfg(target_arch = "x86_64")]
 use crate::device::fw_cfg::{FwCfg, FwCfgItemParam};
 #[cfg(target_arch = "aarch64")]
@@ -45,7 +43,6 @@ use crate::loader::Payload;
 use crate::mem::Memory;
 #[cfg(target_arch = "aarch64")]
 use crate::mem::{MemRegion, MemRegionType};
-use crate::pci::bus::PciBus;
 use crate::pci::{Bdf, PciDevice};
 #[cfg(target_os = "linux")]
 use crate::vfio::bindings::VfioIommu;
@@ -134,26 +131,7 @@ where
         let memory = Memory::new(vm_memory);
 
         let arch = ArchBoard::new(&hv, &vm, &config)?;
 
-        let board = Arc::new(Board {
-            vm,
-            memory,
-            arch,
-            config,
-            state: AtomicU8::new(STATE_CREATED),
-            payload: RwLock::new(None),
-            vcpus: Arc::new(RwLock::new(Vec::new())),
-            mp_sync: Arc::new((Mutex::new(0), Condvar::new())),
-            io_devs: RwLock::new(Vec::new()),
-            #[cfg(target_arch = "aarch64")]
-            mmio_devs: RwLock::new(Vec::new()),
-            pci_bus: PciBus::new(),
-            #[cfg(target_arch = "x86_64")]
-            fw_cfg: Mutex::new(None),
-            #[cfg(target_os = "linux")]
-            vfio_ioases: Mutex::new(HashMap::new()),
-            #[cfg(target_os = "linux")]
-            vfio_containers: Mutex::new(HashMap::new()),
-        });
+        let board = Arc::new(Board::new(vm, memory, arch, config));
 
         let (event_tx, event_rx) = mpsc::channel();
 
@@ -291,11 +269,7 @@
     }
 
     pub fn boot(&self) -> Result<(), Error> {
-        let vcpus = self.board.vcpus.read();
-        self.board.state.store(STATE_RUNNING, Ordering::Release);
-        for (_, boot_tx) in vcpus.iter() {
-            boot_tx.send(()).unwrap();
-        }
+        self.board.boot()?;
         Ok(())
     }
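
Note on PATCH 2/4: the per-board `AtomicU8` state and the bare `(Mutex<u32>, Condvar)`
counter are folded into a single `Mutex<MpSync>` plus `Condvar`, and `sync_vcpus()` now
returns a `Result`, so a vCPU that fails can mark the rendezvous as `fatal` and wake every
peer instead of leaving them parked forever. The sketch below shows the same
poisonable-barrier idea using only std primitives (the patch itself uses `parking_lot`);
`Barrier`, `State`, `sync`, and `poison` are illustrative names, and the generation counter
guards against std's spurious wakeups rather than mirroring the patch line for line.

    use std::sync::{Arc, Condvar, Mutex};
    use std::thread;

    struct State {
        count: u32,
        generation: u64,
        fatal: bool,
    }

    struct Barrier {
        total: u32,
        inner: Mutex<State>,
        cond: Condvar,
    }

    impl Barrier {
        fn new(total: u32) -> Self {
            Barrier {
                total,
                inner: Mutex::new(State { count: 0, generation: 0, fatal: false }),
                cond: Condvar::new(),
            }
        }

        // Returns Err(()) if any participant reported a fatal failure.
        fn sync(&self) -> Result<(), ()> {
            let mut s = self.inner.lock().unwrap();
            if s.fatal {
                return Err(());
            }
            s.count += 1;
            if s.count == self.total {
                // Last arrival: reset for the next round and wake everyone.
                s.count = 0;
                s.generation += 1;
                self.cond.notify_all();
            } else {
                let gen = s.generation;
                // Wait until the round completes or a peer poisons the barrier.
                s = self
                    .cond
                    .wait_while(s, |s| !s.fatal && s.generation == gen)
                    .unwrap();
            }
            if s.fatal { Err(()) } else { Ok(()) }
        }

        // A failing participant unblocks everyone else.
        fn poison(&self) {
            self.inner.lock().unwrap().fatal = true;
            self.cond.notify_all();
        }
    }

    fn main() {
        let barrier = Arc::new(Barrier::new(4));
        let handles: Vec<_> = (0..4u32)
            .map(|i| {
                let b = Arc::clone(&barrier);
                thread::spawn(move || {
                    if i == 3 {
                        b.poison(); // this participant fails before the rendezvous
                        return Err(());
                    }
                    b.sync()
                })
            })
            .collect();
        for (i, h) in handles.into_iter().enumerate() {
            println!("thread {i}: {:?}", h.join().unwrap());
        }
    }
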
From 32a42d600d0d108d8bf6b488f7065e4400f6cd0b Mon Sep 17 00:00:00 2001
From: Changyuan Lyu
Date: Sun, 12 Jan 2025 19:58:18 -0800
Subject: [PATCH 3/4] fix(board): stop other VCPUs before returning errors

Signed-off-by: Changyuan Lyu
---
 alioth/src/board/board.rs | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/alioth/src/board/board.rs b/alioth/src/board/board.rs
index 19424930..69419798 100644
--- a/alioth/src/board/board.rs
+++ b/alioth/src/board/board.rs
@@ -341,12 +341,12 @@ where
             self.sync_vcpus(&vcpus)?;
             drop(vcpus);
 
-            let reboot = self.vcpu_loop(vcpu, id)?;
+            let maybe_reboot = self.vcpu_loop(vcpu, id);
 
             let vcpus = self.vcpus.read();
             let mut mp_sync = self.mp_sync.lock();
             if mp_sync.state == BoardState::Running {
-                mp_sync.state = if reboot {
+                mp_sync.state = if matches!(maybe_reboot, Ok(true)) {
                     BoardState::RebootPending
                 } else {
                     BoardState::Shutdown
@@ -371,6 +371,10 @@ where
             }
             self.reset_vcpu(id, vcpu)?;
 
+            if let Err(e) = maybe_reboot {
+                break Err(e);
+            }
+
             let mut mp_sync = self.mp_sync.lock();
             if mp_sync.state == BoardState::Shutdown {
                 break Ok(());
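
Note on PATCH 3/4: `vcpu_loop()` is no longer followed by `?` on the spot; the result is
parked in `maybe_reboot` so the failing vCPU still updates the board state and stops its
peers, and only afterwards is the original error propagated out of the loop. A tiny
stand-alone sketch of that capture-cleanup-then-propagate shape (illustrative names only,
not alioth code):

    fn work() -> Result<bool, String> {
        // Pretend the main loop failed; Ok(true) would mean "reboot requested".
        Err("device exploded".to_string())
    }

    fn stop_peers() {
        println!("stopping peer workers");
    }

    fn run_worker() -> Result<(), String> {
        // No `?` here: keep the Result so cleanup runs on both success and failure.
        let maybe_reboot = work();

        if matches!(maybe_reboot, Ok(true)) {
            println!("reboot requested");
        }
        stop_peers();

        // Only now surface the original failure, if any.
        maybe_reboot.map(|_| ())
    }

    fn main() {
        println!("{:?}", run_worker());
    }
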
From 8acd78f7379d036c0a5a5374df67ed4e6cbe3769 Mon Sep 17 00:00:00 2001
From: Changyuan Lyu
Date: Sun, 12 Jan 2025 22:24:03 -0800
Subject: [PATCH 4/4] fix(vm): only report the root failure

Signed-off-by: Changyuan Lyu
---
 alioth-cli/src/main.rs    |  4 +---
 alioth/src/board/board.rs | 19 +++++++++++--------
 alioth/src/vm.rs          | 24 ++++++++++++------------
 3 files changed, 24 insertions(+), 23 deletions(-)

diff --git a/alioth-cli/src/main.rs b/alioth-cli/src/main.rs
index a14e5895..76e36d03 100644
--- a/alioth-cli/src/main.rs
+++ b/alioth-cli/src/main.rs
@@ -488,9 +488,7 @@ fn main_run(args: RunArgs) -> Result<(), Error> {
     }
 
     vm.boot().context(error::BootVm)?;
-    for result in vm.wait() {
-        result.context(error::WaitVm)?;
-    }
+    vm.wait().context(error::WaitVm)?;
     Ok(())
 }
 
diff --git a/alioth/src/board/board.rs b/alioth/src/board/board.rs
index 69419798..44e7e6f4 100644
--- a/alioth/src/board/board.rs
+++ b/alioth/src/board/board.rs
@@ -401,15 +401,18 @@ where
         let mut vcpu = self.create_vcpu(id, &event_tx)?;
 
         let ret = self.run_vcpu_inner(id, &mut vcpu, &boot_rx);
-        if ret.is_err() && !matches!(ret, Err(Error::PeerFailure { .. })) {
-            log::warn!("VCPU-{id} reported error, unblocking other VCPUs...");
-            let mut mp_sync = self.mp_sync.lock();
-            mp_sync.fatal = true;
-            if mp_sync.count > 0 {
-                self.cond_var.notify_all();
-            }
-        }
         event_tx.send(id).unwrap();
+
+        if matches!(ret, Ok(_) | Err(Error::PeerFailure { .. })) {
+            return Ok(());
+        }
+
+        log::warn!("VCPU-{id} reported error, unblocking other VCPUs...");
+        let mut mp_sync = self.mp_sync.lock();
+        mp_sync.fatal = true;
+        if mp_sync.count > 0 {
+            self.cond_var.notify_all();
+        }
         ret
     }
diff --git a/alioth/src/vm.rs b/alioth/src/vm.rs
index cff20c8e..d2d6df79 100644
--- a/alioth/src/vm.rs
+++ b/alioth/src/vm.rs
@@ -273,7 +273,7 @@
         }
         Ok(())
     }
 
-    pub fn wait(&self) -> Vec<Result<()>> {
+    pub fn wait(&self) -> Result<()> {
         self.event_rx.recv().unwrap();
         let vcpus = self.board.vcpus.read();
         for _ in 1..vcpus.len() {
@@ -281,17 +281,17 @@
         }
         drop(vcpus);
         let mut vcpus = self.board.vcpus.write();
-        vcpus
-            .drain(..)
-            .enumerate()
-            .map(|(id, (handle, _))| match handle.join() {
-                Err(e) => {
-                    log::error!("cannot join vcpu {}: {:?}", id, e);
-                    Ok(())
-                }
-                Ok(r) => r.context(error::Vcpu { id: id as u32 }),
-            })
-            .collect()
+        let mut ret = Ok(());
+        for (id, (handle, _)) in vcpus.drain(..).enumerate() {
+            let Ok(r) = handle.join() else {
+                log::error!("Cannot join VCPU-{id}");
+                continue;
+            };
+            if r.is_err() && ret.is_ok() {
+                ret = r.context(error::Vcpu { id: id as u32 });
+            }
+        }
+        ret
     }
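
Note on PATCH 4/4: `Machine::wait()` now joins every vCPU thread but keeps only the first
error it sees, so the caller gets the root failure instead of a pile of follow-on
`PeerFailure` results (which `run_vcpu` already converts to `Ok`), and `main_run` no longer
needs to loop over per-vCPU results. A stand-alone sketch of the same first-error-wins
join loop (illustrative names, not the alioth API):

    use std::thread;

    fn wait_all(handles: Vec<thread::JoinHandle<Result<(), String>>>) -> Result<(), String> {
        let mut ret = Ok(());
        for (id, handle) in handles.into_iter().enumerate() {
            // A panicked thread is logged but does not overwrite a real error.
            let Ok(r) = handle.join() else {
                eprintln!("cannot join worker {id}");
                continue;
            };
            // Keep only the first (root) failure; later ones are side effects.
            if r.is_err() && ret.is_ok() {
                ret = r.map_err(|e| format!("worker {id}: {e}"));
            }
        }
        ret
    }

    fn main() {
        let handles = (0..3)
            .map(|i| {
                thread::spawn(move || {
                    if i == 1 {
                        Err(format!("worker {i} hit the root failure"))
                    } else {
                        Ok(())
                    }
                })
            })
            .collect();
        println!("{:?}", wait_all(handles));
    }
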