From bacee66965205206f33098d640b36983d0a95a53 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Sun, 12 Apr 2026 10:51:01 -0700 Subject: [PATCH 1/6] add count distinct group benchmarks --- .../benches/count_distinct.rs | 115 +++++++++++++++++- 1 file changed, 113 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index e742a3e5c1267..a095e1871afcb 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -23,7 +23,7 @@ use arrow::array::{ use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_expr::function::AccumulatorArgs; -use datafusion_expr::{Accumulator, AggregateUDFImpl}; +use datafusion_expr::{Accumulator, AggregateUDFImpl, EmitTo}; use datafusion_functions_aggregate::count::Count; use datafusion_physical_expr::expressions::col; use rand::rngs::StdRng; @@ -87,6 +87,30 @@ fn create_i16_array(n_distinct: usize) -> Int16Array { .collect() } +fn prepare_args(data_type: DataType) -> (Arc, AccumulatorArgs<'static>) { + let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)])); + let schema_leaked: &'static Schema = Box::leak(Box::new((*schema).clone())); + let expr = col("f", schema_leaked).unwrap(); + let expr_leaked: &'static _ = Box::leak(Box::new(expr)); + let return_field: Arc = Field::new("f", DataType::Int64, true).into(); + let return_field_leaked: &'static _ = Box::leak(Box::new(return_field.clone())); + let expr_field = expr_leaked.return_field(schema_leaked).unwrap(); + let expr_field_leaked: &'static _ = Box::leak(Box::new(expr_field)); + + let accumulator_args = AccumulatorArgs { + return_field: return_field_leaked.clone(), + schema: schema_leaked, + expr_fields: std::slice::from_ref(expr_field_leaked), + ignore_nulls: false, + order_bys: &[], + is_reversed: false, + name: "count(distinct f)", + is_distinct: true, + exprs: std::slice::from_ref(expr_leaked), + }; + (schema, accumulator_args) +} + fn count_distinct_benchmark(c: &mut Criterion) { for pct in [80, 99] { let n_distinct = BATCH_SIZE * pct / 100; @@ -150,5 +174,92 @@ fn count_distinct_benchmark(c: &mut Criterion) { }); } -criterion_group!(benches, count_distinct_benchmark); +/// Create group indices with uniform distribution +fn create_uniform_groups(num_groups: usize) -> Vec { + let mut rng = StdRng::seed_from_u64(42); + (0..BATCH_SIZE) + .map(|_| rng.random_range(0..num_groups)) + .collect() +} + +/// Create group indices with skewed distribution (80% in 20% of groups) +fn create_skewed_groups(num_groups: usize) -> Vec { + let mut rng = StdRng::seed_from_u64(42); + let hot_groups = (num_groups / 5).max(1); + (0..BATCH_SIZE) + .map(|_| { + if rng.random_range(0..100) < 80 { + rng.random_range(0..hot_groups) + } else { + rng.random_range(0..num_groups) + } + }) + .collect() +} + +fn count_distinct_groups_benchmark(c: &mut Criterion) { + let count_fn = Count::new(); + + // bench different scenarios + let scenarios = [ + // (name, num_groups, distinct_pct, group_fn) + ("sparse_uniform", 10, 80, "uniform"), + ("moderate_uniform", 100, 80, "uniform"), + ("dense_uniform", 1000, 80, "uniform"), + ("sparse_skewed", 10, 80, "skewed"), + ("dense_skewed", 1000, 80, "skewed"), + ("sparse_high_cardinality", 10, 99, "uniform"), + ("dense_low_cardinality", 1000, 20, "uniform"), + ]; + + for (name, num_groups, distinct_pct, group_type) in scenarios { + let n_distinct = BATCH_SIZE * distinct_pct / 100; + let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef; + let group_indices = if group_type == "uniform" { + create_uniform_groups(num_groups) + } else { + create_skewed_groups(num_groups) + }; + + let (_schema, args) = prepare_args(DataType::Int64); + + if count_fn.groups_accumulator_supported(args.clone()) { + c.bench_function(&format!("count_distinct_groups {name}"), |b| { + b.iter(|| { + let (_schema, args) = prepare_args(DataType::Int64); + let mut acc = count_fn.create_groups_accumulator(args).unwrap(); + acc.update_batch(&[values.clone()], &group_indices, None, num_groups) + .unwrap(); + acc.evaluate(EmitTo::All).unwrap() + }) + }); + } else { + c.bench_function(&format!("count_distinct_groups {name}"), |b| { + b.iter(|| { + let mut accumulators: Vec<_> = (0..num_groups) + .map(|_| prepare_accumulator(DataType::Int64)) + .collect(); + + let arr = values.as_any().downcast_ref::().unwrap(); + for (idx, group_idx) in group_indices.iter().enumerate() { + if let Some(val) = arr.value(idx).into() { + let single_val = + Arc::new(Int64Array::from(vec![Some(val)])) as ArrayRef; + accumulators[*group_idx] + .update_batch(std::slice::from_ref(&single_val)) + .unwrap(); + } + } + + let _results: Vec<_> = accumulators + .iter_mut() + .map(|acc| acc.evaluate().unwrap()) + .collect(); + }) + }); + } + } +} + +criterion_group!(benches, count_distinct_benchmark, count_distinct_groups_benchmark); criterion_main!(benches); From c4461b7fc8954abac874093a87e3e3f875201aaf Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Sun, 12 Apr 2026 11:27:58 -0700 Subject: [PATCH 2/6] add count distinct group benchmarks --- datafusion/functions-aggregate/benches/count_distinct.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index a095e1871afcb..38f184c710ea8 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -261,5 +261,9 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { } } -criterion_group!(benches, count_distinct_benchmark, count_distinct_groups_benchmark); +criterion_group!( + benches, + count_distinct_benchmark, + count_distinct_groups_benchmark +); criterion_main!(benches); From 45a19b0ad43fbbd24527414ff1b0e2efb7891a18 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Sun, 12 Apr 2026 11:39:08 -0700 Subject: [PATCH 3/6] add count distinct group benchmarks --- datafusion/functions-aggregate/benches/count_distinct.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index 38f184c710ea8..4e56cb6b82974 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -228,8 +228,13 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { b.iter(|| { let (_schema, args) = prepare_args(DataType::Int64); let mut acc = count_fn.create_groups_accumulator(args).unwrap(); - acc.update_batch(&[values.clone()], &group_indices, None, num_groups) - .unwrap(); + acc.update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + num_groups, + ) + .unwrap(); acc.evaluate(EmitTo::All).unwrap() }) }); From 659754f8da7a8b1d4f0648ef53a7c2ff1786f0a2 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Mon, 13 Apr 2026 19:07:03 -0700 Subject: [PATCH 4/6] count group benchmark check --- .../benches/count_distinct.rs | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index 4e56cb6b82974..427acd46f99cb 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -224,6 +224,7 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { let (_schema, args) = prepare_args(DataType::Int64); if count_fn.groups_accumulator_supported(args.clone()) { +<<<<<<< Updated upstream c.bench_function(&format!("count_distinct_groups {name}"), |b| { b.iter(|| { let (_schema, args) = prepare_args(DataType::Int64); @@ -238,6 +239,25 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { acc.evaluate(EmitTo::All).unwrap() }) }); +======= + c.bench_function( + &format!("count_distinct_groups i64 {num_groups} groups"), + |b| { + b.iter(|| { + let mut acc = + count_fn.create_groups_accumulator(args.clone()).unwrap(); + acc.update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + num_groups, + ) + .unwrap(); + acc.evaluate(EmitTo::All).unwrap() + }) + }, + ); +>>>>>>> Stashed changes } else { c.bench_function(&format!("count_distinct_groups {name}"), |b| { b.iter(|| { From 5f2d9bb771f5b3279760c03d008b73d0458f84ec Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Mon, 13 Apr 2026 19:11:57 -0700 Subject: [PATCH 5/6] count group benchmark check --- .../benches/count_distinct.rs | 24 ++----------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index 427acd46f99cb..d8d6e68158478 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -224,11 +224,10 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { let (_schema, args) = prepare_args(DataType::Int64); if count_fn.groups_accumulator_supported(args.clone()) { -<<<<<<< Updated upstream c.bench_function(&format!("count_distinct_groups {name}"), |b| { b.iter(|| { - let (_schema, args) = prepare_args(DataType::Int64); - let mut acc = count_fn.create_groups_accumulator(args).unwrap(); + let mut acc = + count_fn.create_groups_accumulator(args.clone()).unwrap(); acc.update_batch( std::slice::from_ref(&values), &group_indices, @@ -239,25 +238,6 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { acc.evaluate(EmitTo::All).unwrap() }) }); -======= - c.bench_function( - &format!("count_distinct_groups i64 {num_groups} groups"), - |b| { - b.iter(|| { - let mut acc = - count_fn.create_groups_accumulator(args.clone()).unwrap(); - acc.update_batch( - std::slice::from_ref(&values), - &group_indices, - None, - num_groups, - ) - .unwrap(); - acc.evaluate(EmitTo::All).unwrap() - }) - }, - ); ->>>>>>> Stashed changes } else { c.bench_function(&format!("count_distinct_groups {name}"), |b| { b.iter(|| { From f61b1e759af09c468eb8b31fdbe3e6f6ea6b3a2a Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Tue, 14 Apr 2026 16:20:37 +0300 Subject: [PATCH 6/6] Add AI config files --- .cursor/rules.md | 5 +++++ AGENTS.md | 5 +++++ 2 files changed, 10 insertions(+) create mode 100644 .cursor/rules.md diff --git a/.cursor/rules.md b/.cursor/rules.md new file mode 100644 index 0000000000000..00385e6f5f759 --- /dev/null +++ b/.cursor/rules.md @@ -0,0 +1,5 @@ +The AI agents should never link to any issue or a pull request +in any GitHub repository in the code reviews! + +The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md! + diff --git a/AGENTS.md b/AGENTS.md index 9dff7f6f1ffd1..74b77ec968fe1 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -39,3 +39,8 @@ When creating a PR, you MUST follow the [PR template](.github/pull_request_templ See the [Testing Quick Start](docs/source/contributor-guide/testing.md#testing-quick-start) for the recommended pre-PR test commands. +The AI agents should never link to any issue or a pull request +in any GitHub repository in the code reviews! + +The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md! +