-
Notifications
You must be signed in to change notification settings - Fork 0
21575: chore: add count distinct group benchmarks #312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
bacee66
c4461b7
45a19b0
659754f
5f2d9bb
72568bb
f61b1e7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| The AI agents should never link to any issue or a pull request | ||
| in any GitHub repository in the code reviews! | ||
|
|
||
| The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md! | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,7 +23,7 @@ use arrow::array::{ | |
| use arrow::datatypes::{DataType, Field, Schema}; | ||
| use criterion::{Criterion, criterion_group, criterion_main}; | ||
| use datafusion_expr::function::AccumulatorArgs; | ||
| use datafusion_expr::{Accumulator, AggregateUDFImpl}; | ||
| use datafusion_expr::{Accumulator, AggregateUDFImpl, EmitTo}; | ||
| use datafusion_functions_aggregate::count::Count; | ||
| use datafusion_physical_expr::expressions::col; | ||
| use rand::rngs::StdRng; | ||
|
|
@@ -87,6 +87,30 @@ fn create_i16_array(n_distinct: usize) -> Int16Array { | |
| .collect() | ||
| } | ||
|
|
||
| fn prepare_args(data_type: DataType) -> (Arc<Schema>, AccumulatorArgs<'static>) { | ||
| let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)])); | ||
| let schema_leaked: &'static Schema = Box::leak(Box::new((*schema).clone())); | ||
| let expr = col("f", schema_leaked).unwrap(); | ||
| let expr_leaked: &'static _ = Box::leak(Box::new(expr)); | ||
| let return_field: Arc<Field> = Field::new("f", DataType::Int64, true).into(); | ||
| let return_field_leaked: &'static _ = Box::leak(Box::new(return_field.clone())); | ||
| let expr_field = expr_leaked.return_field(schema_leaked).unwrap(); | ||
| let expr_field_leaked: &'static _ = Box::leak(Box::new(expr_field)); | ||
|
|
||
| let accumulator_args = AccumulatorArgs { | ||
| return_field: return_field_leaked.clone(), | ||
| schema: schema_leaked, | ||
| expr_fields: std::slice::from_ref(expr_field_leaked), | ||
| ignore_nulls: false, | ||
| order_bys: &[], | ||
| is_reversed: false, | ||
| name: "count(distinct f)", | ||
| is_distinct: true, | ||
| exprs: std::slice::from_ref(expr_leaked), | ||
| }; | ||
| (schema, accumulator_args) | ||
| } | ||
|
|
||
| fn count_distinct_benchmark(c: &mut Criterion) { | ||
| for pct in [80, 99] { | ||
| let n_distinct = BATCH_SIZE * pct / 100; | ||
|
|
@@ -150,5 +174,101 @@ fn count_distinct_benchmark(c: &mut Criterion) { | |
| }); | ||
| } | ||
|
|
||
| criterion_group!(benches, count_distinct_benchmark); | ||
| /// Create group indices with uniform distribution | ||
| fn create_uniform_groups(num_groups: usize) -> Vec<usize> { | ||
| let mut rng = StdRng::seed_from_u64(42); | ||
| (0..BATCH_SIZE) | ||
| .map(|_| rng.random_range(0..num_groups)) | ||
| .collect() | ||
| } | ||
|
|
||
| /// Create group indices with skewed distribution (80% in 20% of groups) | ||
| fn create_skewed_groups(num_groups: usize) -> Vec<usize> { | ||
| let mut rng = StdRng::seed_from_u64(42); | ||
| let hot_groups = (num_groups / 5).max(1); | ||
| (0..BATCH_SIZE) | ||
| .map(|_| { | ||
| if rng.random_range(0..100) < 80 { | ||
| rng.random_range(0..hot_groups) | ||
| } else { | ||
| rng.random_range(0..num_groups) | ||
| } | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
| fn count_distinct_groups_benchmark(c: &mut Criterion) { | ||
| let count_fn = Count::new(); | ||
|
|
||
| // bench different scenarios | ||
| let scenarios = [ | ||
| // (name, num_groups, distinct_pct, group_fn) | ||
| ("sparse_uniform", 10, 80, "uniform"), | ||
| ("moderate_uniform", 100, 80, "uniform"), | ||
| ("dense_uniform", 1000, 80, "uniform"), | ||
| ("sparse_skewed", 10, 80, "skewed"), | ||
| ("dense_skewed", 1000, 80, "skewed"), | ||
| ("sparse_high_cardinality", 10, 99, "uniform"), | ||
| ("dense_low_cardinality", 1000, 20, "uniform"), | ||
| ]; | ||
|
|
||
| for (name, num_groups, distinct_pct, group_type) in scenarios { | ||
| let n_distinct = BATCH_SIZE * distinct_pct / 100; | ||
| let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef; | ||
| let group_indices = if group_type == "uniform" { | ||
| create_uniform_groups(num_groups) | ||
| } else { | ||
| create_skewed_groups(num_groups) | ||
| }; | ||
|
|
||
| let (_schema, args) = prepare_args(DataType::Int64); | ||
|
|
||
| if count_fn.groups_accumulator_supported(args.clone()) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. datafusion/functions-aggregate/benches/count_distinct.rs:226: Severity: medium 🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage. |
||
| c.bench_function(&format!("count_distinct_groups {name}"), |b| { | ||
| b.iter(|| { | ||
| let mut acc = | ||
| count_fn.create_groups_accumulator(args.clone()).unwrap(); | ||
| acc.update_batch( | ||
| std::slice::from_ref(&values), | ||
| &group_indices, | ||
| None, | ||
| num_groups, | ||
| ) | ||
| .unwrap(); | ||
| acc.evaluate(EmitTo::All).unwrap() | ||
| }) | ||
| }); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Groups accumulator benchmark path is unreachable dead codeHigh Severity The Additional Locations (1)Reviewed by Cursor Bugbot for commit f61b1e7. Configure here. |
||
| } else { | ||
| c.bench_function(&format!("count_distinct_groups {name}"), |b| { | ||
| b.iter(|| { | ||
| let mut accumulators: Vec<_> = (0..num_groups) | ||
| .map(|_| prepare_accumulator(DataType::Int64)) | ||
| .collect(); | ||
|
|
||
| let arr = values.as_any().downcast_ref::<Int64Array>().unwrap(); | ||
| for (idx, group_idx) in group_indices.iter().enumerate() { | ||
| if let Some(val) = arr.value(idx).into() { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Null check using
|
||
| let single_val = | ||
| Arc::new(Int64Array::from(vec![Some(val)])) as ArrayRef; | ||
| accumulators[*group_idx] | ||
| .update_batch(std::slice::from_ref(&single_val)) | ||
| .unwrap(); | ||
| } | ||
| } | ||
|
Comment on lines
+248
to
+257
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The fallback path for
Using for (idx, group_idx) in group_indices.iter().enumerate() {
let single_val = values.slice(idx, 1);
accumulators[*group_idx]
.update_batch(std::slice::from_ref(&single_val))
.unwrap();
}
Comment on lines
+248
to
+257
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Verify if this code compiles and check Arrow's Int64Array::value signature
ast-grep --pattern 'fn value($_, $_) -> $RET'
# Also check if there's any Into<Option> impl for i64 in the codebase
rg -n 'impl.*Into.*Option.*for.*i64' --type rustRepository: martin-augment/datafusion Length of output: 51 🏁 Script executed: # First, let's look at the actual file and the lines in question
head -260 datafusion/functions-aggregate/benches/count_distinct.rs | tail -20Repository: martin-augment/datafusion Length of output: 1048 🏁 Script executed: # Search for Int64Array definition and its value method
fd -t f -e rs | xargs grep -l "impl.*Int64Array" | head -5Repository: martin-augment/datafusion Length of output: 51 🏁 Script executed: # Look for where Int64Array comes from (likely arrow crate)
rg "type Int64Array" --type rust -A 2Repository: martin-augment/datafusion Length of output: 51 🏁 Script executed: # Check the imports in the count_distinct.rs file
head -30 datafusion/functions-aggregate/benches/count_distinct.rsRepository: martin-augment/datafusion Length of output: 1360 🏁 Script executed: # Find and check the Arrow version and Int64Array definition
cd datafusion/functions-aggregate && cat Cargo.toml | grep -A 5 arrowRepository: martin-augment/datafusion Length of output: 466 🏁 Script executed: # Search for the actual value() method signature in the codebase or check error when trying to compile
# Let's see if we can find any reference to Int64Array::value in the codebase
rg "\.value\(" datafusion/functions-aggregate/benches/count_distinct.rs -B 2 -A 2Repository: martin-augment/datafusion Length of output: 433 🏁 Script executed: # Let's check what the actual method returns - look for arrow documentation or implementation
# Search for how Int64Array is used elsewhere in the codebase with .into()
rg "arr\.value\(" datafusion --type rust -B 1 -A 1 | head -30Repository: martin-augment/datafusion Length of output: 2183 🏁 Script executed: # Check if there's a pattern matching on value() results elsewhere
rg "if let Some.*\.value\(" datafusion --type rustRepository: martin-augment/datafusion Length of output: 192 🏁 Script executed: # Let's check the Arrow crate version to understand what Int64Array::value() returns
rg "arrow =" Cargo.toml -A 1 | grep version | head -1Repository: martin-augment/datafusion Length of output: 111 🏁 Script executed: # Check if there's any custom Into implementation in the codebase that might apply
rg "impl.*Into.*for.*i64" datafusion --type rustRepository: martin-augment/datafusion Length of output: 368 🏁 Script executed: # Let's look for how value() is typically used elsewhere with into()
rg "\.value\(.*\)\.into\(\)" datafusion --type rust -B 1 -A 1Repository: martin-augment/datafusion Length of output: 1999 🏁 Script executed: # Try to understand what type arr.value(idx) returns by looking at similar patterns
rg "\.value\(" datafusion --type rust | grep "Int64Array" -A 2 -B 2 | head -20Repository: martin-augment/datafusion Length of output: 51 🏁 Script executed: # Check if the project compiles - try to see if there are any build errors related to this
cd datafusion && cargo check --all 2>&1 | grep -A 5 "count_distinct" || echo "No errors found for count_distinct"Repository: martin-augment/datafusion Length of output: 155 🏁 Script executed: # Let's look at how value() results are used in scalar/mod.rs to understand the pattern
cat datafusion/common/src/scalar/mod.rs | grep -B 5 -A 5 "array.value.*into" | head -40Repository: martin-augment/datafusion Length of output: 1240 🏁 Script executed: # Check if there's a wrapper type or trait implementation for Int64Array
rg "struct Int64Array\|type Int64Array" datafusion --type rust -A 3Repository: martin-augment/datafusion Length of output: 51 🏁 Script executed: # Look for any custom trait implementations that might affect Into behavior
rg "pub trait.*Int64" datafusion --type rust -A 5 | head -30Repository: martin-augment/datafusion Length of output: 51 🏁 Script executed: # Search for the actual use of PrimitiveArray::value in arrow-rs documentation patterns
# Let's see what other safe patterns exist for nullable arrays
rg "\.iter\(\)" datafusion/functions-aggregate/benches/count_distinct.rs -B 2 -A 2Repository: martin-augment/datafusion Length of output: 345 🏁 Script executed: # Check if there's a helper function that wraps value() to return Option
rg "fn.*value.*Option" datafusion --type rust -B 2 -A 3 | head -30Repository: martin-augment/datafusion Length of output: 1928 🏁 Script executed: # Let's look for the actual implementation pattern - check if i64 implements Into<Option<i64>>
# Search for generic Into implementations that might apply
rg "impl.*Into.*Option" datafusion --type rust -B 2 -A 2 | head -30Repository: martin-augment/datafusion Length of output: 1991 🏁 Script executed: # Most importantly, let's look at how the code in scalar/mod.rs uses the pattern
# It checks is_null FIRST, then uses .into(). Let's verify this is the correct pattern
cat datafusion/common/src/scalar/mod.rs | sed -n '1,100p' | grep -A 20 "macro_rules"Repository: martin-augment/datafusion Length of output: 51 🏁 Script executed: # Check if the current code in count_distinct.rs even compiles with a simpler test
cd datafusion/functions-aggregate && cargo check --lib 2>&1 | tail -20Repository: martin-augment/datafusion Length of output: 155 🏁 Script executed: # Let's look at the exact context around that line in the file to see if there's something we're missing
sed -n '240,265p' datafusion/functions-aggregate/benches/count_distinct.rsRepository: martin-augment/datafusion Length of output: 1205 🏁 Script executed: # Let's check what Into implementations exist for i64 in Arrow or standard library
# by looking at how .into() is used on primitive values elsewhere
rg "(\d+i64|i64_var)\.into\(\)" datafusion --type rust -B 1 -A 1 | head -30Repository: martin-augment/datafusion Length of output: 489 🏁 Script executed: # Check the exact signature of Int64Array::value by searching arrow documentation patterns
rg "pub fn value" datafusion --type rust | grep -i array | head -10Repository: martin-augment/datafusion Length of output: 192 🏁 Script executed: # Look at the actual benchmark code to understand what create_i64_array produces
sed -n '100,180p' datafusion/functions-aggregate/benches/count_distinct.rs | grep -A 20 "create_i64_array\|fn.*create_i64"Repository: martin-augment/datafusion Length of output: 837 🏁 Script executed: # The key question: does this code actually compile? Let's try a minimal check
# by looking for any compilation issues in recent commits or CI
git log --oneline datafusion/functions-aggregate/benches/count_distinct.rs | head -5Repository: martin-augment/datafusion Length of output: 97 🏁 Script executed: # Let's check if there's a wrapper type or special implementation around Int64Array
# that might make .into() return an Option
rg "struct.*Int64Array\|type.*Int64Array" datafusion --type rust -A 5 | head -20Repository: martin-augment/datafusion Length of output: 51 🏁 Script executed: # Most importantly, let's look at what the .into() call actually resolves to
# by checking if i64 has any Into implementations in arrow
rg "impl Into" arrow --type rust 2>/dev/null || rg "impl.*Into.*i64" datafusion --type rustRepository: martin-augment/datafusion Length of output: 696 🏁 Script executed: # Let's check the Arrow crate source directly - see if there's a custom Into for primitive values
web_search
arrow-rs 58.1.0 Int64Array value method signatureRepository: martin-augment/datafusion Length of output: 166 🌐 Web query:
💡 Result: No, in arrow-rs version 58.1.0, the PrimitiveArray::value method does not return Option. It returns the raw primitive value (T::Native) at the specified index without checking for nulls. The documentation explicitly states: "Note: This method does not check for nulls and the value is arbitrary if is_null returns true for the index." There is also an unsafe version value_unchecked. To get an Option<T::Native> that respects nulls: - Use array.iter, which yields Option<T::Native> via ArrayIter - Or check array.is_null(i) first, then call value(i) if not null - Or use array.value_as_datetime(i) or similar specialized methods that do return Option for certain types Citations:
🌐 Web query:
💡 Result: In the Rust Arrow crate (apache/arrow-rs), Int64Array is a type alias for PrimitiveArray. The value method is defined on PrimitiveArray where T: ArrowPrimitiveType, and for Int64Array it has the signature: rust pub fn value(&self, i: usize) -> i64 This method returns the primitive value at index i as an i64. It panics if i >= self.len. Note that it does not check for nulls; the value is arbitrary if the slot is null (use is_null(i) or is_valid(i) to check validity first). There is also an unchecked version: rust pub unsafe fn value_unchecked(&self, i: usize) -> i64 The method is also part of the ArrayAccessor trait implementation for &PrimitiveArray, with Item = T::Native (i64 for Int64Type). Citations:
🏁 Script executed: # Let's check the actual create_i64_array function to understand what data structure is created
sed -n '1,300p' datafusion/functions-aggregate/benches/count_distinct.rs | grep -B 5 -A 15 "fn create_i64_array"Repository: martin-augment/datafusion Length of output: 655 🏁 Script executed: # Look for any helper wrapper that might wrap Int64Array in a custom type
rg "newtype\|struct.*Array\|type.*Array" datafusion/functions-aggregate/benches/count_distinct.rsRepository: martin-augment/datafusion Length of output: 51 🏁 Script executed: # Verify the pattern one more time - check if i64 has any Into<Option> implementations
# by looking at what Rust primitives support
python3 << 'EOF'
# In standard Rust, i64 does not implement Into<Option<i64>>
# Let's verify by checking if there are any custom trait impls in the codebase
import subprocess
result = subprocess.run(['rg', 'impl.*Into.*Option', 'datafusion', '--type', 'rust'],
capture_output=True, text=True)
print("Custom Into<Option> implementations found:")
print(result.stdout if result.stdout else "None")
# Also check if there's any From<i64> for Option
result2 = subprocess.run(['rg', 'impl.*From.*i64.*for.*Option', 'datafusion', '--type', 'rust'],
capture_output=True, text=True)
print("\nFrom<i64> for Option implementations:")
print(result2.stdout if result2.stdout else "None")
EOFRepository: martin-augment/datafusion Length of output: 5956 🏁 Script executed: # Final check: look at the actual Array trait or implementations that might override .into()
rg "impl.*Into.*for.*Int64Array\|impl.*Into.*for.*PrimitiveArray" datafusion --type rustRepository: martin-augment/datafusion Length of output: 51 🏁 Script executed: # Check if the file even compiles by looking at recent changes or if there's a FIXME comment
sed -n '245,260p' datafusion/functions-aggregate/benches/count_distinct.rsRepository: martin-augment/datafusion Length of output: 859 Incorrect null-handling pattern — code won't compile.
Proposed fix using proper null checking let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
for (idx, group_idx) in group_indices.iter().enumerate() {
- if let Some(val) = arr.value(idx).into() {
+ if !arr.is_null(idx) {
+ let val = arr.value(idx);
let single_val =
Arc::new(Int64Array::from(vec![Some(val)])) as ArrayRef;
accumulators[*group_idx]
.update_batch(std::slice::from_ref(&single_val))
.unwrap();
}
}🤖 Prompt for AI Agents |
||
|
|
||
| let _results: Vec<_> = accumulators | ||
| .iter_mut() | ||
| .map(|acc| acc.evaluate().unwrap()) | ||
| .collect(); | ||
| }) | ||
| }); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| criterion_group!( | ||
| benches, | ||
| count_distinct_benchmark, | ||
| count_distinct_groups_benchmark | ||
| ); | ||
| criterion_main!(benches); | ||


Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
datafusion/functions-aggregate/benches/count_distinct.rs:92:
prepare_argsusesBox::leakto force'staticlifetimes, which permanently leaks heap allocations each time it is called (schema/expr/fields). This is invoked once per scenario today, but if it ever moves inside the benchmark loop or is reused more widely it can cause unbounded memory growth during benchmarking runs.Severity: medium
Other Locations
datafusion/functions-aggregate/benches/count_distinct.rs:94datafusion/functions-aggregate/benches/count_distinct.rs:96datafusion/functions-aggregate/benches/count_distinct.rs:98🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.