diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
index 7339aba78f206..8952e456398d0 100755
--- a/benchmarks/bench.sh
+++ b/benchmarks/bench.sh
@@ -100,15 +100,24 @@ clickbench_pushdown:    ClickBench queries against partitioned (100 files) parqu
 clickbench_extended:    ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
 
 # H2O.ai Benchmarks (Group By, Join, Window)
-h2o_small:              h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
-h2o_medium:             h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
-h2o_big:                h2oai benchmark with large dataset (1e9 rows) for groupby, default file format is csv
-h2o_small_join:         h2oai benchmark with small dataset (1e7 rows) for join, default file format is csv
-h2o_medium_join:        h2oai benchmark with medium dataset (1e8 rows) for join, default file format is csv
-h2o_big_join:           h2oai benchmark with large dataset (1e9 rows) for join, default file format is csv
-h2o_small_window:       Extended h2oai benchmark with small dataset (1e7 rows) for window, default file format is csv
-h2o_medium_window:      Extended h2oai benchmark with medium dataset (1e8 rows) for window, default file format is csv
-h2o_big_window:         Extended h2oai benchmark with large dataset (1e9 rows) for window, default file format is csv
+h2o_small:                  h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
+h2o_medium:                 h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
+h2o_big:                    h2oai benchmark with large dataset (1e9 rows) for groupby, default file format is csv
+h2o_small_join:             h2oai benchmark with small dataset (1e7 rows) for join, default file format is csv
+h2o_medium_join:            h2oai benchmark with medium dataset (1e8 rows) for join, default file format is csv
+h2o_big_join:               h2oai benchmark with large dataset (1e9 rows) for join, default file format is csv
+h2o_small_window:           Extended h2oai benchmark with small dataset (1e7 rows) for window, default file format is csv
+h2o_medium_window:          Extended h2oai benchmark with medium dataset (1e8 rows) for window, default file format is csv
+h2o_big_window:             Extended h2oai benchmark with large dataset (1e9 rows) for window, default file format is csv
+h2o_small_parquet:          h2oai benchmark with small dataset (1e7 rows) for groupby, file format is parquet
+h2o_medium_parquet:         h2oai benchmark with medium dataset (1e8 rows) for groupby, file format is parquet
+h2o_big_parquet:            h2oai benchmark with large dataset (1e9 rows) for groupby, file format is parquet
+h2o_small_join_parquet:     h2oai benchmark with small dataset (1e7 rows) for join, file format is parquet
+h2o_medium_join_parquet:    h2oai benchmark with medium dataset (1e8 rows) for join, file format is parquet
+h2o_big_join_parquet:       h2oai benchmark with large dataset (1e9 rows) for join, file format is parquet
+h2o_small_window_parquet:   Extended h2oai benchmark with small dataset (1e7 rows) for window, file format is parquet
+h2o_medium_window_parquet:  Extended h2oai benchmark with medium dataset (1e8 rows) for window, file format is parquet
+h2o_big_window_parquet:     Extended h2oai benchmark with large dataset (1e9 rows) for window, file format is parquet
 
 # Join Order Benchmark (IMDB)
 imdb:                   Join Order Benchmark (JOB) using the IMDB dataset converted to parquet
@@ -245,6 +254,34 @@ main() {
         h2o_big_window)
             data_h2o_join "BIG" "CSV"
             ;;
+        h2o_small_parquet)
+            data_h2o "SMALL" "PARQUET"
+            ;;
+        h2o_medium_parquet)
+            data_h2o "MEDIUM" "PARQUET"
+            ;;
+        h2o_big_parquet)
+            data_h2o "BIG" "PARQUET"
+            ;;
+        h2o_small_join_parquet)
+            data_h2o_join "SMALL" "PARQUET"
+            ;;
+        h2o_medium_join_parquet)
+            data_h2o_join "MEDIUM" "PARQUET"
+            ;;
+        h2o_big_join_parquet)
+            data_h2o_join "BIG" "PARQUET"
+            ;;
+        # h2o window benchmark uses the same data as the h2o join
+        h2o_small_window_parquet)
+            data_h2o_join "SMALL" "PARQUET"
+            ;;
+        h2o_medium_window_parquet)
+            data_h2o_join "MEDIUM" "PARQUET"
+            ;;
+        h2o_big_window_parquet)
+            data_h2o_join "BIG" "PARQUET"
+            ;;
         external_aggr)
             # same data as for tpch
             data_tpch "1"
@@ -381,6 +418,34 @@ main() {
         h2o_big_window)
             run_h2o_window "BIG" "CSV" "window"
             ;;
+        h2o_small_parquet)
+            run_h2o "SMALL" "PARQUET"
+            ;;
+        h2o_medium_parquet)
+            run_h2o "MEDIUM" "PARQUET"
+            ;;
+        h2o_big_parquet)
+            run_h2o "BIG" "PARQUET"
+            ;;
+        h2o_small_join_parquet)
+            run_h2o_join "SMALL" "PARQUET"
+            ;;
+        h2o_medium_join_parquet)
+            run_h2o_join "MEDIUM" "PARQUET"
+            ;;
+        h2o_big_join_parquet)
+            run_h2o_join "BIG" "PARQUET"
+            ;;
+        # h2o window benchmark uses the same data as the h2o join
+        h2o_small_window_parquet)
+            run_h2o_window "SMALL" "PARQUET"
+            ;;
+        h2o_medium_window_parquet)
+            run_h2o_window "MEDIUM" "PARQUET"
+            ;;
+        h2o_big_window_parquet)
+            run_h2o_window "BIG" "PARQUET"
+            ;;
         external_aggr)
             run_external_aggr
             ;;
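Reviewer note: the new *_parquet targets plug into bench.sh's existing `data` and `run` actions, so exercising one of them is the usual two-step invocation. A minimal sketch, assuming the standard `./bench.sh <action> <benchmark>` calling convention already used by the CSV targets:

    # generate the 1e7-row group-by dataset in parquet (dispatches to data_h2o "SMALL" "PARQUET")
    ./bench.sh data h2o_small_parquet
    # run the group-by queries against the generated parquet files
    ./bench.sh run h2o_small_parquet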
"BIG" "PARQUET" + ;; + h2o_small_join_parquet) + data_h2o_join "SMALL" "PARQUET" + ;; + h2o_medium_join_parquet) + data_h2o_join "MEDIUM" "PARQUET" + ;; + h2o_big_join_parquet) + data_h2o_join "BIG" "PARQUET" + ;; + # h2o window benchmark uses the same data as the h2o join + h2o_small_window_parquet) + data_h2o_join "SMALL" "PARQUET" + ;; + h2o_medium_window_parquet) + data_h2o_join "MEDIUM" "PARQUET" + ;; + h2o_big_window_parquet) + data_h2o_join "BIG" "PARQUET" + ;; external_aggr) # same data as for tpch data_tpch "1" @@ -381,6 +418,34 @@ main() { h2o_big_window) run_h2o_window "BIG" "CSV" "window" ;; + h2o_small_parquet) + run_h2o "SMALL" "PARQUET" + ;; + h2o_medium_parquet) + run_h2o "MEDIUM" "PARQUET" + ;; + h2o_big_parquet) + run_h2o "BIG" "PARQUET" + ;; + h2o_small_join_parquet) + run_h2o_join "SMALL" "PARQUET" + ;; + h2o_medium_join_parquet) + run_h2o_join "MEDIUM" "PARQUET" + ;; + h2o_big_join_parquet) + run_h2o_join "BIG" "PARQUET" + ;; + # h2o window benchmark uses the same data as the h2o join + h2o_small_window_parquet) + run_h2o_window "SMALL" "PARQUET" + ;; + h2o_medium_window_parquet) + run_h2o_window "MEDIUM" "PARQUET" + ;; + h2o_big_window_parquet) + run_h2o_window "BIG" "PARQUET" + ;; external_aggr) run_external_aggr ;; diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs index 009f1708ef983..9d4deaf387283 100644 --- a/benchmarks/src/h2o.rs +++ b/benchmarks/src/h2o.rs @@ -24,7 +24,7 @@ use crate::util::{BenchmarkRun, CommonOpt}; use datafusion::logical_expr::{ExplainFormat, ExplainOption}; use datafusion::{error::Result, prelude::SessionContext}; use datafusion_common::{ - exec_datafusion_err, instant::Instant, internal_err, DataFusionError, + exec_datafusion_err, instant::Instant, internal_err, DataFusionError, TableReference, }; use std::path::{Path, PathBuf}; use structopt::StructOpt; @@ -92,18 +92,18 @@ impl RunOpt { // Register tables depending on which h2o benchmark is being run // (groupby/join/window) if self.queries_path.to_str().unwrap().ends_with("groupby.sql") { - self.register_data(&ctx).await?; + self.register_data("x", self.path.as_os_str().to_str().unwrap(), &ctx) + .await?; } else if self.queries_path.to_str().unwrap().ends_with("join.sql") { let join_paths: Vec<&str> = self.join_paths.split(',').collect(); let table_name: Vec<&str> = vec!["x", "small", "medium", "large"]; for (i, path) in join_paths.iter().enumerate() { - ctx.register_csv(table_name[i], path, Default::default()) - .await?; + self.register_data(table_name[i], path, &ctx).await?; } } else if self.queries_path.to_str().unwrap().ends_with("window.sql") { // Only register the 'large' table in h2o-join dataset let h2o_join_large_path = self.join_paths.split(',').nth(3).unwrap(); - ctx.register_csv("large", h2o_join_large_path, Default::default()) + self.register_data("large", h2o_join_large_path, &ctx) .await?; } else { return internal_err!("Invalid query file path"); @@ -147,39 +147,52 @@ impl RunOpt { Ok(()) } - async fn register_data(&self, ctx: &SessionContext) -> Result<()> { + async fn register_data( + &self, + table_ref: impl Into, + table_path: impl AsRef, + ctx: &SessionContext, + ) -> Result<()> { let csv_options = Default::default(); let parquet_options = Default::default(); - let path = self.path.as_os_str().to_str().unwrap(); - - if self.path.extension().map(|s| s == "csv").unwrap_or(false) { - ctx.register_csv("x", path, csv_options) - .await - .map_err(|e| { - DataFusionError::Context( - format!("Registering 'table' as {path}"), - Box::new(e), - ) - }) - .expect("error 
registering csv"); - } - if self - .path + let table_path_str = table_path.as_ref(); + + let extension = Path::new(table_path_str) .extension() - .map(|s| s == "parquet") - .unwrap_or(false) - { - ctx.register_parquet("x", path, parquet_options) - .await - .map_err(|e| { - DataFusionError::Context( - format!("Registering 'table' as {path}"), - Box::new(e), - ) - }) - .expect("error registering parquet"); + .and_then(|s| s.to_str()) + .unwrap_or(""); + + match extension { + "csv" => { + ctx.register_csv(table_ref, table_path_str, csv_options) + .await + .map_err(|e| { + DataFusionError::Context( + format!("Registering 'table' as {table_path_str}"), + Box::new(e), + ) + }) + .expect("error registering csv"); + } + "parquet" => { + ctx.register_parquet(table_ref, table_path_str, parquet_options) + .await + .map_err(|e| { + DataFusionError::Context( + format!("Registering 'table' as {table_path_str}"), + Box::new(e), + ) + }) + .expect("error registering parquet"); + } + _ => { + return Err(DataFusionError::Plan(format!( + "Unsupported file extension: {extension}", + ))); + } } + Ok(()) } }