21575: chore: add count distinct group benchmarks #312
martin-augment wants to merge 7 commits into main
Walkthrough
This pull request adds governance rules for AI agents and expands benchmarking capabilities. It introduces new guidelines in …
Code Review
This pull request introduces new guidelines for AI agents in `.cursor/rules.md` and `AGENTS.md`, prohibiting links to GitHub issues/PRs and reviews of specific configuration files. It also adds a benchmark for grouped `count(distinct)` operations in `datafusion/functions-aggregate/benches/count_distinct.rs`, covering various data distributions and cardinalities. One significant issue was identified in the benchmark's fallback implementation: per-row memory allocations dominate the measured time, and the value extraction uses an `.into()` conversion that never filters nulls.
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
for (idx, group_idx) in group_indices.iter().enumerate() {
    if let Some(val) = arr.value(idx).into() {
        let single_val =
            Arc::new(Int64Array::from(vec![Some(val)])) as ArrayRef;
        accumulators[*group_idx]
            .update_batch(std::slice::from_ref(&single_val))
            .unwrap();
    }
}
The fallback path for `count_distinct_groups_benchmark` contains a likely logic error and a significant performance bottleneck.
- Logic error: `arr.value(idx).into()` does not reflect nullness. `Int64Array::value` returns a plain `i64`, so the `if let Some(val)` pattern matches only through the standard `From<T> for Option<T>` conversion, which always yields `Some`; null slots are never skipped.
- Performance bottleneck: Creating a new `Arc` and `Int64Array` for every row inside the timed loop (8192 rows per iteration) is expensive and makes the benchmark measure allocation overhead rather than aggregation performance.
Using `values.slice(idx, 1)` is a cheaper way to perform row-by-row updates: the slice shares the existing buffers instead of building a new array per row, and it carries the null information along.
for (idx, group_idx) in group_indices.iter().enumerate() {
let single_val = values.slice(idx, 1);
accumulators[*group_idx]
.update_batch(std::slice::from_ref(&single_val))
.unwrap();
}
Code Review
The new …
1. Memory leaks in …
| Severity | Issue |
|---|---|
| Medium | Box::leak leaks memory on every prepare_args call |
| Medium | arr.value(idx).into() null-check always passes |
| High | Fallback path benchmarks allocation overhead, not accumulator logic |
| Minor | String dispatch instead of enum |
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Reviewed by Cursor Bugbot for commit f61b1e7.
            .unwrap();
        acc.evaluate(EmitTo::All).unwrap()
    })
});
Groups accumulator benchmark path is unreachable dead code
High Severity
The prepare_args function sets is_distinct: true, but Count::groups_accumulator_supported always returns false when is_distinct is true. This means the if branch that benchmarks the actual groups accumulator (using create_groups_accumulator and EmitTo::All) is never executed. Every scenario falls through to the naive fallback else branch, which creates individual per-group accumulators. The benchmark claims to measure group accumulator performance but never actually exercises it.
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
for (idx, group_idx) in group_indices.iter().enumerate() {
    if let Some(val) = arr.value(idx).into() {
Null check using value().into() always matches Some
Low Severity
arr.value(idx) returns a raw i64, and calling .into() uses the blanket From<T> for Option<T> impl, which always produces Some(val). The if let Some(val) pattern therefore always matches and never filters out null entries. To properly handle nulls, arr.is_null(idx) needs to be checked, since value() does not reflect null status.
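The conversion described in this comment can be checked with the standard library alone. The following is a minimal sketch, not code from the benchmark: the helper name `always_some` is hypothetical, and `raw` merely stands in for the plain `i64` that `Int64Array::value` returns.

```rust
/// Hypothetical helper mirroring the pattern `if let Some(v) = raw.into()`.
/// `raw` stands in for the plain `i64` returned by `Int64Array::value`.
fn always_some(raw: i64) -> bool {
    // The standard library's blanket `impl<T> From<T> for Option<T>` wraps
    // any value in `Some`, so this conversion can never produce `None`.
    let wrapped: Option<i64> = raw.into();
    wrapped.is_some()
}
```

Whatever value sits in the slot, the helper returns `true`, which is why the `if let Some(val)` branch in the benchmark is taken for every row.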
🤖 Augment PR Summary
Summary: Extends the aggregate benchmarks with grouped …
fn prepare_args(data_type: DataType) -> (Arc<Schema>, AccumulatorArgs<'static>) {
    let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)]));
    let schema_leaked: &'static Schema = Box::leak(Box::new((*schema).clone()));
datafusion/functions-aggregate/benches/count_distinct.rs:92: prepare_args uses Box::leak to force 'static lifetimes, which permanently leaks heap allocations each time it is called (schema/expr/fields). This is invoked once per scenario today, but if it ever moves inside the benchmark loop or is reused more widely it can cause unbounded memory growth during benchmarking runs.
Severity: medium
Other Locations
- datafusion/functions-aggregate/benches/count_distinct.rs:94
- datafusion/functions-aggregate/benches/count_distinct.rs:96
- datafusion/functions-aggregate/benches/count_distinct.rs:98
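To make the leak concern concrete, here is a small std-only sketch. It is an illustration under assumptions, not the benchmark's code: `ToySchema`, `leaked_schema`, and `shared_schema` are hypothetical names, and `ToySchema` only stands in for the real `Schema`.

```rust
use std::sync::OnceLock;

/// Hypothetical stand-in for the schema that `prepare_args` leaks.
#[derive(Debug, Clone, PartialEq)]
struct ToySchema {
    field_name: String,
}

/// Leaks a fresh heap allocation on every call, as `Box::leak` does.
fn leaked_schema(name: &str) -> &'static ToySchema {
    Box::leak(Box::new(ToySchema {
        field_name: name.to_string(),
    }))
}

/// One possible alternative: initialise once and reuse the same
/// `'static` reference on every later call.
fn shared_schema() -> &'static ToySchema {
    static SCHEMA: OnceLock<ToySchema> = OnceLock::new();
    SCHEMA.get_or_init(|| ToySchema {
        field_name: "f".to_string(),
    })
}
```

Two calls to `leaked_schema` return equal values at different addresses, so each call holds its own allocation for the life of the process. The `OnceLock` variant only fits a single fixed schema; since `prepare_args` takes a `data_type` parameter, it would need one cell per type or a different design, so a short comment documenting the intentional leak may be the simpler option for benchmark code.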
let (_schema, args) = prepare_args(DataType::Int64);

if count_fn.groups_accumulator_supported(args.clone()) {
datafusion/functions-aggregate/benches/count_distinct.rs:226: Count::groups_accumulator_supported currently returns false whenever args.is_distinct is true, so this if will never take the groups-accumulator branch for COUNT(DISTINCT ...). As a result the benchmark always measures the fallback path, which may not match the intent implied by the count_distinct_groups_* naming.
Severity: medium
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
for (idx, group_idx) in group_indices.iter().enumerate() {
    if let Some(val) = arr.value(idx).into() {
datafusion/functions-aggregate/benches/count_distinct.rs:250: arr.value(idx) ignores the null bitmap, and i64.into() always yields Some(_), so this if let doesn't actually filter nulls and would treat null slots as a value if nulls are introduced later. This looks like it’s trying to handle nulls but currently won’t.
Severity: low
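The difference between reading a slot and checking its validity can be sketched with a toy model. Everything here is hypothetical and std-only: `ToyInt64Array` keeps a `Vec<bool>` validity mask as a simplification of Arrow's packed null bitmap, and the two counting functions are illustrative names, not part of the benchmark.

```rust
/// Hypothetical stand-in for a nullable `i64` column: a value buffer plus a
/// validity mask, loosely modelled on how Arrow tracks nulls.
struct ToyInt64Array {
    values: Vec<i64>,
    valid: Vec<bool>,
}

impl ToyInt64Array {
    /// Returns the raw slot; like `Int64Array::value`, it ignores validity.
    fn value(&self, idx: usize) -> i64 {
        self.values[idx]
    }

    /// Reports whether the slot is null according to the validity mask.
    fn is_null(&self, idx: usize) -> bool {
        !self.valid[idx]
    }
}

/// Counts rows the way the flagged pattern does: `.into()` always gives `Some`.
fn rows_seen_with_into(arr: &ToyInt64Array) -> usize {
    (0..arr.values.len())
        .filter(|&i| {
            let v: Option<i64> = arr.value(i).into();
            v.is_some()
        })
        .count()
}

/// Counts rows with an explicit null check before reading the value.
fn rows_seen_with_null_check(arr: &ToyInt64Array) -> usize {
    (0..arr.values.len()).filter(|&i| !arr.is_null(i)).count()
}
```

For a three-slot column whose middle slot is null, the first function counts three rows and the second counts two, which is the gap this comment points at if nulls are ever introduced into the benchmark data.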
Actionable comments posted: 1
🧹 Nitpick comments (2)
datafusion/functions-aggregate/benches/count_distinct.rs (2)
90-112: Intentional memory leaks for benchmark lifetime requirements.
The use of `Box::leak` to create `'static` references is acceptable for benchmark code since the process terminates after running. However, consider adding a brief comment explaining why this pattern is necessary (to satisfy `AccumulatorArgs<'static>` lifetime constraints) for future maintainers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@datafusion/functions-aggregate/benches/count_distinct.rs` around lines 90 - 112, Add a short explanatory comment in prepare_args near the Box::leak usages stating these leaks are intentional to satisfy the AccumulatorArgs<'static> lifetime requirement for benchmarks and are acceptable because the process exits after the benchmark (thus no long-term leak); reference the function name prepare_args and the specific leaked values schema_leaked, expr_leaked, return_field_leaked, and expr_field_leaked so future maintainers understand why 'static references are created here.
200-267: Reminder: Run formatting and linting before committing.
As per coding guidelines for Rust files, please ensure you run:
    cargo fmt --all
    cargo clippy --all-targets --all-features -- -D warnings
This will help catch any compilation issues (like the potential issue at line 250) and ensure consistent formatting.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@datafusion/functions-aggregate/benches/count_distinct.rs` around lines 200 - 267, Run rust formatting and lints (cargo fmt --all; cargo clippy --all-targets --all-features -- -D warnings) and fix the invalid use of arr.value(idx).into() in count_distinct_groups_benchmark: arr.value(idx) returns an i64 (not an Option), so check validity with arr.is_valid(idx) and then wrap the value as Some(val) when constructing single_val before calling update_batch; keep the rest of the benchmark logic (Count::create_groups_accumulator, prepare_accumulator, update_batch/evaluate) unchanged.
📒 Files selected for processing (3)
- .cursor/rules.md
- AGENTS.md
- datafusion/functions-aggregate/benches/count_distinct.rs
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
for (idx, group_idx) in group_indices.iter().enumerate() {
    if let Some(val) = arr.value(idx).into() {
        let single_val =
            Arc::new(Int64Array::from(vec![Some(val)])) as ArrayRef;
        accumulators[*group_idx]
            .update_batch(std::slice::from_ref(&single_val))
            .unwrap();
    }
}
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify if this code compiles and check Arrow's Int64Array::value signature
ast-grep --pattern 'fn value($_, $_) -> $RET'
# Also check if there's any Into<Option> impl for i64 in the codebase
rg -n 'impl.*Into.*Option.*for.*i64' --type rust
Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# First, let's look at the actual file and the lines in question
head -260 datafusion/functions-aggregate/benches/count_distinct.rs | tail -20
Repository: martin-augment/datafusion
Length of output: 1048
🏁 Script executed:
# Search for Int64Array definition and its value method
fd -t f -e rs | xargs grep -l "impl.*Int64Array" | head -5
Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Look for where Int64Array comes from (likely arrow crate)
rg "type Int64Array" --type rust -A 2
Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Check the imports in the count_distinct.rs file
head -30 datafusion/functions-aggregate/benches/count_distinct.rs
Repository: martin-augment/datafusion
Length of output: 1360
🏁 Script executed:
# Find and check the Arrow version and Int64Array definition
cd datafusion/functions-aggregate && cat Cargo.toml | grep -A 5 arrow
Repository: martin-augment/datafusion
Length of output: 466
🏁 Script executed:
# Search for the actual value() method signature in the codebase or check error when trying to compile
# Let's see if we can find any reference to Int64Array::value in the codebase
rg "\.value\(" datafusion/functions-aggregate/benches/count_distinct.rs -B 2 -A 2
Repository: martin-augment/datafusion
Length of output: 433
🏁 Script executed:
# Let's check what the actual method returns - look for arrow documentation or implementation
# Search for how Int64Array is used elsewhere in the codebase with .into()
rg "arr\.value\(" datafusion --type rust -B 1 -A 1 | head -30
Repository: martin-augment/datafusion
Length of output: 2183
🏁 Script executed:
# Check if there's a pattern matching on value() results elsewhere
rg "if let Some.*\.value\(" datafusion --type rust
Repository: martin-augment/datafusion
Length of output: 192
🏁 Script executed:
# Let's check the Arrow crate version to understand what Int64Array::value() returns
rg "arrow =" Cargo.toml -A 1 | grep version | head -1
Repository: martin-augment/datafusion
Length of output: 111
🏁 Script executed:
# Check if there's any custom Into implementation in the codebase that might apply
rg "impl.*Into.*for.*i64" datafusion --type rust
Repository: martin-augment/datafusion
Length of output: 368
🏁 Script executed:
# Let's look for how value() is typically used elsewhere with into()
rg "\.value\(.*\)\.into\(\)" datafusion --type rust -B 1 -A 1
Repository: martin-augment/datafusion
Length of output: 1999
🏁 Script executed:
# Try to understand what type arr.value(idx) returns by looking at similar patterns
rg "\.value\(" datafusion --type rust | grep "Int64Array" -A 2 -B 2 | head -20
Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Check if the project compiles - try to see if there are any build errors related to this
cd datafusion && cargo check --all 2>&1 | grep -A 5 "count_distinct" || echo "No errors found for count_distinct"
Repository: martin-augment/datafusion
Length of output: 155
🏁 Script executed:
# Let's look at how value() results are used in scalar/mod.rs to understand the pattern
cat datafusion/common/src/scalar/mod.rs | grep -B 5 -A 5 "array.value.*into" | head -40
Repository: martin-augment/datafusion
Length of output: 1240
🏁 Script executed:
# Check if there's a wrapper type or trait implementation for Int64Array
rg "struct Int64Array\|type Int64Array" datafusion --type rust -A 3
Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Look for any custom trait implementations that might affect Into behavior
rg "pub trait.*Int64" datafusion --type rust -A 5 | head -30
Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Search for the actual use of PrimitiveArray::value in arrow-rs documentation patterns
# Let's see what other safe patterns exist for nullable arrays
rg "\.iter\(\)" datafusion/functions-aggregate/benches/count_distinct.rs -B 2 -A 2
Repository: martin-augment/datafusion
Length of output: 345
🏁 Script executed:
# Check if there's a helper function that wraps value() to return Option
rg "fn.*value.*Option" datafusion --type rust -B 2 -A 3 | head -30
Repository: martin-augment/datafusion
Length of output: 1928
🏁 Script executed:
# Let's look for the actual implementation pattern - check if i64 implements Into<Option<i64>>
# Search for generic Into implementations that might apply
rg "impl.*Into.*Option" datafusion --type rust -B 2 -A 2 | head -30
Repository: martin-augment/datafusion
Length of output: 1991
🏁 Script executed:
# Most importantly, let's look at how the code in scalar/mod.rs uses the pattern
# It checks is_null FIRST, then uses .into(). Let's verify this is the correct pattern
cat datafusion/common/src/scalar/mod.rs | sed -n '1,100p' | grep -A 20 "macro_rules"
Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Check if the current code in count_distinct.rs even compiles with a simpler test
cd datafusion/functions-aggregate && cargo check --lib 2>&1 | tail -20
Repository: martin-augment/datafusion
Length of output: 155
🏁 Script executed:
# Let's look at the exact context around that line in the file to see if there's something we're missing
sed -n '240,265p' datafusion/functions-aggregate/benches/count_distinct.rs
Repository: martin-augment/datafusion
Length of output: 1205
🏁 Script executed:
# Let's check what Into implementations exist for i64 in Arrow or standard library
# by looking at how .into() is used on primitive values elsewhere
rg "(\d+i64|i64_var)\.into\(\)" datafusion --type rust -B 1 -A 1 | head -30
Repository: martin-augment/datafusion
Length of output: 489
🏁 Script executed:
# Check the exact signature of Int64Array::value by searching arrow documentation patterns
rg "pub fn value" datafusion --type rust | grep -i array | head -10
Repository: martin-augment/datafusion
Length of output: 192
🏁 Script executed:
# Look at the actual benchmark code to understand what create_i64_array produces
sed -n '100,180p' datafusion/functions-aggregate/benches/count_distinct.rs | grep -A 20 "create_i64_array\|fn.*create_i64"
Repository: martin-augment/datafusion
Length of output: 837
🏁 Script executed:
# The key question: does this code actually compile? Let's try a minimal check
# by looking for any compilation issues in recent commits or CI
git log --oneline datafusion/functions-aggregate/benches/count_distinct.rs | head -5
Repository: martin-augment/datafusion
Length of output: 97
🏁 Script executed:
# Let's check if there's a wrapper type or special implementation around Int64Array
# that might make .into() return an Option
rg "struct.*Int64Array\|type.*Int64Array" datafusion --type rust -A 5 | head -20
Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Most importantly, let's look at what the .into() call actually resolves to
# by checking if i64 has any Into implementations in arrow
rg "impl Into" arrow --type rust 2>/dev/null || rg "impl.*Into.*i64" datafusion --type rust
Repository: martin-augment/datafusion
Length of output: 696
🏁 Script executed:
# Let's check the Arrow crate source directly - see if there's a custom Into for primitive values
web_search
arrow-rs 58.1.0 Int64Array value method signature
Repository: martin-augment/datafusion
Length of output: 166
🌐 Web query:
arrow-rs 58.1.0 PrimitiveArray value method returns Option
💡 Result:
No. In arrow-rs version 58.1.0, the PrimitiveArray::value method does not return Option. It returns the raw primitive value (T::Native) at the specified index without checking for nulls. The documentation explicitly states: "Note: This method does not check for nulls and the value is arbitrary if is_null returns true for the index." There is also an unsafe version, value_unchecked.
To get an Option<T::Native> that respects nulls:
- Use array.iter(), which yields Option<T::Native> via ArrayIter
- Or check array.is_null(i) first, then call value(i) if not null
- Or use array.value_as_datetime(i) or similar specialized methods that do return Option for certain types
Citations:
- 1: https://docs.rs/arrow/latest/arrow/array/struct.PrimitiveArray.html
- 2: https://arrow.apache.org/rust/arrow/array/struct.PrimitiveArray.html
- 3: https://github.com/apache/arrow-rs/blob/main/arrow-array/src/array/primitive_array.rs
- 4: https://docs.rs/arrow-array/58.1.0/x86_64-unknown-linux-gnu/src/arrow_array/array/primitive_array.rs.html
- 5: https://docs.rs/arrow/latest/arrow/array/iterator/struct.ArrayIter.html
🌐 Web query:
arrow-rs Int64Array::value() signature rust arrow crate
💡 Result:
In the Rust Arrow crate (apache/arrow-rs), Int64Array is a type alias for PrimitiveArray<Int64Type>. The value method is defined on PrimitiveArray<T> where T: ArrowPrimitiveType, and for Int64Array it has the signature:
    pub fn value(&self, i: usize) -> i64
This method returns the primitive value at index i as an i64. It panics if i >= self.len(). Note that it does not check for nulls; the value is arbitrary if the slot is null (use is_null(i) or is_valid(i) to check validity first). There is also an unchecked version:
    pub unsafe fn value_unchecked(&self, i: usize) -> i64
The method is also part of the ArrayAccessor trait implementation for &PrimitiveArray<T>, with Item = T::Native (i64 for Int64Type).
Citations:
- 1: https://docs.rs/arrow/latest/arrow/array/type.Int64Array.html
- 2: https://arrow.apache.org/rust/arrow_array/array/primitive_array/type.Int64Array.html
- 3: https://arrow.apache.org/rust/arrow/array/type.Int64Array.html
- 4: https://docs.rs/arrow/latest/arrow/array/struct.PrimitiveArray.html
- 5: https://arrow.apache.org/rust/arrow_array/array/type.Int64Array.html
- 6: https://github.com/apache/arrow-rs/blob/main/arrow-array/src/array/primitive_array.rs
🏁 Script executed:
# Let's check the actual create_i64_array function to understand what data structure is created
sed -n '1,300p' datafusion/functions-aggregate/benches/count_distinct.rs | grep -B 5 -A 15 "fn create_i64_array"
Repository: martin-augment/datafusion
Length of output: 655
🏁 Script executed:
# Look for any helper wrapper that might wrap Int64Array in a custom type
rg "newtype\|struct.*Array\|type.*Array" datafusion/functions-aggregate/benches/count_distinct.rs
Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Verify the pattern one more time - check if i64 has any Into<Option> implementations
# by looking at what Rust primitives support
python3 << 'EOF'
# In standard Rust, i64 does not implement Into<Option<i64>>
# Let's verify by checking if there are any custom trait impls in the codebase
import subprocess
result = subprocess.run(['rg', 'impl.*Into.*Option', 'datafusion', '--type', 'rust'],
capture_output=True, text=True)
print("Custom Into<Option> implementations found:")
print(result.stdout if result.stdout else "None")
# Also check if there's any From<i64> for Option
result2 = subprocess.run(['rg', 'impl.*From.*i64.*for.*Option', 'datafusion', '--type', 'rust'],
capture_output=True, text=True)
print("\nFrom<i64> for Option implementations:")
print(result2.stdout if result2.stdout else "None")
EOF
Repository: martin-augment/datafusion
Length of output: 5956
🏁 Script executed:
# Final check: look at the actual Array trait or implementations that might override .into()
rg "impl.*Into.*for.*Int64Array\|impl.*Into.*for.*PrimitiveArray" datafusion --type rust
Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Check if the file even compiles by looking at recent changes or if there's a FIXME comment
sed -n '245,260p' datafusion/functions-aggregate/benches/count_distinct.rs
Repository: martin-augment/datafusion
Length of output: 859
Incorrect null-handling pattern: the check never filters nulls.
Int64Array::value(idx) returns i64 directly, not an Option. Calling .into() on it and pattern-matching with if let Some(val) does not test for nulls: the conversion goes through the standard library's From<T> for Option<T>, so it always yields Some(val).
Proposed fix using proper null checking
 let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
 for (idx, group_idx) in group_indices.iter().enumerate() {
-    if let Some(val) = arr.value(idx).into() {
+    if !arr.is_null(idx) {
+        let val = arr.value(idx);
         let single_val =
             Arc::new(Int64Array::from(vec![Some(val)])) as ArrayRef;
         accumulators[*group_idx]
             .update_batch(std::slice::from_ref(&single_val))
             .unwrap();
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@datafusion/functions-aggregate/benches/count_distinct.rs` around lines 248 -
257, The null-handling is wrong because Int64Array::value(idx) returns i64, not
Option; instead check arr.is_null(idx) before calling arr.value(idx). In the
block where you downcast values (values.as_any().downcast_ref::<Int64Array>()),
iterate group_indices as before but skip when arr.is_null(idx); when not null
read let val = arr.value(idx) and build the single_val Arc<Int64Array> from
Some(val) and call accumulators[*group_idx].update_batch(...).unwrap() as you
did previously.


21575: To review by AI