Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 133 additions & 4 deletions arrow-select/src/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,12 +457,141 @@ fn take_bits<I: ArrowPrimitiveType>(
});
BooleanBuffer::new(output_buffer.into(), 0, len)
}
None => {
BooleanBuffer::collect_bool(len, |idx: usize| {
// SAFETY: idx<indices.len()
values.value(unsafe { indices.value_unchecked(idx).as_usize() })
None => take_bits_non_null_indices(indices, values),
}
}

fn take_bits_non_null_indices<I: ArrowPrimitiveType>(
indices: &PrimitiveArray<I>,
values: &BooleanBuffer,
) -> BooleanBuffer {
let values_slice: &[u8] = values.values();

// SAFETY: u8 is trivially transmutable to u32
let (prefix, aligned, suffix) = unsafe { values_slice.align_to::<u32>() };

// By acessing the values buffer as [u32], we allow LLVM to use gather instructions,
// which only exists for 32 and 64 bits values, which in turn allows better vectorization of
// the rest of code, except for the final bitmask packing, which requires usage of instrinsics.
// This only fails for BooleanBuffer'a created with Vec's of integers of 8 or 16 bits, which should
// be uncommon. Even then, because there are gather instructions only for unaligned data, we could use
// ptr::read_unaligned without any performace penalty if either the values buffer len is multiple of 4
// or if it's sliced and there's valid memory after the slice end allowing up to 24 bits of memory read
// This is currently unimplemented due to increased unsafe usage and probable low usefulness
if prefix.is_empty() && suffix.is_empty() {
let values_len = I::Native::usize_as(values.len());
let indices_chunks = indices.values().chunks_exact(64);
let remainder = indices_chunks.remainder();

let iter = indices_chunks.map(|indices_chunk| {
let indices_chunk: &[I::Native; 64] = indices_chunk.try_into().unwrap(); // unwrap should be optimized out

let in_bounds = indices_chunk
.iter()
.fold(true, |acc, bit_idx| acc & (*bit_idx < values_len));

// todo: print the exact out of bounds index
assert!(in_bounds, "Out-of-bounds index");

pack_bitmask(|i| {
let bit_idx = indices_chunk[i].as_usize() + values.offset();
let data_idx = bit_idx / 32;
let bit_offset = bit_idx % 32;

// SAFETY: bounds checked above
let value = unsafe { aligned.get_unchecked(data_idx).to_be() };

value & (1 << bit_offset) != 0
})
});

let mut buffer = unsafe { MutableBuffer::from_trusted_len_iter(iter) };

// TODO: to avoid buffer grow+copy below, add MutableBuffer::extend_from_trusted_len_iter
// or use Vec<u64>, which would be aligned to 8 bytes instead of 64
if !remainder.is_empty() {
let mut packed = 0;

for (bit_idx, i) in remainder.iter().enumerate() {
packed |= (values.value(i.as_usize()) as u64) << bit_idx;
}

buffer.push(packed)
}

BooleanBuffer::new(buffer.into(), 0, indices.len())
} else {
BooleanBuffer::collect_bool(indices.len(), |idx: usize| {
// SAFETY: idx<indices.len()
values.value(unsafe { indices.value_unchecked(idx).as_usize() })
})
}
}

#[cfg(target_arch = "x86")]
use std::arch::x86::*;

#[cfg(target_arch = "x86_64")]
use std::arch::x86_64::*;

#[inline(always)]
fn pack_bitmask(f: impl Fn(usize) -> bool) -> u64 {
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
if cfg!(target_feature = "avx2") {
return unsafe { pack_bitmask_avx2(f) };
} else if cfg!(target_feature = "sse2") {
return unsafe { pack_bitmask_sse2(f) };
}

pack_bitmask_portable(f)
}

#[inline(always)]
fn pack_bitmask_portable(f: impl Fn(usize) -> bool) -> u64 {
let mut mask = 0;

for i in 0..64 {
mask |= (f(i) as u64) << i;
}

mask
}

#[inline(always)]
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
unsafe fn pack_bitmask_avx2(f: impl Fn(usize) -> bool) -> u64 {
unsafe {
let unpacked: [u8; 64] = std::array::from_fn(|i| if f(i) { u8::MAX } else { 0 });

let low = _mm256_loadu_si256(unpacked.as_ptr() as *const _);
let low = _mm256_movemask_epi8(low) as u32 as u64;

let high = _mm256_loadu_si256(unpacked[32..].as_ptr() as *const _);
let high = _mm256_movemask_epi8(high) as u32 as u64;

(high << 32) | low
}
}

#[inline(always)]
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
unsafe fn pack_bitmask_sse2(f: impl Fn(usize) -> bool) -> u64 {
let unpacked: [u8; 64] = std::array::from_fn(|i| if f(i) { u8::MAX } else { 0 });

unsafe {
let lolo = _mm_loadu_si128(unpacked.as_ptr() as *const _);
let lolo = _mm_movemask_epi8(lolo) as u32 as u64;

let lo = _mm_loadu_si128(unpacked[16..].as_ptr() as *const _);
let lo = _mm_movemask_epi8(lo) as u32 as u64;

let hi = _mm_loadu_si128(unpacked[32..].as_ptr() as *const _);
let hi = _mm_movemask_epi8(hi) as u32 as u64;

let hihi = _mm_loadu_si128(unpacked[48..].as_ptr() as *const _);
let hihi = _mm_movemask_epi8(hihi) as u32 as u64;

(hihi << 48) | (hi << 32) | (lo << 16) | lolo
}
}

Expand Down
Loading