From 7214a4d58ed0e4c1ba0ca1f2de1b0bc50f216f9e Mon Sep 17 00:00:00 2001 From: Changyuan Lyu Date: Mon, 14 Apr 2025 14:35:46 -0700 Subject: [PATCH 1/3] refactor(virtio): defer ioeventfd offloading to worker threads Signed-off-by: Changyuan Lyu --- alioth/src/virtio/dev/balloon.rs | 2 +- alioth/src/virtio/dev/blk.rs | 2 +- alioth/src/virtio/dev/dev.rs | 11 ++------ alioth/src/virtio/dev/entropy.rs | 2 +- alioth/src/virtio/dev/fs.rs | 2 +- alioth/src/virtio/dev/net/net.rs | 2 +- alioth/src/virtio/dev/vsock/vhost_vsock.rs | 2 +- alioth/src/virtio/pci.rs | 6 ++-- alioth/src/virtio/worker/io_uring.rs | 12 ++++---- alioth/src/virtio/worker/mio.rs | 33 +++++++++++----------- 10 files changed, 35 insertions(+), 39 deletions(-) diff --git a/alioth/src/virtio/dev/balloon.rs b/alioth/src/virtio/dev/balloon.rs index 5b5ece09..a404f933 100644 --- a/alioth/src/virtio/dev/balloon.rs +++ b/alioth/src/virtio/dev/balloon.rs @@ -204,7 +204,7 @@ impl Virtio for Balloon { event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[(E, bool)]>, + fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where S: IrqSender, diff --git a/alioth/src/virtio/dev/blk.rs b/alioth/src/virtio/dev/blk.rs index 8d111d52..3e285e01 100644 --- a/alioth/src/virtio/dev/blk.rs +++ b/alioth/src/virtio/dev/blk.rs @@ -303,7 +303,7 @@ impl Virtio for Block { event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[(E, bool)]>, + fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where S: IrqSender, diff --git a/alioth/src/virtio/dev/dev.rs b/alioth/src/virtio/dev/dev.rs index f48b3858..4298f62e 100644 --- a/alioth/src/virtio/dev/dev.rs +++ b/alioth/src/virtio/dev/dev.rs @@ -56,7 +56,7 @@ pub trait Virtio: Debug + Send + Sync + 'static { event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[(E, bool)]>, + fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)>; fn shared_mem_regions(&self) -> Option> { None @@ -125,7 +125,7 @@ where pub device_config: Arc, pub device_feature: u64, pub queue_regs: Arc<[Queue]>, - pub ioeventfds: Arc<[(E, bool)]>, + pub ioeventfds: Arc<[E]>, pub shared_mem_regions: Option>, pub waker: Arc, pub event_tx: Sender>, @@ -176,13 +176,8 @@ where }); let queue_regs = queue_regs.collect::>(); - let create_ioeventfd = |index: u16| -> Result<(E, bool)> { - let fd = registry.create()?; - let offloaded = dev.offload_ioeventfd(index, &fd)?; - Ok((fd, offloaded)) - }; let ioeventfds = (0..num_queues) - .map(create_ioeventfd) + .map(|_| registry.create()) .collect::, _>>()?; let shared_mem_regions = dev.shared_mem_regions(); diff --git a/alioth/src/virtio/dev/entropy.rs b/alioth/src/virtio/dev/entropy.rs index 6218a22f..7e9db51f 100644 --- a/alioth/src/virtio/dev/entropy.rs +++ b/alioth/src/virtio/dev/entropy.rs @@ -94,7 +94,7 @@ impl Virtio for Entropy { event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[(E, bool)]>, + fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where S: IrqSender, diff --git a/alioth/src/virtio/dev/fs.rs b/alioth/src/virtio/dev/fs.rs index dc179ffc..0899da40 100644 --- a/alioth/src/virtio/dev/fs.rs +++ b/alioth/src/virtio/dev/fs.rs @@ -208,7 +208,7 @@ impl Virtio for VuFs { event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[(E, bool)]>, + fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where S: IrqSender, diff --git a/alioth/src/virtio/dev/net/net.rs b/alioth/src/virtio/dev/net/net.rs index 3afd7983..4451a747 100644 --- a/alioth/src/virtio/dev/net/net.rs +++ b/alioth/src/virtio/dev/net/net.rs @@ -321,7 +321,7 @@ impl Virtio for Net { event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[(E, bool)]>, + fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where S: IrqSender, diff --git a/alioth/src/virtio/dev/vsock/vhost_vsock.rs b/alioth/src/virtio/dev/vsock/vhost_vsock.rs index 70789b8c..f4f5e220 100644 --- a/alioth/src/virtio/dev/vsock/vhost_vsock.rs +++ b/alioth/src/virtio/dev/vsock/vhost_vsock.rs @@ -149,7 +149,7 @@ impl Virtio for VhostVsock { event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[(E, bool)]>, + fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where S: IrqSender, diff --git a/alioth/src/virtio/pci.rs b/alioth/src/virtio/pci.rs index afc94aaf..cb13ba51 100644 --- a/alioth/src/virtio/pci.rs +++ b/alioth/src/virtio/pci.rs @@ -521,7 +521,7 @@ where R: IoeventFdRegistry, { registry: R, - ioeventfds: Arc<[(R::IoeventFd, bool)]>, + ioeventfds: Arc<[R::IoeventFd]>, } impl MemRegionCallback for IoeventFdCallback @@ -529,7 +529,7 @@ where R: IoeventFdRegistry, { fn mapped(&self, addr: u64) -> mem::Result<()> { - for (q_index, (fd, _)) in self.ioeventfds.iter().enumerate() { + for (q_index, fd) in self.ioeventfds.iter().enumerate() { let base_addr = addr + (12 << 10) + VirtioPciRegister::OFFSET_QUEUE_NOTIFY as u64; let notify_addr = base_addr + (q_index * size_of::()) as u64; self.registry.register(fd, notify_addr, 0, None)?; @@ -539,7 +539,7 @@ where } fn unmapped(&self) -> mem::Result<()> { - for (fd, _) in self.ioeventfds.iter() { + for fd in self.ioeventfds.iter() { self.registry.deregister(fd)?; log::info!("ioeventfd {fd:?} de-registered") } diff --git a/alioth/src/virtio/worker/io_uring.rs b/alioth/src/virtio/worker/io_uring.rs index ed5c279a..b196edf0 100644 --- a/alioth/src/virtio/worker/io_uring.rs +++ b/alioth/src/virtio/worker/io_uring.rs @@ -64,7 +64,7 @@ const TOKEN_QUEUE: u64 = 1 << 62; const TOKEN_DESCRIPTOR: u64 = (1 << 62) | (1 << 61); pub struct IoUring { - queue_ioeventfds: Arc<[(E, bool)]>, + queue_ioeventfds: Arc<[E]>, waker: Arc, waker_token: u64, } @@ -86,7 +86,7 @@ where event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[(E, bool)]>, + fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where D: VirtioIoUring, @@ -140,16 +140,14 @@ where queues: &mut [Option], irq_sender: &S, ) -> Result<()> { - context.dev.activate(feature, memory, irq_sender, queues)?; - let queue_submits = queues.iter().map(|_| QueueSubmit::default()).collect(); let mut ring = io_uring::IoUring::new(RING_SIZE as u32)?; let mut queue_count = 0; { let sq = &mut ring.submission(); self.submit_waker(sq)?; - for (index, (fd, offloaded)) in self.queue_ioeventfds.iter().enumerate() { - if *offloaded { + for (index, fd) in self.queue_ioeventfds.iter().enumerate() { + if context.dev.offload_ioeventfd(index as u16, fd)? { continue; } submit_queue_ioeventfd(index as u16, fd, sq)?; @@ -157,6 +155,8 @@ where } } + context.dev.activate(feature, memory, irq_sender, queues)?; + let mut active_ring = ActiveIoUring { ring, submitted_buffers: HashMap::new(), diff --git a/alioth/src/virtio/worker/mio.rs b/alioth/src/virtio/worker/mio.rs index 9c29c078..64f24e2c 100644 --- a/alioth/src/virtio/worker/mio.rs +++ b/alioth/src/virtio/worker/mio.rs @@ -63,7 +63,7 @@ const TOKEN_QUEUE: u64 = 1 << 62; pub struct Mio { poll: Poll, - _queue_ioeventfds: Arc<[(E, bool)]>, + ioeventfds: Arc<[E]>, } impl Mio @@ -75,29 +75,16 @@ where event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[(E, bool)]>, + fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where D: VirtioMio, S: IrqSender, { let poll = Poll::new().context(error::CreatePoll)?; - for (index, (fd, offloaded)) in fds.iter().enumerate() { - if *offloaded { - continue; - } - let token = index as u64 | TOKEN_QUEUE; - poll.registry() - .register( - &mut SourceFd(&fd.as_fd().as_raw_fd()), - Token(token as usize), - Interest::READABLE, - ) - .context(error::EventSource)?; - } let m = Mio { poll, - _queue_ioeventfds: fds, + ioeventfds: fds, }; Worker::spawn(dev, m, event_rx, memory, queue_regs) } @@ -147,6 +134,20 @@ where poll: &mut self.poll, mem: memory, }; + let registry = active_mio.poll.registry(); + for (index, fd) in self.ioeventfds.iter().enumerate() { + if context.dev.offload_ioeventfd(index as u16, fd)? { + continue; + } + let token = index as u64 | TOKEN_QUEUE; + registry + .register( + &mut SourceFd(&fd.as_fd().as_raw_fd()), + Token(token as usize), + Interest::READABLE, + ) + .context(error::EventSource)?; + } context.dev.activate(feature, &mut active_mio)?; 'out: loop { active_mio From 6efdaafe608ede312b87edf33821ccdb22442174 Mon Sep 17 00:00:00 2001 From: Changyuan Lyu Date: Mon, 14 Apr 2025 18:06:34 -0700 Subject: [PATCH 2/3] refactor(virtio): create ioeventfds in transport layer Signed-off-by: Changyuan Lyu --- alioth/src/virtio/dev/balloon.rs | 5 +- alioth/src/virtio/dev/blk.rs | 7 +- alioth/src/virtio/dev/dev.rs | 92 ++++++++++++---------- alioth/src/virtio/dev/entropy.rs | 5 +- alioth/src/virtio/dev/fs.rs | 5 +- alioth/src/virtio/dev/net/net.rs | 7 +- alioth/src/virtio/dev/vsock/vhost_vsock.rs | 5 +- alioth/src/virtio/pci.rs | 46 ++++++++--- alioth/src/virtio/worker/io_uring.rs | 40 +++++----- alioth/src/virtio/worker/mio.rs | 67 ++++++++-------- alioth/src/vm.rs | 1 - 11 files changed, 149 insertions(+), 131 deletions(-) diff --git a/alioth/src/virtio/dev/balloon.rs b/alioth/src/virtio/dev/balloon.rs index a404f933..1eb02e20 100644 --- a/alioth/src/virtio/dev/balloon.rs +++ b/alioth/src/virtio/dev/balloon.rs @@ -201,16 +201,15 @@ impl Virtio for Balloon { fn spawn_worker( self, - event_rx: Receiver>, + event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where S: IrqSender, E: IoeventFd, { - Mio::spawn_worker(self, event_rx, memory, queue_regs, fds) + Mio::spawn_worker(self, event_rx, memory, queue_regs) } fn num_queues(&self) -> u16 { diff --git a/alioth/src/virtio/dev/blk.rs b/alioth/src/virtio/dev/blk.rs index 3e285e01..f0ddedf9 100644 --- a/alioth/src/virtio/dev/blk.rs +++ b/alioth/src/virtio/dev/blk.rs @@ -300,10 +300,9 @@ impl Virtio for Block { fn spawn_worker( self, - event_rx: Receiver>, + event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where S: IrqSender, @@ -311,8 +310,8 @@ impl Virtio for Block { { match self.api { #[cfg(target_os = "linux")] - WorkerApi::IoUring => IoUring::spawn_worker(self, event_rx, memory, queue_regs, fds), - WorkerApi::Mio => Mio::spawn_worker(self, event_rx, memory, queue_regs, fds), + WorkerApi::IoUring => IoUring::spawn_worker(self, event_rx, memory, queue_regs), + WorkerApi::Mio => Mio::spawn_worker(self, event_rx, memory, queue_regs), } } } diff --git a/alioth/src/virtio/dev/dev.rs b/alioth/src/virtio/dev/dev.rs index 4298f62e..ae2f50ba 100644 --- a/alioth/src/virtio/dev/dev.rs +++ b/alioth/src/virtio/dev/dev.rs @@ -33,7 +33,7 @@ use std::thread::JoinHandle; use bitflags::Flags; use snafu::ResultExt; -use crate::hv::{IoeventFd, IoeventFdRegistry}; +use crate::hv::IoeventFd; use crate::mem::emulated::Mmio; use crate::mem::mapped::{Ram, RamBus}; use crate::mem::{LayoutChanged, LayoutUpdated, MemRegion}; @@ -53,10 +53,9 @@ pub trait Virtio: Debug + Send + Sync + 'static { fn feature(&self) -> u64; fn spawn_worker( self, - event_rx: Receiver>, + event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)>; fn shared_mem_regions(&self) -> Option> { None @@ -88,13 +87,25 @@ pub struct Register { const TOKEN_WARKER: u64 = 1 << 63; #[derive(Debug, Clone)] -pub enum WakeEvent +pub struct StartParam where S: IrqSender, + E: IoeventFd, +{ + pub(crate) feature: u64, + pub(crate) irq_sender: Arc, + pub(crate) ioeventfds: Option>, +} + +#[derive(Debug, Clone)] +pub enum WakeEvent +where + S: IrqSender, + E: IoeventFd, { Notify { q_index: u16 }, Shutdown, - Start { feature: u64, irq_sender: Arc }, + Start { param: StartParam }, Reset, } @@ -106,11 +117,12 @@ pub enum WorkerState { } #[derive(Debug)] -pub struct Worker +pub struct Worker where S: IrqSender, + E: IoeventFd, { - context: Context, + context: Context, backend: B, } @@ -125,10 +137,9 @@ where pub device_config: Arc, pub device_feature: u64, pub queue_regs: Arc<[Queue]>, - pub ioeventfds: Arc<[E]>, pub shared_mem_regions: Option>, pub waker: Arc, - pub event_tx: Sender>, + pub event_tx: Sender>, worker_handle: Option>, } @@ -149,16 +160,14 @@ where Ok(()) } - pub fn new( + pub fn new( name: impl Into>, dev: D, memory: Arc, - registry: &R, restricted_memory: bool, ) -> Result where D: Virtio, - R: IoeventFdRegistry, { let name = name.into(); let id = D::DEVICE_ID; @@ -176,14 +185,9 @@ where }); let queue_regs = queue_regs.collect::>(); - let ioeventfds = (0..num_queues) - .map(|_| registry.create()) - .collect::, _>>()?; - let shared_mem_regions = dev.shared_mem_regions(); let (event_tx, event_rx) = mpsc::channel(); - let (handle, waker) = - dev.spawn_worker(event_rx, memory, queue_regs.clone(), ioeventfds.clone())?; + let (handle, waker) = dev.spawn_worker(event_rx, memory, queue_regs.clone())?; log::debug!( "{name}: created with {:x?} {:x?}", VirtioFeature::from_bits_retain(device_feature & !D::Feature::all().bits()), @@ -194,7 +198,6 @@ where id, device_feature, queue_regs, - ioeventfds, worker_handle: Some(handle), event_tx, waker, @@ -220,14 +223,17 @@ where pub trait Backend: Send + 'static { fn register_waker(&mut self, token: u64) -> Result>; fn reset(&self, dev: &mut D) -> Result<()>; - fn event_loop<'m, S: IrqSender, Q: VirtQueue<'m>>( + fn event_loop<'m, S, Q, E>( &mut self, - feature: u64, memory: &'m Ram, - context: &mut Context, + context: &mut Context, queues: &mut [Option], - irq_sender: &S, - ) -> Result<()>; + param: &StartParam, + ) -> Result<()> + where + S: IrqSender, + Q: VirtQueue<'m>, + E: IoeventFd; } pub trait BackendEvent { @@ -241,21 +247,23 @@ pub trait ActiveBackend { } #[derive(Debug)] -pub struct Context +pub struct Context where S: IrqSender, + E: IoeventFd, { pub dev: D, memory: Arc, - event_rx: Receiver>, + event_rx: Receiver>, queue_regs: Arc<[Queue]>, pub state: WorkerState, } -impl Context +impl Context where D: Virtio, S: IrqSender, + E: IoeventFd, { fn handle_wake_events(&mut self, backend: &mut B) -> Result<()> where @@ -281,16 +289,13 @@ where Ok(()) } - fn wait_start(&mut self) -> Option<(u64, Arc)> { + fn wait_start(&mut self) -> Option> { for wake_event in self.event_rx.iter() { match wake_event { WakeEvent::Reset => {} - WakeEvent::Start { - feature, - irq_sender, - } => { + WakeEvent::Start { param } => { self.state = WorkerState::Running; - return Some((feature, irq_sender)); + return Some(param); } WakeEvent::Shutdown => break, WakeEvent::Notify { q_index } => { @@ -317,16 +322,17 @@ where } } -impl Worker +impl Worker where D: Virtio, S: IrqSender, B: Backend, + E: IoeventFd, { pub fn spawn( dev: D, mut backend: B, - event_rx: Receiver>, + event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, ) -> Result<(JoinHandle<()>, Arc)> { @@ -352,37 +358,37 @@ where fn event_loop<'m, Q>( &mut self, queues: &mut [Option], - irq_sender: &S, - feature: u64, ram: &'m Ram, + param: &StartParam, ) -> Result<()> where Q: VirtQueue<'m>, + E: IoeventFd, { log::debug!( "{}: activated with {:x?} {:x?}", self.context.dev.name(), - VirtioFeature::from_bits_retain(feature & !D::Feature::all().bits()), - D::Feature::from_bits_truncate(feature) + VirtioFeature::from_bits_retain(param.feature & !D::Feature::all().bits()), + D::Feature::from_bits_truncate(param.feature) ); self.backend - .event_loop(feature, ram, &mut self.context, queues, irq_sender) + .event_loop(ram, &mut self.context, queues, param) } fn loop_until_reset(&mut self) -> Result<()> { - let Some((feature, irq_sender)) = self.context.wait_start() else { + let Some(param) = self.context.wait_start() else { return Ok(()); }; let memory = self.context.memory.clone(); let ram = memory.lock_layout(); - let feature = feature & !VirtioFeature::ACCESS_PLATFORM.bits(); + let feature = param.feature & !VirtioFeature::ACCESS_PLATFORM.bits(); let queue_regs = self.context.queue_regs.clone(); if VirtioFeature::from_bits_retain(feature).contains(VirtioFeature::RING_PACKED) { todo!() } else { let new_queue = |reg| SplitQueue::new(reg, &ram, feature); let queues: Result> = queue_regs.iter().map(new_queue).collect(); - self.event_loop(&mut (queues?), &irq_sender, feature, &ram)?; + self.event_loop(&mut (queues?), &ram, ¶m)?; }; self.backend.reset(&mut self.context.dev)?; Ok(()) diff --git a/alioth/src/virtio/dev/entropy.rs b/alioth/src/virtio/dev/entropy.rs index 7e9db51f..85608c76 100644 --- a/alioth/src/virtio/dev/entropy.rs +++ b/alioth/src/virtio/dev/entropy.rs @@ -91,16 +91,15 @@ impl Virtio for Entropy { fn spawn_worker( self, - event_rx: Receiver>, + event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where S: IrqSender, E: IoeventFd, { - Mio::spawn_worker(self, event_rx, memory, queue_regs, fds) + Mio::spawn_worker(self, event_rx, memory, queue_regs) } fn num_queues(&self) -> u16 { diff --git a/alioth/src/virtio/dev/fs.rs b/alioth/src/virtio/dev/fs.rs index 0899da40..83a191fe 100644 --- a/alioth/src/virtio/dev/fs.rs +++ b/alioth/src/virtio/dev/fs.rs @@ -205,16 +205,15 @@ impl Virtio for VuFs { fn spawn_worker( self, - event_rx: Receiver>, + event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where S: IrqSender, E: IoeventFd, { - Mio::spawn_worker(self, event_rx, memory, queue_regs, fds) + Mio::spawn_worker(self, event_rx, memory, queue_regs) } fn offload_ioeventfd(&self, q_index: u16, fd: &E) -> Result diff --git a/alioth/src/virtio/dev/net/net.rs b/alioth/src/virtio/dev/net/net.rs index 4451a747..8f00689e 100644 --- a/alioth/src/virtio/dev/net/net.rs +++ b/alioth/src/virtio/dev/net/net.rs @@ -318,18 +318,17 @@ impl Virtio for Net { fn spawn_worker( self, - event_rx: Receiver>, + event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where S: IrqSender, E: IoeventFd, { match self.api { - WorkerApi::Mio => Mio::spawn_worker(self, event_rx, memory, queue_regs, fds), - WorkerApi::IoUring => IoUring::spawn_worker(self, event_rx, memory, queue_regs, fds), + WorkerApi::Mio => Mio::spawn_worker(self, event_rx, memory, queue_regs), + WorkerApi::IoUring => IoUring::spawn_worker(self, event_rx, memory, queue_regs), } } } diff --git a/alioth/src/virtio/dev/vsock/vhost_vsock.rs b/alioth/src/virtio/dev/vsock/vhost_vsock.rs index f4f5e220..bd0b6d64 100644 --- a/alioth/src/virtio/dev/vsock/vhost_vsock.rs +++ b/alioth/src/virtio/dev/vsock/vhost_vsock.rs @@ -146,16 +146,15 @@ impl Virtio for VhostVsock { fn spawn_worker( self, - event_rx: Receiver>, + event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where S: IrqSender, E: IoeventFd, { - Mio::spawn_worker(self, event_rx, memory, queue_regs, fds) + Mio::spawn_worker(self, event_rx, memory, queue_regs) } } diff --git a/alioth/src/virtio/pci.rs b/alioth/src/virtio/pci.rs index cb13ba51..e586e0a3 100644 --- a/alioth/src/virtio/pci.rs +++ b/alioth/src/virtio/pci.rs @@ -38,7 +38,7 @@ use crate::pci::{self, Pci, PciBar}; use crate::utils::{ get_atomic_high32, get_atomic_low32, get_high32, get_low32, set_atomic_high32, set_atomic_low32, }; -use crate::virtio::dev::{Register, VirtioDevice, WakeEvent}; +use crate::virtio::dev::{Register, StartParam, VirtioDevice, WakeEvent}; use crate::virtio::queue::Queue; use crate::virtio::worker::Waker; use crate::virtio::{DevStatus, DeviceId, IrqSender, Result, error}; @@ -175,23 +175,26 @@ pub struct VirtioPciRegister { } #[derive(Debug)] -pub struct VirtioPciRegisterMmio +pub struct VirtioPciRegisterMmio where M: MsiSender, + E: IoeventFd, { name: Arc, reg: Register, queues: Arc<[Queue]>, irq_sender: Arc>, - event_tx: Sender>>, + ioeventfds: Option>, + event_tx: Sender, E>>, waker: Arc, } -impl VirtioPciRegisterMmio +impl VirtioPciRegisterMmio where M: MsiSender, + E: IoeventFd, { - fn wake_up_dev(&self, event: WakeEvent>) { + fn wake_up_dev(&self, event: WakeEvent, E>) { let is_start = matches!(event, WakeEvent::Start { .. }); if let Err(e) = self.event_tx.send(event) { log::error!("{}: failed to send event: {e}", self.name); @@ -235,9 +238,10 @@ where } } -impl Mmio for VirtioPciRegisterMmio +impl Mmio for VirtioPciRegisterMmio where M: MsiSender, + E: IoeventFd, { fn size(&self) -> u64 { (size_of::() + size_of::() * self.queues.len()) as u64 @@ -405,10 +409,12 @@ where let old = DevStatus::from_bits_retain(old); if (old ^ status).contains(DevStatus::DRIVER_OK) { let event = if status.contains(DevStatus::DRIVER_OK) { - WakeEvent::Start { + let param = StartParam { feature: reg.driver_feature.load(Ordering::Acquire), irq_sender: self.irq_sender.clone(), - } + ioeventfds: self.ioeventfds.clone(), + }; + WakeEvent::Start { param } } else { self.reset(); WakeEvent::Reset @@ -640,7 +646,7 @@ where { pub dev: VirtioDevice, E>, pub config: EmulatedConfig, - pub registers: Arc>, + pub registers: Arc>, } impl VirtioPciDevice @@ -810,6 +816,17 @@ where .collect(), }; + let maybe_ioeventfds = (0..num_queues) + .map(|_| ioeventfd_reg.create()) + .collect::, _>>(); + let ioeventfds = match maybe_ioeventfds { + Ok(fds) => Some(fds), + Err(e) => { + log::warn!("{}: failed to create ioeventfds: {e:?}", dev.name); + None + } + }; + let registers = Arc::new(VirtioPciRegisterMmio { name: dev.name.clone(), reg: Register { @@ -824,15 +841,18 @@ where msix_table: msix_table.clone(), msi_sender, }), + ioeventfds: ioeventfds.clone(), }); bar0.ranges.push(MemRange::Emulated(msix_table)); bar0.ranges .push(MemRange::Span((12 << 10) - msix_table_size as u64)); bar0.ranges.push(MemRange::Emulated(registers.clone())); - bar0.callbacks.lock().push(Box::new(IoeventFdCallback { - registry: ioeventfd_reg, - ioeventfds: dev.ioeventfds.clone(), - })); + if let Some(ioeventfds) = ioeventfds { + bar0.callbacks.lock().push(Box::new(IoeventFdCallback { + registry: ioeventfd_reg, + ioeventfds, + })); + } if device_config.size() > 0 { bar0.ranges.push(MemRange::Emulated(device_config)) } diff --git a/alioth/src/virtio/worker/io_uring.rs b/alioth/src/virtio/worker/io_uring.rs index b196edf0..d56ec5d8 100644 --- a/alioth/src/virtio/worker/io_uring.rs +++ b/alioth/src/virtio/worker/io_uring.rs @@ -25,7 +25,8 @@ use io_uring::{SubmissionQueue, opcode, types}; use crate::hv::IoeventFd; use crate::mem::mapped::{Ram, RamBus}; use crate::virtio::dev::{ - ActiveBackend, Backend, BackendEvent, Context, Virtio, WakeEvent, Worker, WorkerState, + ActiveBackend, Backend, BackendEvent, Context, StartParam, Virtio, WakeEvent, Worker, + WorkerState, }; use crate::virtio::queue::{Descriptor, Queue, VirtQueue}; use crate::virtio::worker::Waker; @@ -63,16 +64,12 @@ pub trait VirtioIoUring: Virtio { const TOKEN_QUEUE: u64 = 1 << 62; const TOKEN_DESCRIPTOR: u64 = (1 << 62) | (1 << 61); -pub struct IoUring { - queue_ioeventfds: Arc<[E]>, +pub struct IoUring { waker: Arc, waker_token: u64, } -impl IoUring -where - E: IoeventFd, -{ +impl IoUring { fn submit_waker(&self, sq: &mut SubmissionQueue) -> Result<()> { let fd = types::Fd(self.waker.0.as_raw_fd()); let poll = opcode::PollAdd::new(fd, libc::EPOLLIN as _).multi(true); @@ -81,12 +78,11 @@ where Ok(()) } - pub fn spawn_worker( + pub fn spawn_worker( dev: D, - event_rx: Receiver>, + event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where D: VirtioIoUring, @@ -95,7 +91,6 @@ where { let waker = Waker::new_eventfd()?; let ring = IoUring { - queue_ioeventfds: fds, waker: Arc::new(waker), waker_token: 0, }; @@ -118,10 +113,9 @@ struct QueueSubmit { count: u16, } -impl Backend for IoUring +impl Backend for IoUring where D: VirtioIoUring, - E: IoeventFd, { fn register_waker(&mut self, token: u64) -> Result> { self.waker_token = token; @@ -132,21 +126,25 @@ where Ok(()) } - fn event_loop<'m, S: IrqSender, Q: VirtQueue<'m>>( + fn event_loop<'m, S, Q, E>( &mut self, - feature: u64, memory: &'m Ram, - context: &mut Context, + context: &mut Context, queues: &mut [Option], - irq_sender: &S, - ) -> Result<()> { + param: &StartParam, + ) -> Result<()> + where + S: IrqSender, + Q: VirtQueue<'m>, + E: IoeventFd, + { let queue_submits = queues.iter().map(|_| QueueSubmit::default()).collect(); let mut ring = io_uring::IoUring::new(RING_SIZE as u32)?; let mut queue_count = 0; - { + if let Some(fds) = ¶m.ioeventfds { let sq = &mut ring.submission(); self.submit_waker(sq)?; - for (index, fd) in self.queue_ioeventfds.iter().enumerate() { + for (index, fd) in fds.iter().enumerate() { if context.dev.offload_ioeventfd(index as u16, fd)? { continue; } @@ -155,6 +153,8 @@ where } } + let irq_sender = &*param.irq_sender; + let feature = param.feature; context.dev.activate(feature, memory, irq_sender, queues)?; let mut active_ring = ActiveIoUring { diff --git a/alioth/src/virtio/worker/mio.rs b/alioth/src/virtio/worker/mio.rs index 64f24e2c..645cdf69 100644 --- a/alioth/src/virtio/worker/mio.rs +++ b/alioth/src/virtio/worker/mio.rs @@ -25,7 +25,8 @@ use snafu::ResultExt; use crate::hv::IoeventFd; use crate::mem::mapped::{Ram, RamBus}; use crate::virtio::dev::{ - ActiveBackend, Backend, BackendEvent, Context, Virtio, WakeEvent, Worker, WorkerState, + ActiveBackend, Backend, BackendEvent, Context, StartParam, Virtio, WakeEvent, Worker, + WorkerState, }; use crate::virtio::queue::{Queue, VirtQueue}; use crate::virtio::worker::Waker; @@ -61,39 +62,31 @@ impl BackendEvent for Event { const TOKEN_QUEUE: u64 = 1 << 62; -pub struct Mio { +pub struct Mio { poll: Poll, - ioeventfds: Arc<[E]>, } -impl Mio -where - E: IoeventFd, -{ - pub fn spawn_worker( +impl Mio { + pub fn spawn_worker( dev: D, - event_rx: Receiver>, + event_rx: Receiver>, memory: Arc, queue_regs: Arc<[Queue]>, - fds: Arc<[E]>, ) -> Result<(JoinHandle<()>, Arc)> where D: VirtioMio, S: IrqSender, + E: IoeventFd, { let poll = Poll::new().context(error::CreatePoll)?; - let m = Mio { - poll, - ioeventfds: fds, - }; + let m = Mio { poll }; Worker::spawn(dev, m, event_rx, memory, queue_regs) } } -impl Backend for Mio +impl Backend for Mio where D: VirtioMio, - E: IoeventFd, { fn register_waker(&mut self, token: u64) -> Result> { #[cfg(target_os = "linux")] @@ -119,36 +112,42 @@ where Ok(()) } - fn event_loop<'m, S: IrqSender, Q: VirtQueue<'m>>( + fn event_loop<'m, S, Q, E>( &mut self, - feature: u64, memory: &'m Ram, - context: &mut Context, + context: &mut Context, queues: &mut [Option], - irq_sender: &S, - ) -> Result<()> { + param: &StartParam, + ) -> Result<()> + where + S: IrqSender, + Q: VirtQueue<'m>, + E: IoeventFd, + { let mut events = Events::with_capacity(128); let mut active_mio = ActiveMio { queues, - irq_sender, + irq_sender: &*param.irq_sender, poll: &mut self.poll, mem: memory, }; let registry = active_mio.poll.registry(); - for (index, fd) in self.ioeventfds.iter().enumerate() { - if context.dev.offload_ioeventfd(index as u16, fd)? { - continue; + if let Some(fds) = ¶m.ioeventfds { + for (index, fd) in fds.iter().enumerate() { + if context.dev.offload_ioeventfd(index as u16, fd)? { + continue; + } + let token = index as u64 | TOKEN_QUEUE; + registry + .register( + &mut SourceFd(&fd.as_fd().as_raw_fd()), + Token(token as usize), + Interest::READABLE, + ) + .context(error::EventSource)?; } - let token = index as u64 | TOKEN_QUEUE; - registry - .register( - &mut SourceFd(&fd.as_fd().as_raw_fd()), - Token(token as usize), - Interest::READABLE, - ) - .context(error::EventSource)?; } - context.dev.activate(feature, &mut active_mio)?; + context.dev.activate(param.feature, &mut active_mio)?; 'out: loop { active_mio .poll diff --git a/alioth/src/vm.rs b/alioth/src/vm.rs index 01663393..7a2c4d52 100644 --- a/alioth/src/vm.rs +++ b/alioth/src/vm.rs @@ -249,7 +249,6 @@ where name.clone(), dev, self.board.memory.ram_bus(), - ®istry, self.board.config.coco.is_some(), )?; let msi_sender = self.board.vm.create_msi_sender( From f4caab7fc050a2c508439e9b6208cf7668582698 Mon Sep 17 00:00:00 2001 From: Changyuan Lyu Date: Thu, 17 Apr 2025 23:11:29 -0700 Subject: [PATCH 3/3] refactor(virtio): offload ioeventfds in activate() Signed-off-by: Changyuan Lyu --- alioth/src/virtio/dev/balloon.rs | 33 +++++--- alioth/src/virtio/dev/blk.rs | 50 ++++++++----- alioth/src/virtio/dev/dev.rs | 5 +- alioth/src/virtio/dev/entropy.rs | 33 +++++--- alioth/src/virtio/dev/fs.rs | 44 +++++++---- alioth/src/virtio/dev/net/net.rs | 50 +++++++++---- alioth/src/virtio/dev/vsock/vhost_vsock.rs | 87 ++++++++++++---------- alioth/src/virtio/worker/io_uring.rs | 61 ++++++++------- alioth/src/virtio/worker/mio.rs | 63 +++++++++------- 9 files changed, 262 insertions(+), 164 deletions(-) diff --git a/alioth/src/virtio/dev/balloon.rs b/alioth/src/virtio/dev/balloon.rs index 1eb02e20..b616db94 100644 --- a/alioth/src/virtio/dev/balloon.rs +++ b/alioth/src/virtio/dev/balloon.rs @@ -226,11 +226,16 @@ impl Virtio for Balloon { } impl VirtioMio for Balloon { - fn activate<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn activate<'a, 'm, Q, S, E>( &mut self, feature: u64, - _active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + _active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { let feature = BalloonFeature::from_bits_retain(feature); self.queues[0] = BalloonQueue::Inflate; self.queues[1] = BalloonQueue::Deflate; @@ -249,11 +254,16 @@ impl VirtioMio for Balloon { Ok(()) } - fn handle_queue<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_queue<'a, 'm, Q, S, E>( &mut self, index: u16, - active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { let Some(Some(queue)) = active_mio.queues.get_mut(index as usize) else { log::error!("{}: invalid queue index {index}", self.name); return Ok(()); @@ -287,11 +297,16 @@ impl VirtioMio for Balloon { }) } - fn handle_event<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_event<'a, 'm, Q, S, E>( &mut self, _event: &Event, - _active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + _active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { Ok(()) } diff --git a/alioth/src/virtio/dev/blk.rs b/alioth/src/virtio/dev/blk.rs index f0ddedf9..89a558d9 100644 --- a/alioth/src/virtio/dev/blk.rs +++ b/alioth/src/virtio/dev/blk.rs @@ -37,14 +37,12 @@ use snafu::ResultExt; use zerocopy::{FromBytes, FromZeros, Immutable, IntoBytes}; use crate::hv::IoeventFd; -#[cfg(target_os = "linux")] -use crate::mem::mapped::Ram; use crate::mem::mapped::RamBus; use crate::virtio::dev::{DevParam, Virtio, WakeEvent}; use crate::virtio::queue::handlers::handle_desc; use crate::virtio::queue::{Descriptor, Queue, VirtQueue}; #[cfg(target_os = "linux")] -use crate::virtio::worker::io_uring::{BufferAction, IoUring, VirtioIoUring}; +use crate::virtio::worker::io_uring::{ActiveIoUring, BufferAction, IoUring, VirtioIoUring}; use crate::virtio::worker::mio::{ActiveMio, Mio, VirtioMio}; use crate::virtio::worker::{Waker, WorkerApi}; use crate::virtio::{DeviceId, FEATURE_BUILT_IN, IrqSender, Result, error}; @@ -319,27 +317,42 @@ impl Virtio for Block { impl VirtioMio for Block { fn reset(&mut self, _registry: &Registry) {} - fn activate<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn activate<'a, 'm, Q, S, E>( &mut self, _feature: u64, - _active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + _active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { Ok(()) } - fn handle_event<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_event<'a, 'm, Q, S, E>( &mut self, _event: &Event, - _active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + _active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { Ok(()) } - fn handle_queue<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_queue<'a, 'm, Q, S, E>( &mut self, index: u16, - active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { let Some(Some(queue)) = active_mio.queues.get_mut(index as usize) else { log::error!("{}: invalid queue index {index}", self.name); return Ok(()); @@ -399,13 +412,16 @@ impl VirtioMio for Block { #[cfg(target_os = "linux")] impl VirtioIoUring for Block { - fn activate<'m, S: IrqSender, Q: VirtQueue<'m>>( + fn activate<'a, 'm, Q, S, E>( &mut self, _feature: u64, - _memory: &Ram, - _irq_sender: &S, - _queues: &mut [Option], - ) -> Result<()> { + _ring: &mut ActiveIoUring<'a, 'm, Q, S, E>, + ) -> Result<()> + where + S: IrqSender, + Q: VirtQueue<'m>, + E: IoeventFd, + { Ok(()) } diff --git a/alioth/src/virtio/dev/dev.rs b/alioth/src/virtio/dev/dev.rs index ae2f50ba..4ceebcf9 100644 --- a/alioth/src/virtio/dev/dev.rs +++ b/alioth/src/virtio/dev/dev.rs @@ -60,10 +60,7 @@ pub trait Virtio: Debug + Send + Sync + 'static { fn shared_mem_regions(&self) -> Option> { None } - fn offload_ioeventfd(&self, _qindex: u16, _fd: &E) -> Result - where - E: IoeventFd, - { + fn ioeventfd_offloaded(&self, _q_index: u16) -> Result { Ok(false) } fn mem_update_callback(&self) -> Option> { diff --git a/alioth/src/virtio/dev/entropy.rs b/alioth/src/virtio/dev/entropy.rs index 85608c76..b433c913 100644 --- a/alioth/src/virtio/dev/entropy.rs +++ b/alioth/src/virtio/dev/entropy.rs @@ -116,19 +116,29 @@ impl Virtio for Entropy { } impl VirtioMio for Entropy { - fn activate<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn activate<'a, 'm, Q, S, E>( &mut self, _feature: u64, - _active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + _active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { Ok(()) } - fn handle_queue<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_queue<'a, 'm, Q, S, E>( &mut self, index: u16, - active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { let Some(Some(queue)) = active_mio.queues.get_mut(index as usize) else { log::error!("{}: invalid queue index {index}", self.name); return Ok(()); @@ -142,11 +152,16 @@ impl VirtioMio for Entropy { ) } - fn handle_event<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_event<'a, 'm, Q, S, E>( &mut self, _event: &Event, - _active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + _active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { Ok(()) } diff --git a/alioth/src/virtio/dev/fs.rs b/alioth/src/virtio/dev/fs.rs index 83a191fe..3610f589 100644 --- a/alioth/src/virtio/dev/fs.rs +++ b/alioth/src/virtio/dev/fs.rs @@ -216,13 +216,8 @@ impl Virtio for VuFs { Mio::spawn_worker(self, event_rx, memory, queue_regs) } - fn offload_ioeventfd(&self, q_index: u16, fd: &E) -> Result - where - E: IoeventFd, - { + fn ioeventfd_offloaded(&self, q_index: u16) -> Result { if q_index < self.num_queues { - self.vu_dev - .set_virtq_kick(&(q_index as u64), fd.as_fd().as_raw_fd())?; Ok(true) } else { error::InvalidQueueIndex { index: q_index }.fail() @@ -245,13 +240,22 @@ impl Virtio for VuFs { } impl VirtioMio for VuFs { - fn activate<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn activate<'a, 'm, Q, S, E>( &mut self, feature: u64, - active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { self.vu_dev .set_features(&(feature | VirtioFeature::VHOST_PROTOCOL.bits()))?; + for (index, fd) in active_mio.ioeventfds.iter().enumerate() { + self.vu_dev + .set_virtq_kick(&(index as u64), fd.as_fd().as_raw_fd())?; + } for (index, queue) in active_mio.queues.iter().enumerate() { let Some(queue) = queue else { continue; @@ -318,11 +322,16 @@ impl VirtioMio for VuFs { Ok(()) } - fn handle_event<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_event<'a, 'm, Q, S, E>( &mut self, event: &Event, - active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { let q_index = event.token().0; if q_index < active_mio.queues.len() { return vu_error::QueueErr { @@ -404,11 +413,16 @@ impl VirtioMio for VuFs { Ok(()) } - fn handle_queue<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_queue<'a, 'm, Q, S, E>( &mut self, index: u16, - _active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + _active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { unreachable!( "{}: queue {index} notification should go to vhost-user backend", self.name diff --git a/alioth/src/virtio/dev/net/net.rs b/alioth/src/virtio/dev/net/net.rs index 8f00689e..077303cc 100644 --- a/alioth/src/virtio/dev/net/net.rs +++ b/alioth/src/virtio/dev/net/net.rs @@ -39,12 +39,12 @@ use serde_aco::Help; use zerocopy::{FromBytes, Immutable, IntoBytes}; use crate::hv::IoeventFd; -use crate::mem::mapped::{Ram, RamBus}; +use crate::mem::mapped::RamBus; use crate::net::MacAddr; use crate::virtio::dev::{DevParam, DeviceId, Result, Virtio, WakeEvent}; use crate::virtio::queue::handlers::{handle_desc, queue_to_writer, reader_to_queue}; use crate::virtio::queue::{Descriptor, Queue, VirtQueue}; -use crate::virtio::worker::io_uring::{BufferAction, IoUring, VirtioIoUring}; +use crate::virtio::worker::io_uring::{ActiveIoUring, BufferAction, IoUring, VirtioIoUring}; use crate::virtio::worker::mio::{ActiveMio, Mio, VirtioMio}; use crate::virtio::worker::{Waker, WorkerApi}; use crate::virtio::{FEATURE_BUILT_IN, IrqSender, error}; @@ -339,11 +339,16 @@ impl VirtioMio for Net { let _ = registry.deregister(&mut SourceFd(&self.tap_sockets[0].as_raw_fd())); } - fn activate<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn activate<'a, 'm, Q, S, E>( &mut self, feature: u64, - active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { self.driver_feature = NetFeature::from_bits_retain(feature); let socket = &mut self.tap_sockets[0]; enable_tap_offload(socket, self.driver_feature)?; @@ -355,11 +360,16 @@ impl VirtioMio for Net { Ok(()) } - fn handle_event<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_event<'a, 'm, Q, S, E>( &mut self, event: &Event, - active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { let token = event.token().0; let irq_sender = active_mio.irq_sender; if event.is_readable() { @@ -389,11 +399,16 @@ impl VirtioMio for Net { Ok(()) } - fn handle_queue<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_queue<'a, 'm, Q, S, E>( &mut self, index: u16, - active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { let Some(Some(queue)) = active_mio.queues.get_mut(index as usize) else { log::error!("{}: invalid queue index {index}", self.name); return Ok(()); @@ -420,13 +435,16 @@ impl VirtioMio for Net { } impl VirtioIoUring for Net { - fn activate<'m, S: IrqSender, Q: VirtQueue<'m>>( + fn activate<'a, 'm, Q, S, E>( &mut self, feature: u64, - _memory: &Ram, - _irq_sender: &S, - _queues: &mut [Option], - ) -> Result<()> { + _ring: &mut ActiveIoUring<'a, 'm, Q, S, E>, + ) -> Result<()> + where + S: IrqSender, + Q: VirtQueue<'m>, + E: IoeventFd, + { self.driver_feature = NetFeature::from_bits_retain(feature); let socket = &mut self.tap_sockets[0]; enable_tap_offload(socket, self.driver_feature)?; diff --git a/alioth/src/virtio/dev/vsock/vhost_vsock.rs b/alioth/src/virtio/dev/vsock/vhost_vsock.rs index bd0b6d64..a441b508 100644 --- a/alioth/src/virtio/dev/vsock/vhost_vsock.rs +++ b/alioth/src/virtio/dev/vsock/vhost_vsock.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::iter::zip; use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::path::PathBuf; use std::sync::Arc; @@ -122,18 +121,9 @@ impl Virtio for VhostVsock { self.features } - fn offload_ioeventfd(&self, q_index: u16, fd: &E) -> Result - where - E: crate::hv::IoeventFd, - { + fn ioeventfd_offloaded(&self, q_index: u16) -> Result { match q_index { - 0 | 1 => { - self.vhost_dev.set_virtq_kick(&VirtqFile { - index: q_index as _, - fd: fd.as_fd().as_raw_fd(), - })?; - Ok(true) - } + 0 | 1 => Ok(true), _ => Ok(false), } } @@ -159,15 +149,25 @@ impl Virtio for VhostVsock { } impl VirtioMio for VhostVsock { - fn activate<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn activate<'a, 'm, Q, S, E>( &mut self, feature: u64, - active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { self.vhost_dev.set_features(&feature)?; - for (index, (queue, error_fd)) in - zip(active_mio.queues.iter().take(2), self.error_fds.iter_mut()).enumerate() - { + for (index, fd) in active_mio.ioeventfds.iter().take(2).enumerate() { + let kick = VirtqFile { + index: index as u32, + fd: fd.as_fd().as_raw_fd(), + }; + self.vhost_dev.set_virtq_kick(&kick)?; + } + for (index, queue) in active_mio.queues.iter().take(2).enumerate() { let Some(queue) = queue else { continue; }; @@ -176,21 +176,8 @@ impl VirtioMio for VhostVsock { let fd = active_mio.irq_sender.queue_irqfd(index as _)?; self.vhost_dev.set_virtq_call(&VirtqFile { index, fd })?; - let err_fd = - unsafe { OwnedFd::from_raw_fd(ffi!(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))?) }; - self.vhost_dev.set_virtq_err(&VirtqFile { - index, - fd: err_fd.as_raw_fd(), - })?; - active_mio.poll.registry().register( - &mut SourceFd(&err_fd.as_raw_fd()), - Token(index as _), - Interest::READABLE, - )?; - *error_fd = Some(err_fd); - self.vhost_dev.set_virtq_num(&VirtqState { - index: index as _, + index, val: reg.size.load(Ordering::Acquire) as _, })?; self.vhost_dev @@ -206,6 +193,20 @@ impl VirtioMio for VhostVsock { }; self.vhost_dev.set_virtq_addr(&virtq_addr)?; } + for (index, fd) in self.error_fds.iter_mut().enumerate() { + let err_fd = + unsafe { OwnedFd::from_raw_fd(ffi!(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))?) }; + self.vhost_dev.set_virtq_err(&VirtqFile { + index: index as u32, + fd: err_fd.as_raw_fd(), + })?; + active_mio.poll.registry().register( + &mut SourceFd(&err_fd.as_raw_fd()), + Token(index as _), + Interest::READABLE, + )?; + *fd = Some(err_fd); + } self.vhost_dev.vsock_set_running(true)?; Ok(()) } @@ -229,11 +230,16 @@ impl VirtioMio for VhostVsock { } } - fn handle_event<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_event<'a, 'm, Q, S, E>( &mut self, event: &Event, - _active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + _active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { let q_index = event.token(); error::VhostQueueErr { dev: "vsock", @@ -243,11 +249,16 @@ impl VirtioMio for VhostVsock { Ok(()) } - fn handle_queue<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_queue<'a, 'm, Q, S, E>( &mut self, index: u16, - _active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()> { + _active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd, + { match index { 0 | 1 => unreachable!("{}: queue 0 and 1 are offloaded to kernel", self.name), 2 => log::info!("{}: event queue buffer available", self.name), diff --git a/alioth/src/virtio/worker/io_uring.rs b/alioth/src/virtio/worker/io_uring.rs index d56ec5d8..2996f8ee 100644 --- a/alioth/src/virtio/worker/io_uring.rs +++ b/alioth/src/virtio/worker/io_uring.rs @@ -38,13 +38,15 @@ pub enum BufferAction { } pub trait VirtioIoUring: Virtio { - fn activate<'m, S: IrqSender, Q: VirtQueue<'m>>( + fn activate<'a, 'm, Q, S, E>( &mut self, feature: u64, - memory: &Ram, - irq_sender: &S, - queues: &mut [Option], - ) -> Result<()>; + ring: &mut ActiveIoUring<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd; fn handle_buffer( &mut self, @@ -139,33 +141,29 @@ where E: IoeventFd, { let queue_submits = queues.iter().map(|_| QueueSubmit::default()).collect(); - let mut ring = io_uring::IoUring::new(RING_SIZE as u32)?; - let mut queue_count = 0; + let mut active_ring = ActiveIoUring { + ring: io_uring::IoUring::new(RING_SIZE as u32)?, + submitted_buffers: HashMap::new(), + shared_count: RING_SIZE - 1, + irq_sender: &*param.irq_sender, + ioeventfds: param.ioeventfds.as_deref().unwrap_or(&[]), + mem: memory, + queues, + queue_submits, + }; + context.dev.activate(param.feature, &mut active_ring)?; if let Some(fds) = ¶m.ioeventfds { - let sq = &mut ring.submission(); + let sq = &mut active_ring.ring.submission(); self.submit_waker(sq)?; for (index, fd) in fds.iter().enumerate() { - if context.dev.offload_ioeventfd(index as u16, fd)? { + if context.dev.ioeventfd_offloaded(index as u16)? { continue; } submit_queue_ioeventfd(index as u16, fd, sq)?; - queue_count += 1; + active_ring.shared_count -= QUEUE_RESERVE_SIZE + 1; } } - let irq_sender = &*param.irq_sender; - let feature = param.feature; - context.dev.activate(feature, memory, irq_sender, queues)?; - - let mut active_ring = ActiveIoUring { - ring, - submitted_buffers: HashMap::new(), - shared_count: RING_SIZE - 1 - queue_count * (QUEUE_RESERVE_SIZE + 1), - irq_sender, - queues, - queue_submits, - }; - 'out: loop { active_ring.ring.submit_and_wait(1)?; loop { @@ -182,13 +180,12 @@ where } } -struct ActiveIoUring<'a, 'm, Q, S> -where - Q: VirtQueue<'m>, -{ +pub struct ActiveIoUring<'a, 'm, Q, S, E> { ring: io_uring::IoUring, - queues: &'a mut [Option], - irq_sender: &'a S, + pub queues: &'a mut [Option], + pub irq_sender: &'a S, + pub ioeventfds: &'a [E], + pub mem: &'m Ram, submitted_buffers: HashMap>, shared_count: u16, queue_submits: Box<[QueueSubmit]>, @@ -207,10 +204,11 @@ where Ok(()) } -impl<'m, Q, S> ActiveIoUring<'_, 'm, Q, S> +impl<'m, Q, S, E> ActiveIoUring<'_, 'm, Q, S, E> where Q: VirtQueue<'m>, S: IrqSender, + E: IoeventFd, { fn submit_buffers(&mut self, dev: &mut D, index: u16) -> Result<()> where @@ -263,11 +261,12 @@ where } } -impl<'m, Q, S, D> ActiveBackend for ActiveIoUring<'_, 'm, Q, S> +impl<'m, D, Q, S, E> ActiveBackend for ActiveIoUring<'_, 'm, Q, S, E> where D: VirtioIoUring, Q: VirtQueue<'m>, S: IrqSender, + E: IoeventFd, { type Event = Cqe; diff --git a/alioth/src/virtio/worker/mio.rs b/alioth/src/virtio/worker/mio.rs index 645cdf69..442b5057 100644 --- a/alioth/src/virtio/worker/mio.rs +++ b/alioth/src/virtio/worker/mio.rs @@ -33,23 +33,35 @@ use crate::virtio::worker::Waker; use crate::virtio::{IrqSender, Result, error}; pub trait VirtioMio: Virtio { - fn activate<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn activate<'a, 'm, Q, S, E>( &mut self, feature: u64, - active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()>; + active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd; - fn handle_queue<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_queue<'a, 'm, Q, S, E>( &mut self, index: u16, - active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()>; + active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd; - fn handle_event<'a, 'm, Q: VirtQueue<'m>, S: IrqSender>( + fn handle_event<'a, 'm, Q, S, E>( &mut self, event: &Event, - active_mio: &mut ActiveMio<'a, 'm, Q, S>, - ) -> Result<()>; + active_mio: &mut ActiveMio<'a, 'm, Q, S, E>, + ) -> Result<()> + where + Q: VirtQueue<'m>, + S: IrqSender, + E: IoeventFd; fn reset(&mut self, registry: &Registry); } @@ -128,26 +140,25 @@ where let mut active_mio = ActiveMio { queues, irq_sender: &*param.irq_sender, + ioeventfds: param.ioeventfds.as_deref().unwrap_or(&[]), poll: &mut self.poll, mem: memory, }; + context.dev.activate(param.feature, &mut active_mio)?; let registry = active_mio.poll.registry(); - if let Some(fds) = ¶m.ioeventfds { - for (index, fd) in fds.iter().enumerate() { - if context.dev.offload_ioeventfd(index as u16, fd)? { - continue; - } - let token = index as u64 | TOKEN_QUEUE; - registry - .register( - &mut SourceFd(&fd.as_fd().as_raw_fd()), - Token(token as usize), - Interest::READABLE, - ) - .context(error::EventSource)?; + for (index, fd) in active_mio.ioeventfds.iter().enumerate() { + if context.dev.ioeventfd_offloaded(index as u16)? { + continue; } + let token = index as u64 | TOKEN_QUEUE; + registry + .register( + &mut SourceFd(&fd.as_fd().as_raw_fd()), + Token(token as usize), + Interest::READABLE, + ) + .context(error::EventSource)?; } - context.dev.activate(param.feature, &mut active_mio)?; 'out: loop { active_mio .poll @@ -163,18 +174,20 @@ where } } -pub struct ActiveMio<'a, 'm, Q, S> { +pub struct ActiveMio<'a, 'm, Q, S, E> { pub queues: &'a mut [Option], pub irq_sender: &'a S, + pub ioeventfds: &'a [E], pub poll: &'a mut Poll, pub mem: &'m Ram, } -impl<'m, Q, S, D> ActiveBackend for ActiveMio<'_, 'm, Q, S> +impl<'m, D, Q, S, E> ActiveBackend for ActiveMio<'_, 'm, Q, S, E> where D: VirtioMio, Q: VirtQueue<'m>, S: IrqSender, + E: IoeventFd, { type Event = Event;