Skip to content
8 changes: 4 additions & 4 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::column::reader::decoder::ColumnValueDecoder;
use crate::encodings::rle::RleDecoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::FromBytes;
use crate::util::bit_util::FromBitpacked;

/// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
macro_rules! make_reader {
Expand Down Expand Up @@ -128,7 +128,7 @@ struct ByteArrayDictionaryReader<K: ArrowNativeType, V: OffsetSizeTrait> {

impl<K, V> ByteArrayDictionaryReader<K, V>
where
K: FromBytes + Ord + ArrowNativeType,
K: FromBitpacked + Ord + ArrowNativeType,
V: OffsetSizeTrait,
{
fn new(
Expand All @@ -148,7 +148,7 @@ where

impl<K, V> ArrayReader for ByteArrayDictionaryReader<K, V>
where
K: FromBytes + Ord + ArrowNativeType,
K: FromBitpacked + Ord + ArrowNativeType,
V: OffsetSizeTrait,
{
fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -226,7 +226,7 @@ struct DictionaryDecoder<K, V> {

impl<K, V> ColumnValueDecoder for DictionaryDecoder<K, V>
where
K: FromBytes + Ord + ArrowNativeType,
K: FromBitpacked + Ord + ArrowNativeType,
V: OffsetSizeTrait,
{
type Buffer = DictionaryBuffer<K, V>;
Expand Down
9 changes: 6 additions & 3 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::encodings::decoding::byte_stream_split_decoder::{
};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::{self, BitReader};
use crate::util::bit_util::{self, BitReader, FromBitpacked};

mod byte_stream_split_decoder;

Expand Down Expand Up @@ -455,7 +455,10 @@ impl<T: DataType> RleValueDecoder<T> {
}
}

impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
impl<T: DataType> Decoder<T> for RleValueDecoder<T>
where
T::T: FromBitpacked,
{
#[inline]
fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
// Only support RLE value reader for boolean values with bit width of 1.
Expand Down Expand Up @@ -658,7 +661,7 @@ where

impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
where
T::T: Default + FromPrimitive + WrappingAdd + Copy,
T::T: Default + FromPrimitive + FromBitpacked + WrappingAdd + Copy,
{
// # of total values is derived from encoding
#[inline]
Expand Down
38 changes: 29 additions & 9 deletions parquet/src/encodings/rle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use std::{cmp, mem::size_of};
use bytes::Bytes;

use crate::errors::{ParquetError, Result};
use crate::util::bit_util::{self, BitReader, BitWriter, FromBytes};
use crate::util::bit_util::{self, BitReader, BitWriter, FromBitpacked};

/// Maximum groups of 8 values per bit-packed run. Current value is 64.
const MAX_GROUPS_PER_BIT_PACKED_RUN: usize = 1 << 6;
Expand Down Expand Up @@ -84,7 +84,7 @@ impl RleEncoder {

/// Initialize the encoder from existing `buffer`
pub fn new_from_buf(bit_width: u8, buffer: Vec<u8>) -> Self {
assert!(bit_width <= 64);
debug_assert!(bit_width <= 64);
let bit_writer = BitWriter::new_from_buf(buffer);
RleEncoder {
bit_width,
Expand Down Expand Up @@ -352,7 +352,7 @@ impl RleDecoder {
// that damage L1d-cache occupancy. This results in a ~18% performance drop
#[inline(never)]
#[allow(unused)]
Comment thread
Dandandan marked this conversation as resolved.
pub fn get<T: FromBytes>(&mut self) -> Result<Option<T>> {
pub fn get<T: FromBitpacked>(&mut self) -> Result<Option<T>> {
assert!(size_of::<T>() <= 8);

while self.rle_left == 0 && self.bit_packed_left == 0 {
Expand Down Expand Up @@ -388,7 +388,7 @@ impl RleDecoder {
}

#[inline(never)]
pub fn get_batch<T: FromBytes + Clone>(&mut self, buffer: &mut [T]) -> Result<usize> {
pub fn get_batch<T: FromBitpacked + Clone>(&mut self, buffer: &mut [T]) -> Result<usize> {
assert!(size_of::<T>() <= 8);

let mut values_read = 0;
Expand Down Expand Up @@ -469,7 +469,7 @@ impl RleDecoder {
where
T: Default + Clone,
{
assert!(buffer.len() >= max_values);
debug_assert!(buffer.len() >= max_values);

let mut values_read = 0;
while values_read < max_values {
Expand Down Expand Up @@ -507,10 +507,30 @@ impl RleDecoder {
self.bit_packed_left = 0;
break;
}
buffer[values_read..values_read + num_values]
.iter_mut()
.zip(index_buf[..num_values].iter())
.for_each(|(b, i)| b.clone_from(&dict[*i as usize]));
{
let out = &mut buffer[values_read..values_read + num_values];
let idx = &index_buf[..num_values];
let mut out_chunks = out.chunks_exact_mut(8);
let idx_chunks = idx.chunks_exact(8);
for (out_chunk, idx_chunk) in out_chunks.by_ref().zip(idx_chunks) {
let dict_len = dict.len();
assert!(
idx_chunk.iter().all(|&i| (i as usize) < dict_len),
"dictionary index out of bounds"
);
for (b, i) in out_chunk.iter_mut().zip(idx_chunk.iter()) {
// SAFETY: all indices checked above to be in bounds
b.clone_from(unsafe { dict.get_unchecked(*i as usize) });
}
}
for (b, i) in out_chunks
.into_remainder()
.iter_mut()
.zip(idx.chunks_exact(8).remainder().iter())
{
b.clone_from(&dict[*i as usize]);
}
}
self.bit_packed_left -= num_values as u32;
values_read += num_values;
if num_values < to_read {
Expand Down
83 changes: 72 additions & 11 deletions parquet/src/util/bit_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ pub unsafe trait FromBytes: Sized {
fn from_le_bytes(bs: Self::Buffer) -> Self;
}

/// Types that can be decoded from bitpacked representations.
///
/// This is implemented for primitive types and bool that can be
/// directly converted from a u64 value. Types like Int96, ByteArray,
/// and FixedLenByteArray that cannot be represented in 64 bits do not
/// implement this trait.
pub trait FromBitpacked: FromBytes {
/// Convert directly from a u64 value by truncation, avoiding byte slice copies.
Comment thread
Dandandan marked this conversation as resolved.
fn from_u64(v: u64) -> Self;
Comment thread
Dandandan marked this conversation as resolved.
Comment thread
Dandandan marked this conversation as resolved.
}

macro_rules! from_le_bytes {
($($ty: ty),*) => {
$(
Expand All @@ -60,11 +71,55 @@ macro_rules! from_le_bytes {
<$ty>::from_le_bytes(bs)
}
}
impl FromBitpacked for $ty {
#[inline]
fn from_u64(v: u64) -> Self {
v as Self
}
}
)*
};
}

from_le_bytes! { u8, u16, u32, u64, i8, i16, i32, i64, f32, f64 }
from_le_bytes! { u8, u16, u32, u64, i8, i16, i32, i64 }

// SAFETY: every 32-bit pattern is a valid `f32` value.
unsafe impl FromBytes for f32 {
    const BIT_CAPACITY: usize = 32;
    type Buffer = [u8; 4];

    /// Fallibly decode a little-endian `f32` from a byte slice.
    fn try_from_le_slice(b: &[u8]) -> Result<Self> {
        // `array_from_slice` presumably rejects slices that cannot fill the
        // 4-byte buffer — TODO confirm against its definition.
        array_from_slice(b).map(Self::from_le_bytes)
    }

    /// Decode a little-endian `f32` from a fixed 4-byte buffer.
    fn from_le_bytes(raw: Self::Buffer) -> Self {
        f32::from_le_bytes(raw)
    }
}

impl FromBitpacked for f32 {
    /// Reinterpret the low 32 bits of `v` as an `f32` bit pattern.
    #[inline]
    fn from_u64(v: u64) -> Self {
        // Truncate to the low word, then transmute bits (not a numeric cast).
        let bits = v as u32;
        Self::from_bits(bits)
    }
}

// SAFETY: every 64-bit pattern is a valid `f64` value.
unsafe impl FromBytes for f64 {
    const BIT_CAPACITY: usize = 64;
    type Buffer = [u8; 8];

    /// Fallibly decode a little-endian `f64` from a byte slice.
    fn try_from_le_slice(b: &[u8]) -> Result<Self> {
        // `array_from_slice` presumably rejects slices that cannot fill the
        // 8-byte buffer — TODO confirm against its definition.
        array_from_slice(b).map(Self::from_le_bytes)
    }

    /// Decode a little-endian `f64` from a fixed 8-byte buffer.
    fn from_le_bytes(raw: Self::Buffer) -> Self {
        f64::from_le_bytes(raw)
    }
}

impl FromBitpacked for f64 {
    /// Reinterpret all 64 bits of `v` as an `f64` bit pattern
    /// (a bit transmute, not a numeric conversion).
    #[inline]
    fn from_u64(v: u64) -> Self {
        Self::from_bits(v)
    }
}
Comment thread
Dandandan marked this conversation as resolved.

// SAFETY: the 0000000x bit pattern is always valid for `bool`.
unsafe impl FromBytes for bool {
Expand All @@ -79,6 +134,13 @@ unsafe impl FromBytes for bool {
}
}

impl FromBitpacked for bool {
#[inline]
fn from_u64(v: u64) -> Self {
v != 0
Comment thread
Dandandan marked this conversation as resolved.
}
}

// SAFETY: BIT_CAPACITY is 0.
unsafe impl FromBytes for Int96 {
const BIT_CAPACITY: usize = 0;
Expand Down Expand Up @@ -139,7 +201,7 @@ pub(crate) fn read_num_bytes<T>(size: usize, src: &[u8]) -> T
where
T: FromBytes,
{
assert!(size <= src.len());
debug_assert!(size <= src.len());
let mut buffer = <T as FromBytes>::Buffer::default();
buffer.as_mut()[..size].copy_from_slice(&src[..size]);
<T>::from_le_bytes(buffer)
Expand Down Expand Up @@ -413,9 +475,9 @@ impl BitReader {
/// Reads a value of type `T` and of size `num_bits`.
///
/// Returns `None` if there's not enough data available. `Some` otherwise.
pub fn get_value<T: FromBytes>(&mut self, num_bits: usize) -> Option<T> {
assert!(num_bits <= 64);
assert!(num_bits <= size_of::<T>() * 8);
pub fn get_value<T: FromBitpacked>(&mut self, num_bits: usize) -> Option<T> {
debug_assert!(num_bits <= 64);
debug_assert!(num_bits <= size_of::<T>() * 8);

if self.byte_offset * 8 + self.bit_offset + num_bits > self.buffer.len() * 8 {
return None;
Expand Down Expand Up @@ -445,8 +507,7 @@ impl BitReader {
}
}

// TODO: better to avoid copying here
T::try_from_le_slice(v.as_bytes()).ok()
Some(T::from_u64(v))
Comment thread
Dandandan marked this conversation as resolved.
}

/// Read multiple values from their packed representation where each element is represented
Expand All @@ -457,8 +518,8 @@ impl BitReader {
/// This function panics if
/// - `num_bits` is larger than the bit-capacity of `T`
///
pub fn get_batch<T: FromBytes>(&mut self, batch: &mut [T], num_bits: usize) -> usize {
assert!(num_bits <= size_of::<T>() * 8);
pub fn get_batch<T: FromBitpacked>(&mut self, batch: &mut [T], num_bits: usize) -> usize {
debug_assert!(num_bits <= size_of::<T>() * 8);

let mut values_to_read = batch.len();
let needed_bits = num_bits * values_to_read;
Expand Down Expand Up @@ -602,7 +663,7 @@ impl BitReader {
///
/// Return the number of values skipped (up to num_values)
pub fn skip(&mut self, num_values: usize, num_bits: usize) -> usize {
assert!(num_bits <= 64);
debug_assert!(num_bits <= 64);

let needed_bits = num_bits * num_values;
let remaining_bits = (self.buffer.len() - self.byte_offset) * 8 - self.bit_offset;
Expand Down Expand Up @@ -1024,7 +1085,7 @@ mod tests {

fn test_get_batch_helper<T>(total: usize, num_bits: usize)
where
T: FromBytes + Default + Clone + Debug + Eq,
T: FromBitpacked + Default + Clone + Debug + Eq,
{
assert!(num_bits <= 64);
let num_bytes = ceil(num_bits, 8);
Expand Down
Loading