Commit 504035a
feat(parquet): make PushBuffers boundary-agnostic for prefetch IO
The `PushDecoder` (introduced in #7997, #8080) is designed to decouple IO and CPU. It holds non-contiguous byte ranges and exposes a `NeedsData`/`push_range` protocol. However, it requires each logical read to be satisfied in full by a single physical buffer: `has_range`, `get_bytes`, and `Read::read` all searched for one buffer that entirely covered the requested range.

This assumption conflates two orthogonal IO strategies:

- Coalescing: the IO layer merges adjacent requested ranges into fewer, larger fetches.
- Prefetching: the IO layer pushes data ahead of what the decoder has requested. This is an inversion of control: the IO layer speculatively fills buffers at offsets not yet requested, with arbitrary buffer sizes.

These two strategies interact poorly with the current release mechanism (`clear_ranges`), which matches buffers by exact range equality:

- Coalescing is both rewarded and punished. It is load-bearing because without it the number of physical buffers scales with the number of requested ranges, and `clear_ranges` performs an O(N×M) scan to remove consumed ranges, producing quadratic overhead on wide schemas. But it is also punished because a coalesced buffer never exactly matches any individual requested range, so `clear_ranges` silently skips it: the buffer leaks in `PushBuffers` until the decoder finishes or the caller manually calls `release_all_ranges` (#9624). This increases peak RSS proportionally to the amount of data coalesced ahead of the current row group.
- Prefetching is structurally impossible: speculatively pushed buffers will straddle future read boundaries, so the decoder cannot consume them, and `clear_ranges` cannot release them.
This commit makes `PushBuffers` boundary-agnostic, completing the prefetching story, and changes the internals to scale with buffer count instead of range count:

- Buffer stitching: `has_range`, `get_bytes`, and `Read::read` resolve logical ranges across multiple contiguous physical buffers via binary search, so the IO layer is free to push arbitrarily-sized parts without knowing future read boundaries. This matters because some IO layers can be made much more efficient when using uniform buffer sizes and vectorized reads.
- Incremental release (`release_through`): replaces `clear_ranges` with a watermark-based release that drops all buffers below a byte offset, trimming straddling buffers via zero-copy `Bytes::slice`. The decoder calls this automatically at row-group boundaries.

Benchmark results (vs baseline):

    push_decoder/1buf/1000ranges      321.9 µs  (was 323.5 µs,  −1%)
    push_decoder/1buf/10000ranges     3.26 ms   (was 3.25 ms,   +0%)
    push_decoder/1buf/100000ranges    34.9 ms   (was 34.6 ms,   +1%)
    push_decoder/1buf/500000ranges    192.2 ms  (was 185.3 ms,  +4%)
    push_decoder/Nbuf/1000ranges      363.9 µs  (was 437.2 µs, −17%)
    push_decoder/Nbuf/10000ranges     3.82 ms   (was 10.7 ms,  −64%)
    push_decoder/Nbuf/100000ranges    42.1 ms   (was 711.6 ms, −94%)

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
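The stitching and watermark-release mechanics described above can be sketched independently of the parquet crate. The `PushBuffers` below is a hypothetical stand-in, not the crate's actual implementation: it uses `Vec<u8>` instead of `Bytes` (so trimming copies rather than zero-copy slicing) and re-sorts on every push, but the lookup and release logic follow the same shape: binary search for the covering buffer, stitch across contiguous neighbors, and drop or trim everything below a byte-offset watermark.

```rust
/// Simplified stand-in for the crate's PushBuffers (assumption: not real API).
struct PushBuffers {
    /// (start offset in file, bytes); kept sorted by start offset
    buffers: Vec<(u64, Vec<u8>)>,
}

impl PushBuffers {
    fn new() -> Self {
        Self { buffers: Vec::new() }
    }

    /// Accept a buffer at any offset and any size (boundary-agnostic).
    fn push(&mut self, start: u64, data: Vec<u8>) {
        self.buffers.push((start, data));
        self.buffers.sort_by_key(|(s, _)| *s);
    }

    /// Stitch the logical range `start..end` from as many physical buffers
    /// as needed. Returns None if any byte in the range is missing.
    fn get_bytes(&self, mut start: u64, end: u64) -> Option<Vec<u8>> {
        let mut out = Vec::with_capacity((end - start) as usize);
        while start < end {
            // Binary search: last buffer whose start offset is <= `start`.
            let idx = self
                .buffers
                .partition_point(|(s, _)| *s <= start)
                .checked_sub(1)?;
            let (buf_start, data) = &self.buffers[idx];
            let buf_end = buf_start + data.len() as u64;
            if start >= buf_end {
                return None; // gap: `start` is not covered by any buffer
            }
            let from = (start - buf_start) as usize;
            let to = (end.min(buf_end) - buf_start) as usize;
            out.extend_from_slice(&data[from..to]);
            start = *buf_start + to as u64; // continue in the next buffer
        }
        Some(out)
    }

    /// Watermark release: drop every byte below `offset`; a buffer
    /// straddling the watermark keeps only its suffix.
    fn release_through(&mut self, offset: u64) {
        self.buffers.retain_mut(|(start, data)| {
            let end = *start + data.len() as u64;
            if end <= offset {
                false // entirely below the watermark: drop
            } else if *start < offset {
                data.drain(..(offset - *start) as usize); // trim prefix
                *start = offset;
                true
            } else {
                true
            }
        });
    }

    fn buffered_bytes(&self) -> u64 {
        self.buffers.iter().map(|(_, d)| d.len() as u64).sum()
    }
}

fn main() {
    let mut bufs = PushBuffers::new();
    // The IO layer pushes fixed-size parts, ignorant of read boundaries.
    bufs.push(4, vec![5, 6, 7, 8]);
    bufs.push(0, vec![1, 2, 3, 4]);
    // A logical read of 2..6 is stitched across both physical buffers.
    assert_eq!(bufs.get_bytes(2, 6), Some(vec![3, 4, 5, 6]));
    // Release bytes 0..5: the first buffer is dropped, the second trimmed.
    bufs.release_through(5);
    assert_eq!(bufs.buffered_bytes(), 3);
}
```

The real implementation avoids both the copy and the re-sort: it slices `Bytes` for zero-copy trimming and defers sorting to an explicit `ensure_sorted` call before decoding, which is why `get_bytes` and `get_chunks` take `&mut` in the diff below.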
Parent: 711fac8

File tree: 7 files changed, +735 −138 lines

parquet/src/arrow/push_decoder/mod.rs
125 additions & 23 deletions

@@ -326,22 +326,25 @@ impl ParquetPushDecoder {
         Ok(decode_result)
     }

-    /// Push data into the decoder for processing
+    /// Push data into the decoder for processing.
     ///
     /// This is a convenience wrapper around [`Self::push_ranges`] for pushing a
-    /// single range of data.
-    ///
-    /// Note this can be the entire file or just a part of it. If it is part of the file,
-    /// the ranges should correspond to the data ranges requested by the decoder.
-    ///
-    /// See example in [`ParquetPushDecoderBuilder`]
+    /// single range of data. See [`Self::push_ranges`] for details.
     pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError> {
         self.push_ranges(vec![range], vec![data])
     }

-    /// Push data into the decoder for processing
+    /// Push data into the decoder for processing.
     ///
-    /// This should correspond to the data ranges requested by the decoder
+    /// Each `(range, data)` pair associates a byte range in the Parquet file
+    /// with its contents. The pushed buffers do not need to align with the
+    /// ranges requested by [`DecodeResult::NeedsData`]: they may be smaller
+    /// (the decoder stitches adjacent buffers), larger (e.g. coalesced
+    /// fetches), or even cover offsets not yet requested (prefetch).
+    ///
+    /// The only requirement is that, by the time [`Self::try_decode`] is
+    /// called, the union of all pushed ranges must cover every byte the
+    /// decoder requested for the current decode step.
     pub fn push_ranges(
         &mut self,
         ranges: Vec<Range<u64>>,

@@ -366,13 +369,31 @@ impl ParquetPushDecoder {
         self.state.buffered_bytes()
     }

-    /// Clear any staged byte ranges currently buffered for future decode work.
+    /// Release all staged byte ranges currently buffered for future decode
+    /// work.
     ///
-    /// This clears byte ranges still owned by the decoder's internal
+    /// This releases byte ranges still owned by the decoder's internal
     /// `PushBuffers`. It does not affect any data that has already been handed
     /// off to an active [`ParquetRecordBatchReader`].
+    pub fn release_all(&mut self) {
+        self.state.release_all();
+    }
+
+    /// Use [`Self::release_all`] instead.
+    #[deprecated(since = "58.1.0", note = "Use `release_all` instead")]
     pub fn clear_all_ranges(&mut self) {
-        self.state.clear_all_ranges();
+        self.release_all();
+    }
+
+    /// Release all physical buffers that end at or before the given byte offset.
+    ///
+    /// A buffer straddling the offset is trimmed: the portion before `offset`
+    /// is dropped and the suffix is retained (zero-copy via [`Bytes::slice`]).
+    ///
+    /// This does not affect any data that has already been handed off to an
+    /// active [`ParquetRecordBatchReader`].
+    pub fn release_through(&mut self, offset: u64) {
+        self.state.release_through(offset);
     }
 }

@@ -583,16 +604,28 @@ impl ParquetDecoderState {
         }
     }

-    /// Clear any staged ranges currently buffered in the decoder.
-    fn clear_all_ranges(&mut self) {
+    fn release_all(&mut self) {
         match self {
             ParquetDecoderState::ReadingRowGroup {
                 remaining_row_groups,
-            } => remaining_row_groups.clear_all_ranges(),
+            } => remaining_row_groups.release_all(),
             ParquetDecoderState::DecodingRowGroup {
                 record_batch_reader: _,
                 remaining_row_groups,
-            } => remaining_row_groups.clear_all_ranges(),
+            } => remaining_row_groups.release_all(),
+            ParquetDecoderState::Finished => {}
+        }
+    }
+
+    fn release_through(&mut self, offset: u64) {
+        match self {
+            ParquetDecoderState::ReadingRowGroup {
+                remaining_row_groups,
+            } => remaining_row_groups.release_through(offset),
+            ParquetDecoderState::DecodingRowGroup {
+                record_batch_reader: _,
+                remaining_row_groups,
+            } => remaining_row_groups.release_through(offset),
             ParquetDecoderState::Finished => {}
         }
     }

@@ -691,8 +724,9 @@ mod test {
     /// Releasing staged ranges should free speculative buffers without affecting
     /// the active row group reader.
     #[test]
-    fn test_decoder_clear_all_ranges() {
-        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+    fn test_decoder_release_all() {
+        let metadata = test_file_parquet_metadata();
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata.clone())
             .unwrap()
             .with_batch_size(100)
             .build()

@@ -703,14 +737,16 @@ mod test {
             .unwrap();
         assert_eq!(decoder.buffered_bytes(), test_file_len());

-        // The current row group reader is built from the prefetched bytes, but
-        // the speculative full-file range remains staged in the decoder.
+        // Building the InMemoryRowGroup for row group 0 releases buffers up
+        // to that row group's end offset. The remainder (row group 1 + footer)
+        // is still staged.
         let batch1 = expect_data(decoder.try_decode());
         assert_eq!(batch1, TEST_BATCH.slice(0, 100));
-        assert_eq!(decoder.buffered_bytes(), test_file_len());
+        let rg0_end = metadata.row_group(0).end_offset();
+        assert_eq!(decoder.buffered_bytes(), test_file_len() - rg0_end);

-        // All of the buffer is released
-        decoder.clear_all_ranges();
+        // Release everything that remains.
+        decoder.release_all();
         assert_eq!(decoder.buffered_bytes(), 0);

         // The active reader still owns the current row group's bytes, so it can

@@ -1167,6 +1203,72 @@ mod test {
         expect_finished(decoder.try_decode());
     }

+    /// Decode the file pushed as fixed-size streaming parts, simulating a
+    /// single GET request that yields part-sized buffers. Part boundaries are
+    /// intentionally misaligned with column chunk / page boundaries.
+    #[test]
+    fn test_decoder_streaming_parts() {
+        let part_size = 512usize; // misaligned with column chunks
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Push the entire file as fixed-size parts.
+        let file_len = TEST_FILE_DATA.len();
+        let mut offset = 0usize;
+        while offset < file_len {
+            let end = (offset + part_size).min(file_len);
+            let range = (offset as u64)..(end as u64);
+            let data = TEST_FILE_DATA.slice(offset..end);
+            decoder.push_range(range, data).unwrap();
+            offset = end;
+        }
+
+        // Decode all row groups — stitching should handle cross-part reads.
+        let batch1 = expect_data(decoder.try_decode());
+        let batch2 = expect_data(decoder.try_decode());
+        expect_finished(decoder.try_decode());
+
+        let all_output = concat_batches(&TEST_BATCH.schema(), &[batch1, batch2]).unwrap();
+        assert_eq!(all_output, *TEST_BATCH);
+    }
+
+    /// Push the entire file, decode the first row group, call `release_through`
+    /// to free its buffers, then decode the second row group.
+    #[test]
+    fn test_decoder_release_through() {
+        let metadata = test_file_parquet_metadata();
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata.clone())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        decoder
+            .push_range(test_file_range(), TEST_FILE_DATA.clone())
+            .unwrap();
+        assert_eq!(decoder.buffered_bytes(), test_file_len());
+
+        // Decode first row group.
+        let batch1 = expect_data(decoder.try_decode());
+        assert_eq!(batch1, TEST_BATCH.slice(0, 200));
+
+        // Free everything up to the end of row group 0.
+        let rg0_end = metadata.row_group(0).end_offset();
+        decoder.release_through(rg0_end);
+        let remaining = decoder.buffered_bytes();
+        assert!(
+            remaining < test_file_len(),
+            "buffered_bytes should have decreased: {remaining} < {}",
+            test_file_len()
+        );
+
+        // Second row group should still be decodable.
+        let batch2 = expect_data(decoder.try_decode());
+        assert_eq!(batch2, TEST_BATCH.slice(200, 200));
+        expect_finished(decoder.try_decode());
+    }
+
     /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c"
     ///
     /// Note c is a different types (so the data page sizes will be different)

parquet/src/arrow/push_decoder/reader_builder/data.rs
7 additions & 9 deletions

@@ -23,7 +23,6 @@ use crate::arrow::in_memory_row_group::{ColumnChunkData, FetchRanges, InMemoryRo
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
-use crate::file::reader::ChunkReader;
 use crate::util::push_buffers::PushBuffers;
 use bytes::Bytes;
 use std::ops::Range;

@@ -55,7 +54,7 @@ impl DataRequest {
     }

     /// Returns the chunks from the buffers that satisfy this request
-    fn get_chunks(&self, buffers: &PushBuffers) -> Result<Vec<Bytes>, ParquetError> {
+    fn get_chunks(&self, buffers: &mut PushBuffers) -> Result<Vec<Bytes>, ParquetError> {
         self.ranges
             .iter()
             .map(|range| {

@@ -72,10 +71,12 @@ impl DataRequest {
             .collect()
     }

-    /// Create a new InMemoryRowGroup, and fill it with provided data
+    /// Create a new InMemoryRowGroup, and fill it with provided data.
     ///
-    /// Assumes that all needed data is present in the buffers
-    /// and clears any explicitly requested ranges
+    /// Assumes that all needed data is present in the buffers.
+    /// Does **not** release any buffers — the caller is responsible for
+    /// calling `PushBuffers::release_through` at the appropriate time
+    /// (typically after all phases for a row group are complete).
     pub fn try_into_in_memory_row_group<'a>(
         self,
         row_group_idx: usize,

@@ -88,7 +89,7 @@ impl DataRequest {

         let Self {
             column_chunks,
-            ranges,
+            ranges: _,
             page_start_offsets,
         } = self;

@@ -105,9 +106,6 @@ impl DataRequest {

         in_memory_row_group.fill_column_chunks(projection, page_start_offsets, chunks);

-        // Clear the ranges that were explicitly requested
-        buffers.clear_ranges(&ranges);
-
         Ok(in_memory_row_group)
     }
 }

parquet/src/arrow/push_decoder/reader_builder/mod.rs
16 additions & 3 deletions

@@ -212,9 +212,15 @@ impl RowGroupReaderBuilder {
         self.buffers.buffered_bytes()
     }

-    /// Clear any staged ranges currently buffered for future decode work.
-    pub fn clear_all_ranges(&mut self) {
-        self.buffers.clear_all_ranges();
+    /// Release all staged ranges currently buffered for future decode work.
+    pub fn release_all(&mut self) {
+        self.buffers.release_all();
+    }
+
+    /// Release all physical buffers that end at or before `offset`.
+    /// A straddling buffer is trimmed via zero-copy [`Bytes::slice`].
+    pub fn release_through(&mut self, offset: u64) {
+        self.buffers.release_through(offset);
     }

     /// take the current state, leaving None in its place.

@@ -269,6 +275,7 @@ impl RowGroupReaderBuilder {
     pub(crate) fn try_build(
         &mut self,
     ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+        self.buffers.ensure_sorted();
         loop {
             let current_state = self.take_state()?;
             // Try to transition the decoder.

@@ -610,6 +617,12 @@ impl RowGroupReaderBuilder {
                 &mut self.buffers,
             )?;

+            // All data for this row group has been extracted into the
+            // InMemoryRowGroup. Release physical buffers up to the end
+            // of this row group so streaming IO can reclaim memory.
+            self.buffers
+                .release_through(self.metadata.row_group(row_group_idx).end_offset());
+
             let plan = plan_builder.build();

             // if we have any cached results, connect them up

parquet/src/arrow/push_decoder/remaining.rs
9 additions & 3 deletions

@@ -70,9 +70,15 @@ impl RemainingRowGroups {
         self.row_group_reader_builder.buffered_bytes()
     }

-    /// Clear any staged ranges currently buffered for future decode work
-    pub fn clear_all_ranges(&mut self) {
-        self.row_group_reader_builder.clear_all_ranges();
+    /// Release all staged ranges currently buffered for future decode work.
+    pub fn release_all(&mut self) {
+        self.row_group_reader_builder.release_all();
+    }
+
+    /// Release all physical buffers that end at or before `offset`.
+    /// A straddling buffer is trimmed via zero-copy [`Bytes::slice`].
+    pub fn release_through(&mut self, offset: u64) {
+        self.row_group_reader_builder.release_through(offset);
     }

     /// returns [`ParquetRecordBatchReader`] suitable for reading the next

parquet/src/file/metadata/mod.rs
15 additions & 0 deletions

@@ -713,6 +713,21 @@ impl RowGroupMetaData {
         self.file_offset
     }

+    /// Returns the byte offset just past the last column chunk in this row group.
+    ///
+    /// This is the maximum of `(start + length)` across all column chunks, which
+    /// represents the first byte that is *not* part of this row group's data.
+    pub fn end_offset(&self) -> u64 {
+        self.columns
+            .iter()
+            .map(|c| {
+                let (start, len) = c.byte_range();
+                start + len
+            })
+            .max()
+            .unwrap_or(0)
+    }
+
     /// Converts this [`RowGroupMetaData`] into a [`RowGroupMetaDataBuilder`]
     pub fn into_builder(self) -> RowGroupMetaDataBuilder {
         RowGroupMetaDataBuilder(self)

parquet/src/file/metadata/push_decoder.rs
7 additions & 7 deletions

@@ -23,7 +23,6 @@ use crate::file::FOOTER_SIZE;
 use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index};
 use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions};
 use crate::file::page_index::index_reader::acc_range;
-use crate::file::reader::ChunkReader;
 use bytes::Bytes;
 use std::ops::Range;
 use std::sync::Arc;

@@ -360,12 +359,13 @@ impl ParquetMetaDataPushDecoder {

     /// Clear any staged byte ranges currently buffered for future decode work.
     pub fn clear_all_ranges(&mut self) {
-        self.buffers.clear_all_ranges();
+        self.buffers.release_all();
     }

     /// Try to decode the metadata from the pushed data, returning the
     /// decoded metadata or an error if not enough data is available.
     pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>> {
+        self.buffers.ensure_sorted();
         let file_len = self.buffers.file_len();
         let footer_len = FOOTER_SIZE as u64;
         loop {

@@ -397,10 +397,10 @@ impl ParquetMetaDataPushDecoder {
                 return Ok(needs_range(metadata_range));
             }

-            let metadata = self.metadata_parser.decode_metadata(
-                &self.get_bytes(&metadata_range)?,
-                footer_tail.is_encrypted_footer(),
-            )?;
+            let metadata_bytes = self.get_bytes(&metadata_range)?;
+            let metadata = self
+                .metadata_parser
+                .decode_metadata(&metadata_bytes, footer_tail.is_encrypted_footer())?;
             // Note: ReadingPageIndex first checks if page indexes are needed
             // and is a no-op if not
             self.state = DecodeState::ReadingPageIndex(Box::new(metadata));

@@ -445,7 +445,7 @@ impl ParquetMetaDataPushDecoder {
     }

     /// Returns the bytes for the given range from the internal buffer
-    fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes> {
+    fn get_bytes(&mut self, range: &Range<u64>) -> Result<Bytes> {
         let start = range.start;
         let raw_len = range.end - range.start;
         let len: usize = raw_len.try_into().map_err(|_| {
