-
Notifications
You must be signed in to change notification settings - Fork 0
21456: perf : Optimize count distinct using bitmaps instead of hashsets for smaller datatypes #307
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8d49dfe
0b179ff
c6095ab
9d06408
f185fdc
f7c487a
3f091d9
e09f68a
f8c01a1
df90ef5
bf0f95c
48a6029
61dc8e1
5a2918a
3f4952e
289b354
554f60c
87a9af8
b564773
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| The AI agents should never link to any issue or a pull request | ||
| in any GitHub repository in the code reviews! | ||
|
|
||
| The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md! | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -78,6 +78,7 @@ where | |
| ]) | ||
| } | ||
|
|
||
| #[inline(never)] | ||
| fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if values.is_empty() { | ||
| return Ok(()); | ||
|
|
@@ -149,6 +150,7 @@ impl<T: ArrowPrimitiveType + Debug> Accumulator for FloatDistinctCountAccumulato | |
| self.values.state() | ||
| } | ||
|
|
||
| #[inline(never)] | ||
| fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| self.values.update_batch(values) | ||
| } | ||
|
|
@@ -165,3 +167,354 @@ impl<T: ArrowPrimitiveType + Debug> Accumulator for FloatDistinctCountAccumulato | |
| size_of_val(self) + self.values.size() | ||
| } | ||
| } | ||
|
|
||
| /// Optimized COUNT DISTINCT accumulator for u8 using a bool array. | ||
| /// Uses 256 bytes to track all possible u8 values. | ||
| #[derive(Debug)] | ||
| pub struct BoolArray256DistinctCountAccumulator { | ||
| seen: [bool; 256], | ||
| } | ||
|
|
||
| impl BoolArray256DistinctCountAccumulator { | ||
| pub fn new() -> Self { | ||
| Self { seen: [false; 256] } | ||
| } | ||
|
|
||
| #[inline] | ||
| fn count(&self) -> i64 { | ||
| self.seen.iter().filter(|&&b| b).count() as i64 | ||
| } | ||
| } | ||
|
|
||
| impl Default for BoolArray256DistinctCountAccumulator { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl Accumulator for BoolArray256DistinctCountAccumulator { | ||
| #[inline(never)] | ||
| fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if values.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_primitive_array::<arrow::datatypes::UInt8Type>(&values[0])?; | ||
| for value in arr.iter().flatten() { | ||
| self.seen[value as usize] = true; | ||
| } | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if states.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_list_array(&states[0])?; | ||
| arr.iter().try_for_each(|maybe_list| { | ||
| if let Some(list) = maybe_list { | ||
| let list = as_primitive_array::<arrow::datatypes::UInt8Type>(&list)?; | ||
| for value in list.values().iter() { | ||
| self.seen[*value as usize] = true; | ||
| } | ||
| }; | ||
| Ok(()) | ||
| }) | ||
| } | ||
|
|
||
| fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> { | ||
| let values: Vec<u8> = self | ||
| .seen | ||
| .iter() | ||
| .enumerate() | ||
| .filter_map(|(idx, &seen)| if seen { Some(idx as u8) } else { None }) | ||
| .collect(); | ||
|
|
||
| let arr = Arc::new( | ||
| PrimitiveArray::<arrow::datatypes::UInt8Type>::from_iter_values(values), | ||
| ); | ||
| Ok(vec![ | ||
| SingleRowListArrayBuilder::new(arr).build_list_scalar(), | ||
| ]) | ||
| } | ||
|
|
||
| fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> { | ||
| Ok(ScalarValue::Int64(Some(self.count()))) | ||
| } | ||
|
|
||
| fn size(&self) -> usize { | ||
| size_of_val(self) + 256 | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In Severity: medium Other Locations
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:useful; category:bug; feedback: The Augment AI reviewer is correct! The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:useful; category:bug; feedback: The Gemini AI reviewer is correct! The |
||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bool array accumulators double-count memory in
|
||
| } | ||
|
|
||
| /// Optimized COUNT DISTINCT accumulator for i8 using a bool array. | ||
| /// Uses 256 bytes to track all possible i8 values (mapped to 0..255). | ||
| #[derive(Debug)] | ||
| pub struct BoolArray256DistinctCountAccumulatorI8 { | ||
| seen: [bool; 256], | ||
| } | ||
|
|
||
| impl BoolArray256DistinctCountAccumulatorI8 { | ||
| pub fn new() -> Self { | ||
| Self { seen: [false; 256] } | ||
| } | ||
|
|
||
| #[inline] | ||
| fn count(&self) -> i64 { | ||
| self.seen.iter().filter(|&&b| b).count() as i64 | ||
| } | ||
| } | ||
|
|
||
| impl Default for BoolArray256DistinctCountAccumulatorI8 { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl Accumulator for BoolArray256DistinctCountAccumulatorI8 { | ||
| #[inline(never)] | ||
| fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if values.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_primitive_array::<arrow::datatypes::Int8Type>(&values[0])?; | ||
| for value in arr.iter().flatten() { | ||
| self.seen[value as u8 as usize] = true; | ||
| } | ||
|
Comment on lines
+283
to
+285
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the if arr.null_count() == 0 {
for &value in arr.values() {
self.seen[value as u8 as usize] = true;
}
} else {
for value in arr.iter().flatten() {
self.seen[value as u8 as usize] = true;
}
}
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! The flatten() operation checks each item in the array whether it is Some or None. This could be avoided for no-nulls arrays by using the constant check |
||
| Ok(()) | ||
| } | ||
|
|
||
| fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if states.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_list_array(&states[0])?; | ||
| arr.iter().try_for_each(|maybe_list| { | ||
| if let Some(list) = maybe_list { | ||
| let list = as_primitive_array::<arrow::datatypes::Int8Type>(&list)?; | ||
| for value in list.values().iter() { | ||
| self.seen[*value as u8 as usize] = true; | ||
| } | ||
| }; | ||
| Ok(()) | ||
| }) | ||
| } | ||
|
|
||
| fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> { | ||
| let values: Vec<i8> = self | ||
| .seen | ||
| .iter() | ||
| .enumerate() | ||
| .filter_map( | ||
| |(idx, &seen)| { | ||
| if seen { Some(idx as u8 as i8) } else { None } | ||
| }, | ||
| ) | ||
| .collect(); | ||
|
|
||
| let arr = Arc::new( | ||
| PrimitiveArray::<arrow::datatypes::Int8Type>::from_iter_values(values), | ||
| ); | ||
| Ok(vec![ | ||
| SingleRowListArrayBuilder::new(arr).build_list_scalar(), | ||
| ]) | ||
| } | ||
|
|
||
| fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> { | ||
| Ok(ScalarValue::Int64(Some(self.count()))) | ||
| } | ||
|
|
||
| fn size(&self) -> usize { | ||
| size_of_val(self) + 256 | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:useful; category:bug; feedback: The Gemini AI reviewer is correct! The |
||
| } | ||
| } | ||
|
|
||
| /// Optimized COUNT DISTINCT accumulator for u16 using a 65536-bit bitmap. | ||
| /// Uses 8KB (1024 x u64) to track all possible u16 values. | ||
| #[derive(Debug)] | ||
| pub struct Bitmap65536DistinctCountAccumulator { | ||
| bitmap: Box<[u64; 1024]>, | ||
| } | ||
|
|
||
| impl Bitmap65536DistinctCountAccumulator { | ||
| pub fn new() -> Self { | ||
| Self { | ||
| bitmap: Box::new([0; 1024]), | ||
| } | ||
| } | ||
|
|
||
| #[inline] | ||
| fn set_bit(&mut self, value: u16) { | ||
| let word = (value / 64) as usize; | ||
| let bit = value % 64; | ||
| self.bitmap[word] |= 1u64 << bit; | ||
| } | ||
|
|
||
| #[inline] | ||
| fn count(&self) -> i64 { | ||
| self.bitmap.iter().map(|w| w.count_ones() as i64).sum() | ||
| } | ||
| } | ||
|
|
||
| impl Default for Bitmap65536DistinctCountAccumulator { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl Accumulator for Bitmap65536DistinctCountAccumulator { | ||
| #[inline(never)] | ||
| fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if values.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_primitive_array::<arrow::datatypes::UInt16Type>(&values[0])?; | ||
| for value in arr.iter().flatten() { | ||
| self.set_bit(value); | ||
| } | ||
|
Comment on lines
+376
to
+378
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! The flatten() operation checks each item in the array whether it is Some or None. This could be avoided for no-nulls arrays by using the constant check |
||
| Ok(()) | ||
| } | ||
|
|
||
| fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if states.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_list_array(&states[0])?; | ||
| arr.iter().try_for_each(|maybe_list| { | ||
| if let Some(list) = maybe_list { | ||
| let list = as_primitive_array::<arrow::datatypes::UInt16Type>(&list)?; | ||
| for value in list.values().iter() { | ||
| self.set_bit(*value); | ||
| } | ||
| }; | ||
| Ok(()) | ||
| }) | ||
| } | ||
|
|
||
| fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> { | ||
| let mut values = Vec::new(); | ||
| for (word_idx, &word) in self.bitmap.iter().enumerate() { | ||
| if word != 0 { | ||
| for bit in 0..64 { | ||
| if (word & (1u64 << bit)) != 0 { | ||
| values.push((word_idx as u16) * 64 + bit); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
+401
to
+409
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Iterating through all 64 bits for every non-zero word is inefficient, especially for sparse bitmaps. Using for (word_idx, &word) in self.bitmap.iter().enumerate() {
let mut w = word;
while w != 0 {
let bit = w.trailing_zeros();
values.push((word_idx as u16) * 64 + bit as u16);
w &= !(1u64 << bit);
}
}
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! By using u64::trailing_zeros() the algorithm could be optimized to process only the 1s in the bitmap from back to front. This way it will iterate only the 1s which could lead to a big gain for sparse bitmaps. |
||
|
|
||
| let arr = Arc::new( | ||
| PrimitiveArray::<arrow::datatypes::UInt16Type>::from_iter_values(values), | ||
| ); | ||
| Ok(vec![ | ||
| SingleRowListArrayBuilder::new(arr).build_list_scalar(), | ||
| ]) | ||
| } | ||
|
|
||
| fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> { | ||
| Ok(ScalarValue::Int64(Some(self.count()))) | ||
| } | ||
|
|
||
| fn size(&self) -> usize { | ||
| size_of_val(self) + 8192 | ||
| } | ||
| } | ||
|
|
||
| /// Optimized COUNT DISTINCT accumulator for i16 using a 65536-bit bitmap. | ||
| /// Uses 8KB (1024 x u64) to track all possible i16 values (mapped to 0..65535). | ||
| #[derive(Debug)] | ||
| pub struct Bitmap65536DistinctCountAccumulatorI16 { | ||
| bitmap: Box<[u64; 1024]>, | ||
| } | ||
|
|
||
| impl Bitmap65536DistinctCountAccumulatorI16 { | ||
| pub fn new() -> Self { | ||
| Self { | ||
| bitmap: Box::new([0; 1024]), | ||
| } | ||
| } | ||
|
|
||
| #[inline] | ||
| fn set_bit(&mut self, value: i16) { | ||
| let idx = value as u16; | ||
| let word = (idx / 64) as usize; | ||
| let bit = idx % 64; | ||
| self.bitmap[word] |= 1u64 << bit; | ||
| } | ||
|
|
||
| #[inline] | ||
| fn count(&self) -> i64 { | ||
| self.bitmap.iter().map(|w| w.count_ones() as i64).sum() | ||
| } | ||
| } | ||
|
|
||
| impl Default for Bitmap65536DistinctCountAccumulatorI16 { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 { | ||
| #[inline(never)] | ||
| fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if values.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_primitive_array::<arrow::datatypes::Int16Type>(&values[0])?; | ||
| for value in arr.iter().flatten() { | ||
| self.set_bit(value); | ||
| } | ||
|
Comment on lines
+470
to
+472
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! The flatten() operation checks each item in the array whether it is Some or None. This could be avoided for no-nulls arrays by using the constant check |
||
| Ok(()) | ||
| } | ||
|
|
||
| fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if states.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_list_array(&states[0])?; | ||
| arr.iter().try_for_each(|maybe_list| { | ||
| if let Some(list) = maybe_list { | ||
| let list = as_primitive_array::<arrow::datatypes::Int16Type>(&list)?; | ||
| for value in list.values().iter() { | ||
| self.set_bit(*value); | ||
| } | ||
| }; | ||
| Ok(()) | ||
| }) | ||
| } | ||
|
|
||
| fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> { | ||
| let mut values = Vec::new(); | ||
| for (word_idx, &word) in self.bitmap.iter().enumerate() { | ||
| if word != 0 { | ||
| for bit in 0..64 { | ||
| if (word & (1u64 << bit)) != 0 { | ||
| values.push(((word_idx as u16) * 64 + bit) as i16); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
+495
to
+503
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Iterating through all 64 bits for every non-zero word is inefficient. Using for (word_idx, &word) in self.bitmap.iter().enumerate() {
let mut w = word;
while w != 0 {
let bit = w.trailing_zeros();
values.push(((word_idx as u16) * 64 + bit as u16) as i16);
w &= !(1u64 << bit);
}
}
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! By using u64::trailing_zeros() the algorithm could be optimized to process only the 1s in the bitmap from back to front. This way it will iterate only the 1s which could lead to a big gain for sparse bitmaps. |
||
|
|
||
| let arr = Arc::new( | ||
| PrimitiveArray::<arrow::datatypes::Int16Type>::from_iter_values(values), | ||
| ); | ||
| Ok(vec![ | ||
| SingleRowListArrayBuilder::new(arr).build_list_scalar(), | ||
| ]) | ||
| } | ||
|
|
||
| fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> { | ||
| Ok(ScalarValue::Int64(Some(self.count()))) | ||
| } | ||
|
|
||
| fn size(&self) -> usize { | ||
| size_of_val(self) + 8192 | ||
| } | ||
| } | ||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Iterating over the array using
iter().flatten()is inefficient for primitive types because it performs an option check for every element. Since this is a performance-oriented optimization, it's better to checknull_count()and usevalues()directly when no nulls are present.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! The flatten() operation checks each item in the array whether it is Some or None. This could be avoided for no-nulls arrays by using the constant check
arr.null_count() == 0. This single check could avoid N checks for None, where N is the size of the array.