Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 101 additions & 74 deletions arrow-select/src/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,99 +495,126 @@ fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
array: &GenericByteArray<T>,
indices: &PrimitiveArray<IndexType>,
) -> Result<GenericByteArray<T>, ArrowError> {
let mut values: Vec<u8> = Vec::new();
let mut offsets = Vec::with_capacity(indices.len() + 1);
offsets.push(T::Offset::default());

let input_offsets = array.value_offsets();
let mut capacity = 0;
let nulls = take_nulls(array.nulls(), indices);

let (offsets, values) = if array.null_count() == 0 && indices.null_count() == 0 {
offsets.reserve(indices.len());
for index in indices.values() {
let index = index.as_usize();
capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
offsets.push(
T::Offset::from_usize(capacity)
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
);
}
let mut values = Vec::with_capacity(capacity);

for index in indices.values() {
values.extend_from_slice(array.value(index.as_usize()).as_ref());
}
(offsets, values)
} else if indices.null_count() == 0 {
offsets.reserve(indices.len());
for index in indices.values() {
let index = index.as_usize();
if array.is_valid(index) {
capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
// Branch on output nulls — `None` means every output slot is valid.
match nulls.as_ref().filter(|n| n.null_count() > 0) {
// Fast path: no nulls in output, every index is valid.
None => {
for index in indices.values() {
let index = index.as_usize();
let start = input_offsets[index].as_usize();
let end = input_offsets[index + 1].as_usize();
capacity += end - start;
offsets.push(
T::Offset::from_usize(capacity)
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
);
}
offsets.push(
T::Offset::from_usize(capacity)
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
);
}
let mut values = Vec::with_capacity(capacity);

for index in indices.values() {
let index = index.as_usize();
if array.is_valid(index) {
values.extend_from_slice(array.value(index).as_ref());
}
}
(offsets, values)
} else if array.null_count() == 0 {
offsets.reserve(indices.len());
for (i, index) in indices.values().iter().enumerate() {
let index = index.as_usize();
if indices.is_valid(i) {
capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
values.reserve(capacity);

let dst = values.spare_capacity_mut();
debug_assert!(dst.len() >= capacity);
let mut offset = 0;

for index in indices.values() {
// SAFETY: in-bounds proven by the first loop's bounds-checked offset access.
// dst asserted above to include the required capacity.
unsafe {
let data: &[u8] = array.value_unchecked(index.as_usize()).as_ref();
std::ptr::copy_nonoverlapping(
data.as_ptr(),
dst[offset..].as_mut_ptr().cast::<u8>(),
data.len(),
);
offset += data.len();
}
}
offsets.push(
T::Offset::from_usize(capacity)
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
);
}
let mut values = Vec::with_capacity(capacity);

for (i, index) in indices.values().iter().enumerate() {
if indices.is_valid(i) {
values.extend_from_slice(array.value(index.as_usize()).as_ref());
// SAFETY: wrote exactly `capacity` bytes above; reserved on line above.
unsafe {
values.set_len(capacity);
}
}
(offsets, values)
} else {
let nulls = nulls.as_ref().unwrap();
offsets.reserve(indices.len());
for (i, index) in indices.values().iter().enumerate() {
let index = index.as_usize();
if nulls.is_valid(i) {
capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
// Nullable path: only process valid (non-null) output positions.
Some(output_nulls) => {
let mut ranges = Vec::with_capacity(indices.len() - output_nulls.null_count());
let mut last_filled = 0;

// Pre-fill offsets; we overwrite valid positions below.
offsets.resize(indices.len() + 1, T::Offset::default());

// Pass 1: find all valid ranges that need to be copied.
for i in output_nulls.valid_indices() {
let current_offset = T::Offset::from_usize(capacity)
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
// Fill offsets for skipped null slots so they get zero-length ranges.
if last_filled < i {
offsets[last_filled + 1..=i].fill(current_offset);
}

// SAFETY: `i` comes from a validity bitmap over `indices`, so it is in-bounds.
let index = unsafe { indices.value_unchecked(i) }.as_usize();
let start = input_offsets[index].as_usize();
let end = input_offsets[index + 1].as_usize();
capacity += end - start;
offsets[i + 1] = T::Offset::from_usize(capacity)
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;

debug_assert!(end >= start, "invalid range: start ({start}) > end ({end})");

ranges.push((start, end));
last_filled = i + 1;
}
offsets.push(
T::Offset::from_usize(capacity)
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,

// Fill trailing null offsets after the last valid position.
let final_offset = T::Offset::from_usize(capacity)
.ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
offsets[last_filled + 1..].fill(final_offset);
// Pass 2: copy byte data for all collected ranges.
values.reserve(capacity);
debug_assert_eq!(
ranges.iter().map(|(s, e)| e - s).sum::<usize>(),
capacity,
"capacity must equal total bytes across all ranges"
);
}
let mut values = Vec::with_capacity(capacity);

for (i, index) in indices.values().iter().enumerate() {
// check index is valid before using index. The value in
// NULL index slots may not be within bounds of array
let index = index.as_usize();
if nulls.is_valid(i) {
values.extend_from_slice(array.value(index).as_ref());

let src = array.value_data();
let src = src.as_ptr();
let dst = values.spare_capacity_mut();
debug_assert!(dst.len() >= capacity);

let mut offset = 0;

for (start, end) in ranges.into_iter() {
let value_len = end - start;
// SAFETY: caller guarantees each (start, end) is in-bounds of `src`.
// `dst` asserted above to include the required capacity.
// The regions don't overlap (src is input, dst is a fresh allocation).
unsafe {
std::ptr::copy_nonoverlapping(
src.add(start),
dst[offset..].as_mut_ptr().cast::<u8>(),
value_len,
);
offset += value_len;
}
}
// SAFETY: caller guarantees `capacity` == total bytes across all ranges,
// so the loop above wrote exactly `capacity` bytes.
unsafe { values.set_len(capacity) };
}
(offsets, values)
};

T::Offset::from_usize(values.len())
.ok_or_else(|| ArrowError::OffsetOverflowError(values.len()))?;

// SAFETY: offsets are monotonically increasing and in-bounds of `values`,
// and `nulls` (if present) has length == `indices.len()`.
let array = unsafe {
let offsets = OffsetBuffer::new_unchecked(offsets.into());
GenericByteArray::<T>::new_unchecked(offsets, values.into(), nulls)
Expand Down
Loading