Skip to content

Commit 871700c

Browse files
committed
feat(parquet): add retention filter to discard unrequested prefetch data
Per-range release guarantees that every consumed row group is freed. But when the IO layer prefetches aggressively (e.g. streaming entire files, or over-reading across row group boundaries), data for row groups the decoder will never process enters `PushBuffers` and is never consumed, so `release_range` is never called for it. Add `RetentionSet`, a sorted set of byte ranges derived from the column chunk metadata of the queued row groups. Incoming buffers are filtered at push time: only portions overlapping a retained range are stored (zero-copy via `Bytes::slice`); everything else is discarded before reaching `PushBuffers`. Together with the per-range release in the previous commit, this closes the loop on memory management: the IO layer is free to push data in any shape — coalesced, prefetched, uniform-sized, or even the entire file — without knowledge of Parquet layout. The decoder admits only what it will consume, and releases it at row-group boundaries. Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
1 parent fdd3fe5 commit 871700c

File tree

4 files changed

+262
-2
lines changed

4 files changed

+262
-2
lines changed

parquet/src/arrow/push_decoder/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::arrow::arrow_reader::{
2828
use crate::errors::ParquetError;
2929
use crate::file::metadata::ParquetMetaData;
3030
use crate::util::push_buffers::PushBuffers;
31+
use crate::util::retention::RetentionSet;
3132
use arrow_array::RecordBatch;
3233
use bytes::Bytes;
3334
use reader_builder::RowGroupReaderBuilder;
@@ -185,6 +186,7 @@ impl ParquetPushDecoderBuilder {
185186
// Prepare to build RowGroup readers
186187
let file_len = 0; // not used in push decoder
187188
let buffers = PushBuffers::new(file_len);
189+
let retention = RetentionSet::from_row_groups(&parquet_metadata, &row_groups);
188190
let row_group_reader_builder = RowGroupReaderBuilder::new(
189191
batch_size,
190192
projection,
@@ -197,6 +199,7 @@ impl ParquetPushDecoderBuilder {
197199
max_predicate_cache_size,
198200
buffers,
199201
row_selection_policy,
202+
Some(retention),
200203
);
201204

202205
// Initialize the decoder with the configured options

parquet/src/arrow/push_decoder/reader_builder/mod.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::errors::ParquetError;
3434
use crate::file::metadata::ParquetMetaData;
3535
use crate::file::page_index::offset_index::OffsetIndexMetaData;
3636
use crate::util::push_buffers::PushBuffers;
37+
use crate::util::retention::RetentionSet;
3738
use bytes::Bytes;
3839
use data::DataRequest;
3940
use filter::AdvanceResult;
@@ -168,6 +169,10 @@ pub(crate) struct RowGroupReaderBuilder {
168169

169170
/// The underlying data store
170171
buffers: PushBuffers,
172+
173+
/// Optional retention filter. When present, incoming `push_data` buffers
174+
/// are trimmed to only keep byte ranges the decoder will eventually need.
175+
retention: Option<RetentionSet>,
171176
}
172177

173178
impl RowGroupReaderBuilder {
@@ -185,6 +190,7 @@ impl RowGroupReaderBuilder {
185190
max_predicate_cache_size: usize,
186191
buffers: PushBuffers,
187192
row_selection_policy: RowSelectionPolicy,
193+
retention: Option<RetentionSet>,
188194
) -> Self {
189195
Self {
190196
batch_size,
@@ -199,12 +205,23 @@ impl RowGroupReaderBuilder {
199205
row_selection_policy,
200206
state: Some(RowGroupDecoderState::Finished),
201207
buffers,
208+
retention,
202209
}
203210
}
204211

205-
/// Push new data buffers that can be used to satisfy pending requests
212+
/// Push new data buffers that can be used to satisfy pending requests.
213+
///
214+
/// When a [`RetentionSet`] is configured, incoming buffers are filtered so
215+
/// that only byte ranges the decoder will eventually need are stored.
216+
/// Portions outside the retention set are silently discarded.
206217
pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
207-
self.buffers.push_ranges(ranges, buffers);
218+
let (ranges, buffers) = match &self.retention {
219+
Some(retention) => retention.filter(ranges, buffers),
220+
None => (ranges, buffers),
221+
};
222+
if !ranges.is_empty() {
223+
self.buffers.push_ranges(ranges, buffers);
224+
}
208225
}
209226

210227
/// Returns the total number of buffered bytes available

parquet/src/util/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ mod bit_pack;
2121
pub(crate) mod interner;
2222

2323
pub(crate) mod push_buffers;
24+
#[cfg(feature = "arrow")]
25+
pub(crate) mod retention;
2426
#[cfg(any(test, feature = "test_common"))]
2527
pub(crate) mod test_common;
2628
pub mod utf8;

parquet/src/util/retention.rs

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use bytes::Bytes;
19+
use std::ops::Range;
20+
21+
use crate::file::metadata::ParquetMetaData;
22+
23+
/// A sorted, non-overlapping set of byte ranges that the decoder expects to
/// consume.
///
/// Attached to a `RowGroupReaderBuilder`, this acts as an admission filter
/// for pushed data: only the portions of an incoming buffer that overlap a
/// retained range are kept; everything else is dropped on arrival.
///
/// Without it, speculatively prefetched bytes for row groups the decoder
/// will never process would accumulate in memory with no release path.
#[derive(Debug, Clone)]
pub(crate) struct RetentionSet {
    /// Retained byte ranges: sorted by start, merged, and non-overlapping.
    ranges: Vec<Range<u64>>,
}
37+
38+
impl RetentionSet {
39+
/// Build a retention set from the column chunk byte ranges of the given
40+
/// row groups.
41+
///
42+
/// All column chunks (regardless of projection) for each queued row group
43+
/// are included — this is a conservative superset of what the decoder will
44+
/// actually read.
45+
pub fn from_row_groups(metadata: &ParquetMetaData, row_groups: &[usize]) -> Self {
46+
let total_cols: usize = row_groups
47+
.iter()
48+
.map(|&rg| metadata.row_group(rg).columns().len())
49+
.sum();
50+
let mut ranges: Vec<Range<u64>> = Vec::with_capacity(total_cols);
51+
for &rg_idx in row_groups {
52+
let rg = metadata.row_group(rg_idx);
53+
for col in rg.columns() {
54+
let (start, len) = col.byte_range();
55+
ranges.push(start..start + len);
56+
}
57+
}
58+
ranges.sort_unstable_by_key(|r| r.start);
59+
let mut merged: Vec<Range<u64>> = Vec::with_capacity(ranges.len());
60+
for range in ranges {
61+
if let Some(last) = merged.last_mut() {
62+
if range.start <= last.end {
63+
last.end = last.end.max(range.end);
64+
continue;
65+
}
66+
}
67+
merged.push(range);
68+
}
69+
Self { ranges: merged }
70+
}
71+
72+
/// Filter incoming ranges and buffers, keeping only the portions that
73+
/// overlap the retention set.
74+
///
75+
/// Each retained portion is a zero-copy [`Bytes::slice`] of the original
76+
/// buffer. Portions that fall entirely outside the retention set are
77+
/// dropped.
78+
pub fn filter(
79+
&self,
80+
ranges: Vec<Range<u64>>,
81+
buffers: Vec<Bytes>,
82+
) -> (Vec<Range<u64>>, Vec<Bytes>) {
83+
let mut out_ranges = Vec::new();
84+
let mut out_buffers = Vec::new();
85+
86+
for (range, buffer) in ranges.into_iter().zip(buffers) {
87+
// Find the first retention range that could overlap: the first
88+
// whose end is past range.start.
89+
let start_idx = self.ranges.partition_point(|r| r.end <= range.start);
90+
91+
for ret in &self.ranges[start_idx..] {
92+
if ret.start >= range.end {
93+
break;
94+
}
95+
let overlap_start = range.start.max(ret.start);
96+
let overlap_end = range.end.min(ret.end);
97+
let buf_offset = (overlap_start - range.start) as usize;
98+
let buf_len = (overlap_end - overlap_start) as usize;
99+
out_ranges.push(overlap_start..overlap_end);
100+
out_buffers.push(buffer.slice(buf_offset..buf_offset + buf_len));
101+
}
102+
}
103+
104+
(out_ranges, out_buffers)
105+
}
106+
}
107+
108+
#[cfg(test)]
mod tests {
    #![allow(clippy::single_range_in_vec_init)]
    use super::*;

    /// Build a `RetentionSet` directly from raw ranges, applying the same
    /// sort-and-merge normalization as the production constructor.
    fn make_retention(ranges: &[Range<u64>]) -> RetentionSet {
        let mut sorted: Vec<Range<u64>> = ranges.to_vec();
        sorted.sort_unstable_by_key(|r| r.start);
        let mut merged: Vec<Range<u64>> = Vec::new();
        for r in sorted {
            match merged.last_mut() {
                Some(prev) if r.start <= prev.end => prev.end = prev.end.max(r.end),
                _ => merged.push(r),
            }
        }
        RetentionSet { ranges: merged }
    }

    #[test]
    fn exact_match() {
        let ret = make_retention(&[10..20]);
        let bytes = Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let (out_ranges, out_bufs) = ret.filter(vec![10..20], vec![bytes]);
        assert_eq!(out_ranges, vec![10..20]);
        assert_eq!(out_bufs.len(), 1);
        assert_eq!(&*out_bufs[0], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    }

    #[test]
    fn no_overlap() {
        let ret = make_retention(&[10..20]);
        let bytes = Bytes::from_static(&[1, 2, 3]);
        let (out_ranges, out_bufs) = ret.filter(vec![0..3], vec![bytes]);
        assert!(out_ranges.is_empty());
        assert!(out_bufs.is_empty());
    }

    #[test]
    fn partial_overlap_left() {
        // Buffer covers 5..15, retention is 10..20 → keep 10..15.
        let ret = make_retention(&[10..20]);
        let bytes = Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let (out_ranges, out_bufs) = ret.filter(vec![5..15], vec![bytes]);
        assert_eq!(out_ranges, vec![10..15]);
        assert_eq!(&*out_bufs[0], &[6, 7, 8, 9, 10]);
    }

    #[test]
    fn partial_overlap_right() {
        // Buffer covers 15..25, retention is 10..20 → keep 15..20.
        let ret = make_retention(&[10..20]);
        let bytes = Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let (out_ranges, out_bufs) = ret.filter(vec![15..25], vec![bytes]);
        assert_eq!(out_ranges, vec![15..20]);
        assert_eq!(&*out_bufs[0], &[1, 2, 3, 4, 5]);
    }

    #[test]
    fn buffer_spans_gap_between_retention_ranges() {
        // Retention: [10..20) and [30..40). Buffer covers 5..45.
        let ret = make_retention(&[10..20, 30..40]);
        let bytes = Bytes::from((0..40).collect::<Vec<u8>>());
        let (out_ranges, out_bufs) = ret.filter(vec![5..45], vec![bytes]);
        assert_eq!(out_ranges, vec![10..20, 30..40]);
        assert_eq!(out_bufs.len(), 2);
        // First slice: buffer offsets 5..15 (values 5..15).
        assert_eq!(&*out_bufs[0], &[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]);
        // Second slice: buffer offsets 25..35 (values 25..35).
        assert_eq!(&*out_bufs[1], &[25, 26, 27, 28, 29, 30, 31, 32, 33, 34]);
    }

    #[test]
    fn superset_buffer_trimmed() {
        let ret = make_retention(&[10..20]);
        let bytes = Bytes::from((0..50).collect::<Vec<u8>>());
        let (out_ranges, out_bufs) = ret.filter(vec![0..50], vec![bytes]);
        assert_eq!(out_ranges, vec![10..20]);
        assert_eq!(&*out_bufs[0], &[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
    }

    #[test]
    fn empty_retention_discards_everything() {
        let ret = RetentionSet { ranges: Vec::new() };
        let bytes = Bytes::from_static(&[1, 2, 3]);
        let (out_ranges, out_bufs) = ret.filter(vec![0..3], vec![bytes]);
        assert!(out_ranges.is_empty());
        assert!(out_bufs.is_empty());
    }

    #[test]
    fn multiple_input_buffers() {
        // First buffer: no overlap. Second: exact. Third: exact.
        let ret = make_retention(&[10..20, 30..40]);
        let inputs = vec![
            Bytes::from_static(&[1, 2, 3, 4, 5]),
            Bytes::from_static(&[1, 2, 3, 4, 5]),
            Bytes::from_static(&[1, 2, 3, 4, 5]),
        ];
        let (out_ranges, out_bufs) = ret.filter(vec![0..5, 10..15, 35..40], inputs);
        assert_eq!(out_ranges, vec![10..15, 35..40]);
        assert_eq!(out_bufs.len(), 2);
    }

    #[test]
    fn zero_copy_slicing() {
        let ret = make_retention(&[10..20]);
        let bytes = Bytes::from((0..30).collect::<Vec<u8>>());
        let original_ptr = bytes.as_ptr();
        let (_, out_bufs) = ret.filter(vec![0..30], vec![bytes]);
        // The output slice should point into the same allocation, offset by
        // 10 bytes — proving no copy occurred.
        // SAFETY: the source allocation is 30 bytes and is kept alive by the
        // returned slice, so `original_ptr + 10` stays in bounds.
        assert_eq!(out_bufs[0].as_ptr(), unsafe { original_ptr.add(10) });
    }

    #[test]
    fn adjacent_retention_ranges_are_merged() {
        // Two abutting ranges should merge into one.
        let ret = make_retention(&[10..20, 20..30]);
        assert_eq!(ret.ranges, vec![10..30]);
        let bytes = Bytes::from((0..40).collect::<Vec<u8>>());
        let (out_ranges, out_bufs) = ret.filter(vec![0..40], vec![bytes]);
        // Should produce a single slice, not two.
        assert_eq!(out_ranges, vec![10..30]);
        assert_eq!(out_bufs.len(), 1);
    }
}

0 commit comments

Comments
 (0)