From cd85609bda1344d87633dbdd02efc03dc8841c96 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 21:40:33 +0000 Subject: [PATCH 1/5] feat: add filesystem watch support to local_file source Port upstream feature from cocoindex-io/cocoindex#1669 to enable real-time change detection for the LocalFile source using the notify crate. Changes: - Add notify 8.2.0 dependency to workspace and recoco-core - Wire notify into source-local-file feature - Add optional watch_changes field to Spec (defaults to false) - Add watch_changes field to Executor - Implement change_stream() method using notify::RecommendedWatcher - Add Clone derive to PatternMatcher for use in async stream - Filter filesystem events through existing PatternMatcher The feature is opt-in and fully backward-compatible. When enabled, the source bridges filesystem events via tokio::sync::mpsc into the change_stream() interface for low-latency continuous indexing. Related to #27 Co-authored-by: Adam Poulemanos --- Cargo.toml | 1 + crates/recoco-core/Cargo.toml | 2 + .../recoco-core/src/ops/sources/local_file.rs | 69 +++++++++++++++++++ .../src/ops/sources/shared/pattern_matcher.rs | 2 +- 4 files changed, 73 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ee02343c..ddd29e5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,7 @@ itertools = "0.14.0" json5 = "1.3.0" log = "0.4.29" neo4rs = "0.8.0" +notify = "8.2.0" pgvector = { version = "0.4.1", features = ["halfvec", "sqlx"] } phf = { version = "0.12.1", features = ["macros"] } qdrant-client = "1.16.0" diff --git a/crates/recoco-core/Cargo.toml b/crates/recoco-core/Cargo.toml index d9ce4c06..7c10ea37 100644 --- a/crates/recoco-core/Cargo.toml +++ b/crates/recoco-core/Cargo.toml @@ -58,6 +58,7 @@ itertools = { workspace = true, optional = true } json5 = { workspace = true, optional = true } log = { workspace = true, optional = true } neo4rs = { workspace = true, optional = true } +notify = { workspace = true, optional = true } pgvector = { workspace = true, optional = true } phf = { workspace = true } # compile time qdrant-client = { workspace = true, optional = true } @@ -247,6 +248,7 @@ source-local-file = [ "batching", "dep:async-stream", "dep:globset", + "dep:notify", "recoco-utils/bytes_decode", "recoco-utils/local-file" ] diff --git a/crates/recoco-core/src/ops/sources/local_file.rs b/crates/recoco-core/src/ops/sources/local_file.rs index 9c7f026a..6306749e 100644 --- a/crates/recoco-core/src/ops/sources/local_file.rs +++ b/crates/recoco-core/src/ops/sources/local_file.rs @@ -28,6 +28,7 @@ pub struct Spec { included_patterns: Option>, excluded_patterns: Option>, max_file_size: Option, + watch_changes: Option, } struct Executor { @@ -35,6 +36,7 @@ struct Executor { binary: bool, pattern_matcher: PatternMatcher, max_file_size: Option, + watch_changes: bool, } async fn ensure_metadata<'a>( @@ -189,6 +191,72 @@ impl SourceExecutor for Executor { fn provides_ordinal(&self) -> bool { true } + + async fn change_stream( + &self, + ) -> Result>>> { + if !self.watch_changes { + return Ok(None); + } + + use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; + use tokio::sync::mpsc; + + let root_path = self.root_path.clone(); + let root_component_size = root_path.components().count(); + let pattern_matcher = self.pattern_matcher.clone(); + + let (tx, mut rx) = mpsc::channel::(100); + + let mut watcher = RecommendedWatcher::new( + move |res: notify::Result| { + if let Ok(event) = res { + for path in event.paths { + let _ = tx.blocking_send(path); + } + } + }, + Config::default(), + ) + .map_err(|e| anyhow::anyhow!("Failed to create file watcher: {}", e))?; + + watcher + .watch(&root_path, RecursiveMode::Recursive) + .map_err(|e| anyhow::anyhow!("Failed to watch path: {}", e))?; + + let stream = async_stream::stream! { + // Keep the watcher alive for the duration of the stream + let _watcher = watcher; + + while let Some(path) = rx.recv().await { + let mut path_components = path.components(); + for _ in 0..root_component_size { + path_components.next(); + } + let Some(relative_path) = path_components.as_path().to_str() else { + continue; + }; + + // Filter through pattern matcher + if pattern_matcher.is_file_included(relative_path) { + yield Ok(SourceChangeMessage { + changes: vec![SourceChange { + key: KeyValue::from_single_part(relative_path.to_string()), + key_aux_info: serde_json::Value::Null, + data: PartialSourceRowData { + ordinal: None, + content_version_fp: None, + value: None, + }, + }], + ack_fn: None, + }); + } + } + }; + + Ok(Some(stream.boxed())) + } } pub struct Factory; @@ -242,6 +310,7 @@ impl SourceFactoryBase for Factory { binary: spec.binary, pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?, max_file_size: spec.max_file_size, + watch_changes: spec.watch_changes.unwrap_or(false), })) } } diff --git a/crates/recoco-core/src/ops/sources/shared/pattern_matcher.rs b/crates/recoco-core/src/ops/sources/shared/pattern_matcher.rs index a4997222..39e29f38 100644 --- a/crates/recoco-core/src/ops/sources/shared/pattern_matcher.rs +++ b/crates/recoco-core/src/ops/sources/shared/pattern_matcher.rs @@ -23,7 +23,7 @@ fn build_glob_set(patterns: Vec) -> Result { } /// Pattern matcher that handles include and exclude patterns for files -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct PatternMatcher { /// Patterns matching full path of files to be included. included_glob_set: Option, From a49e9ae96c925fcb766cdb8a57df2214bf25ba91 Mon Sep 17 00:00:00 2001 From: Adam Poulemanos <89049923+bashandbone@users.noreply.github.com> Date: Fri, 13 Mar 2026 17:04:57 -0400 Subject: [PATCH 2/5] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Adam Poulemanos <89049923+bashandbone@users.noreply.github.com> --- crates/recoco-core/src/ops/sources/local_file.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/recoco-core/src/ops/sources/local_file.rs b/crates/recoco-core/src/ops/sources/local_file.rs index 6306749e..3ab44802 100644 --- a/crates/recoco-core/src/ops/sources/local_file.rs +++ b/crates/recoco-core/src/ops/sources/local_file.rs @@ -237,6 +237,11 @@ impl SourceExecutor for Executor { continue; }; + // Skip events that correspond to the root directory itself or yield no relative path. + if relative_path.is_empty() { + continue; + } + // Filter through pattern matcher if pattern_matcher.is_file_included(relative_path) { yield Ok(SourceChangeMessage { From 132ea49df38029e04c08b5ee7057e9ebb5a22822 Mon Sep 17 00:00:00 2001 From: Adam Poulemanos <89049923+bashandbone@users.noreply.github.com> Date: Fri, 13 Mar 2026 17:05:06 -0400 Subject: [PATCH 3/5] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Adam Poulemanos <89049923+bashandbone@users.noreply.github.com> --- crates/recoco-core/src/ops/sources/local_file.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/crates/recoco-core/src/ops/sources/local_file.rs b/crates/recoco-core/src/ops/sources/local_file.rs index 3ab44802..7ef1064a 100644 --- a/crates/recoco-core/src/ops/sources/local_file.rs +++ b/crates/recoco-core/src/ops/sources/local_file.rs @@ -212,7 +212,17 @@ impl SourceExecutor for Executor { move |res: notify::Result| { if let Ok(event) = res { for path in event.paths { - let _ = tx.blocking_send(path); + if let Err(err) = tx.try_send(path) { + use tokio::sync::mpsc::error::TrySendError; + match err { + TrySendError::Full(_) => { + warn!("File watcher channel is full; dropping file change event"); + } + TrySendError::Closed(_) => { + warn!("File watcher channel is closed; dropping file change event"); + } + } + } } } }, From 1f091e4b93bc52e251b40a304dd2ac1df8f1baeb Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Sun, 15 Mar 2026 22:39:38 -0400 Subject: [PATCH 4/5] fix(local_file): address review feedback on change_stream() watcher implementation (#98) * Initial plan * fix: address review feedback on change_stream() in local_file source Co-authored-by: bashandbone <89049923+bashandbone@users.noreply.github.com> * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Adam Poulemanos <89049923+bashandbone@users.noreply.github.com> --------- Signed-off-by: Adam Poulemanos <89049923+bashandbone@users.noreply.github.com> Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: bashandbone <89049923+bashandbone@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- Cargo.lock | 134 ++++++++++++++++-- .../recoco-core/src/ops/sources/local_file.rs | 41 ++++-- 2 files changed, 154 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 61653779..c299d6df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1050,6 +1050,12 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.10.0" @@ -1926,6 +1932,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.31" @@ -2947,6 +2962,26 @@ dependencies = [ "cfb", ] +[[package]] +name = "inotify" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd5b3eaf1a28b758ac0faa5a4254e8ab2705605496f1b1f3fbbc3988ad73d199" +dependencies = [ + "bitflags 2.10.0", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.13" @@ -3028,6 +3063,26 @@ dependencies = [ "ucd-trie", ] +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -3055,7 +3110,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" dependencies = [ - "bitflags", + "bitflags 2.10.0", "libc", "redox_syscall 0.7.0", ] @@ -3188,6 +3243,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", + "log", "wasi 0.11.1+wasi-snapshot-preview1", "windows-sys 0.61.2", ] @@ -3257,6 +3313,33 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" +dependencies = [ + "bitflags 2.10.0", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.60.2", +] + +[[package]] +name = "notify-types" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a" +dependencies = [ + "bitflags 2.10.0", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -3378,7 +3461,7 @@ version = "0.10.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cfg-if", "foreign-types", "libc", @@ -4068,6 +4151,7 @@ dependencies = [ "json5 1.3.0", "log", "neo4rs", + "notify", "pgvector", "phf 0.12.1", "qdrant-client", @@ -4201,7 +4285,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -4210,7 +4294,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f3fe0889e69e2ae9e41f4d6c4c0181701d00e4697b356fb1f74173a5e0ee27" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -4368,7 +4452,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd490c5b18261893f14449cbd28cb9c0b637aebf161cd77900bfdedaff21ec32" dependencies = [ - "bitflags", + "bitflags 2.10.0", "once_cell", "serde", "serde_derive", @@ -4427,7 +4511,7 @@ version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" dependencies = [ - "bitflags", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys", @@ -4540,6 +4624,15 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.28" @@ -4638,7 +4731,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -4651,7 +4744,7 @@ version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.10.1", "core-foundation-sys", "libc", @@ -5080,7 +5173,7 @@ checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526" dependencies = [ "atoi", "base64 0.22.1", - "bitflags", + "bitflags 2.10.0", "byteorder", "bytes", "chrono", @@ -5124,7 +5217,7 @@ checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46" dependencies = [ "atoi", "base64 0.22.1", - "bitflags", + "bitflags 2.10.0", "byteorder", "chrono", "crc", @@ -5580,7 +5673,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ "async-compression", - "bitflags", + "bitflags 2.10.0", "bytes", "futures-core", "futures-util", @@ -6144,6 +6237,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -6300,6 +6403,15 @@ dependencies = [ "wasite", ] +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "windows-core" version = "0.62.2" diff --git a/crates/recoco-core/src/ops/sources/local_file.rs b/crates/recoco-core/src/ops/sources/local_file.rs index 7ef1064a..715c5182 100644 --- a/crates/recoco-core/src/ops/sources/local_file.rs +++ b/crates/recoco-core/src/ops/sources/local_file.rs @@ -210,20 +210,25 @@ impl SourceExecutor for Executor { let mut watcher = RecommendedWatcher::new( move |res: notify::Result| { - if let Ok(event) = res { - for path in event.paths { - if let Err(err) = tx.try_send(path) { - use tokio::sync::mpsc::error::TrySendError; - match err { - TrySendError::Full(_) => { - warn!("File watcher channel is full; dropping file change event"); - } - TrySendError::Closed(_) => { - warn!("File watcher channel is closed; dropping file change event"); + match res { + Ok(event) => { + for path in event.paths { + if let Err(err) = tx.try_send(path) { + use tokio::sync::mpsc::error::TrySendError; + match err { + TrySendError::Full(_) => { + warn!("File watcher channel is full; dropping file change event"); + } + TrySendError::Closed(_) => { + warn!("File watcher channel is closed; dropping file change event"); + } } } } } + Err(e) => { + warn!("File watcher error: {}", e); + } } }, Config::default(), @@ -239,6 +244,22 @@ impl SourceExecutor for Executor { let _watcher = watcher; while let Some(path) = rx.recv().await { + // Skip directory paths - notify can emit events for directories, + // and reading a directory as a file would produce an EISDIR error. + let is_dir = match std::fs::metadata(&path) { + Ok(metadata) => metadata.is_dir(), + Err(err) => { + // If the file no longer exists, this may be a deletion event; do not skip it. + if err.kind() != std::io::ErrorKind::NotFound { + warn!("Failed to read metadata for path {:?}: {}", path, err); + } + false + } + }; + if is_dir { + continue; + } + let mut path_components = path.components(); for _ in 0..root_component_size { path_components.next(); From e8362988e4be74bfa60a32de70eaa406a29ee9b4 Mon Sep 17 00:00:00 2001 From: Adam Poulemanos Date: Sun, 15 Mar 2026 22:54:36 -0400 Subject: [PATCH 5/5] update lockfile --- Cargo.lock | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 6808b49a..7e20e5ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4136,7 +4136,6 @@ dependencies = [ "derive-where", "expect-test", "futures", - "globset", "google-cloud-aiplatform-v1", "google-cloud-gax", "google-drive3", @@ -4182,7 +4181,9 @@ dependencies = [ name = "recoco-splitters" version = "0.2.1" dependencies = [ + "anyhow", "cfg-if", + "globset", "regex", "tree-sitter", "tree-sitter-c",