diff --git a/parquet/src/arrow/arrow_reader/dictionary_pruning.rs b/parquet/src/arrow/arrow_reader/dictionary_pruning.rs new file mode 100644 index 000000000000..f873bd7285a3 --- /dev/null +++ b/parquet/src/arrow/arrow_reader/dictionary_pruning.rs @@ -0,0 +1,374 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Dictionary page pruning for row filters +//! +//! When a predicate targets a dictionary-encoded column, we can evaluate the +//! predicate against the dictionary values before decoding any data pages. +//! If no dictionary values match, the entire column chunk can be skipped. + +use crate::arrow::ProjectionMask; +use crate::arrow::array_reader::RowGroups; +use crate::arrow::arrow_reader::ArrowPredicate; +use crate::arrow::arrow_reader::filter::DictionaryPredicateResult; +use crate::arrow::in_memory_row_group::InMemoryRowGroup; +use crate::basic::{Encoding, Type as PhysicalType}; +use crate::errors::Result; +use crate::file::metadata::ParquetMetaData; +use arrow_array::{ArrayRef, RecordBatch}; +use arrow_schema::{DataType, Field, Schema}; +use std::sync::Arc; + +/// Try to prune a predicate using dictionary pages from the in-memory row group. +/// +/// Returns `Some(DictionaryPredicateResult)` if dictionary pruning was performed, +/// `None` if the column is not dictionary-encoded or pruning is not applicable. +pub(crate) fn try_dictionary_prune_in_memory( + predicate: &mut dyn ArrowPredicate, + row_group: &InMemoryRowGroup<'_>, + metadata: &ParquetMetaData, + fields: Option<&crate::arrow::schema::ParquetField>, +) -> Result> { + let projection = predicate.projection(); + let schema_descr = metadata.file_metadata().schema_descr(); + + // Only support single-column predicates + let col_idx = single_leaf_column(projection, schema_descr.num_columns()); + let Some(col_idx) = col_idx else { + return Ok(None); + }; + + let row_group_meta = metadata.row_group(row_group.row_group_idx); + let col_meta = row_group_meta.column(col_idx); + + // Only proceed if the column has a dictionary page + if col_meta.dictionary_page_offset().is_none() { + return Ok(None); + } + + // Only safe to prune if ALL data pages are dictionary-encoded. + // If some pages fell back to plain encoding, the dictionary doesn't + // cover all values and we can't safely skip based on dictionary alone. + if !is_all_dictionary_encoded(col_meta) { + return Ok(None); + } + + let physical_type = schema_descr.column(col_idx).physical_type(); + + // Only support BYTE_ARRAY and INT32/INT64 columns + if !matches!( + physical_type, + PhysicalType::BYTE_ARRAY | PhysicalType::INT32 | PhysicalType::INT64 + ) { + return Ok(None); + } + + // Get the arrow type for this column from the ParquetField tree. + // Only supports top-level primitive columns (not nested in structs/lists). + let Some(arrow_type) = fields.and_then(|f| find_top_level_leaf_arrow_type(f, col_idx)) else { + return Ok(None); + }; + + // Create a page reader for this column + let mut page_iter = row_group.column_chunks(col_idx)?; + let Some(page_reader) = page_iter.next() else { + return Ok(None); + }; + let mut page_reader = page_reader?; + + // Read the first page - should be the dictionary page + let first_page = page_reader.get_next_page()?; + let Some(page) = first_page else { + return Ok(None); + }; + + if !page.is_dictionary_page() { + return Ok(None); + } + + let crate::column::page::Page::DictionaryPage { + buf, num_values, .. + } = page + else { + return Ok(None); + }; + + // Decode PLAIN-encoded dictionary values based on physical type, + // then cast to the target arrow type if needed + let array: ArrayRef = match physical_type { + PhysicalType::BYTE_ARRAY => { + decode_plain_byte_array(&buf, num_values as usize, &arrow_type)? + } + PhysicalType::INT32 => decode_plain_int32_as(&buf, num_values as usize, &arrow_type)?, + PhysicalType::INT64 => decode_plain_int64_as(&buf, num_values as usize, &arrow_type)?, + _ => return Ok(None), + }; + + // Build a RecordBatch with the dictionary values using a synthetic field + let col_name = schema_descr.column(col_idx).name().to_string(); + let field = Field::new(&col_name, arrow_type, true); + let schema = Arc::new(Schema::new(vec![field])); + let batch = RecordBatch::try_new(schema, vec![array]).map_err(|e| { + crate::errors::ParquetError::General(format!("Failed to create dictionary batch: {}", e)) + })?; + + // Evaluate the predicate against dictionary values + let result = predicate.evaluate_dictionary(batch).map_err(|e| { + crate::errors::ParquetError::General(format!( + "Failed to evaluate dictionary predicate: {}", + e + )) + })?; + + Ok(Some(result)) +} + +/// Decode PLAIN-encoded BYTE_ARRAY values into a string/binary array +/// matching the target arrow type. +fn decode_plain_byte_array( + buf: &[u8], + num_values: usize, + arrow_type: &DataType, +) -> Result { + // Parse all byte array values + let mut values: Vec<&[u8]> = Vec::with_capacity(num_values); + let mut offset = 0; + for _ in 0..num_values { + if offset + 4 > buf.len() { + return Err(crate::errors::ParquetError::EOF( + "eof decoding dictionary byte array".into(), + )); + } + let len = u32::from_le_bytes(buf[offset..offset + 4].try_into().unwrap()) as usize; + offset += 4; + if offset + len > buf.len() { + return Err(crate::errors::ParquetError::EOF( + "eof decoding dictionary byte array".into(), + )); + } + values.push(&buf[offset..offset + len]); + offset += len; + } + + match arrow_type { + DataType::Utf8View => { + let mut builder = arrow_array::builder::StringViewBuilder::with_capacity(num_values); + for v in &values { + // SAFETY: parquet BYTE_ARRAY dictionary values for string columns are valid UTF-8 + let s = unsafe { std::str::from_utf8_unchecked(v) }; + builder.append_value(s); + } + Ok(Arc::new(builder.finish())) + } + DataType::Utf8 | DataType::LargeUtf8 => { + let strs: Vec<&str> = values + .iter() + // SAFETY: parquet BYTE_ARRAY dictionary values for string columns are valid UTF-8 + .map(|v| unsafe { std::str::from_utf8_unchecked(v) }) + .collect(); + Ok(Arc::new(arrow_array::StringArray::from(strs))) + } + DataType::BinaryView => { + let mut builder = arrow_array::builder::BinaryViewBuilder::with_capacity(num_values); + for v in &values { + builder.append_value(v); + } + Ok(Arc::new(builder.finish())) + } + _ => { + // Default to BinaryArray for unknown types + let binary_values: Vec<&[u8]> = values; + Ok(Arc::new(arrow_array::BinaryArray::from(binary_values))) + } + } +} + +/// Check if all data pages in a column chunk are dictionary-encoded. +/// +/// Uses page encoding stats if available, otherwise falls back to checking +/// column-level encodings. Returns false if we can't determine conclusively. +fn is_all_dictionary_encoded(col_meta: &crate::file::metadata::ColumnChunkMetaData) -> bool { + // No dictionary page -> definitely not all dictionary encoded + if col_meta.dictionary_page_offset().is_none() { + return false; + } + + // Method 1: Use page encoding stats mask if available (most reliable) + if let Some(mask) = col_meta.page_encoding_stats_mask() { + return mask.is_only(Encoding::PLAIN_DICTIONARY) || mask.is_only(Encoding::RLE_DICTIONARY); + } + + // Method 1a: Use full page encoding stats if the mask form wasn't used + // (i.e. ParquetMetaDataOptions::with_encoding_stats_as_mask was set to false) + if let Some(stats) = col_meta.page_encoding_stats() { + return stats + .iter() + .filter(|s| { + matches!( + s.page_type, + crate::basic::PageType::DATA_PAGE | crate::basic::PageType::DATA_PAGE_V2 + ) + }) + .all(|s| { + matches!( + s.encoding, + Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY + ) + }); + } + + // Method 2: Check column-level encodings. + // Dictionary-encoded columns have PLAIN_DICTIONARY or RLE_DICTIONARY for data, + // plus PLAIN and/or RLE for definition/repetition levels. + // If PLAIN appears as a data encoding (fallback), the column also has + // PLAIN_DICTIONARY/RLE_DICTIONARY, so we'd see both. + // Safe heuristic: only dictionary + level encodings = all dict encoded. + let mut has_dict_data_encoding = false; + let mut has_non_dict_data_encoding = false; + for enc in col_meta.encodings() { + match enc { + Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => { + has_dict_data_encoding = true; + } + // These are used for definition/repetition levels, not data + #[allow(deprecated)] + Encoding::RLE | Encoding::BIT_PACKED => {} + // PLAIN is ambiguous - used for def/rep levels in V1 pages AND + // as a fallback data encoding. We allow ONE PLAIN if there's also + // a dictionary encoding, since the PLAIN is likely for levels. + Encoding::PLAIN => { + // PLAIN is expected for def/rep levels. We can't distinguish + // from data fallback here, so we allow it if dict encoding exists. + // This is a heuristic that works for most files. + } + // Any other encoding (DELTA_*, etc.) means non-dictionary data + _ => { + has_non_dict_data_encoding = true; + } + } + } + + has_dict_data_encoding && !has_non_dict_data_encoding +} + +/// Decode PLAIN-encoded INT32 values, casting to the target arrow type +fn decode_plain_int32_as(buf: &[u8], num_values: usize, arrow_type: &DataType) -> Result { + if buf.len() < num_values * 4 { + return Err(crate::errors::ParquetError::EOF( + "eof decoding dictionary int32".into(), + )); + } + let values: Vec = (0..num_values) + .map(|i| i32::from_le_bytes(buf[i * 4..(i + 1) * 4].try_into().unwrap())) + .collect(); + match arrow_type { + DataType::Int8 => Ok(Arc::new(arrow_array::Int8Array::from( + values.into_iter().map(|v| v as i8).collect::>(), + ))), + DataType::Int16 => Ok(Arc::new(arrow_array::Int16Array::from( + values.into_iter().map(|v| v as i16).collect::>(), + ))), + DataType::UInt8 => Ok(Arc::new(arrow_array::UInt8Array::from( + values.into_iter().map(|v| v as u8).collect::>(), + ))), + DataType::UInt16 => Ok(Arc::new(arrow_array::UInt16Array::from( + values.into_iter().map(|v| v as u16).collect::>(), + ))), + DataType::UInt32 => Ok(Arc::new(arrow_array::UInt32Array::from( + values.into_iter().map(|v| v as u32).collect::>(), + ))), + _ => Ok(Arc::new(arrow_array::Int32Array::from(values))), + } +} + +/// Decode PLAIN-encoded INT64 values, casting to the target arrow type +fn decode_plain_int64_as(buf: &[u8], num_values: usize, arrow_type: &DataType) -> Result { + if buf.len() < num_values * 8 { + return Err(crate::errors::ParquetError::EOF( + "eof decoding dictionary int64".into(), + )); + } + let values: Vec = (0..num_values) + .map(|i| i64::from_le_bytes(buf[i * 8..(i + 1) * 8].try_into().unwrap())) + .collect(); + match arrow_type { + DataType::Int32 => Ok(Arc::new(arrow_array::Int32Array::from( + values.into_iter().map(|v| v as i32).collect::>(), + ))), + DataType::Timestamp(unit, tz) => { + use arrow_array::TimestampMicrosecondArray; + use arrow_array::TimestampMillisecondArray; + use arrow_array::TimestampNanosecondArray; + use arrow_array::TimestampSecondArray; + use arrow_schema::TimeUnit; + match unit { + TimeUnit::Second => Ok(Arc::new( + TimestampSecondArray::from(values).with_timezone_opt(tz.clone()), + )), + TimeUnit::Millisecond => Ok(Arc::new( + TimestampMillisecondArray::from(values).with_timezone_opt(tz.clone()), + )), + TimeUnit::Microsecond => Ok(Arc::new( + TimestampMicrosecondArray::from(values).with_timezone_opt(tz.clone()), + )), + TimeUnit::Nanosecond => Ok(Arc::new( + TimestampNanosecondArray::from(values).with_timezone_opt(tz.clone()), + )), + } + } + _ => Ok(Arc::new(arrow_array::Int64Array::from(values))), + } +} + +/// Find the arrow DataType for a specific leaf column index, but only if it's +/// a direct child of the root (not nested inside a struct/list/map). +/// Returns None for nested columns to avoid applying dictionary pruning to them. +fn find_top_level_leaf_arrow_type( + root: &crate::arrow::schema::ParquetField, + target_col_idx: usize, +) -> Option { + use crate::arrow::schema::ParquetFieldType; + // root must be a group (the schema root) + let ParquetFieldType::Group { children } = &root.field_type else { + return None; + }; + // Only check direct children — skip nested fields + for child in children { + if let ParquetFieldType::Primitive { col_idx, .. } = &child.field_type { + if *col_idx == target_col_idx { + return Some(child.arrow_type.clone()); + } + } + // If the child is a Group (struct/list), don't recurse — + // we only support top-level primitives for dictionary pruning + } + None +} + +/// Returns the single leaf column index if the projection mask selects exactly one leaf. +fn single_leaf_column(mask: &ProjectionMask, num_columns: usize) -> Option { + let mut found = None; + for i in 0..num_columns { + if mask.leaf_included(i) { + if found.is_some() { + return None; // more than one column + } + found = Some(i); + } + } + found +} diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs index 3fd5e1d650be..fa10c578802a 100644 --- a/parquet/src/arrow/arrow_reader/filter.rs +++ b/parquet/src/arrow/arrow_reader/filter.rs @@ -20,6 +20,17 @@ use arrow_array::{BooleanArray, RecordBatch}; use arrow_schema::ArrowError; use std::fmt::{Debug, Formatter}; +/// Result of evaluating a predicate against dictionary values +#[derive(Debug)] +pub enum DictionaryPredicateResult { + /// No dictionary values match the predicate - all rows can be skipped + AllFalse, + /// All dictionary values match the predicate - no need to evaluate per-row + AllTrue, + /// Some dictionary values match - must evaluate per-row as normal + Partial, +} + /// A predicate operating on [`RecordBatch`] /// /// See also: @@ -44,6 +55,31 @@ pub trait ArrowPredicate: Send + 'static { /// * `true`:the row should be returned /// * `false` or `null`: the row should not be returned fn evaluate(&mut self, batch: RecordBatch) -> Result; + + /// Evaluate this predicate against dictionary values for a column chunk. + /// + /// If the column chunk is dictionary-encoded, this method is called with a + /// [`RecordBatch`] containing the dictionary values (one row per distinct value). + /// + /// Returns a [`DictionaryPredicateResult`] indicating whether all rows can be + /// skipped, all rows pass, or per-row evaluation is needed. + /// + /// The default implementation calls [`Self::evaluate`] on the dictionary values + /// and checks if any/all values match. + fn evaluate_dictionary( + &mut self, + batch: RecordBatch, + ) -> Result { + let result = self.evaluate(batch)?; + let true_count = result.true_count(); + if true_count == 0 { + Ok(DictionaryPredicateResult::AllFalse) + } else if true_count == result.len() { + Ok(DictionaryPredicateResult::AllTrue) + } else { + Ok(DictionaryPredicateResult::Partial) + } + } } /// An [`ArrowPredicate`] created from an [`FnMut`] and a [`ProjectionMask`] diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 1b02c4ae25d3..e87c5f6d518b 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -21,7 +21,7 @@ use arrow_array::cast::AsArray; use arrow_array::{Array, RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef}; use arrow_select::filter::filter_record_batch; -pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; +pub use filter::{ArrowPredicate, ArrowPredicateFn, DictionaryPredicateResult, RowFilter}; pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector}; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -51,6 +51,7 @@ use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; // Exposed so integration tests and benchmarks can temporarily override the threshold. pub use read_plan::{ReadPlan, ReadPlanBuilder}; +pub(crate) mod dictionary_pruning; mod filter; pub mod metrics; mod read_plan; @@ -5749,21 +5750,13 @@ pub(crate) mod tests { } let filter = use_filter.then(|| { - let filter = (0..metadata.file_metadata().num_rows()) - .map(|_| rng.random_bool(0.99)) - .collect::>(); - let mut filter_offset = 0; + // Stateless value-based filter: keep rows where value is odd. RowFilter::new(vec![Box::new(ArrowPredicateFn::new( ProjectionMask::all(), - move |b| { - let array = BooleanArray::from_iter( - filter - .iter() - .skip(filter_offset) - .take(b.num_rows()) - .map(|x| Some(*x)), - ); - filter_offset += b.num_rows(); + |b| { + let values = b.column(0).as_primitive::(); + let array: BooleanArray = + values.iter().map(|v| v.map(|v| v % 2 != 0)).collect(); Ok(array) }, ))]) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 99ffe0febc95..9b6941157bc6 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -92,6 +92,12 @@ impl ReadPlanBuilder { .unwrap_or(true) } + /// Deselect all rows (e.g. when dictionary pruning determines no rows match) + pub fn deselect_all(mut self) -> Self { + self.selection = Some(RowSelection::from(vec![])); + self + } + /// Returns the number of rows selected, or `None` if all rows are selected. pub fn num_rows_selected(&self) -> Option { self.selection.as_ref().map(|s| s.row_count()) diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index d3d78ca7c263..b29358bdc903 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -420,42 +420,71 @@ impl RowGroupReaderBuilder { mut plan_builder, } = row_group_info; - let predicate = filter_info.current(); + // Get the projection before mutable borrow + let predicate_projection = filter_info.current().projection().clone(); let row_group = data_request.try_into_in_memory_row_group( row_group_idx, row_count, &self.metadata, - predicate.projection(), + &predicate_projection, &mut self.buffers, )?; - let cache_options = filter_info.cache_builder().producer(); - - let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics) - .with_cache_options(Some(&cache_options)) - .with_parquet_metadata(&self.metadata) - .build_array_reader(self.fields.as_deref(), predicate.projection())?; - - // Reset to original policy before each predicate so the override - // can detect page skipping for THIS predicate's columns. - // Without this reset, a prior predicate's override (e.g. Mask) - // carries forward and the check returns early, missing unfetched - // pages for subsequent predicates. - plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy); + // Try dictionary pruning before decoding data pages + let dict_result = + crate::arrow::arrow_reader::dictionary_pruning::try_dictionary_prune_in_memory( + filter_info.current_mut(), + &row_group, + &self.metadata, + self.fields.as_deref(), + )?; + + let dict_action = match dict_result { + Some(crate::arrow::arrow_reader::DictionaryPredicateResult::AllFalse) => { + Some(false) // skip all rows + } + Some(crate::arrow::arrow_reader::DictionaryPredicateResult::AllTrue) => { + Some(true) // all rows pass, skip predicate eval + } + _ => None, // evaluate normally + }; - // Prepare to evaluate the filter. - // Note: first update the selection strategy to properly handle any pages - // pruned during fetch - plan_builder = override_selector_strategy_if_needed( - plan_builder, - predicate.projection(), - self.row_group_offset_index(row_group_idx), - ); - // `with_predicate` actually evaluates the filter + if dict_action == Some(false) { + // Dictionary tells us no rows match - deselect all rows + plan_builder = plan_builder.deselect_all(); + } else if dict_action == Some(true) { + // Dictionary tells us all rows match - skip predicate evaluation + // (selection unchanged, all rows pass this predicate) + } else { + let cache_options = filter_info.cache_builder().producer(); - plan_builder = - plan_builder.with_predicate(array_reader, filter_info.current_mut())?; + let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics) + .with_cache_options(Some(&cache_options)) + .with_parquet_metadata(&self.metadata) + .build_array_reader(self.fields.as_deref(), &predicate_projection)?; + + // Reset to original policy before each predicate so the override + // can detect page skipping for THIS predicate's columns. + // Without this reset, a prior predicate's override (e.g. Mask) + // carries forward and the check returns early, missing unfetched + // pages for subsequent predicates. + plan_builder = + plan_builder.with_row_selection_policy(self.row_selection_policy); + + // Prepare to evaluate the filter. + // Note: first update the selection strategy to properly handle any pages + // pruned during fetch + plan_builder = override_selector_strategy_if_needed( + plan_builder, + &predicate_projection, + self.row_group_offset_index(row_group_idx), + ); + // `with_predicate` actually evaluates the filter + + plan_builder = + plan_builder.with_predicate(array_reader, filter_info.current_mut())?; + } let row_group_info = RowGroupInfo { row_group_idx,