From 321a8d3bc1c766a31e43b2b459e01d474cf1ce3a Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Wed, 7 Jan 2026 16:28:16 -0500 Subject: [PATCH 01/32] Finding where to start --- parquet/src/arrow/arrow_reader/mod.rs | 5 +- parquet/src/arrow/arrow_reader/selection.rs | 75 +++++----- .../arrow/push_decoder/reader_builder/mod.rs | 128 +++++++++--------- 3 files changed, 107 insertions(+), 101 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index a626076ebdd7..f45a294779f5 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1223,8 +1223,11 @@ impl ParquetRecordBatchReader { RowSelectionCursor::Mask(mask_cursor) => { // Stream the record batch reader using contiguous segments of the selection // mask, avoiding the need to materialize intermediate `RowSelector` ranges. + // Start here + let pages = self.array_reader. + for page in self. while !mask_cursor.is_empty() { - let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else { + let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size, end) else { return Ok(None); }; diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 2ddf812f9c39..cd1164bb5e49 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::arrow::ProjectionMask; +// use crate::arrow::ProjectionMask; use crate::errors::ParquetError; use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; use arrow_array::{Array, BooleanArray}; @@ -250,38 +250,39 @@ impl RowSelection { ranges } - /// Returns true if this selection would skip any data pages within the provided columns - fn selection_skips_any_page( - &self, - projection: &ProjectionMask, - columns: &[OffsetIndexMetaData], - ) -> bool { - columns.iter().enumerate().any(|(leaf_idx, column)| { - if !projection.leaf_included(leaf_idx) { - return false; - } - - let locations = column.page_locations(); - if locations.is_empty() { - return false; - } - - let ranges = self.scan_ranges(locations); - !ranges.is_empty() && ranges.len() < locations.len() - }) - } - - /// Returns true if selectors should be forced, preventing mask materialisation - pub(crate) fn should_force_selectors( - &self, - projection: &ProjectionMask, - offset_index: Option<&[OffsetIndexMetaData]>, - ) -> bool { - match offset_index { - Some(columns) => self.selection_skips_any_page(projection, columns), - None => false, - } - } + // /// Returns true if this selection would skip any data pages within the provided columns + // fn selection_skips_any_page( + // &self, + // projection: &ProjectionMask, + // columns: &[OffsetIndexMetaData], + // ) -> bool { + // columns.iter().enumerate().any(|(leaf_idx, column)| { + // if !projection.leaf_included(leaf_idx) { + // return false; + // } + + // let locations = column.page_locations(); + // if locations.is_empty() { + // return false; + // } + + // let ranges = self.scan_ranges(locations); + // !ranges.is_empty() && ranges.len() < locations.len() + // }) + // } + + // / Returns true if selectors should be forced, preventing mask materialisation + // pub(crate) fn should_force_selectors( + // &self, + // _projection: &ProjectionMask, + // _offset_index: Option<&[OffsetIndexMetaData]>, + // ) -> bool { + // match offset_index { + // Some(columns) => self.selection_skips_any_page(projection, columns), + // None => false, + // } + // false + // } /// Splits off the first `row_count` from this [`RowSelection`] pub fn split_off(&mut self, row_count: usize) -> Self { @@ -779,7 +780,7 @@ impl MaskCursor { } /// Advance through the mask representation, producing the next chunk summary - pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option { + pub fn next_mask_chunk(&mut self, batch_size: usize, range_end: usize) -> Option { let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = { let mask = &self.mask; @@ -791,7 +792,9 @@ impl MaskCursor { let mut cursor = start_position; let mut initial_skip = 0; - while cursor < mask.len() && !mask.value(cursor) { + let limit = range_end.min(mask.len()); + + while cursor < limit && !mask.value(cursor) { initial_skip += 1; cursor += 1; } @@ -803,7 +806,7 @@ impl MaskCursor { // Advance until enough rows have been selected to satisfy the batch size, // or until the mask is exhausted. This mirrors the behaviour of the legacy // `RowSelector` queue-based iteration. - while cursor < mask.len() && selected_rows < batch_size { + while cursor < limit && selected_rows < batch_size { chunk_rows += 1; if mask.value(cursor) { selected_rows += 1; diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 61a244589c6d..d647e26089d2 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -22,7 +22,7 @@ use crate::DecodeResult; use crate::arrow::ProjectionMask; use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; -use crate::arrow::arrow_reader::selection::RowSelectionStrategy; +// use crate::arrow::arrow_reader::selection::RowSelectionStrategy; use crate::arrow::arrow_reader::{ ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy, }; @@ -536,11 +536,11 @@ impl RowGroupReaderBuilder { plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy); - plan_builder = override_selector_strategy_if_needed( - plan_builder, - &self.projection, - self.row_group_offset_index(row_group_idx), - ); + // plan_builder = override_selector_strategy_if_needed( + // plan_builder, + // &self.projection, + // self.row_group_offset_index(row_group_idx), + // ); let row_group_info = RowGroupInfo { row_group_idx, @@ -644,66 +644,66 @@ impl RowGroupReaderBuilder { mask.without_nested_types(self.metadata.file_metadata().schema_descr()) } - /// Get the offset index for the specified row group, if any - fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> { - self.metadata - .offset_index() - .filter(|index| !index.is_empty()) - .and_then(|index| index.get(row_group_idx)) - .map(|columns| columns.as_slice()) - } + // /// Get the offset index for the specified row group, if any + // fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> { + // self.metadata + // .offset_index() + // .filter(|index| !index.is_empty()) + // .and_then(|index| index.get(row_group_idx)) + // .map(|columns| columns.as_slice()) + // } } -/// Override the selection strategy if needed. -/// -/// Some pages can be skipped during row-group construction if they are not read -/// by the selections. This means that the data pages for those rows are never -/// loaded and definition/repetition levels are never read. When using -/// `RowSelections` selection works because `skip_records()` handles this -/// case and skips the page accordingly. -/// -/// However, with the current mask design, all values must be read and decoded -/// and then a mask filter is applied. Thus if any pages are skipped during -/// row-group construction, the data pages are missing and cannot be decoded. -/// -/// A simple example: -/// * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1) -/// * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01) -/// -/// Using the row selection to skip(4), page2 won't be read at all, so in this -/// case we can't decode all the rows and apply a mask. To correctly apply the -/// bit mask, we need all 6 values be read, but page2 is not in memory. -fn override_selector_strategy_if_needed( - plan_builder: ReadPlanBuilder, - projection_mask: &ProjectionMask, - offset_index: Option<&[OffsetIndexMetaData]>, -) -> ReadPlanBuilder { - // override only applies to Auto policy, If the policy is already Mask or Selectors, respect that - let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else { - return plan_builder; - }; - - let preferred_strategy = plan_builder.resolve_selection_strategy(); - - let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask) - && plan_builder.selection().is_some_and(|selection| { - selection.should_force_selectors(projection_mask, offset_index) - }); - - let resolved_strategy = if force_selectors { - RowSelectionStrategy::Selectors - } else { - preferred_strategy - }; - - // override the plan builder strategy with the resolved one - let new_policy = match resolved_strategy { - RowSelectionStrategy::Mask => RowSelectionPolicy::Mask, - RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors, - }; - - plan_builder.with_row_selection_policy(new_policy) -} +// / Override the selection strategy if needed. +// / +// / Some pages can be skipped during row-group construction if they are not read +// / by the selections. This means that the data pages for those rows are never +// / loaded and definition/repetition levels are never read. When using +// / `RowSelections` selection works because `skip_records()` handles this +// / case and skips the page accordingly. +// / +// / However, with the current mask design, all values must be read and decoded +// / and then a mask filter is applied. Thus if any pages are skipped during +// / row-group construction, the data pages are missing and cannot be decoded. +// / +// / A simple example: +// / * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1) +// / * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01) +// / +// / Using the row selection to skip(4), page2 won't be read at all, so in this +// / case we can't decode all the rows and apply a mask. To correctly apply the +// / bit mask, we need all 6 values be read, but page2 is not in memory. +// fn override_selector_strategy_if_needed( +// plan_builder: ReadPlanBuilder, +// projection_mask: &ProjectionMask, +// offset_index: Option<&[OffsetIndexMetaData]>, +// ) -> ReadPlanBuilder { +// // override only applies to Auto policy, If the policy is already Mask or Selectors, respect that +// let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else { +// return plan_builder; +// }; + +// let preferred_strategy = plan_builder.resolve_selection_strategy(); + +// let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask) +// && plan_builder.selection().is_some_and(|selection| { +// selection.should_force_selectors(projection_mask, offset_index) +// }); + +// let resolved_strategy = if force_selectors { +// RowSelectionStrategy::Selectors +// } else { +// preferred_strategy +// }; + +// // override the plan builder strategy with the resolved one +// let new_policy = match resolved_strategy { +// RowSelectionStrategy::Mask => RowSelectionPolicy::Mask, +// RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors, +// }; + +// plan_builder.with_row_selection_policy(new_policy) +// } #[cfg(test)] mod tests { From 8acfb4e061fd75dddc2650fe241edc8c29227757 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Thu, 8 Jan 2026 21:24:20 -0500 Subject: [PATCH 02/32] Seems to work --- parquet/src/arrow/arrow_reader/mod.rs | 23 +++- parquet/src/arrow/arrow_reader/read_plan.rs | 2 +- parquet/src/arrow/arrow_reader/selection.rs | 103 +++++++++++------- .../arrow/push_decoder/reader_builder/mod.rs | 88 ++++----------- 4 files changed, 101 insertions(+), 115 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index f45a294779f5..8518d36d092a 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -44,6 +44,7 @@ use crate::file::metadata::{ PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader, ParquetStatisticsPolicy, RowGroupMetaData, }; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; @@ -1083,7 +1084,7 @@ impl ParquetRecordBatchReaderBuilder { .build_limited() .build(); - Ok(ParquetRecordBatchReader::new(array_reader, read_plan)) + Ok(ParquetRecordBatchReader::new(array_reader, read_plan, None)) } } @@ -1185,6 +1186,7 @@ pub struct ParquetRecordBatchReader { array_reader: Box, schema: SchemaRef, read_plan: ReadPlan, + page_offsets: Option>>, } impl Debug for ParquetRecordBatchReader { @@ -1223,11 +1225,14 @@ impl ParquetRecordBatchReader { RowSelectionCursor::Mask(mask_cursor) => { // Stream the record batch reader using contiguous segments of the selection // mask, avoiding the need to materialize intermediate `RowSelector` ranges. - // Start here - let pages = self.array_reader. - for page in self. + let page_locations = self.page_offsets.as_ref().map(|columns| { + // Use only the first column as the global guide + &columns[0].page_locations()[..] + }); + while !mask_cursor.is_empty() { - let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size, end) else { + let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size, page_locations) + else { return Ok(None); }; @@ -1394,13 +1399,18 @@ impl ParquetRecordBatchReader { array_reader, schema: Arc::new(Schema::new(levels.fields.clone())), read_plan, + page_offsets: None, }) } /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None` /// all rows will be returned - pub(crate) fn new(array_reader: Box, read_plan: ReadPlan) -> Self { + pub(crate) fn new( + array_reader: Box, + read_plan: ReadPlan, + page_offsets: Option<&[OffsetIndexMetaData]>, + ) -> Self { let schema = match array_reader.get_data_type() { ArrowType::Struct(fields) => Schema::new(fields.clone()), _ => unreachable!("Struct array reader's data type is not struct!"), @@ -1410,6 +1420,7 @@ impl ParquetRecordBatchReader { array_reader, schema: Arc::new(schema), read_plan, + page_offsets: page_offsets.map(|slice| Arc::new(slice.to_vec())), } } diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 7c9eb36befe3..c8171f61b1a0 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -148,7 +148,7 @@ impl ReadPlanBuilder { array_reader: Box, predicate: &mut dyn ArrowPredicate, ) -> Result { - let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build()); + let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build(), None); let mut filters = vec![]; for maybe_batch in reader { let maybe_batch = maybe_batch?; diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index cd1164bb5e49..616f8e8fae9d 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::arrow::ProjectionMask; // use crate::arrow::ProjectionMask; use crate::errors::ParquetError; use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; @@ -250,39 +251,38 @@ impl RowSelection { ranges } - // /// Returns true if this selection would skip any data pages within the provided columns - // fn selection_skips_any_page( - // &self, - // projection: &ProjectionMask, - // columns: &[OffsetIndexMetaData], - // ) -> bool { - // columns.iter().enumerate().any(|(leaf_idx, column)| { - // if !projection.leaf_included(leaf_idx) { - // return false; - // } - - // let locations = column.page_locations(); - // if locations.is_empty() { - // return false; - // } - - // let ranges = self.scan_ranges(locations); - // !ranges.is_empty() && ranges.len() < locations.len() - // }) - // } - - // / Returns true if selectors should be forced, preventing mask materialisation - // pub(crate) fn should_force_selectors( - // &self, - // _projection: &ProjectionMask, - // _offset_index: Option<&[OffsetIndexMetaData]>, - // ) -> bool { - // match offset_index { - // Some(columns) => self.selection_skips_any_page(projection, columns), - // None => false, - // } - // false - // } + /// Returns true if this selection would skip any data pages within the provided columns + fn selection_skips_any_page( + &self, + projection: &ProjectionMask, + columns: &[OffsetIndexMetaData], + ) -> bool { + columns.iter().enumerate().any(|(leaf_idx, column)| { + if !projection.leaf_included(leaf_idx) { + return false; + } + + let locations = column.page_locations(); + if locations.is_empty() { + return false; + } + + let ranges = self.scan_ranges(locations); + !ranges.is_empty() && ranges.len() < locations.len() + }) + } + + /// Returns true if bitmasks should be page aware + pub(crate) fn requires_page_aware_mask( + &self, + projection: &ProjectionMask, + offset_index: Option<&[OffsetIndexMetaData]>, + ) -> bool { + match offset_index { + Some(columns) => self.selection_skips_any_page(projection, columns), + None => false, + } + } /// Splits off the first `row_count` from this [`RowSelection`] pub fn split_off(&mut self, row_count: usize) -> Self { @@ -779,8 +779,13 @@ impl MaskCursor { self.position >= self.mask.len() } - /// Advance through the mask representation, producing the next chunk summary - pub fn next_mask_chunk(&mut self, batch_size: usize, range_end: usize) -> Option { + /// Advance through the mask representation, producing the next chunk summary. + /// Optionally clips chunk boundaries to page boundaries. + pub fn next_mask_chunk( + &mut self, + batch_size: usize, + page_locations: Option<&[PageLocation]>, + ) -> Option { let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = { let mask = &self.mask; @@ -792,9 +797,8 @@ impl MaskCursor { let mut cursor = start_position; let mut initial_skip = 0; - let limit = range_end.min(mask.len()); - - while cursor < limit && !mask.value(cursor) { + // Skip unselected rows + while cursor < mask.len() && !mask.value(cursor) { initial_skip += 1; cursor += 1; } @@ -803,14 +807,29 @@ impl MaskCursor { let mut chunk_rows = 0; let mut selected_rows = 0; - // Advance until enough rows have been selected to satisfy the batch size, - // or until the mask is exhausted. This mirrors the behaviour of the legacy - // `RowSelector` queue-based iteration. - while cursor < limit && selected_rows < batch_size { + // Advance until enough rows have been selected to satisfy batch_size, + // or until the mask is exhausted. + while cursor < mask.len() && selected_rows < batch_size { + // Increment counters chunk_rows += 1; if mask.value(cursor) { selected_rows += 1; } + + // If page boundaries are provided, clip the chunk at the first boundary + if let Some(pages) = page_locations { + for loc in pages { + // Convert first_row_index safely to usize + let page_start = loc.first_row_index.try_into().unwrap_or(usize::MAX); + if page_start > mask_start && page_start < mask_start + chunk_rows { + // shrink chunk_rows to page boundary + chunk_rows = page_start - mask_start; + // stop checking further pages + break; + } + } + } + cursor += 1; } diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index d647e26089d2..5ecd9df81867 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -536,12 +536,6 @@ impl RowGroupReaderBuilder { plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy); - // plan_builder = override_selector_strategy_if_needed( - // plan_builder, - // &self.projection, - // self.row_group_offset_index(row_group_idx), - // ); - let row_group_info = RowGroupInfo { row_group_idx, row_count, @@ -588,6 +582,19 @@ impl RowGroupReaderBuilder { &mut self.buffers, )?; + // before plan is build below + // check if plan is bitmask and if it is, put it in a variable + let page_offsets = if plan_builder.selection().is_some_and(|selection| { + selection.requires_page_aware_mask( + &self.projection, + self.row_group_offset_index(row_group_idx), + ) + }) { + self.row_group_offset_index(row_group_idx) + } else { + None + }; + let plan = plan_builder.build(); // if we have any cached results, connect them up @@ -603,7 +610,7 @@ impl RowGroupReaderBuilder { .build_array_reader(self.fields.as_deref(), &self.projection) }?; - let reader = ParquetRecordBatchReader::new(array_reader, plan); + let reader = ParquetRecordBatchReader::new(array_reader, plan, page_offsets); NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader)) } RowGroupDecoderState::Finished => { @@ -644,67 +651,16 @@ impl RowGroupReaderBuilder { mask.without_nested_types(self.metadata.file_metadata().schema_descr()) } - // /// Get the offset index for the specified row group, if any - // fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> { - // self.metadata - // .offset_index() - // .filter(|index| !index.is_empty()) - // .and_then(|index| index.get(row_group_idx)) - // .map(|columns| columns.as_slice()) - // } + /// Get the offset index for the specified row group, if any + fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> { + self.metadata + .offset_index() + .filter(|index| !index.is_empty()) + .and_then(|index| index.get(row_group_idx)) + .map(|columns| columns.as_slice()) + } } -// / Override the selection strategy if needed. -// / -// / Some pages can be skipped during row-group construction if they are not read -// / by the selections. This means that the data pages for those rows are never -// / loaded and definition/repetition levels are never read. When using -// / `RowSelections` selection works because `skip_records()` handles this -// / case and skips the page accordingly. -// / -// / However, with the current mask design, all values must be read and decoded -// / and then a mask filter is applied. Thus if any pages are skipped during -// / row-group construction, the data pages are missing and cannot be decoded. -// / -// / A simple example: -// / * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1) -// / * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01) -// / -// / Using the row selection to skip(4), page2 won't be read at all, so in this -// / case we can't decode all the rows and apply a mask. To correctly apply the -// / bit mask, we need all 6 values be read, but page2 is not in memory. -// fn override_selector_strategy_if_needed( -// plan_builder: ReadPlanBuilder, -// projection_mask: &ProjectionMask, -// offset_index: Option<&[OffsetIndexMetaData]>, -// ) -> ReadPlanBuilder { -// // override only applies to Auto policy, If the policy is already Mask or Selectors, respect that -// let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else { -// return plan_builder; -// }; - -// let preferred_strategy = plan_builder.resolve_selection_strategy(); - -// let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask) -// && plan_builder.selection().is_some_and(|selection| { -// selection.should_force_selectors(projection_mask, offset_index) -// }); - -// let resolved_strategy = if force_selectors { -// RowSelectionStrategy::Selectors -// } else { -// preferred_strategy -// }; - -// // override the plan builder strategy with the resolved one -// let new_policy = match resolved_strategy { -// RowSelectionStrategy::Mask => RowSelectionPolicy::Mask, -// RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors, -// }; - -// plan_builder.with_row_selection_policy(new_policy) -// } - #[cfg(test)] mod tests { use super::*; From ed2a182a5f813814d0e6b8b90abc99361ffbffbe Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Thu, 8 Jan 2026 22:01:43 -0500 Subject: [PATCH 03/32] Fix --- parquet/src/arrow/arrow_reader/selection.rs | 1 - parquet/src/arrow/push_decoder/reader_builder/mod.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 616f8e8fae9d..87db08e5d135 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -16,7 +16,6 @@ // under the License. use crate::arrow::ProjectionMask; -// use crate::arrow::ProjectionMask; use crate::errors::ParquetError; use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; use arrow_array::{Array, BooleanArray}; diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 5ecd9df81867..cd53ca4eec2c 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -22,7 +22,6 @@ use crate::DecodeResult; use crate::arrow::ProjectionMask; use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; -// use crate::arrow::arrow_reader::selection::RowSelectionStrategy; use crate::arrow::arrow_reader::{ ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy, }; From 5395dbf2ed0bcca51224945a5a6d0637dd6019d8 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Fri, 9 Jan 2026 15:30:44 -0500 Subject: [PATCH 04/32] Fix async err? --- parquet/src/arrow/arrow_reader/selection.rs | 29 +++++++++------------ 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 87db08e5d135..204b33f1ff6a 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -806,29 +806,26 @@ impl MaskCursor { let mut chunk_rows = 0; let mut selected_rows = 0; + let mut page_end = mask.len(); + if let Some(pages) = page_locations { + for loc in pages { + let page_start = loc.first_row_index.try_into().unwrap_or(usize::MAX); + if page_start > mask_start { + page_end = page_start.min(page_end); + break; + } + } + } + // Advance until enough rows have been selected to satisfy batch_size, - // or until the mask is exhausted. - while cursor < mask.len() && selected_rows < batch_size { + // or until the mask is exhausted or the page boundary is reached. + while cursor < mask.len() && cursor < page_end && selected_rows < batch_size { // Increment counters chunk_rows += 1; if mask.value(cursor) { selected_rows += 1; } - // If page boundaries are provided, clip the chunk at the first boundary - if let Some(pages) = page_locations { - for loc in pages { - // Convert first_row_index safely to usize - let page_start = loc.first_row_index.try_into().unwrap_or(usize::MAX); - if page_start > mask_start && page_start < mask_start + chunk_rows { - // shrink chunk_rows to page boundary - chunk_rows = page_start - mask_start; - // stop checking further pages - break; - } - } - } - cursor += 1; } From df9a49307cf5d9251447ed8c3af89ac7100b8cbe Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Sun, 11 Jan 2026 23:40:10 -0500 Subject: [PATCH 05/32] Fix complexity from O(n^2) to O(logn) --- parquet/src/arrow/arrow_reader/selection.rs | 30 +++++++++------------ 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 87db08e5d135..c88fe2ca6d40 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -806,29 +806,25 @@ impl MaskCursor { let mut chunk_rows = 0; let mut selected_rows = 0; + let max_chunk_rows = page_locations + .and_then(|pages| { + let next_idx = + pages.partition_point(|loc| loc.first_row_index as usize <= mask_start); + pages.get(next_idx).and_then(|loc| { + let page_start = loc.first_row_index as usize; + (page_start > mask_start).then_some(page_start - mask_start) + }) + }) + .unwrap_or(usize::MAX); + // Advance until enough rows have been selected to satisfy batch_size, - // or until the mask is exhausted. - while cursor < mask.len() && selected_rows < batch_size { + // or until the mask is exhausted or until a page boundary. + while cursor < mask.len() && selected_rows < batch_size && chunk_rows < max_chunk_rows { // Increment counters chunk_rows += 1; if mask.value(cursor) { selected_rows += 1; } - - // If page boundaries are provided, clip the chunk at the first boundary - if let Some(pages) = page_locations { - for loc in pages { - // Convert first_row_index safely to usize - let page_start = loc.first_row_index.try_into().unwrap_or(usize::MAX); - if page_start > mask_start && page_start < mask_start + chunk_rows { - // shrink chunk_rows to page boundary - chunk_rows = page_start - mask_start; - // stop checking further pages - break; - } - } - } - cursor += 1; } From 55e01260675e1315b6f1928d1277c40fc14ba4b6 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Mon, 12 Jan 2026 11:52:35 -0500 Subject: [PATCH 06/32] Pass pagelocation instead offsetindexmetadata --- parquet/src/arrow/arrow_reader/mod.rs | 13 +++++-------- .../src/arrow/push_decoder/reader_builder/mod.rs | 5 ++++- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 8518d36d092a..e09334b2d8e6 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -44,7 +44,7 @@ use crate::file::metadata::{ PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader, ParquetStatisticsPolicy, RowGroupMetaData, }; -use crate::file::page_index::offset_index::OffsetIndexMetaData; +use crate::file::page_index::offset_index::PageLocation; use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; @@ -1186,7 +1186,7 @@ pub struct ParquetRecordBatchReader { array_reader: Box, schema: SchemaRef, read_plan: ReadPlan, - page_offsets: Option>>, + page_offsets: Option>, } impl Debug for ParquetRecordBatchReader { @@ -1225,10 +1225,7 @@ impl ParquetRecordBatchReader { RowSelectionCursor::Mask(mask_cursor) => { // Stream the record batch reader using contiguous segments of the selection // mask, avoiding the need to materialize intermediate `RowSelector` ranges. - let page_locations = self.page_offsets.as_ref().map(|columns| { - // Use only the first column as the global guide - &columns[0].page_locations()[..] - }); + let page_locations = self.page_offsets.as_deref(); while !mask_cursor.is_empty() { let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size, page_locations) @@ -1409,7 +1406,7 @@ impl ParquetRecordBatchReader { pub(crate) fn new( array_reader: Box, read_plan: ReadPlan, - page_offsets: Option<&[OffsetIndexMetaData]>, + page_offsets: Option>, ) -> Self { let schema = match array_reader.get_data_type() { ArrowType::Struct(fields) => Schema::new(fields.clone()), @@ -1420,7 +1417,7 @@ impl ParquetRecordBatchReader { array_reader, schema: Arc::new(schema), read_plan, - page_offsets: page_offsets.map(|slice| Arc::new(slice.to_vec())), + page_offsets: page_offsets, } } diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index cd53ca4eec2c..63c8c32d3dfa 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -590,6 +590,8 @@ impl RowGroupReaderBuilder { ) }) { self.row_group_offset_index(row_group_idx) + .and_then(|columns| columns.first()) + .map(|column| column.page_locations()) } else { None }; @@ -609,7 +611,8 @@ impl RowGroupReaderBuilder { .build_array_reader(self.fields.as_deref(), &self.projection) }?; - let reader = ParquetRecordBatchReader::new(array_reader, plan, page_offsets); + let reader = + ParquetRecordBatchReader::new(array_reader, plan, page_offsets.cloned()); NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader)) } RowGroupDecoderState::Finished => { From 691919662f3a55d1ca47c2ac03383fdae64b9734 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Mon, 12 Jan 2026 13:42:26 -0500 Subject: [PATCH 07/32] Fix clippy --- parquet/src/arrow/arrow_reader/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index e09334b2d8e6..76d0b99ae0dc 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1417,7 +1417,7 @@ impl ParquetRecordBatchReader { array_reader, schema: Arc::new(schema), read_plan, - page_offsets: page_offsets, + page_offsets, } } From 87e21d908bf3ead2209b6a40926c4a8201a0e59a Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Wed, 14 Jan 2026 12:48:38 -0500 Subject: [PATCH 08/32] Only add page_offsets if the policy is bitmask --- .../src/arrow/push_decoder/reader_builder/mod.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 63c8c32d3dfa..bf728139579e 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -24,6 +24,7 @@ use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::arrow::arrow_reader::{ ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy, + selection::RowSelectionStrategy, }; use crate::arrow::in_memory_row_group::ColumnChunkData; use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder; @@ -583,12 +584,14 @@ impl RowGroupReaderBuilder { // before plan is build below // check if plan is bitmask and if it is, put it in a variable - let page_offsets = if plan_builder.selection().is_some_and(|selection| { - selection.requires_page_aware_mask( - &self.projection, - self.row_group_offset_index(row_group_idx), - ) - }) { + let page_offsets = if plan_builder.resolve_selection_strategy() + == RowSelectionStrategy::Mask + && plan_builder.selection().is_some_and(|selection| { + selection.requires_page_aware_mask( + &self.projection, + self.row_group_offset_index(row_group_idx), + ) + }) { self.row_group_offset_index(row_group_idx) .and_then(|columns| columns.first()) .map(|column| column.page_locations()) From b41a94b76bd2fe590e615438043d6d434c768421 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Wed, 14 Jan 2026 12:49:34 -0500 Subject: [PATCH 09/32] Add assert row values to end to end tests --- parquet/src/arrow/arrow_reader/mod.rs | 8 ++++++++ parquet/src/arrow/async_reader/mod.rs | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 3a879bfa0953..503caaa7b680 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -5622,6 +5622,14 @@ pub(crate) mod tests { let batches = reader.collect::, _>>().unwrap(); let result = concat_batches(&schema, &batches).unwrap(); assert_eq!(result.num_rows(), 2); + assert_eq!( + result.column(0).as_ref(), + &Int64Array::from(vec![first_value, last_value]) + ); + assert_eq!( + result.column(1).as_ref(), + &Int64Array::from(vec![first_value, last_value]) + ); } #[test] diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 60f2ca1615a3..18112a8e8247 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -1288,6 +1288,14 @@ mod tests { let batches: Vec<_> = stream.try_collect().await.unwrap(); let result = concat_batches(&schema, &batches).unwrap(); assert_eq!(result.num_rows(), 2); + assert_eq!( + result.column(0).as_ref(), + &Int64Array::from(vec![first_value, last_value]) + ); + assert_eq!( + result.column(1).as_ref(), + &Int64Array::from(vec![first_value, last_value]) + ); } #[tokio::test] From 700d550c6da6f5eb9ca24f50a2403230ab29ff46 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Wed, 14 Jan 2026 12:50:55 -0500 Subject: [PATCH 10/32] Add cursor page awarness test --- parquet/src/arrow/arrow_reader/selection.rs | 43 +++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index c88fe2ca6d40..ecba96362646 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -1107,6 +1107,49 @@ mod tests { ); } + #[test] + fn test_mask_cursor_page_aware_chunking() { + let selectors = vec![RowSelector::skip(2), RowSelector::select(10)]; + let mask = boolean_mask_from_selectors(&selectors); + let mut cursor = MaskCursor { mask, position: 0 }; + + let pages = vec![ + PageLocation { + offset: 0, + compressed_page_size: 1, + first_row_index: 0, + }, + PageLocation { + offset: 1, + compressed_page_size: 1, + first_row_index: 4, + }, + PageLocation { + offset: 2, + compressed_page_size: 1, + first_row_index: 8, + }, + PageLocation { + offset: 3, + compressed_page_size: 1, + first_row_index: 12, + }, + ]; + // First chunk is page 1 + let chunk = cursor.next_mask_chunk(100, Some(&pages)).unwrap(); + assert_eq!(chunk.initial_skip, 2); + assert_eq!(chunk.mask_start, 2); + assert_eq!(chunk.chunk_rows, 2); + assert_eq!(chunk.selected_rows, 2); + + // Second chunk is page 2 + let chunk = cursor.next_mask_chunk(100, Some(&pages)).unwrap(); + assert_eq!(chunk.initial_skip, 0); + assert_eq!(chunk.mask_start, 4); + assert_eq!(chunk.chunk_rows, 4); + assert_eq!(chunk.selected_rows, 4); + } + #[test] fn test_and() { let mut a = RowSelection::from(vec![ From 8d658b00699d330cc7df6fb1d18f94f92dfa926c Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 15 Jan 2026 08:22:04 +0800 Subject: [PATCH 11/32] Add more tests --- parquet/src/arrow/arrow_reader/mod.rs | 237 ++++++++++++++++++++++++- parquet/src/arrow/async_reader/mod.rs | 245 +++++++++++++++++++++++++- 2 files changed, 467 insertions(+), 15 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 76d0b99ae0dc..9411dbf09243 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -5409,14 +5409,18 @@ pub(crate) mod tests { } #[test] - fn test_row_filter_full_page_skip_is_handled() { + fn test_page_aware_mask_handles_page_skip() { + // Test that when using Auto policy with page-aware bitmask, the reader + // correctly handles pages that are skipped entirely due to row filtering. + // + // This creates a file with 12 rows across 6 pages (2 rows per page). + // After filtering, only the first and last rows remain, skipping 4 pages + // in the middle. The page-aware mask should handle this correctly by + // respecting page boundaries during chunk iteration. let first_value: i64 = 1111; let last_value: i64 = 9999; let num_rows: usize = 12; - // build data with row selection average length 4 - // The result would be (1111 XXXX) ... (4 page in the middle)... (XXXX 9999) - // The Row Selection would be [1111, (skip 10), 9999] let schema = Arc::new(Schema::new(vec![ Field::new("key", arrow_schema::DataType::Int64, false), Field::new("value", arrow_schema::DataType::Int64, false), @@ -5462,8 +5466,8 @@ pub(crate) mod tests { let options = ArrowReaderOptions::new().with_page_index(true); let predicate = make_predicate(filter_mask.clone()); - // The batch size is set to 12 to read all rows in one go after filtering - // If the Reader chooses mask to handle filter, it might cause panic because the mid 4 pages may not be decoded. + // Using Auto policy with page index enabled - page-aware mask should handle + // the skipped pages correctly without panicking. let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options) .unwrap() .with_row_filter(RowFilter::new(vec![Box::new(predicate)])) @@ -5472,14 +5476,231 @@ pub(crate) mod tests { .build() .unwrap(); - // Predicate pruning used to panic once mask-backed plans removed whole pages. - // Collecting into batches validates the plan now downgrades to selectors instead. let schema = reader.schema().clone(); let batches = reader.collect::, _>>().unwrap(); let result = concat_batches(&schema, &batches).unwrap(); assert_eq!(result.num_rows(), 2); } + /// Test that bitmask-based row selection correctly handles page boundaries. + /// This test creates a parquet file with multiple small pages and verifies that + /// when using Mask policy, pages that are skipped entirely are handled correctly. + #[test] + fn test_bitmask_page_aware_selection() { + let first_value: i64 = 1111; + let last_value: i64 = 9999; + let num_rows: usize = 20; + + // Create a file with 20 rows, ~2 rows per page = 10 pages + // Selection will be: first row, skip middle rows, last row + // This forces the reader to handle skipped pages correctly + let schema = Arc::new(Schema::new(vec![ + Field::new("key", arrow_schema::DataType::Int64, false), + Field::new("value", arrow_schema::DataType::Int64, false), + ])); + + let mut int_values: Vec = (0..num_rows as i64).collect(); + int_values[0] = first_value; + int_values[num_rows - 1] = last_value; + let keys = Int64Array::from(int_values.clone()); + let values = Int64Array::from(int_values.clone()); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef], + ) + .unwrap(); + + // Configure small pages to create multiple page boundaries + let props = WriterProperties::builder() + .set_write_batch_size(2) + .set_data_page_row_count_limit(2) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + let options = ArrowReaderOptions::new().with_page_index(true); + let builder = + ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap(); + let schema = builder.parquet_schema().clone(); + let filter_mask = ProjectionMask::leaves(&schema, [0]); + + let make_predicate = |mask: ProjectionMask| { + ArrowPredicateFn::new(mask, move |batch: RecordBatch| { + let column = batch.column(0); + let match_first = eq(column, &Int64Array::new_scalar(first_value))?; + let match_second = eq(column, &Int64Array::new_scalar(last_value))?; + or(&match_first, &match_second) + }) + }; + + let options = ArrowReaderOptions::new().with_page_index(true); + let predicate = make_predicate(filter_mask.clone()); + + // Use Mask policy explicitly to test page-aware behavior + let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options) + .unwrap() + .with_row_filter(RowFilter::new(vec![Box::new(predicate)])) + .with_batch_size(num_rows) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .build() + .unwrap(); + + let schema = reader.schema().clone(); + let batches = reader.collect::, _>>().unwrap(); + let result = concat_batches(&schema, &batches).unwrap(); + + // Should have exactly 2 rows: first and last + assert_eq!(result.num_rows(), 2); + + // Verify the values + let key_col = result.column(0).as_primitive::(); + assert_eq!(key_col.value(0), first_value); + assert_eq!(key_col.value(1), last_value); + } + + /// Test that page-aware bitmask handles edge cases at exact page boundaries. + /// When mask_start aligns exactly with a page boundary, verify correct behavior. + #[test] + fn test_bitmask_at_page_boundary() { + let num_rows: usize = 10; + let rows_per_page: usize = 2; + + // Create a file with 10 rows, 2 rows per page = 5 pages + let schema = Arc::new(Schema::new(vec![Field::new( + "id", + arrow_schema::DataType::Int64, + false, + )])); + + let ids: Vec = (0..num_rows as i64).collect(); + let id_array = Int64Array::from(ids); + let batch = + RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array) as ArrayRef]) + .unwrap(); + + let props = WriterProperties::builder() + .set_write_batch_size(rows_per_page) + .set_data_page_row_count_limit(rows_per_page) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + // Create a selection that starts exactly at page boundaries + // Select rows 0-1 (page 0), skip 2-5 (pages 1-2), select 6-9 (pages 3-4) + let selection = RowSelection::from(vec![ + RowSelector::select(2), // Page 0: rows 0-1 + RowSelector::skip(4), // Pages 1-2: rows 2-5 + RowSelector::select(4), // Pages 3-4: rows 6-9 + ]); + + let options = ArrowReaderOptions::new().with_page_index(true); + let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options) + .unwrap() + .with_row_selection(selection.clone()) + .with_batch_size(num_rows) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .build() + .unwrap(); + + let schema = reader.schema().clone(); + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + // Should have 2 + 4 = 6 rows + assert_eq!(total_rows, 6); + + // Verify values from the concatenated result + let result = concat_batches(&schema, &batches).unwrap(); + let id_col = result.column(0).as_primitive::(); + + // Expected values: 0, 1, 6, 7, 8, 9 + assert_eq!(id_col.value(0), 0); + assert_eq!(id_col.value(1), 1); + assert_eq!(id_col.value(2), 6); + assert_eq!(id_col.value(3), 7); + assert_eq!(id_col.value(4), 8); + assert_eq!(id_col.value(5), 9); + } + + /// Test that page-aware bitmask handles mid-page selections correctly. + /// The selection starts in the middle of a page and ends in the middle of another. + #[test] + fn test_bitmask_mid_page_selection() { + let num_rows: usize = 12; + let rows_per_page: usize = 3; + + // Create a file with 12 rows, 3 rows per page = 4 pages + // Page 0: rows 0-2 + // Page 1: rows 3-5 + // Page 2: rows 6-8 + // Page 3: rows 9-11 + let schema = Arc::new(Schema::new(vec![Field::new( + "id", + arrow_schema::DataType::Int64, + false, + )])); + + let ids: Vec = (0..num_rows as i64).collect(); + let id_array = Int64Array::from(ids); + let batch = + RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array) as ArrayRef]) + .unwrap(); + + let props = WriterProperties::builder() + .set_write_batch_size(rows_per_page) + .set_data_page_row_count_limit(rows_per_page) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + // Select mid-page: skip first, select row 1-2, skip row 3-7 (cross pages), select row 8-10 + let selection = RowSelection::from(vec![ + RowSelector::skip(1), // Skip row 0 + RowSelector::select(2), // Select rows 1-2 (partial page 0) + RowSelector::skip(5), // Skip rows 3-7 (pages 1, part of 2) + RowSelector::select(3), // Select rows 8-10 (part of pages 2-3) + RowSelector::skip(1), // Skip row 11 + ]); + + let options = ArrowReaderOptions::new().with_page_index(true); + let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options) + .unwrap() + .with_row_selection(selection.clone()) + .with_batch_size(num_rows) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .build() + .unwrap(); + + let schema = reader.schema().clone(); + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + // Should have 2 + 3 = 5 rows + assert_eq!(total_rows, 5); + + let result = concat_batches(&schema, &batches).unwrap(); + let id_col = result.column(0).as_primitive::(); + + // Expected values: 1, 2, 8, 9, 10 + assert_eq!(id_col.value(0), 1); + assert_eq!(id_col.value(1), 2); + assert_eq!(id_col.value(2), 8); + assert_eq!(id_col.value(3), 9); + assert_eq!(id_col.value(4), 10); + } + #[test] fn test_get_row_group_column_bloom_filter_with_length() { // convert to new parquet file with bloom_filter_length diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 60f2ca1615a3..a183e75ffa85 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -782,7 +782,7 @@ mod tests { use arrow::error::Result as ArrowResult; use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder}; use arrow_array::cast::AsArray; - use arrow_array::types::Int32Type; + use arrow_array::types::{Int32Type, Int64Type}; use arrow_array::{ Array, ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader, Scalar, StringArray, StructArray, UInt64Array, @@ -1214,15 +1214,19 @@ mod tests { assert_eq!(actual_rows, expected_rows); } + /// Test that when using Auto policy with page-aware bitmask, the reader + /// correctly handles pages that are skipped entirely due to row filtering. + /// + /// This creates a file with 12 rows across 6 pages (2 rows per page). + /// After filtering, only the first and last rows remain, skipping 4 pages + /// in the middle. The page-aware mask should handle this correctly by + /// respecting page boundaries during chunk iteration. #[tokio::test] - async fn test_row_filter_full_page_skip_is_handled_async() { + async fn test_page_aware_mask_handles_page_skip_async() { let first_value: i64 = 1111; let last_value: i64 = 9999; let num_rows: usize = 12; - // build data with row selection average length 4 - // The result would be (1111 XXXX) ... (4 page in the middle)... (XXXX 9999) - // The Row Selection would be [1111, (skip 10), 9999] let schema = Arc::new(Schema::new(vec![ Field::new("key", DataType::Int64, false), Field::new("value", DataType::Int64, false), @@ -1270,8 +1274,8 @@ mod tests { let predicate = make_predicate(filter_mask.clone()); - // The batch size is set to 12 to read all rows in one go after filtering - // If the Reader chooses mask to handle filter, it might cause panic because the mid 4 pages may not be decoded. + // Using Auto policy with page index enabled - page-aware mask should handle + // the skipped pages correctly without panicking. let stream = ParquetRecordBatchStreamBuilder::new_with_options( TestReader::new(data.clone()), ArrowReaderOptions::new().with_page_index(true), @@ -2308,4 +2312,231 @@ mod tests { Ok(()) } + + /// Test that bitmask-based row selection correctly handles page boundaries. + /// This test creates a parquet file with multiple small pages and verifies that + /// when using Mask policy, pages that are skipped entirely are handled correctly. + #[tokio::test] + async fn test_bitmask_page_aware_selection_async() { + let first_value: i64 = 1111; + let last_value: i64 = 9999; + let num_rows: usize = 20; + + // Create a file with 20 rows, ~2 rows per page = 10 pages + // Selection will be: first row, skip middle rows, last row + // This forces the reader to handle skipped pages correctly + let schema = Arc::new(Schema::new(vec![ + Field::new("key", DataType::Int64, false), + Field::new("value", DataType::Int64, false), + ])); + + let mut int_values: Vec = (0..num_rows as i64).collect(); + int_values[0] = first_value; + int_values[num_rows - 1] = last_value; + let keys = Int64Array::from(int_values.clone()); + let values = Int64Array::from(int_values.clone()); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef], + ) + .unwrap(); + + // Configure small pages to create multiple page boundaries + let props = WriterProperties::builder() + .set_write_batch_size(2) + .set_data_page_row_count_limit(2) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + let builder = ParquetRecordBatchStreamBuilder::new_with_options( + TestReader::new(data.clone()), + ArrowReaderOptions::new().with_page_index(true), + ) + .await + .unwrap(); + let schema = builder.parquet_schema().clone(); + let filter_mask = ProjectionMask::leaves(&schema, [0]); + + let make_predicate = |mask: ProjectionMask| { + ArrowPredicateFn::new(mask, move |batch: RecordBatch| { + let column = batch.column(0); + let match_first = eq(column, &Int64Array::new_scalar(first_value))?; + let match_second = eq(column, &Int64Array::new_scalar(last_value))?; + or(&match_first, &match_second) + }) + }; + + let predicate = make_predicate(filter_mask.clone()); + + // Use Mask policy explicitly to test page-aware behavior + let stream = ParquetRecordBatchStreamBuilder::new_with_options( + TestReader::new(data.clone()), + ArrowReaderOptions::new().with_page_index(true), + ) + .await + .unwrap() + .with_row_filter(RowFilter::new(vec![Box::new(predicate)])) + .with_batch_size(num_rows) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .build() + .unwrap(); + + let schema = stream.schema().clone(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + let result = concat_batches(&schema, &batches).unwrap(); + + // Should have exactly 2 rows: first and last + assert_eq!(result.num_rows(), 2); + + // Verify the values + let key_col = result.column(0).as_primitive::(); + assert_eq!(key_col.value(0), first_value); + assert_eq!(key_col.value(1), last_value); + } + + /// Test that page-aware bitmask handles edge cases at exact page boundaries. + /// When mask_start aligns exactly with a page boundary, verify correct behavior. + #[tokio::test] + async fn test_bitmask_at_page_boundary_async() { + let num_rows: usize = 10; + let rows_per_page: usize = 2; + + // Create a file with 10 rows, 2 rows per page = 5 pages + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + + let ids: Vec = (0..num_rows as i64).collect(); + let id_array = Int64Array::from(ids); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(id_array) as ArrayRef], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_write_batch_size(rows_per_page) + .set_data_page_row_count_limit(rows_per_page) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + // Create a selection that starts exactly at page boundaries + // Select rows 0-1 (page 0), skip 2-5 (pages 1-2), select 6-9 (pages 3-4) + let selection = RowSelection::from(vec![ + RowSelector::select(2), // Page 0: rows 0-1 + RowSelector::skip(4), // Pages 1-2: rows 2-5 + RowSelector::select(4), // Pages 3-4: rows 6-9 + ]); + + let stream = ParquetRecordBatchStreamBuilder::new_with_options( + TestReader::new(data.clone()), + ArrowReaderOptions::new().with_page_index(true), + ) + .await + .unwrap() + .with_row_selection(selection.clone()) + .with_batch_size(num_rows) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .build() + .unwrap(); + + let schema = stream.schema().clone(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + // Should have 2 + 4 = 6 rows + assert_eq!(total_rows, 6); + + // Verify values from the concatenated result + let result = concat_batches(&schema, &batches).unwrap(); + let id_col = result.column(0).as_primitive::(); + + // Expected values: 0, 1, 6, 7, 8, 9 + assert_eq!(id_col.value(0), 0); + assert_eq!(id_col.value(1), 1); + assert_eq!(id_col.value(2), 6); + assert_eq!(id_col.value(3), 7); + assert_eq!(id_col.value(4), 8); + assert_eq!(id_col.value(5), 9); + } + + /// Test that page-aware bitmask handles mid-page selections correctly. + /// The selection starts in the middle of a page and ends in the middle of another. + #[tokio::test] + async fn test_bitmask_mid_page_selection_async() { + let num_rows: usize = 12; + let rows_per_page: usize = 3; + + // Create a file with 12 rows, 3 rows per page = 4 pages + // Page 0: rows 0-2 + // Page 1: rows 3-5 + // Page 2: rows 6-8 + // Page 3: rows 9-11 + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + + let ids: Vec = (0..num_rows as i64).collect(); + let id_array = Int64Array::from(ids); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(id_array) as ArrayRef], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_write_batch_size(rows_per_page) + .set_data_page_row_count_limit(rows_per_page) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + // Select mid-page: skip first, select row 1-2, skip row 3-7 (cross pages), select row 8-10 + let selection = RowSelection::from(vec![ + RowSelector::skip(1), // Skip row 0 + RowSelector::select(2), // Select rows 1-2 (partial page 0) + RowSelector::skip(5), // Skip rows 3-7 (pages 1, part of 2) + RowSelector::select(3), // Select rows 8-10 (part of pages 2-3) + RowSelector::skip(1), // Skip row 11 + ]); + + let stream = ParquetRecordBatchStreamBuilder::new_with_options( + TestReader::new(data.clone()), + ArrowReaderOptions::new().with_page_index(true), + ) + .await + .unwrap() + .with_row_selection(selection.clone()) + .with_batch_size(num_rows) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .build() + .unwrap(); + + let schema = stream.schema().clone(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + // Should have 2 + 3 = 5 rows + assert_eq!(total_rows, 5); + + let result = concat_batches(&schema, &batches).unwrap(); + let id_col = result.column(0).as_primitive::(); + + // Expected values: 1, 2, 8, 9, 10 + assert_eq!(id_col.value(0), 1); + assert_eq!(id_col.value(1), 2); + assert_eq!(id_col.value(2), 8); + assert_eq!(id_col.value(3), 9); + assert_eq!(id_col.value(4), 10); + } } From c1876e451bcc45a741c5a8ad6f019ffecf80b494 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Thu, 15 Jan 2026 10:37:09 -0500 Subject: [PATCH 12/32] cargo fmt --- parquet/src/arrow/arrow_reader/mod.rs | 10 ++++------ parquet/src/arrow/async_reader/mod.rs | 14 ++++---------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 8f664f7cdcda..c896a7b817ed 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -5730,9 +5730,8 @@ pub(crate) mod tests { let ids: Vec = (0..num_rows as i64).collect(); let id_array = Int64Array::from(ids); - let batch = - RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array) as ArrayRef]) - .unwrap(); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array) as ArrayRef]) + .unwrap(); let props = WriterProperties::builder() .set_write_batch_size(rows_per_page) @@ -5802,9 +5801,8 @@ pub(crate) mod tests { let ids: Vec = (0..num_rows as i64).collect(); let id_array = Int64Array::from(ids); - let batch = - RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array) as ArrayRef]) - .unwrap(); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array) as ArrayRef]) + .unwrap(); let props = WriterProperties::builder() .set_write_batch_size(rows_per_page) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index d25a6f51039b..3cea952d3e58 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -2419,11 +2419,8 @@ mod tests { let ids: Vec = (0..num_rows as i64).collect(); let id_array = Int64Array::from(ids); - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![Arc::new(id_array) as ArrayRef], - ) - .unwrap(); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array) as ArrayRef]) + .unwrap(); let props = WriterProperties::builder() .set_write_batch_size(rows_per_page) @@ -2492,11 +2489,8 @@ mod tests { let ids: Vec = (0..num_rows as i64).collect(); let id_array = Int64Array::from(ids); - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![Arc::new(id_array) as ArrayRef], - ) - .unwrap(); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array) as ArrayRef]) + .unwrap(); let props = WriterProperties::builder() .set_write_batch_size(rows_per_page) From 6639ac76aa9af49f2a54eece4bdf0603e125c679 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Thu, 22 Jan 2026 14:13:58 -0500 Subject: [PATCH 13/32] fix clippy --- parquet/src/arrow/arrow_reader/mod.rs | 8 ++++---- parquet/src/arrow/async_reader/mod.rs | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 86f12b97d416..2c81c130bccf 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -5730,7 +5730,7 @@ pub(crate) mod tests { writer.close().unwrap(); let data = Bytes::from(buffer); - let options = ArrowReaderOptions::new().with_page_index(true); + let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(true)); let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap(); let schema = builder.parquet_schema().clone(); @@ -5745,7 +5745,7 @@ pub(crate) mod tests { }) }; - let options = ArrowReaderOptions::new().with_page_index(true); + let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(true)); let predicate = make_predicate(filter_mask.clone()); // Use Mask policy explicitly to test page-aware behavior @@ -5808,7 +5808,7 @@ pub(crate) mod tests { RowSelector::select(4), // Pages 3-4: rows 6-9 ]); - let options = ArrowReaderOptions::new().with_page_index(true); + let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(true)); let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options) .unwrap() .with_row_selection(selection.clone()) @@ -5880,7 +5880,7 @@ pub(crate) mod tests { RowSelector::skip(1), // Skip row 11 ]); - let options = ArrowReaderOptions::new().with_page_index(true); + let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(true)); let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options) .unwrap() .with_row_selection(selection.clone()) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 34f8c3517292..36c5b96a4b19 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -2517,7 +2517,7 @@ mod tests { let builder = ParquetRecordBatchStreamBuilder::new_with_options( TestReader::new(data.clone()), - ArrowReaderOptions::new().with_page_index(true), + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required), ) .await .unwrap(); @@ -2538,7 +2538,7 @@ mod tests { // Use Mask policy explicitly to test page-aware behavior let stream = ParquetRecordBatchStreamBuilder::new_with_options( TestReader::new(data.clone()), - ArrowReaderOptions::new().with_page_index(true), + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required), ) .await .unwrap() @@ -2597,7 +2597,7 @@ mod tests { let stream = ParquetRecordBatchStreamBuilder::new_with_options( TestReader::new(data.clone()), - ArrowReaderOptions::new().with_page_index(true), + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required), ) .await .unwrap() @@ -2668,7 +2668,7 @@ mod tests { let stream = ParquetRecordBatchStreamBuilder::new_with_options( TestReader::new(data.clone()), - ArrowReaderOptions::new().with_page_index(true), + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required), ) .await .unwrap() From 2fb401ffdc9d7d9dc2fc3bdcf63b763430c3be54 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Thu, 22 Jan 2026 14:50:11 -0500 Subject: [PATCH 14/32] Fix PageIndexPolicy::from() to Required --- parquet/src/arrow/arrow_reader/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 2c81c130bccf..833a46476c73 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -5730,7 +5730,7 @@ pub(crate) mod tests { writer.close().unwrap(); let data = Bytes::from(buffer); - let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(true)); + let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required); let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap(); let schema = builder.parquet_schema().clone(); @@ -5745,7 +5745,7 @@ pub(crate) mod tests { }) }; - let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(true)); + let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required); let predicate = make_predicate(filter_mask.clone()); // Use Mask policy explicitly to test page-aware behavior @@ -5808,7 +5808,7 @@ pub(crate) mod tests { RowSelector::select(4), // Pages 3-4: rows 6-9 ]); - let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(true)); + let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required); let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options) .unwrap() .with_row_selection(selection.clone()) @@ -5880,7 +5880,7 @@ pub(crate) mod tests { RowSelector::skip(1), // Skip row 11 ]); - let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(true)); + let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required); let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options) .unwrap() .with_row_selection(selection.clone()) From d200ee977fa75e83958be2846e7f14db3258f883 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Fri, 23 Jan 2026 16:44:33 -0500 Subject: [PATCH 15/32] Change page_aware logic --- parquet/src/arrow/arrow_reader/mod.rs | 18 +++------ parquet/src/arrow/arrow_reader/read_plan.rs | 25 ++++++++++++- parquet/src/arrow/arrow_reader/selection.rs | 37 +++++++++++++------ parquet/src/arrow/async_reader/mod.rs | 2 - .../arrow/push_decoder/reader_builder/mod.rs | 23 +++++++----- 5 files changed, 68 insertions(+), 37 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 833a46476c73..3bd5c6ef15a4 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -44,7 +44,6 @@ use crate::file::metadata::{ PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader, ParquetStatisticsPolicy, RowGroupMetaData, }; -use crate::file::page_index::offset_index::PageLocation; use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; @@ -1236,7 +1235,7 @@ impl ParquetRecordBatchReaderBuilder { .build_limited() .build(); - Ok(ParquetRecordBatchReader::new(array_reader, read_plan, None)) + Ok(ParquetRecordBatchReader::new(array_reader, read_plan)) } } @@ -1338,7 +1337,6 @@ pub struct ParquetRecordBatchReader { array_reader: Box, schema: SchemaRef, read_plan: ReadPlan, - page_offsets: Option>, } impl Debug for ParquetRecordBatchReader { @@ -1373,14 +1371,16 @@ impl ParquetRecordBatchReader { if batch_size == 0 { return Ok(None); } + let offset_index_metadata = self.read_plan.offset_index_metadata(); match self.read_plan.row_selection_cursor_mut() { RowSelectionCursor::Mask(mask_cursor) => { // Stream the record batch reader using contiguous segments of the selection // mask, avoiding the need to materialize intermediate `RowSelector` ranges. - let page_locations = self.page_offsets.as_deref(); + let page_locations = offset_index_metadata; while !mask_cursor.is_empty() { - let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size, page_locations) + let Some(mask_chunk) = + mask_cursor.next_mask_chunk(batch_size, page_locations.as_deref()) else { return Ok(None); }; @@ -1548,18 +1548,13 @@ impl ParquetRecordBatchReader { array_reader, schema: Arc::new(Schema::new(levels.fields.clone())), read_plan, - page_offsets: None, }) } /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None` /// all rows will be returned - pub(crate) fn new( - array_reader: Box, - read_plan: ReadPlan, - page_offsets: Option>, - ) -> Self { + pub(crate) fn new(array_reader: Box, read_plan: ReadPlan) -> Self { let schema = match array_reader.get_data_type() { ArrowType::Struct(fields) => Schema::new(fields.clone()), _ => unreachable!("Struct array reader's data type is not struct!"), @@ -1569,7 +1564,6 @@ impl ParquetRecordBatchReader { array_reader, schema: Arc::new(schema), read_plan, - page_offsets, } } diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index c8171f61b1a0..4075ec6fe1c7 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -25,9 +25,11 @@ use crate::arrow::arrow_reader::{ ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector, }; use crate::errors::{ParquetError, Result}; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use arrow_array::Array; use arrow_select::filter::prep_null_mask_filter; use std::collections::VecDeque; +use std::sync::Arc; /// A builder for [`ReadPlan`] #[derive(Clone, Debug)] @@ -37,6 +39,8 @@ pub struct ReadPlanBuilder { selection: Option, /// Policy to use when materializing the row selection row_selection_policy: RowSelectionPolicy, + /// Offset index metadata for each column chunk, used for page-aware mask chunking + offset_index_metadata: Option>, } impl ReadPlanBuilder { @@ -46,6 +50,7 @@ impl ReadPlanBuilder { batch_size, selection: None, row_selection_policy: RowSelectionPolicy::default(), + offset_index_metadata: None, } } @@ -148,7 +153,7 @@ impl ReadPlanBuilder { array_reader: Box, predicate: &mut dyn ArrowPredicate, ) -> Result { - let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build(), None); + let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build()); let mut filters = vec![]; for maybe_batch in reader { let maybe_batch = maybe_batch?; @@ -175,6 +180,15 @@ impl ReadPlanBuilder { Ok(self) } + /// Add offset index metadata for each column in a row group to this `ReadPlanBuilder` + pub fn with_offset_index_metadata( + mut self, + metadata: Option>, + ) -> Self { + self.offset_index_metadata = metadata; + self + } + /// Create a final `ReadPlan` the read plan for the scan pub fn build(mut self) -> ReadPlan { // If selection is empty, truncate @@ -189,6 +203,7 @@ impl ReadPlanBuilder { batch_size, selection, row_selection_policy: _, + offset_index_metadata, } = self; let selection = selection.map(|s| s.trim()); @@ -209,6 +224,7 @@ impl ReadPlanBuilder { ReadPlan { batch_size, row_selection_cursor, + offset_index_metadata, } } } @@ -307,6 +323,8 @@ pub struct ReadPlan { batch_size: usize, /// Row ranges to be selected from the data source row_selection_cursor: RowSelectionCursor, + /// Offset index metadata for each column chunk + offset_index_metadata: Option>, } impl ReadPlan { @@ -330,6 +348,11 @@ impl ReadPlan { pub fn batch_size(&self) -> usize { self.batch_size } + + /// Return the offset index metadata for the columns in this row group + pub fn offset_index_metadata(&self) -> Option> { + self.offset_index_metadata.clone() + } } #[cfg(test)] diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index ecba96362646..b4925224804b 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -779,11 +779,11 @@ impl MaskCursor { } /// Advance through the mask representation, producing the next chunk summary. - /// Optionally clips chunk boundaries to page boundaries. + /// Optionally clips chunk boundaries to the nearest page boundary across columns. pub fn next_mask_chunk( &mut self, batch_size: usize, - page_locations: Option<&[PageLocation]>, + offset_index_metadata: Option<&[OffsetIndexMetaData]>, ) -> Option { let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = { let mask = &self.mask; @@ -806,14 +806,23 @@ impl MaskCursor { let mut chunk_rows = 0; let mut selected_rows = 0; - let max_chunk_rows = page_locations - .and_then(|pages| { - let next_idx = - pages.partition_point(|loc| loc.first_row_index as usize <= mask_start); - pages.get(next_idx).and_then(|loc| { - let page_start = loc.first_row_index as usize; - (page_start > mask_start).then_some(page_start - mask_start) - }) + let max_chunk_rows = offset_index_metadata + .and_then(|columns| { + columns + .iter() + .filter_map(|column| { + let pages = column.page_locations(); + if pages.is_empty() { + return None; + } + let next_idx = pages + .partition_point(|loc| loc.first_row_index as usize <= mask_start); + pages.get(next_idx).and_then(|loc| { + let page_start = loc.first_row_index as usize; + (page_start > mask_start).then_some(page_start - mask_start) + }) + }) + .min() }) .unwrap_or(usize::MAX); @@ -1135,15 +1144,19 @@ mod tests { first_row_index: 12, }, ]; + let columns = vec![OffsetIndexMetaData { + page_locations: pages, + unencoded_byte_array_data_bytes: None, + }]; // First chunk is page 1 - let chunk = cursor.next_mask_chunk(100, Some(&pages)).unwrap(); + let chunk = cursor.next_mask_chunk(100, Some(&columns)).unwrap(); assert_eq!(chunk.initial_skip, 2); assert_eq!(chunk.mask_start, 2); assert_eq!(chunk.chunk_rows, 2); assert_eq!(chunk.selected_rows, 2); // Second chunk is page 2 - let chunk = cursor.next_mask_chunk(100, Some(&pages)).unwrap(); + let chunk = cursor.next_mask_chunk(100, Some(&columns)).unwrap(); assert_eq!(chunk.initial_skip, 0); assert_eq!(chunk.mask_start, 4); assert_eq!(chunk.chunk_rows, 4); diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 36c5b96a4b19..91796d9eadd9 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -2332,8 +2332,6 @@ mod tests { /// Regression test for adaptive predicate pushdown attempting to read skipped pages. /// Related issue: https://github.com/apache/arrow-rs/issues/9239 #[tokio::test] - /// TODO: Remove should_panic once the bug is fixed - #[should_panic(expected = "Invalid offset in sparse column chunk data")] async fn test_predicate_pushdown_with_skipped_pages() { use arrow_array::TimestampNanosecondArray; use arrow_schema::TimeUnit; diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index bf728139579e..c819cc38983b 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -240,9 +240,13 @@ impl RowGroupReaderBuilder { "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}" ))); } + let offset_index_metadata = self + .row_group_offset_index(row_group_idx) + .map(|columns| columns.to_vec().into()); let plan_builder = ReadPlanBuilder::new(self.batch_size) .with_selection(selection) - .with_row_selection_policy(self.row_selection_policy); + .with_row_selection_policy(self.row_selection_policy) + .with_offset_index_metadata(offset_index_metadata); let row_group_info = RowGroupInfo { row_group_idx, @@ -582,9 +586,8 @@ impl RowGroupReaderBuilder { &mut self.buffers, )?; - // before plan is build below - // check if plan is bitmask and if it is, put it in a variable - let page_offsets = if plan_builder.resolve_selection_strategy() + // For mask-based selection, attach offset index metadata for page-aware chunking. + let offset_index_metadata = if plan_builder.resolve_selection_strategy() == RowSelectionStrategy::Mask && plan_builder.selection().is_some_and(|selection| { selection.requires_page_aware_mask( @@ -593,12 +596,13 @@ impl RowGroupReaderBuilder { ) }) { self.row_group_offset_index(row_group_idx) - .and_then(|columns| columns.first()) - .map(|column| column.page_locations()) + .map(|columns| columns.to_vec().into()) } else { None }; + let plan_builder = plan_builder.with_offset_index_metadata(offset_index_metadata); + let plan = plan_builder.build(); // if we have any cached results, connect them up @@ -614,8 +618,7 @@ impl RowGroupReaderBuilder { .build_array_reader(self.fields.as_deref(), &self.projection) }?; - let reader = - ParquetRecordBatchReader::new(array_reader, plan, page_offsets.cloned()); + let reader = ParquetRecordBatchReader::new(array_reader, plan); NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader)) } RowGroupDecoderState::Finished => { @@ -656,7 +659,7 @@ impl RowGroupReaderBuilder { mask.without_nested_types(self.metadata.file_metadata().schema_descr()) } - /// Get the offset index for the specified row group, if any + /// Get the column offset indexes for the specified row group, if any fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> { self.metadata .offset_index() @@ -673,6 +676,6 @@ mod tests { #[test] // Verify that the size of RowGroupDecoderState does not grow too large fn test_structure_size() { - assert_eq!(std::mem::size_of::(), 200); + assert_eq!(std::mem::size_of::(), 216); } } From 717a1cbc38b27c59a9af2f4f90326982a37d86b5 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Fri, 23 Jan 2026 18:03:11 -0500 Subject: [PATCH 16/32] Make mask use page offsets with smallest pages --- parquet/src/arrow/arrow_reader/mod.rs | 4 +-- parquet/src/arrow/arrow_reader/read_plan.rs | 32 +++++++++++------- parquet/src/arrow/arrow_reader/selection.rs | 37 +++++++-------------- 3 files changed, 34 insertions(+), 39 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 3bd5c6ef15a4..9bce05536f16 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1371,12 +1371,12 @@ impl ParquetRecordBatchReader { if batch_size == 0 { return Ok(None); } - let offset_index_metadata = self.read_plan.offset_index_metadata(); + let page_locations = self.read_plan.page_locations(); match self.read_plan.row_selection_cursor_mut() { RowSelectionCursor::Mask(mask_cursor) => { // Stream the record batch reader using contiguous segments of the selection // mask, avoiding the need to materialize intermediate `RowSelector` ranges. - let page_locations = offset_index_metadata; + let page_locations = page_locations; while !mask_cursor.is_empty() { let Some(mask_chunk) = diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 4075ec6fe1c7..869cbe090e4f 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -25,7 +25,7 @@ use crate::arrow::arrow_reader::{ ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector, }; use crate::errors::{ParquetError, Result}; -use crate::file::page_index::offset_index::OffsetIndexMetaData; +use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; use arrow_array::Array; use arrow_select::filter::prep_null_mask_filter; use std::collections::VecDeque; @@ -39,8 +39,8 @@ pub struct ReadPlanBuilder { selection: Option, /// Policy to use when materializing the row selection row_selection_policy: RowSelectionPolicy, - /// Offset index metadata for each column chunk, used for page-aware mask chunking - offset_index_metadata: Option>, + /// Precomputed page locations for mask chunking + page_locations: Option>, } impl ReadPlanBuilder { @@ -50,7 +50,7 @@ impl ReadPlanBuilder { batch_size, selection: None, row_selection_policy: RowSelectionPolicy::default(), - offset_index_metadata: None, + page_locations: None, } } @@ -185,7 +185,15 @@ impl ReadPlanBuilder { mut self, metadata: Option>, ) -> Self { - self.offset_index_metadata = metadata; + self.page_locations = metadata + .as_ref() + .and_then(|columns| { + columns + .iter() + .filter(|column| !column.page_locations().is_empty()) + .max_by_key(|column| column.page_locations().len()) + .map(|column| column.page_locations().clone().into()) + }); self } @@ -203,7 +211,7 @@ impl ReadPlanBuilder { batch_size, selection, row_selection_policy: _, - offset_index_metadata, + page_locations: _, } = self; let selection = selection.map(|s| s.trim()); @@ -224,7 +232,7 @@ impl ReadPlanBuilder { ReadPlan { batch_size, row_selection_cursor, - offset_index_metadata, + page_locations: self.page_locations, } } } @@ -323,8 +331,8 @@ pub struct ReadPlan { batch_size: usize, /// Row ranges to be selected from the data source row_selection_cursor: RowSelectionCursor, - /// Offset index metadata for each column chunk - offset_index_metadata: Option>, + /// Precomputed page locations for mask chunking + page_locations: Option>, } impl ReadPlan { @@ -349,9 +357,9 @@ impl ReadPlan { self.batch_size } - /// Return the offset index metadata for the columns in this row group - pub fn offset_index_metadata(&self) -> Option> { - self.offset_index_metadata.clone() + /// Return the page locations used for mask chunking + pub fn page_locations(&self) -> Option> { + self.page_locations.clone() } } diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index b4925224804b..ecba96362646 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -779,11 +779,11 @@ impl MaskCursor { } /// Advance through the mask representation, producing the next chunk summary. - /// Optionally clips chunk boundaries to the nearest page boundary across columns. + /// Optionally clips chunk boundaries to page boundaries. pub fn next_mask_chunk( &mut self, batch_size: usize, - offset_index_metadata: Option<&[OffsetIndexMetaData]>, + page_locations: Option<&[PageLocation]>, ) -> Option { let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = { let mask = &self.mask; @@ -806,23 +806,14 @@ impl MaskCursor { let mut chunk_rows = 0; let mut selected_rows = 0; - let max_chunk_rows = offset_index_metadata - .and_then(|columns| { - columns - .iter() - .filter_map(|column| { - let pages = column.page_locations(); - if pages.is_empty() { - return None; - } - let next_idx = pages - .partition_point(|loc| loc.first_row_index as usize <= mask_start); - pages.get(next_idx).and_then(|loc| { - let page_start = loc.first_row_index as usize; - (page_start > mask_start).then_some(page_start - mask_start) - }) - }) - .min() + let max_chunk_rows = page_locations + .and_then(|pages| { + let next_idx = + pages.partition_point(|loc| loc.first_row_index as usize <= mask_start); + pages.get(next_idx).and_then(|loc| { + let page_start = loc.first_row_index as usize; + (page_start > mask_start).then_some(page_start - mask_start) + }) }) .unwrap_or(usize::MAX); @@ -1144,19 +1135,15 @@ mod tests { first_row_index: 12, }, ]; - let columns = vec![OffsetIndexMetaData { - page_locations: pages, - unencoded_byte_array_data_bytes: None, - }]; // First chunk is page 1 - let chunk = cursor.next_mask_chunk(100, Some(&columns)).unwrap(); + let chunk = cursor.next_mask_chunk(100, Some(&pages)).unwrap(); assert_eq!(chunk.initial_skip, 2); assert_eq!(chunk.mask_start, 2); assert_eq!(chunk.chunk_rows, 2); assert_eq!(chunk.selected_rows, 2); // Second chunk is page 2 - let chunk = cursor.next_mask_chunk(100, Some(&columns)).unwrap(); + let chunk = cursor.next_mask_chunk(100, Some(&pages)).unwrap(); assert_eq!(chunk.initial_skip, 0); assert_eq!(chunk.mask_start, 4); assert_eq!(chunk.chunk_rows, 4); From 03c8bdb5692868ff893c75ad9c338f6617c3e739 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Fri, 23 Jan 2026 18:04:43 -0500 Subject: [PATCH 17/32] cargo fmt --- parquet/src/arrow/arrow_reader/read_plan.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 869cbe090e4f..da1c32cb8a19 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -185,15 +185,13 @@ impl ReadPlanBuilder { mut self, metadata: Option>, ) -> Self { - self.page_locations = metadata - .as_ref() - .and_then(|columns| { - columns - .iter() - .filter(|column| !column.page_locations().is_empty()) - .max_by_key(|column| column.page_locations().len()) - .map(|column| column.page_locations().clone().into()) - }); + self.page_locations = metadata.as_ref().and_then(|columns| { + columns + .iter() + .filter(|column| !column.page_locations().is_empty()) + .max_by_key(|column| column.page_locations().len()) + .map(|column| column.page_locations().clone().into()) + }); self } From 1158af0db87059f2f258e1fb35f515ee5605468c Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Fri, 23 Jan 2026 18:10:27 -0500 Subject: [PATCH 18/32] clippy --- parquet/src/arrow/arrow_reader/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 9bce05536f16..9697b87dc9ca 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1376,8 +1376,6 @@ impl ParquetRecordBatchReader { RowSelectionCursor::Mask(mask_cursor) => { // Stream the record batch reader using contiguous segments of the selection // mask, avoiding the need to materialize intermediate `RowSelector` ranges. - let page_locations = page_locations; - while !mask_cursor.is_empty() { let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size, page_locations.as_deref()) From d260f943a83e12eab2ca0d5aaf4d5c8a2ddcf83d Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Sun, 25 Jan 2026 14:05:55 -0500 Subject: [PATCH 19/32] Use all page boundaries to cap next mask chunk --- parquet/src/arrow/arrow_reader/mod.rs | 4 +- parquet/src/arrow/arrow_reader/read_plan.rs | 45 ++++++++++++------- parquet/src/arrow/arrow_reader/selection.rs | 26 ++++++----- .../arrow/push_decoder/reader_builder/mod.rs | 10 ++++- 4 files changed, 53 insertions(+), 32 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 9697b87dc9ca..d5731194ec0a 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1371,14 +1371,14 @@ impl ParquetRecordBatchReader { if batch_size == 0 { return Ok(None); } - let page_locations = self.read_plan.page_locations(); + let page_boundaries = self.read_plan.page_boundaries(); match self.read_plan.row_selection_cursor_mut() { RowSelectionCursor::Mask(mask_cursor) => { // Stream the record batch reader using contiguous segments of the selection // mask, avoiding the need to materialize intermediate `RowSelector` ranges. while !mask_cursor.is_empty() { let Some(mask_chunk) = - mask_cursor.next_mask_chunk(batch_size, page_locations.as_deref()) + mask_cursor.next_mask_chunk(batch_size, page_boundaries.as_deref()) else { return Ok(None); }; diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index da1c32cb8a19..482eeb5334d3 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -18,6 +18,7 @@ //! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read //! from a Parquet file +use crate::arrow::ProjectionMask; use crate::arrow::array_reader::ArrayReader; use crate::arrow::arrow_reader::selection::RowSelectionPolicy; use crate::arrow::arrow_reader::selection::RowSelectionStrategy; @@ -25,7 +26,7 @@ use crate::arrow::arrow_reader::{ ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector, }; use crate::errors::{ParquetError, Result}; -use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use arrow_array::Array; use arrow_select::filter::prep_null_mask_filter; use std::collections::VecDeque; @@ -39,8 +40,8 @@ pub struct ReadPlanBuilder { selection: Option, /// Policy to use when materializing the row selection row_selection_policy: RowSelectionPolicy, - /// Precomputed page locations for mask chunking - page_locations: Option>, + /// Precomputed page boundary row indices for mask chunking + page_boundaries: Option>, } impl ReadPlanBuilder { @@ -50,7 +51,7 @@ impl ReadPlanBuilder { batch_size, selection: None, row_selection_policy: RowSelectionPolicy::default(), - page_locations: None, + page_boundaries: None, } } @@ -181,16 +182,28 @@ impl ReadPlanBuilder { } /// Add offset index metadata for each column in a row group to this `ReadPlanBuilder` + /// + /// The computed page boundaries only include columns in the provided `projection`. pub fn with_offset_index_metadata( mut self, metadata: Option>, + projection: &ProjectionMask, ) -> Self { - self.page_locations = metadata.as_ref().and_then(|columns| { - columns + self.page_boundaries = metadata.as_ref().map(|columns| { + let mut boundaries: Vec = columns .iter() - .filter(|column| !column.page_locations().is_empty()) - .max_by_key(|column| column.page_locations().len()) - .map(|column| column.page_locations().clone().into()) + .enumerate() + .filter(|(idx, _)| projection.leaf_included(*idx)) + .flat_map(|(_, column)| { + column + .page_locations() + .iter() + .map(|loc| loc.first_row_index as usize) + }) + .collect(); + boundaries.sort_unstable(); + boundaries.dedup(); + boundaries.into() }); self } @@ -209,7 +222,7 @@ impl ReadPlanBuilder { batch_size, selection, row_selection_policy: _, - page_locations: _, + page_boundaries: _, } = self; let selection = selection.map(|s| s.trim()); @@ -230,7 +243,7 @@ impl ReadPlanBuilder { ReadPlan { batch_size, row_selection_cursor, - page_locations: self.page_locations, + page_boundaries: self.page_boundaries, } } } @@ -329,8 +342,8 @@ pub struct ReadPlan { batch_size: usize, /// Row ranges to be selected from the data source row_selection_cursor: RowSelectionCursor, - /// Precomputed page locations for mask chunking - page_locations: Option>, + /// Precomputed page boundary row indices for mask chunking + page_boundaries: Option>, } impl ReadPlan { @@ -355,9 +368,9 @@ impl ReadPlan { self.batch_size } - /// Return the page locations used for mask chunking - pub fn page_locations(&self) -> Option> { - self.page_locations.clone() + /// Return the page boundary row indices used for mask chunking + pub fn page_boundaries(&self) -> Option> { + self.page_boundaries.clone() } } diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index ecba96362646..5a375d211ea2 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -779,11 +779,11 @@ impl MaskCursor { } /// Advance through the mask representation, producing the next chunk summary. - /// Optionally clips chunk boundaries to page boundaries. + /// Optionally clips chunk boundaries to the next page boundary. pub fn next_mask_chunk( &mut self, batch_size: usize, - page_locations: Option<&[PageLocation]>, + page_boundaries: Option<&[usize]>, ) -> Option { let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = { let mask = &self.mask; @@ -806,14 +806,12 @@ impl MaskCursor { let mut chunk_rows = 0; let mut selected_rows = 0; - let max_chunk_rows = page_locations - .and_then(|pages| { - let next_idx = - pages.partition_point(|loc| loc.first_row_index as usize <= mask_start); - pages.get(next_idx).and_then(|loc| { - let page_start = loc.first_row_index as usize; - (page_start > mask_start).then_some(page_start - mask_start) - }) + let max_chunk_rows = page_boundaries + .and_then(|boundaries| { + let next_idx = boundaries.partition_point(|&start| start <= mask_start); + boundaries + .get(next_idx) + .and_then(|&start| (start > mask_start).then_some(start - mask_start)) }) .unwrap_or(usize::MAX); @@ -1135,15 +1133,19 @@ mod tests { first_row_index: 12, }, ]; + let boundaries: Vec = pages + .iter() + .map(|loc| loc.first_row_index as usize) + .collect(); // First chunk is page 1 - let chunk = cursor.next_mask_chunk(100, Some(&pages)).unwrap(); + let chunk = cursor.next_mask_chunk(100, Some(&boundaries)).unwrap(); assert_eq!(chunk.initial_skip, 2); assert_eq!(chunk.mask_start, 2); assert_eq!(chunk.chunk_rows, 2); assert_eq!(chunk.selected_rows, 2); // Second chunk is page 2 - let chunk = cursor.next_mask_chunk(100, Some(&pages)).unwrap(); + let chunk = cursor.next_mask_chunk(100, Some(&boundaries)).unwrap(); assert_eq!(chunk.initial_skip, 0); assert_eq!(chunk.mask_start, 4); assert_eq!(chunk.chunk_rows, 4); diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index c819cc38983b..82678746f333 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -246,7 +246,7 @@ impl RowGroupReaderBuilder { let plan_builder = ReadPlanBuilder::new(self.batch_size) .with_selection(selection) .with_row_selection_policy(self.row_selection_policy) - .with_offset_index_metadata(offset_index_metadata); + .with_offset_index_metadata(offset_index_metadata, &self.projection); let row_group_info = RowGroupInfo { row_group_idx, @@ -441,6 +441,11 @@ impl RowGroupReaderBuilder { .with_parquet_metadata(&self.metadata) .build_array_reader(self.fields.as_deref(), predicate.projection())?; + let offset_index_metadata = self + .row_group_offset_index(row_group_idx) + .map(|columns| columns.to_vec().into()); + plan_builder = plan_builder + .with_offset_index_metadata(offset_index_metadata, predicate.projection()); plan_builder = plan_builder.with_predicate(array_reader, filter_info.current_mut())?; @@ -601,7 +606,8 @@ impl RowGroupReaderBuilder { None }; - let plan_builder = plan_builder.with_offset_index_metadata(offset_index_metadata); + let plan_builder = plan_builder + .with_offset_index_metadata(offset_index_metadata, &self.projection); let plan = plan_builder.build(); From eea54f1e4caff3d032db9ebe89e5a58d4ae34d50 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 26 Jan 2026 13:20:32 -0500 Subject: [PATCH 20/32] fix clippy --- parquet/src/arrow/arrow_reader/selection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 5a375d211ea2..b1a57d258911 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -1111,7 +1111,7 @@ mod tests { let mask = boolean_mask_from_selectors(&selectors); let mut cursor = MaskCursor { mask, position: 0 }; - let pages = vec![ + let pages = [ PageLocation { offset: 0, compressed_page_size: 1, From a4b04c912d3f904dcf92894e81f11f576929fcb5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 26 Jan 2026 13:48:54 -0500 Subject: [PATCH 21/32] Add documentation --- parquet/src/arrow/arrow_reader/mod.rs | 51 ++++++++++++++++++++------- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index d5731194ec0a..8799641fc3e9 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -5691,9 +5691,22 @@ pub(crate) mod tests { let last_value: i64 = 9999; let num_rows: usize = 20; - // Create a file with 20 rows, ~2 rows per page = 10 pages - // Selection will be: first row, skip middle rows, last row - // This forces the reader to handle skipped pages correctly + // Create a file with 20 rows, 2 rows per page = 10 pages. + // Selection will be: first row, skip middle rows, last row. + // + // Diagram (rows grouped by 2 per page): + // Page 0: [ 0, 1] -> select 0 + // Page 1: [ 2, 3] -> skip + // Page 2: [ 4, 5] -> skip + // Page 3: [ 6, 7] -> skip + // Page 4: [ 8, 9] -> skip + // Page 5: [10, 11] -> skip + // Page 6: [12, 13] -> skip + // Page 7: [14, 15] -> skip + // Page 8: [16, 17] -> skip + // Page 9: [18, 19] -> select 19 + // + // This forces the reader to handle skipped pages correctly while using Mask policy. let schema = Arc::new(Schema::new(vec![ Field::new("key", arrow_schema::DataType::Int64, false), Field::new("value", arrow_schema::DataType::Int64, false), @@ -5769,7 +5782,7 @@ pub(crate) mod tests { let num_rows: usize = 10; let rows_per_page: usize = 2; - // Create a file with 10 rows, 2 rows per page = 5 pages + // Create a file with 10 rows, 2 rows per page = 5 pages. let schema = Arc::new(Schema::new(vec![Field::new( "id", arrow_schema::DataType::Int64, @@ -5792,8 +5805,14 @@ pub(crate) mod tests { writer.close().unwrap(); let data = Bytes::from(buffer); - // Create a selection that starts exactly at page boundaries - // Select rows 0-1 (page 0), skip 2-5 (pages 1-2), select 6-9 (pages 3-4) + // Create a selection that starts exactly at page boundaries. + // + // Diagram (rows grouped by 2 per page): + // Page 0: [0, 1] -> select (rows 0-1) + // Page 1: [2, 3] -> skip + // Page 2: [4, 5] -> skip + // Page 3: [6, 7] -> select + // Page 4: [8, 9] -> select let selection = RowSelection::from(vec![ RowSelector::select(2), // Page 0: rows 0-1 RowSelector::skip(4), // Pages 1-2: rows 2-5 @@ -5836,11 +5855,13 @@ pub(crate) mod tests { let num_rows: usize = 12; let rows_per_page: usize = 3; - // Create a file with 12 rows, 3 rows per page = 4 pages - // Page 0: rows 0-2 - // Page 1: rows 3-5 - // Page 2: rows 6-8 - // Page 3: rows 9-11 + // Create a file with 12 rows, 3 rows per page = 4 pages. + // + // Diagram (rows grouped by 3 per page): + // Page 0: [0, 1, 2] + // Page 1: [3, 4, 5] + // Page 2: [6, 7, 8] + // Page 3: [9, 10, 11] let schema = Arc::new(Schema::new(vec![Field::new( "id", arrow_schema::DataType::Int64, @@ -5863,7 +5884,13 @@ pub(crate) mod tests { writer.close().unwrap(); let data = Bytes::from(buffer); - // Select mid-page: skip first, select row 1-2, skip row 3-7 (cross pages), select row 8-10 + // Select mid-page: skip first, select row 1-2, skip row 3-7 (cross pages), select row 8-10. + // + // Diagram (K=keep, S=skip): + // Page 0: [S, K, K] -> rows 0-2 + // Page 1: [S, S, S] -> rows 3-5 + // Page 2: [S, S, K] -> rows 6-8 + // Page 3: [K, K, S] -> rows 9-11 let selection = RowSelection::from(vec![ RowSelector::skip(1), // Skip row 0 RowSelector::select(2), // Select rows 1-2 (partial page 0) From 27174e3113d5ffe9232671c05fb83ad7359cc0d9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 26 Jan 2026 14:58:05 -0500 Subject: [PATCH 22/32] Reduce test replication --- parquet/src/arrow/arrow_reader/mod.rs | 102 +++++++++++++------------- 1 file changed, 49 insertions(+), 53 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 8799641fc3e9..d518c5dbd875 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1579,7 +1579,7 @@ pub(crate) mod tests { use std::fs::File; use std::io::Seek; use std::path::PathBuf; - use std::sync::Arc; + use std::sync::{Arc, OnceLock}; use rand::rngs::StdRng; use rand::{Rng, RngCore, SeedableRng, random, rng}; @@ -5600,6 +5600,38 @@ pub(crate) mod tests { c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r)); } + /// Write the keys and values to a Parquet file with two columns named "key" + /// and "value" with specified properties. + fn write_key_values_to_parquet(keys: &[i64], values: &[i64], props: WriterProperties) -> Bytes { + let schema = Arc::new(Schema::new(vec![ + Field::new("key", arrow_schema::DataType::Int64, false), + Field::new("value", arrow_schema::DataType::Int64, false), + ])); + + let keys_array = Int64Array::from(keys.to_vec()); + let values_array = Int64Array::from(values.to_vec()); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(keys_array) as ArrayRef, + Arc::new(values_array) as ArrayRef, + ], + ) + .unwrap(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + Bytes::from(buffer) + } + + fn read_to_batch(reader: ParquetRecordBatchReader) -> RecordBatch { + let schema = reader.schema().clone(); + let batches = reader.collect::, _>>().unwrap(); + concat_batches(&schema, &batches).unwrap() + } + #[test] fn test_page_aware_mask_handles_page_skip() { // Test that when using Auto policy with page-aware bitmask, the reader @@ -5613,32 +5645,16 @@ pub(crate) mod tests { let last_value: i64 = 9999; let num_rows: usize = 12; - let schema = Arc::new(Schema::new(vec![ - Field::new("key", arrow_schema::DataType::Int64, false), - Field::new("value", arrow_schema::DataType::Int64, false), - ])); - let mut int_values: Vec = (0..num_rows as i64).collect(); int_values[0] = first_value; int_values[num_rows - 1] = last_value; - let keys = Int64Array::from(int_values.clone()); - let values = Int64Array::from(int_values.clone()); - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef], - ) - .unwrap(); let props = WriterProperties::builder() .set_write_batch_size(2) .set_data_page_row_count_limit(2) .build(); - let mut buffer = Vec::new(); - let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); - let data = Bytes::from(buffer); + let data = write_key_values_to_parquet(&int_values, &int_values, props); let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required); let builder = @@ -5646,6 +5662,7 @@ pub(crate) mod tests { let schema = builder.parquet_schema().clone(); let filter_mask = ProjectionMask::leaves(&schema, [0]); + // Predicate: key == first_value or key == last_value let make_predicate = |mask: ProjectionMask| { ArrowPredicateFn::new(mask, move |batch: RecordBatch| { let column = batch.column(0); @@ -5668,9 +5685,7 @@ pub(crate) mod tests { .build() .unwrap(); - let schema = reader.schema().clone(); - let batches = reader.collect::, _>>().unwrap(); - let result = concat_batches(&schema, &batches).unwrap(); + let result = read_to_batch(reader); assert_eq!(result.num_rows(), 2); assert_eq!( result.column(0).as_ref(), @@ -5707,33 +5722,16 @@ pub(crate) mod tests { // Page 9: [18, 19] -> select 19 // // This forces the reader to handle skipped pages correctly while using Mask policy. - let schema = Arc::new(Schema::new(vec![ - Field::new("key", arrow_schema::DataType::Int64, false), - Field::new("value", arrow_schema::DataType::Int64, false), - ])); - let mut int_values: Vec = (0..num_rows as i64).collect(); int_values[0] = first_value; int_values[num_rows - 1] = last_value; - let keys = Int64Array::from(int_values.clone()); - let values = Int64Array::from(int_values.clone()); - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef], - ) - .unwrap(); // Configure small pages to create multiple page boundaries let props = WriterProperties::builder() .set_write_batch_size(2) .set_data_page_row_count_limit(2) .build(); - - let mut buffer = Vec::new(); - let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); - let data = Bytes::from(buffer); + let data = write_key_values_to_parquet(&int_values, &int_values, props); let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required); let builder = @@ -5761,10 +5759,7 @@ pub(crate) mod tests { .with_row_selection_policy(RowSelectionPolicy::Mask) .build() .unwrap(); - - let schema = reader.schema().clone(); - let batches = reader.collect::, _>>().unwrap(); - let result = concat_batches(&schema, &batches).unwrap(); + let result = read_to_batch(reader); // Should have exactly 2 rows: first and last assert_eq!(result.num_rows(), 2); @@ -5775,6 +5770,15 @@ pub(crate) mod tests { assert_eq!(key_col.value(1), last_value); } + /// Single column schema with int64 id + fn id_schema() -> SchemaRef { + static ID_SCHEMA: OnceLock = OnceLock::new(); + Arc::clone(ID_SCHEMA.get_or_init(|| { + let field = Field::new("id", arrow_schema::DataType::Int64, false); + Arc::new(Schema::new(vec![field])) + })) + } + /// Test that page-aware bitmask handles edge cases at exact page boundaries. /// When mask_start aligns exactly with a page boundary, verify correct behavior. #[test] @@ -5783,11 +5787,7 @@ pub(crate) mod tests { let rows_per_page: usize = 2; // Create a file with 10 rows, 2 rows per page = 5 pages. - let schema = Arc::new(Schema::new(vec![Field::new( - "id", - arrow_schema::DataType::Int64, - false, - )])); + let schema = id_schema(); let ids: Vec = (0..num_rows as i64).collect(); let id_array = Int64Array::from(ids); @@ -5862,11 +5862,7 @@ pub(crate) mod tests { // Page 1: [3, 4, 5] // Page 2: [6, 7, 8] // Page 3: [9, 10, 11] - let schema = Arc::new(Schema::new(vec![Field::new( - "id", - arrow_schema::DataType::Int64, - false, - )])); + let schema = id_schema(); let ids: Vec = (0..num_rows as i64).collect(); let id_array = Int64Array::from(ids); From f619b8f77394e2748394d38578439214ef6ca717 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Tue, 27 Jan 2026 11:55:00 -0500 Subject: [PATCH 23/32] Remove redundant sync test --- parquet/src/arrow/arrow_reader/mod.rs | 65 --------------------------- 1 file changed, 65 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index d518c5dbd875..b34f9ff5197c 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -5632,71 +5632,6 @@ pub(crate) mod tests { concat_batches(&schema, &batches).unwrap() } - #[test] - fn test_page_aware_mask_handles_page_skip() { - // Test that when using Auto policy with page-aware bitmask, the reader - // correctly handles pages that are skipped entirely due to row filtering. - // - // This creates a file with 12 rows across 6 pages (2 rows per page). - // After filtering, only the first and last rows remain, skipping 4 pages - // in the middle. The page-aware mask should handle this correctly by - // respecting page boundaries during chunk iteration. - let first_value: i64 = 1111; - let last_value: i64 = 9999; - let num_rows: usize = 12; - - let mut int_values: Vec = (0..num_rows as i64).collect(); - int_values[0] = first_value; - int_values[num_rows - 1] = last_value; - - let props = WriterProperties::builder() - .set_write_batch_size(2) - .set_data_page_row_count_limit(2) - .build(); - - let data = write_key_values_to_parquet(&int_values, &int_values, props); - - let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required); - let builder = - ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap(); - let schema = builder.parquet_schema().clone(); - let filter_mask = ProjectionMask::leaves(&schema, [0]); - - // Predicate: key == first_value or key == last_value - let make_predicate = |mask: ProjectionMask| { - ArrowPredicateFn::new(mask, move |batch: RecordBatch| { - let column = batch.column(0); - let match_first = eq(column, &Int64Array::new_scalar(first_value))?; - let match_second = eq(column, &Int64Array::new_scalar(last_value))?; - or(&match_first, &match_second) - }) - }; - - let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required); - let predicate = make_predicate(filter_mask.clone()); - - // Using Auto policy with page index enabled - page-aware mask should handle - // the skipped pages correctly without panicking. - let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options) - .unwrap() - .with_row_filter(RowFilter::new(vec![Box::new(predicate)])) - .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 }) - .with_batch_size(12) - .build() - .unwrap(); - - let result = read_to_batch(reader); - assert_eq!(result.num_rows(), 2); - assert_eq!( - result.column(0).as_ref(), - &Int64Array::from(vec![first_value, last_value]) - ); - assert_eq!( - result.column(1).as_ref(), - &Int64Array::from(vec![first_value, last_value]) - ); - } - /// Test that bitmask-based row selection correctly handles page boundaries. /// This test creates a parquet file with multiple small pages and verifies that /// when using Mask policy, pages that are skipped entirely are handled correctly. From 0e0265f4e775ed66003140ad6c46ca6a3ac7a7fe Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Tue, 27 Jan 2026 11:59:13 -0500 Subject: [PATCH 24/32] Remove redundant async test --- parquet/src/arrow/async_reader/mod.rs | 89 +-------------------------- 1 file changed, 1 insertion(+), 88 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 91796d9eadd9..5bc46320751c 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -1222,94 +1222,6 @@ mod tests { assert_eq!(actual_rows, expected_rows); } - /// Test that when using Auto policy with page-aware bitmask, the reader - /// correctly handles pages that are skipped entirely due to row filtering. - /// - /// This creates a file with 12 rows across 6 pages (2 rows per page). - /// After filtering, only the first and last rows remain, skipping 4 pages - /// in the middle. The page-aware mask should handle this correctly by - /// respecting page boundaries during chunk iteration. - #[tokio::test] - async fn test_page_aware_mask_handles_page_skip_async() { - let first_value: i64 = 1111; - let last_value: i64 = 9999; - let num_rows: usize = 12; - - let schema = Arc::new(Schema::new(vec![ - Field::new("key", DataType::Int64, false), - Field::new("value", DataType::Int64, false), - ])); - - let mut int_values: Vec = (0..num_rows as i64).collect(); - int_values[0] = first_value; - int_values[num_rows - 1] = last_value; - let keys = Int64Array::from(int_values.clone()); - let values = Int64Array::from(int_values.clone()); - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef], - ) - .unwrap(); - - let props = WriterProperties::builder() - .set_write_batch_size(2) - .set_data_page_row_count_limit(2) - .build(); - - let mut buffer = Vec::new(); - let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); - let data = Bytes::from(buffer); - - let builder = ParquetRecordBatchStreamBuilder::new_with_options( - TestReader::new(data.clone()), - ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required), - ) - .await - .unwrap(); - let schema = builder.parquet_schema().clone(); - let filter_mask = ProjectionMask::leaves(&schema, [0]); - - let make_predicate = |mask: ProjectionMask| { - ArrowPredicateFn::new(mask, move |batch: RecordBatch| { - let column = batch.column(0); - let match_first = eq(column, &Int64Array::new_scalar(first_value))?; - let match_second = eq(column, &Int64Array::new_scalar(last_value))?; - or(&match_first, &match_second) - }) - }; - - let predicate = make_predicate(filter_mask.clone()); - - // Using Auto policy with page index enabled - page-aware mask should handle - // the skipped pages correctly without panicking. - let stream = ParquetRecordBatchStreamBuilder::new_with_options( - TestReader::new(data.clone()), - ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required), - ) - .await - .unwrap() - .with_row_filter(RowFilter::new(vec![Box::new(predicate)])) - .with_batch_size(12) - .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 }) - .build() - .unwrap(); - - let schema = stream.schema().clone(); - let batches: Vec<_> = stream.try_collect().await.unwrap(); - let result = concat_batches(&schema, &batches).unwrap(); - assert_eq!(result.num_rows(), 2); - assert_eq!( - result.column(0).as_ref(), - &Int64Array::from(vec![first_value, last_value]) - ); - assert_eq!( - result.column(1).as_ref(), - &Int64Array::from(vec![first_value, last_value]) - ); - } - #[tokio::test] async fn test_row_filter() { let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]); @@ -2332,6 +2244,7 @@ mod tests { /// Regression test for adaptive predicate pushdown attempting to read skipped pages. /// Related issue: https://github.com/apache/arrow-rs/issues/9239 #[tokio::test] + async fn test_predicate_pushdown_with_skipped_pages() { use arrow_array::TimestampNanosecondArray; use arrow_schema::TimeUnit; From 9c142710e64dd5a2dcd829b54af249e5e5e6fe02 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Thu, 5 Feb 2026 11:26:43 -0500 Subject: [PATCH 25/32] clippy --- parquet/src/arrow/arrow_reader/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 2fa44d060e05..eebb167f984a 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1576,7 +1576,7 @@ pub(crate) mod tests { use std::fs::File; use std::io::Seek; use std::path::PathBuf; - use std::sync::{Arc, OnceLock}; + use std::sync::Arc; use rand::rngs::StdRng; use rand::{Rng, RngCore, SeedableRng, random, rng}; From 48cd54c82211601cea2333bdc53b5b5c3632cb86 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Tue, 10 Feb 2026 19:38:25 -0500 Subject: [PATCH 26/32] Add tests --- .../tests/arrow_reader/row_filter/async.rs | 266 ++++++++++++++++++ 1 file changed, 266 insertions(+) diff --git a/parquet/tests/arrow_reader/row_filter/async.rs b/parquet/tests/arrow_reader/row_filter/async.rs index 6fa616d714f1..bdd77807cd03 100644 --- a/parquet/tests/arrow_reader/row_filter/async.rs +++ b/parquet/tests/arrow_reader/row_filter/async.rs @@ -42,6 +42,7 @@ use parquet::{ metadata::{PageIndexPolicy, ParquetMetaDataReader}, properties::WriterProperties, }, + schema::types::ColumnPath, }; #[tokio::test] @@ -525,3 +526,268 @@ async fn test_predicate_pushdown_with_skipped_pages() { assert_eq!(batch.column(0).as_string(), &expected); } } + +/// Regression test for adaptive selection switching to mask materialization while +/// evaluating a predicate that references multiple columns with different page boundaries. +#[tokio::test] +async fn test_complex_predicate_pushdown_with_skipped_pages() { + const NUM_ROWS: usize = 240; + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("int_flag", DataType::Int8, false), + Field::new("text_flag", DataType::Utf8, false), + ])); + + let huge_true = "T".repeat(128); + let huge_false = "F".repeat(128); + + let ids = Int32Array::from_iter_values(0..NUM_ROWS as i32); + let int_flag = Int8Array::from_iter_values((0..NUM_ROWS).map(|i| (i % 2 == 0) as i8)); + let text_flag = StringArray::from_iter_values((0..NUM_ROWS).map(|i| { + if i % 3 == 0 { + huge_true.as_str() + } else { + huge_false.as_str() + } + })); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(ids), Arc::new(int_flag), Arc::new(text_flag)], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_write_batch_size(1) + .set_data_page_size_limit(512) + .set_data_page_row_count_limit(128) + .set_column_data_page_size_limit(ColumnPath::from("text_flag"), 256) + .set_column_dictionary_enabled(ColumnPath::from("text_flag"), false) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + let builder = ParquetRecordBatchStreamBuilder::new_with_options( + TestReader::new(data), + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required), + ) + .await + .unwrap(); + + let schema_descr = builder.parquet_schema().clone(); + + // Start with a sparse RLE row selection, then apply a predicate that produces + // a dense, fragmented mask. + let selection = RowSelection::from(vec![ + RowSelector::select(60), + RowSelector::skip(120), + RowSelector::select(60), + ]); + + let huge_true_scalar = StringArray::from_iter_values([huge_true.as_str()]); + let predicate = + ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [1, 2]), move |batch| { + let int_true = eq(batch.column(0), &Int8Array::new_scalar(1))?; + let text_true = eq(batch.column(1), &Scalar::new(&huge_true_scalar))?; + or(&int_true, &text_true) + }); + + let stream = builder + .with_row_selection(selection) + .with_row_filter(RowFilter::new(vec![Box::new(predicate)])) + .with_projection(ProjectionMask::roots(&schema_descr, [0])) + .with_batch_size(NUM_ROWS) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .build() + .unwrap(); + + let batches: Vec = stream.try_collect().await.unwrap(); + let result = concat_batches(&batches[0].schema(), &batches).unwrap(); + + let expected_ids = + Int32Array::from_iter_values((0..60).chain(180..240).filter(|i| i % 2 == 0 || i % 3 == 0)); + + assert_eq!(result.num_columns(), 1); + assert_eq!( + result + .column(0) + .as_primitive::(), + &expected_ids + ); +} + +/// Regression test for mask materialization when a manual selection skips full pages +/// and projected columns have different page boundaries. +#[tokio::test] +async fn test_mask_selection_projection_with_skipped_pages() { + const NUM_ROWS: usize = 240; + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("int_flag", DataType::Int8, false), + Field::new("text_flag", DataType::Utf8, false), + ])); + + let huge_true = "T".repeat(128); + let huge_false = "F".repeat(128); + + let ids = Int32Array::from_iter_values(0..NUM_ROWS as i32); + let int_flag = Int8Array::from_iter_values((0..NUM_ROWS).map(|i| (i % 2 == 0) as i8)); + let text_flag = StringArray::from_iter_values((0..NUM_ROWS).map(|i| { + if i % 3 == 0 { + huge_true.as_str() + } else { + huge_false.as_str() + } + })); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(ids), Arc::new(int_flag), Arc::new(text_flag)], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_write_batch_size(1) + .set_data_page_size_limit(512) + .set_data_page_row_count_limit(128) + .set_column_data_page_size_limit(ColumnPath::from("text_flag"), 256) + .set_column_dictionary_enabled(ColumnPath::from("text_flag"), false) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + let builder = ParquetRecordBatchStreamBuilder::new_with_options( + TestReader::new(data), + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required), + ) + .await + .unwrap(); + + let schema_descr = builder.parquet_schema().clone(); + let selection = RowSelection::from(vec![ + RowSelector::select(60), + RowSelector::skip(120), + RowSelector::select(60), + ]); + + let stream = builder + .with_row_selection(selection) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .with_projection(ProjectionMask::roots(&schema_descr, [0, 2])) + .with_batch_size(NUM_ROWS) + .build() + .unwrap(); + + let batches: Vec = stream.try_collect().await.unwrap(); + let result = concat_batches(&batches[0].schema(), &batches).unwrap(); + + let expected_ids = + Int32Array::from_iter_values((0..60).chain(180..240).map(|v| v as i32)); + + assert_eq!(result.num_columns(), 2); + assert_eq!( + result + .column(0) + .as_primitive::(), + &expected_ids + ); + assert_eq!(result.num_rows(), expected_ids.len()); +} + +/// Regression test for mask materialization during filtering when predicates span +/// columns with different page boundaries. +#[tokio::test] +async fn test_mask_selection_multi_col_predicate_with_skipped_pages() { + const NUM_ROWS: usize = 240; + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("int_flag", DataType::Int8, false), + Field::new("text_flag", DataType::Utf8, false), + ])); + + let huge_true = "T".repeat(128); + let huge_false = "F".repeat(128); + + let ids = Int32Array::from_iter_values(0..NUM_ROWS as i32); + let int_flag = Int8Array::from_iter_values((0..NUM_ROWS).map(|i| (i % 2 == 0) as i8)); + let text_flag = StringArray::from_iter_values((0..NUM_ROWS).map(|i| { + if i % 3 == 0 { + huge_true.as_str() + } else { + huge_false.as_str() + } + })); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(ids), Arc::new(int_flag), Arc::new(text_flag)], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_write_batch_size(1) + .set_data_page_size_limit(512) + .set_data_page_row_count_limit(128) + .set_column_data_page_size_limit(ColumnPath::from("text_flag"), 256) + .set_column_dictionary_enabled(ColumnPath::from("text_flag"), false) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + let builder = ParquetRecordBatchStreamBuilder::new_with_options( + TestReader::new(data), + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required), + ) + .await + .unwrap(); + + let schema_descr = builder.parquet_schema().clone(); + let selection = RowSelection::from(vec![ + RowSelector::select(60), + RowSelector::skip(120), + RowSelector::select(60), + ]); + + let huge_true_scalar = StringArray::from_iter_values([huge_true.as_str()]); + let predicate = + ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [1, 2]), move |batch| { + let int_true = eq(batch.column(0), &Int8Array::new_scalar(1))?; + let text_true = eq(batch.column(1), &Scalar::new(&huge_true_scalar))?; + or(&int_true, &text_true) + }); + + let stream = builder + .with_row_selection(selection) + .with_row_filter(RowFilter::new(vec![Box::new(predicate)])) + .with_projection(ProjectionMask::roots(&schema_descr, [0])) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .with_batch_size(NUM_ROWS) + .build() + .unwrap(); + + let batches: Vec = stream.try_collect().await.unwrap(); + let result = concat_batches(&batches[0].schema(), &batches).unwrap(); + + let expected_ids = + Int32Array::from_iter_values((0..60).chain(180..240).filter(|i| i % 2 == 0 || i % 3 == 0)); + + assert_eq!(result.num_columns(), 1); + assert_eq!( + result + .column(0) + .as_primitive::(), + &expected_ids + ); +} From ddef11a3529501a813239f11c58d5d985f150a70 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Tue, 10 Feb 2026 19:41:23 -0500 Subject: [PATCH 27/32] cargo fmt --- parquet/tests/arrow_reader/row_filter/async.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/parquet/tests/arrow_reader/row_filter/async.rs b/parquet/tests/arrow_reader/row_filter/async.rs index bdd77807cd03..9e1317fc5f06 100644 --- a/parquet/tests/arrow_reader/row_filter/async.rs +++ b/parquet/tests/arrow_reader/row_filter/async.rs @@ -689,8 +689,7 @@ async fn test_mask_selection_projection_with_skipped_pages() { let batches: Vec = stream.try_collect().await.unwrap(); let result = concat_batches(&batches[0].schema(), &batches).unwrap(); - let expected_ids = - Int32Array::from_iter_values((0..60).chain(180..240).map(|v| v as i32)); + let expected_ids = Int32Array::from_iter_values((0..60).chain(180..240).map(|v| v as i32)); assert_eq!(result.num_columns(), 2); assert_eq!( From eebc2f217de2c5cf60fcfd3936811d69839b8be5 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Tue, 10 Feb 2026 21:38:16 -0500 Subject: [PATCH 28/32] clippy --- parquet/tests/arrow_reader/row_filter/async.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/tests/arrow_reader/row_filter/async.rs b/parquet/tests/arrow_reader/row_filter/async.rs index 9e1317fc5f06..ee3e2664a8ca 100644 --- a/parquet/tests/arrow_reader/row_filter/async.rs +++ b/parquet/tests/arrow_reader/row_filter/async.rs @@ -689,7 +689,7 @@ async fn test_mask_selection_projection_with_skipped_pages() { let batches: Vec = stream.try_collect().await.unwrap(); let result = concat_batches(&batches[0].schema(), &batches).unwrap(); - let expected_ids = Int32Array::from_iter_values((0..60).chain(180..240).map(|v| v as i32)); + let expected_ids = Int32Array::from_iter_values((0..60).chain(180..240)); assert_eq!(result.num_columns(), 2); assert_eq!( From 290abdf8c1d3e267db7bfd95498942118d9fd759 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Sun, 15 Feb 2026 19:19:04 -0500 Subject: [PATCH 29/32] Replace all page offsets with just the skipped ones --- parquet/src/arrow/arrow_reader/read_plan.rs | 29 +---- parquet/src/arrow/arrow_reader/selection.rs | 101 ++++++++++++++++++ .../arrow/push_decoder/reader_builder/mod.rs | 70 +++++++----- 3 files changed, 148 insertions(+), 52 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 482eeb5334d3..38f9d5b96585 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -18,7 +18,6 @@ //! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read //! from a Parquet file -use crate::arrow::ProjectionMask; use crate::arrow::array_reader::ArrayReader; use crate::arrow::arrow_reader::selection::RowSelectionPolicy; use crate::arrow::arrow_reader::selection::RowSelectionStrategy; @@ -26,7 +25,6 @@ use crate::arrow::arrow_reader::{ ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector, }; use crate::errors::{ParquetError, Result}; -use crate::file::page_index::offset_index::OffsetIndexMetaData; use arrow_array::Array; use arrow_select::filter::prep_null_mask_filter; use std::collections::VecDeque; @@ -181,30 +179,9 @@ impl ReadPlanBuilder { Ok(self) } - /// Add offset index metadata for each column in a row group to this `ReadPlanBuilder` - /// - /// The computed page boundaries only include columns in the provided `projection`. - pub fn with_offset_index_metadata( - mut self, - metadata: Option>, - projection: &ProjectionMask, - ) -> Self { - self.page_boundaries = metadata.as_ref().map(|columns| { - let mut boundaries: Vec = columns - .iter() - .enumerate() - .filter(|(idx, _)| projection.leaf_included(*idx)) - .flat_map(|(_, column)| { - column - .page_locations() - .iter() - .map(|loc| loc.first_row_index as usize) - }) - .collect(); - boundaries.sort_unstable(); - boundaries.dedup(); - boundaries.into() - }); + /// Set page boundary rows directly for mask chunking + pub(crate) fn with_page_boundaries(mut self, boundaries: Option>) -> Self { + self.page_boundaries = boundaries; self } diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index b1a57d258911..eb700df05bf8 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -271,6 +271,53 @@ impl RowSelection { }) } + /// Returns row offsets for the starts of skipped pages across projected columns + fn skipped_page_row_offsets( + &self, + projection: &ProjectionMask, + columns: &[OffsetIndexMetaData], + ) -> Vec { + let mut skipped_page_rows: Vec = Vec::new(); + + for (leaf_idx, column) in columns.iter().enumerate() { + if !projection.leaf_included(leaf_idx) { + continue; + } + + let locations = column.page_locations(); + if locations.is_empty() { + continue; + } + + let selected_ranges = self.scan_ranges(locations); + let mut selected_page_starts: Vec = + selected_ranges.iter().map(|r| r.start).collect(); + selected_page_starts.sort_unstable(); + + for page in locations { + let page_start = page.offset as u64; + if selected_page_starts.binary_search(&page_start).is_err() { + skipped_page_rows.push(page.first_row_index as usize); + } + } + } + + skipped_page_rows.sort_unstable(); + skipped_page_rows.dedup(); + skipped_page_rows + } + + /// Returns row offsets for skipped page starts when page-aware mask chunking is needed + pub(crate) fn page_aware_mask_boundaries( + &self, + projection: &ProjectionMask, + offset_index: Option<&[OffsetIndexMetaData]>, + ) -> Option> { + offset_index + .map(|columns| self.skipped_page_row_offsets(projection, columns)) + .filter(|offsets| !offsets.is_empty()) + } + /// Returns true if bitmasks should be page aware pub(crate) fn requires_page_aware_mask( &self, @@ -1599,6 +1646,60 @@ mod tests { assert_eq!(ranges, vec![10..20, 20..30, 30..40]); } + #[test] + fn test_page_aware_mask_boundaries_skipped_pages_only() { + let selection = RowSelection::from(vec![ + RowSelector::skip(10), + RowSelector::select(10), + RowSelector::skip(10), + RowSelector::select(20), + RowSelector::skip(20), + ]); + + let page_locations = vec![ + PageLocation { + offset: 0, + compressed_page_size: 10, + first_row_index: 0, + }, + PageLocation { + offset: 10, + compressed_page_size: 10, + first_row_index: 10, + }, + PageLocation { + offset: 20, + compressed_page_size: 10, + first_row_index: 20, + }, + PageLocation { + offset: 30, + compressed_page_size: 10, + first_row_index: 30, + }, + PageLocation { + offset: 40, + compressed_page_size: 10, + first_row_index: 40, + }, + PageLocation { + offset: 50, + compressed_page_size: 10, + first_row_index: 50, + }, + ]; + + let offsets = selection.page_aware_mask_boundaries( + &ProjectionMask::all(), + Some(&[OffsetIndexMetaData { + page_locations, + unencoded_byte_array_data_bytes: None, + }]), + ); + + assert_eq!(offsets, Some(vec![0, 20, 50])); + } + #[test] fn test_from_ranges() { let ranges = [1..3, 4..6, 6..6, 8..8, 9..10]; diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 7083f4fc2dbb..a78031f476ce 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -240,13 +240,9 @@ impl RowGroupReaderBuilder { "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}" ))); } - let offset_index_metadata = self - .row_group_offset_index(row_group_idx) - .map(|columns| columns.to_vec().into()); let plan_builder = ReadPlanBuilder::new(self.batch_size) .with_selection(selection) - .with_row_selection_policy(self.row_selection_policy) - .with_offset_index_metadata(offset_index_metadata, &self.projection); + .with_row_selection_policy(self.row_selection_policy); let row_group_info = RowGroupInfo { row_group_idx, @@ -441,11 +437,13 @@ impl RowGroupReaderBuilder { .with_parquet_metadata(&self.metadata) .build_array_reader(self.fields.as_deref(), predicate.projection())?; - let offset_index_metadata = self - .row_group_offset_index(row_group_idx) - .map(|columns| columns.to_vec().into()); - plan_builder = plan_builder - .with_offset_index_metadata(offset_index_metadata, predicate.projection()); + let page_boundaries = self.compute_page_aware_boundaries( + row_group_idx, + plan_builder.selection(), + predicate.projection(), + plan_builder.resolve_selection_strategy() == RowSelectionStrategy::Mask, + ); + plan_builder = plan_builder.with_page_boundaries(page_boundaries); plan_builder = plan_builder.with_predicate(array_reader, filter_info.current_mut())?; @@ -591,23 +589,16 @@ impl RowGroupReaderBuilder { &mut self.buffers, )?; - // For mask-based selection, attach offset index metadata for page-aware chunking. - let offset_index_metadata = if plan_builder.resolve_selection_strategy() - == RowSelectionStrategy::Mask - && plan_builder.selection().is_some_and(|selection| { - selection.requires_page_aware_mask( - &self.projection, - self.row_group_offset_index(row_group_idx), - ) - }) { - self.row_group_offset_index(row_group_idx) - .map(|columns| columns.to_vec().into()) - } else { - None - }; + // For mask-based selection, chunk only at starts of skipped pages to + // coalesce contiguous selected and skipped page runs. + let page_boundaries = self.compute_page_aware_boundaries( + row_group_idx, + plan_builder.selection(), + &self.projection, + plan_builder.resolve_selection_strategy() == RowSelectionStrategy::Mask, + ); - let plan_builder = plan_builder - .with_offset_index_metadata(offset_index_metadata, &self.projection); + let plan_builder = plan_builder.with_page_boundaries(page_boundaries); let plan = plan_builder.build(); @@ -665,6 +656,33 @@ impl RowGroupReaderBuilder { mask.without_nested_types(self.metadata.file_metadata().schema_descr()) } + /// Compute page-aware mask chunk boundaries for the given projection, if needed. + fn compute_page_aware_boundaries( + &self, + row_group_idx: usize, + selection: Option<&RowSelection>, + projection: &ProjectionMask, + mask_strategy: bool, + ) -> Option> { + if !mask_strategy + || selection.is_none() + || self.row_group_offset_index(row_group_idx).is_none() + { + return None; + } + + selection + .and_then(|selection| { + let offset_index = self.row_group_offset_index(row_group_idx); + if selection.requires_page_aware_mask(projection, offset_index) { + selection.page_aware_mask_boundaries(projection, offset_index) + } else { + None + } + }) + .map(Arc::<[usize]>::from) + } + /// Get the column offset indexes for the specified row group, if any fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> { self.metadata From 7563d6d610d5a7a0ef80d00997fe3986850a7907 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Mon, 16 Feb 2026 10:09:14 -0500 Subject: [PATCH 30/32] replace binary search with linear lookup --- parquet/src/arrow/arrow_reader/selection.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index eb700df05bf8..4303f4b9716c 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -290,13 +290,22 @@ impl RowSelection { } let selected_ranges = self.scan_ranges(locations); - let mut selected_page_starts: Vec = - selected_ranges.iter().map(|r| r.start).collect(); - selected_page_starts.sort_unstable(); - + let mut selected_idx = 0usize; for page in locations { let page_start = page.offset as u64; - if selected_page_starts.binary_search(&page_start).is_err() { + + while selected_idx < selected_ranges.len() + && selected_ranges[selected_idx].start < page_start + { + selected_idx += 1; + } + + let selected = selected_idx < selected_ranges.len() + && selected_ranges[selected_idx].start == page_start; + + if selected { + selected_idx += 1; + } else { skipped_page_rows.push(page.first_row_index as usize); } } From 2111012c67edac69da93c4f3315653df559338ec Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Mon, 16 Feb 2026 15:00:34 -0500 Subject: [PATCH 31/32] Make MaskCursor next skipped page aware --- parquet/src/arrow/arrow_reader/selection.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 4303f4b9716c..f261e8c5fee4 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -826,6 +826,9 @@ pub struct MaskCursor { mask: BooleanBuffer, /// Current absolute offset into the selection position: usize, + /// Index of the next page boundary candidate. This advances monotonically + /// as `position` advances. + next_boundary_idx: usize, } impl MaskCursor { @@ -864,9 +867,13 @@ impl MaskCursor { let max_chunk_rows = page_boundaries .and_then(|boundaries| { - let next_idx = boundaries.partition_point(|&start| start <= mask_start); + while self.next_boundary_idx < boundaries.len() + && boundaries[self.next_boundary_idx] <= mask_start + { + self.next_boundary_idx += 1; + } boundaries - .get(next_idx) + .get(self.next_boundary_idx) .and_then(|&start| (start > mask_start).then_some(start - mask_start)) }) .unwrap_or(usize::MAX); @@ -977,6 +984,7 @@ impl RowSelectionCursor { Self::Mask(MaskCursor { mask: boolean_mask_from_selectors(&selectors), position: 0, + next_boundary_idx: 0, }) } @@ -1165,7 +1173,11 @@ mod tests { fn test_mask_cursor_page_aware_chunking() { let selectors = vec![RowSelector::skip(2), RowSelector::select(10)]; let mask = boolean_mask_from_selectors(&selectors); - let mut cursor = MaskCursor { mask, position: 0 }; + let mut cursor = MaskCursor { + mask, + position: 0, + next_boundary_idx: 0, + }; let pages = [ PageLocation { From 8776f600806d50b8738b9a6478944e98430e2fb1 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Wed, 4 Mar 2026 15:08:46 -0500 Subject: [PATCH 32/32] remove redundant test --- parquet/src/arrow/arrow_reader/selection.rs | 51 --------------------- 1 file changed, 51 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index f261e8c5fee4..455ef849edaf 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -1169,57 +1169,6 @@ mod tests { ); } - #[test] - fn test_mask_cursor_page_aware_chunking() { - let selectors = vec![RowSelector::skip(2), RowSelector::select(10)]; - let mask = boolean_mask_from_selectors(&selectors); - let mut cursor = MaskCursor { - mask, - position: 0, - next_boundary_idx: 0, - }; - - let pages = [ - PageLocation { - offset: 0, - compressed_page_size: 1, - first_row_index: 0, - }, - PageLocation { - offset: 1, - compressed_page_size: 1, - first_row_index: 4, - }, - PageLocation { - offset: 2, - compressed_page_size: 1, - first_row_index: 8, - }, - PageLocation { - offset: 3, - compressed_page_size: 1, - first_row_index: 12, - }, - ]; - let boundaries: Vec = pages - .iter() - .map(|loc| loc.first_row_index as usize) - .collect(); - // First chunk is page 1 - let chunk = cursor.next_mask_chunk(100, Some(&boundaries)).unwrap(); - assert_eq!(chunk.initial_skip, 2); - assert_eq!(chunk.mask_start, 2); - assert_eq!(chunk.chunk_rows, 2); - assert_eq!(chunk.selected_rows, 2); - - // Second chunk is page 2 - let chunk = cursor.next_mask_chunk(100, Some(&boundaries)).unwrap(); - assert_eq!(chunk.initial_skip, 0); - assert_eq!(chunk.mask_start, 4); - assert_eq!(chunk.chunk_rows, 4); - assert_eq!(chunk.selected_rows, 4); - } - #[test] fn test_and() { let mut a = RowSelection::from(vec![