From 3925fbd1528a4954ae7058b4233fa4f273777646 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 26 Feb 2025 18:58:37 +0800 Subject: [PATCH 1/2] Add H2O.ai Database-like Ops benchmark to dfbench (join support) --- benchmarks/bench.sh | 162 ++++++++++++++++++++++++++++++++++++++++-- benchmarks/src/h2o.rs | 22 +++++- 2 files changed, 177 insertions(+), 7 deletions(-) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 96c90aa1f60d8..e05ebd4c51664 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -80,9 +80,12 @@ clickbench_1: ClickBench queries against a single parquet file clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific) external_aggr: External aggregation benchmark -h2o_small: h2oai benchmark with small dataset (1e7 rows), default file format is csv -h2o_medium: h2oai benchmark with medium dataset (1e8 rows), default file format is csv -h2o_big: h2oai benchmark with large dataset (1e9 rows), default file format is csv +h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv +h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv +h2o_big: h2oai benchmark with large dataset (1e9 rows) for groupby, default file format is csv +h2o_small_join: h2oai benchmark with small dataset (1e7 rows) for join, default file format is csv +h2o_medium_join: h2oai benchmark with medium dataset (1e8 rows) for join, default file format is csv +h2o_big_join: h2oai benchmark with large dataset (1e9 rows) for join, default file format is csv imdb: Join Order Benchmark (JOB) using the IMDB dataset converted to parquet ********** @@ -149,6 +152,9 @@ main() { data_h2o "SMALL" data_h2o "MEDIUM" data_h2o "BIG" + data_h2o_join "SMALL" + data_h2o_join "MEDIUM" + data_h2o_join "BIG" data_clickbench_1 data_clickbench_partitioned data_imdb 
@@ -188,6 +194,15 @@ main() { h2o_big) data_h2o "BIG" "CSV" ;; + h2o_small_join) + data_h2o_join "SMALL" "CSV" + ;; + h2o_medium_join) + data_h2o_join "MEDIUM" "CSV" + ;; + h2o_big_join) + data_h2o_join "BIG" "CSV" + ;; external_aggr) # same data as for tpch data_tpch "1" @@ -240,6 +255,9 @@ main() { run_h2o "SMALL" "PARQUET" "groupby" run_h2o "MEDIUM" "PARQUET" "groupby" run_h2o "BIG" "PARQUET" "groupby" + run_h2o_join "SMALL" "PARQUET" "join" + run_h2o_join "MEDIUM" "PARQUET" "join" + run_h2o_join "BIG" "PARQUET" "join" run_imdb run_external_aggr ;; @@ -282,6 +300,15 @@ main() { h2o_big) run_h2o "BIG" "CSV" "groupby" ;; + h2o_small_join) + run_h2o_join "SMALL" "CSV" "join" + ;; + h2o_medium_join) + run_h2o_join "MEDIUM" "CSV" "join" + ;; + h2o_big_join) + run_h2o_join "BIG" "CSV" "join" + ;; external_aggr) run_external_aggr ;; @@ -674,7 +701,82 @@ data_h2o() { deactivate } -## todo now only support groupby, after https://github.com/mrpowers-io/falsa/issues/21 done, we can add support for join +data_h2o_join() { + # Default values for size and data format + SIZE=${1:-"SMALL"} + DATA_FORMAT=${2:-"CSV"} + + # Function to compare Python versions + version_ge() { + [ "$(printf '%s\n' "$1" "$2" | sort -V | head -n1)" = "$2" ] + } + + export PYO3_USE_ABI3_FORWARD_COMPATIBILITY=1 + + # Find the highest available Python version (3.10 or higher) + REQUIRED_VERSION="3.10" + PYTHON_CMD=$(command -v python3 || true) + + if [ -n "$PYTHON_CMD" ]; then + PYTHON_VERSION=$($PYTHON_CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')") + if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then + echo "Found Python version $PYTHON_VERSION, which is suitable." + else + echo "Python version $PYTHON_VERSION found, but version $REQUIRED_VERSION or higher is required." 
+ PYTHON_CMD="" + fi + fi + + # Search for suitable Python versions if the default is unsuitable + if [ -z "$PYTHON_CMD" ]; then + # Loop through all available Python3 commands on the system + for CMD in $(compgen -c | grep -E '^python3(\.[0-9]+)?$'); do + if command -v "$CMD" &> /dev/null; then + PYTHON_VERSION=$($CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')") + if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then + PYTHON_CMD="$CMD" + echo "Found suitable Python version: $PYTHON_VERSION ($CMD)" + break + fi + fi + done + fi + + # If no suitable Python version found, exit with an error + if [ -z "$PYTHON_CMD" ]; then + echo "Python 3.10 or higher is required. Please install it." + return 1 + fi + + echo "Using Python command: $PYTHON_CMD" + + # Install falsa and other dependencies + echo "Installing falsa..." + + # Set virtual environment directory + VIRTUAL_ENV="${PWD}/venv" + + # Create a virtual environment using the detected Python command + $PYTHON_CMD -m venv "$VIRTUAL_ENV" + + # Activate the virtual environment and install dependencies + source "$VIRTUAL_ENV/bin/activate" + + # Ensure 'falsa' is installed (avoid unnecessary reinstall) + pip install --quiet --upgrade falsa + + # Create directory if it doesn't exist + H2O_DIR="${DATA_DIR}/h2o" + mkdir -p "${H2O_DIR}" + + # Generate h2o test data + echo "Generating h2o test data in ${H2O_DIR} with size=${SIZE} and format=${DATA_FORMAT}" + falsa join --path-prefix="${H2O_DIR}" --size "${SIZE}" --data-format "${DATA_FORMAT}" + + # Deactivate virtual environment after completion + deactivate +} + run_h2o() { # Default values for size and data format SIZE=${1:-"SMALL"} @@ -687,7 +789,7 @@ run_h2o() { RESULTS_FILE="${RESULTS_DIR}/h2o.json" echo "RESULTS_FILE: ${RESULTS_FILE}" - echo "Running h2o benchmark..." + echo "Running h2o groupby benchmark..." 
# Set the file name based on the size case "$SIZE" in @@ -717,6 +819,56 @@ run_h2o() { -o "${RESULTS_FILE}" } +run_h2o_join() { + # Default values for size and data format + SIZE=${1:-"SMALL"} + DATA_FORMAT=${2:-"CSV"} + DATA_FORMAT=$(echo "$DATA_FORMAT" | tr '[:upper:]' '[:lower:]') + RUN_Type=${3:-"join"} + + # Data directory and results file path + H2O_DIR="${DATA_DIR}/h2o" + RESULTS_FILE="${RESULTS_DIR}/h2o_join.json" + + echo "RESULTS_FILE: ${RESULTS_FILE}" + echo "Running h2o join benchmark..." + + # Set the file name based on the size + case "$SIZE" in + "SMALL") + X_TABLE_FILE_NAME="J1_1e7_NA_0.${DATA_FORMAT}" + SMALL_TABLE_FILE_NAME="J1_1e7_1e1_0.${DATA_FORMAT}" + MEDIUM_TABLE_FILE_NAME="J1_1e7_1e4_0.${DATA_FORMAT}" + LARGE_TABLE_FILE_NAME="J1_1e7_1e7_NA.${DATA_FORMAT}" + ;; + "MEDIUM") + X_TABLE_FILE_NAME="J1_1e8_NA_0.${DATA_FORMAT}" + SMALL_TABLE_FILE_NAME="J1_1e8_1e2_0.${DATA_FORMAT}" + MEDIUM_TABLE_FILE_NAME="J1_1e8_1e5_0.${DATA_FORMAT}" + LARGE_TABLE_FILE_NAME="J1_1e8_1e8_NA.${DATA_FORMAT}" + ;; + "BIG") + X_TABLE_FILE_NAME="J1_1e9_NA_0.${DATA_FORMAT}" + SMALL_TABLE_FILE_NAME="J1_1e9_1e3_0.${DATA_FORMAT}" + MEDIUM_TABLE_FILE_NAME="J1_1e9_1e6_0.${DATA_FORMAT}" + LARGE_TABLE_FILE_NAME="J1_1e9_1e9_NA.${DATA_FORMAT}" + ;; + *) + echo "Invalid size. Valid options are SMALL, MEDIUM, or BIG." 
+ return 1 + ;; + esac + + # Set the query file name based on the RUN_Type + QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_Type}.sql" + + $CARGO_COMMAND --bin dfbench -- h2o \ + --iterations 3 \ + --join-paths "${H2O_DIR}/${X_TABLE_FILE_NAME},${H2O_DIR}/${SMALL_TABLE_FILE_NAME},${H2O_DIR}/${MEDIUM_TABLE_FILE_NAME},${H2O_DIR}/${LARGE_TABLE_FILE_NAME}" \ + --queries-path "${QUERY_FILE}" \ + -o "${RESULTS_FILE}" +} + # Runs the external aggregation benchmark run_external_aggr() { # Use TPC-H SF1 dataset diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs index eae7f67f1d62c..955c23aa79854 100644 --- a/benchmarks/src/h2o.rs +++ b/benchmarks/src/h2o.rs @@ -53,6 +53,16 @@ pub struct RunOpt { )] path: PathBuf, + /// Path to data files (parquet or csv), using , to separate the paths + /// Default value is the small files for join x table, small table, medium table, big table files in the h2o benchmark + /// This is the small csv file case + #[structopt( + short = "join-paths", + long = "join-paths", + default_value = "benchmarks/data/h2o/J1_1e7_NA_0.csv,benchmarks/data/h2o/J1_1e7_1e1_0.csv,benchmarks/data/h2o/J1_1e7_1e4_0.csv,benchmarks/data/h2o/J1_1e7_1e7_NA.csv" + )] + join_paths: String, + /// If present, write results json here #[structopt(parse(from_os_str), short = "o", long = "output")] output_path: Option<PathBuf>, @@ -71,8 +81,16 @@ impl RunOpt { let rt_builder = self.common.runtime_env_builder()?; let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); - // Register data - self.register_data(&ctx).await?; + if self.queries_path.to_str().unwrap().contains("join") { + let join_paths: Vec<&str> = self.join_paths.split(',').collect(); + let table_name: Vec<&str> = vec!["x", "small", "medium", "large"]; + for (i, path) in join_paths.iter().enumerate() { + ctx.register_csv(table_name[i], path, Default::default()) + .await?; + } + } else if self.queries_path.to_str().unwrap().contains("groupby") { + self.register_data(&ctx).await?; + } let iterations
= self.common.iterations; let mut benchmark_run = BenchmarkRun::new(); From 644b4bc776fb39bb9a28f7b195e6118816905398 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 27 Feb 2025 16:24:24 +0800 Subject: [PATCH 2/2] address new comments --- benchmarks/README.md | 44 +++++++++++++++++++++++++++++++++++++++++++ benchmarks/src/h2o.rs | 5 +++++ 2 files changed, 49 insertions(+) diff --git a/benchmarks/README.md b/benchmarks/README.md index 2954f42c25db4..c0bdb1dba7199 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -513,5 +513,49 @@ For example, to run query 1 with the small data generated above: cargo run --release --bin dfbench -- h2o --path ./benchmarks/data/h2o/G1_1e7_1e7_100_0.csv --query 1 ``` +## h2o benchmarks for join + +### Generate data for h2o benchmarks +There are three options for generating data for h2o benchmarks: `small`, `medium`, and `big`. The data is generated in the `data` directory. + +1. Generate small data (4 table files, the largest is 1e7 rows) +```bash +./bench.sh data h2o_small_join +``` + + +2. Generate medium data (4 table files, the largest is 1e8 rows) +```bash +./bench.sh data h2o_medium_join +``` + +3. Generate large data (4 table files, the largest is 1e9 rows) +```bash +./bench.sh data h2o_big_join +``` + +### Run h2o benchmarks +There are three options for running h2o benchmarks: `small`, `medium`, and `big`. +1. Run small data benchmark +```bash +./bench.sh run h2o_small_join +``` + +2. Run medium data benchmark +```bash +./bench.sh run h2o_medium_join +``` + +3. Run large data benchmark +```bash +./bench.sh run h2o_big_join +``` + +4. Run a specific query with specific join data paths; the data paths include 4 table files.
+ +For example, to run query 1 with the small data generated above: +```bash +cargo run --release --bin dfbench -- h2o --join-paths ./benchmarks/data/h2o/J1_1e7_NA_0.csv,./benchmarks/data/h2o/J1_1e7_1e1_0.csv,./benchmarks/data/h2o/J1_1e7_1e4_0.csv,./benchmarks/data/h2o/J1_1e7_1e7_NA.csv --queries-path ./benchmarks/queries/h2o/join.sql --query 1 +``` [1]: http://www.tpc.org/tpch/ [2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs index 955c23aa79854..cc463e70d74a2 100644 --- a/benchmarks/src/h2o.rs +++ b/benchmarks/src/h2o.rs @@ -99,17 +99,22 @@ impl RunOpt { let sql = queries.get_query(query_id)?; println!("Q{query_id}: {sql}"); + let mut millis = Vec::with_capacity(iterations); for i in 1..=iterations { let start = Instant::now(); let results = ctx.sql(sql).await?.collect().await?; let elapsed = start.elapsed(); let ms = elapsed.as_secs_f64() * 1000.0; + millis.push(ms); let row_count: usize = results.iter().map(|b| b.num_rows()).sum(); println!( "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows" ); benchmark_run.write_iter(elapsed, row_count); } + let avg = millis.iter().sum::<f64>() / millis.len() as f64; + println!("Query {query_id} avg time: {avg:.2} ms"); + if self.common.debug { ctx.sql(sql).await?.explain(false, false)?.show().await?; }