From 2dc33970525226cc0eaa3519fff0134046b32ae1 Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Sat, 14 Feb 2026 19:00:21 +0900 Subject: [PATCH 1/2] perf: replace pool hasher with xxh3 --- Cargo.lock | 15 +++-- Cargo.toml | 1 + notify/Cargo.toml | 1 + notify/src/poll.rs | 139 +++++++++++++++++++++++++++++---------------- notify/src/test.rs | 2 +- 5 files changed, 103 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d1113d7d..92f6650c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -252,7 +252,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -687,6 +687,7 @@ dependencies = [ "trash", "walkdir", "windows-sys 0.61.2", + "xxhash-rust", ] [[package]] @@ -987,7 +988,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1108,7 +1109,7 @@ dependencies = [ "getrandom 0.4.1", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1333,7 +1334,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1627,6 +1628,12 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "xxhash-rust" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" + [[package]] name = "yansi" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 8235c9c7..585404f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,3 +50,4 @@ tempfile = "3.10.0" trash = "5.2.2" walkdir = "2.4.0" windows-sys = "0.61.0" +xxhash-rust = { version = "0.8.15", features = ["xxh3"] } diff --git a/notify/Cargo.toml b/notify/Cargo.toml index 670d1792..884b915e 100644 --- a/notify/Cargo.toml +++ b/notify/Cargo.toml @@ -35,6 +35,7 @@ log.workspace = true walkdir.workspace = true futures = { workspace = true, optional = true } tokio = { workspace = true, optional = true } +xxhash-rust.workspace = true [target.'cfg(any(target_os="linux", target_os="android"))'.dependencies] inotify = { workspace = true, default-features = false } diff --git a/notify/src/poll.rs b/notify/src/poll.rs index b14b396f..bd2f95f3 100644 --- a/notify/src/poll.rs +++ b/notify/src/poll.rs @@ -15,6 +15,11 @@ use std::{ time::Duration, }; +pub(crate) enum PollMessage { + Poll, + PollAndWait(Sender>), +} + /// Event sent for registered handlers on initial directory scans pub type ScanEvent = crate::Result; @@ -69,15 +74,15 @@ mod data { use notify_types::event::EventKindMask; use std::{ cell::RefCell, - collections::{hash_map::RandomState, HashMap}, + collections::HashMap, fmt::{self, Debug}, fs::{self, File, Metadata}, - hash::{BuildHasher, Hasher}, io::{self, Read}, path::{Path, PathBuf}, time::Instant, }; use walkdir::WalkDir; + use xxhash_rust::xxh3::Xxh3Default; use super::ScanEventHandler; @@ -92,10 +97,7 @@ mod data { pub(super) struct DataBuilder { emitter: EventEmitter, scan_emitter: Option>>, - - // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault - // in future. - build_hasher: Option, + compare_contents: bool, // current timestamp for building Data. now: Instant, @@ -124,7 +126,7 @@ mod data { Self { emitter: EventEmitter::new(event_handler, event_kinds), scan_emitter, - build_hasher: compare_content.then(RandomState::default), + compare_contents: compare_content, now: Instant::now(), } } @@ -156,7 +158,7 @@ mod data { impl Debug for DataBuilder { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("DataBuilder") - .field("build_hasher", &self.build_hasher) + .field("compare_contents", &self.compare_contents) .field("now", &self.now) .finish() } @@ -366,38 +368,16 @@ mod data { PathData { mtime: metadata.modified().map_or(0, system_time_to_seconds), - hash: data_builder - .build_hasher - .as_ref() - .filter(|_| metadata.is_file()) - .and_then(|build_hasher| { - Self::get_content_hash(build_hasher, meta_path.path()).ok() - }), + hash: if data_builder.compare_contents && metadata.is_file() { + content_hash(meta_path.path()).ok() + } else { + None + }, last_check: data_builder.now, } } - /// Get hash value for the data content in given file `path`. - fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result { - let mut hasher = build_hasher.build_hasher(); - let mut file = File::open(path)?; - let mut buf = [0; 512]; - - loop { - let n = match file.read(&mut buf) { - Ok(0) => break, - Ok(len) => len, - Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, - Err(e) => return Err(e), - }; - - hasher.write(&buf[..n]); - } - - Ok(hasher.finish()) - } - /// Get [`Event`] by compare two optional [`PathData`]. fn compare_to_event

( path: P, @@ -407,6 +387,11 @@ mod data { where P: Into, { + Self::compare_to_kind(old, new) + .map(|event_kind| Event::new(event_kind).add_path(path.into())) + } + + fn compare_to_kind(old: Option<&PathData>, new: Option<&PathData>) -> Option { match (old, new) { (Some(old), Some(new)) => { if new.mtime > old.mtime { @@ -423,10 +408,29 @@ mod data { (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)), (None, None) => None, } - .map(|event_kind| Event::new(event_kind).add_path(path.into())) } } + /// Get hash value for the data content in given file `path`. + pub(super) fn content_hash(path: &Path) -> io::Result { + let mut hasher = Xxh3Default::new(); + let mut file = File::open(path)?; + let mut buf = [0u8; 8 * 1024]; + + loop { + let n = match file.read(&mut buf) { + Ok(0) => break, + Ok(len) => len, + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + }; + + hasher.update(&buf[..n]); + } + + Ok(hasher.digest()) + } + /// Compose path and its metadata. /// /// This data structure designed for make sure path and its metadata can be @@ -520,7 +524,7 @@ pub struct PollWatcher { want_to_stop: Arc, /// channel to the poll loop /// currently used only for manual polling - message_channel: Sender<()>, + message_channel: Sender, delay: Option, follow_sylinks: bool, } @@ -534,14 +538,27 @@ impl PollWatcher { /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling. pub fn poll(&self) -> crate::Result<()> { self.message_channel - .send(()) + .send(PollMessage::Poll) .map_err(|_| Error::generic("failed to send poll message"))?; Ok(()) } + /// Actively poll for changes and block until the poll cycle has completed. + /// + /// This is primarily useful together with [`Config::with_manual_polling`]. + pub fn poll_blocking(&self) -> crate::Result<()> { + let (done_tx, done_rx) = std::sync::mpsc::channel(); + self.message_channel + .send(PollMessage::PollAndWait(done_tx)) + .map_err(|_| Error::generic("failed to send poll message"))?; + done_rx + .recv() + .map_err(|_| Error::generic("poll thread disconnected"))? + } + /// Returns a sender to initiate changes detection. #[cfg(test)] - pub(crate) fn poll_sender(&self) -> Sender<()> { + pub(crate) fn poll_sender(&self) -> Sender { self.message_channel.clone() } @@ -569,7 +586,7 @@ impl PollWatcher { config.event_kinds(), ); - let (tx, rx) = unbounded(); + let (tx, rx) = unbounded::(); let poll_watcher = PollWatcher { watches: Default::default(), @@ -585,7 +602,7 @@ impl PollWatcher { Ok(poll_watcher) } - fn run(&self, rx: Receiver<()>) { + fn run(&self, rx: Receiver) { let watches = Arc::clone(&self.watches); let data_builder = Arc::clone(&self.data_builder); let want_to_stop = Arc::clone(&self.want_to_stop); @@ -594,30 +611,52 @@ impl PollWatcher { let _ = thread::Builder::new() .name("notify-rs poll loop".to_string()) .spawn(move || { + // do an immediate first scan, then sleep `delay` between subsequent scans. + let mut first_auto_scan = true; + loop { if want_to_stop.load(Ordering::SeqCst) { break; } + let msg = match delay { + Some(_delay) if first_auto_scan => { + first_auto_scan = false; + None + } + Some(delay) => match rx.recv_timeout(delay) { + Ok(msg) => Some(msg), + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => None, + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, + }, + None => match rx.recv() { + Ok(msg) => Some(msg), + Err(_) => break, + }, + }; + // HINT: Make sure always lock in the same order to avoid deadlock. // // FIXME: inconsistent: some place mutex poison cause panic, // some place just ignore. - if let (Ok(mut watches), Ok(mut data_builder)) = - (watches.lock(), data_builder.lock()) - { + let scan_res = { + let mut watches = watches.lock().unwrap_or_else(|e| e.into_inner()); + let mut data_builder = + data_builder.lock().unwrap_or_else(|e| e.into_inner()); + data_builder.update_timestamp(); let vals = watches.values_mut(); for watch_data in vals { watch_data.rescan(&mut data_builder); } - } - // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start)) - if let Some(delay) = delay { - let _ = rx.recv_timeout(delay); - } else { - let _ = rx.recv(); + + Ok(()) + }; + + // Acknowledge poll requests after a poll cycle has finished. + if let Some(PollMessage::PollAndWait(done)) = msg { + let _ = done.send(scan_res); } } }); diff --git a/notify/src/test.rs b/notify/src/test.rs index 9be4fc32..4dbee078 100644 --- a/notify/src/test.rs +++ b/notify/src/test.rs @@ -425,7 +425,7 @@ pub fn poll_watcher_channel() -> (TestWatcher, Receiver) { timeout: Receiver::DEFAULT_TIMEOUT, detect_changes: Some(Box::new(move || { sender - .send(()) + .send(crate::poll::PollMessage::Poll) .expect("PollWatcher receiver part was disconnected") })), kind: watcher.kind, From bc101d9225dc3daca540ecab20db4896d5b5aeb1 Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Sat, 14 Feb 2026 19:15:31 +0900 Subject: [PATCH 2/2] deny --- deny.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/deny.toml b/deny.toml index b2a17951..39b48888 100644 --- a/deny.toml +++ b/deny.toml @@ -4,4 +4,5 @@ allow = [ "Apache-2.0", "ISC", "CC0-1.0", + "BSL-1.0", ]