Skip to content

Commit 224798a

Browse files
committed
Pass down batch_size from ArrowReaderBuilder through ArrayReaderBuilder. Ensure internal buffers are pre-allocated.
1 parent e2b2b8f commit 224798a

File tree

12 files changed

+138
-16
lines changed

12 files changed

+138
-16
lines changed

parquet/src/arrow/array_reader/builder.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ pub struct ArrayReaderBuilder<'a> {
9595
parquet_metadata: Option<&'a ParquetMetaData>,
9696
/// metrics
9797
metrics: &'a ArrowReaderMetrics,
98+
/// Batch size for pre-allocating internal buffers
99+
batch_size: Option<usize>,
98100
}
99101

100102
impl<'a> ArrayReaderBuilder<'a> {
@@ -104,6 +106,7 @@ impl<'a> ArrayReaderBuilder<'a> {
104106
cache_options: None,
105107
parquet_metadata: None,
106108
metrics,
109+
batch_size: None,
107110
}
108111
}
109112

@@ -119,6 +122,15 @@ impl<'a> ArrayReaderBuilder<'a> {
119122
self
120123
}
121124

125+
/// Set the batch size for pre-allocating internal buffers
126+
///
127+
/// This allows the reader to pre-allocate buffers with the expected capacity,
128+
/// avoiding reallocations when reading the first batch of data.
129+
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
130+
self.batch_size = Some(batch_size);
131+
self
132+
}
133+
122134
/// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
123135
pub fn build_array_reader(
124136
&self,
@@ -410,18 +422,18 @@ impl<'a> ArrayReaderBuilder<'a> {
410422
)?) as _,
411423
PhysicalType::BYTE_ARRAY => match arrow_type {
412424
Some(DataType::Dictionary(_, _)) => {
413-
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
425+
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
414426
}
415427
Some(DataType::Utf8View | DataType::BinaryView) => {
416-
make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
428+
make_byte_view_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
417429
}
418-
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
430+
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?,
419431
},
420432
PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
421433
Some(DataType::Dictionary(_, _)) => {
422-
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
434+
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
423435
}
424-
_ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?,
436+
_ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?,
425437
},
426438
};
427439
Ok(Some(reader))

parquet/src/arrow/array_reader/byte_array.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,14 @@ use std::any::Any;
3838
use std::sync::Arc;
3939

4040
/// Returns an [`ArrayReader`] that decodes the provided byte array column
41+
///
42+
/// The optional `batch_size` parameter is used to pre-allocate internal buffers,
43+
/// avoiding reallocations when reading the first batch of data.
4144
pub fn make_byte_array_reader(
4245
pages: Box<dyn PageIterator>,
4346
column_desc: ColumnDescPtr,
4447
arrow_type: Option<ArrowType>,
48+
batch_size: Option<usize>,
4549
) -> Result<Box<dyn ArrayReader>> {
4650
// Check if Arrow type is specified, else create it from Parquet type
4751
let data_type = match arrow_type {
@@ -56,13 +60,23 @@ pub fn make_byte_array_reader(
5660
| ArrowType::Utf8
5761
| ArrowType::Decimal128(_, _)
5862
| ArrowType::Decimal256(_, _) => {
59-
let reader = GenericRecordReader::new(column_desc);
63+
let reader = match batch_size {
64+
Some(capacity) => {
65+
GenericRecordReader::new_with_capacity(column_desc, capacity)
66+
}
67+
None => GenericRecordReader::new(column_desc),
68+
};
6069
Ok(Box::new(ByteArrayReader::<i32>::new(
6170
pages, data_type, reader,
6271
)))
6372
}
6473
ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
65-
let reader = GenericRecordReader::new(column_desc);
74+
let reader = match batch_size {
75+
Some(capacity) => {
76+
GenericRecordReader::new_with_capacity(column_desc, capacity)
77+
}
78+
None => GenericRecordReader::new(column_desc),
79+
};
6680
Ok(Box::new(ByteArrayReader::<i64>::new(
6781
pages, data_type, reader,
6882
)))

parquet/src/arrow/array_reader/byte_array_dictionary.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,17 @@ use crate::util::bit_util::FromBytes;
4040
/// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
4141
macro_rules! make_reader {
4242
(
43-
($pages:expr, $column_desc:expr, $data_type:expr) => match ($k:expr, $v:expr) {
43+
($pages:expr, $column_desc:expr, $data_type:expr, $batch_size:expr) => match ($k:expr, $v:expr) {
4444
$(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, $value_type:ty),)+
4545
}
4646
) => {
4747
match (($k, $v)) {
4848
$(
4949
($key_arrow, $value_arrow) => {
50-
let reader = GenericRecordReader::new($column_desc);
50+
let reader = match $batch_size {
51+
Some(capacity) => GenericRecordReader::new_with_capacity($column_desc, capacity),
52+
None => GenericRecordReader::new($column_desc),
53+
};
5154
Ok(Box::new(ByteArrayDictionaryReader::<$key_type, $value_type>::new(
5255
$pages, $data_type, reader,
5356
)))
@@ -73,10 +76,13 @@ macro_rules! make_reader {
7376
/// It is therefore recommended that if `pages` contains data from multiple column chunks,
7477
/// that the read batch size used is a divisor of the row group size
7578
///
79+
/// The optional `batch_size` parameter is used to pre-allocate internal buffers,
80+
/// avoiding reallocations when reading the first batch of data.
7681
pub fn make_byte_array_dictionary_reader(
7782
pages: Box<dyn PageIterator>,
7883
column_desc: ColumnDescPtr,
7984
arrow_type: Option<ArrowType>,
85+
batch_size: Option<usize>,
8086
) -> Result<Box<dyn ArrayReader>> {
8187
// Check if Arrow type is specified, else create it from Parquet type
8288
let data_type = match arrow_type {
@@ -89,7 +95,7 @@ pub fn make_byte_array_dictionary_reader(
8995
match &data_type {
9096
ArrowType::Dictionary(key_type, value_type) => {
9197
make_reader! {
92-
(pages, column_desc, data_type) => match (key_type.as_ref(), value_type.as_ref()) {
98+
(pages, column_desc, data_type, batch_size) => match (key_type.as_ref(), value_type.as_ref()) {
9399
(ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (u8, i32),
94100
(ArrowType::UInt8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u8, i64),
95101
(ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (i8, i32),

parquet/src/arrow/array_reader/byte_view_array.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,14 @@ use bytes::Bytes;
3636
use std::any::Any;
3737

3838
/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
39+
///
40+
/// The optional `batch_size` parameter is used to pre-allocate internal buffers,
41+
/// avoiding reallocations when reading the first batch of data.
3942
pub fn make_byte_view_array_reader(
4043
pages: Box<dyn PageIterator>,
4144
column_desc: ColumnDescPtr,
4245
arrow_type: Option<ArrowType>,
46+
batch_size: Option<usize>,
4347
) -> Result<Box<dyn ArrayReader>> {
4448
// Check if Arrow type is specified, else create it from Parquet type
4549
let data_type = match arrow_type {
@@ -52,7 +56,10 @@ pub fn make_byte_view_array_reader(
5256

5357
match data_type {
5458
ArrowType::BinaryView | ArrowType::Utf8View => {
55-
let reader = GenericRecordReader::new(column_desc);
59+
let reader = match batch_size {
60+
Some(capacity) => GenericRecordReader::new_with_capacity(column_desc, capacity),
61+
None => GenericRecordReader::new(column_desc),
62+
};
5663
Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader)))
5764
}
5865

parquet/src/arrow/array_reader/fixed_len_byte_array.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,14 @@ use std::ops::Range;
4040
use std::sync::Arc;
4141

4242
/// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column
43+
///
44+
/// The optional `batch_size` parameter is used to pre-allocate internal buffers,
45+
/// avoiding reallocations when reading the first batch of data.
4346
pub fn make_fixed_len_byte_array_reader(
4447
pages: Box<dyn PageIterator>,
4548
column_desc: ColumnDescPtr,
4649
arrow_type: Option<ArrowType>,
50+
batch_size: Option<usize>,
4751
) -> Result<Box<dyn ArrayReader>> {
4852
// Check if Arrow type is specified, else create it from Parquet type
4953
let data_type = match arrow_type {
@@ -126,6 +130,7 @@ pub fn make_fixed_len_byte_array_reader(
126130
column_desc,
127131
data_type,
128132
byte_length,
133+
batch_size,
129134
)))
130135
}
131136

@@ -144,14 +149,19 @@ impl FixedLenByteArrayReader {
144149
column_desc: ColumnDescPtr,
145150
data_type: ArrowType,
146151
byte_length: usize,
152+
batch_size: Option<usize>,
147153
) -> Self {
154+
let record_reader = match batch_size {
155+
Some(capacity) => GenericRecordReader::new_with_capacity(column_desc, capacity),
156+
None => GenericRecordReader::new(column_desc),
157+
};
148158
Self {
149159
data_type,
150160
byte_length,
151161
pages,
152162
def_levels_buffer: None,
153163
rep_levels_buffer: None,
154-
record_reader: GenericRecordReader::new(column_desc),
164+
record_reader,
155165
}
156166
}
157167
}
@@ -284,6 +294,15 @@ fn move_values<F>(
284294
}
285295

286296
impl ValuesBuffer for FixedLenByteArrayBuffer {
297+
fn with_capacity(_capacity: usize) -> Self {
298+
// byte_length is not known at trait level, so we return a default buffer
299+
// The decoder will pre-allocate when it knows both capacity and byte_length
300+
Self {
301+
buffer: Vec::new(),
302+
byte_length: None,
303+
}
304+
}
305+
287306
fn pad_nulls(
288307
&mut self,
289308
read_offset: usize,

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,6 +1066,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
10661066

10671067
let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
10681068
.with_parquet_metadata(&reader.metadata)
1069+
.with_batch_size(batch_size)
10691070
.build_array_reader(fields.as_deref(), predicate.projection())?;
10701071

10711072
plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
@@ -1074,6 +1075,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
10741075

10751076
let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
10761077
.with_parquet_metadata(&reader.metadata)
1078+
.with_batch_size(batch_size)
10771079
.build_array_reader(fields.as_deref(), &projection)?;
10781080

10791081
let read_plan = plan_builder
@@ -1381,6 +1383,7 @@ impl ParquetRecordBatchReader {
13811383
let metrics = ArrowReaderMetrics::disabled();
13821384
let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
13831385
.with_parquet_metadata(row_groups.metadata())
1386+
.with_batch_size(batch_size)
13841387
.build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
13851388

13861389
let read_plan = ReadPlanBuilder::new(batch_size)

parquet/src/arrow/buffer/dictionary_buffer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,12 @@ impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
194194
}
195195

196196
impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K, V> {
197+
fn with_capacity(capacity: usize) -> Self {
198+
Self::Values {
199+
values: OffsetBuffer::with_capacity(capacity),
200+
}
201+
}
202+
197203
fn pad_nulls(
198204
&mut self,
199205
read_offset: usize,

parquet/src/arrow/buffer/offset_buffer.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,19 @@ impl<I: OffsetSizeTrait> Default for OffsetBuffer<I> {
4444
}
4545

4646
impl<I: OffsetSizeTrait> OffsetBuffer<I> {
47+
/// Create a new `OffsetBuffer` with capacity for at least `capacity` elements
48+
///
49+
/// Pre-allocates the offsets vector to avoid reallocations during reading.
50+
/// The values vector is not pre-allocated as its size is unpredictable.
51+
pub fn with_capacity(capacity: usize) -> Self {
52+
let mut offsets = Vec::with_capacity(capacity + 1);
53+
offsets.push(I::default());
54+
Self {
55+
offsets,
56+
values: Vec::new(),
57+
}
58+
}
59+
4760
/// Returns the number of byte arrays in this buffer
4861
pub fn len(&self) -> usize {
4962
self.offsets.len() - 1
@@ -139,6 +152,10 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
139152
}
140153

141154
impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
155+
fn with_capacity(capacity: usize) -> Self {
156+
Self::with_capacity(capacity)
157+
}
158+
142159
fn pad_nulls(
143160
&mut self,
144161
read_offset: usize,

parquet/src/arrow/buffer/view_buffer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ impl ViewBuffer {
105105
}
106106

107107
impl ValuesBuffer for ViewBuffer {
108+
fn with_capacity(capacity: usize) -> Self {
109+
Self::with_capacity(capacity)
110+
}
111+
108112
fn pad_nulls(
109113
&mut self,
110114
read_offset: usize,

parquet/src/arrow/push_decoder/reader_builder/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,7 @@ impl RowGroupReaderBuilder {
435435
let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
436436
.with_cache_options(Some(&cache_options))
437437
.with_parquet_metadata(&self.metadata)
438+
.with_batch_size(self.batch_size)
438439
.build_array_reader(self.fields.as_deref(), predicate.projection())?;
439440

440441
plan_builder =
@@ -592,7 +593,8 @@ impl RowGroupReaderBuilder {
592593

593594
// if we have any cached results, connect them up
594595
let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
595-
.with_parquet_metadata(&self.metadata);
596+
.with_parquet_metadata(&self.metadata)
597+
.with_batch_size(self.batch_size);
596598
let array_reader = if let Some(cache_info) = cache_info.as_ref() {
597599
let cache_options = cache_info.builder().consumer();
598600
array_reader_builder

0 commit comments

Comments (0)