Skip to content

Commit c4d8c4e

Browse files
committed
feat(client): add batch_fetch for concurrent multi-URL fetching
Add batch_fetch() and batch_fetch_with_options(), which fetch multiple URLs concurrently with a configurable concurrency limit (default: 5). Each URL is processed independently, and results are returned in input order. Closes #75
1 parent 44346e1 commit c4d8c4e

File tree

2 files changed

+192
-1
lines changed

2 files changed

+192
-1
lines changed

crates/fetchkit/src/client.rs

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,91 @@ pub async fn fetch_with_options(
157157
registry.fetch(req, options).await
158158
}
159159

160+
/// Default concurrency limit for batch fetching.
///
/// Applied by `batch_fetch_with_options` when the caller passes `None` for
/// its `concurrency` argument.
161+
const DEFAULT_BATCH_CONCURRENCY: usize = 5;
162+
163+
/// Fetch multiple URLs concurrently.
164+
///
165+
/// Each URL is processed independently — one failure doesn't affect others.
166+
/// Results are returned in the same order as the input requests.
167+
/// Concurrency is limited to `concurrency` (default: 5).
168+
///
169+
/// # Examples
170+
///
171+
/// ```no_run
172+
/// use fetchkit::{FetchRequest, batch_fetch};
173+
///
174+
/// # async fn example() -> Result<(), fetchkit::FetchError> {
175+
/// let requests = vec![
176+
/// FetchRequest::new("https://example.com").as_markdown(),
177+
/// FetchRequest::new("https://example.org").as_markdown(),
178+
/// ];
179+
/// let results = batch_fetch(requests, None).await;
180+
/// for result in &results {
181+
/// match result {
182+
/// Ok(resp) => println!("{}: {}", resp.url, resp.status_code),
183+
/// Err(e) => println!("Error: {}", e),
184+
/// }
185+
/// }
186+
/// # Ok(())
187+
/// # }
188+
/// ```
189+
pub async fn batch_fetch(
190+
requests: Vec<FetchRequest>,
191+
concurrency: Option<usize>,
192+
) -> Vec<Result<FetchResponse, FetchError>> {
193+
let options = FetchOptions {
194+
enable_markdown: true,
195+
enable_text: true,
196+
..Default::default()
197+
};
198+
batch_fetch_with_options(requests, options, concurrency).await
199+
}
200+
201+
/// Fetch multiple URLs concurrently with custom options.
202+
///
203+
/// Each URL is processed independently with the shared options.
204+
/// Returns results in the same order as input requests.
205+
pub async fn batch_fetch_with_options(
206+
requests: Vec<FetchRequest>,
207+
options: FetchOptions,
208+
concurrency: Option<usize>,
209+
) -> Vec<Result<FetchResponse, FetchError>> {
210+
use futures::stream::{self, StreamExt};
211+
use std::sync::Arc;
212+
213+
let concurrency = concurrency.unwrap_or(DEFAULT_BATCH_CONCURRENCY).max(1);
214+
let num_requests = requests.len();
215+
let options = Arc::new(options);
216+
217+
let mut indexed_results: Vec<(usize, Result<FetchResponse, FetchError>)> =
218+
stream::iter(requests.into_iter().enumerate())
219+
.map(|(idx, req)| {
220+
let options = Arc::clone(&options);
221+
async move {
222+
let registry = FetcherRegistry::with_defaults();
223+
let result = registry.fetch(req, (*options).clone()).await;
224+
(idx, result)
225+
}
226+
})
227+
.buffer_unordered(concurrency)
228+
.collect()
229+
.await;
230+
231+
// Sort by original index to preserve request order
232+
indexed_results.sort_by_key(|(idx, _)| *idx);
233+
234+
// Pre-fill with errors, then replace with actual results
235+
let mut results: Vec<Result<FetchResponse, FetchError>> = (0..num_requests)
236+
.map(|_| Err(FetchError::MissingUrl))
237+
.collect();
238+
for (idx, result) in indexed_results {
239+
results[idx] = result;
240+
}
241+
242+
results
243+
}
244+
160245
#[cfg(test)]
161246
mod tests {
162247
use super::*;
@@ -229,4 +314,110 @@ mod tests {
229314
Err(FetchError::BlockedUrl)
230315
));
231316
}
317+
318+
#[tokio::test]
319+
async fn test_batch_fetch_multiple_urls() {
320+
use wiremock::matchers::{method, path};
321+
use wiremock::{Mock, MockServer, ResponseTemplate};
322+
323+
let server = MockServer::start().await;
324+
Mock::given(method("GET"))
325+
.and(path("/page1"))
326+
.respond_with(
327+
ResponseTemplate::new(200)
328+
.set_body_string("Page 1")
329+
.insert_header("content-type", "text/plain"),
330+
)
331+
.mount(&server)
332+
.await;
333+
Mock::given(method("GET"))
334+
.and(path("/page2"))
335+
.respond_with(
336+
ResponseTemplate::new(200)
337+
.set_body_string("Page 2")
338+
.insert_header("content-type", "text/plain"),
339+
)
340+
.mount(&server)
341+
.await;
342+
343+
let requests = vec![
344+
FetchRequest::new(format!("{}/page1", server.uri())),
345+
FetchRequest::new(format!("{}/page2", server.uri())),
346+
];
347+
348+
let options = FetchOptions {
349+
enable_markdown: true,
350+
dns_policy: DnsPolicy::allow_all(),
351+
..Default::default()
352+
};
353+
let results = batch_fetch_with_options(requests, options, None).await;
354+
355+
assert_eq!(results.len(), 2);
356+
assert!(results[0]
357+
.as_ref()
358+
.unwrap()
359+
.content
360+
.as_deref()
361+
.unwrap()
362+
.contains("Page 1"));
363+
assert!(results[1]
364+
.as_ref()
365+
.unwrap()
366+
.content
367+
.as_deref()
368+
.unwrap()
369+
.contains("Page 2"));
370+
}
371+
372+
#[tokio::test]
373+
async fn test_batch_fetch_partial_failure() {
374+
use wiremock::matchers::{method, path};
375+
use wiremock::{Mock, MockServer, ResponseTemplate};
376+
377+
let server = MockServer::start().await;
378+
Mock::given(method("GET"))
379+
.and(path("/ok"))
380+
.respond_with(
381+
ResponseTemplate::new(200)
382+
.set_body_string("OK")
383+
.insert_header("content-type", "text/plain"),
384+
)
385+
.mount(&server)
386+
.await;
387+
388+
let requests = vec![
389+
FetchRequest::new(format!("{}/ok", server.uri())),
390+
FetchRequest::new(""), // Will fail with MissingUrl
391+
];
392+
393+
let options = FetchOptions {
394+
dns_policy: DnsPolicy::allow_all(),
395+
..Default::default()
396+
};
397+
let results = batch_fetch_with_options(requests, options, None).await;
398+
399+
assert_eq!(results.len(), 2);
400+
assert!(results[0].is_ok());
401+
assert!(results[1].is_err());
402+
}
403+
404+
#[tokio::test]
405+
async fn test_batch_fetch_respects_concurrency_limit() {
406+
// Just verify it works with concurrency=1
407+
let requests = vec![
408+
FetchRequest::new(""), // Will fail
409+
FetchRequest::new(""), // Will fail
410+
];
411+
412+
let results = batch_fetch(requests, Some(1)).await;
413+
assert_eq!(results.len(), 2);
414+
assert!(results[0].is_err());
415+
assert!(results[1].is_err());
416+
}
417+
418+
#[tokio::test]
419+
async fn test_batch_fetch_empty_input() {
420+
let results = batch_fetch(vec![], None).await;
421+
assert!(results.is_empty());
422+
}
232423
}

crates/fetchkit/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ pub mod file_saver;
8484
mod tool;
8585
mod types;
8686

87-
pub use client::{fetch, fetch_with_options, FetchOptions};
87+
pub use client::{batch_fetch, batch_fetch_with_options, fetch, fetch_with_options, FetchOptions};
8888
pub use convert::{
8989
extract_headings, extract_metadata, html_to_markdown, html_to_text, strip_boilerplate,
9090
};

0 commit comments

Comments
 (0)