Skip to content

Commit 18703fd

Browse files
committed
perf: wrap download_many call sites with idempotent_dir_async
Use the .success marker pattern at all download_many call sites so that a completed batch of downloads is skipped in full on re-runs:

- clickbench Partitioned: wraps the 100-shard parquet_dir download
- public_bi download_bzips: wraps the csv_bzip2 directory download
- vector_dataset download: wraps the train-shard directory download

Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent 95850b6 commit 18703fd

File tree

3 files changed

+34
-16
lines changed

3 files changed

+34
-16
lines changed

vortex-bench/src/clickbench/data.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::Format;
2222
pub use crate::conversions::convert_parquet_directory_to_vortex;
2323
use crate::datasets::data_downloads::download_data;
2424
use crate::datasets::data_downloads::download_many;
25+
use crate::idempotent_dir_async;
2526

2627
pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
2728
use DataType::*;
@@ -195,12 +196,16 @@ impl Flavor {
195196
// to add that info, see https://github.com/ClickHouse/ClickBench/issues/7.
196197
info!("Downloading 100 ClickBench parquet shards");
197198
let parquet_dir = basepath.join(Format::Parquet.name());
198-
let downloads = (0_u32..100).map(|idx| {
199-
let output_path = parquet_dir.join(format!("hits_{idx}.parquet"));
200-
let url = format!("https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_{idx}.parquet");
201-
(output_path, url)
202-
});
203-
download_many(downloads).await?;
199+
let parquet_dir2 = parquet_dir.clone();
200+
idempotent_dir_async(&parquet_dir, || async move {
201+
let downloads = (0_u32..100).map(|idx| {
202+
let output_path = parquet_dir2.join(format!("hits_{idx}.parquet"));
203+
let url = format!("https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_{idx}.parquet");
204+
(output_path, url)
205+
});
206+
download_many(downloads).await.map(|_| ())
207+
})
208+
.await?;
204209
}
205210
}
206211
Ok(())

vortex-bench/src/public_bi.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use crate::datasets::Dataset;
4545
use crate::datasets::data_downloads::decompress_bz2;
4646
use crate::datasets::data_downloads::download_many;
4747
use crate::idempotent_async;
48+
use crate::idempotent_dir_async;
4849
use crate::workspace_root;
4950

5051
pub static PBI_DATASETS: LazyLock<PBIDatasets> = LazyLock::new(|| {
@@ -289,13 +290,21 @@ pub struct PBIData {
289290

290291
impl PBIData {
291292
async fn download_bzips(&self) -> anyhow::Result<()> {
292-
let downloads = self.tables.iter().map(|table| {
293-
(
294-
self.get_file_path(&table.name, FileType::CsvBzip2),
295-
table.data_url.as_str().to_owned(),
296-
)
297-
});
298-
download_many(downloads).await?;
293+
let bzip_dir = self.base_path.join(FileType::CsvBzip2.name());
294+
let downloads: Vec<(PathBuf, String)> = self
295+
.tables
296+
.iter()
297+
.map(|table| {
298+
(
299+
self.get_file_path(&table.name, FileType::CsvBzip2),
300+
table.data_url.as_str().to_owned(),
301+
)
302+
})
303+
.collect();
304+
idempotent_dir_async(&bzip_dir, || async move {
305+
download_many(downloads).await.map(|_| ())
306+
})
307+
.await?;
299308
Ok(())
300309
}
301310

vortex-bench/src/vector_dataset/download.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use anyhow::Result;
2020

2121
use crate::datasets::data_downloads::download_data;
2222
use crate::datasets::data_downloads::download_many;
23+
use crate::idempotent_dir_async;
2324
use crate::vector_dataset::catalog::VectorDataset;
2425
use crate::vector_dataset::layout::LayoutSpec;
2526
use crate::vector_dataset::layout::TrainLayout;
@@ -102,14 +103,17 @@ pub async fn download(ds: VectorDataset, layout: TrainLayout) -> Result<DatasetP
102103
let train_targets = paths::train_files(ds, layout, spec.num_files());
103104
debug_assert_eq!(urls.len(), train_targets.len());
104105

106+
let train_dir = paths::train_dir(ds, layout);
105107
let train_downloads: Vec<(PathBuf, String)> = train_targets
106108
.iter()
107109
.cloned()
108110
.zip(urls.into_iter())
109111
.collect();
110-
download_many(train_downloads)
111-
.await
112-
.with_context(|| format!("download train shards for {}", ds.name()))?;
112+
idempotent_dir_async(&train_dir, || async move {
113+
download_many(train_downloads).await.map(|_| ())
114+
})
115+
.await
116+
.with_context(|| format!("download train shards for {}", ds.name()))?;
113117

114118
let test = download_data(paths::test_path(ds, layout), &test_url(ds))
115119
.await

0 commit comments

Comments
 (0)