Skip to content

Commit 76c0284

Browse files
xudong963 and claude committed
feat(parquet): add with_fully_matched_row_groups to skip filter for fully matched row groups
When row group statistics prove that all rows in a row group satisfy the filter predicate, the RowFilter evaluation can be skipped entirely for those row groups. This avoids the cost of decoding filter columns and evaluating the predicate expression. Adds `with_fully_matched_row_groups(Vec<usize>)` to ArrowReaderBuilder which flows through to RowGroupReaderBuilder. When processing a fully matched row group, the Start state transitions directly to StartData, bypassing all filter evaluation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 68851ef commit 76c0284

File tree

4 files changed

+60
-0
lines changed

4 files changed

+60
-0
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,12 @@ pub struct ArrowReaderBuilder<T> {
139139
pub(crate) metrics: ArrowReaderMetrics,
140140

141141
pub(crate) max_predicate_cache_size: usize,
142+
143+
/// Row groups where ALL rows are known to match the filter predicate.
144+
///
145+
/// For these row groups, the [`RowFilter`] evaluation is skipped entirely
146+
/// since the predicate is guaranteed to be true for every row.
147+
pub(crate) fully_matched_row_groups: Option<Vec<usize>>,
142148
}
143149

144150
impl<T: Debug> Debug for ArrowReaderBuilder<T> {
@@ -178,6 +184,7 @@ impl<T> ArrowReaderBuilder<T> {
178184
offset: None,
179185
metrics: ArrowReaderMetrics::Disabled,
180186
max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
187+
fully_matched_row_groups: None,
181188
}
182189
}
183190

@@ -344,6 +351,28 @@ impl<T> ArrowReaderBuilder<T> {
344351
}
345352
}
346353

354+
/// Specify row groups where ALL rows are known to match the filter predicate.
355+
///
356+
/// For these row groups, the [`RowFilter`] evaluation (set via
357+
/// [`Self::with_row_filter`]) is skipped entirely since the predicate is
358+
/// guaranteed to be `true` for every row. This avoids the cost of decoding
359+
/// filter columns and evaluating the predicate expression for those row
360+
/// groups.
361+
///
362+
/// This is typically determined by evaluating row group statistics: if the
363+
/// statistics prove that all rows satisfy the predicate, the row group is
364+
/// "fully matched."
365+
///
366+
/// The provided indices must be a subset of the row groups specified via
367+
/// [`Self::with_row_groups`] (or all row groups if none were specified).
368+
/// Indices not present in the row group list are ignored.
369+
pub fn with_fully_matched_row_groups(self, row_groups: Vec<usize>) -> Self {
370+
Self {
371+
fully_matched_row_groups: Some(row_groups),
372+
..self
373+
}
374+
}
375+
347376
/// Provide a limit to the number of rows to be read
348377
///
349378
/// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
@@ -1188,6 +1217,8 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
11881217
metrics,
11891218
// Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
11901219
max_predicate_cache_size: _,
1220+
// Not used for the sync reader (single row group per reader)
1221+
fully_matched_row_groups: _,
11911222
} = self;
11921223

11931224
// Try to avoid allocate large buffer

parquet/src/arrow/async_reader/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
497497
offset,
498498
metrics,
499499
max_predicate_cache_size,
500+
fully_matched_row_groups,
500501
} = self;
501502

502503
// Ensure schema of ParquetRecordBatchStream respects projection, and does
@@ -522,6 +523,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
522523
offset,
523524
metrics,
524525
max_predicate_cache_size,
526+
fully_matched_row_groups,
525527
}
526528
.build()?;
527529

parquet/src/arrow/push_decoder/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ impl ParquetPushDecoderBuilder {
176176
metrics,
177177
row_selection_policy,
178178
max_predicate_cache_size,
179+
fully_matched_row_groups,
179180
} = self;
180181

181182
// If no row groups were specified, read all of them
@@ -197,6 +198,7 @@ impl ParquetPushDecoderBuilder {
197198
max_predicate_cache_size,
198199
buffers,
199200
row_selection_policy,
201+
fully_matched_row_groups,
200202
);
201203

202204
// Initialize the decoder with the configured options

parquet/src/arrow/push_decoder/reader_builder/mod.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use bytes::Bytes;
3838
use data::DataRequest;
3939
use filter::AdvanceResult;
4040
use filter::FilterInfo;
41+
use std::collections::HashSet;
4142
use std::ops::Range;
4243
use std::sync::{Arc, RwLock};
4344

@@ -160,6 +161,10 @@ pub(crate) struct RowGroupReaderBuilder {
160161
/// Strategy for materialising row selections
161162
row_selection_policy: RowSelectionPolicy,
162163

164+
/// Row groups where ALL rows are known to match the filter predicate.
165+
/// For these row groups, filter evaluation is skipped entirely.
166+
fully_matched_row_groups: HashSet<usize>,
167+
163168
/// Current state of the decoder.
164169
///
165170
/// It is taken when processing, and must be put back before returning
@@ -185,6 +190,7 @@ impl RowGroupReaderBuilder {
185190
max_predicate_cache_size: usize,
186191
buffers: PushBuffers,
187192
row_selection_policy: RowSelectionPolicy,
193+
fully_matched_row_groups: Option<Vec<usize>>,
188194
) -> Self {
189195
Self {
190196
batch_size,
@@ -197,6 +203,9 @@ impl RowGroupReaderBuilder {
197203
metrics,
198204
max_predicate_cache_size,
199205
row_selection_policy,
206+
fully_matched_row_groups: fully_matched_row_groups
207+
.map(|v| v.into_iter().collect())
208+
.unwrap_or_default(),
200209
state: Some(RowGroupDecoderState::Finished),
201210
buffers,
202211
}
@@ -328,6 +337,22 @@ impl RowGroupReaderBuilder {
328337
}));
329338
};
330339

340+
// Skip filter for fully matched row groups: all rows are known
341+
// to satisfy the predicate based on row group statistics, so
342+
// evaluating the filter would be wasted work.
343+
if self
344+
.fully_matched_row_groups
345+
.contains(&row_group_info.row_group_idx)
346+
{
347+
// Put the filter back for subsequent non-fully-matched row groups
348+
self.filter = Some(filter);
349+
return Ok(NextState::again(RowGroupDecoderState::StartData {
350+
row_group_info,
351+
column_chunks,
352+
cache_info: None,
353+
}));
354+
}
355+
331356
// we have predicates to evaluate
332357
let cache_projection =
333358
self.compute_cache_projection(row_group_info.row_group_idx, &filter);

0 commit comments

Comments (0)