Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/recoco-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ source-gdrive = [
source-local-file = [
"batching",
"dep:async-stream",
"dep:notify",
"dep:recoco-splitters",
"recoco-splitters/pattern-matching",
"recoco-utils/bytes_decode",
Expand Down
4 changes: 1 addition & 3 deletions crates/recoco-core/src/base/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1809,9 +1809,7 @@ mod tests {
let struct_part = KeyPart::from(vec![
KeyPart::from(String::from("world")),
KeyPart::from(100i64),
KeyPart::from(vec![
KeyPart::from(false),
]),
KeyPart::from(vec![KeyPart::from(false)]),
]);
assert_eq!(struct_part.to_strs(), vec!["world", "100", "false"]);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/recoco-core/src/ops/sources/amazon_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use redis::Client as RedisClient;
use std::sync::Arc;
use urlencoding;

use recoco_splitters::pattern_matcher::PatternMatcher;
use crate::base::field_attrs;
use crate::ops::sdk::*;
use recoco_splitters::pattern_matcher::PatternMatcher;

/// Decode a form-encoded URL string, treating '+' as spaces
fn decode_form_encoded_url(input: &str) -> Result<Arc<str>> {
Expand Down
2 changes: 1 addition & 1 deletion crates/recoco-core/src/ops/sources/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use azure_storage_blobs::prelude::*;
use futures::StreamExt;
use std::sync::Arc;

use recoco_splitters::pattern_matcher::PatternMatcher;
use crate::base::field_attrs;
use crate::ops::sdk::*;
use crate::prelude::*;
use recoco_splitters::pattern_matcher::PatternMatcher;

#[derive(Debug, Serialize, Deserialize)]
pub struct Spec {
Expand Down
2 changes: 1 addition & 1 deletion crates/recoco-core/src/ops/sources/google_drive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
// Both the upstream CocoIndex code and the Recoco modifications are licensed under the Apache-2.0 License.
// SPDX-License-Identifier: Apache-2.0

use recoco_splitters::pattern_matcher::PatternMatcher;
use chrono::Duration;
use google_drive3::{
DriveHub,
Expand All @@ -21,6 +20,7 @@ use http_body_util::BodyExt;
use hyper_rustls::HttpsConnector;
use hyper_util::client::legacy::connect::HttpConnector;
use phf::phf_map;
use recoco_splitters::pattern_matcher::PatternMatcher;

use crate::base::field_attrs;
use crate::ops::sdk::*;
Expand Down
83 changes: 54 additions & 29 deletions crates/recoco-core/src/ops/sources/local_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::path::Path;
use std::{path::PathBuf, sync::Arc};
use tracing::warn;

use recoco_splitters::pattern_matcher::PatternMatcher;
use crate::base::field_attrs;
use crate::{fields_value, ops::sdk::*};
use recoco_splitters::pattern_matcher::PatternMatcher;

#[derive(Debug, Serialize, Deserialize)]
pub struct Spec {
Expand Down Expand Up @@ -51,6 +51,24 @@ async fn ensure_metadata<'a>(
Ok(metadata.as_ref().unwrap())
}

/// Best-effort, memoized metadata lookup for `path`.
///
/// On first call (when `metadata` is `None`) this stats the path via
/// `tokio::fs::metadata`, which follows symlinks. A successful result is
/// cached into `metadata` so repeated calls for the same entry do a single
/// filesystem stat; a failure is logged at `warn` level and nothing is
/// cached.
///
/// Returns `Some(&Metadata)` when metadata is available (cached or freshly
/// fetched), `None` when the lookup failed.
async fn try_ensure_metadata<'a>(
    path: &Path,
    metadata: &'a mut Option<Metadata>,
) -> Option<&'a Metadata> {
    if metadata.is_none() {
        // Follow symlinks.
        match tokio::fs::metadata(path).await {
            Ok(m) => {
                *metadata = Some(m);
            }
            Err(e) => {
                // Best-effort: log and fall through instead of propagating,
                // leaving the cache empty so the caller sees `None`.
                warn!("Failed to get metadata for {}: {e}", path.display());
            }
        }
    }
    metadata.as_ref()
}

#[async_trait]
impl SourceExecutor for Executor {
async fn list(
Expand Down Expand Up @@ -78,20 +96,21 @@ impl SourceExecutor for Executor {
let mut metadata: Option<Metadata> = None;

// For symlinks, if the target doesn't exist, log and skip.
let file_type = entry.file_type().await?;
if file_type.is_symlink()
&& let Err(e) = ensure_metadata(&path, &mut metadata).await {
if e.kind() == std::io::ErrorKind::NotFound {
warn!("Skipped broken symlink: {}", path.display());
continue;
}
Err(e)?;
let file_type = match entry.file_type().await {
Ok(ft) => ft,
Err(e) => {
warn!("Failed to get file type for {}: {e}", path.display());
continue;
}
};
let is_dir = if file_type.is_dir() {
true
} else if file_type.is_symlink() {
// Follow symlinks to classify the target.
ensure_metadata(&path, &mut metadata).await?.is_dir()
let Some(m) = try_ensure_metadata(&path, &mut metadata).await else {
continue;
};
m.is_dir()
} else {
false
};
Expand All @@ -102,13 +121,17 @@ impl SourceExecutor for Executor {
} else if self.pattern_matcher.is_file_included(relative_path) {
// Check file size limit
if let Some(max_size) = self.max_file_size
&& let Ok(metadata) = ensure_metadata(&path, &mut metadata).await
&& metadata.len() > max_size as u64
&& try_ensure_metadata(&path, &mut metadata)
.await
.is_none_or(|m| m.len() > max_size as u64)
{
continue;
}
let ordinal: Option<Ordinal> = if options.include_ordinal {
let metadata = ensure_metadata(&path, &mut metadata).await?;
let Some(metadata) = try_ensure_metadata(&path, &mut metadata).await
else {
continue;
};
Some(metadata.modified()?.try_into()?)
} else {
None
Expand Down Expand Up @@ -256,26 +279,28 @@ impl SourceExecutor for Executor {
let (tx, mut rx) = mpsc::channel::<PathBuf>(100);

let mut watcher = RecommendedWatcher::new(
move |res: notify::Result<notify::Event>| {
match res {
Ok(event) => {
for path in event.paths {
if let Err(err) = tx.try_send(path) {
use tokio::sync::mpsc::error::TrySendError;
match err {
TrySendError::Full(_) => {
warn!("File watcher channel is full; dropping file change event");
}
TrySendError::Closed(_) => {
warn!("File watcher channel is closed; dropping file change event");
}
move |res: notify::Result<notify::Event>| match res {
Ok(event) => {
for path in event.paths {
if let Err(err) = tx.try_send(path) {
use tokio::sync::mpsc::error::TrySendError;
match err {
TrySendError::Full(_) => {
warn!(
"File watcher channel is full; dropping file change event"
);
}
TrySendError::Closed(_) => {
warn!(
"File watcher channel is closed; dropping file change event"
);
}
}
}
}
Err(e) => {
warn!("File watcher error: {}", e);
}
}
Err(e) => {
warn!("File watcher error: {}", e);
}
},
Config::default(),
Expand Down
33 changes: 17 additions & 16 deletions crates/recoco-core/src/setup/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,25 +178,26 @@ pub async fn apply_component_changes<D: SetupOperator>(
) -> Result<()> {
// First delete components that need to be removed, with bounded concurrency
// to avoid overloading the underlying store or exhausting connection pools.
run_bounded(changes.iter().flat_map(|change| {
change
.keys_to_delete
.iter()
.map(move |key| change.desc.delete(key, context))
}))
.await?;
let mut delete_futs = Vec::new();
for change in changes.iter().copied() {
for key in change.keys_to_delete.iter() {
delete_futs.push(change.desc.delete(key, context));
}
}
run_bounded(delete_futs).await?;

// Then upsert components that need to be updated, also with bounded concurrency.
run_bounded(changes.iter().flat_map(|change| {
change.states_to_upsert.iter().map(move |state| async move {
if state.already_exists {
change.desc.update(&state.state, context).await
let mut upsert_futs = Vec::new();
for change in changes.iter().copied() {
for state in change.states_to_upsert.iter() {
upsert_futs.push(if state.already_exists {
change.desc.update(&state.state, context)
} else {
change.desc.create(&state.state, context).await
}
})
}))
.await?;
change.desc.create(&state.state, context)
});
}
}
run_bounded(upsert_futs).await?;

Ok(())
}
Expand Down
10 changes: 4 additions & 6 deletions crates/recoco-core/src/setup/db_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,10 @@ pub async fn read_setup_metadata(pool: &PgPool) -> Result<Option<Vec<SetupMetada
// Use to_regclass to check existence: it respects the connection's search_path
// and schema qualification, so it works correctly regardless of whether a custom
// db_schema_name is configured or the connection uses a non-public default schema.
let exists: Option<bool> = sqlx::query_scalar(
"SELECT to_regclass($1) IS NOT NULL",
)
.bind(&table_name)
.fetch_one(&mut *db_conn)
.await?;
let exists: Option<bool> = sqlx::query_scalar("SELECT to_regclass($1) IS NOT NULL")
.bind(&table_name)
.fetch_one(&mut *db_conn)
.await?;
if !exists.unwrap_or(false) {
None
} else {
Expand Down
Loading