Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ run_tpch() {
echo "Running tpch benchmark..."

FORMAT=$2
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --scale-factor "${SCALE_FACTOR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the tpch in memory (needs tpch parquet data)
Expand All @@ -693,7 +693,7 @@ run_tpch_mem() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch_mem benchmark..."
# -m means in memory
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --scale-factor "${SCALE_FACTOR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the tpcds benchmark
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/queries/q10.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ where
c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate >= date '1993-10-01'
and o_orderdate < date '1994-01-01'
and o_orderdate < date '1993-10-01' + interval '3' month
and l_returnflag = 'R'
and c_nationkey = n_nationkey
group by
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/queries/q11.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ group by
ps_partkey having
sum(ps_supplycost * ps_availqty) > (
select
sum(ps_supplycost * ps_availqty) * 0.0001
sum(ps_supplycost * ps_availqty) * 0.0001 /* __TPCH_Q11_FRACTION__ */
from
partsupp,
supplier,
Expand All @@ -24,4 +24,4 @@ group by
and n_name = 'GERMANY'
)
order by
value desc;
value desc;
4 changes: 2 additions & 2 deletions benchmarks/queries/q12.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ where
and l_commitdate < l_receiptdate
and l_shipdate < l_commitdate
and l_receiptdate >= date '1994-01-01'
and l_receiptdate < date '1995-01-01'
and l_receiptdate < date '1994-01-01' + interval '1' year
group by
l_shipmode
order by
l_shipmode;
l_shipmode;
2 changes: 1 addition & 1 deletion benchmarks/queries/q14.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ from
where
l_partkey = p_partkey
and l_shipdate >= date '1995-09-01'
and l_shipdate < date '1995-10-01';
and l_shipdate < date '1995-09-01' + interval '1' month;
4 changes: 2 additions & 2 deletions benchmarks/queries/q5.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ where
and n_regionkey = r_regionkey
and r_name = 'ASIA'
and o_orderdate >= date '1994-01-01'
and o_orderdate < date '1995-01-01'
and o_orderdate < date '1994-01-01' + interval '1' year
group by
n_name
order by
revenue desc;
revenue desc;
4 changes: 2 additions & 2 deletions benchmarks/queries/q6.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ from
lineitem
where
l_shipdate >= date '1994-01-01'
and l_shipdate < date '1995-01-01'
and l_shipdate < date '1994-01-01' + interval '1' year
and l_discount between 0.06 - 0.01 and 0.06 + 0.01
and l_quantity < 24;
and l_quantity < 24;
86 changes: 86 additions & 0 deletions benchmarks/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub const TPCH_TABLES: &[&str] = &[

pub const TPCH_QUERY_START_ID: usize = 1;
pub const TPCH_QUERY_END_ID: usize = 22;
const TPCH_Q11_FRACTION_SENTINEL: &str = "0.0001 /* __TPCH_Q11_FRACTION__ */";

/// The `.tbl` file contains a trailing column
pub fn get_tbl_tpch_table_schema(table: &str) -> Schema {
Expand Down Expand Up @@ -139,6 +140,21 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {

/// Get the SQL statements from the specified query file
pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
get_query_sql_for_scale_factor(query, 1.0)
}

/// Get the SQL statements from the specified query file using the provided scale factor for
/// TPC-H substitutions such as Q11 FRACTION.
pub fn get_query_sql_for_scale_factor(
query: usize,
scale_factor: f64,
) -> Result<Vec<String>> {
if !(scale_factor.is_finite() && scale_factor > 0.0) {
return plan_err!(
"invalid scale factor. Expected a positive finite value, got {scale_factor}"
);
}

if query > 0 && query < 23 {
let possibilities = vec![
format!("queries/q{query}.sql"),
Expand All @@ -148,6 +164,7 @@ pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
for filename in possibilities {
match fs::read_to_string(&filename) {
Ok(contents) => {
let contents = customize_query_sql(query, contents, scale_factor)?;
return Ok(contents
.split(';')
.map(|s| s.trim())
Expand All @@ -164,6 +181,27 @@ pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
}
}

fn customize_query_sql(
query: usize,
contents: String,
scale_factor: f64,
) -> Result<String> {
if query != 11 {
return Ok(contents);
}

if !contents.contains(TPCH_Q11_FRACTION_SENTINEL) {
return plan_err!(
"invalid query 11. Missing fraction marker {TPCH_Q11_FRACTION_SENTINEL}"
);
}

Ok(contents.replace(
TPCH_Q11_FRACTION_SENTINEL,
&format!("(0.0001 / {scale_factor})"),
))
}

pub const QUERY_LIMIT: [Option<usize>; 22] = [
None,
Some(100),
Expand All @@ -188,3 +226,51 @@ pub const QUERY_LIMIT: [Option<usize>; 22] = [
Some(100),
None,
];

#[cfg(test)]
mod tests {
use super::{get_query_sql, get_query_sql_for_scale_factor};
use datafusion::error::Result;

fn get_single_query(query: usize) -> Result<String> {
let mut queries = get_query_sql(query)?;
assert_eq!(queries.len(), 1);
Ok(queries.remove(0))
}

fn get_single_query_for_scale_factor(
query: usize,
scale_factor: f64,
) -> Result<String> {
let mut queries = get_query_sql_for_scale_factor(query, scale_factor)?;
assert_eq!(queries.len(), 1);
Ok(queries.remove(0))
}

#[test]
fn q11_uses_scale_factor_substitution() -> Result<()> {
let sf1_sql = get_single_query(11)?;
assert!(sf1_sql.contains("(0.0001 / 1)"));

let sf01_sql = get_single_query_for_scale_factor(11, 0.1)?;
assert!(sf01_sql.contains("(0.0001 / 0.1)"));

let sf10_sql = get_single_query_for_scale_factor(11, 10.0)?;
assert!(sf10_sql.contains("(0.0001 / 10)"));

let sf30_sql = get_single_query_for_scale_factor(11, 30.0)?;
assert!(sf30_sql.contains("(0.0001 / 30)"));
assert!(!sf10_sql.contains("__TPCH_Q11_FRACTION__"));
Ok(())
}

#[test]
fn interval_queries_use_interval_arithmetic() -> Result<()> {
assert!(get_single_query(5)?.contains("date '1994-01-01' + interval '1' year"));
assert!(get_single_query(6)?.contains("date '1994-01-01' + interval '1' year"));
assert!(get_single_query(10)?.contains("date '1993-10-01' + interval '3' month"));
assert!(get_single_query(12)?.contains("date '1994-01-01' + interval '1' year"));
assert!(get_single_query(14)?.contains("date '1995-09-01' + interval '1' month"));
Ok(())
}
}
98 changes: 92 additions & 6 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use super::{
TPCH_QUERY_END_ID, TPCH_QUERY_START_ID, TPCH_TABLES, get_query_sql,
TPCH_QUERY_END_ID, TPCH_QUERY_START_ID, TPCH_TABLES, get_query_sql_for_scale_factor,
get_tbl_tpch_table_schema, get_tpch_table_schema,
};
use crate::util::{BenchmarkRun, CommonOpt, QueryResult, print_memory_stats};

use arrow::record_batch::RecordBatch;
use arrow::util::pretty::{self, pretty_format_batches};
use datafusion::common::exec_err;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
Expand Down Expand Up @@ -71,6 +72,11 @@ pub struct RunOpt {
#[arg(required = true, short = 'p', long = "path")]
path: PathBuf,

/// TPC-H scale factor used for query substitutions such as Q11 FRACTION.
/// If omitted, the benchmark tries to infer it from paths like `.../tpch_sf10/...`.
#[arg(long)]
scale_factor: Option<f64>,

/// File format: `csv` or `parquet`
#[arg(short = 'f', long = "format", default_value = "csv")]
file_format: String,
Expand Down Expand Up @@ -133,10 +139,11 @@ impl RunOpt {
let ctx = SessionContext::new_with_config_rt(config, rt);
// register tables
self.register_tables(&ctx).await?;
let scale_factor = self.scale_factor()?;

for query_id in query_range {
benchmark_run.start_new_case(&format!("Query {query_id}"));
let query_run = self.benchmark_query(query_id, &ctx).await;
let query_run = self.benchmark_query(query_id, scale_factor, &ctx).await;
match query_run {
Ok(query_results) => {
for iter in query_results {
Expand All @@ -157,13 +164,14 @@ impl RunOpt {
async fn benchmark_query(
&self,
query_id: usize,
scale_factor: f64,
ctx: &SessionContext,
) -> Result<Vec<QueryResult>> {
let mut millis = vec![];
// run benchmark
let mut query_results = vec![];

let sql = &get_query_sql(query_id)?;
let sql = &get_query_sql_for_scale_factor(query_id, scale_factor)?;

for i in 0..self.iterations() {
let start = Instant::now();
Expand Down Expand Up @@ -346,6 +354,82 @@ impl RunOpt {
.partitions
.unwrap_or_else(get_available_parallelism)
}

fn scale_factor(&self) -> Result<f64> {
resolve_scale_factor(self.scale_factor, &self.path)
}
}

fn resolve_scale_factor(scale_factor: Option<f64>, path: &Path) -> Result<f64> {
let scale_factor = scale_factor
.or_else(|| infer_scale_factor_from_path(path))
.unwrap_or(1.0);

if scale_factor.is_finite() && scale_factor > 0.0 {
Ok(scale_factor)
} else {
exec_err!(
"Invalid TPC-H scale factor {scale_factor}. Expected a positive finite value"
)
}
}

fn infer_scale_factor_from_path(path: &Path) -> Option<f64> {
path.iter().find_map(|component| {
component
.to_str()?
.strip_prefix("tpch_sf")?
.parse::<f64>()
.ok()
})
}

#[cfg(test)]
mod scale_factor_tests {
use std::path::Path;

use super::{infer_scale_factor_from_path, resolve_scale_factor};
use datafusion::error::Result;

#[test]
fn uses_explicit_scale_factor_when_provided() -> Result<()> {
let scale_factor =
resolve_scale_factor(Some(30.0), Path::new("benchmarks/data/tpch_sf10"))?;
assert_eq!(scale_factor, 30.0);
Ok(())
}

#[test]
fn infers_scale_factor_from_standard_tpch_path() -> Result<()> {
let scale_factor =
resolve_scale_factor(None, Path::new("benchmarks/data/tpch_sf10"))?;
assert_eq!(scale_factor, 10.0);
assert_eq!(
infer_scale_factor_from_path(Path::new("benchmarks/data/tpch_sf0.1")),
Some(0.1)
);
Ok(())
}

#[test]
fn defaults_to_sf1_when_path_has_no_scale_factor() -> Result<()> {
let scale_factor = resolve_scale_factor(None, Path::new("benchmarks/data"))?;
assert_eq!(scale_factor, 1.0);
Ok(())
}

#[test]
fn rejects_invalid_scale_factors() {
assert!(resolve_scale_factor(Some(0.0), Path::new("benchmarks/data")).is_err());
assert!(resolve_scale_factor(Some(-1.0), Path::new("benchmarks/data")).is_err());
assert!(
resolve_scale_factor(Some(f64::NAN), Path::new("benchmarks/data")).is_err()
);
assert!(
resolve_scale_factor(Some(f64::INFINITY), Path::new("benchmarks/data"))
.is_err()
);
}
}

#[cfg(test)]
Expand Down Expand Up @@ -392,6 +476,7 @@ mod tests {
query: Some(query),
common,
path: PathBuf::from(path.to_string()),
scale_factor: Some(1.0),
file_format: "tbl".to_string(),
mem_table: false,
output_path: None,
Expand All @@ -402,7 +487,7 @@ mod tests {
hash_join_buffering_capacity: 0,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(query)?;
let queries = crate::tpch::get_query_sql(query)?;
for query in queries {
let plan = ctx.sql(&query).await?;
let plan = plan.into_optimized_plan()?;
Expand Down Expand Up @@ -432,6 +517,7 @@ mod tests {
query: Some(query),
common,
path: PathBuf::from(path.to_string()),
scale_factor: Some(1.0),
file_format: "tbl".to_string(),
mem_table: false,
output_path: None,
Expand All @@ -442,7 +528,7 @@ mod tests {
hash_join_buffering_capacity: 0,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(query)?;
let queries = crate::tpch::get_query_sql(query)?;
for query in queries {
let plan = ctx.sql(&query).await?;
let plan = plan.create_physical_plan().await?;
Expand Down
Loading
Loading