Skip to content

Commit 7e0af3c

Browse files
authored
Unify download management (#7490)
## Summary Unifies the download management for benchmarks. Also makes the downloads smarter with AIMD and nicer progress bars. ## Testing I just ran it in my terminal and it works well enough. Let me know if we want a video for this and I can figure that out. https://github.com/user-attachments/assets/85a7a0a6-3b88-4f35-bcbc-41dbdba37aff --------- Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent d441299 commit 7e0af3c

File tree

8 files changed

+789
-242
lines changed

8 files changed

+789
-242
lines changed

vortex-bench/src/clickbench/benchmark.rs

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ use std::path::Path;
77

88
use anyhow::Result;
99
use url::Url;
10-
use vortex::error::VortexExpect;
1110

1211
use crate::Benchmark;
1312
use crate::BenchmarkDataset;
1413
use crate::IdempotentPath;
1514
use crate::TableSpec;
1615
use crate::clickbench::*;
16+
use crate::utils::file::resolve_data_url;
1717

1818
/// ClickBench benchmark implementation
1919
pub struct ClickBenchBenchmark {
@@ -37,31 +37,7 @@ impl ClickBenchBenchmark {
3737
}
3838

3939
fn create_data_url(remote_data_dir: &Option<String>, flavor: Flavor) -> Result<Url> {
40-
match remote_data_dir {
41-
None => {
42-
let basepath = format!("clickbench_{flavor}").to_data_path();
43-
Ok(Url::parse(&format!(
44-
"file:{}/",
45-
basepath.to_str().vortex_expect("path should be utf8")
46-
))?)
47-
}
48-
Some(remote_data_dir) => {
49-
if !remote_data_dir.ends_with("/") {
50-
tracing::warn!(
51-
"Supply a --use-remote-data-dir argument which ends in a slash e.g. s3://vortex-bench-dev-eu/parquet/"
52-
);
53-
}
54-
tracing::info!(
55-
concat!(
56-
"Assuming data already exists at this remote (e.g. S3, GCS) URL: {}.\\n",
57-
"If it does not, you should kill this command, locally generate the files (by running without\\n",
58-
"--use-remote-data-dir) and upload data/clickbench/ to some remote location.",
59-
),
60-
remote_data_dir,
61-
);
62-
Ok(Url::parse(remote_data_dir)?)
63-
}
64-
}
40+
resolve_data_url(remote_data_dir.as_deref(), &format!("clickbench_{flavor}"))
6541
}
6642
}
6743

vortex-bench/src/clickbench/data.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@ use arrow_schema::TimeUnit;
1414
use clap::ValueEnum;
1515
use serde::Deserialize;
1616
use serde::Serialize;
17-
use tokio::task::JoinSet;
1817
use tracing::info;
1918
use vortex::error::VortexExpect;
2019

2120
use crate::Format;
2221
// Re-export for use by clickbench_benchmark
2322
pub use crate::conversions::convert_parquet_directory_to_vortex;
2423
use crate::datasets::data_downloads::download_data;
24+
use crate::datasets::data_downloads::download_many;
2525

2626
pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
2727
use DataType::*;
@@ -193,18 +193,14 @@ impl Flavor {
193193
Flavor::Partitioned => {
194194
// The clickbench-provided file is missing some higher-level type info, so we reprocess it
195195
// to add that info, see https://github.com/ClickHouse/ClickBench/issues/7.
196-
197-
let mut tasks = (0_u32..100).map(|idx| {
198-
let output_path = basepath.join(Format::Parquet.name()).join(format!("hits_{idx}.parquet"));
199-
200-
info!("Downloading file {idx}");
196+
info!("Downloading 100 ClickBench parquet shards");
197+
let parquet_dir = basepath.join(Format::Parquet.name());
198+
let downloads = (0_u32..100).map(|idx| {
199+
let output_path = parquet_dir.join(format!("hits_{idx}.parquet"));
201200
let url = format!("https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_{idx}.parquet");
202-
download_data(output_path, url)
203-
}).collect::<JoinSet<_>>();
204-
205-
while let Some(task) = tasks.join_next().await {
206-
task??;
207-
}
201+
(output_path, url)
202+
});
203+
download_many(downloads).await?;
208204
}
209205
}
210206
Ok(())

0 commit comments

Comments
 (0)