44 changes: 44 additions & 0 deletions benchmarks/README.md
@@ -513,5 +513,49 @@ For example, to run query 1 with the small data generated above:
cargo run --release --bin dfbench -- h2o --path ./benchmarks/data/h2o/G1_1e7_1e7_100_0.csv --query 1
```

## h2o benchmarks for join

### Generate data for h2o join benchmarks
There are three options for generating data for the h2o join benchmarks: `small`, `medium`, and `big`. The data is generated in the `benchmarks/data/h2o` directory.

1. Generate small data (4 table files; the largest has 1e7 rows)
```bash
./bench.sh data h2o_small_join
```


2. Generate medium data (4 table files; the largest has 1e8 rows)
```bash
./bench.sh data h2o_medium_join
```

3. Generate large data (4 table files; the largest has 1e9 rows)
```bash
./bench.sh data h2o_big_join
```

### Run h2o join benchmarks
There are three options for running the h2o join benchmarks: `small`, `medium`, and `big`.
1. Run small data benchmark
```bash
./bench.sh run h2o_small_join
```

2. Run medium data benchmark
```bash
./bench.sh run h2o_medium_join
```

3. Run large data benchmark
```bash
./bench.sh run h2o_big_join
```

4. Run a specific query with specific join data paths; `--join-paths` takes the 4 table files as a comma-separated list, and they are registered in order as the tables `x`, `small`, `medium`, and `large`.

For example, to run query 1 with the small data generated above:
```bash
cargo run --release --bin dfbench -- h2o --join-paths ./benchmarks/data/h2o/J1_1e7_NA_0.csv,./benchmarks/data/h2o/J1_1e7_1e1_0.csv,./benchmarks/data/h2o/J1_1e7_1e4_0.csv,./benchmarks/data/h2o/J1_1e7_1e7_NA.csv --queries-path ./benchmarks/queries/h2o/join.sql --query 1
```
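
The other sizes follow the same pattern. For example, a sketch for the medium join dataset generated above (the file names follow the falsa naming scheme used by `bench.sh`):
```bash
cargo run --release --bin dfbench -- h2o --join-paths ./benchmarks/data/h2o/J1_1e8_NA_0.csv,./benchmarks/data/h2o/J1_1e8_1e2_0.csv,./benchmarks/data/h2o/J1_1e8_1e5_0.csv,./benchmarks/data/h2o/J1_1e8_1e8_NA.csv --queries-path ./benchmarks/queries/h2o/join.sql --query 1
```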
[1]: http://www.tpc.org/tpch/
[2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
162 changes: 157 additions & 5 deletions benchmarks/bench.sh
@@ -80,9 +80,12 @@ clickbench_1: ClickBench queries against a single parquet file
clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
external_aggr: External aggregation benchmark
h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
h2o_big: h2oai benchmark with large dataset (1e9 rows) for groupby, default file format is csv
h2o_small_join: h2oai benchmark with small dataset (1e7 rows) for join, default file format is csv
h2o_medium_join: h2oai benchmark with medium dataset (1e8 rows) for join, default file format is csv
h2o_big_join: h2oai benchmark with large dataset (1e9 rows) for join, default file format is csv
imdb: Join Order Benchmark (JOB) using the IMDB dataset converted to parquet

**********
@@ -149,6 +152,9 @@ main() {
data_h2o "SMALL"
data_h2o "MEDIUM"
data_h2o "BIG"
data_h2o_join "SMALL"
data_h2o_join "MEDIUM"
data_h2o_join "BIG"
data_clickbench_1
data_clickbench_partitioned
data_imdb
@@ -188,6 +194,15 @@ main() {
h2o_big)
data_h2o "BIG" "CSV"
;;
h2o_small_join)
data_h2o_join "SMALL" "CSV"
;;
h2o_medium_join)
data_h2o_join "MEDIUM" "CSV"
;;
h2o_big_join)
data_h2o_join "BIG" "CSV"
;;
external_aggr)
# same data as for tpch
data_tpch "1"
@@ -240,6 +255,9 @@ main() {
run_h2o "SMALL" "PARQUET" "groupby"
run_h2o "MEDIUM" "PARQUET" "groupby"
run_h2o "BIG" "PARQUET" "groupby"
run_h2o_join "SMALL" "PARQUET" "join"
run_h2o_join "MEDIUM" "PARQUET" "join"
run_h2o_join "BIG" "PARQUET" "join"
run_imdb
run_external_aggr
;;
@@ -282,6 +300,15 @@ main() {
h2o_big)
run_h2o "BIG" "CSV" "groupby"
;;
h2o_small_join)
run_h2o_join "SMALL" "CSV" "join"
;;
h2o_medium_join)
run_h2o_join "MEDIUM" "CSV" "join"
;;
h2o_big_join)
run_h2o_join "BIG" "CSV" "join"
;;
external_aggr)
run_external_aggr
;;
@@ -674,7 +701,82 @@ data_h2o() {
deactivate
}

data_h2o_join() {
# Default values for size and data format
SIZE=${1:-"SMALL"}
DATA_FORMAT=${2:-"CSV"}

# Function to compare Python versions
version_ge() {
[ "$(printf '%s\n' "$1" "$2" | sort -V | head -n1)" = "$2" ]
}
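# Example (illustrative): version_ge "3.12" "3.10" succeeds because `sort -V`
# orders "3.10" first, so `head -n1` prints the required version; version_ge
# "3.9" "3.10" fails since "3.9" sorts before "3.10" under version ordering.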

export PYO3_USE_ABI3_FORWARD_COMPATIBILITY=1

# Find the highest available Python version (3.10 or higher)
REQUIRED_VERSION="3.10"
PYTHON_CMD=$(command -v python3 || true)

if [ -n "$PYTHON_CMD" ]; then
PYTHON_VERSION=$($PYTHON_CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then
echo "Found Python version $PYTHON_VERSION, which is suitable."
else
echo "Python version $PYTHON_VERSION found, but version $REQUIRED_VERSION or higher is required."
PYTHON_CMD=""
fi
fi

# Search for suitable Python versions if the default is unsuitable
if [ -z "$PYTHON_CMD" ]; then
# Loop through all available Python3 commands on the system
for CMD in $(compgen -c | grep -E '^python3(\.[0-9]+)?$'); do
if command -v "$CMD" &> /dev/null; then
PYTHON_VERSION=$($CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then
PYTHON_CMD="$CMD"
echo "Found suitable Python version: $PYTHON_VERSION ($CMD)"
break
fi
fi
done
fi

# If no suitable Python version found, exit with an error
if [ -z "$PYTHON_CMD" ]; then
echo "Python 3.10 or higher is required. Please install it."
return 1
fi

echo "Using Python command: $PYTHON_CMD"

# Install falsa and other dependencies
echo "Installing falsa..."

# Set virtual environment directory
VIRTUAL_ENV="${PWD}/venv"

# Create a virtual environment using the detected Python command
$PYTHON_CMD -m venv "$VIRTUAL_ENV"

# Activate the virtual environment and install dependencies
source "$VIRTUAL_ENV/bin/activate"

# Install or upgrade 'falsa'
pip install --quiet --upgrade falsa

# Create directory if it doesn't exist
H2O_DIR="${DATA_DIR}/h2o"
mkdir -p "${H2O_DIR}"

# Generate h2o test data
echo "Generating h2o test data in ${H2O_DIR} with size=${SIZE} and format=${DATA_FORMAT}"
falsa join --path-prefix="${H2O_DIR}" --size "${SIZE}" --data-format "${DATA_FORMAT}"

# Deactivate virtual environment after completion
deactivate
}
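# Example (illustrative): generating the medium join dataset as parquet via
# data_h2o_join "MEDIUM" "PARQUET" amounts to running, inside the venv:
#   falsa join --path-prefix="${DATA_DIR}/h2o" --size MEDIUM --data-format PARQUET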

run_h2o() {
# Default values for size and data format
SIZE=${1:-"SMALL"}
@@ -687,7 +789,7 @@ run_h2o() {
RESULTS_FILE="${RESULTS_DIR}/h2o.json"

echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running h2o benchmark..."
echo "Running h2o groupby benchmark..."

# Set the file name based on the size
case "$SIZE" in
@@ -717,6 +819,56 @@
-o "${RESULTS_FILE}"
}

run_h2o_join() {
# Default values for size and data format
SIZE=${1:-"SMALL"}
DATA_FORMAT=${2:-"CSV"}
DATA_FORMAT=$(echo "$DATA_FORMAT" | tr '[:upper:]' '[:lower:]')
RUN_TYPE=${3:-"join"}

# Data directory and results file path
H2O_DIR="${DATA_DIR}/h2o"
RESULTS_FILE="${RESULTS_DIR}/h2o_join.json"

echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running h2o join benchmark..."

# Set the file name based on the size
case "$SIZE" in
"SMALL")
X_TABLE_FILE_NAME="J1_1e7_NA_0.${DATA_FORMAT}"
SMALL_TABLE_FILE_NAME="J1_1e7_1e1_0.${DATA_FORMAT}"
MEDIUM_TABLE_FILE_NAME="J1_1e7_1e4_0.${DATA_FORMAT}"
LARGE_TABLE_FILE_NAME="J1_1e7_1e7_NA.${DATA_FORMAT}"
;;
"MEDIUM")
X_TABLE_FILE_NAME="J1_1e8_NA_0.${DATA_FORMAT}"
SMALL_TABLE_FILE_NAME="J1_1e8_1e2_0.${DATA_FORMAT}"
MEDIUM_TABLE_FILE_NAME="J1_1e8_1e5_0.${DATA_FORMAT}"
LARGE_TABLE_FILE_NAME="J1_1e8_1e8_NA.${DATA_FORMAT}"
;;
"BIG")
X_TABLE_FILE_NAME="J1_1e9_NA_0.${DATA_FORMAT}"
SMALL_TABLE_FILE_NAME="J1_1e9_1e3_0.${DATA_FORMAT}"
MEDIUM_TABLE_FILE_NAME="J1_1e9_1e6_0.${DATA_FORMAT}"
LARGE_TABLE_FILE_NAME="J1_1e9_1e9_NA.${DATA_FORMAT}"
;;
*)
echo "Invalid size. Valid options are SMALL, MEDIUM, or BIG."
return 1
;;
esac

# Set the query file name based on the RUN_TYPE
QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_TYPE}.sql"

$CARGO_COMMAND --bin dfbench -- h2o \
--iterations 3 \
--join-paths "${H2O_DIR}/${X_TABLE_FILE_NAME},${H2O_DIR}/${SMALL_TABLE_FILE_NAME},${H2O_DIR}/${MEDIUM_TABLE_FILE_NAME},${H2O_DIR}/${LARGE_TABLE_FILE_NAME}" \
--queries-path "${QUERY_FILE}" \
-o "${RESULTS_FILE}"
}

# Runs the external aggregation benchmark
run_external_aggr() {
# Use TPC-H SF1 dataset
27 changes: 25 additions & 2 deletions benchmarks/src/h2o.rs
@@ -53,6 +53,16 @@ pub struct RunOpt {
)]
path: PathBuf,

/// Paths to the four join table files (parquet or csv), separated by commas
/// The default is the small csv case: the x, small, medium, and large
/// table files generated for the h2o join benchmark
#[structopt(
short = "join-paths",
long = "join-paths",
default_value = "benchmarks/data/h2o/J1_1e7_NA_0.csv,benchmarks/data/h2o/J1_1e7_1e1_0.csv,benchmarks/data/h2o/J1_1e7_1e4_0.csv,benchmarks/data/h2o/J1_1e7_1e7_NA.csv"
)]
join_paths: String,

/// If present, write results json here
#[structopt(parse(from_os_str), short = "o", long = "output")]
output_path: Option<PathBuf>,
@@ -71,8 +81,16 @@ impl RunOpt {
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);

// Register the input tables: the four comma-separated join tables for
// join queries, or the single groupby table otherwise
if self.queries_path.to_str().unwrap().contains("join") {
let join_paths: Vec<&str> = self.join_paths.split(',').collect();
let table_name: Vec<&str> = vec!["x", "small", "medium", "large"];
for (i, path) in join_paths.iter().enumerate() {
ctx.register_csv(table_name[i], path, Default::default())
.await?;
}
} else if self.queries_path.to_str().unwrap().contains("groupby") {
self.register_data(&ctx).await?;
}

let iterations = self.common.iterations;
let mut benchmark_run = BenchmarkRun::new();
@@ -81,17 +99,22 @@
let sql = queries.get_query(query_id)?;
println!("Q{query_id}: {sql}");

let mut millis = Vec::with_capacity(iterations);
for i in 1..=iterations {
let start = Instant::now();
let results = ctx.sql(sql).await?.collect().await?;
let elapsed = start.elapsed();
let ms = elapsed.as_secs_f64() * 1000.0;
millis.push(ms);
let row_count: usize = results.iter().map(|b| b.num_rows()).sum();
println!(
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
);
benchmark_run.write_iter(elapsed, row_count);
}
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
println!("Query {query_id} avg time: {avg:.2} ms");

if self.common.debug {
ctx.sql(sql).await?.explain(false, false)?.show().await?;
}