Skip to content

Commit a41e896

Browse files
committed
Improve take_bytes perf for null cases
1 parent c194e54 commit a41e896

File tree

1 file changed

+89
-77
lines changed

1 file changed

+89
-77
lines changed

arrow-select/src/take.rs

Lines changed: 89 additions & 77 deletions
Original file line number | Diff line number | Diff line change
@@ -490,6 +490,42 @@ fn take_boolean<IndexType: ArrowPrimitiveType>(
490490
BooleanArray::new(val_buf, null_buf)
491491
}
492492

493+
/// Copies byte ranges from `src` into a new contiguous buffer.
///
/// The ranges are concatenated in the order given, so the returned vector
/// holds `src[start..end]` for each `(start, end)` back to back. Ranges may
/// be empty (`start == end`) and may repeat or overlap within `src`.
///
/// In builds with debug assertions enabled, the contract below is checked
/// up front and a violation panics with a message naming the offending
/// range. In other builds no checks are performed.
///
/// # Safety
/// Each `(start, end)` in `ranges` must satisfy `start <= end` and be
/// in-bounds of `src` (`end <= src.len()`), and `capacity` must equal the
/// total bytes across all ranges.
unsafe fn copy_byte_ranges(src: &[u8], ranges: &[(usize, usize)], capacity: usize) -> Vec<u8> {
    // Validate the caller's contract before touching any raw pointers.
    // Each range is checked *before* its length is computed, so an inverted
    // range is reported with its own message rather than surfacing as an
    // arithmetic-overflow panic from `end - start`.
    if cfg!(debug_assertions) {
        let src_len = src.len();
        let mut total = 0usize;
        for &(start, end) in ranges {
            assert!(start <= end, "invalid range: start ({start}) > end ({end})");
            assert!(
                end <= src_len,
                "range end ({end}) out of bounds (src len {src_len})"
            );
            total += end - start;
        }
        assert_eq!(
            total, capacity,
            "capacity must equal total bytes across all ranges"
        );
    }

    let mut values = Vec::with_capacity(capacity);
    let src = src.as_ptr();
    let mut dst = values.as_mut_ptr();
    for &(start, end) in ranges {
        let len = end - start;
        // SAFETY: caller guarantees each (start, end) is in-bounds of `src`.
        // `dst` advances within the `capacity` bytes we allocated.
        // The regions don't overlap (src is input, dst is a fresh allocation).
        unsafe {
            std::ptr::copy_nonoverlapping(src.add(start), dst, len);
            dst = dst.add(len);
        }
    }
    // SAFETY: caller guarantees `capacity` == total bytes across all ranges,
    // so the loop above wrote exactly `capacity` bytes.
    unsafe { values.set_len(capacity) };
    values
}
528+
493529
/// `take` implementation for string arrays
494530
fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
495531
array: &GenericByteArray<T>,
@@ -499,95 +535,71 @@ fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
499535
offsets.push(T::Offset::default());
500536

501537
let input_offsets = array.value_offsets();
538+
let input_values = array.value_data();
502539
let mut capacity = 0;
503540
let nulls = take_nulls(array.nulls(), indices);
504541

505-
let (offsets, values) = if array.null_count() == 0 && indices.null_count() == 0 {
506-
offsets.reserve(indices.len());
507-
for index in indices.values() {
508-
let index = index.as_usize();
509-
capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
510-
offsets.push(
511-
T::Offset::from_usize(capacity)
512-
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
513-
);
514-
}
515-
let mut values = Vec::with_capacity(capacity);
516-
517-
for index in indices.values() {
518-
values.extend_from_slice(array.value(index.as_usize()).as_ref());
519-
}
520-
(offsets, values)
521-
} else if indices.null_count() == 0 {
522-
offsets.reserve(indices.len());
523-
for index in indices.values() {
524-
let index = index.as_usize();
525-
if array.is_valid(index) {
526-
capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
542+
// Pass 1: compute offsets and collect byte ranges.
543+
// Branch on output nulls — `None` means every output slot is valid.
544+
let ranges = match nulls.as_ref().filter(|n| n.null_count() > 0) {
545+
// Fast path: no nulls in output, every index is valid.
546+
None => {
547+
let mut ranges = Vec::with_capacity(indices.len());
548+
for index in indices.values() {
549+
let index = index.as_usize();
550+
let start = input_offsets[index].as_usize();
551+
let end = input_offsets[index + 1].as_usize();
552+
capacity += end - start;
553+
offsets.push(
554+
T::Offset::from_usize(capacity)
555+
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
556+
);
557+
ranges.push((start, end));
527558
}
528-
offsets.push(
529-
T::Offset::from_usize(capacity)
530-
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
531-
);
559+
ranges
532560
}
533-
let mut values = Vec::with_capacity(capacity);
561+
// Nullable path: only process valid (non-null) output positions.
562+
Some(output_nulls) => {
563+
let mut ranges = Vec::with_capacity(indices.len() - output_nulls.null_count());
564+
let mut last_filled = 0;
565+
566+
// Pre-fill offsets; we overwrite valid positions below.
567+
offsets.resize(indices.len() + 1, T::Offset::default());
568+
569+
for i in output_nulls.valid_indices() {
570+
let current_offset = T::Offset::from_usize(capacity)
571+
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
572+
// Fill offsets for skipped null slots so they get zero-length ranges.
573+
if last_filled < i {
574+
offsets[last_filled + 1..=i].fill(current_offset);
575+
}
534576

535-
for index in indices.values() {
536-
let index = index.as_usize();
537-
if array.is_valid(index) {
538-
values.extend_from_slice(array.value(index).as_ref());
539-
}
540-
}
541-
(offsets, values)
542-
} else if array.null_count() == 0 {
543-
offsets.reserve(indices.len());
544-
for (i, index) in indices.values().iter().enumerate() {
545-
let index = index.as_usize();
546-
if indices.is_valid(i) {
547-
capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
577+
// SAFETY: `i` comes from a validity bitmap over `indices`, so it is in-bounds.
578+
let index = unsafe { indices.value_unchecked(i) }.as_usize();
579+
let start = input_offsets[index].as_usize();
580+
let end = input_offsets[index + 1].as_usize();
581+
capacity += end - start;
582+
offsets[i + 1] = T::Offset::from_usize(capacity)
583+
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
584+
ranges.push((start, end));
585+
last_filled = i + 1;
548586
}
549-
offsets.push(
550-
T::Offset::from_usize(capacity)
551-
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
552-
);
553-
}
554-
let mut values = Vec::with_capacity(capacity);
555587

556-
for (i, index) in indices.values().iter().enumerate() {
557-
if indices.is_valid(i) {
558-
values.extend_from_slice(array.value(index.as_usize()).as_ref());
559-
}
588+
// Fill trailing null offsets after the last valid position.
589+
let final_offset = T::Offset::from_usize(capacity)
590+
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
591+
offsets[last_filled + 1..].fill(final_offset);
592+
ranges
560593
}
561-
(offsets, values)
562-
} else {
563-
let nulls = nulls.as_ref().unwrap();
564-
offsets.reserve(indices.len());
565-
for (i, index) in indices.values().iter().enumerate() {
566-
let index = index.as_usize();
567-
if nulls.is_valid(i) {
568-
capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
569-
}
570-
offsets.push(
571-
T::Offset::from_usize(capacity)
572-
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
573-
);
574-
}
575-
let mut values = Vec::with_capacity(capacity);
576-
577-
for (i, index) in indices.values().iter().enumerate() {
578-
// check index is valid before using index. The value in
579-
// NULL index slots may not be within bounds of array
580-
let index = index.as_usize();
581-
if nulls.is_valid(i) {
582-
values.extend_from_slice(array.value(index).as_ref());
583-
}
584-
}
585-
(offsets, values)
586594
};
587595

588-
T::Offset::from_usize(values.len())
589-
.ok_or_else(|| ArrowError::OffsetOverflowError(values.len()))?;
596+
// Pass 2: copy byte data for all collected ranges.
597+
let values = unsafe { copy_byte_ranges(input_values, &ranges, capacity) };
598+
599+
debug_assert_eq!(capacity, values.len());
590600

601+
// SAFETY: offsets are monotonically increasing and in-bounds of `values`,
602+
// and `nulls` (if present) has length == `indices.len()`.
591603
let array = unsafe {
592604
let offsets = OffsetBuffer::new_unchecked(offsets.into());
593605
GenericByteArray::<T>::new_unchecked(offsets, values.into(), nulls)

0 commit comments

Comments
 (0)