From ca2ece4fdbf1bfc33a0dcf41fcceef3474db6e45 Mon Sep 17 00:00:00 2001 From: Alfonso Subiotto Marques Date: Wed, 10 Dec 2025 13:15:46 +0100 Subject: [PATCH] arrow-select: improve dictionary interleave fallback performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The naive interleave_fallback would use MutableArray and extend it with the full values slice each time the target array changed in the indices slice. This commit introduces a new approach where dictionary values are concatenated once and then new offsets are computed over these taking the indices into account. This results in 50-75% performance improvement in microbenchmarks and will also improve memory usage during interleaves (used heavily in sorts). Note that this path is only taken when should_merge_dictionary_values returns false. ``` $ cargo bench --bench interleave_kernels -- 'dict' --baseline main interleave dict(20, 0.0) 100 [0..100, 100..230, 450..1000] time: [627.14 ns 634.76 ns 644.13 ns] change: [−65.614% −65.345% −65.002%] (p = 0.00 < 0.05) Performance has improved. Found 9 outliers among 100 measurements (9.00%) 2 (2.00%) low mild 6 (6.00%) high mild 1 (1.00%) high severe interleave dict(20, 0.0) 400 [0..100, 100..230, 450..1000] time: [934.35 ns 937.51 ns 940.60 ns] change: [−71.488% −71.340% −71.208%] (p = 0.00 < 0.05) Performance has improved. Found 3 outliers among 100 measurements (3.00%) 3 (3.00%) high mild interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000] time: [1.6485 µs 1.6528 µs 1.6566 µs] change: [−74.307% −74.190% −74.088%] (p = 0.00 < 0.05) Performance has improved. interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000] time: [1.6723 µs 1.6782 µs 1.6842 µs] change: [−74.664% −74.544% −74.438%] (p = 0.00 < 0.05) Performance has improved. 
Found 2 outliers among 100 measurements (2.00%) 1 (1.00%) low mild 1 (1.00%) high severe interleave dict_sparse(20, 0.0) 100 [0..100, 100..230, 450..1000] time: [1.5985 µs 1.6064 µs 1.6148 µs] change: [−12.510% −12.116% −11.715%] (p = 0.00 < 0.05) Performance has improved. Found 19 outliers among 100 measurements (19.00%) 10 (10.00%) low mild 6 (6.00%) high mild 3 (3.00%) high severe interleave dict_sparse(20, 0.0) 400 [0..100, 100..230, 450..1000] time: [1.9310 µs 1.9466 µs 1.9680 µs] change: [−41.488% −41.091% −40.628%] (p = 0.00 < 0.05) Performance has improved. Found 15 outliers among 100 measurements (15.00%) 3 (3.00%) low mild 6 (6.00%) high mild 6 (6.00%) high severe interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000] time: [2.7812 µs 2.8516 µs 2.9401 µs] change: [−56.097% −55.276% −54.274%] (p = 0.00 < 0.05) Performance has improved. Found 15 outliers among 100 measurements (15.00%) 8 (8.00%) high mild 7 (7.00%) high severe interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000] time: [3.4926 µs 3.6558 µs 3.8427 µs] change: [−48.423% −46.405% −44.379%] (p = 0.00 < 0.05) Performance has improved. interleave dict_distinct 100 time: [2.0013 µs 2.0106 µs 2.0205 µs] change: [−1.6162% −1.0465% −0.4647%] (p = 0.00 < 0.05) Change within noise threshold. Found 4 outliers among 100 measurements (4.00%) 4 (4.00%) high mild interleave dict_distinct 1024 time: [1.9784 µs 1.9855 µs 1.9924 µs] change: [−2.4655% −1.8461% −1.2265%] (p = 0.00 < 0.05) Performance has improved. interleave dict_distinct 2048 time: [1.9832 µs 1.9959 µs 2.0087 µs] change: [−2.9917% −2.3003% −1.6062%] (p = 0.00 < 0.05) Performance has improved. 
``` Signed-off-by: Alfonso Subiotto Marques --- arrow-select/src/concat.rs | 2 +- arrow-select/src/dictionary.rs | 17 +++- arrow-select/src/interleave.rs | 155 ++++++++++++++++++++++++++++++++- 3 files changed, 168 insertions(+), 6 deletions(-) diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 29c52b192bc1..81b24827e34b 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -107,7 +107,7 @@ fn concat_dictionaries( .inspect(|d| output_len += d.len()) .collect(); - if !should_merge_dictionary_values::(&dictionaries, output_len) { + if !should_merge_dictionary_values::(&dictionaries, output_len).0 { return concat_fallback(arrays, Capacities::Array(output_len)); } diff --git a/arrow-select/src/dictionary.rs b/arrow-select/src/dictionary.rs index 266842bcc79e..5b32f4e761f8 100644 --- a/arrow-select/src/dictionary.rs +++ b/arrow-select/src/dictionary.rs @@ -174,10 +174,14 @@ type PtrEq = fn(&dyn Array, &dyn Array) -> bool; /// some return over the naive approach used by MutableArrayData /// /// `len` is the total length of the merged output +/// +/// Returns `(should_merge, has_overflow)` where: +/// - `should_merge`: whether dictionary values should be merged +/// - `has_overflow`: whether the combined dictionary values would overflow the key type pub(crate) fn should_merge_dictionary_values( dictionaries: &[&DictionaryArray], len: usize, -) -> bool { +) -> (bool, bool) { use DataType::*; let first_values = dictionaries[0].values().as_ref(); let ptr_eq: PtrEq = match first_values.data_type() { @@ -187,7 +191,11 @@ pub(crate) fn should_merge_dictionary_values( LargeBinary => bytes_ptr_eq::, dt => { if !dt.is_primitive() { - return false; + return ( + false, + K::Native::from_usize(dictionaries.iter().map(|d| d.values().len()).sum()) + .is_none(), + ); } |a, b| a.to_data().ptr_eq(&b.to_data()) } @@ -206,7 +214,10 @@ pub(crate) fn should_merge_dictionary_values( let overflow = K::Native::from_usize(total_values).is_none(); let 
values_exceed_length = total_values >= len; - !single_dictionary && (overflow || values_exceed_length) + ( + !single_dictionary && (overflow || values_exceed_length), + overflow, + ) } /// Given an array of dictionaries and an optional key mask compute a values array diff --git a/arrow-select/src/interleave.rs b/arrow-select/src/interleave.rs index 1453995a0a80..cb3ca655dc67 100644 --- a/arrow-select/src/interleave.rs +++ b/arrow-select/src/interleave.rs @@ -17,6 +17,7 @@ //! Interleave elements from multiple arrays +use crate::concat::concat; use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values}; use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder}; use arrow_array::cast::AsArray; @@ -195,8 +196,14 @@ fn interleave_dictionaries( indices: &[(usize, usize)], ) -> Result { let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::()).collect(); - if !should_merge_dictionary_values::(&dictionaries, indices.len()) { - return interleave_fallback(arrays, indices); + let (should_merge, has_overflow) = + should_merge_dictionary_values::(&dictionaries, indices.len()); + if !should_merge { + return if has_overflow { + interleave_fallback(arrays, indices) + } else { + interleave_fallback_dictionary::(&dictionaries, indices) + }; } let masks: Vec<_> = dictionaries @@ -346,6 +353,76 @@ fn interleave_fallback( Ok(make_array(array_data.freeze())) } +/// Fallback implementation for interleaving dictionaries when it was determined +/// that the dictionary values should not be merged. This implementation concatenates +/// the value slices and recomputes the resulting dictionary keys. +/// +/// # Panics +/// +/// This function assumes that the combined dictionary values will not overflow the +/// key type. Callers must verify this condition via [`should_merge_dictionary_values`] +/// before calling this function. 
+fn interleave_fallback_dictionary( + dictionaries: &[&DictionaryArray], + indices: &[(usize, usize)], +) -> Result { + let relative_offsets: Vec = dictionaries + .iter() + .scan(0usize, |offset, dict| { + let current = *offset; + *offset += dict.values().len(); + Some(current) + }) + .collect(); + let all_values: Vec<&dyn Array> = dictionaries.iter().map(|d| d.values().as_ref()).collect(); + let concatenated_values = concat(&all_values)?; + + let any_nulls = dictionaries.iter().any(|d| d.keys().nulls().is_some()); + let (new_keys, nulls) = if any_nulls { + let mut has_nulls = false; + let new_keys: Vec = indices + .iter() + .map(|(array, row)| { + let old_keys = dictionaries[*array].keys(); + if old_keys.is_valid(*row) { + let old_key = old_keys.values()[*row].as_usize(); + K::Native::from_usize(relative_offsets[*array] + old_key) + .expect("key overflow should be checked by caller") + } else { + has_nulls = true; + K::Native::ZERO + } + }) + .collect(); + + let nulls = if has_nulls { + let null_buffer = BooleanBuffer::collect_bool(indices.len(), |i| { + let (array, row) = indices[i]; + dictionaries[array].keys().is_valid(row) + }); + Some(NullBuffer::new(null_buffer)) + } else { + None + }; + (new_keys, nulls) + } else { + let new_keys: Vec = indices + .iter() + .map(|(array, row)| { + let old_key = dictionaries[*array].keys().values()[*row].as_usize(); + K::Native::from_usize(relative_offsets[*array] + old_key) + .expect("key overflow should be checked by caller") + }) + .collect(); + (new_keys, None) + }; + + let keys_array = PrimitiveArray::::new(new_keys.into(), nulls); + // SAFETY: keys_array is constructed from a valid set of keys. + let array = unsafe { DictionaryArray::new_unchecked(keys_array, concatenated_values) }; + Ok(Arc::new(array)) +} + /// Interleave rows by index from multiple [`RecordBatch`] instances and return a new [`RecordBatch`]. 
/// /// This function will call [`interleave`] on each array of the [`RecordBatch`] instances and assemble a new [`RecordBatch`]. @@ -412,6 +489,7 @@ mod tests { use super::*; use arrow_array::Int32RunArray; use arrow_array::builder::{Int32Builder, ListBuilder, PrimitiveRunBuilder}; + use arrow_array::types::Int8Type; use arrow_schema::Field; #[test] @@ -509,6 +587,41 @@ mod tests { assert_eq!(actual, expected); } + #[test] + fn test_interleave_dictionary_overflow_same_values() { + let values: ArrayRef = Arc::new(StringArray::from_iter_values( + (0..50).map(|i| format!("v{i}")), + )); + + // With 3 dictionaries of 50 values each, relative_offsets = [0, 50, 100] + // Accessing key 49 from dict3 gives 100 + 49 = 149 which overflows Int8 + // (max 127). + // This test case falls back to interleave_fallback because the + // dictionaries share the same underlying values slice. + let dict1 = DictionaryArray::::new( + Int8Array::from_iter_values([0, 1, 2]), + values.clone(), + ); + let dict2 = DictionaryArray::::new( + Int8Array::from_iter_values([0, 1, 2]), + values.clone(), + ); + let dict3 = + DictionaryArray::::new(Int8Array::from_iter_values([49]), values.clone()); + + let indices = &[(0, 0), (1, 0), (2, 0)]; + let result = interleave(&[&dict1, &dict2, &dict3], indices).unwrap(); + + let dict_result = result.as_dictionary::(); + let string_result: Vec<_> = dict_result + .downcast_dict::() + .unwrap() + .into_iter() + .map(|x| x.unwrap()) + .collect(); + assert_eq!(string_result, vec!["v0", "v0", "v49"]); + } + #[test] fn test_lists() { // [[1, 2], null, [3]] @@ -1182,4 +1295,42 @@ mod tests { assert_eq!(v.len(), 1); assert_eq!(v.data_type(), &DataType::Struct(fields)); } + + #[test] + fn test_interleave_fallback_dictionary_with_nulls() { + let input_1_keys = Int32Array::from_iter([Some(0), None, Some(1)]); + let input_1_values = StringArray::from_iter_values(["foo", "bar"]); + let dict_a = DictionaryArray::new(input_1_keys, Arc::new(input_1_values)); + + let 
input_2_keys = Int32Array::from_iter([Some(0), Some(1), None]); + let input_2_values = StringArray::from_iter_values(["baz", "qux"]); + let dict_b = DictionaryArray::new(input_2_keys, Arc::new(input_2_values)); + + let indices = vec![ + (0, 0), // "foo" + (0, 1), // null + (1, 0), // "baz" + (1, 2), // null + (0, 2), // "bar" + (1, 1), // "qux" + ]; + + let result = + interleave_fallback_dictionary::(&[&dict_a, &dict_b], &indices).unwrap(); + let dict_result = result.as_dictionary::(); + + let string_result = dict_result.downcast_dict::().unwrap(); + let collected: Vec<_> = string_result.into_iter().collect(); + assert_eq!( + collected, + vec![ + Some("foo"), + None, + Some("baz"), + None, + Some("bar"), + Some("qux") + ] + ); + } }