Skip to content

Commit 3d7ba2d

Browse files
lyang24 and claude committed
Optimize dict decode and view writes
- Optimize OffsetBuffer dict decode by inlining extend + offset push
- Use raw ptr writes in ByteViewArray dictionary decoder for better perf
- Remove unused append_raw_view_unchecked from ViewBuffer

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent dd4884c commit 3d7ba2d

File tree

10 files changed

+101
-82
lines changed

10 files changed

+101
-82
lines changed

parquet/src/arrow/array_reader/builder.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::arrow::array_reader::{
3333
NullArrayReader, PrimitiveArrayReader, RowGroups, StructArrayReader,
3434
make_byte_array_dictionary_reader, make_byte_array_reader,
3535
};
36+
use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
3637
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
3738
use crate::arrow::schema::{ParquetField, ParquetFieldType, VirtualColumnType};
3839
use crate::basic::Type as PhysicalType;
@@ -102,23 +103,24 @@ pub struct ArrayReaderBuilder<'a> {
102103

103104
impl<'a> ArrayReaderBuilder<'a> {
104105
/// Create a new `ArrayReaderBuilder`
105-
///
106-
/// `batch_size` is used to pre-allocate internal buffers with the expected capacity,
107-
/// avoiding reallocations when reading the first batch of data.
108-
pub fn new(
109-
row_groups: &'a dyn RowGroups,
110-
metrics: &'a ArrowReaderMetrics,
111-
batch_size: usize,
112-
) -> Self {
106+
pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
113107
Self {
114108
row_groups,
115109
cache_options: None,
116110
parquet_metadata: None,
117111
metrics,
118-
batch_size,
112+
batch_size: DEFAULT_BATCH_SIZE,
119113
}
120114
}
121115

116+
/// Set the batch size used to pre-allocate internal buffers.
117+
///
118+
/// This avoids reallocations when reading the first batch of data.
119+
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
120+
self.batch_size = batch_size;
121+
self
122+
}
123+
122124
/// Add cache options to the builder
123125
pub fn with_cache_options(mut self, cache_options: Option<&'a CacheOptions<'a>>) -> Self {
124126
self.cache_options = cache_options;
@@ -566,7 +568,8 @@ mod tests {
566568
.unwrap();
567569

568570
let metrics = ArrowReaderMetrics::disabled();
569-
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024)
571+
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
572+
.with_batch_size(DEFAULT_BATCH_SIZE)
570573
.build_array_reader(fields.as_ref(), &mask)
571574
.unwrap();
572575

@@ -599,7 +602,8 @@ mod tests {
599602
.unwrap();
600603

601604
let metrics = ArrowReaderMetrics::disabled();
602-
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024)
605+
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
606+
.with_batch_size(DEFAULT_BATCH_SIZE)
603607
.with_parquet_metadata(file_reader.metadata())
604608
.build_array_reader(fields.as_ref(), &mask)
605609
.unwrap();

parquet/src/arrow/array_reader/byte_array.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -485,24 +485,28 @@ impl ByteArrayDecoderDeltaLength {
485485
let initial_values_length = output.values.len();
486486

487487
let to_read = len.min(self.lengths.len() - self.length_offset);
488-
output.offsets.reserve(to_read);
489-
490488
let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
491-
492489
let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
493-
output.values.reserve(total_bytes);
494490

495-
let mut current_offset = self.data_offset;
496-
for length in src_lengths {
497-
let end_offset = current_offset + *length as usize;
498-
output.try_push(
499-
&self.data.as_ref()[current_offset..end_offset],
500-
self.validate_utf8,
501-
)?;
502-
current_offset = end_offset;
503-
}
491+
// Reserve capacity for both offsets and values upfront
492+
output.offsets.reserve(to_read);
493+
output.values.reserve(total_bytes);
504494

505-
self.data_offset = current_offset;
495+
// Delta length data is contiguous — copy all value bytes at once
496+
let data_end = self.data_offset + total_bytes;
497+
output
498+
.values
499+
.extend_from_slice(&self.data.as_ref()[self.data_offset..data_end]);
500+
501+
// Compute and extend offsets in batch using extend
502+
let base_offset = initial_values_length;
503+
let mut running = base_offset;
504+
output.offsets.extend(src_lengths.iter().map(|length| {
505+
running += *length as usize;
506+
I::from_usize(running).expect("index overflow decoding byte array")
507+
}));
508+
509+
self.data_offset = data_end;
506510
self.length_offset += to_read;
507511

508512
if self.validate_utf8 {

parquet/src/arrow/array_reader/byte_view_array.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -675,13 +675,20 @@ impl ByteViewArrayDecoderDelta {
675675
// <https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7>
676676

677677
fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
678-
output.views.reserve(len.min(self.decoder.remaining()));
678+
let to_reserve = len.min(self.decoder.remaining());
679+
output.views.reserve(to_reserve);
679680

680681
// array buffer only have long strings
681682
let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);
682683

683684
let buffer_id = output.buffers.len() as u32;
684685

686+
// Use unsafe ptr writes instead of per-element push to avoid
687+
// repeated length checks. Safety: we reserved enough space above.
688+
let views_ptr = output.views.as_mut_ptr();
689+
let initial_len = output.views.len();
690+
let mut write_count = 0;
691+
685692
let read = if !self.validate_utf8 {
686693
self.decoder.read(len, |bytes| {
687694
let offset = array_buffer.len();
@@ -691,18 +698,18 @@ impl ByteViewArrayDecoderDelta {
691698
array_buffer.extend_from_slice(bytes);
692699
}
693700

694-
// # Safety
695-
// The buffer_id is the last buffer in the output buffers
696-
// The offset is calculated from the buffer, so it is valid
701+
// Safety: views_ptr is valid for writes, we reserved enough space,
702+
// and write_count < to_reserve.
697703
unsafe {
698-
output.append_raw_view_unchecked(view);
704+
views_ptr.add(initial_len + write_count).write(view);
699705
}
706+
write_count += 1;
700707
Ok(())
701708
})?
702709
} else {
703710
// utf8 validation buffer has only short strings. These short
704711
// strings are inlined into the views but we copy them into a
705-
// contiguous buffer to accelerate validation.®
712+
// contiguous buffer to accelerate validation.
706713
let mut utf8_validation_buffer = Vec::with_capacity(4096);
707714

708715
let v = self.decoder.read(len, |bytes| {
@@ -715,20 +722,24 @@ impl ByteViewArrayDecoderDelta {
715722
utf8_validation_buffer.extend_from_slice(bytes);
716723
}
717724

718-
// # Safety
719-
// The buffer_id is the last buffer in the output buffers
720-
// The offset is calculated from the buffer, so it is valid
721-
// Utf-8 validation is done later
725+
// Safety: views_ptr is valid for writes, we reserved enough space,
726+
// and write_count < to_reserve. Utf-8 validation is done later.
722727
unsafe {
723-
output.append_raw_view_unchecked(view);
728+
views_ptr.add(initial_len + write_count).write(view);
724729
}
730+
write_count += 1;
725731
Ok(())
726732
})?;
727733
check_valid_utf8(&array_buffer)?;
728734
check_valid_utf8(&utf8_validation_buffer)?;
729735
v
730736
};
731737

738+
// Safety: we wrote exactly `read` views via ptr writes above
739+
unsafe {
740+
output.views.set_len(initial_len + read);
741+
}
742+
732743
let actual_block_id = output.append_block(Buffer::from_vec(array_buffer));
733744
assert_eq!(actual_block_id, buffer_id);
734745
Ok(read)

parquet/src/arrow/array_reader/list_array.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ mod tests {
249249
use crate::arrow::array_reader::ArrayReaderBuilder;
250250
use crate::arrow::array_reader::list_array::ListArrayReader;
251251
use crate::arrow::array_reader::test_util::InMemoryArrayReader;
252+
use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
252253
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
253254
use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
254255
use crate::arrow::{ArrowWriter, ProjectionMask, parquet_to_arrow_schema};
@@ -566,7 +567,8 @@ mod tests {
566567
.unwrap();
567568

568569
let metrics = ArrowReaderMetrics::disabled();
569-
let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024)
570+
let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
571+
.with_batch_size(DEFAULT_BATCH_SIZE)
570572
.build_array_reader(fields.as_ref(), &mask)
571573
.unwrap();
572574

parquet/src/arrow/array_reader/primitive_array.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,7 @@ fn pack_dictionary_impl<K: ArrowDictionaryKeyType, V: ArrowPrimitiveType>(
439439
mod tests {
440440
use super::*;
441441
use crate::arrow::array_reader::test_util::EmptyPageIterator;
442+
use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
442443
use crate::basic::Encoding;
443444
use crate::column::page::Page;
444445
use crate::data_type::{Int32Type, Int64Type};
@@ -513,7 +514,7 @@ mod tests {
513514
Box::<EmptyPageIterator>::default(),
514515
schema.column(0),
515516
None,
516-
1024,
517+
DEFAULT_BATCH_SIZE,
517518
)
518519
.unwrap();
519520

@@ -560,7 +561,7 @@ mod tests {
560561
Box::new(page_iterator),
561562
column_desc,
562563
None,
563-
1024,
564+
DEFAULT_BATCH_SIZE,
564565
)
565566
.unwrap();
566567

@@ -631,7 +632,7 @@ mod tests {
631632
Box::new(page_iterator),
632633
column_desc.clone(),
633634
None,
634-
1024,
635+
DEFAULT_BATCH_SIZE,
635636
)
636637
.expect("Unable to get array reader");
637638

@@ -771,7 +772,7 @@ mod tests {
771772
Box::new(page_iterator),
772773
column_desc,
773774
None,
774-
1024,
775+
DEFAULT_BATCH_SIZE,
775776
)
776777
.unwrap();
777778

@@ -851,7 +852,7 @@ mod tests {
851852
Box::new(page_iterator),
852853
column_desc,
853854
None,
854-
1024,
855+
DEFAULT_BATCH_SIZE,
855856
)
856857
.unwrap();
857858

@@ -914,7 +915,7 @@ mod tests {
914915
Box::new(page_iterator),
915916
column_desc,
916917
None,
917-
1024,
918+
DEFAULT_BATCH_SIZE,
918919
)
919920
.unwrap();
920921

@@ -980,7 +981,7 @@ mod tests {
980981
Box::new(page_iterator),
981982
column_desc,
982983
None,
983-
1024,
984+
DEFAULT_BATCH_SIZE,
984985
)
985986
.unwrap();
986987

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1219,15 +1219,17 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
12191219
let mut cache_projection = predicate.projection().clone();
12201220
cache_projection.intersect(&projection);
12211221

1222-
let array_reader = ArrayReaderBuilder::new(&reader, &metrics, batch_size)
1222+
let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1223+
.with_batch_size(batch_size)
12231224
.with_parquet_metadata(&reader.metadata)
12241225
.build_array_reader(fields.as_deref(), predicate.projection())?;
12251226

12261227
plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
12271228
}
12281229
}
12291230

1230-
let array_reader = ArrayReaderBuilder::new(&reader, &metrics, batch_size)
1231+
let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
1232+
.with_batch_size(batch_size)
12311233
.with_parquet_metadata(&reader.metadata)
12321234
.build_array_reader(fields.as_deref(), &projection)?;
12331235

@@ -1534,7 +1536,8 @@ impl ParquetRecordBatchReader {
15341536
) -> Result<Self> {
15351537
// note metrics are not supported in this API
15361538
let metrics = ArrowReaderMetrics::disabled();
1537-
let array_reader = ArrayReaderBuilder::new(row_groups, &metrics, batch_size)
1539+
let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
1540+
.with_batch_size(batch_size)
15381541
.with_parquet_metadata(row_groups.metadata())
15391542
.build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
15401543

parquet/src/arrow/buffer/offset_buffer.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
9595
dict_offsets: &[V],
9696
dict_values: &[u8],
9797
) -> Result<()> {
98+
self.offsets.reserve(keys.len());
99+
98100
for key in keys {
99101
let index = key.as_usize();
100102
if index + 1 >= dict_offsets.len() {
@@ -107,7 +109,11 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
107109
let end_offset = dict_offsets[index + 1].as_usize();
108110

109111
// Dictionary values are verified when decoding dictionary page
110-
self.try_push(&dict_values[start_offset..end_offset], false)?;
112+
self.values
113+
.extend_from_slice(&dict_values[start_offset..end_offset]);
114+
let index_offset = I::from_usize(self.values.len())
115+
.ok_or_else(|| general_err!("index overflow decoding byte array"))?;
116+
self.offsets.push(index_offset);
111117
}
112118
Ok(())
113119
}

parquet/src/arrow/buffer/view_buffer.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,6 @@ impl ViewBuffer {
5151
block_id
5252
}
5353

54-
/// Directly append a view to the view array.
55-
/// This is used when we create a StringViewArray from a dictionary whose values are StringViewArray.
56-
///
57-
/// # Safety
58-
/// The `view` must be a valid view as per the ByteView spec.
59-
pub unsafe fn append_raw_view_unchecked(&mut self, view: u128) {
60-
self.views.push(view);
61-
}
62-
6354
/// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
6455
pub fn into_array(self, null_buffer: Option<Buffer>, data_type: &ArrowType) -> ArrayRef {
6556
let len = self.views.len();
@@ -122,11 +113,9 @@ mod tests {
122113
let string_buffer = Buffer::from(data);
123114
let block_id = buffer.append_block(string_buffer);
124115

125-
unsafe {
126-
buffer.append_raw_view_unchecked(make_view(&data[0..1], block_id, 0));
127-
buffer.append_raw_view_unchecked(make_view(&data[1..10], block_id, 1));
128-
buffer.append_raw_view_unchecked(make_view(&data[10..41], block_id, 10));
129-
}
116+
buffer.views.push(make_view(&data[0..1], block_id, 0));
117+
buffer.views.push(make_view(&data[1..10], block_id, 1));
118+
buffer.views.push(make_view(&data[10..41], block_id, 10));
130119

131120
let array = buffer.into_array(None, &ArrowType::Utf8View);
132121
let string_array = array
@@ -150,11 +139,9 @@ mod tests {
150139
let string_buffer = Buffer::from(data);
151140
let block_id = buffer.append_block(string_buffer);
152141

153-
unsafe {
154-
buffer.append_raw_view_unchecked(make_view(&data[0..1], block_id, 0));
155-
buffer.append_raw_view_unchecked(make_view(&data[1..10], block_id, 1));
156-
buffer.append_raw_view_unchecked(make_view(&data[10..41], block_id, 10));
157-
}
142+
buffer.views.push(make_view(&data[0..1], block_id, 0));
143+
buffer.views.push(make_view(&data[1..10], block_id, 1));
144+
buffer.views.push(make_view(&data[10..41], block_id, 10));
158145

159146
let valid = [true, false, false, true, false, false, true];
160147
let valid_mask = Buffer::from_iter(valid.iter().copied());

parquet/src/arrow/push_decoder/reader_builder/mod.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -432,11 +432,11 @@ impl RowGroupReaderBuilder {
432432

433433
let cache_options = filter_info.cache_builder().producer();
434434

435-
let array_reader =
436-
ArrayReaderBuilder::new(&row_group, &self.metrics, self.batch_size)
437-
.with_cache_options(Some(&cache_options))
438-
.with_parquet_metadata(&self.metadata)
439-
.build_array_reader(self.fields.as_deref(), predicate.projection())?;
435+
let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
436+
.with_batch_size(self.batch_size)
437+
.with_cache_options(Some(&cache_options))
438+
.with_parquet_metadata(&self.metadata)
439+
.build_array_reader(self.fields.as_deref(), predicate.projection())?;
440440

441441
// Reset to original policy before each predicate so the override
442442
// can detect page skipping for THIS predicate's columns.
@@ -609,9 +609,9 @@ impl RowGroupReaderBuilder {
609609
let plan = plan_builder.build();
610610

611611
// if we have any cached results, connect them up
612-
let array_reader_builder =
613-
ArrayReaderBuilder::new(&row_group, &self.metrics, self.batch_size)
614-
.with_parquet_metadata(&self.metadata);
612+
let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
613+
.with_batch_size(self.batch_size)
614+
.with_parquet_metadata(&self.metadata);
615615
let array_reader = if let Some(cache_info) = cache_info.as_ref() {
616616
let cache_options: CacheOptions = cache_info.builder().consumer();
617617
array_reader_builder

0 commit comments

Comments (0)