Skip to content

Commit 57a2d80

Browse files
committed
Add with_capacity to ValuesBuffer trait
1 parent e2b2b8f commit 57a2d80

File tree

6 files changed

+61
-2
lines changed

6 files changed

+61
-2
lines changed

parquet/src/arrow/array_reader/fixed_len_byte_array.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,15 @@ fn move_values<F>(
284284
}
285285

286286
impl ValuesBuffer for FixedLenByteArrayBuffer {
287+
fn with_capacity(_capacity: usize) -> Self {
288+
// byte_length is not known at trait level, so we return a default buffer
289+
// The decoder will pre-allocate when it knows both capacity and byte_length
290+
Self {
291+
buffer: Vec::new(),
292+
byte_length: None,
293+
}
294+
}
295+
287296
fn pad_nulls(
288297
&mut self,
289298
read_offset: usize,

parquet/src/arrow/buffer/dictionary_buffer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,12 @@ impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
194194
}
195195

196196
impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K, V> {
197+
fn with_capacity(capacity: usize) -> Self {
198+
Self::Values {
199+
values: OffsetBuffer::with_capacity(capacity),
200+
}
201+
}
202+
197203
fn pad_nulls(
198204
&mut self,
199205
read_offset: usize,

parquet/src/arrow/buffer/offset_buffer.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,19 @@ impl<I: OffsetSizeTrait> Default for OffsetBuffer<I> {
4444
}
4545

4646
impl<I: OffsetSizeTrait> OffsetBuffer<I> {
47+
/// Create a new `OffsetBuffer` with capacity for at least `capacity` elements
48+
///
49+
/// Pre-allocates the offsets vector to avoid reallocations during reading.
50+
/// The values vector is not pre-allocated as its size is unpredictable.
51+
pub fn with_capacity(capacity: usize) -> Self {
52+
let mut offsets = Vec::with_capacity(capacity + 1);
53+
offsets.push(I::default());
54+
Self {
55+
offsets,
56+
values: Vec::new(),
57+
}
58+
}
59+
4760
/// Returns the number of byte arrays in this buffer
4861
pub fn len(&self) -> usize {
4962
self.offsets.len() - 1
@@ -139,6 +152,10 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
139152
}
140153

141154
impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
155+
fn with_capacity(capacity: usize) -> Self {
156+
Self::with_capacity(capacity)
157+
}
158+
142159
fn pad_nulls(
143160
&mut self,
144161
read_offset: usize,

parquet/src/arrow/buffer/view_buffer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ impl ViewBuffer {
105105
}
106106

107107
impl ValuesBuffer for ViewBuffer {
108+
fn with_capacity(capacity: usize) -> Self {
109+
Self::with_capacity(capacity)
110+
}
111+
108112
fn pad_nulls(
109113
&mut self,
110114
read_offset: usize,

parquet/src/arrow/record_reader/buffer.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ use crate::arrow::buffer::bit_util::iter_set_bits_rev;
1919

2020
/// A buffer that supports padding with nulls
2121
pub trait ValuesBuffer: Default {
22+
/// Create a new buffer with capacity for at least `capacity` elements
23+
///
24+
/// This allows pre-allocating buffers to avoid reallocations during reading,
25+
/// improving performance when the number of values is known in advance.
26+
fn with_capacity(capacity: usize) -> Self;
27+
2228
/// If a column contains nulls, more level data may be read than value data, as null
2329
/// values are not encoded. Therefore, first the levels data is read, the null count
2430
/// determined, and then the corresponding number of values read to a [`ValuesBuffer`].
@@ -43,6 +49,10 @@ pub trait ValuesBuffer: Default {
4349
}
4450

4551
impl<T: Copy + Default> ValuesBuffer for Vec<T> {
52+
fn with_capacity(capacity: usize) -> Self {
53+
Vec::with_capacity(capacity)
54+
}
55+
4656
fn pad_nulls(
4757
&mut self,
4858
read_offset: usize,

parquet/src/arrow/record_reader/mod.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ pub struct GenericRecordReader<V, CV> {
5858
num_values: usize,
5959
/// Number of buffered records
6060
num_records: usize,
61+
/// Capacity hint for pre-allocating buffers based on batch size
62+
capacity_hint: usize,
6163
}
6264

6365
impl<V, CV> GenericRecordReader<V, CV>
@@ -67,19 +69,23 @@ where
6769
{
6870
/// Create a new [`GenericRecordReader`]
6971
pub fn new(desc: ColumnDescPtr) -> Self {
72+
// Start with a reasonable default capacity to avoid zero reallocations on first batch
73+
const DEFAULT_CAPACITY: usize = 1024;
74+
7075
let def_levels = (desc.max_def_level() > 0)
7176
.then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc)));
7277

7378
let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
7479

7580
Self {
76-
values: V::default(),
81+
values: V::with_capacity(DEFAULT_CAPACITY),
7782
def_levels,
7883
rep_levels,
7984
column_reader: None,
8085
column_desc: desc,
8186
num_values: 0,
8287
num_records: 0,
88+
capacity_hint: DEFAULT_CAPACITY,
8389
}
8490
}
8591

@@ -169,7 +175,9 @@ where
169175
/// Returns currently stored buffer data.
170176
/// The side effect is similar to `consume_def_levels`.
171177
pub fn consume_record_data(&mut self) -> V {
172-
std::mem::take(&mut self.values)
178+
// Replace the buffer with a new one that has the same capacity
179+
// This avoids reallocations on subsequent batches
180+
std::mem::replace(&mut self.values, V::with_capacity(self.capacity_hint))
173181
}
174182

175183
/// Returns currently stored null bitmap data for nullable columns.
@@ -208,6 +216,11 @@ where
208216

209217
/// Try to read one batch of data returning the number of records read
210218
fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
219+
// Update capacity hint to the largest batch size seen
220+
if batch_size > self.capacity_hint {
221+
self.capacity_hint = batch_size;
222+
}
223+
211224
let (records_read, values_read, levels_read) =
212225
self.column_reader.as_mut().unwrap().read_records(
213226
batch_size,

0 commit comments

Comments
 (0)