diff --git a/Cargo.lock b/Cargo.lock index b3900c73..7e20e5ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1050,6 +1050,12 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.10.0" @@ -1926,6 +1932,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.31" @@ -2947,6 +2962,26 @@ dependencies = [ "cfb", ] +[[package]] +name = "inotify" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd5b3eaf1a28b758ac0faa5a4254e8ab2705605496f1b1f3fbbc3988ad73d199" +dependencies = [ + "bitflags 2.10.0", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.13" @@ -3028,6 +3063,26 @@ dependencies = [ "ucd-trie", ] +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -3055,7 +3110,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" dependencies = [ - "bitflags", + "bitflags 2.10.0", "libc", "redox_syscall 0.7.0", ] @@ -3188,6 +3243,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", + "log", "wasi 0.11.1+wasi-snapshot-preview1", "windows-sys 0.61.2", ] @@ -3257,6 +3313,33 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" +dependencies = [ + "bitflags 2.10.0", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.60.2", +] + +[[package]] +name = "notify-types" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a" +dependencies = [ + "bitflags 2.10.0", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -3378,7 +3461,7 @@ version = "0.10.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cfg-if", "foreign-types", "libc", @@ -4053,7 +4136,6 @@ dependencies = [ "derive-where", "expect-test", "futures", - "globset", "google-cloud-aiplatform-v1", "google-cloud-gax", "google-drive3", @@ -4068,6 +4150,7 @@ dependencies = [ "json5 1.3.0", "log", "neo4rs", + 
"notify", "pgvector", "phf 0.12.1", "qdrant-client", @@ -4098,7 +4181,9 @@ dependencies = [ name = "recoco-splitters" version = "0.2.1" dependencies = [ + "anyhow", "cfg-if", + "globset", "regex", "tree-sitter", "tree-sitter-c", @@ -4201,7 +4286,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -4210,7 +4295,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f3fe0889e69e2ae9e41f4d6c4c0181701d00e4697b356fb1f74173a5e0ee27" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -4368,7 +4453,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd490c5b18261893f14449cbd28cb9c0b637aebf161cd77900bfdedaff21ec32" dependencies = [ - "bitflags", + "bitflags 2.10.0", "once_cell", "serde", "serde_derive", @@ -4427,7 +4512,7 @@ version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" dependencies = [ - "bitflags", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys", @@ -4540,6 +4625,15 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.28" @@ -4638,7 +4732,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 
0.9.4", "core-foundation-sys", "libc", @@ -4651,7 +4745,7 @@ version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.10.1", "core-foundation-sys", "libc", @@ -5080,7 +5174,7 @@ checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526" dependencies = [ "atoi", "base64 0.22.1", - "bitflags", + "bitflags 2.10.0", "byteorder", "bytes", "chrono", @@ -5124,7 +5218,7 @@ checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46" dependencies = [ "atoi", "base64 0.22.1", - "bitflags", + "bitflags 2.10.0", "byteorder", "chrono", "crc", @@ -5580,7 +5674,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ "async-compression", - "bitflags", + "bitflags 2.10.0", "bytes", "futures-core", "futures-util", @@ -6144,6 +6238,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -6300,6 +6404,15 @@ dependencies = [ "wasite", ] +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "windows-core" version = "0.62.2" diff --git a/Cargo.toml b/Cargo.toml index ee02343c..ddd29e5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,7 @@ itertools = "0.14.0" 
json5 = "1.3.0"
 log = "0.4.29"
 neo4rs = "0.8.0"
+notify = "8.2.0"
 pgvector = { version = "0.4.1", features = ["halfvec", "sqlx"] }
 phf = { version = "0.12.1", features = ["macros"] }
 qdrant-client = "1.16.0"
diff --git a/crates/recoco-core/Cargo.toml b/crates/recoco-core/Cargo.toml
index ab95351c..47435594 100644
--- a/crates/recoco-core/Cargo.toml
+++ b/crates/recoco-core/Cargo.toml
@@ -57,6 +57,7 @@ itertools = { workspace = true, optional = true }
 json5 = { workspace = true, optional = true }
 log = { workspace = true, optional = true }
 neo4rs = { workspace = true, optional = true }
+notify = { workspace = true, optional = true }
 pgvector = { workspace = true, optional = true }
 phf = { workspace = true } # compile time
 qdrant-client = { workspace = true, optional = true }
diff --git a/crates/recoco-core/src/ops/sources/local_file.rs b/crates/recoco-core/src/ops/sources/local_file.rs
index f72dd2b3..4a77c2d5 100644
--- a/crates/recoco-core/src/ops/sources/local_file.rs
+++ b/crates/recoco-core/src/ops/sources/local_file.rs
@@ -28,6 +28,7 @@ pub struct Spec {
     included_patterns: Option<Vec<String>>,
     excluded_patterns: Option<Vec<String>>,
     max_file_size: Option<i64>,
+    watch_changes: Option<bool>,
 }
 
 struct Executor {
@@ -36,6 +37,7 @@ struct Executor {
     binary: bool,
     pattern_matcher: PatternMatcher,
     max_file_size: Option<i64>,
+    watch_changes: bool,
 }
 
 async fn ensure_metadata<'a>(
@@ -236,6 +238,128 @@ impl SourceExecutor for Executor {
     fn provides_ordinal(&self) -> bool {
         true
     }
+
+    /// Streams change notifications for files under the source root.
+    ///
+    /// Returns `Ok(None)` when `watch_changes` is disabled. Otherwise a recursive
+    /// `notify` watcher is attached to the root and every reported path that
+    /// passes the pattern matcher is emitted as a key-only `SourceChange` whose
+    /// key is the path relative to the root.
+    ///
+    /// # Errors
+    ///
+    /// Fails when the root cannot be resolved or the watcher cannot be created
+    /// or attached to it.
+    async fn change_stream(
+        &self,
+    ) -> Result<Option<BoxStream<'async_trait, Result<SourceChangeMessage>>>> {
+        if !self.watch_changes {
+            return Ok(None);
+        }
+
+        use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
+        use tokio::sync::mpsc;
+
+        // Watch the canonical root. Backends may report absolute (or fully
+        // resolved) paths even when the configured root is relative or goes
+        // through a symlink, so keys are derived with `strip_prefix` against
+        // the exact path handed to the watcher instead of by skipping a fixed
+        // number of leading components.
+        let root_path = tokio::fs::canonicalize(&self.root_path)
+            .await
+            .map_err(|e| anyhow::anyhow!("Failed to resolve watch root: {}", e))?;
+        let pattern_matcher = self.pattern_matcher.clone();
+
+        // Bounded buffer: once it is full the callback drops and logs events.
+        let (tx, mut rx) = mpsc::channel::<std::path::PathBuf>(100);
+
+        let mut watcher = RecommendedWatcher::new(
+            move |res: notify::Result<notify::Event>| {
+                match res {
+                    Ok(event) => {
+                        for path in event.paths {
+                            if let Err(err) = tx.try_send(path) {
+                                use tokio::sync::mpsc::error::TrySendError;
+                                match err {
+                                    TrySendError::Full(_) => {
+                                        warn!("File watcher channel is full; dropping file change event");
+                                    }
+                                    TrySendError::Closed(_) => {
+                                        warn!("File watcher channel is closed; dropping file change event");
+                                    }
+                                }
+                            }
+                        }
+                    }
+                    Err(e) => {
+                        warn!("File watcher error: {}", e);
+                    }
+                }
+            },
+            Config::default(),
+        )
+        .map_err(|e| anyhow::anyhow!("Failed to create file watcher: {}", e))?;
+
+        watcher
+            .watch(&root_path, RecursiveMode::Recursive)
+            .map_err(|e| anyhow::anyhow!("Failed to watch path: {}", e))?;
+
+        let stream = async_stream::stream! {
+            // Keep the watcher alive for the duration of the stream
+            let _watcher = watcher;
+
+            while let Some(path) = rx.recv().await {
+                // Skip directory paths - notify can emit events for directories,
+                // and reading a directory as a file would produce an EISDIR error.
+                // The async metadata call keeps the executor thread from blocking.
+                let is_dir = match tokio::fs::metadata(&path).await {
+                    Ok(metadata) => metadata.is_dir(),
+                    Err(err) => {
+                        // If the file no longer exists, this may be a deletion event; do not skip it.
+                        if err.kind() != std::io::ErrorKind::NotFound {
+                            warn!("Failed to read metadata for path {:?}: {}", path, err);
+                        }
+                        false
+                    }
+                };
+                if is_dir {
+                    continue;
+                }
+
+                // A path outside the watched root cannot be mapped to a key.
+                let Ok(relative) = path.strip_prefix(&root_path) else {
+                    warn!("Ignoring file event outside watched root: {:?}", path);
+                    continue;
+                };
+                let Some(relative_path) = relative.to_str() else {
+                    continue;
+                };
+
+                // Skip events that correspond to the root directory itself or yield no relative path.
+                if relative_path.is_empty() {
+                    continue;
+                }
+
+                // Filter through pattern matcher
+                if pattern_matcher.is_file_included(relative_path) {
+                    yield Ok(SourceChangeMessage {
+                        changes: vec![SourceChange {
+                            key: KeyValue::from_single_part(relative_path.to_string()),
+                            key_aux_info: serde_json::Value::Null,
+                            data: PartialSourceRowData {
+                                ordinal: None,
+                                content_version_fp: None,
+                                value: None,
+                            },
+                        }],
+                        ack_fn: None,
+                    });
+                }
+            }
+        };
+
+        Ok(Some(stream.boxed()))
+    }
 }
 
 pub struct Factory;
@@ -293,6 +417,7 @@ impl SourceFactoryBase for Factory {
             binary: spec.binary,
             pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?,
             max_file_size: spec.max_file_size,
+            watch_changes: spec.watch_changes.unwrap_or(false),
         }))
     }
 }
diff --git a/crates/recoco-splitters/src/pattern_matcher.rs b/crates/recoco-splitters/src/pattern_matcher.rs
index bf00bd21..34a14328 100644
--- a/crates/recoco-splitters/src/pattern_matcher.rs
+++ b/crates/recoco-splitters/src/pattern_matcher.rs
@@ -23,7 +23,7 @@ fn build_glob_set(patterns: Vec<String>) -> Result<GlobSet> {
 }
 
 /// Pattern matcher that handles include and exclude patterns for files
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct PatternMatcher {
     /// Patterns matching full path of files to be included.
     included_glob_set: Option<GlobSet>,