From 57cacc495ff0b9ff67087ead13ebab876b3ae0cb Mon Sep 17 00:00:00 2001
From: Adam Gutglick
Date: Sun, 29 Mar 2026 21:32:04 +0100
Subject: [PATCH 1/3] Improve take_bytes perf for null cases

---
 arrow-select/src/take.rs | 166 +++++++++++++++++++++------------------
 1 file changed, 89 insertions(+), 77 deletions(-)

diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs
index ee813f5353c2..d53d83a3b43d 100644
--- a/arrow-select/src/take.rs
+++ b/arrow-select/src/take.rs
@@ -490,6 +490,42 @@ fn take_boolean(
     BooleanArray::new(val_buf, null_buf)
 }
 
+/// Copies byte ranges from `src` into a new contiguous buffer.
+///
+/// # Safety
+/// Each `(start, end)` in `ranges` must be in-bounds of `src`, and
+/// `capacity` must equal the total bytes across all ranges.
+unsafe fn copy_byte_ranges(src: &[u8], ranges: &[(usize, usize)], capacity: usize) -> Vec<u8> {
+    debug_assert_eq!(
+        ranges.iter().map(|(s, e)| e - s).sum::<usize>(),
+        capacity,
+        "capacity must equal total bytes across all ranges"
+    );
+    let src_len = src.len();
+    let mut values = Vec::with_capacity(capacity);
+    let src = src.as_ptr();
+    let mut dst = values.as_mut_ptr();
+    for &(start, end) in ranges {
+        debug_assert!(start <= end, "invalid range: start ({start}) > end ({end})");
+        debug_assert!(
+            end <= src_len,
+            "range end ({end}) out of bounds (src len {src_len})"
+        );
+        let len = end - start;
+        // SAFETY: caller guarantees each (start, end) is in-bounds of `src`.
+        // `dst` advances within the `capacity` bytes we allocated.
+        // The regions don't overlap (src is input, dst is a fresh allocation).
+        unsafe {
+            std::ptr::copy_nonoverlapping(src.add(start), dst, len);
+            dst = dst.add(len);
+        }
+    }
+    // SAFETY: caller guarantees `capacity` == total bytes across all ranges,
+    // so the loop above wrote exactly `capacity` bytes.
+    unsafe { values.set_len(capacity) };
+    values
+}
+
 /// `take` implementation for string arrays
 fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
     array: &GenericByteArray<T>,
     indices: &PrimitiveArray<IndexType>,
@@ -499,95 +535,71 @@
     offsets.push(T::Offset::default());
 
     let input_offsets = array.value_offsets();
+    let input_values = array.value_data();
     let mut capacity = 0;
     let nulls = take_nulls(array.nulls(), indices);
 
-    let (offsets, values) = if array.null_count() == 0 && indices.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
-
-        for index in indices.values() {
-            values.extend_from_slice(array.value(index.as_usize()).as_ref());
-        }
-        (offsets, values)
-    } else if indices.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
+    // Pass 1: compute offsets and collect byte ranges.
+    // Branch on output nulls — `None` means every output slot is valid.
+    let ranges = match nulls.as_ref().filter(|n| n.null_count() > 0) {
+        // Fast path: no nulls in output, every index is valid.
+        None => {
+            let mut ranges = Vec::with_capacity(indices.len());
+            for index in indices.values() {
+                let index = index.as_usize();
+                let start = input_offsets[index].as_usize();
+                let end = input_offsets[index + 1].as_usize();
+                capacity += end - start;
+                offsets.push(
+                    T::Offset::from_usize(capacity)
+                        .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
+                );
+                ranges.push((start, end));
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
+            ranges
         }
-        let mut values = Vec::with_capacity(capacity);
+        // Nullable path: only process valid (non-null) output positions.
+        Some(output_nulls) => {
+            let mut ranges = Vec::with_capacity(indices.len() - output_nulls.null_count());
+            let mut last_filled = 0;
+
+            // Pre-fill offsets; we overwrite valid positions below.
+            offsets.resize(indices.len() + 1, T::Offset::default());
+
+            for i in output_nulls.valid_indices() {
+                let current_offset = T::Offset::from_usize(capacity)
+                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+                // Fill offsets for skipped null slots so they get zero-length ranges.
+                if last_filled < i {
+                    offsets[last_filled + 1..=i].fill(current_offset);
+                }
 
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                values.extend_from_slice(array.value(index).as_ref());
-            }
-        }
-        (offsets, values)
-    } else if array.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if indices.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
+                // SAFETY: `i` comes from a validity bitmap over `indices`, so it is in-bounds.
+                let index = unsafe { indices.value_unchecked(i) }.as_usize();
+                let start = input_offsets[index].as_usize();
+                let end = input_offsets[index + 1].as_usize();
+                capacity += end - start;
+                offsets[i + 1] = T::Offset::from_usize(capacity)
+                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+                ranges.push((start, end));
+                last_filled = i + 1;
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
-        for (i, index) in indices.values().iter().enumerate() {
-            if indices.is_valid(i) {
-                values.extend_from_slice(array.value(index.as_usize()).as_ref());
-            }
+            // Fill trailing null offsets after the last valid position.
+            let final_offset = T::Offset::from_usize(capacity)
+                .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+            offsets[last_filled + 1..].fill(final_offset);
+            ranges
         }
-        (offsets, values)
-    } else {
-        let nulls = nulls.as_ref().unwrap();
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if nulls.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
-            }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
-
-        for (i, index) in indices.values().iter().enumerate() {
-            // check index is valid before using index. The value in
-            // NULL index slots may not be within bounds of array
-            let index = index.as_usize();
-            if nulls.is_valid(i) {
-                values.extend_from_slice(array.value(index).as_ref());
-            }
-        }
-        (offsets, values)
     };
 
-    T::Offset::from_usize(values.len())
-        .ok_or_else(|| ArrowError::OffsetOverflowError(values.len()))?;
+    // Pass 2: copy byte data for all collected ranges.
+    let values = unsafe { copy_byte_ranges(input_values, &ranges, capacity) };
+
+    debug_assert_eq!(capacity, values.len());
+
+    // SAFETY: offsets are monotonically increasing and in-bounds of `values`,
+    // and `nulls` (if present) has length == `indices.len()`.
     let array = unsafe {
         let offsets = OffsetBuffer::new_unchecked(offsets.into());
         GenericByteArray::<T>::new_unchecked(offsets, values.into(), nulls)

From 089c9a97f33d9316e12832105df48f4fc76d3f33 Mon Sep 17 00:00:00 2001
From: Adam Gutglick
Date: Mon, 30 Mar 2026 13:52:45 +0100
Subject: [PATCH 2/3] Restore non-null fastpath but unsafely

---
 arrow-select/src/take.rs | 46 +++++++++++++++++++++++++++-------------
 1 file changed, 31 insertions(+), 15 deletions(-)

diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs
index d53d83a3b43d..6bb6a223c1e0 100644
--- a/arrow-select/src/take.rs
+++ b/arrow-select/src/take.rs
@@ -495,14 +495,19 @@ fn take_boolean(
 /// # Safety
 /// Each `(start, end)` in `ranges` must be in-bounds of `src`, and
 /// `capacity` must equal the total bytes across all ranges.
-unsafe fn copy_byte_ranges(src: &[u8], ranges: &[(usize, usize)], capacity: usize) -> Vec<u8> {
+unsafe fn copy_byte_ranges(
+    src: &[u8],
+    ranges: &[(usize, usize)],
+    capacity: usize,
+    values: &mut Vec<u8>,
+) {
+    values.reserve(capacity);
     debug_assert_eq!(
         ranges.iter().map(|(s, e)| e - s).sum::<usize>(),
         capacity,
         "capacity must equal total bytes across all ranges"
    );
     let src_len = src.len();
-    let mut values = Vec::with_capacity(capacity);
     let src = src.as_ptr();
     let mut dst = values.as_mut_ptr();
     for &(start, end) in ranges {
@@ -523,7 +528,6 @@ unsafe fn copy_byte_ranges(src: &[u8], ranges: &[(usize, usize)], capacity: usiz
     // SAFETY: caller guarantees `capacity` == total bytes across all ranges,
     // so the loop above wrote exactly `capacity` bytes.
     unsafe { values.set_len(capacity) };
-    values
 }
 
 /// `take` implementation for string arrays
@@ -531,20 +535,18 @@ fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
     array: &GenericByteArray<T>,
     indices: &PrimitiveArray<IndexType>,
 ) -> Result<GenericByteArray<T>, ArrowError> {
+    let mut values = Vec::new();
     let mut offsets = Vec::with_capacity(indices.len() + 1);
     offsets.push(T::Offset::default());
 
     let input_offsets = array.value_offsets();
-    let input_values = array.value_data();
     let mut capacity = 0;
     let nulls = take_nulls(array.nulls(), indices);
 
-    // Pass 1: compute offsets and collect byte ranges.
     // Branch on output nulls — `None` means every output slot is valid.
-    let ranges = match nulls.as_ref().filter(|n| n.null_count() > 0) {
+    match nulls.as_ref().filter(|n| n.null_count() > 0) {
         // Fast path: no nulls in output, every index is valid.
         None => {
-            let mut ranges = Vec::with_capacity(indices.len());
             for index in indices.values() {
                 let index = index.as_usize();
                 let start = input_offsets[index].as_usize();
@@ -554,9 +556,26 @@ fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
                     T::Offset::from_usize(capacity)
                         .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
                 );
-                ranges.push((start, end));
             }
-            ranges
+
+            values.reserve(capacity);
+
+            let mut dst = values.as_mut_ptr();
+
+            for index in indices.values() {
+                // SAFETY: in-bounds proven by the first loop's bounds-checked offset access.
+                // dst stays within reserved capacity computed from the same indices.
+                unsafe {
+                    let data: &[u8] = array.value_unchecked(index.as_usize()).as_ref();
+                    std::ptr::copy_nonoverlapping(data.as_ptr(), dst, data.len());
+                    dst = dst.add(data.len());
+                }
+            }
+
+            // SAFETY: wrote exactly `capacity` bytes above; reserved on line above.
+            unsafe {
+                values.set_len(capacity);
+            }
         }
         // Nullable path: only process valid (non-null) output positions.
         Some(output_nulls) => {
@@ -566,6 +585,7 @@ fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
             // Pre-fill offsets; we overwrite valid positions below.
             offsets.resize(indices.len() + 1, T::Offset::default());
 
+            // Pass 1: find all valid ranges that need to be copied.
             for i in output_nulls.valid_indices() {
                 let current_offset = T::Offset::from_usize(capacity)
                     .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
@@ -589,15 +609,11 @@ fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
             let final_offset = T::Offset::from_usize(capacity)
                 .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
             offsets[last_filled + 1..].fill(final_offset);
-            ranges
+            // Pass 2: copy byte data for all collected ranges.
+            unsafe { copy_byte_ranges(array.value_data(), &ranges, capacity, &mut values) };
         }
     };
 
-    // Pass 2: copy byte data for all collected ranges.
-    let values = unsafe { copy_byte_ranges(input_values, &ranges, capacity) };
-
-    debug_assert_eq!(capacity, values.len());
-
     // SAFETY: offsets are monotonically increasing and in-bounds of `values`,
     // and `nulls` (if present) has length == `indices.len()`.
     let array = unsafe {

From 1b3d1fa8e235de32005f76a7068efb457da04562 Mon Sep 17 00:00:00 2001
From: Adam Gutglick
Date: Wed, 1 Apr 2026 12:03:18 +0100
Subject: [PATCH 3/3] Inline both branches and `spare_capacity_mut`

Signed-off-by: Adam Gutglick
---
 arrow-select/src/take.rs | 91 ++++++++++++++++++++--------------------
 1 file changed, 45 insertions(+), 46 deletions(-)

diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs
index 6bb6a223c1e0..e07eca52b0bf 100644
--- a/arrow-select/src/take.rs
+++ b/arrow-select/src/take.rs
@@ -490,52 +490,12 @@ fn take_boolean(
     BooleanArray::new(val_buf, null_buf)
 }
 
-/// Copies byte ranges from `src` into a new contiguous buffer.
-///
-/// # Safety
-/// Each `(start, end)` in `ranges` must be in-bounds of `src`, and
-/// `capacity` must equal the total bytes across all ranges.
-unsafe fn copy_byte_ranges(
-    src: &[u8],
-    ranges: &[(usize, usize)],
-    capacity: usize,
-    values: &mut Vec<u8>,
-) {
-    values.reserve(capacity);
-    debug_assert_eq!(
-        ranges.iter().map(|(s, e)| e - s).sum::<usize>(),
-        capacity,
-        "capacity must equal total bytes across all ranges"
-    );
-    let src_len = src.len();
-    let src = src.as_ptr();
-    let mut dst = values.as_mut_ptr();
-    for &(start, end) in ranges {
-        debug_assert!(start <= end, "invalid range: start ({start}) > end ({end})");
-        debug_assert!(
-            end <= src_len,
-            "range end ({end}) out of bounds (src len {src_len})"
-        );
-        let len = end - start;
-        // SAFETY: caller guarantees each (start, end) is in-bounds of `src`.
-        // `dst` advances within the `capacity` bytes we allocated.
-        // The regions don't overlap (src is input, dst is a fresh allocation).
-        unsafe {
-            std::ptr::copy_nonoverlapping(src.add(start), dst, len);
-            dst = dst.add(len);
-        }
-    }
-    // SAFETY: caller guarantees `capacity` == total bytes across all ranges,
-    // so the loop above wrote exactly `capacity` bytes.
-    unsafe { values.set_len(capacity) };
-}
-
 /// `take` implementation for string arrays
 fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
     array: &GenericByteArray<T>,
     indices: &PrimitiveArray<IndexType>,
 ) -> Result<GenericByteArray<T>, ArrowError> {
-    let mut values = Vec::new();
+    let mut values: Vec<u8> = Vec::new();
     let mut offsets = Vec::with_capacity(indices.len() + 1);
     offsets.push(T::Offset::default());
 
@@ -560,15 +520,21 @@ fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
 
             values.reserve(capacity);
 
-            let mut dst = values.as_mut_ptr();
+            let dst = values.spare_capacity_mut();
+            debug_assert!(dst.len() >= capacity);
+            let mut offset = 0;
 
             for index in indices.values() {
                 // SAFETY: in-bounds proven by the first loop's bounds-checked offset access.
-                // dst stays within reserved capacity computed from the same indices.
+                // dst asserted above to include the required capacity.
                 unsafe {
                     let data: &[u8] = array.value_unchecked(index.as_usize()).as_ref();
-                    std::ptr::copy_nonoverlapping(data.as_ptr(), dst, data.len());
-                    dst = dst.add(data.len());
+                    std::ptr::copy_nonoverlapping(
+                        data.as_ptr(),
+                        dst[offset..].as_mut_ptr().cast::<u8>(),
+                        data.len(),
+                    );
+                    offset += data.len();
                 }
             }
 
@@ -601,6 +567,9 @@ fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
                 capacity += end - start;
                 offsets[i + 1] = T::Offset::from_usize(capacity)
                     .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+
+                debug_assert!(end >= start, "invalid range: start ({start}) > end ({end})");
+
                 ranges.push((start, end));
                 last_filled = i + 1;
             }
@@ -610,7 +579,37 @@ fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
                 .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
             offsets[last_filled + 1..].fill(final_offset);
             // Pass 2: copy byte data for all collected ranges.
-            unsafe { copy_byte_ranges(array.value_data(), &ranges, capacity, &mut values) };
+            values.reserve(capacity);
+            debug_assert_eq!(
+                ranges.iter().map(|(s, e)| e - s).sum::<usize>(),
+                capacity,
+                "capacity must equal total bytes across all ranges"
+            );
+
+            let src = array.value_data();
+            let src = src.as_ptr();
+            let dst = values.spare_capacity_mut();
+            debug_assert!(dst.len() >= capacity);
+
+            let mut offset = 0;
+
+            for (start, end) in ranges.into_iter() {
+                let value_len = end - start;
+                // SAFETY: caller guarantees each (start, end) is in-bounds of `src`.
+                // `dst` asserted above to include the required capacity.
+                // The regions don't overlap (src is input, dst is a fresh allocation).
+                unsafe {
+                    std::ptr::copy_nonoverlapping(
+                        src.add(start),
+                        dst[offset..].as_mut_ptr().cast::<u8>(),
+                        value_len,
+                    );
+                    offset += value_len;
+                }
+            }
+            // SAFETY: caller guarantees `capacity` == total bytes across all ranges,
+            // so the loop above wrote exactly `capacity` bytes.
+            unsafe { values.set_len(capacity) };
         }
     };
 