Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ tempfile = "3.10.0"
trash = "5.2.2"
walkdir = "2.4.0"
windows-sys = "0.61.0"
xxhash-rust = { version = "0.8.15", features = ["xxh3"] }
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ allow = [
"Apache-2.0",
"ISC",
"CC0-1.0",
"BSL-1.0",
]
1 change: 1 addition & 0 deletions notify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ log.workspace = true
walkdir.workspace = true
futures = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
xxhash-rust.workspace = true

[target.'cfg(any(target_os="linux", target_os="android"))'.dependencies]
inotify = { workspace = true, default-features = false }
Expand Down
139 changes: 89 additions & 50 deletions notify/src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ use std::{
time::Duration,
};

pub(crate) enum PollMessage {
Poll,
PollAndWait(Sender<crate::Result<()>>),
}

/// Event sent for registered handlers on initial directory scans
pub type ScanEvent = crate::Result<PathBuf>;

Expand Down Expand Up @@ -69,15 +74,15 @@ mod data {
use notify_types::event::EventKindMask;
use std::{
cell::RefCell,
collections::{hash_map::RandomState, HashMap},
collections::HashMap,
fmt::{self, Debug},
fs::{self, File, Metadata},
hash::{BuildHasher, Hasher},
io::{self, Read},
path::{Path, PathBuf},
time::Instant,
};
use walkdir::WalkDir;
use xxhash_rust::xxh3::Xxh3Default;

use super::ScanEventHandler;

Expand All @@ -92,10 +97,7 @@ mod data {
pub(super) struct DataBuilder {
emitter: EventEmitter,
scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,

// TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
// in future.
build_hasher: Option<RandomState>,
compare_contents: bool,

// current timestamp for building Data.
now: Instant,
Expand Down Expand Up @@ -124,7 +126,7 @@ mod data {
Self {
emitter: EventEmitter::new(event_handler, event_kinds),
scan_emitter,
build_hasher: compare_content.then(RandomState::default),
compare_contents: compare_content,
now: Instant::now(),
}
}
Expand Down Expand Up @@ -156,7 +158,7 @@ mod data {
impl Debug for DataBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("DataBuilder")
.field("build_hasher", &self.build_hasher)
.field("compare_contents", &self.compare_contents)
.field("now", &self.now)
.finish()
}
Expand Down Expand Up @@ -366,38 +368,16 @@ mod data {

PathData {
mtime: metadata.modified().map_or(0, system_time_to_seconds),
hash: data_builder
.build_hasher
.as_ref()
.filter(|_| metadata.is_file())
.and_then(|build_hasher| {
Self::get_content_hash(build_hasher, meta_path.path()).ok()
}),
hash: if data_builder.compare_contents && metadata.is_file() {
content_hash(meta_path.path()).ok()
} else {
None
},

last_check: data_builder.now,
}
}

/// Get hash value for the data content in given file `path`.
fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
let mut hasher = build_hasher.build_hasher();
let mut file = File::open(path)?;
let mut buf = [0; 512];

loop {
let n = match file.read(&mut buf) {
Ok(0) => break,
Ok(len) => len,
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
};

hasher.write(&buf[..n]);
}

Ok(hasher.finish())
}

/// Get [`Event`] by compare two optional [`PathData`].
fn compare_to_event<P>(
path: P,
Expand All @@ -407,6 +387,11 @@ mod data {
where
P: Into<PathBuf>,
{
Self::compare_to_kind(old, new)
.map(|event_kind| Event::new(event_kind).add_path(path.into()))
}

fn compare_to_kind(old: Option<&PathData>, new: Option<&PathData>) -> Option<EventKind> {
match (old, new) {
(Some(old), Some(new)) => {
if new.mtime > old.mtime {
Expand All @@ -423,10 +408,29 @@ mod data {
(Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
(None, None) => None,
}
.map(|event_kind| Event::new(event_kind).add_path(path.into()))
}
}

/// Get hash value for the data content in given file `path`.
pub(super) fn content_hash(path: &Path) -> io::Result<u64> {
let mut hasher = Xxh3Default::new();
let mut file = File::open(path)?;
let mut buf = [0u8; 8 * 1024];

loop {
let n = match file.read(&mut buf) {
Ok(0) => break,
Ok(len) => len,
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
};

hasher.update(&buf[..n]);
}

Ok(hasher.digest())
}

/// Compose path and its metadata.
///
/// This data structure designed for make sure path and its metadata can be
Expand Down Expand Up @@ -520,7 +524,7 @@ pub struct PollWatcher {
want_to_stop: Arc<AtomicBool>,
/// channel to the poll loop
/// currently used only for manual polling
message_channel: Sender<()>,
message_channel: Sender<PollMessage>,
delay: Option<Duration>,
follow_sylinks: bool,
}
Expand All @@ -534,14 +538,27 @@ impl PollWatcher {
/// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
pub fn poll(&self) -> crate::Result<()> {
self.message_channel
.send(())
.send(PollMessage::Poll)
.map_err(|_| Error::generic("failed to send poll message"))?;
Ok(())
}

/// Actively poll for changes and block until the poll cycle has completed.
///
/// This is primarily useful together with [`Config::with_manual_polling`].
pub fn poll_blocking(&self) -> crate::Result<()> {
let (done_tx, done_rx) = std::sync::mpsc::channel();
self.message_channel
.send(PollMessage::PollAndWait(done_tx))
.map_err(|_| Error::generic("failed to send poll message"))?;
done_rx
.recv()
.map_err(|_| Error::generic("poll thread disconnected"))?
}

/// Returns a sender to initiate changes detection.
#[cfg(test)]
pub(crate) fn poll_sender(&self) -> Sender<()> {
pub(crate) fn poll_sender(&self) -> Sender<PollMessage> {
self.message_channel.clone()
}

Expand Down Expand Up @@ -569,7 +586,7 @@ impl PollWatcher {
config.event_kinds(),
);

let (tx, rx) = unbounded();
let (tx, rx) = unbounded::<PollMessage>();

let poll_watcher = PollWatcher {
watches: Default::default(),
Expand All @@ -585,7 +602,7 @@ impl PollWatcher {
Ok(poll_watcher)
}

fn run(&self, rx: Receiver<()>) {
fn run(&self, rx: Receiver<PollMessage>) {
let watches = Arc::clone(&self.watches);
let data_builder = Arc::clone(&self.data_builder);
let want_to_stop = Arc::clone(&self.want_to_stop);
Expand All @@ -594,30 +611,52 @@ impl PollWatcher {
let _ = thread::Builder::new()
.name("notify-rs poll loop".to_string())
.spawn(move || {
// do an immediate first scan, then sleep `delay` between subsequent scans.
let mut first_auto_scan = true;

loop {
if want_to_stop.load(Ordering::SeqCst) {
break;
}

let msg = match delay {
Some(_delay) if first_auto_scan => {
first_auto_scan = false;
None
}
Some(delay) => match rx.recv_timeout(delay) {
Ok(msg) => Some(msg),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => None,
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
},
None => match rx.recv() {
Ok(msg) => Some(msg),
Err(_) => break,
},
};

// HINT: Make sure always lock in the same order to avoid deadlock.
//
// FIXME: inconsistent: some place mutex poison cause panic,
// some place just ignore.
if let (Ok(mut watches), Ok(mut data_builder)) =
(watches.lock(), data_builder.lock())
{
let scan_res = {
let mut watches = watches.lock().unwrap_or_else(|e| e.into_inner());
let mut data_builder =
data_builder.lock().unwrap_or_else(|e| e.into_inner());

data_builder.update_timestamp();

let vals = watches.values_mut();
for watch_data in vals {
watch_data.rescan(&mut data_builder);
}
}
// TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
if let Some(delay) = delay {
let _ = rx.recv_timeout(delay);
} else {
let _ = rx.recv();

Ok(())
};

// Acknowledge poll requests after a poll cycle has finished.
if let Some(PollMessage::PollAndWait(done)) = msg {
let _ = done.send(scan_res);
}
}
});
Expand Down
2 changes: 1 addition & 1 deletion notify/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ pub fn poll_watcher_channel() -> (TestWatcher<PollWatcher>, Receiver) {
timeout: Receiver::DEFAULT_TIMEOUT,
detect_changes: Some(Box::new(move || {
sender
.send(())
.send(crate::poll::PollMessage::Poll)
.expect("PollWatcher receiver part was disconnected")
})),
kind: watcher.kind,
Expand Down