Skip to content

Commit 89b1497

Browse files
authored
Improve take performance on List arrays (#9643)
# Which issue does this PR close?

- Closes #NNN.

# Rationale for this change

This PR builds on top of #9626, improving the results on those benchmarks.

# What changes are included in this PR?

1. Similar to #9625, branch the function into the null and non-null paths
2. Copy the list elements in a single pass while building the offsets, allocating less intermediate state.

# Are these changes tested?

Added a few tests for sliced list arrays.

# Are there any user-facing changes?

No

---------

Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 84b3454 commit 89b1497

File tree

1 file changed

+131
-109
lines changed

1 file changed

+131
-109
lines changed

arrow-select/src/take.rs

Lines changed: 131 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ use arrow_buffer::{
2929
ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer,
3030
bit_util,
3131
};
32-
use arrow_data::ArrayDataBuilder;
32+
use arrow_data::{ArrayDataBuilder, transform::MutableArrayData};
3333
use arrow_schema::{ArrowError, DataType, FieldRef, UnionMode};
3434

35-
use num_traits::{One, Zero};
35+
use num_traits::Zero;
3636

3737
/// Take elements by index from [Array], creating a new [Array] from those indexes.
3838
///
@@ -611,9 +611,8 @@ fn take_byte_view<T: ByteViewType, IndexType: ArrowPrimitiveType>(
611611

612612
/// `take` implementation for list arrays
613613
///
614-
/// Calculates the index and indexed offset for the inner array,
615-
/// applying `take` on the inner array, then reconstructing a list array
616-
/// with the indexed offsets
614+
/// Copies the selected list entries' child slices into a new child array
615+
/// via `MutableArrayData`, then reconstructs a list array with new offsets
617616
fn take_list<IndexType, OffsetType>(
618617
values: &GenericListArray<OffsetType::Native>,
619618
indices: &PrimitiveArray<IndexType>,
@@ -624,23 +623,79 @@ where
624623
OffsetType::Native: OffsetSizeTrait,
625624
PrimitiveArray<OffsetType>: From<Vec<OffsetType::Native>>,
626625
{
627-
// TODO: Some optimizations can be done here such as if it is
628-
// taking the whole list or a contiguous sublist
629-
let (list_indices, offsets, null_buf) =
630-
take_value_indices_from_list::<IndexType, OffsetType>(values, indices)?;
631-
632-
let taken = take_impl::<OffsetType>(values.values().as_ref(), &list_indices)?;
633-
let value_offsets = Buffer::from_vec(offsets);
634-
// create a new list with taken data and computed null information
626+
let list_offsets = values.value_offsets();
627+
let child_data = values.values().to_data();
628+
let nulls = take_nulls(values.nulls(), indices);
629+
630+
let mut new_offsets = Vec::with_capacity(indices.len() + 1);
631+
new_offsets.push(OffsetType::Native::zero());
632+
633+
let use_nulls = child_data.null_count() > 0;
634+
635+
let capacity = child_data
636+
.len()
637+
.checked_div(values.len())
638+
.map(|v| v * indices.len())
639+
.unwrap_or_default();
640+
641+
let mut array_data = MutableArrayData::new(vec![&child_data], use_nulls, capacity);
642+
643+
match nulls.as_ref().filter(|n| n.null_count() > 0) {
644+
None => {
645+
for index in indices.values() {
646+
let ix = index.as_usize();
647+
let start = list_offsets[ix].as_usize();
648+
let end = list_offsets[ix + 1].as_usize();
649+
array_data.extend(0, start, end);
650+
new_offsets.push(OffsetType::Native::from_usize(array_data.len()).unwrap());
651+
}
652+
}
653+
Some(output_nulls) => {
654+
assert_eq!(output_nulls.len(), indices.len());
655+
656+
let mut last_filled = 0;
657+
for i in output_nulls.valid_indices() {
658+
let current = OffsetType::Native::from_usize(array_data.len()).unwrap();
659+
// Filling offsets for the null values between the two valid indices
660+
if last_filled < i {
661+
new_offsets.extend(std::iter::repeat_n(current, i - last_filled));
662+
}
663+
664+
// SAFETY: `i` comes from validity bitmap over `indices`, so in-bounds.
665+
let ix = unsafe { indices.value_unchecked(i) }.as_usize();
666+
let start = list_offsets[ix].as_usize();
667+
let end = list_offsets[ix + 1].as_usize();
668+
array_data.extend(0, start, end);
669+
new_offsets.push(OffsetType::Native::from_usize(array_data.len()).unwrap());
670+
last_filled = i + 1;
671+
}
672+
673+
// Filling offsets for null values at the end
674+
let final_offset = OffsetType::Native::from_usize(array_data.len()).unwrap();
675+
new_offsets.extend(std::iter::repeat_n(
676+
final_offset,
677+
indices.len() - last_filled,
678+
));
679+
}
680+
};
681+
682+
assert_eq!(
683+
new_offsets.len(),
684+
indices.len() + 1,
685+
"New offsets was filled under/over the expected capacity"
686+
);
687+
688+
let child_data = array_data.freeze();
689+
let value_offsets = Buffer::from_vec(new_offsets);
690+
635691
let list_data = ArrayDataBuilder::new(values.data_type().clone())
636692
.len(indices.len())
637-
.null_bit_buffer(Some(null_buf.into()))
693+
.nulls(nulls)
638694
.offset(0)
639-
.add_child_data(taken.into_data())
695+
.add_child_data(child_data)
640696
.add_buffer(value_offsets);
641697

642698
let list_data = unsafe { list_data.build_unchecked() };
643-
644699
Ok(GenericListArray::<OffsetType::Native>::from(list_data))
645700
}
646701

@@ -925,78 +980,6 @@ fn take_run<T: RunEndIndexType, I: ArrowPrimitiveType>(
925980
Ok(array_data.into())
926981
}
927982

928-
/// Takes/filters a list array's inner data using the offsets of the list array.
929-
///
930-
/// Where a list array has indices `[0,2,5,10]`, taking indices of `[2,0]` returns
931-
/// an array of the indices `[5..10, 0..2]` and offsets `[0,5,7]` (5 elements and 2
932-
/// elements)
933-
#[allow(clippy::type_complexity)]
934-
fn take_value_indices_from_list<IndexType, OffsetType>(
935-
list: &GenericListArray<OffsetType::Native>,
936-
indices: &PrimitiveArray<IndexType>,
937-
) -> Result<
938-
(
939-
PrimitiveArray<OffsetType>,
940-
Vec<OffsetType::Native>,
941-
MutableBuffer,
942-
),
943-
ArrowError,
944-
>
945-
where
946-
IndexType: ArrowPrimitiveType,
947-
OffsetType: ArrowPrimitiveType,
948-
OffsetType::Native: OffsetSizeTrait + std::ops::Add + Zero + One,
949-
PrimitiveArray<OffsetType>: From<Vec<OffsetType::Native>>,
950-
{
951-
// TODO: benchmark this function, there might be a faster unsafe alternative
952-
let offsets: &[OffsetType::Native] = list.value_offsets();
953-
954-
let mut new_offsets = Vec::with_capacity(indices.len());
955-
let mut values = Vec::new();
956-
let mut current_offset = OffsetType::Native::zero();
957-
// add first offset
958-
new_offsets.push(OffsetType::Native::zero());
959-
960-
// Initialize null buffer
961-
let num_bytes = bit_util::ceil(indices.len(), 8);
962-
let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
963-
let null_slice = null_buf.as_slice_mut();
964-
965-
// compute the value indices, and set offsets accordingly
966-
for i in 0..indices.len() {
967-
if indices.is_valid(i) {
968-
let ix = indices
969-
.value(i)
970-
.to_usize()
971-
.ok_or_else(|| ArrowError::ComputeError("Cast to usize failed".to_string()))?;
972-
let start = offsets[ix];
973-
let end = offsets[ix + 1];
974-
current_offset += end - start;
975-
new_offsets.push(current_offset);
976-
977-
let mut curr = start;
978-
979-
// if start == end, this slot is empty
980-
while curr < end {
981-
values.push(curr);
982-
curr += One::one();
983-
}
984-
if !list.is_valid(ix) {
985-
bit_util::unset_bit(null_slice, i);
986-
}
987-
} else {
988-
bit_util::unset_bit(null_slice, i);
989-
new_offsets.push(current_offset);
990-
}
991-
}
992-
993-
Ok((
994-
PrimitiveArray::<OffsetType>::from(values),
995-
new_offsets,
996-
null_buf,
997-
))
998-
}
999-
1000983
/// Takes/filters a fixed size list array's inner data using the offsets of the list array.
1001984
fn take_value_indices_from_fixed_size_list<IndexType>(
1002985
list: &FixedSizeListArray,
@@ -2497,37 +2480,76 @@ mod tests {
24972480
)
24982481
}
24992482

2500-
#[test]
2501-
fn test_take_value_index_from_list() {
2502-
let list = build_generic_list::<i32, Int32Type>(vec![
2483+
fn test_take_sliced_list_generic<S: OffsetSizeTrait + 'static>() {
2484+
let list = build_generic_list::<S, Int32Type>(vec![
25032485
Some(vec![0, 1]),
25042486
Some(vec![2, 3, 4]),
2505-
Some(vec![5, 6, 7, 8, 9]),
2487+
None,
2488+
Some(vec![]),
2489+
Some(vec![5, 6]),
2490+
Some(vec![7]),
2491+
]);
2492+
let sliced = list.slice(1, 4);
2493+
let indices = UInt32Array::from(vec![Some(3), Some(0), None, Some(2), Some(1)]);
2494+
2495+
let taken = take(&sliced, &indices, None).unwrap();
2496+
let taken = taken.as_list::<S>();
2497+
2498+
let expected = build_generic_list::<S, Int32Type>(vec![
2499+
Some(vec![5, 6]),
2500+
Some(vec![2, 3, 4]),
2501+
None,
2502+
Some(vec![]),
2503+
None,
25062504
]);
2507-
let indices = UInt32Array::from(vec![2, 0]);
25082505

2509-
let (indexed, offsets, null_buf) = take_value_indices_from_list(&list, &indices).unwrap();
2506+
assert_eq!(taken, &expected);
2507+
}
2508+
2509+
fn test_take_sliced_list_with_value_nulls_generic<S: OffsetSizeTrait + 'static>() {
2510+
let list = GenericListArray::<S>::from_iter_primitive::<Int32Type, _, _>(vec![
2511+
Some(vec![Some(10)]),
2512+
Some(vec![None, Some(1)]),
2513+
None,
2514+
Some(vec![Some(2), None]),
2515+
Some(vec![]),
2516+
Some(vec![Some(3)]),
2517+
]);
2518+
let sliced = list.slice(1, 4);
2519+
let indices = UInt32Array::from(vec![Some(2), Some(0), None, Some(3), Some(1)]);
2520+
2521+
let taken = take(&sliced, &indices, None).unwrap();
2522+
let taken = taken.as_list::<S>();
2523+
2524+
let expected = GenericListArray::<S>::from_iter_primitive::<Int32Type, _, _>(vec![
2525+
Some(vec![Some(2), None]),
2526+
Some(vec![None, Some(1)]),
2527+
None,
2528+
Some(vec![]),
2529+
None,
2530+
]);
25102531

2511-
assert_eq!(indexed, Int32Array::from(vec![5, 6, 7, 8, 9, 0, 1]));
2512-
assert_eq!(offsets, vec![0, 5, 7]);
2513-
assert_eq!(null_buf.as_slice(), &[0b11111111]);
2532+
assert_eq!(taken, &expected);
25142533
}
25152534

25162535
#[test]
2517-
fn test_take_value_index_from_large_list() {
2518-
let list = build_generic_list::<i64, Int32Type>(vec![
2519-
Some(vec![0, 1]),
2520-
Some(vec![2, 3, 4]),
2521-
Some(vec![5, 6, 7, 8, 9]),
2522-
]);
2523-
let indices = UInt32Array::from(vec![2, 0]);
2536+
fn test_take_sliced_list() {
2537+
test_take_sliced_list_generic::<i32>();
2538+
}
2539+
2540+
#[test]
2541+
fn test_take_sliced_large_list() {
2542+
test_take_sliced_list_generic::<i64>();
2543+
}
25242544

2525-
let (indexed, offsets, null_buf) =
2526-
take_value_indices_from_list::<_, Int64Type>(&list, &indices).unwrap();
2545+
#[test]
2546+
fn test_take_sliced_list_with_value_nulls() {
2547+
test_take_sliced_list_with_value_nulls_generic::<i32>();
2548+
}
25272549

2528-
assert_eq!(indexed, Int64Array::from(vec![5, 6, 7, 8, 9, 0, 1]));
2529-
assert_eq!(offsets, vec![0, 5, 7]);
2530-
assert_eq!(null_buf.as_slice(), &[0b11111111]);
2550+
#[test]
2551+
fn test_take_sliced_large_list_with_value_nulls() {
2552+
test_take_sliced_list_with_value_nulls_generic::<i64>();
25312553
}
25322554

25332555
#[test]

0 commit comments

Comments
 (0)