Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
41e7bcb
Use per-predicate projection masks in ClickBench benchmark
Dandandan Feb 14, 2026
8d7533b
Add statistics-based page pruning to ClickBench benchmark
Dandandan Feb 14, 2026
15142b6
Reorder Q22 predicates by selectivity for ClickBench benchmark
Dandandan Feb 14, 2026
498160e
[Test] Filter selectivity
Dandandan Feb 14, 2026
907f82d
[Test] Filter selectivity
Dandandan Feb 14, 2026
b4275cb
[Test] Filter selectivity
Dandandan Feb 14, 2026
06954d5
Merge branch 'main' into selectivity_threshold
Dandandan Feb 15, 2026
4581a1f
Merge branch 'main' into selectivity_threshold
Dandandan Mar 19, 2026
155a1fc
Improve selectivity threshold: measure absolute selectivity and avoid…
Dandandan Mar 19, 2026
75073e1
Revert selectivity to measure against raw (relative) counts
Dandandan Mar 20, 2026
5f2be6e
Replace selectivity threshold with scatter-based filter deferral
Dandandan Mar 20, 2026
a9bc94f
Use selector density (selectors/rows) for scatter threshold
Dandandan Mar 20, 2026
a1e52a3
Add cached row_count to RowSelection and O(1) total_row_count()
Dandandan Mar 20, 2026
7e7e065
Pass row_count to with_predicate instead of caching in RowSelection
Dandandan Mar 20, 2026
9cc2660
Use scatter density threshold of 0.01 and remove debug prints
Dandandan Mar 20, 2026
070fb5a
Only defer predicates that increase fragmentation
Dandandan Mar 20, 2026
0da100d
Fmt
Dandandan Mar 20, 2026
9649704
use statistics for deferral
sdf-jkl Apr 2, 2026
b86516d
Use selectivity and long-skip-share for deferral
sdf-jkl Apr 2, 2026
5552577
Add incremental improvement gate
sdf-jkl Apr 2, 2026
af70684
Add filter selectivity stats
sdf-jkl Apr 2, 2026
b988580
fix CI
sdf-jkl Apr 3, 2026
2c4787d
oops
sdf-jkl Apr 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions parquet/benches/arrow_reader_clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,15 +716,15 @@ impl ReadTest {
};

// setup the reader
let mut stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
let builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
parquet_file,
self.arrow_reader_metadata.clone(),
)
.with_batch_size(8192)
.with_projection(self.projection_mask.clone())
.with_row_filter(self.row_filter())
.build()
.unwrap();
.with_scatter_threshold(Some(0.75));
let mut stream = builder.build().unwrap();

// run the stream to its end
let mut row_count = 0;
Expand All @@ -747,15 +747,15 @@ impl ReadTest {
let reader = ParquetObjectReader::new(store, location);

// setup the reader
let mut stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
let builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
reader,
self.arrow_reader_metadata.clone(),
)
.with_batch_size(8192)
.with_projection(self.projection_mask.clone())
.with_row_filter(self.row_filter())
.build()
.unwrap();
.with_scatter_threshold(Some(0.75));
let mut stream = builder.build().unwrap();

// run the stream to its end
let mut row_count = 0;
Expand All @@ -774,15 +774,15 @@ impl ReadTest {
};

// setup the reader
let reader = ParquetRecordBatchReaderBuilder::new_with_metadata(
let builder = ParquetRecordBatchReaderBuilder::new_with_metadata(
parquet_file,
self.arrow_reader_metadata.clone(),
)
.with_batch_size(8192)
.with_projection(self.projection_mask.clone())
.with_row_filter(self.row_filter())
.build()
.unwrap();
.with_scatter_threshold(Some(0.75));
let reader = builder.build().unwrap();

// run the stream to its end
let mut row_count = 0;
Expand Down
128 changes: 128 additions & 0 deletions parquet/src/arrow/arrow_reader/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,62 @@
//! [ArrowReaderMetrics] for collecting metrics about the Arrow reader

use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicUsize;

/// Why a predicate was applied or deferred at the read-plan stage.
///
/// Recorded per predicate in [`FilterSelectivityStat::decision_reason`] so
/// callers inspecting metrics can see which gate drove each decision.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterDeferralDecisionReason {
    /// Predicate selected all rows with no existing selection, so no selection
    /// structure was materialized.
    AllSelectedFastPath,
    /// Deferral threshold is not configured (`with_scatter_threshold(None)`),
    /// so the predicate was applied unconditionally.
    ThresholdDisabled,
    /// Row count was zero, so no selectivity could be measured.
    ZeroRowCount,
    /// Predicate did not increase selector fragmentation, so deferral was
    /// never considered and the predicate stayed applied.
    FragmentationNotIncreased,
    /// Predicate passed non-deferral gates and was kept applied.
    GatesPassed,
    /// Predicate failed one or more non-deferral gates and was deferred.
    GatesFailedDeferred,
}

/// Per-filter stats captured during read-plan predicate evaluation.
///
/// "current" fields are measured before the predicate is applied,
/// "absolute" fields after; the `delta_*` ratios are the predicate's
/// incremental contribution. NOTE(review): the exact run length that
/// qualifies a skip as "long" is defined by the read planner, not here —
/// confirm against the planner when interpreting these fields.
#[derive(Debug, Clone, PartialEq)]
pub struct FilterSelectivityStat {
    /// Zero-based predicate evaluation index within this read-plan build.
    pub predicate_index: usize,
    /// Number of rows considered by the predicate decision.
    pub row_count: usize,
    /// Selector count before applying this predicate.
    pub current_selector_count: usize,
    /// Selector count after applying this predicate.
    pub absolute_selector_count: usize,
    /// Skipped rows before applying this predicate.
    pub current_skipped_rows: usize,
    /// Skipped rows after applying this predicate.
    pub absolute_skipped_rows: usize,
    /// Long skipped rows before applying this predicate.
    pub current_long_skip_rows: usize,
    /// Long skipped rows after applying this predicate.
    pub absolute_long_skip_rows: usize,
    /// Absolute skipped/rows ratio.
    pub absolute_skip_selectivity: f64,
    /// Absolute long-skipped/skipped ratio.
    pub absolute_long_skip_share: f64,
    /// Incremental skipped/rows ratio contributed by this predicate.
    pub delta_skip_selectivity: f64,
    /// Incremental long-skipped/skipped ratio contributed by this predicate.
    pub delta_long_skip_share: f64,
    /// Threshold supplied via `with_scatter_threshold`; `None` means
    /// deferral was disabled.
    pub long_skip_share_threshold: Option<f64>,
    /// Whether this predicate result was deferred.
    pub deferred: bool,
    /// Why this predicate was applied or deferred.
    pub decision_reason: FilterDeferralDecisionReason,
}

/// This enum represents the state of Arrow reader metrics collection.
///
/// The inner metrics are stored in an `Arc<ArrowReaderMetricsInner>`
Expand Down Expand Up @@ -90,6 +144,22 @@ impl ArrowReaderMetrics {
}
}

/// Per-filter selectivity/deferral stats captured during planning.
///
/// Returns `None` if metrics are disabled.
pub fn filter_selectivity_stats(&self) -> Option<Vec<FilterSelectivityStat>> {
    // Guard clause: disabled metrics expose nothing.
    let Self::Enabled(inner) = self else {
        return None;
    };
    // A poisoned mutex still holds valid data (the stats vector is always
    // left in a consistent state); recover it rather than propagating.
    let guard = inner
        .filter_selectivity_stats
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    Some(guard.clone())
}

/// Increments the count of records read from the inner reader
pub(crate) fn increment_inner_reads(&self, count: usize) {
let Self::Enabled(inner) = self else {
Expand All @@ -110,6 +180,19 @@ impl ArrowReaderMetrics {
.records_read_from_cache
.fetch_add(count, std::sync::atomic::Ordering::Relaxed);
}

/// Records a per-filter selectivity stat.
///
/// No-op when metrics collection is disabled.
pub(crate) fn record_filter_selectivity_stat(&self, stat: FilterSelectivityStat) {
    if let Self::Enabled(inner) = self {
        // Recover from a poisoned lock: the vector is always consistent,
        // so keep recording rather than dropping the stat.
        inner
            .filter_selectivity_stats
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .push(stat);
    }
}
}

/// Holds the actual metrics for the Arrow reader.
Expand All @@ -122,6 +205,8 @@ pub struct ArrowReaderMetricsInner {
records_read_from_inner: AtomicUsize,
/// Total number of records read from previously cached pages
records_read_from_cache: AtomicUsize,
/// Per-filter selectivity stats captured during read planning.
filter_selectivity_stats: Mutex<Vec<FilterSelectivityStat>>,
}

impl ArrowReaderMetricsInner {
Expand All @@ -130,6 +215,49 @@ impl ArrowReaderMetricsInner {
Self {
records_read_from_inner: AtomicUsize::new(0),
records_read_from_cache: AtomicUsize::new(0),
filter_selectivity_stats: Mutex::new(vec![]),
}
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Disabled metrics must expose no per-filter stats.
    #[test]
    fn test_filter_selectivity_stats_disabled() {
        assert!(
            ArrowReaderMetrics::disabled()
                .filter_selectivity_stats()
                .is_none()
        );
    }

    /// Enabled metrics return each recorded stat unchanged.
    #[test]
    fn test_filter_selectivity_stats_enabled() {
        let metrics = ArrowReaderMetrics::enabled();
        let stat = FilterSelectivityStat {
            predicate_index: 0,
            row_count: 100,
            current_selector_count: 1,
            absolute_selector_count: 3,
            current_skipped_rows: 10,
            absolute_skipped_rows: 20,
            current_long_skip_rows: 10,
            absolute_long_skip_rows: 15,
            absolute_skip_selectivity: 0.2,
            absolute_long_skip_share: 0.75,
            delta_skip_selectivity: 0.1,
            delta_long_skip_share: 0.5,
            long_skip_share_threshold: Some(0.75),
            deferred: true,
            decision_reason: FilterDeferralDecisionReason::GatesFailedDeferred,
        };
        metrics.record_filter_selectivity_stat(stat.clone());

        // Compare the whole recorded value at once via derived PartialEq.
        let stats = metrics.filter_selectivity_stats().expect("metrics enabled");
        assert_eq!(stats, vec![stat]);
    }
}
48 changes: 46 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ pub struct ArrowReaderBuilder<T> {
pub(crate) metrics: ArrowReaderMetrics,

pub(crate) max_predicate_cache_size: usize,

pub(crate) scatter_threshold: Option<f64>,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
Expand All @@ -157,6 +159,7 @@ impl<T: Debug> Debug for ArrowReaderBuilder<T> {
.field("limit", &self.limit)
.field("offset", &self.offset)
.field("metrics", &self.metrics)
.field("scatter_threshold", &self.scatter_threshold)
.finish()
}
}
Expand All @@ -178,6 +181,7 @@ impl<T> ArrowReaderBuilder<T> {
offset: None,
metrics: ArrowReaderMetrics::Disabled,
max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
scatter_threshold: None,
}
}

Expand Down Expand Up @@ -430,6 +434,37 @@ impl<T> ArrowReaderBuilder<T> {
..self
}
}

/// Set a scatter threshold for filter deferral.
///
/// Deferral is considered only when a [`RowFilter`] predicate increases
/// selector fragmentation. In that case, the result is deferred unless:
///
/// 1. absolute skip selectivity (`skipped_rows / total_rows`) is at least
///    10%,
/// 2. absolute long-skip share (`long_skip_rows / skipped_rows`) is at
///    least this threshold,
/// 3. incremental skip selectivity added by the predicate is at least 2%,
///    and
/// 4. incremental long-skip share added by the predicate is at least this
///    threshold.
///
/// For example, `0.75` means at least 75% of skipped rows must be in long
/// skip runs to avoid deferral.
///
/// The deferred results are applied at the end via
/// [`RowSelection::intersection`], so correctness is preserved.
///
/// `None` disables deferral (the default).
///
/// [`RowFilter`]: crate::arrow::arrow_reader::RowFilter
/// [`RowSelection::intersection`]: crate::arrow::arrow_reader::RowSelection::intersection
pub fn with_scatter_threshold(self, threshold: Option<f64>) -> Self {
    Self {
        scatter_threshold: threshold,
        ..self
    }
}
}

/// Options that control how [`ParquetMetaData`] is read when constructing
Expand Down Expand Up @@ -1188,6 +1223,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
metrics,
// Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
max_predicate_cache_size: _,
scatter_threshold,
} = self;

// Try to avoid allocate large buffer
Expand All @@ -1203,7 +1239,9 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {

let mut plan_builder = ReadPlanBuilder::new(batch_size)
.with_selection(selection)
.with_row_selection_policy(row_selection_policy);
.with_metrics(metrics.clone())
.with_row_selection_policy(row_selection_policy)
.with_scatter_threshold(scatter_threshold);

// Update selection based on any filters
if let Some(filter) = filter.as_mut() {
Expand All @@ -1217,7 +1255,13 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
.with_parquet_metadata(&reader.metadata)
.build_array_reader(fields.as_deref(), predicate.projection())?;

plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
let row_count: usize = reader
.row_groups
.iter()
.map(|&i| reader.metadata.row_group(i).num_rows() as usize)
.sum();
plan_builder =
plan_builder.with_predicate(array_reader, predicate.as_mut(), row_count)?;
}
}

Expand Down
Loading
Loading