From a6a4642a8597556c582af1eb6a13086c7cbf4cbc Mon Sep 17 00:00:00 2001 From: lyang24 Date: Sat, 3 Jan 2026 01:02:45 -0800 Subject: [PATCH 1/4] Minor: pre allocate view vec --- parquet/src/arrow/array_reader/byte_view_array.rs | 11 ++++------- parquet/src/arrow/buffer/view_buffer.rs | 8 ++++++++ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 1933654118f3..58e58d712e74 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -162,13 +162,10 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { )); } - let mut buffer = ViewBuffer::default(); - let mut decoder = ByteViewArrayDecoderPlain::new( - buf, - num_values as usize, - Some(num_values as usize), - self.validate_utf8, - ); + let num_values = num_values as usize; + let mut buffer = ViewBuffer::with_capacity(num_values); + let mut decoder = + ByteViewArrayDecoderPlain::new(buf, num_values, Some(num_values), self.validate_utf8); decoder.read(&mut buffer, usize::MAX)?; self.dict = Some(buffer); Ok(()) diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index a93674663f7b..6e2016fae9a2 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -33,6 +33,14 @@ pub struct ViewBuffer { } impl ViewBuffer { + /// Create a new ViewBuffer with capacity for the specified number of views + pub fn with_capacity(capacity: usize) -> Self { + Self { + views: Vec::with_capacity(capacity), + buffers: Vec::new(), + } + } + pub fn is_empty(&self) -> bool { self.views.is_empty() } From 530c8ec82b710d516aecd3dd26281558519f348f Mon Sep 17 00:00:00 2001 From: lyang24 Date: Sat, 10 Jan 2026 13:17:42 -0800 Subject: [PATCH 2/4] Pass down batch_size from ArrowReaderBuilder through ArrayReaderBuilder.Ensure internal buffers to be pre-allocated. 
Api change - making batch size required for ArrayReader and buffers. --- parquet/benches/arrow_reader.rs | 92 ++++++++++++++----- parquet/src/arrow/array_reader/builder.rs | 60 +++++++++--- parquet/src/arrow/array_reader/byte_array.rs | 16 ++-- .../array_reader/byte_array_dictionary.rs | 23 +++-- .../src/arrow/array_reader/byte_view_array.rs | 10 +- .../array_reader/fixed_len_byte_array.rs | 18 +++- parquet/src/arrow/array_reader/list_array.rs | 2 +- parquet/src/arrow/array_reader/null_array.rs | 10 +- .../src/arrow/array_reader/primitive_array.rs | 57 ++++++++---- parquet/src/arrow/arrow_reader/mod.rs | 16 +++- parquet/src/arrow/buffer/dictionary_buffer.rs | 26 +++--- parquet/src/arrow/buffer/offset_buffer.rs | 32 ++++--- parquet/src/arrow/buffer/view_buffer.rs | 10 +- .../arrow/push_decoder/reader_builder/mod.rs | 14 +-- parquet/src/arrow/record_reader/buffer.rs | 12 ++- parquet/src/arrow/record_reader/mod.rs | 34 +++++-- 16 files changed, 307 insertions(+), 125 deletions(-) diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs index 14fa16b3531e..9325a1faf5b6 100644 --- a/parquet/benches/arrow_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -27,6 +27,7 @@ use parquet::arrow::array_reader::{ ListArrayReader, make_byte_array_reader, make_byte_view_array_reader, make_fixed_len_byte_array_reader, }; +use parquet::arrow::arrow_reader::DEFAULT_BATCH_SIZE; use parquet::basic::Type; use parquet::data_type::{ByteArray, FixedLenByteArrayType}; use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator}; @@ -709,15 +710,23 @@ fn create_primitive_array_reader( use parquet::arrow::array_reader::PrimitiveArrayReader; match column_desc.physical_type() { Type::INT32 => { - let reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) - .unwrap(); + let reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + DEFAULT_BATCH_SIZE, + ) + .unwrap(); Box::new(reader) } 
Type::INT64 => { - let reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) - .unwrap(); + let reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + DEFAULT_BATCH_SIZE, + ) + .unwrap(); Box::new(reader) } _ => unreachable!(), @@ -730,9 +739,13 @@ fn create_f16_by_bytes_reader( ) -> Box { let physical_type = column_desc.physical_type(); match physical_type { - Type::FIXED_LEN_BYTE_ARRAY => { - make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap() - } + Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader( + Box::new(page_iterator), + column_desc, + None, + DEFAULT_BATCH_SIZE, + ) + .unwrap(), _ => unimplemented!(), } } @@ -743,12 +756,20 @@ fn create_decimal_by_bytes_reader( ) -> Box { let physical_type = column_desc.physical_type(); match physical_type { - Type::BYTE_ARRAY => { - make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap() - } - Type::FIXED_LEN_BYTE_ARRAY => { - make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap() - } + Type::BYTE_ARRAY => make_byte_array_reader( + Box::new(page_iterator), + column_desc, + None, + DEFAULT_BATCH_SIZE, + ) + .unwrap(), + Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader( + Box::new(page_iterator), + column_desc, + None, + DEFAULT_BATCH_SIZE, + ) + .unwrap(), _ => unimplemented!(), } } @@ -757,28 +778,52 @@ fn create_fixed_len_byte_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, ) -> Box { - make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap() + make_fixed_len_byte_array_reader( + Box::new(page_iterator), + column_desc, + None, + DEFAULT_BATCH_SIZE, + ) + .unwrap() } fn create_byte_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, ) -> Box { - make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap() + make_byte_array_reader( + 
Box::new(page_iterator), + column_desc, + None, + DEFAULT_BATCH_SIZE, + ) + .unwrap() } fn create_byte_view_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, ) -> Box { - make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap() + make_byte_view_array_reader( + Box::new(page_iterator), + column_desc, + None, + DEFAULT_BATCH_SIZE, + ) + .unwrap() } fn create_string_view_byte_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, ) -> Box { - make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap() + make_byte_view_array_reader( + Box::new(page_iterator), + column_desc, + None, + DEFAULT_BATCH_SIZE, + ) + .unwrap() } fn create_string_byte_array_dictionary_reader( @@ -788,8 +833,13 @@ fn create_string_byte_array_dictionary_reader( use parquet::arrow::array_reader::make_byte_array_dictionary_reader; let arrow_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); - make_byte_array_dictionary_reader(Box::new(page_iterator), column_desc, Some(arrow_type)) - .unwrap() + make_byte_array_dictionary_reader( + Box::new(page_iterator), + column_desc, + Some(arrow_type), + DEFAULT_BATCH_SIZE, + ) + .unwrap() } fn create_string_list_reader( diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 818e06e8b81f..980c391356f1 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -96,15 +96,26 @@ pub struct ArrayReaderBuilder<'a> { parquet_metadata: Option<&'a ParquetMetaData>, /// metrics metrics: &'a ArrowReaderMetrics, + /// Batch size for pre-allocating internal buffers + batch_size: usize, } impl<'a> ArrayReaderBuilder<'a> { - pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self { + /// Create a new `ArrayReaderBuilder` + /// + /// `batch_size` is used to pre-allocate internal buffers with the expected capacity, 
+ /// avoiding reallocations when reading the first batch of data. + pub fn new( + row_groups: &'a dyn RowGroups, + metrics: &'a ArrowReaderMetrics, + batch_size: usize, + ) -> Self { Self { row_groups, cache_options: None, parquet_metadata: None, metrics, + batch_size, } } @@ -414,18 +425,21 @@ impl<'a> ArrayReaderBuilder<'a> { page_iterator, column_desc, arrow_type, + self.batch_size, )?) as _, PhysicalType::INT32 => { if let Some(DataType::Null) = arrow_type { Box::new(NullArrayReader::::new( page_iterator, column_desc, + self.batch_size, )?) as _ } else { Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, arrow_type, + self.batch_size, )?) as _ } } @@ -433,36 +447,56 @@ impl<'a> ArrayReaderBuilder<'a> { page_iterator, column_desc, arrow_type, + self.batch_size, )?) as _, PhysicalType::INT96 => Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, arrow_type, + self.batch_size, )?) as _, PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, arrow_type, + self.batch_size, )?) as _, PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, arrow_type, + self.batch_size, )?) as _, PhysicalType::BYTE_ARRAY => match arrow_type { - Some(DataType::Dictionary(_, _)) => { - make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)? + Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader( + page_iterator, + column_desc, + arrow_type, + self.batch_size, + )?, + Some(DataType::Utf8View | DataType::BinaryView) => make_byte_view_array_reader( + page_iterator, + column_desc, + arrow_type, + self.batch_size, + )?, + _ => { + make_byte_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)? } - Some(DataType::Utf8View | DataType::BinaryView) => { - make_byte_view_array_reader(page_iterator, column_desc, arrow_type)? 
- } - _ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?, }, PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type { - Some(DataType::Dictionary(_, _)) => { - make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)? - } - _ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?, + Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader( + page_iterator, + column_desc, + arrow_type, + self.batch_size, + )?, + _ => make_fixed_len_byte_array_reader( + page_iterator, + column_desc, + arrow_type, + self.batch_size, + )?, }, }; Ok(Some(reader)) @@ -532,7 +566,7 @@ mod tests { .unwrap(); let metrics = ArrowReaderMetrics::disabled(); - let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) + let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024) .build_array_reader(fields.as_ref(), &mask) .unwrap(); @@ -565,7 +599,7 @@ mod tests { .unwrap(); let metrics = ArrowReaderMetrics::disabled(); - let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) + let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024) .with_parquet_metadata(file_reader.metadata()) .build_array_reader(fields.as_ref(), &mask) .unwrap(); diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 2d0d44fbe203..c856f7e3c677 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -38,10 +38,14 @@ use std::any::Any; use std::sync::Arc; /// Returns an [`ArrayReader`] that decodes the provided byte array column +/// +/// `batch_size` is used to pre-allocate internal buffers, +/// avoiding reallocations when reading the first batch of data. 
pub fn make_byte_array_reader( pages: Box, column_desc: ColumnDescPtr, arrow_type: Option, + batch_size: usize, ) -> Result> { // Check if Arrow type is specified, else create it from Parquet type let data_type = match arrow_type { @@ -56,13 +60,13 @@ pub fn make_byte_array_reader( | ArrowType::Utf8 | ArrowType::Decimal128(_, _) | ArrowType::Decimal256(_, _) => { - let reader = GenericRecordReader::new(column_desc); + let reader = GenericRecordReader::new(column_desc, batch_size); Ok(Box::new(ByteArrayReader::::new( pages, data_type, reader, ))) } ArrowType::LargeUtf8 | ArrowType::LargeBinary => { - let reader = GenericRecordReader::new(column_desc); + let reader = GenericRecordReader::new(column_desc, batch_size); Ok(Box::new(ByteArrayReader::::new( pages, data_type, reader, ))) @@ -202,7 +206,7 @@ impl ColumnValueDecoder for ByteArrayColumnValueDecoder { )); } - let mut buffer = OffsetBuffer::default(); + let mut buffer = OffsetBuffer::with_capacity(0); let mut decoder = ByteArrayDecoderPlain::new( buf, num_values as usize, @@ -623,7 +627,7 @@ mod tests { .unwrap(); for (encoding, page) in pages { - let mut output = OffsetBuffer::::default(); + let mut output = OffsetBuffer::::with_capacity(0); decoder.set_data(encoding, page, 4, Some(4)).unwrap(); assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); @@ -678,7 +682,7 @@ mod tests { .unwrap(); for (encoding, page) in pages { - let mut output = OffsetBuffer::::default(); + let mut output = OffsetBuffer::::with_capacity(0); decoder.set_data(encoding, page, 4, Some(4)).unwrap(); assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); @@ -722,7 +726,7 @@ mod tests { // test nulls read for (encoding, page) in pages.clone() { - let mut output = OffsetBuffer::::default(); + let mut output = OffsetBuffer::::with_capacity(0); decoder.set_data(encoding, page, 4, None).unwrap(); assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0); } diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs 
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index f7b93264b760..110991add57f 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -39,14 +39,14 @@ use crate::util::bit_util::FromBytes; /// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`] macro_rules! make_reader { ( - ($pages:expr, $column_desc:expr, $data_type:expr) => match ($k:expr, $v:expr) { + ($pages:expr, $column_desc:expr, $data_type:expr, $batch_size:expr) => match ($k:expr, $v:expr) { $(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, $value_type:ty),)+ } ) => { match (($k, $v)) { $( ($key_arrow, $value_arrow) => { - let reader = GenericRecordReader::new($column_desc); + let reader = GenericRecordReader::new($column_desc, $batch_size); Ok(Box::new(ByteArrayDictionaryReader::<$key_type, $value_type>::new( $pages, $data_type, reader, ))) @@ -72,10 +72,13 @@ macro_rules! make_reader { /// It is therefore recommended that if `pages` contains data from multiple column chunks, /// that the read batch size used is a divisor of the row group size /// +/// `batch_size` is used to pre-allocate internal buffers, +/// avoiding reallocations when reading the first batch of data. pub fn make_byte_array_dictionary_reader( pages: Box, column_desc: ColumnDescPtr, arrow_type: Option, + batch_size: usize, ) -> Result> { // Check if Arrow type is specified, else create it from Parquet type let data_type = match arrow_type { @@ -88,7 +91,7 @@ pub fn make_byte_array_dictionary_reader( match &data_type { ArrowType::Dictionary(key_type, value_type) => { make_reader! 
{ - (pages, column_desc, data_type) => match (key_type.as_ref(), value_type.as_ref()) { + (pages, column_desc, data_type, batch_size) => match (key_type.as_ref(), value_type.as_ref()) { (ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (u8, i32), (ArrowType::UInt8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u8, i64), (ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (i8, i32), @@ -272,7 +275,7 @@ where } let len = num_values as usize; - let mut buffer = OffsetBuffer::::default(); + let mut buffer = OffsetBuffer::::with_capacity(0); let mut decoder = ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8); decoder.read(&mut buffer, usize::MAX)?; @@ -425,7 +428,7 @@ mod tests { .set_data(Encoding::RLE_DICTIONARY, encoded, 14, Some(data.len())) .unwrap(); - let mut output = DictionaryBuffer::::default(); + let mut output = DictionaryBuffer::::with_capacity(0); assert_eq!(decoder.read(&mut output, 3).unwrap(), 3); let mut valid = vec![false, false, true, true, false, true]; @@ -491,7 +494,7 @@ mod tests { .set_data(Encoding::RLE_DICTIONARY, encoded, 7, Some(data.len())) .unwrap(); - let mut output = DictionaryBuffer::::default(); + let mut output = DictionaryBuffer::::with_capacity(0); // read two skip one assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); @@ -542,7 +545,7 @@ mod tests { .unwrap(); // Read all pages into single buffer - let mut output = DictionaryBuffer::::default(); + let mut output = DictionaryBuffer::::with_capacity(0); for (encoding, page) in pages { decoder.set_data(encoding, page, 4, Some(4)).unwrap(); @@ -585,7 +588,7 @@ mod tests { .unwrap(); // Read all pages into single buffer - let mut output = DictionaryBuffer::::default(); + let mut output = DictionaryBuffer::::with_capacity(0); for (encoding, page) in pages { decoder.set_data(encoding, page, 4, Some(4)).unwrap(); @@ -649,7 +652,7 @@ mod tests { .unwrap(); for (encoding, page) in 
pages.clone() { - let mut output = DictionaryBuffer::::default(); + let mut output = DictionaryBuffer::::with_capacity(0); decoder.set_data(encoding, page, 8, None).unwrap(); assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0); @@ -664,7 +667,7 @@ mod tests { } for (encoding, page) in pages { - let mut output = DictionaryBuffer::::default(); + let mut output = DictionaryBuffer::::with_capacity(0); decoder.set_data(encoding, page, 8, None).unwrap(); assert_eq!(decoder.skip_values(1024).unwrap(), 0); diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 58e58d712e74..a189a80cdf86 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -36,10 +36,14 @@ use bytes::Bytes; use std::any::Any; /// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. +/// +/// `batch_size` is used to pre-allocate internal buffers, +/// avoiding reallocations when reading the first batch of data. 
pub fn make_byte_view_array_reader( pages: Box, column_desc: ColumnDescPtr, arrow_type: Option, + batch_size: usize, ) -> Result> { // Check if Arrow type is specified, else create it from Parquet type let data_type = match arrow_type { @@ -52,7 +56,7 @@ pub fn make_byte_view_array_reader( match data_type { ArrowType::BinaryView | ArrowType::Utf8View => { - let reader = GenericRecordReader::new(column_desc); + let reader = GenericRecordReader::new(column_desc, batch_size); Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader))) } @@ -766,7 +770,7 @@ mod tests { .unwrap(); for (encoding, page) in pages { - let mut output = ViewBuffer::default(); + let mut output = ViewBuffer::with_capacity(0); decoder.set_data(encoding, page, 4, Some(4)).unwrap(); assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); @@ -809,7 +813,7 @@ mod tests { let column_desc = utf8_column(); let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); - let mut view_buffer = ViewBuffer::default(); + let mut view_buffer = ViewBuffer::with_capacity(0); decoder.set_data(Encoding::PLAIN, pages, 4, None).unwrap(); decoder.read(&mut view_buffer, 1).unwrap(); decoder.read(&mut view_buffer, 1).unwrap(); diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 2297926add5f..d562c88cb844 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -40,10 +40,14 @@ use std::ops::Range; use std::sync::Arc; /// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column +/// +/// `batch_size` is used to pre-allocate internal buffers, +/// avoiding reallocations when reading the first batch of data. 
pub fn make_fixed_len_byte_array_reader( pages: Box, column_desc: ColumnDescPtr, arrow_type: Option, + batch_size: usize, ) -> Result> { // Check if Arrow type is specified, else create it from Parquet type let data_type = match arrow_type { @@ -126,6 +130,7 @@ pub fn make_fixed_len_byte_array_reader( column_desc, data_type, byte_length, + batch_size, ))) } @@ -144,14 +149,16 @@ impl FixedLenByteArrayReader { column_desc: ColumnDescPtr, data_type: ArrowType, byte_length: usize, + batch_size: usize, ) -> Self { + let record_reader = GenericRecordReader::new(column_desc, batch_size); Self { data_type, byte_length, pages, def_levels_buffer: None, rep_levels_buffer: None, - record_reader: GenericRecordReader::new(column_desc), + record_reader, } } } @@ -284,6 +291,15 @@ fn move_values( } impl ValuesBuffer for FixedLenByteArrayBuffer { + fn with_capacity(_capacity: usize) -> Self { + // byte_length is not known at trait level, so we return a default buffer + // The decoder will pre-allocate when it knows both capacity and byte_length + Self { + buffer: Vec::new(), + byte_length: None, + } + } + fn pad_nulls( &mut self, read_offset: usize, diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 1d5c68c22e11..5564aff631d7 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -566,7 +566,7 @@ mod tests { .unwrap(); let metrics = ArrowReaderMetrics::disabled(); - let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) + let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024) .build_array_reader(fields.as_ref(), &mask) .unwrap(); diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs index 4ddd1df86442..97bc68d6d22e 100644 --- a/parquet/src/arrow/array_reader/null_array.rs +++ b/parquet/src/arrow/array_reader/null_array.rs @@ -47,8 +47,14 @@ where T::T: ArrowNativeType, { /// 
Construct null array reader. - pub fn new(pages: Box, column_desc: ColumnDescPtr) -> Result { - let record_reader = RecordReader::::new(column_desc); + /// + /// `batch_size` is used to pre-allocate internal buffers. + pub fn new( + pages: Box, + column_desc: ColumnDescPtr, + batch_size: usize, + ) -> Result { + let record_reader = RecordReader::::new(column_desc, batch_size); Ok(Self { data_type: ArrowType::Null, diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index e1c944f60c42..eeb1e1c0981d 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -104,10 +104,13 @@ where Vec: IntoBuffer, { /// Construct primitive array reader. + /// + /// `batch_size` is used to pre-allocate internal buffers. pub fn new( pages: Box, column_desc: ColumnDescPtr, arrow_type: Option, + batch_size: usize, ) -> Result { // Check if Arrow type is specified, else create it from Parquet type let data_type = match arrow_type { @@ -117,7 +120,7 @@ where .clone(), }; - let record_reader = RecordReader::::new(column_desc); + let record_reader = RecordReader::::new(column_desc, batch_size); Ok(Self { data_type, @@ -510,6 +513,7 @@ mod tests { Box::::default(), schema.column(0), None, + 1024, ) .unwrap(); @@ -552,9 +556,13 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + 1024, + ) + .unwrap(); // Read first 50 values, which are all from the first column chunk let array = array_reader.next_batch(50).unwrap(); @@ -623,6 +631,7 @@ mod tests { Box::new(page_iterator), column_desc.clone(), None, + 1024, ) .expect("Unable to get array reader"); @@ -758,9 +767,13 @@ mod tests { let page_iterator = 
InMemoryPageIterator::new(page_lists); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + 1024, + ) + .unwrap(); let mut accu_len: usize = 0; @@ -834,9 +847,13 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + 1024, + ) + .unwrap(); // read data from the reader // the data type is decimal(8,2) @@ -893,9 +910,13 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + 1024, + ) + .unwrap(); // read data from the reader // the data type is decimal(18,4) @@ -955,9 +976,13 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + 1024, + ) + .unwrap(); // read data from the reader // the data type is date diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 1b02c4ae25d3..399401f0e75a 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -57,6 +57,9 @@ mod read_plan; pub(crate) mod selection; pub mod statistics; +/// Default batch size for reading parquet files +pub const DEFAULT_BATCH_SIZE: usize = 1024; + /// Builder for constructing Parquet readers that decode into [Apache Arrow] /// arrays. 
/// @@ -168,7 +171,7 @@ impl ArrowReaderBuilder { metadata: metadata.metadata, schema: metadata.schema, fields: metadata.fields, - batch_size: 1024, + batch_size: DEFAULT_BATCH_SIZE, row_groups: None, projection: ProjectionMask::all(), filter: None, @@ -196,7 +199,7 @@ impl ArrowReaderBuilder { &self.schema } - /// Set the size of [`RecordBatch`] to produce. Defaults to 1024 + /// Set the size of [`RecordBatch`] to produce. Defaults to [`DEFAULT_BATCH_SIZE`] /// If the batch_size more than the file row count, use the file row count. pub fn with_batch_size(self, batch_size: usize) -> Self { // Try to avoid allocate large buffer @@ -1213,7 +1216,10 @@ impl ParquetRecordBatchReaderBuilder { break; } - let array_reader = ArrayReaderBuilder::new(&reader, &metrics) + let mut cache_projection = predicate.projection().clone(); + cache_projection.intersect(&projection); + + let array_reader = ArrayReaderBuilder::new(&reader, &metrics, batch_size) .with_parquet_metadata(&reader.metadata) .build_array_reader(fields.as_deref(), predicate.projection())?; @@ -1221,7 +1227,7 @@ impl ParquetRecordBatchReaderBuilder { } } - let array_reader = ArrayReaderBuilder::new(&reader, &metrics) + let array_reader = ArrayReaderBuilder::new(&reader, &metrics, batch_size) .with_parquet_metadata(&reader.metadata) .build_array_reader(fields.as_deref(), &projection)?; @@ -1528,7 +1534,7 @@ impl ParquetRecordBatchReader { ) -> Result { // note metrics are not supported in this API let metrics = ArrowReaderMetrics::disabled(); - let array_reader = ArrayReaderBuilder::new(row_groups, &metrics) + let array_reader = ArrayReaderBuilder::new(row_groups, &metrics, batch_size) .with_parquet_metadata(row_groups.metadata()) .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?; diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs index 71fb18917d9a..c3cd5744d285 100644 --- a/parquet/src/arrow/buffer/dictionary_buffer.rs +++ 
b/parquet/src/arrow/buffer/dictionary_buffer.rs @@ -38,14 +38,6 @@ pub enum DictionaryBuffer { Values { values: OffsetBuffer }, } -impl Default for DictionaryBuffer { - fn default() -> Self { - Self::Values { - values: Default::default(), - } - } -} - impl DictionaryBuffer { #[allow(unused)] pub fn len(&self) -> usize { @@ -103,7 +95,7 @@ impl DictionaryBuffer { match self { Self::Values { values } => Ok(values), Self::Dict { keys, values } => { - let mut spilled = OffsetBuffer::default(); + let mut spilled = OffsetBuffer::with_capacity(0); let data = values.to_data(); let dict_buffers = data.buffers(); let dict_offsets = dict_buffers[0].typed_data::(); @@ -202,6 +194,12 @@ impl DictionaryBuffer { } impl ValuesBuffer for DictionaryBuffer { + fn with_capacity(capacity: usize) -> Self { + Self::Values { + values: OffsetBuffer::with_capacity(capacity), + } + } + fn pad_nulls( &mut self, read_offset: usize, @@ -288,7 +286,7 @@ mod tests { let d1: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world", "", "a", "b"])); - let mut buffer = DictionaryBuffer::::default(); + let mut buffer = DictionaryBuffer::::with_capacity(0); // Read some data preserving the dictionary let values = &[1, 0, 3, 2, 4]; @@ -310,7 +308,7 @@ mod tests { buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice()); assert_eq!(buffer.len(), 13); - let split = std::mem::take(&mut buffer); + let split = std::mem::replace(&mut buffer, DictionaryBuffer::with_capacity(0)); let array = split.into_array(Some(null_buffer), &dict_type).unwrap(); assert_eq!(array.data_type(), &dict_type); @@ -345,7 +343,7 @@ mod tests { .unwrap() .extend_from_slice(&[0, 1, 0, 1]); - let array = std::mem::take(&mut buffer) + let array = std::mem::replace(&mut buffer, DictionaryBuffer::with_capacity(0)) .into_array(None, &dict_type) .unwrap(); assert_eq!(array.data_type(), &dict_type); @@ -373,7 +371,7 @@ mod tests { let dict_type = ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8)); - let mut 
buffer = DictionaryBuffer::::default(); + let mut buffer = DictionaryBuffer::::with_capacity(0); let d = Arc::new(StringArray::from(vec!["", "f"])) as ArrayRef; buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 2, 0]); @@ -384,7 +382,7 @@ mod tests { err ); - let mut buffer = DictionaryBuffer::::default(); + let mut buffer = DictionaryBuffer::::with_capacity(0); let d = Arc::new(StringArray::from(vec![""])) as ArrayRef; buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 1, 0]); diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs index 209ed4e5c15f..087900b62be2 100644 --- a/parquet/src/arrow/buffer/offset_buffer.rs +++ b/parquet/src/arrow/buffer/offset_buffer.rs @@ -32,18 +32,20 @@ pub struct OffsetBuffer { pub values: Vec, } -impl Default for OffsetBuffer { - fn default() -> Self { - let mut offsets = Vec::new(); - offsets.resize(1, I::default()); +impl OffsetBuffer { + /// Create a new `OffsetBuffer` with capacity for at least `capacity` elements + /// + /// Pre-allocates the offsets vector to avoid reallocations during reading. + /// The values vector is not pre-allocated as its size is unpredictable. 
+ pub fn with_capacity(capacity: usize) -> Self { + let mut offsets = Vec::with_capacity(capacity + 1); + offsets.push(I::default()); Self { offsets, values: Vec::new(), } } -} -impl OffsetBuffer { /// Returns the number of byte arrays in this buffer pub fn len(&self) -> usize { self.offsets.len() - 1 @@ -139,6 +141,10 @@ impl OffsetBuffer { } impl ValuesBuffer for OffsetBuffer { + fn with_capacity(capacity: usize) -> Self { + Self::with_capacity(capacity) + } + fn pad_nulls( &mut self, read_offset: usize, @@ -195,7 +201,7 @@ mod tests { #[test] fn test_offset_buffer_empty() { - let buffer = OffsetBuffer::::default(); + let buffer = OffsetBuffer::::with_capacity(0); let array = buffer.into_array(None, ArrowType::Utf8); let strings = array.as_any().downcast_ref::().unwrap(); assert_eq!(strings.len(), 0); @@ -203,7 +209,7 @@ mod tests { #[test] fn test_offset_buffer_append() { - let mut buffer = OffsetBuffer::::default(); + let mut buffer = OffsetBuffer::::with_capacity(0); buffer.try_push("hello".as_bytes(), true).unwrap(); buffer.try_push("bar".as_bytes(), true).unwrap(); buffer @@ -220,11 +226,11 @@ mod tests { #[test] fn test_offset_buffer() { - let mut buffer = OffsetBuffer::::default(); + let mut buffer = OffsetBuffer::::with_capacity(0); for v in ["hello", "world", "cupcakes", "a", "b", "c"] { buffer.try_push(v.as_bytes(), false).unwrap() } - let split = std::mem::take(&mut buffer); + let split = std::mem::replace(&mut buffer, OffsetBuffer::with_capacity(0)); let array = split.into_array(None, ArrowType::Utf8); let strings = array.as_any().downcast_ref::().unwrap(); @@ -244,7 +250,7 @@ mod tests { #[test] fn test_offset_buffer_pad_nulls() { - let mut buffer = OffsetBuffer::::default(); + let mut buffer = OffsetBuffer::::with_capacity(0); let values = ["a", "b", "c", "def", "gh"]; for v in &values { buffer.try_push(v.as_bytes(), false).unwrap() @@ -287,7 +293,7 @@ mod tests { let valid_4_byte_utf8 = &[0b11110010, 0b10101000, 0b10101001, 0b10100101]; 
std::str::from_utf8(valid_4_byte_utf8).unwrap(); - let mut buffer = OffsetBuffer::::default(); + let mut buffer = OffsetBuffer::::with_capacity(0); buffer.try_push(valid_2_byte_utf8, true).unwrap(); buffer.try_push(valid_3_byte_utf8, true).unwrap(); buffer.try_push(valid_4_byte_utf8, true).unwrap(); @@ -320,7 +326,7 @@ mod tests { #[test] fn test_pad_nulls_empty() { - let mut buffer = OffsetBuffer::::default(); + let mut buffer = OffsetBuffer::::with_capacity(0); let valid_mask = Buffer::from_iter(std::iter::repeat_n(false, 9)); buffer.pad_nulls(0, 0, 9, valid_mask.as_slice()); diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index 6e2016fae9a2..e7cc6eb1fab7 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -80,6 +80,10 @@ impl ViewBuffer { } impl ValuesBuffer for ViewBuffer { + fn with_capacity(capacity: usize) -> Self { + Self::with_capacity(capacity) + } + fn pad_nulls( &mut self, read_offset: usize, @@ -102,7 +106,7 @@ mod tests { #[test] fn test_view_buffer_empty() { - let buffer = ViewBuffer::default(); + let buffer = ViewBuffer::with_capacity(0); let array = buffer.into_array(None, &ArrowType::Utf8View); let strings = array .as_any() @@ -113,7 +117,7 @@ mod tests { #[test] fn test_view_buffer_append_view() { - let mut buffer = ViewBuffer::default(); + let mut buffer = ViewBuffer::with_capacity(0); let data = b"0123456789long string to test string view"; let string_buffer = Buffer::from(data); let block_id = buffer.append_block(string_buffer); @@ -141,7 +145,7 @@ mod tests { #[test] fn test_view_buffer_pad_null() { - let mut buffer = ViewBuffer::default(); + let mut buffer = ViewBuffer::with_capacity(0); let data = b"0123456789long string to test string view"; let string_buffer = Buffer::from(data); let block_id = buffer.append_block(string_buffer); diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs 
b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index d3d78ca7c263..582e18b2a08b 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -432,10 +432,11 @@ impl RowGroupReaderBuilder { let cache_options = filter_info.cache_builder().producer(); - let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics) - .with_cache_options(Some(&cache_options)) - .with_parquet_metadata(&self.metadata) - .build_array_reader(self.fields.as_deref(), predicate.projection())?; + let array_reader = + ArrayReaderBuilder::new(&row_group, &self.metrics, self.batch_size) + .with_cache_options(Some(&cache_options)) + .with_parquet_metadata(&self.metadata) + .build_array_reader(self.fields.as_deref(), predicate.projection())?; // Reset to original policy before each predicate so the override // can detect page skipping for THIS predicate's columns. @@ -608,8 +609,9 @@ impl RowGroupReaderBuilder { let plan = plan_builder.build(); // if we have any cached results, connect them up - let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics) - .with_parquet_metadata(&self.metadata); + let array_reader_builder = + ArrayReaderBuilder::new(&row_group, &self.metrics, self.batch_size) + .with_parquet_metadata(&self.metadata); let array_reader = if let Some(cache_info) = cache_info.as_ref() { let cache_options: CacheOptions = cache_info.builder().consumer(); array_reader_builder diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 880407a54745..6e0855dda376 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -18,7 +18,13 @@ use crate::arrow::buffer::bit_util::iter_set_bits_rev; /// A buffer that supports padding with nulls -pub trait ValuesBuffer: Default { +pub trait ValuesBuffer { + /// Create a new buffer with capacity for at least `capacity` elements + /// + /// This allows 
pre-allocating buffers to avoid reallocations during reading, + /// improving performance when the number of values is known in advance. + fn with_capacity(capacity: usize) -> Self; + /// If a column contains nulls, more level data may be read than value data, as null /// values are not encoded. Therefore, first the levels data is read, the null count /// determined, and then the corresponding number of values read to a [`ValuesBuffer`]. @@ -43,6 +49,10 @@ pub trait ValuesBuffer: Default { } impl ValuesBuffer for Vec { + fn with_capacity(capacity: usize) -> Self { + Vec::with_capacity(capacity) + } + fn pad_nulls( &mut self, read_offset: usize, diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs index 2092b4972d9a..1ab2fbbaf1d9 100644 --- a/parquet/src/arrow/record_reader/mod.rs +++ b/parquet/src/arrow/record_reader/mod.rs @@ -58,6 +58,8 @@ pub struct GenericRecordReader { num_values: usize, /// Number of buffered records num_records: usize, + /// Capacity hint for pre-allocating buffers based on batch size + capacity_hint: usize, } impl GenericRecordReader @@ -66,20 +68,25 @@ where CV: ColumnValueDecoder, { /// Create a new [`GenericRecordReader`] - pub fn new(desc: ColumnDescPtr) -> Self { + /// + /// The capacity is used to pre-allocate internal buffers, avoiding reallocations + /// when reading the first batch of data. For optimal performance, set this to + /// the expected batch size. + pub fn new(desc: ColumnDescPtr, capacity: usize) -> Self { let def_levels = (desc.max_def_level() > 0) .then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc))); let rep_levels = (desc.max_rep_level() > 0).then(Vec::new); Self { - values: V::default(), + values: V::with_capacity(capacity), def_levels, rep_levels, column_reader: None, column_desc: desc, num_values: 0, num_records: 0, + capacity_hint: capacity, } } @@ -169,7 +176,9 @@ where /// Returns currently stored buffer data. 
/// The side effect is similar to `consume_def_levels`. pub fn consume_record_data(&mut self) -> V { - std::mem::take(&mut self.values) + // Replace the buffer with a new one that has the same capacity + // This avoids reallocations on subsequent batches + std::mem::replace(&mut self.values, V::with_capacity(self.capacity_hint)) } /// Returns currently stored null bitmap data for nullable columns. @@ -208,6 +217,11 @@ where /// Try to read one batch of data returning the number of records read fn read_one_batch(&mut self, batch_size: usize) -> Result { + // Update capacity hint to the largest batch size seen + if batch_size > self.capacity_hint { + self.capacity_hint = batch_size; + } + let (records_read, values_read, levels_read) = self.column_reader.as_mut().unwrap().read_records( batch_size, @@ -272,7 +286,7 @@ mod tests { .unwrap(); // Construct record reader - let mut record_reader = RecordReader::::new(desc.clone()); + let mut record_reader = RecordReader::::new(desc.clone(), 1024); // First page @@ -345,7 +359,7 @@ mod tests { .unwrap(); // Construct record reader - let mut record_reader = RecordReader::::new(desc.clone()); + let mut record_reader = RecordReader::::new(desc.clone(), 1024); // First page @@ -447,7 +461,7 @@ mod tests { .unwrap(); // Construct record reader - let mut record_reader = RecordReader::::new(desc.clone()); + let mut record_reader = RecordReader::::new(desc.clone(), 1024); // First page @@ -550,7 +564,7 @@ mod tests { .unwrap(); // Construct record reader - let mut record_reader = RecordReader::::new(desc.clone()); + let mut record_reader = RecordReader::::new(desc.clone(), 1024); { let values = [100; 5000]; @@ -600,7 +614,7 @@ mod tests { pb.add_values::(Encoding::PLAIN, &values); let page = pb.consume(); - let mut record_reader = RecordReader::::new(desc); + let mut record_reader = RecordReader::::new(desc, 1024); let page_reader = Box::new(InMemoryPageReader::new(vec![page.clone()])); 
record_reader.set_page_reader(page_reader).unwrap(); assert_eq!(record_reader.read_records(4).unwrap(), 4); @@ -639,7 +653,7 @@ mod tests { .unwrap(); // Construct record reader - let mut record_reader = RecordReader::::new(desc.clone()); + let mut record_reader = RecordReader::::new(desc.clone(), 1024); // First page @@ -713,7 +727,7 @@ mod tests { .unwrap(); // Construct record reader - let mut record_reader = RecordReader::::new(desc.clone()); + let mut record_reader = RecordReader::::new(desc.clone(), 1024); // First page From dd4884c57b534df6d253ce2d3de205f634391cd6 Mon Sep 17 00:00:00 2001 From: lyang24 Date: Wed, 21 Jan 2026 19:12:17 -0800 Subject: [PATCH 3/4] made the value buffer optional and made the initialization on read to avoid allocating one extra buffer at the end --- parquet/src/arrow/record_reader/mod.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs index 1ab2fbbaf1d9..745725bbf676 100644 --- a/parquet/src/arrow/record_reader/mod.rs +++ b/parquet/src/arrow/record_reader/mod.rs @@ -50,7 +50,9 @@ pub(crate) type ColumnReader = pub struct GenericRecordReader { column_desc: ColumnDescPtr, - values: V, + /// Values buffer, lazily initialized on first read to avoid + /// allocating a buffer that may never be used (e.g., after the last batch) + values: Option, def_levels: Option, rep_levels: Option>, column_reader: Option>, @@ -79,7 +81,7 @@ where let rep_levels = (desc.max_rep_level() > 0).then(Vec::new); Self { - values: V::with_capacity(capacity), + values: None, // Lazily initialized on first read def_levels, rep_levels, column_reader: None, @@ -176,9 +178,9 @@ where /// Returns currently stored buffer data. /// The side effect is similar to `consume_def_levels`.
pub fn consume_record_data(&mut self) -> V { - // Replace the buffer with a new one that has the same capacity - // This avoids reallocations on subsequent batches - std::mem::replace(&mut self.values, V::with_capacity(self.capacity_hint)) + // Take the buffer, leaving None. The next read will lazily allocate a new buffer. + // This avoids allocating a buffer that may never be used (e.g., after the last batch). + self.values.take().unwrap_or_else(|| V::with_capacity(0)) } /// Returns currently stored null bitmap data for nullable columns. @@ -222,12 +224,18 @@ where self.capacity_hint = batch_size; } + // Lazily initialize buffer on first read + let capacity_hint = self.capacity_hint; + let values = self + .values + .get_or_insert_with(|| V::with_capacity(capacity_hint)); + let (records_read, values_read, levels_read) = self.column_reader.as_mut().unwrap().read_records( batch_size, self.def_levels.as_mut(), self.rep_levels.as_mut(), - &mut self.values, + values, )?; if values_read < levels_read { @@ -235,7 +243,7 @@ where general_err!("Definition levels should exist when data is less than levels!") })?; - self.values.pad_nulls( + values.pad_nulls( self.num_values, values_read, levels_read, From 3d7ba2d70093e09e4d1c3672399c515cf0a8b882 Mon Sep 17 00:00:00 2001 From: lyang24 Date: Sat, 21 Mar 2026 16:08:10 -0700 Subject: [PATCH 4/4] Optimize dict decode and view writes - Optimize OffsetBuffer dict decode by inlining extend + offset push - Use raw ptr writes in ByteViewArray dictionary decoder for better perf - Remove unused append_raw_view_unchecked from ViewBuffer Co-Authored-By: Claude Opus 4.6 (1M context) --- parquet/src/arrow/array_reader/builder.rs | 26 ++++++++------- parquet/src/arrow/array_reader/byte_array.rs | 32 ++++++++++-------- .../src/arrow/array_reader/byte_view_array.rs | 33 ++++++++++++------- parquet/src/arrow/array_reader/list_array.rs | 4 ++- .../src/arrow/array_reader/primitive_array.rs | 15 +++++---- parquet/src/arrow/arrow_reader/mod.rs | 9 
+++-- parquet/src/arrow/buffer/offset_buffer.rs | 8 ++++- parquet/src/arrow/buffer/view_buffer.rs | 25 ++++---------- .../arrow/push_decoder/reader_builder/mod.rs | 16 ++++----- parquet/src/arrow/record_reader/mod.rs | 15 +++++---- 10 files changed, 101 insertions(+), 82 deletions(-) diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 980c391356f1..d806b2147a09 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -33,6 +33,7 @@ use crate::arrow::array_reader::{ NullArrayReader, PrimitiveArrayReader, RowGroups, StructArrayReader, make_byte_array_dictionary_reader, make_byte_array_reader, }; +use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::arrow::schema::{ParquetField, ParquetFieldType, VirtualColumnType}; use crate::basic::Type as PhysicalType; @@ -102,23 +103,24 @@ pub struct ArrayReaderBuilder<'a> { impl<'a> ArrayReaderBuilder<'a> { /// Create a new `ArrayReaderBuilder` - /// - /// `batch_size` is used to pre-allocate internal buffers with the expected capacity, - /// avoiding reallocations when reading the first batch of data. - pub fn new( - row_groups: &'a dyn RowGroups, - metrics: &'a ArrowReaderMetrics, - batch_size: usize, - ) -> Self { + pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self { Self { row_groups, cache_options: None, parquet_metadata: None, metrics, - batch_size, + batch_size: DEFAULT_BATCH_SIZE, } } + /// Set the batch size used to pre-allocate internal buffers. + /// + /// This avoids reallocations when reading the first batch of data. 
+ pub fn with_batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = batch_size; + self + } + /// Add cache options to the builder pub fn with_cache_options(mut self, cache_options: Option<&'a CacheOptions<'a>>) -> Self { self.cache_options = cache_options; @@ -566,7 +568,8 @@ mod tests { .unwrap(); let metrics = ArrowReaderMetrics::disabled(); - let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024) + let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) + .with_batch_size(DEFAULT_BATCH_SIZE) .build_array_reader(fields.as_ref(), &mask) .unwrap(); @@ -599,7 +602,8 @@ mod tests { .unwrap(); let metrics = ArrowReaderMetrics::disabled(); - let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024) + let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) + .with_batch_size(DEFAULT_BATCH_SIZE) .with_parquet_metadata(file_reader.metadata()) .build_array_reader(fields.as_ref(), &mask) .unwrap(); diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index c856f7e3c677..cf40d0576d17 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -485,24 +485,28 @@ impl ByteArrayDecoderDeltaLength { let initial_values_length = output.values.len(); let to_read = len.min(self.lengths.len() - self.length_offset); - output.offsets.reserve(to_read); - let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read]; - let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); - output.values.reserve(total_bytes); - let mut current_offset = self.data_offset; - for length in src_lengths { - let end_offset = current_offset + *length as usize; - output.try_push( - &self.data.as_ref()[current_offset..end_offset], - self.validate_utf8, - )?; - current_offset = end_offset; - } + // Reserve capacity for both offsets and values upfront + output.offsets.reserve(to_read); + 
output.values.reserve(total_bytes); - self.data_offset = current_offset; + // Delta length data is contiguous — copy all value bytes at once + let data_end = self.data_offset + total_bytes; + output + .values + .extend_from_slice(&self.data.as_ref()[self.data_offset..data_end]); + + // Compute and extend offsets in batch using extend + let base_offset = initial_values_length; + let mut running = base_offset; + output.offsets.extend(src_lengths.iter().map(|length| { + running += *length as usize; + I::from_usize(running).expect("index overflow decoding byte array") + })); + + self.data_offset = data_end; self.length_offset += to_read; if self.validate_utf8 { diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index a189a80cdf86..c134261609be 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -675,13 +675,20 @@ impl ByteViewArrayDecoderDelta { // fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { - output.views.reserve(len.min(self.decoder.remaining())); + let to_reserve = len.min(self.decoder.remaining()); + output.views.reserve(to_reserve); // array buffer only have long strings let mut array_buffer: Vec = Vec::with_capacity(4096); let buffer_id = output.buffers.len() as u32; + // Use unsafe ptr writes instead of per-element push to avoid + // repeated length checks. Safety: we reserved enough space above. 
+ let views_ptr = output.views.as_mut_ptr(); + let initial_len = output.views.len(); + let mut write_count = 0; + let read = if !self.validate_utf8 { self.decoder.read(len, |bytes| { let offset = array_buffer.len(); @@ -691,18 +698,18 @@ impl ByteViewArrayDecoderDelta { array_buffer.extend_from_slice(bytes); } - // # Safety - // The buffer_id is the last buffer in the output buffers - // The offset is calculated from the buffer, so it is valid + // Safety: views_ptr is valid for writes, we reserved enough space, + // and write_count < to_reserve. unsafe { - output.append_raw_view_unchecked(view); + views_ptr.add(initial_len + write_count).write(view); } + write_count += 1; Ok(()) })? } else { // utf8 validation buffer has only short strings. These short // strings are inlined into the views but we copy them into a - // contiguous buffer to accelerate validation.® + // contiguous buffer to accelerate validation. let mut utf8_validation_buffer = Vec::with_capacity(4096); let v = self.decoder.read(len, |bytes| { @@ -715,13 +722,12 @@ impl ByteViewArrayDecoderDelta { utf8_validation_buffer.extend_from_slice(bytes); } - // # Safety - // The buffer_id is the last buffer in the output buffers - // The offset is calculated from the buffer, so it is valid - // Utf-8 validation is done later + // Safety: views_ptr is valid for writes, we reserved enough space, + // and write_count < to_reserve. Utf-8 validation is done later. 
unsafe { - output.append_raw_view_unchecked(view); + views_ptr.add(initial_len + write_count).write(view); } + write_count += 1; Ok(()) })?; check_valid_utf8(&array_buffer)?; @@ -729,6 +735,11 @@ impl ByteViewArrayDecoderDelta { v }; + // Safety: we wrote exactly `read` views via ptr writes above + unsafe { + output.views.set_len(initial_len + read); + } + let actual_block_id = output.append_block(Buffer::from_vec(array_buffer)); assert_eq!(actual_block_id, buffer_id); Ok(read) diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 5564aff631d7..a7adc01b912d 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -249,6 +249,7 @@ mod tests { use crate::arrow::array_reader::ArrayReaderBuilder; use crate::arrow::array_reader::list_array::ListArrayReader; use crate::arrow::array_reader::test_util::InMemoryArrayReader; + use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::arrow::schema::parquet_to_arrow_schema_and_fields; use crate::arrow::{ArrowWriter, ProjectionMask, parquet_to_arrow_schema}; @@ -566,7 +567,8 @@ mod tests { .unwrap(); let metrics = ArrowReaderMetrics::disabled(); - let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024) + let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) + .with_batch_size(DEFAULT_BATCH_SIZE) .build_array_reader(fields.as_ref(), &mask) .unwrap(); diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index eeb1e1c0981d..71218a628201 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -439,6 +439,7 @@ fn pack_dictionary_impl( mod tests { use super::*; use crate::arrow::array_reader::test_util::EmptyPageIterator; + use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE; use 
crate::basic::Encoding; use crate::column::page::Page; use crate::data_type::{Int32Type, Int64Type}; @@ -513,7 +514,7 @@ mod tests { Box::::default(), schema.column(0), None, - 1024, + DEFAULT_BATCH_SIZE, ) .unwrap(); @@ -560,7 +561,7 @@ mod tests { Box::new(page_iterator), column_desc, None, - 1024, + DEFAULT_BATCH_SIZE, ) .unwrap(); @@ -631,7 +632,7 @@ mod tests { Box::new(page_iterator), column_desc.clone(), None, - 1024, + DEFAULT_BATCH_SIZE, ) .expect("Unable to get array reader"); @@ -771,7 +772,7 @@ mod tests { Box::new(page_iterator), column_desc, None, - 1024, + DEFAULT_BATCH_SIZE, ) .unwrap(); @@ -851,7 +852,7 @@ mod tests { Box::new(page_iterator), column_desc, None, - 1024, + DEFAULT_BATCH_SIZE, ) .unwrap(); @@ -914,7 +915,7 @@ mod tests { Box::new(page_iterator), column_desc, None, - 1024, + DEFAULT_BATCH_SIZE, ) .unwrap(); @@ -980,7 +981,7 @@ mod tests { Box::new(page_iterator), column_desc, None, - 1024, + DEFAULT_BATCH_SIZE, ) .unwrap(); diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 399401f0e75a..8a7e602618c2 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1219,7 +1219,8 @@ impl ParquetRecordBatchReaderBuilder { let mut cache_projection = predicate.projection().clone(); cache_projection.intersect(&projection); - let array_reader = ArrayReaderBuilder::new(&reader, &metrics, batch_size) + let array_reader = ArrayReaderBuilder::new(&reader, &metrics) + .with_batch_size(batch_size) .with_parquet_metadata(&reader.metadata) .build_array_reader(fields.as_deref(), predicate.projection())?; @@ -1227,7 +1228,8 @@ impl ParquetRecordBatchReaderBuilder { } } - let array_reader = ArrayReaderBuilder::new(&reader, &metrics, batch_size) + let array_reader = ArrayReaderBuilder::new(&reader, &metrics) + .with_batch_size(batch_size) .with_parquet_metadata(&reader.metadata) .build_array_reader(fields.as_deref(), &projection)?; @@ -1534,7 +1536,8 @@ impl 
ParquetRecordBatchReader { ) -> Result { // note metrics are not supported in this API let metrics = ArrowReaderMetrics::disabled(); - let array_reader = ArrayReaderBuilder::new(row_groups, &metrics, batch_size) + let array_reader = ArrayReaderBuilder::new(row_groups, &metrics) + .with_batch_size(batch_size) .with_parquet_metadata(row_groups.metadata()) .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?; diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs index 087900b62be2..2d9d1c9b6be8 100644 --- a/parquet/src/arrow/buffer/offset_buffer.rs +++ b/parquet/src/arrow/buffer/offset_buffer.rs @@ -95,6 +95,8 @@ impl OffsetBuffer { dict_offsets: &[V], dict_values: &[u8], ) -> Result<()> { + self.offsets.reserve(keys.len()); + for key in keys { let index = key.as_usize(); if index + 1 >= dict_offsets.len() { @@ -107,7 +109,11 @@ impl OffsetBuffer { let end_offset = dict_offsets[index + 1].as_usize(); // Dictionary values are verified when decoding dictionary page - self.try_push(&dict_values[start_offset..end_offset], false)?; + self.values + .extend_from_slice(&dict_values[start_offset..end_offset]); + let index_offset = I::from_usize(self.values.len()) + .ok_or_else(|| general_err!("index overflow decoding byte array"))?; + self.offsets.push(index_offset); } Ok(()) } diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index e7cc6eb1fab7..b1bdeb64e5c0 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -51,15 +51,6 @@ impl ViewBuffer { block_id } - /// Directly append a view to the view array. - /// This is used when we create a StringViewArray from a dictionary whose values are StringViewArray. - /// - /// # Safety - /// The `view` must be a valid view as per the ByteView spec. 
- pub unsafe fn append_raw_view_unchecked(&mut self, view: u128) { - self.views.push(view); - } - /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` pub fn into_array(self, null_buffer: Option, data_type: &ArrowType) -> ArrayRef { let len = self.views.len(); @@ -122,11 +113,9 @@ mod tests { let string_buffer = Buffer::from(data); let block_id = buffer.append_block(string_buffer); - unsafe { - buffer.append_raw_view_unchecked(make_view(&data[0..1], block_id, 0)); - buffer.append_raw_view_unchecked(make_view(&data[1..10], block_id, 1)); - buffer.append_raw_view_unchecked(make_view(&data[10..41], block_id, 10)); - } + buffer.views.push(make_view(&data[0..1], block_id, 0)); + buffer.views.push(make_view(&data[1..10], block_id, 1)); + buffer.views.push(make_view(&data[10..41], block_id, 10)); let array = buffer.into_array(None, &ArrowType::Utf8View); let string_array = array @@ -150,11 +139,9 @@ mod tests { let string_buffer = Buffer::from(data); let block_id = buffer.append_block(string_buffer); - unsafe { - buffer.append_raw_view_unchecked(make_view(&data[0..1], block_id, 0)); - buffer.append_raw_view_unchecked(make_view(&data[1..10], block_id, 1)); - buffer.append_raw_view_unchecked(make_view(&data[10..41], block_id, 10)); - } + buffer.views.push(make_view(&data[0..1], block_id, 0)); + buffer.views.push(make_view(&data[1..10], block_id, 1)); + buffer.views.push(make_view(&data[10..41], block_id, 10)); let valid = [true, false, false, true, false, false, true]; let valid_mask = Buffer::from_iter(valid.iter().copied()); diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 582e18b2a08b..befe5a034ad0 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -432,11 +432,11 @@ impl RowGroupReaderBuilder { let cache_options = filter_info.cache_builder().producer(); - let array_reader = - 
ArrayReaderBuilder::new(&row_group, &self.metrics, self.batch_size) - .with_cache_options(Some(&cache_options)) - .with_parquet_metadata(&self.metadata) - .build_array_reader(self.fields.as_deref(), predicate.projection())?; + let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics) + .with_batch_size(self.batch_size) + .with_cache_options(Some(&cache_options)) + .with_parquet_metadata(&self.metadata) + .build_array_reader(self.fields.as_deref(), predicate.projection())?; // Reset to original policy before each predicate so the override // can detect page skipping for THIS predicate's columns. @@ -609,9 +609,9 @@ impl RowGroupReaderBuilder { let plan = plan_builder.build(); // if we have any cached results, connect them up - let array_reader_builder = - ArrayReaderBuilder::new(&row_group, &self.metrics, self.batch_size) - .with_parquet_metadata(&self.metadata); + let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics) + .with_batch_size(self.batch_size) + .with_parquet_metadata(&self.metadata); let array_reader = if let Some(cache_info) = cache_info.as_ref() { let cache_options: CacheOptions = cache_info.builder().consumer(); array_reader_builder diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs index 745725bbf676..a33b489c62a7 100644 --- a/parquet/src/arrow/record_reader/mod.rs +++ b/parquet/src/arrow/record_reader/mod.rs @@ -270,6 +270,7 @@ mod tests { use arrow::buffer::Buffer; + use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE; use crate::basic::Encoding; use crate::data_type::Int32Type; use crate::schema::parser::parse_message_type; @@ -294,7 +295,7 @@ mod tests { .unwrap(); // Construct record reader - let mut record_reader = RecordReader::::new(desc.clone(), 1024); + let mut record_reader = RecordReader::::new(desc.clone(), DEFAULT_BATCH_SIZE); // First page @@ -367,7 +368,7 @@ mod tests { .unwrap(); // Construct record reader - let mut record_reader = 
RecordReader::::new(desc.clone(), 1024); + let mut record_reader = RecordReader::::new(desc.clone(), DEFAULT_BATCH_SIZE); // First page @@ -469,7 +470,7 @@ mod tests { .unwrap(); // Construct record reader - let mut record_reader = RecordReader::::new(desc.clone(), 1024); + let mut record_reader = RecordReader::::new(desc.clone(), DEFAULT_BATCH_SIZE); // First page @@ -572,7 +573,7 @@ mod tests { .unwrap(); // Construct record reader - let mut record_reader = RecordReader::::new(desc.clone(), 1024); + let mut record_reader = RecordReader::::new(desc.clone(), DEFAULT_BATCH_SIZE); { let values = [100; 5000]; @@ -622,7 +623,7 @@ mod tests { pb.add_values::(Encoding::PLAIN, &values); let page = pb.consume(); - let mut record_reader = RecordReader::::new(desc, 1024); + let mut record_reader = RecordReader::::new(desc, DEFAULT_BATCH_SIZE); let page_reader = Box::new(InMemoryPageReader::new(vec![page.clone()])); record_reader.set_page_reader(page_reader).unwrap(); assert_eq!(record_reader.read_records(4).unwrap(), 4); @@ -661,7 +662,7 @@ mod tests { .unwrap(); // Construct record reader - let mut record_reader = RecordReader::::new(desc.clone(), 1024); + let mut record_reader = RecordReader::::new(desc.clone(), DEFAULT_BATCH_SIZE); // First page @@ -735,7 +736,7 @@ mod tests { .unwrap(); // Construct record reader - let mut record_reader = RecordReader::::new(desc.clone(), 1024); + let mut record_reader = RecordReader::::new(desc.clone(), DEFAULT_BATCH_SIZE); // First page