From 4beca26fea92a13fb8da3b4b5af56939464d5cfe Mon Sep 17 00:00:00 2001 From: Luca Barbato Date: Mon, 30 Dec 2024 12:38:43 +0100 Subject: [PATCH 1/2] Simplify the event stream --- Cargo.toml | 3 ++- src/stream.rs | 30 ++++++++---------------------- 2 files changed, 10 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9069e7f..d63a1e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,13 +11,14 @@ anyhow = "1.0.56" tracing = "0.1.32" tracing-subscriber = { version = "0.3.9", features = ["env-filter"] } netlink-sys = { version = "0.8.5", features = ["tokio_socket"] } -futures-util = "0.3.21" tokio = { version = "1.17.0", features = ["macros", "rt-multi-thread", "sync", "fs"] } fork = "0.1.19" walkdir = "2.3.2" clap = { version = "4.5.1", features = ["derive", "wrap_help"] } nix = { version = "0.27.1", features = ["user", "fs"] } bytes = "1.5.0" +async-stream = "0.3.6" +futures-util = "0.3.31" [dev-dependencies] regex = "1.5.5" diff --git a/src/stream.rs b/src/stream.rs index 3a21d9b..674dcaa 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,11 +1,9 @@ use std::process; use anyhow::anyhow; - -use futures_util::stream::{unfold, Stream}; - +use async_stream::try_stream; +use futures_util::Stream; use kobject_uevent::UEvent; - use netlink_sys::{ protocols::NETLINK_KOBJECT_UEVENT, AsyncSocket, AsyncSocketExt, SocketAddr, TokioSocket, }; @@ -20,22 +18,10 @@ pub fn uevents() -> anyhow::Result>> { .bind(&sa) .map_err(|e| anyhow!("Socket bind error: {}", e))?; - Ok(unfold( - (socket, bytes::BytesMut::with_capacity(1024 * 8)), - |(mut socket, mut buf)| async move { - buf.clear(); - match socket.recv_from(&mut buf).await { - Ok(_addr) => { - if buf.len() == 0 { - return None; - } - } - Err(e) => { - return Some((Err(anyhow!("Socket receive error: {}", e)), (socket, buf))); - } - }; - - Some((UEvent::from_netlink_packet(&buf), (socket, buf))) - }, - )) + Ok(try_stream! { + loop { + let (buf, _sock) = socket.recv_from_full().await?; + yield UEvent::from_netlink_packet(&buf)?; + } + }) } From 39e351e6a07d50583992eb485b9be96e58f6c98a Mon Sep 17 00:00:00 2001 From: Luca Barbato Date: Mon, 30 Dec 2024 13:14:32 +0100 Subject: [PATCH 2/2] Avoid map(await { } The result of the map was not awaited. --- src/bin/mdev.rs | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/src/bin/mdev.rs b/src/bin/mdev.rs index 973af3f..15c96f9 100644 --- a/src/bin/mdev.rs +++ b/src/bin/mdev.rs @@ -180,23 +180,26 @@ impl Opt { mdev::stream::uevents()? .for_each(|ev| async { info!("event {:?}", ev); - let result = ev.map(|ev| async { - let result = - react_to_event(&ev.devpath, &ev.env, ev.action, conf, &self.devpath) - .await; - if let Some(rebroadcast_sender) = &rebroadcast_sender { - if rebroadcast_sender - .send(RebroadcastMessage::Event(ev)) - .await - .is_err() + + match ev { + Ok(ev) => { + if let Err(e) = + react_to_event(&ev.devpath, &ev.env, ev.action, conf, &self.devpath) + .await { - panic!("rebroadcaster channel is closed"); + warn!("{e}"); + } + if let Some(rebroadcast_sender) = &rebroadcast_sender { + if rebroadcast_sender + .send(RebroadcastMessage::Event(ev)) + .await + .is_err() + { + warn!("rebroadcaster channel is closed"); + } } } - result - }); - if let Err(e) = result { - warn!("{}", e); + Err(e) => warn!("{}", e), } }) .await;