Skip to content

Commit d17c213

Browse files
committed
arrow-select: improve dictionary interleave fallback performance
The naive interleave_fallback would use MutableArray and extend it with the full values slice each time the target array changed in the indices slice. This commit introduces a new approach where dictionary values are concatenated once and then new offsets are computed over these taking the indices into account. This results in 50-75% performance improvement in microbenchmarks and will also improve memory usage during interleaves (used heavily in sorts). Note that this path is only taken when should_merge_dictionary_values returns false. ``` $ cargo bench --bench interleave_kernels -- 'dict' --baseline main interleave dict(20, 0.0) 100 [0..100, 100..230, 450..1000] time: [627.14 ns 634.76 ns 644.13 ns] change: [−65.614% −65.345% −65.002%] (p = 0.00 < 0.05) Performance has improved. Found 9 outliers among 100 measurements (9.00%) 2 (2.00%) low mild 6 (6.00%) high mild 1 (1.00%) high severe interleave dict(20, 0.0) 400 [0..100, 100..230, 450..1000] time: [934.35 ns 937.51 ns 940.60 ns] change: [−71.488% −71.340% −71.208%] (p = 0.00 < 0.05) Performance has improved. Found 3 outliers among 100 measurements (3.00%) 3 (3.00%) high mild interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000] time: [1.6485 µs 1.6528 µs 1.6566 µs] change: [−74.307% −74.190% −74.088%] (p = 0.00 < 0.05) Performance has improved. interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000] time: [1.6723 µs 1.6782 µs 1.6842 µs] change: [−74.664% −74.544% −74.438%] (p = 0.00 < 0.05) Performance has improved. Found 2 outliers among 100 measurements (2.00%) 1 (1.00%) low mild 1 (1.00%) high severe interleave dict_sparse(20, 0.0) 100 [0..100, 100..230, 450..1000] time: [1.5985 µs 1.6064 µs 1.6148 µs] change: [−12.510% −12.116% −11.715%] (p = 0.00 < 0.05) Performance has improved. 
Found 19 outliers among 100 measurements (19.00%) 10 (10.00%) low mild 6 (6.00%) high mild 3 (3.00%) high severe interleave dict_sparse(20, 0.0) 400 [0..100, 100..230, 450..1000] time: [1.9310 µs 1.9466 µs 1.9680 µs] change: [−41.488% −41.091% −40.628%] (p = 0.00 < 0.05) Performance has improved. Found 15 outliers among 100 measurements (15.00%) 3 (3.00%) low mild 6 (6.00%) high mild 6 (6.00%) high severe interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000] time: [2.7812 µs 2.8516 µs 2.9401 µs] change: [−56.097% −55.276% −54.274%] (p = 0.00 < 0.05) Performance has improved. Found 15 outliers among 100 measurements (15.00%) 8 (8.00%) high mild 7 (7.00%) high severe interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000] time: [3.4926 µs 3.6558 µs 3.8427 µs] change: [−48.423% −46.405% −44.379%] (p = 0.00 < 0.05) Performance has improved. interleave dict_distinct 100 time: [2.0013 µs 2.0106 µs 2.0205 µs] change: [−1.6162% −1.0465% −0.4647%] (p = 0.00 < 0.05) Change within noise threshold. Found 4 outliers among 100 measurements (4.00%) 4 (4.00%) high mild interleave dict_distinct 1024 time: [1.9784 µs 1.9855 µs 1.9924 µs] change: [−2.4655% −1.8461% −1.2265%] (p = 0.00 < 0.05) Performance has improved. interleave dict_distinct 2048 time: [1.9832 µs 1.9959 µs 2.0087 µs] change: [−2.9917% −2.3003% −1.6062%] (p = 0.00 < 0.05) Performance has improved. ``` Signed-off-by: Alfonso Subiotto Marques <alfonso.subiotto@polarsignals.com>
1 parent dff6402 commit d17c213

File tree

1 file changed

+91
-1
lines changed

1 file changed

+91
-1
lines changed

arrow-select/src/interleave.rs

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! Interleave elements from multiple arrays
1919
20+
use crate::concat::concat;
2021
use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
2122
use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
2223
use arrow_array::cast::AsArray;
@@ -196,7 +197,7 @@ fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
196197
) -> Result<ArrayRef, ArrowError> {
197198
let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
198199
if !should_merge_dictionary_values::<K>(&dictionaries, indices.len()) {
199-
return interleave_fallback(arrays, indices);
200+
return interleave_fallback_dictionary::<K>(&dictionaries, indices);
200201
}
201202

202203
let masks: Vec<_> = dictionaries
@@ -346,6 +347,57 @@ fn interleave_fallback(
346347
Ok(make_array(array_data.freeze()))
347348
}
348349

350+
/// interleave_fallback_dictionary is a fallback implementation for interleaving
351+
/// dictionaries when it was determined that the dictionary values should not
352+
/// be merged. This implementation concatenates the value slices and recomputes
353+
/// the resulting dictionary keys.
354+
fn interleave_fallback_dictionary<K: ArrowDictionaryKeyType>(
355+
dictionaries: &[&DictionaryArray<K>],
356+
indices: &[(usize, usize)],
357+
) -> Result<ArrayRef, ArrowError> {
358+
// Compute offsets for each dictionary's keys in the merged values array
359+
let mut offsets = Vec::with_capacity(dictionaries.len());
360+
let mut current_offset = 0usize;
361+
for dict in dictionaries.iter() {
362+
offsets.push(current_offset);
363+
current_offset += dict.values().len();
364+
}
365+
366+
let all_values: Vec<&dyn Array> = dictionaries.iter().map(|d| d.values().as_ref()).collect();
367+
let concatenated_values = concat(&all_values)?;
368+
369+
let mut has_nulls = false;
370+
let new_keys: Vec<K::Native> = indices
371+
.iter()
372+
.map(|(a, b)| {
373+
let dict = dictionaries[*a];
374+
let old_keys = dict.keys();
375+
if old_keys.is_valid(*b) {
376+
let old_key = old_keys.values()[*b].as_usize();
377+
K::Native::from_usize(offsets[*a] + old_key).unwrap()
378+
} else {
379+
has_nulls = true;
380+
K::Native::ZERO
381+
}
382+
})
383+
.collect();
384+
385+
let nulls = if has_nulls {
386+
let null_buffer = BooleanBuffer::collect_bool(indices.len(), |i| {
387+
let (a, b) = indices[i];
388+
dictionaries[a].keys().is_valid(b)
389+
});
390+
Some(NullBuffer::new(null_buffer))
391+
} else {
392+
None
393+
};
394+
395+
let keys_array = PrimitiveArray::<K>::new(new_keys.into(), nulls);
396+
// SAFETY: keys_array is constructed from a valid set of keys.
397+
let array = unsafe { DictionaryArray::new_unchecked(keys_array, concatenated_values) };
398+
Ok(Arc::new(array))
399+
}
400+
349401
/// Interleave rows by index from multiple [`RecordBatch`] instances and return a new [`RecordBatch`].
350402
///
351403
/// This function will call [`interleave`] on each array of the [`RecordBatch`] instances and assemble a new [`RecordBatch`].
@@ -1182,4 +1234,42 @@ mod tests {
11821234
assert_eq!(v.len(), 1);
11831235
assert_eq!(v.data_type(), &DataType::Struct(fields));
11841236
}
1237+
1238+
#[test]
fn test_interleave_fallback_dictionary_with_nulls() {
    // Helper that builds a string dictionary from nullable keys and values.
    let build = |keys: Vec<Option<i32>>, values: &[&str]| {
        let keys = Int32Array::from_iter(keys);
        let values = StringArray::from_iter_values(values.iter().copied());
        DictionaryArray::new(keys, Arc::new(values))
    };

    let left = build(vec![Some(0), None, Some(1)], &["foo", "bar"]);
    let right = build(vec![Some(0), Some(1), None], &["baz", "qux"]);

    // (array, row) pairs mixing valid and null rows from both inputs:
    // "foo", null, "baz", null, "bar", "qux"
    let picks = [(0, 0), (0, 1), (1, 0), (1, 2), (0, 2), (1, 1)];

    let interleaved =
        interleave_fallback_dictionary::<Int32Type>(&[&left, &right], &picks).unwrap();
    let typed = interleaved
        .as_dictionary::<Int32Type>()
        .downcast_dict::<StringArray>()
        .unwrap();

    let actual: Vec<Option<&str>> = typed.into_iter().collect();
    let expected = [
        Some("foo"),
        None,
        Some("baz"),
        None,
        Some("bar"),
        Some("qux"),
    ];
    assert_eq!(actual, expected);
}
11851275
}

0 commit comments

Comments
 (0)