From e3b224df13bedb52aff1c5f36a0e13f1b1442e21 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 16:57:07 +0000 Subject: [PATCH 1/3] Initial plan From fd88c9a7971f93349e991c77d0feac34ebb68266 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 17:45:55 +0000 Subject: [PATCH 2/3] fix: upstream-sync LocalFile resilient list() + notify feature gate + closure lifetime bug - Add try_ensure_metadata() helper: warns on error, returns Option<&Metadata> - Change file_type().await? to match with warn+continue on error - Remove old symlink error block, replace with try_ensure_metadata - Use is_none_or for file size limit check (skip if metadata unavailable or too big) - Use try_ensure_metadata with let..else for ordinal fetch - Add dep:notify to source-local-file feature (fixes pre-existing CI failure) - Fix apply_component_changes closure lifetime bug (HRTB E0308) in components.rs by using explicit for loops + Vec instead of flat_map iterator chains Co-authored-by: bashandbone <89049923+bashandbone@users.noreply.github.com> --- crates/recoco-core/Cargo.toml | 1 + crates/recoco-core/src/base/value.rs | 4 +- .../recoco-core/src/ops/sources/amazon_s3.rs | 2 +- .../recoco-core/src/ops/sources/azure_blob.rs | 2 +- .../src/ops/sources/google_drive.rs | 2 +- .../recoco-core/src/ops/sources/local_file.rs | 83 ++++++++++++------- crates/recoco-core/src/setup/components.rs | 33 ++++---- crates/recoco-core/src/setup/db_metadata.rs | 10 +-- 8 files changed, 80 insertions(+), 57 deletions(-) diff --git a/crates/recoco-core/Cargo.toml b/crates/recoco-core/Cargo.toml index 47435594..cc65e1f6 100644 --- a/crates/recoco-core/Cargo.toml +++ b/crates/recoco-core/Cargo.toml @@ -249,6 +249,7 @@ source-gdrive = [ source-local-file = [ "batching", "dep:async-stream", + "dep:notify", "dep:recoco-splitters", "recoco-splitters/pattern-matching", "recoco-utils/bytes_decode", diff --git a/crates/recoco-core/src/base/value.rs b/crates/recoco-core/src/base/value.rs index 9b21069d..782560f0 100644 --- a/crates/recoco-core/src/base/value.rs +++ b/crates/recoco-core/src/base/value.rs @@ -1809,9 +1809,7 @@ mod tests { let struct_part = KeyPart::from(vec![ KeyPart::from(String::from("world")), KeyPart::from(100i64), - KeyPart::from(vec![ - KeyPart::from(false), - ]), + KeyPart::from(vec![KeyPart::from(false)]), ]); assert_eq!(struct_part.to_strs(), vec!["world", "100", "false"]); } diff --git a/crates/recoco-core/src/ops/sources/amazon_s3.rs b/crates/recoco-core/src/ops/sources/amazon_s3.rs index 55af6462..a73590a5 100644 --- a/crates/recoco-core/src/ops/sources/amazon_s3.rs +++ b/crates/recoco-core/src/ops/sources/amazon_s3.rs @@ -19,9 +19,9 @@ use redis::Client as RedisClient; use std::sync::Arc; use urlencoding; -use recoco_splitters::pattern_matcher::PatternMatcher; use crate::base::field_attrs; use crate::ops::sdk::*; +use recoco_splitters::pattern_matcher::PatternMatcher; /// Decode a form-encoded URL string, treating '+' as spaces fn decode_form_encoded_url(input: &str) -> Result> { diff --git a/crates/recoco-core/src/ops/sources/azure_blob.rs b/crates/recoco-core/src/ops/sources/azure_blob.rs index 5689857e..88de2265 100644 --- a/crates/recoco-core/src/ops/sources/azure_blob.rs +++ b/crates/recoco-core/src/ops/sources/azure_blob.rs @@ -18,10 +18,10 @@ use azure_storage_blobs::prelude::*; use futures::StreamExt; use std::sync::Arc; -use recoco_splitters::pattern_matcher::PatternMatcher; use crate::base::field_attrs; use crate::ops::sdk::*; use crate::prelude::*; +use recoco_splitters::pattern_matcher::PatternMatcher; #[derive(Debug, Serialize, Deserialize)] pub struct Spec { diff --git a/crates/recoco-core/src/ops/sources/google_drive.rs b/crates/recoco-core/src/ops/sources/google_drive.rs index 987cd0c4..dc25bf72 100644 --- a/crates/recoco-core/src/ops/sources/google_drive.rs +++ b/crates/recoco-core/src/ops/sources/google_drive.rs @@ -10,7 +10,6 @@ // Both the upstream CocoIndex code and the Recoco modifications are licensed under the Apache-2.0 License. // SPDX-License-Identifier: Apache-2.0 -use recoco_splitters::pattern_matcher::PatternMatcher; use chrono::Duration; use google_drive3::{ DriveHub, @@ -21,6 +20,7 @@ use http_body_util::BodyExt; use hyper_rustls::HttpsConnector; use hyper_util::client::legacy::connect::HttpConnector; use phf::phf_map; +use recoco_splitters::pattern_matcher::PatternMatcher; use crate::base::field_attrs; use crate::ops::sdk::*; diff --git a/crates/recoco-core/src/ops/sources/local_file.rs b/crates/recoco-core/src/ops/sources/local_file.rs index 4a77c2d5..83e1e07c 100644 --- a/crates/recoco-core/src/ops/sources/local_file.rs +++ b/crates/recoco-core/src/ops/sources/local_file.rs @@ -17,9 +17,9 @@ use std::path::Path; use std::{path::PathBuf, sync::Arc}; use tracing::warn; -use recoco_splitters::pattern_matcher::PatternMatcher; use crate::base::field_attrs; use crate::{fields_value, ops::sdk::*}; +use recoco_splitters::pattern_matcher::PatternMatcher; #[derive(Debug, Serialize, Deserialize)] pub struct Spec { @@ -51,6 +51,24 @@ async fn ensure_metadata<'a>( Ok(metadata.as_ref().unwrap()) } +async fn try_ensure_metadata<'a>( + path: &Path, + metadata: &'a mut Option, +) -> Option<&'a Metadata> { + if metadata.is_none() { + // Follow symlinks. + match tokio::fs::metadata(path).await { + Ok(m) => { + *metadata = Some(m); + } + Err(e) => { + warn!("Failed to get metadata for {}: {e}", path.display()); + } + } + } + metadata.as_ref() +} + #[async_trait] impl SourceExecutor for Executor { async fn list( @@ -78,20 +96,21 @@ impl SourceExecutor for Executor { let mut metadata: Option = None; // For symlinks, if the target doesn't exist, log and skip. - let file_type = entry.file_type().await?; - if file_type.is_symlink() - && let Err(e) = ensure_metadata(&path, &mut metadata).await { - if e.kind() == std::io::ErrorKind::NotFound { - warn!("Skipped broken symlink: {}", path.display()); - continue; - } - Err(e)?; + let file_type = match entry.file_type().await { + Ok(ft) => ft, + Err(e) => { + warn!("Failed to get file type for {}: {e}", path.display()); + continue; } + }; let is_dir = if file_type.is_dir() { true } else if file_type.is_symlink() { // Follow symlinks to classify the target. - ensure_metadata(&path, &mut metadata).await?.is_dir() + let Some(m) = try_ensure_metadata(&path, &mut metadata).await else { + continue; + }; + m.is_dir() } else { false }; @@ -102,13 +121,17 @@ impl SourceExecutor for Executor { } else if self.pattern_matcher.is_file_included(relative_path) { // Check file size limit if let Some(max_size) = self.max_file_size - && let Ok(metadata) = ensure_metadata(&path, &mut metadata).await - && metadata.len() > max_size as u64 + && try_ensure_metadata(&path, &mut metadata) + .await + .is_none_or(|m| m.len() > max_size as u64) { continue; } let ordinal: Option = if options.include_ordinal { - let metadata = ensure_metadata(&path, &mut metadata).await?; + let Some(metadata) = try_ensure_metadata(&path, &mut metadata).await + else { + continue; + }; Some(metadata.modified()?.try_into()?) } else { None @@ -256,26 +279,28 @@ impl SourceExecutor for Executor { let (tx, mut rx) = mpsc::channel::(100); let mut watcher = RecommendedWatcher::new( - move |res: notify::Result| { - match res { - Ok(event) => { - for path in event.paths { - if let Err(err) = tx.try_send(path) { - use tokio::sync::mpsc::error::TrySendError; - match err { - TrySendError::Full(_) => { - warn!("File watcher channel is full; dropping file change event"); - } - TrySendError::Closed(_) => { - warn!("File watcher channel is closed; dropping file change event"); - } + move |res: notify::Result| match res { + Ok(event) => { + for path in event.paths { + if let Err(err) = tx.try_send(path) { + use tokio::sync::mpsc::error::TrySendError; + match err { + TrySendError::Full(_) => { + warn!( + "File watcher channel is full; dropping file change event" + ); + } + TrySendError::Closed(_) => { + warn!( + "File watcher channel is closed; dropping file change event" + ); } } } } - Err(e) => { - warn!("File watcher error: {}", e); - } + } + Err(e) => { + warn!("File watcher error: {}", e); } }, Config::default(), diff --git a/crates/recoco-core/src/setup/components.rs b/crates/recoco-core/src/setup/components.rs index 2035642c..d1986804 100644 --- a/crates/recoco-core/src/setup/components.rs +++ b/crates/recoco-core/src/setup/components.rs @@ -178,25 +178,26 @@ pub async fn apply_component_changes( ) -> Result<()> { // First delete components that need to be removed, with bounded concurrency // to avoid overloading the underlying store or exhausting connection pools. - run_bounded(changes.iter().flat_map(|change| { - change - .keys_to_delete - .iter() - .map(move |key| change.desc.delete(key, context)) - })) - .await?; + let mut delete_futs = Vec::new(); + for change in changes.iter().copied() { + for key in change.keys_to_delete.iter() { + delete_futs.push(change.desc.delete(key, context)); + } + } + run_bounded(delete_futs).await?; // Then upsert components that need to be updated, also with bounded concurrency. - run_bounded(changes.iter().flat_map(|change| { - change.states_to_upsert.iter().map(move |state| async move { - if state.already_exists { - change.desc.update(&state.state, context).await + let mut upsert_futs = Vec::new(); + for change in changes.iter().copied() { + for state in change.states_to_upsert.iter() { + upsert_futs.push(if state.already_exists { + change.desc.update(&state.state, context) } else { - change.desc.create(&state.state, context).await - } - }) - })) - .await?; + change.desc.create(&state.state, context) + }); + } + } + run_bounded(upsert_futs).await?; Ok(()) } diff --git a/crates/recoco-core/src/setup/db_metadata.rs b/crates/recoco-core/src/setup/db_metadata.rs index fe53e340..be70513e 100644 --- a/crates/recoco-core/src/setup/db_metadata.rs +++ b/crates/recoco-core/src/setup/db_metadata.rs @@ -67,12 +67,10 @@ pub async fn read_setup_metadata(pool: &PgPool) -> Result = sqlx::query_scalar( - "SELECT to_regclass($1) IS NOT NULL", - ) - .bind(&table_name) - .fetch_one(&mut *db_conn) - .await?; + let exists: Option = sqlx::query_scalar("SELECT to_regclass($1) IS NOT NULL") + .bind(&table_name) + .fetch_one(&mut *db_conn) + .await?; if !exists.unwrap_or(false) { None } else { From 1d3040404d052134c4cf22d157f28ca437f83a1c Mon Sep 17 00:00:00 2001 From: Adam Poulemanos <89049923+bashandbone@users.noreply.github.com> Date: Mon, 16 Mar 2026 18:49:16 -0400 Subject: [PATCH 3/3] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Adam Poulemanos <89049923+bashandbone@users.noreply.github.com> --- crates/recoco-core/src/ops/sources/local_file.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/crates/recoco-core/src/ops/sources/local_file.rs b/crates/recoco-core/src/ops/sources/local_file.rs index 83e1e07c..09847610 100644 --- a/crates/recoco-core/src/ops/sources/local_file.rs +++ b/crates/recoco-core/src/ops/sources/local_file.rs @@ -132,7 +132,21 @@ impl SourceExecutor for Executor { else { continue; }; - Some(metadata.modified()?.try_into()?) + let modified = match metadata.modified() { + Ok(mtime) => mtime, + Err(e) => { + warn!("Failed to get modification time for {}: {e}", path.display()); + continue; + } + }; + let ordinal = match modified.try_into() { + Ok(ord) => ord, + Err(e) => { + warn!("Failed to convert modification time for {} into Ordinal: {e}", path.display()); + continue; + } + }; + Some(ordinal) } else { None };