From 169dfc1d2b9854e4fe094bc7a9f804548e05f640 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 29 Dec 2025 18:32:42 -0600 Subject: [PATCH 1/9] disable array reader cache for nested fields --- parquet/src/arrow/array_reader/builder.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 818e06e8b81f..2c188ad62f90 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -153,7 +153,12 @@ impl<'a> ArrayReaderBuilder<'a> { return Ok(Some(reader)); }; - if cache_options.projection_mask.leaf_included(col_idx) { + // Skip caching for columns with nullable ancestors (def_level > 0) + // because CachedArrayReader doesn't support get_def_levels() yet. + // See: https://github.com/apache/arrow-rs/issues/XXXX + if cache_options.projection_mask.leaf_included(col_idx) + && field.def_level == 0 + { Ok(Some(Box::new(CachedArrayReader::new( reader, Arc::clone(cache_options.cache), From 5eb147e51d16439b0ec65c736d046b430c5b76c6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 29 Dec 2025 19:22:07 -0600 Subject: [PATCH 2/9] try to fix cached array for nested columns --- parquet/src/arrow/array_reader/builder.rs | 7 +- .../arrow/array_reader/cached_array_reader.rs | 355 +++++++++++++++++- .../src/arrow/array_reader/row_group_cache.rs | 95 +++-- 3 files changed, 414 insertions(+), 43 deletions(-) diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 2c188ad62f90..818e06e8b81f 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -153,12 +153,7 @@ impl<'a> ArrayReaderBuilder<'a> { return Ok(Some(reader)); }; - // Skip caching for columns with nullable ancestors (def_level > 0) - // because CachedArrayReader 
doesn't support get_def_levels() yet. - // See: https://github.com/apache/arrow-rs/issues/XXXX - if cache_options.projection_mask.leaf_included(col_idx) - && field.def_level == 0 - { + if cache_options.projection_mask.leaf_included(col_idx) { Ok(Some(Box::new(CachedArrayReader::new( reader, Arc::clone(cache_options.cache), diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs index 21f0c2afa410..36444b75586d 100644 --- a/parquet/src/arrow/array_reader/cached_array_reader.rs +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -17,7 +17,7 @@ //! [`CachedArrayReader`] wrapper around [`ArrayReader`] -use crate::arrow::array_reader::row_group_cache::BatchID; +use crate::arrow::array_reader::row_group_cache::{BatchID, CachedBatch}; use crate::arrow::array_reader::{ArrayReader, row_group_cache::RowGroupCache}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::errors::Result; @@ -84,9 +84,13 @@ pub struct CachedArrayReader { role: CacheRole, /// Local cache to store batches between read_records and consume_batch calls /// This ensures data is available even if the shared cache evicts items - local_cache: HashMap, + local_cache: HashMap, /// Statistics to report on the Cache behavior metrics: ArrowReaderMetrics, + /// Definition levels for the last consume_batch() output + def_levels_buffer: Option>, + /// Repetition levels for the last consume_batch() output + rep_levels_buffer: Option>, } impl CachedArrayReader { @@ -111,6 +115,8 @@ impl CachedArrayReader { role, local_cache: HashMap::new(), metrics, + def_levels_buffer: None, + rep_levels_buffer: None, } } @@ -146,6 +152,11 @@ impl CachedArrayReader { let array = self.inner.consume_batch()?; + // Capture definition and repetition levels from inner reader before they are cleared + let def_levels = self.inner.get_def_levels().map(|l| l.to_vec()); + let rep_levels = self.inner.get_rep_levels().map(|l| l.to_vec()); + let 
cached_batch = CachedBatch::with_levels(array, def_levels, rep_levels); + // Store in both shared cache and local cache // The shared cache is used to reuse results between readers // The local cache ensures data is available for our consume_batch call @@ -153,11 +164,11 @@ impl CachedArrayReader { self.shared_cache .write() .unwrap() - .insert(self.column_idx, batch_id, array.clone()); + .insert(self.column_idx, batch_id, cached_batch.clone()); // Note: if the shared cache is full (_cached == false), we continue without caching // The local cache will still store the data for this reader's use - self.local_cache.insert(batch_id, array); + self.local_cache.insert(batch_id, cached_batch); self.inner_position += read; Ok(read) @@ -200,8 +211,8 @@ impl ArrayReader for CachedArrayReader { let batch_id = self.get_batch_id_from_position(self.outer_position); // Check local cache first - let cached = if let Some(array) = self.local_cache.get(&batch_id) { - Some(Arc::clone(array)) + let cached = if let Some(batch) = self.local_cache.get(&batch_id) { + Some(batch.clone()) } else { // If not in local cache, i.e., we are consumer, check shared cache let cache_content = self @@ -209,16 +220,16 @@ impl ArrayReader for CachedArrayReader { .read() .unwrap() .get(self.column_idx, batch_id); - if let Some(array) = cache_content.as_ref() { + if let Some(batch) = cache_content.as_ref() { // Store in local cache for later use in consume_batch - self.local_cache.insert(batch_id, Arc::clone(array)); + self.local_cache.insert(batch_id, batch.clone()); } cache_content }; match cached { - Some(array) => { - let array_len = array.len(); + Some(batch) => { + let array_len = batch.array.len(); if array_len + batch_id.val * self.batch_size > self.outer_position { // the cache batch has some records that we can select let v = array_len + batch_id.val * self.batch_size - self.outer_position; @@ -270,6 +281,8 @@ impl ArrayReader for CachedArrayReader { fn consume_batch(&mut self) -> Result { let 
row_count = self.selections.len(); if row_count == 0 { + self.def_levels_buffer = None; + self.rep_levels_buffer = None; return Ok(new_empty_array(self.inner.get_data_type())); } @@ -281,6 +294,11 @@ impl ArrayReader for CachedArrayReader { let end_batch = (start_position + row_count - 1) / self.batch_size; let mut selected_arrays = Vec::new(); + let mut selected_def_levels: Vec = Vec::new(); + let mut selected_rep_levels: Vec = Vec::new(); + let mut has_def_levels = false; + let mut has_rep_levels = false; + for batch_id in start_batch..=end_batch { let batch_start = batch_id * self.batch_size; let batch_end = batch_start + self.batch_size - 1; @@ -302,19 +320,56 @@ impl ArrayReader for CachedArrayReader { continue; } - let mask_array = BooleanArray::from(mask); + let mask_array = BooleanArray::from(mask.clone()); // Read from local cache instead of shared cache to avoid cache eviction issues - let cached = self + let cached_batch = self .local_cache .get(&batch_id) .expect("data must be already cached in the read_records call, this is a bug"); - let cached = cached.slice(overlap_start - batch_start, selection_length); - let filtered = arrow_select::filter::filter(&cached, &mask_array)?; + + // Slice and filter the array + let slice_start = overlap_start - batch_start; + let cached_array = cached_batch.array.slice(slice_start, selection_length); + let filtered = arrow_select::filter::filter(&cached_array, &mask_array)?; selected_arrays.push(filtered); + + // Slice and filter definition levels if present + if let Some(ref def_levels) = cached_batch.def_levels { + has_def_levels = true; + let sliced = &def_levels[slice_start..slice_start + selection_length]; + for (level, selected) in sliced.iter().zip(mask.iter()) { + if selected { + selected_def_levels.push(*level); + } + } + } + + // Slice and filter repetition levels if present + if let Some(ref rep_levels) = cached_batch.rep_levels { + has_rep_levels = true; + let sliced = &rep_levels[slice_start..slice_start + 
selection_length]; + for (level, selected) in sliced.iter().zip(mask.iter()) { + if selected { + selected_rep_levels.push(*level); + } + } + } } self.selections = BooleanBufferBuilder::new(0); + // Store the filtered levels + self.def_levels_buffer = if has_def_levels { + Some(selected_def_levels) + } else { + None + }; + self.rep_levels_buffer = if has_rep_levels { + Some(selected_rep_levels) + } else { + None + }; + // Only remove batches from local buffer that are completely behind current position // Keep the current batch and any future batches as they might still be needed let current_batch_id = self.get_batch_id_from_position(self.outer_position); @@ -340,11 +395,11 @@ impl ArrayReader for CachedArrayReader { } fn get_def_levels(&self) -> Option<&[i16]> { - None // we don't allow nullable parent for now. + self.def_levels_buffer.as_deref() } fn get_rep_levels(&self) -> Option<&[i16]> { - None + self.rep_levels_buffer.as_deref() } } @@ -759,4 +814,272 @@ mod tests { let int32_array = array.as_any().downcast_ref::().unwrap(); assert_eq!(int32_array.values(), &[1, 2, 3, 4]); } + + // Mock ArrayReader that returns definition and repetition levels + struct MockArrayReaderWithLevels { + data: Vec, + def_levels: Vec, + rep_levels: Vec, + position: usize, + records_to_consume: usize, + data_type: ArrowType, + // Buffers to hold levels after consume_batch + def_levels_buffer: Option>, + rep_levels_buffer: Option>, + } + + impl MockArrayReaderWithLevels { + fn new(data: Vec, def_levels: Vec, rep_levels: Vec) -> Self { + assert_eq!(data.len(), def_levels.len()); + assert_eq!(data.len(), rep_levels.len()); + Self { + data, + def_levels, + rep_levels, + position: 0, + records_to_consume: 0, + data_type: ArrowType::Int32, + def_levels_buffer: None, + rep_levels_buffer: None, + } + } + } + + impl ArrayReader for MockArrayReaderWithLevels { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn read_records(&mut 
self, batch_size: usize) -> Result { + let remaining = self.data.len() - self.position; + let to_read = std::cmp::min(batch_size, remaining); + self.records_to_consume += to_read; + Ok(to_read) + } + + fn consume_batch(&mut self) -> Result { + let start = self.position; + let end = start + self.records_to_consume; + let slice = &self.data[start..end]; + self.def_levels_buffer = Some(self.def_levels[start..end].to_vec()); + self.rep_levels_buffer = Some(self.rep_levels[start..end].to_vec()); + self.position = end; + self.records_to_consume = 0; + Ok(Arc::new(Int32Array::from(slice.to_vec()))) + } + + fn skip_records(&mut self, num_records: usize) -> Result { + let remaining = self.data.len() - self.position; + let to_skip = std::cmp::min(num_records, remaining); + self.position += to_skip; + Ok(to_skip) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels_buffer.as_deref() + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels_buffer.as_deref() + } + } + + #[test] + fn test_level_propagation_basic() { + let metrics = ArrowReaderMetrics::disabled(); + // Data with corresponding def and rep levels + let data = vec![1, 2, 3, 4, 5]; + let def_levels = vec![1, 1, 0, 1, 1]; // Third value is null + let rep_levels = vec![0, 1, 1, 0, 1]; // New list at positions 0 and 3 + + let mock_reader = MockArrayReaderWithLevels::new(data, def_levels, rep_levels); + let cache = Arc::new(Mutex::new(RowGroupCache::new(5, usize::MAX))); + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache, + 0, + CacheRole::Producer, + metrics, + ); + + // Read all 5 records + let records_read = cached_reader.read_records(5).unwrap(); + assert_eq!(records_read, 5); + + let array = cached_reader.consume_batch().unwrap(); + assert_eq!(array.len(), 5); + + // Verify levels are captured + let def_levels = cached_reader.get_def_levels(); + assert!(def_levels.is_some()); + assert_eq!(def_levels.unwrap(), &[1, 1, 0, 1, 1]); + + let rep_levels = 
cached_reader.get_rep_levels(); + assert!(rep_levels.is_some()); + assert_eq!(rep_levels.unwrap(), &[0, 1, 1, 0, 1]); + } + + #[test] + fn test_level_propagation_with_skip() { + let metrics = ArrowReaderMetrics::disabled(); + // Data with corresponding def and rep levels + let data = vec![1, 2, 3, 4, 5, 6]; + let def_levels = vec![1, 1, 0, 1, 1, 0]; + let rep_levels = vec![0, 1, 1, 0, 1, 1]; + + let mock_reader = MockArrayReaderWithLevels::new(data, def_levels, rep_levels); + let cache = Arc::new(Mutex::new(RowGroupCache::new(6, usize::MAX))); + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache, + 0, + CacheRole::Producer, + metrics, + ); + + // Read 2 records + let records_read = cached_reader.read_records(2).unwrap(); + assert_eq!(records_read, 2); + + // Skip 2 records + let skipped = cached_reader.skip_records(2).unwrap(); + assert_eq!(skipped, 2); + + // Read 2 more records + let records_read = cached_reader.read_records(2).unwrap(); + assert_eq!(records_read, 2); + + let array = cached_reader.consume_batch().unwrap(); + // Should have 4 values: positions 0, 1 (read), positions 4, 5 (read) + // Positions 2, 3 were skipped + assert_eq!(array.len(), 4); + + let int32_array = array.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[1, 2, 5, 6]); + + // Verify levels match the selected values + let def_levels = cached_reader.get_def_levels(); + assert!(def_levels.is_some()); + assert_eq!(def_levels.unwrap(), &[1, 1, 1, 0]); // def_levels for positions 0, 1, 4, 5 + + let rep_levels = cached_reader.get_rep_levels(); + assert!(rep_levels.is_some()); + assert_eq!(rep_levels.unwrap(), &[0, 1, 1, 1]); // rep_levels for positions 0, 1, 4, 5 + } + + #[test] + fn test_level_propagation_multi_batch() { + let metrics = ArrowReaderMetrics::disabled(); + // Data spanning multiple batches + let data = vec![1, 2, 3, 4, 5, 6]; + let def_levels = vec![1, 0, 1, 1, 0, 1]; + let rep_levels = vec![0, 0, 1, 0, 0, 1]; + + let 
mock_reader = MockArrayReaderWithLevels::new(data, def_levels, rep_levels); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache, + 0, + CacheRole::Producer, + metrics, + ); + + // Read all 6 records (spanning 2 batches) + let records_read = cached_reader.read_records(6).unwrap(); + assert_eq!(records_read, 6); + + let array = cached_reader.consume_batch().unwrap(); + assert_eq!(array.len(), 6); + + // Verify levels are correctly concatenated from both batches + let def_levels = cached_reader.get_def_levels(); + assert!(def_levels.is_some()); + assert_eq!(def_levels.unwrap(), &[1, 0, 1, 1, 0, 1]); + + let rep_levels = cached_reader.get_rep_levels(); + assert!(rep_levels.is_some()); + assert_eq!(rep_levels.unwrap(), &[0, 0, 1, 0, 0, 1]); + } + + #[test] + fn test_no_levels_returns_none() { + let metrics = ArrowReaderMetrics::disabled(); + // Use the original MockArrayReader which returns no levels + let mock_reader = MockArrayReader::new(vec![1, 2, 3]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache, + 0, + CacheRole::Producer, + metrics, + ); + + let records_read = cached_reader.read_records(3).unwrap(); + assert_eq!(records_read, 3); + + let _array = cached_reader.consume_batch().unwrap(); + + // Should return None since inner reader has no levels + assert!(cached_reader.get_def_levels().is_none()); + assert!(cached_reader.get_rep_levels().is_none()); + } + + #[test] + fn test_level_propagation_cache_sharing() { + let metrics = ArrowReaderMetrics::disabled(); + let cache = Arc::new(Mutex::new(RowGroupCache::new(5, usize::MAX))); + + // Producer populates cache with levels + let data = vec![1, 2, 3, 4, 5]; + let def_levels = vec![1, 0, 1, 1, 0]; + let rep_levels = vec![0, 1, 0, 1, 1]; + + let mock_reader = MockArrayReaderWithLevels::new( + 
data.clone(), + def_levels.clone(), + rep_levels.clone(), + ); + let mut producer = CachedArrayReader::new( + Box::new(mock_reader), + cache.clone(), + 0, + CacheRole::Producer, + metrics.clone(), + ); + + // Producer reads and populates cache + producer.read_records(5).unwrap(); + producer.consume_batch().unwrap(); + + // Consumer reads from cache - should get the same levels + let mock_reader2 = MockArrayReaderWithLevels::new( + vec![10, 20, 30, 40, 50], // Different data (shouldn't be used) + vec![0, 0, 0, 0, 0], + vec![0, 0, 0, 0, 0], + ); + let mut consumer = CachedArrayReader::new( + Box::new(mock_reader2), + cache.clone(), + 0, // Same column index + CacheRole::Consumer, + metrics, + ); + + consumer.read_records(5).unwrap(); + let array = consumer.consume_batch().unwrap(); + + // Should get original data from cache + let int32_array = array.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[1, 2, 3, 4, 5]); + + // Should get original levels from cache + assert_eq!(consumer.get_def_levels().unwrap(), &[1, 0, 1, 1, 0]); + assert_eq!(consumer.get_rep_levels().unwrap(), &[0, 1, 0, 1, 1]); + } } diff --git a/parquet/src/arrow/array_reader/row_group_cache.rs b/parquet/src/arrow/array_reader/row_group_cache.rs index ef726e16495f..5d1254f3fc67 100644 --- a/parquet/src/arrow/array_reader/row_group_cache.rs +++ b/parquet/src/arrow/array_reader/row_group_cache.rs @@ -19,6 +19,58 @@ use arrow_array::{Array, ArrayRef}; use arrow_schema::DataType; use std::collections::HashMap; +/// A cached batch containing array data and optional definition/repetition levels +#[derive(Debug, Clone)] +pub struct CachedBatch { + /// The decoded array data + pub array: ArrayRef, + /// Definition levels for nullability tracking in nested structures + pub def_levels: Option>, + /// Repetition levels for list boundary tracking + pub rep_levels: Option>, +} + +impl CachedBatch { + /// Creates a new cached batch with just array data (no levels) + #[cfg(test)] + pub fn 
new(array: ArrayRef) -> Self { + Self { + array, + def_levels: None, + rep_levels: None, + } + } + + /// Creates a new cached batch with array data and levels + pub fn with_levels( + array: ArrayRef, + def_levels: Option>, + rep_levels: Option>, + ) -> Self { + Self { + array, + def_levels, + rep_levels, + } + } + + /// Returns the memory size of this cached batch + fn memory_size(&self) -> usize { + let array_size = get_array_memory_size_for_cache(&self.array); + let def_size = self + .def_levels + .as_ref() + .map(|l| l.capacity() * std::mem::size_of::()) + .unwrap_or(0); + let rep_size = self + .rep_levels + .as_ref() + .map(|l| l.capacity() * std::mem::size_of::()) + .unwrap_or(0); + array_size + def_size + rep_size + } +} + /// Starting row ID for this batch /// /// The `BatchID` is used to identify batches of rows within a row group. @@ -62,8 +114,8 @@ fn get_array_memory_size_for_cache(array: &ArrayRef) -> usize { /// appears in both filter predicates and output projection. #[derive(Debug)] pub struct RowGroupCache { - /// Cache storage mapping (column_idx, row_id) -> ArrayRef - cache: HashMap, + /// Cache storage mapping (column_idx, row_id) -> CachedBatch + cache: HashMap, /// Cache granularity batch_size: usize, /// Maximum cache size in bytes @@ -83,13 +135,13 @@ impl RowGroupCache { } } - /// Inserts an array into the cache for the given column and starting row ID - /// Returns true if the array was inserted, false if it would exceed the cache size limit - pub fn insert(&mut self, column_idx: usize, batch_id: BatchID, array: ArrayRef) -> bool { - let array_size = get_array_memory_size_for_cache(&array); + /// Inserts a batch into the cache for the given column and starting row ID + /// Returns true if the batch was inserted, false if it would exceed the cache size limit + pub fn insert(&mut self, column_idx: usize, batch_id: BatchID, batch: CachedBatch) -> bool { + let batch_size = batch.memory_size(); - // Check if adding this array would exceed the 
cache size limit - if self.current_cache_size + array_size > self.max_cache_bytes { + // Check if adding this batch would exceed the cache size limit + if self.current_cache_size + batch_size > self.max_cache_bytes { return false; // Cache is full, don't insert } @@ -98,15 +150,15 @@ impl RowGroupCache { batch_id, }; - let existing = self.cache.insert(key, array); + let existing = self.cache.insert(key, batch); assert!(existing.is_none()); - self.current_cache_size += array_size; + self.current_cache_size += batch_size; true } - /// Retrieves a cached array for the given column and row ID + /// Retrieves a cached batch for the given column and row ID /// Returns None if not found in cache - pub fn get(&self, column_idx: usize, batch_id: BatchID) -> Option { + pub fn get(&self, column_idx: usize, batch_id: BatchID) -> Option { let key = CacheKey { column_idx, batch_id, @@ -119,15 +171,15 @@ impl RowGroupCache { self.batch_size } - /// Removes a cached array for the given column and row ID + /// Removes a cached batch for the given column and row ID /// Returns true if the entry was found and removed, false otherwise pub fn remove(&mut self, column_idx: usize, batch_id: BatchID) -> bool { let key = CacheKey { column_idx, batch_id, }; - if let Some(array) = self.cache.remove(&key) { - self.current_cache_size -= get_array_memory_size_for_cache(&array); + if let Some(batch) = self.cache.remove(&key) { + self.current_cache_size -= batch.memory_size(); true } else { false @@ -147,13 +199,14 @@ mod tests { // Create test array let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); + let batch = CachedBatch::new(array); // Test insert and get let batch_id = BatchID { val: 0 }; - assert!(cache.insert(0, batch_id, array.clone())); + assert!(cache.insert(0, batch_id, batch)); let retrieved = cache.get(0, batch_id); assert!(retrieved.is_some()); - assert_eq!(retrieved.unwrap().len(), 5); + assert_eq!(retrieved.unwrap().array.len(), 5); // Test miss let miss = 
cache.get(1, batch_id); @@ -172,10 +225,10 @@ mod tests { let array1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); let array2: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6])); - // Insert arrays - assert!(cache.insert(0, BatchID { val: 0 }, array1.clone())); - assert!(cache.insert(0, BatchID { val: 1000 }, array2.clone())); - assert!(cache.insert(1, BatchID { val: 0 }, array1.clone())); + // Insert batches + assert!(cache.insert(0, BatchID { val: 0 }, CachedBatch::new(array1.clone()))); + assert!(cache.insert(0, BatchID { val: 1000 }, CachedBatch::new(array2.clone()))); + assert!(cache.insert(1, BatchID { val: 0 }, CachedBatch::new(array1.clone()))); // Verify they're there assert!(cache.get(0, BatchID { val: 0 }).is_some()); From 133b892afecc554d93ed5051b3f965758d3a27e7 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 29 Dec 2025 20:00:10 -0600 Subject: [PATCH 3/9] add nested nullable field support for CachedArrayReader --- .../arrow/array_reader/cached_array_reader.rs | 11 ++++- parquet/src/arrow/mod.rs | 45 ------------------- .../arrow/push_decoder/reader_builder/mod.rs | 7 +-- parquet/tests/arrow_reader/predicate_cache.rs | 28 +++++------- 4 files changed, 22 insertions(+), 69 deletions(-) diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs index 36444b75586d..44c7b4ab3a8a 100644 --- a/parquet/src/arrow/array_reader/cached_array_reader.rs +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -281,8 +281,15 @@ impl ArrayReader for CachedArrayReader { fn consume_batch(&mut self) -> Result { let row_count = self.selections.len(); if row_count == 0 { - self.def_levels_buffer = None; - self.rep_levels_buffer = None; + // When there's no data to consume, set empty level buffers if we + // previously had levels. This ensures the levels match the empty array. 
+ // We keep Some([]) rather than None to indicate this reader provides levels. + if self.def_levels_buffer.is_some() { + self.def_levels_buffer = Some(Vec::new()); + } + if self.rep_levels_buffer.is_some() { + self.rep_levels_buffer = Some(Vec::new()); + } return Ok(new_empty_array(self.inner.get_data_type())); } diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 52152988166f..99d2f987380d 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -419,51 +419,6 @@ impl ProjectionMask { } } } - - /// Return a new [`ProjectionMask`] that excludes any leaf columns that are - /// part of a nested type, such as struct, list, or map - /// - /// If there are no non-nested columns in the mask, returns `None` - pub(crate) fn without_nested_types(&self, schema: &SchemaDescriptor) -> Option { - let num_leaves = schema.num_columns(); - - // Count how many leaves each root column has - let num_roots = schema.root_schema().get_fields().len(); - let mut root_leaf_counts = vec![0usize; num_roots]; - for leaf_idx in 0..num_leaves { - let root_idx = schema.get_column_root_idx(leaf_idx); - root_leaf_counts[root_idx] += 1; - } - - // Keep only leaves whose root has exactly one leaf (non-nested) and is not a - // LIST. LIST is encoded as a wrapped logical type with a single leaf, e.g. 
- // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists - // - // ```text - // // List (list non-null, elements nullable) - // required group my_list (LIST) { - // repeated group list { - // optional binary element (STRING); - // } - // } - // ``` - let mut included_leaves = Vec::new(); - for leaf_idx in 0..num_leaves { - if self.leaf_included(leaf_idx) { - let root = schema.get_column_root(leaf_idx); - let root_idx = schema.get_column_root_idx(leaf_idx); - if root_leaf_counts[root_idx] == 1 && !root.is_list() { - included_leaves.push(leaf_idx); - } - } - } - - if included_leaves.is_empty() { - None - } else { - Some(ProjectionMask::leaves(schema, included_leaves)) - } - } } /// Lookups up the parquet column by name diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index d3d78ca7c263..c3bc02d15569 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -653,12 +653,7 @@ impl RowGroupReaderBuilder { cache_projection.union(predicate.projection()); } cache_projection.intersect(&self.projection); - self.exclude_nested_columns_from_cache(&cache_projection) - } - - /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. 
nested columns) - fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option { - mask.without_nested_types(self.metadata.file_metadata().schema_descr()) + Some(cache_projection) } /// Get the offset index for the specified row group, if any diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs index 85dba68c9c69..7a06234a21d0 100644 --- a/parquet/tests/arrow_reader/predicate_cache.rs +++ b/parquet/tests/arrow_reader/predicate_cache.rs @@ -82,13 +82,11 @@ async fn test_cache_disabled_with_filters() { } #[tokio::test] -async fn test_cache_projection_excludes_nested_columns() { - let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(0); - - let sync_builder = test.sync_builder().add_nested_filter(); - test.run_sync(sync_builder); - - let async_builder = test.async_builder().await.add_nested_filter(); +async fn test_async_cache_with_nested_columns() { + // Nested columns now work with cache - expect records from cache + // 100 rows × 2 leaf columns (b.aa, b.bb) = 200 records + let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(200); + let async_builder = test.async_builder().await.add_nested_root_filter(); test.run_async(async_builder).await; } @@ -279,9 +277,9 @@ trait ArrowReaderBuilderExt { /// 2. a row_filter applied to "b": 575 < "b" < 625 (select 1 data page from each row group) fn add_project_ab_and_filter_b(self) -> Self; - /// Adds a row filter that projects the nested leaf column "b.aa" and + /// Adds a row filter that projects the nested ROOT column "b" and /// returns true for all rows. 
- fn add_nested_filter(self) -> Self; + fn add_nested_root_filter(self) -> Self; } impl ArrowReaderBuilderExt for ArrowReaderBuilder { @@ -303,14 +301,13 @@ impl ArrowReaderBuilderExt for ArrowReaderBuilder { .with_row_filter(RowFilter::new(vec![Box::new(row_filter)])) } - fn add_nested_filter(self) -> Self { + fn add_nested_root_filter(self) -> Self { let schema_descr = self.metadata().file_metadata().schema_descr_ptr(); - // Build a RowFilter whose predicate projects a leaf under the nested root `b` - // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa) - let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]); + // Project the ROOT struct column "b", not just leaf "b.aa" + let root_mask = ProjectionMask::roots(&schema_descr, [1]); // column index 1 = "b" - let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| { + let always_true = ArrowPredicateFn::new(root_mask.clone(), |batch: RecordBatch| { Ok(arrow_array::BooleanArray::from(vec![ true; batch.num_rows() @@ -318,7 +315,6 @@ impl ArrowReaderBuilderExt for ArrowReaderBuilder { }); let row_filter = RowFilter::new(vec![Box::new(always_true)]); - self.with_projection(nested_leaf_mask) - .with_row_filter(row_filter) + self.with_projection(root_mask).with_row_filter(row_filter) } } From 8827433edd90f48e8209ef5ebc1dad949a82ec52 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 29 Dec 2025 20:09:45 -0600 Subject: [PATCH 4/9] remove tests: --- parquet/src/arrow/mod.rs | 216 --------------------------------------- 1 file changed, 216 deletions(-) diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 99d2f987380d..d03b16754222 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -781,222 +781,6 @@ mod test { assert_eq!(mask1.mask, None); } - #[test] - fn test_projection_mask_without_nested_no_nested() { - // Schema with no nested types - let schema = 
parse_schema( - " - message test_schema { - OPTIONAL INT32 a; - OPTIONAL INT32 b; - REQUIRED DOUBLE d; - } - ", - ); - - let mask = ProjectionMask::all(); - // All columns are non-nested, but without_nested_types returns a new mask - assert_eq!( - Some(ProjectionMask::leaves(&schema, [0, 1, 2])), - mask.without_nested_types(&schema) - ); - - // select b, c - let mask = ProjectionMask::leaves(&schema, [1, 2]); - assert_eq!(Some(mask.clone()), mask.without_nested_types(&schema)); - } - - #[test] - fn test_projection_mask_without_nested_nested() { - // Schema with nested types (structs) - let schema = parse_schema( - " - message test_schema { - OPTIONAL INT32 a; - OPTIONAL group b { - REQUIRED INT32 b1; - OPTIONAL INT64 b2; - } - OPTIONAL group c (LIST) { - REPEATED group list { - OPTIONAL INT32 element; - } - } - REQUIRED DOUBLE d; - } - ", - ); - - // all leaves --> a, d - let mask = ProjectionMask::all(); - assert_eq!( - Some(ProjectionMask::leaves(&schema, [0, 4])), - mask.without_nested_types(&schema) - ); - - // b1 --> empty (it is nested) - let mask = ProjectionMask::leaves(&schema, [1]); - assert_eq!(None, mask.without_nested_types(&schema)); - - // b2, d --> d - let mask = ProjectionMask::leaves(&schema, [1, 4]); - assert_eq!( - Some(ProjectionMask::leaves(&schema, [4])), - mask.without_nested_types(&schema) - ); - - // element --> empty (it is nested) - let mask = ProjectionMask::leaves(&schema, [3]); - assert_eq!(None, mask.without_nested_types(&schema)); - } - - #[test] - fn test_projection_mask_without_nested_map_only() { - // Example from https://github.com/apache/parquet-format/blob/master/LogicalTypes.md - let schema = parse_schema( - " - message test_schema { - required group my_map (MAP) { - repeated group key_value { - required binary key (STRING); - optional int32 value; - } - } - } - ", - ); - - let mask = ProjectionMask::all(); - assert_eq!(None, mask.without_nested_types(&schema)); - - // key --> empty (it is nested) - let mask = 
ProjectionMask::leaves(&schema, [0]); - assert_eq!(None, mask.without_nested_types(&schema)); - - // value --> empty (it is nested) - let mask = ProjectionMask::leaves(&schema, [1]); - assert_eq!(None, mask.without_nested_types(&schema)); - } - - #[test] - fn test_projection_mask_without_nested_map_with_non_nested() { - // Example from https://github.com/apache/parquet-format/blob/master/LogicalTypes.md - // with an additional non-nested field - let schema = parse_schema( - " - message test_schema { - REQUIRED INT32 a; - required group my_map (MAP) { - repeated group key_value { - required binary key (STRING); - optional int32 value; - } - } - REQUIRED INT32 b; - } - ", - ); - - // all leaves --> a, b which are the only non nested ones - let mask = ProjectionMask::all(); - assert_eq!( - Some(ProjectionMask::leaves(&schema, [0, 3])), - mask.without_nested_types(&schema) - ); - - // key, value, b --> b (the only non-nested one) - let mask = ProjectionMask::leaves(&schema, [1, 2, 3]); - assert_eq!( - Some(ProjectionMask::leaves(&schema, [3])), - mask.without_nested_types(&schema) - ); - - // key, value --> NONE - let mask = ProjectionMask::leaves(&schema, [1, 2]); - assert_eq!(None, mask.without_nested_types(&schema)); - } - - #[test] - fn test_projection_mask_without_nested_deeply_nested() { - // Map of Maps - let schema = parse_schema( - " - message test_schema { - OPTIONAL group a (MAP) { - REPEATED group key_value { - REQUIRED BYTE_ARRAY key (UTF8); - OPTIONAL group value (MAP) { - REPEATED group key_value { - REQUIRED INT32 key; - REQUIRED BOOLEAN value; - } - } - } - } - REQUIRED INT32 b; - REQUIRED DOUBLE c; - ", - ); - - let mask = ProjectionMask::all(); - assert_eq!( - Some(ProjectionMask::leaves(&schema, [3, 4])), - mask.without_nested_types(&schema) - ); - - // (first) key, c --> c (the only non-nested one) - let mask = ProjectionMask::leaves(&schema, [0, 4]); - assert_eq!( - Some(ProjectionMask::leaves(&schema, [4])), - mask.without_nested_types(&schema) - 
); - - // (second) key, value, b --> b (the only non-nested one) - let mask = ProjectionMask::leaves(&schema, [1, 2, 3]); - assert_eq!( - Some(ProjectionMask::leaves(&schema, [3])), - mask.without_nested_types(&schema) - ); - - // key --> NONE (the only non-nested one) - let mask = ProjectionMask::leaves(&schema, [0]); - assert_eq!(None, mask.without_nested_types(&schema)); - } - - #[test] - fn test_projection_mask_without_nested_list() { - // Example from https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists - let schema = parse_schema( - " - message test_schema { - required group my_list (LIST) { - repeated group list { - optional binary element (STRING); - } - } - REQUIRED INT32 b; - } - ", - ); - - let mask = ProjectionMask::all(); - assert_eq!( - Some(ProjectionMask::leaves(&schema, [1])), - mask.without_nested_types(&schema), - ); - - // element --> empty (it is nested) - let mask = ProjectionMask::leaves(&schema, [0]); - assert_eq!(None, mask.without_nested_types(&schema)); - - // element, b --> b (it is nested) - let mask = ProjectionMask::leaves(&schema, [0, 1]); - assert_eq!( - Some(ProjectionMask::leaves(&schema, [1])), - mask.without_nested_types(&schema), - ); - } - /// Converts a schema string into a `SchemaDescriptor` fn parse_schema(schema: &str) -> SchemaDescriptor { let parquet_group_type = parse_message_type(schema).unwrap(); From 9f1d4e222842294b1ebca3f03f9c9dd735fb35f9 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 29 Dec 2025 23:26:26 -0600 Subject: [PATCH 5/9] fix --- parquet/src/arrow/array_reader/builder.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 818e06e8b81f..9a7b4f565046 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -153,7 +153,10 @@ impl<'a> ArrayReaderBuilder<'a> { return 
Ok(Some(reader)); }; - if cache_options.projection_mask.leaf_included(col_idx) { + // Skip caching for nested fields (inside List/Map) where rep_level > 0 + // because the cache tracks by row count, but nested fields have + // values that don't correspond 1:1 with rows due to repetition + if cache_options.projection_mask.leaf_included(col_idx) && field.rep_level == 0 { Ok(Some(Box::new(CachedArrayReader::new( reader, Arc::clone(cache_options.cache), From 72b36e5dd0b31b35e31d4bdfc8d0980b9e36fcd4 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 4 Jan 2026 09:37:30 -0600 Subject: [PATCH 6/9] add test --- .../arrow/array_reader/cached_array_reader.rs | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs index 44c7b4ab3a8a..203f7b5dd1db 100644 --- a/parquet/src/arrow/array_reader/cached_array_reader.rs +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -324,6 +324,20 @@ impl ArrayReader for CachedArrayReader { let mask = selection_buffer.slice(selection_start, selection_length); if mask.count_set_bits() == 0 { + // Even when all records are filtered out, check if the batch has levels + // so we can return Some([]) instead of None to indicate this reader provides levels. 
+ // Check local cache first, then shared cache (since skip_records doesn't populate local cache) + let cached_batch = self.local_cache.get(&batch_id).cloned().or_else(|| { + self.shared_cache.lock().unwrap().get(self.column_idx, batch_id) + }); + if let Some(batch) = cached_batch { + if batch.def_levels.is_some() { + has_def_levels = true; + } + if batch.rep_levels.is_some() { + has_rep_levels = true; + } + } continue; } @@ -1089,4 +1103,52 @@ mod tests { assert_eq!(consumer.get_def_levels().unwrap(), &[1, 0, 1, 1, 0]); assert_eq!(consumer.get_rep_levels().unwrap(), &[0, 1, 0, 1, 1]); } + + #[test] + fn test_level_propagation_empty_after_skip() { + let metrics = ArrowReaderMetrics::disabled(); + let cache = Arc::new(Mutex::new(RowGroupCache::new(4, usize::MAX))); + + // Producer populates cache with levels + let data = vec![1, 2, 3, 4]; + let def_levels = vec![1, 0, 1, 1]; + let rep_levels = vec![0, 1, 1, 0]; + let mock_reader = + MockArrayReaderWithLevels::new(data, def_levels.clone(), rep_levels.clone()); + let mut producer = CachedArrayReader::new( + Box::new(mock_reader), + cache.clone(), + 0, + CacheRole::Producer, + metrics.clone(), + ); + + producer.read_records(4).unwrap(); + producer.consume_batch().unwrap(); + + // Consumer skips all rows, resulting in an empty output batch + let mock_reader2 = MockArrayReaderWithLevels::new( + vec![10, 20, 30, 40], + vec![0, 0, 0, 0], + vec![0, 0, 0, 0], + ); + let mut consumer = CachedArrayReader::new( + Box::new(mock_reader2), + cache, + 0, + CacheRole::Consumer, + metrics, + ); + + let skipped = consumer.skip_records(4).unwrap(); + assert_eq!(skipped, 4); + + let array = consumer.consume_batch().unwrap(); + assert_eq!(array.len(), 0); + + let def_levels = consumer.get_def_levels().map(|l| l.to_vec()); + assert_eq!(def_levels, Some(vec![])); + let rep_levels = consumer.get_rep_levels().map(|l| l.to_vec()); + assert_eq!(rep_levels, Some(vec![])); + } } From 2e40b7858231222f5aaff6d3ba972813fc01e8df Mon Sep 17 00:00:00 
2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 7 Jan 2026 10:52:39 -0500 Subject: [PATCH 7/9] fmt --- .../src/arrow/array_reader/cached_array_reader.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs index 203f7b5dd1db..2e6ac6de0bde 100644 --- a/parquet/src/arrow/array_reader/cached_array_reader.rs +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -328,7 +328,10 @@ impl ArrayReader for CachedArrayReader { // so we can return Some([]) instead of None to indicate this reader provides levels. // Check local cache first, then shared cache (since skip_records doesn't populate local cache) let cached_batch = self.local_cache.get(&batch_id).cloned().or_else(|| { - self.shared_cache.lock().unwrap().get(self.column_idx, batch_id) + self.shared_cache + .lock() + .unwrap() + .get(self.column_idx, batch_id) }); if let Some(batch) = cached_batch { if batch.def_levels.is_some() { @@ -1061,11 +1064,8 @@ mod tests { let def_levels = vec![1, 0, 1, 1, 0]; let rep_levels = vec![0, 1, 0, 1, 1]; - let mock_reader = MockArrayReaderWithLevels::new( - data.clone(), - def_levels.clone(), - rep_levels.clone(), - ); + let mock_reader = + MockArrayReaderWithLevels::new(data.clone(), def_levels.clone(), rep_levels.clone()); let mut producer = CachedArrayReader::new( Box::new(mock_reader), cache.clone(), From 236820c6a675f9dd97adb2eba06ded385ba89f6f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 11 Jan 2026 09:00:04 -0500 Subject: [PATCH 8/9] Add integration tests --- parquet/src/arrow/array_reader/builder.rs | 1 + .../arrow/array_reader/cached_array_reader.rs | 82 +++- parquet/tests/arrow_reader/predicate_cache.rs | 374 +++++++++++++++++- 3 files changed, 439 insertions(+), 18 deletions(-) diff --git 
a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 9a7b4f565046..f2a4eed6cdb1 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -163,6 +163,7 @@ impl<'a> ArrayReaderBuilder<'a> { col_idx, cache_options.role, self.metrics.clone(), // cheap clone + field.def_level > 0, // needs_def_levels: true if has nullable ancestors )))) } else { Ok(Some(reader)) diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs index 2e6ac6de0bde..ee7c225d8803 100644 --- a/parquet/src/arrow/array_reader/cached_array_reader.rs +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -91,16 +91,31 @@ pub struct CachedArrayReader { def_levels_buffer: Option>, /// Repetition levels for the last consume_batch() output rep_levels_buffer: Option>, + /// Whether this reader needs definition levels (def_level > 0, i.e., has nullable ancestors) + /// When false, we skip copying definition levels to avoid unnecessary allocations. + needs_def_levels: bool, } impl CachedArrayReader { /// Creates a new cached array reader with the specified role + /// + /// # Arguments + /// * `inner` - The underlying array reader + /// * `cache` - Shared cache for this row group + /// * `column_idx` - Column index for cache key generation + /// * `role` - Producer or Consumer role + /// * `metrics` - Statistics to report on cache behavior + /// * `needs_def_levels` - Whether this column needs definition levels (def_level > 0). + /// When false, definition levels are not copied to avoid unnecessary allocations. + /// Note: repetition levels are never copied for cached columns since caching is + /// only enabled for columns with rep_level == 0. 
pub fn new( inner: Box, cache: Arc>, column_idx: usize, role: CacheRole, metrics: ArrowReaderMetrics, + needs_def_levels: bool, ) -> Self { let batch_size = cache.read().unwrap().batch_size(); @@ -117,6 +132,7 @@ impl CachedArrayReader { metrics, def_levels_buffer: None, rep_levels_buffer: None, + needs_def_levels, } } @@ -152,9 +168,17 @@ impl CachedArrayReader { let array = self.inner.consume_batch()?; - // Capture definition and repetition levels from inner reader before they are cleared - let def_levels = self.inner.get_def_levels().map(|l| l.to_vec()); - let rep_levels = self.inner.get_rep_levels().map(|l| l.to_vec()); + // Capture definition levels from inner reader only when needed (def_level > 0). + // This avoids unnecessary allocations for columns without nullable ancestors. + // Repetition levels are never copied because caching is only enabled for + // columns with rep_level == 0 (non-nested, non-repeated columns). + let def_levels = if self.needs_def_levels { + self.inner.get_def_levels().map(|l| l.to_vec()) + } else { + None + }; + // rep_levels always None for cached columns (rep_level == 0 per builder.rs) + let rep_levels = None; let cached_batch = CachedBatch::with_levels(array, def_levels, rep_levels); // Store in both shared cache and local cache @@ -506,6 +530,7 @@ mod tests { 0, CacheRole::Producer, metrics, + false, // needs_def_levels: basic test doesn't use levels ); // Read 3 records @@ -534,6 +559,7 @@ mod tests { 0, CacheRole::Consumer, metrics, + false, // needs_def_levels: basic test doesn't use levels ); let read1 = cached_reader.read_records(2).unwrap(); @@ -568,6 +594,7 @@ mod tests { 0, CacheRole::Consumer, metrics, + false, // needs_def_levels ); // Multiple reads should accumulate @@ -595,6 +622,7 @@ mod tests { 0, CacheRole::Consumer, metrics, + false, // needs_def_levels ); // Try to read more than available @@ -625,6 +653,7 @@ mod tests { 0, CacheRole::Producer, metrics.clone(), + false, // needs_def_levels ); 
cached_reader1.read_records(3).unwrap(); @@ -639,6 +668,7 @@ mod tests { 1, CacheRole::Consumer, metrics.clone(), + false, // needs_def_levels ); cached_reader2.read_records(2).unwrap(); @@ -661,6 +691,7 @@ mod tests { 0, CacheRole::Consumer, metrics, + false, // needs_def_levels ); // Read first batch (positions 0-2, batch 0) @@ -714,6 +745,7 @@ mod tests { 0, CacheRole::Producer, metrics, + false, // needs_def_levels ); // Read first batch (positions 0-2) @@ -747,6 +779,7 @@ mod tests { 0, CacheRole::Consumer, metrics, + false, // needs_def_levels ); // Read records which should populate both shared and local cache @@ -784,6 +817,7 @@ mod tests { 0, CacheRole::Consumer, metrics, + false, // needs_def_levels ); // Read records which should populate both shared and local cache @@ -811,6 +845,7 @@ mod tests { 0, CacheRole::Producer, metrics.clone(), + false, // needs_def_levels ); // Populate cache with first batch (1, 2, 3) @@ -824,6 +859,7 @@ mod tests { 0, CacheRole::Consumer, metrics, + false, // needs_def_levels ); // - We want to read 4 records starting from position 0 @@ -928,6 +964,7 @@ mod tests { 0, CacheRole::Producer, metrics, + true, // needs_def_levels: test level propagation ); // Read all 5 records @@ -937,14 +974,15 @@ mod tests { let array = cached_reader.consume_batch().unwrap(); assert_eq!(array.len(), 5); - // Verify levels are captured + // Verify definition levels are captured let def_levels = cached_reader.get_def_levels(); assert!(def_levels.is_some()); assert_eq!(def_levels.unwrap(), &[1, 1, 0, 1, 1]); + // Repetition levels are not copied because caching is only enabled + // for columns with rep_level == 0 (non-nested columns) let rep_levels = cached_reader.get_rep_levels(); - assert!(rep_levels.is_some()); - assert_eq!(rep_levels.unwrap(), &[0, 1, 1, 0, 1]); + assert!(rep_levels.is_none()); } #[test] @@ -963,6 +1001,7 @@ mod tests { 0, CacheRole::Producer, metrics, + true, // needs_def_levels: test level propagation ); // Read 2 records 
@@ -985,14 +1024,14 @@ mod tests { let int32_array = array.as_any().downcast_ref::().unwrap(); assert_eq!(int32_array.values(), &[1, 2, 5, 6]); - // Verify levels match the selected values + // Verify definition levels match the selected values let def_levels = cached_reader.get_def_levels(); assert!(def_levels.is_some()); assert_eq!(def_levels.unwrap(), &[1, 1, 1, 0]); // def_levels for positions 0, 1, 4, 5 + // Repetition levels are not copied (rep_level == 0 for cached columns) let rep_levels = cached_reader.get_rep_levels(); - assert!(rep_levels.is_some()); - assert_eq!(rep_levels.unwrap(), &[0, 1, 1, 1]); // rep_levels for positions 0, 1, 4, 5 + assert!(rep_levels.is_none()); } #[test] @@ -1011,6 +1050,7 @@ mod tests { 0, CacheRole::Producer, metrics, + true, // needs_def_levels: test level propagation ); // Read all 6 records (spanning 2 batches) @@ -1020,14 +1060,14 @@ mod tests { let array = cached_reader.consume_batch().unwrap(); assert_eq!(array.len(), 6); - // Verify levels are correctly concatenated from both batches + // Verify definition levels are correctly concatenated from both batches let def_levels = cached_reader.get_def_levels(); assert!(def_levels.is_some()); assert_eq!(def_levels.unwrap(), &[1, 0, 1, 1, 0, 1]); + // Repetition levels are not copied (rep_level == 0 for cached columns) let rep_levels = cached_reader.get_rep_levels(); - assert!(rep_levels.is_some()); - assert_eq!(rep_levels.unwrap(), &[0, 0, 1, 0, 0, 1]); + assert!(rep_levels.is_none()); } #[test] @@ -1042,6 +1082,7 @@ mod tests { 0, CacheRole::Producer, metrics, + false, // needs_def_levels: false since inner reader has no levels ); let records_read = cached_reader.read_records(3).unwrap(); @@ -1049,7 +1090,7 @@ mod tests { let _array = cached_reader.consume_batch().unwrap(); - // Should return None since inner reader has no levels + // Should return None since inner reader has no levels and needs_def_levels is false assert!(cached_reader.get_def_levels().is_none()); 
assert!(cached_reader.get_rep_levels().is_none()); } @@ -1072,6 +1113,7 @@ mod tests { 0, CacheRole::Producer, metrics.clone(), + true, // needs_def_levels: test level propagation ); // Producer reads and populates cache @@ -1090,6 +1132,7 @@ mod tests { 0, // Same column index CacheRole::Consumer, metrics, + true, // needs_def_levels: test level propagation ); consumer.read_records(5).unwrap(); @@ -1099,9 +1142,10 @@ mod tests { let int32_array = array.as_any().downcast_ref::().unwrap(); assert_eq!(int32_array.values(), &[1, 2, 3, 4, 5]); - // Should get original levels from cache + // Should get original definition levels from cache assert_eq!(consumer.get_def_levels().unwrap(), &[1, 0, 1, 1, 0]); - assert_eq!(consumer.get_rep_levels().unwrap(), &[0, 1, 0, 1, 1]); + // Repetition levels are not cached (rep_level == 0 for cached columns) + assert!(consumer.get_rep_levels().is_none()); } #[test] @@ -1121,6 +1165,7 @@ mod tests { 0, CacheRole::Producer, metrics.clone(), + true, // needs_def_levels: test level propagation ); producer.read_records(4).unwrap(); @@ -1138,6 +1183,7 @@ mod tests { 0, CacheRole::Consumer, metrics, + true, // needs_def_levels: test level propagation ); let skipped = consumer.skip_records(4).unwrap(); @@ -1146,9 +1192,11 @@ mod tests { let array = consumer.consume_batch().unwrap(); assert_eq!(array.len(), 0); + // Definition levels should be empty (not None) after skip let def_levels = consumer.get_def_levels().map(|l| l.to_vec()); assert_eq!(def_levels, Some(vec![])); - let rep_levels = consumer.get_rep_levels().map(|l| l.to_vec()); - assert_eq!(rep_levels, Some(vec![])); + // Repetition levels are not cached (rep_level == 0 for cached columns) + let rep_levels = consumer.get_rep_levels(); + assert!(rep_levels.is_none()); } } diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs index 7a06234a21d0..fe853c8cb54d 100644 --- a/parquet/tests/arrow_reader/predicate_cache.rs +++ 
b/parquet/tests/arrow_reader/predicate_cache.rs @@ -21,7 +21,7 @@ use super::io::TestReader; use arrow::array::ArrayRef; use arrow::array::Int64Array; use arrow::compute::and; -use arrow::compute::kernels::cmp::{gt, lt}; +use arrow::compute::kernels::cmp::{eq, gt, lt}; use arrow_array::cast::AsArray; use arrow_array::types::Int64Type; use arrow_array::{RecordBatch, StringArray, StringViewArray, StructArray}; @@ -90,6 +90,299 @@ async fn test_async_cache_with_nested_columns() { test.run_async(async_builder).await; } +/// Test RowSelectionPolicy impact on cache reads with a struct filter that selects sparse rows. +/// +/// Filter: `bb % 2 == 0 AND (id < 25 OR id > 75)` selects rows at both ends (sparse, non-contiguous) +/// Filter mask: `[id, b]` (3 leaf columns: id, b.aa, b.bb) +/// Projection: `[id, b]` +/// +/// Both policies must return identical row counts and data, but differ in cache reads: +/// - Auto (Mask strategy): reads more rows due to covering the range +/// - Selectors: reads only the selected rows +#[tokio::test] +async fn test_struct_filter_sparse_selection_policy() { + use arrow::compute::kernels::numeric::rem; + use parquet::arrow::arrow_reader::RowSelectionPolicy; + + // Helper to create the filter: bb % 2 == 0 AND (id < 25 OR id > 75) + fn make_filter( + schema_descr: &std::sync::Arc, + ) -> RowFilter { + let filter_mask = ProjectionMask::roots(schema_descr, [0, 1]); // id and b + let row_filter = ArrowPredicateFn::new(filter_mask, |batch: RecordBatch| { + let id = batch.column(0).as_primitive::(); + // id < 25 OR id > 75 (sparse: both ends) + let id_sparse = arrow::compute::or( + <(id, &Int64Array::new_scalar(25))?, + >(id, &Int64Array::new_scalar(75))?, + )?; + + let struct_col = batch.column(1).as_struct(); + let bb = struct_col.column_by_name("bb").unwrap(); + let bb = bb.as_primitive::(); + let remainder = rem(bb, &arrow_array::Int32Array::new_scalar(2))?; + let bb_even = eq(&remainder, &arrow_array::Int32Array::new_scalar(0))?; + + 
and(&id_sparse, &bb_even) + }); + RowFilter::new(vec![Box::new(row_filter)]) + } + + // Test with Auto policy + let (auto_rows, auto_cache) = { + let test = ParquetPredicateCacheTest::new_nested_nullable(); + let async_builder = test.async_builder().await; + let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr(); + let projection = ProjectionMask::roots(&schema_descr, [0, 1]); + + let async_builder = async_builder + .with_projection(projection) + .with_row_filter(make_filter(&schema_descr)) + .with_row_selection_policy(RowSelectionPolicy::default()); // Auto + + let metrics = ArrowReaderMetrics::enabled(); + let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap(); + let mut total_rows = 0; + while let Some(batch) = stream.next().await { + total_rows += batch.expect("Error").num_rows(); + } + (total_rows, metrics.records_read_from_cache().unwrap()) + }; + + // Test with Selectors policy + let (selectors_rows, selectors_cache) = { + let test = ParquetPredicateCacheTest::new_nested_nullable(); + let async_builder = test.async_builder().await; + let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr(); + let projection = ProjectionMask::roots(&schema_descr, [0, 1]); + + let async_builder = async_builder + .with_projection(projection) + .with_row_filter(make_filter(&schema_descr)) + .with_row_selection_policy(RowSelectionPolicy::Selectors); + + let metrics = ArrowReaderMetrics::enabled(); + let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap(); + let mut total_rows = 0; + while let Some(batch) = stream.next().await { + total_rows += batch.expect("Error").num_rows(); + } + (total_rows, metrics.records_read_from_cache().unwrap()) + }; + + eprintln!("Sparse struct filter:"); + eprintln!(" Auto policy: rows={auto_rows}, cache={auto_cache}"); + eprintln!(" Selectors policy: rows={selectors_rows}, cache={selectors_cache}"); + + // Both policies must return identical row counts + 
assert_eq!( + auto_rows, selectors_rows, + "Both policies must return same row count" + ); + + // Selectors policy reads exactly: filtered_rows × 3 leaf columns + assert_eq!( + selectors_cache, + selectors_rows * 3, + "Selectors policy: cache reads = rows × 3 columns" + ); + + // Auto policy reads more due to Mask strategy covering the range + assert!( + auto_cache >= selectors_cache, + "Auto policy should read >= Selectors policy" + ); +} + +/// Test struct field projections with a filter on id (non-struct column). +/// +/// Filter: `id > 50` (filter mask includes only `id`) +/// Tests various projections to verify cache behavior: +/// - Only columns in (filter_mask ∩ projection) are read from cache +/// - Since filter only uses `id`, struct columns are never cached +/// +/// Schema leaf columns: 0=id, 1=b.aa, 2=b.bb +#[tokio::test] +async fn test_struct_field_projections_filter_on_id() { + // Expected: 49 rows (id 51-99) + const EXPECTED_ROWS: usize = 49; + + // Test cases: (projection_leaves, expected_cache_reads, description) + let test_cases: &[(&[usize], usize, &str)] = &[ + ( + &[0, 2], + EXPECTED_ROWS, + "[id, b.bb]: id in filter mask → cached", + ), + ( + &[1, 2], + 0, + "[b.aa, b.bb]: neither in filter mask → not cached", + ), + ( + &[0, 1, 2], + EXPECTED_ROWS, + "[id, b]: id in filter mask → cached", + ), + (&[2], 0, "[b.bb]: not in filter mask → not cached"), + (&[0], EXPECTED_ROWS, "[id]: id in filter mask → cached"), + ]; + + for (projection_leaves, expected_cache, description) in test_cases { + let test = ParquetPredicateCacheTest::new_nested_nullable(); + let async_builder = test.async_builder().await; + let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr(); + + // Filter on id only: id > 50 + let filter_mask = ProjectionMask::leaves(&schema_descr, [0]); // only id + let row_filter = ArrowPredicateFn::new(filter_mask, |batch: RecordBatch| { + let id = batch.column(0).as_primitive::(); + gt(id, &Int64Array::new_scalar(50)) + 
}); + + let projection = ProjectionMask::leaves(&schema_descr, projection_leaves.iter().copied()); + let async_builder = async_builder + .with_projection(projection) + .with_row_filter(RowFilter::new(vec![Box::new(row_filter)])); + + let metrics = ArrowReaderMetrics::enabled(); + let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap(); + let mut total_rows = 0; + while let Some(batch) = stream.next().await { + total_rows += batch.expect("Error").num_rows(); + } + + let cache_reads = metrics.records_read_from_cache().unwrap(); + eprintln!( + "Filter id>50, projection {:?}: rows={total_rows}, cache={cache_reads} ({})", + projection_leaves, description + ); + + assert_eq!( + total_rows, EXPECTED_ROWS, + "Expected {EXPECTED_ROWS} rows for {description}" + ); + assert_eq!( + cache_reads, *expected_cache, + "Cache reads mismatch for {description}" + ); + } +} + +/// Test struct field projections with a filter on bb (struct field). +/// +/// Filter: `bb > 50` (filter mask includes `b` which means both b.aa and b.bb leaves) +/// Tests various projections with both RowSelectionPolicy options. 
+/// +/// Schema leaf columns: 0=id, 1=b.aa, 2=b.bb +/// Filter mask: roots([1]) = leaves([1, 2]) +/// +/// Cache reads (Selectors policy) = rows × (filter_mask ∩ projection).len() +#[tokio::test] +async fn test_struct_field_projections_filter_on_bb() { + use parquet::arrow::arrow_reader::RowSelectionPolicy; + + // Helper to create the filter: bb > 50 + // Filter mask includes the entire struct `b` (both b.aa and b.bb) + fn make_filter( + schema_descr: &std::sync::Arc, + ) -> RowFilter { + let filter_mask = ProjectionMask::roots(schema_descr, [1]); // struct b (includes b.aa, b.bb) + let row_filter = ArrowPredicateFn::new(filter_mask, |batch: RecordBatch| { + let struct_col = batch.column(0).as_struct(); + let bb = struct_col.column_by_name("bb").unwrap(); + let bb = bb.as_primitive::(); + gt(bb, &arrow_array::Int32Array::new_scalar(50)) + }); + RowFilter::new(vec![Box::new(row_filter)]) + } + + // Test cases: (projection_leaves, cached_leaf_count, description) + // cached_leaf_count = number of leaves in (filter_mask ∩ projection) + // filter_mask = leaves [1, 2] (b.aa, b.bb) + let test_cases: &[(&[usize], usize, &str)] = &[ + (&[0, 2], 1, "[id, b.bb]: b.bb in filter mask → 1 cached"), + (&[1, 2], 2, "[b.aa, b.bb]: both in filter mask → 2 cached"), + ( + &[0, 1, 2], + 2, + "[id, b]: b.aa, b.bb in filter mask → 2 cached", + ), + (&[2], 1, "[b.bb]: in filter mask → 1 cached"), + (&[0], 0, "[id]: not in filter mask → 0 cached"), + ]; + + for (projection_leaves, cached_leaf_count, description) in test_cases { + // Test with Selectors policy (exact cache reads) + let (selectors_rows, selectors_cache) = { + let test = ParquetPredicateCacheTest::new_nested_nullable(); + let async_builder = test.async_builder().await; + let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr(); + + let projection = + ProjectionMask::leaves(&schema_descr, projection_leaves.iter().copied()); + let async_builder = async_builder + .with_projection(projection) + 
.with_row_filter(make_filter(&schema_descr)) + .with_row_selection_policy(RowSelectionPolicy::Selectors); + + let metrics = ArrowReaderMetrics::enabled(); + let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap(); + let mut total_rows = 0; + while let Some(batch) = stream.next().await { + total_rows += batch.expect("Error").num_rows(); + } + (total_rows, metrics.records_read_from_cache().unwrap()) + }; + + // Test with Auto policy + let (auto_rows, auto_cache) = { + let test = ParquetPredicateCacheTest::new_nested_nullable(); + let async_builder = test.async_builder().await; + let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr(); + + let projection = + ProjectionMask::leaves(&schema_descr, projection_leaves.iter().copied()); + let async_builder = async_builder + .with_projection(projection) + .with_row_filter(make_filter(&schema_descr)) + .with_row_selection_policy(RowSelectionPolicy::default()); + + let metrics = ArrowReaderMetrics::enabled(); + let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap(); + let mut total_rows = 0; + while let Some(batch) = stream.next().await { + total_rows += batch.expect("Error").num_rows(); + } + (total_rows, metrics.records_read_from_cache().unwrap()) + }; + + eprintln!( + "Filter bb>50, projection {:?}: Selectors(rows={}, cache={}), Auto(rows={}, cache={}) ({})", + projection_leaves, selectors_rows, selectors_cache, auto_rows, auto_cache, description + ); + + // Both policies must return identical row counts + assert_eq!( + selectors_rows, auto_rows, + "Row count mismatch between policies for {description}" + ); + + // Selectors policy: exact cache reads = rows × cached_leaf_count + let expected_selectors_cache = selectors_rows * cached_leaf_count; + assert_eq!( + selectors_cache, expected_selectors_cache, + "Selectors cache mismatch for {description}: expected {expected_selectors_cache}" + ); + + // Auto policy reads >= Selectors due to Mask strategy + 
assert!( + auto_cache >= selectors_cache, + "Auto should read >= Selectors for {description}" + ); + } +} + // -- Begin test infrastructure -- /// A test parquet file @@ -126,6 +419,16 @@ impl ParquetPredicateCacheTest { } } + /// Create a new test file with nullable nested struct. + /// + /// See [`NESTED_NULLABLE_TEST_FILE_DATA`] for data details. + fn new_nested_nullable() -> Self { + Self { + bytes: NESTED_NULLABLE_TEST_FILE_DATA.clone(), + expected_records_read_from_cache: 0, + } + } + /// Set the expected number of records read from the cache fn with_expected_records_read_from_cache( mut self, @@ -235,6 +538,75 @@ static TEST_FILE_DATA: LazyLock = LazyLock::new(|| { Bytes::from(output) }); +/// Build a ParquetFile with nullable nested struct for testing row filters with definition levels. +/// +/// Schema: +/// * `id: Int64` +/// * `b: Struct { aa: String (nullable), bb: Int32 (nullable) }` (nullable struct) +/// +/// Null patterns: +/// - Every 3rd row (i % 3 == 0): aa is null +/// - Every 5th row (i % 5 == 0): bb is null +/// - Every 7th row (i % 7 == 0): entire struct is null (overrides field nulls) +/// +/// The non-null values are: +/// - aa = "v{i}" +/// - bb = i as i32 +/// +/// Where i is the row index from 0 to 99 +static NESTED_NULLABLE_TEST_FILE_DATA: LazyLock = LazyLock::new(|| { + use arrow_array::Int32Array; + use arrow_buffer::NullBuffer; + use arrow_schema::Fields; + + const NUM_ROWS: usize = 100; + + // id column + let id: Int64Array = (0..NUM_ROWS as i64).collect(); + + // aa: String column - null every 3rd row + let aa: StringArray = (0..NUM_ROWS) + .map(|i| { + if i % 3 == 0 { + None + } else { + Some(format!("v{i}")) + } + }) + .collect(); + + // bb: Int32 column - null every 5th row + let bb: Int32Array = (0..NUM_ROWS) + .map(|i| if i % 5 == 0 { None } else { Some(i as i32) }) + .collect(); + + // Struct null buffer - null every 7th row + let struct_nulls: Vec = (0..NUM_ROWS).map(|i| i % 7 != 0).collect(); + let struct_null_buffer = 
NullBuffer::from(struct_nulls); + + // Create struct with null buffer + let b = StructArray::new( + Fields::from(vec![ + Field::new("aa", DataType::Utf8, true), + Field::new("bb", DataType::Int32, true), + ]), + vec![Arc::new(aa) as ArrayRef, Arc::new(bb) as ArrayRef], + Some(struct_null_buffer), + ); + + let input_batch = RecordBatch::try_from_iter([ + ("id", Arc::new(id) as ArrayRef), + ("b", Arc::new(b) as ArrayRef), + ]) + .unwrap(); + + let mut output = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut output, input_batch.schema(), None).unwrap(); + writer.write(&input_batch).unwrap(); + writer.close().unwrap(); + Bytes::from(output) +}); + /// Build a ParquetFile with a /// /// * string column `a` From 5c03896dd04679819db04d84f0d27708bc4245ff Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 11 Jan 2026 14:52:26 -0500 Subject: [PATCH 9/9] Add integration tests for list->struct cache behavior Add tests demonstrating that nested fields inside List structures (with rep_level > 0) are NOT cached, while top-level columns work correctly: - test_list_struct_fields_not_cached_filter_on_id: filter on id (rep_level=0), shows id is cached but list struct fields are not - test_list_struct_fields_not_cached_filter_on_struct_field: filter on struct_field_b inside the list, shows 0 cache reads for all projections since filter mask leaves have rep_level > 0 Co-Authored-By: Claude Opus 4.5 --- parquet/tests/arrow_reader/predicate_cache.rs | 243 +++++++++++++++++- 1 file changed, 242 insertions(+), 1 deletion(-) diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs index fe853c8cb54d..8d0fa5bbff57 100644 --- a/parquet/tests/arrow_reader/predicate_cache.rs +++ b/parquet/tests/arrow_reader/predicate_cache.rs @@ -24,7 +24,7 @@ use arrow::compute::and; use arrow::compute::kernels::cmp::{eq, gt, lt}; use arrow_array::cast::AsArray; use 
arrow_array::types::Int64Type; -use arrow_array::{RecordBatch, StringArray, StringViewArray, StructArray}; +use arrow_array::{Array, RecordBatch, StringArray, StringViewArray, StructArray}; use arrow_schema::{DataType, Field}; use bytes::Bytes; use futures::StreamExt; @@ -383,6 +383,171 @@ async fn test_struct_field_projections_filter_on_bb() { } } +/// Test that List->Struct fields are NOT cached (rep_level > 0). +/// +/// Schema: +/// - id: Int64 (leaf 0, rep_level=0 - CAN be cached) +/// - list_col: List +/// - struct_field_a (leaf 1, rep_level=1 - NOT cached) +/// - struct_field_b (leaf 2, rep_level=1 - NOT cached) +/// +/// Filter: `id > 50` (filter mask includes only `id`) +/// Expected: 49 rows (id 51-99) +/// +/// Cache behavior: +/// - Only `id` (leaf 0) is cached because rep_level=0 +/// - List struct fields (leaves 1,2) are NOT cached because rep_level > 0 +#[tokio::test] +async fn test_list_struct_fields_not_cached_filter_on_id() { + const EXPECTED_ROWS: usize = 49; + + // Test cases: (projection_leaves, expected_cache_reads, description) + // Filter mask = [id] (leaf 0), so only id can be cached + let test_cases: &[(&[usize], usize, &str)] = &[ + ( + &[0, 1, 2], + EXPECTED_ROWS, + "[id, list_col]: only id cached (rep_level=0)", + ), + ( + &[1, 2], + 0, + "[list_col]: nothing cached (rep_level > 0 for list fields)", + ), + (&[0], EXPECTED_ROWS, "[id]: id cached (rep_level=0)"), + (&[1], 0, "[struct_field_a]: not cached (rep_level > 0)"), + (&[2], 0, "[struct_field_b]: not cached (rep_level > 0)"), + ]; + + for (projection_leaves, expected_cache, description) in test_cases { + let test = ParquetPredicateCacheTest::new_list_struct(); + let async_builder = test.async_builder().await; + let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr(); + + // Filter on id only: id > 50 + let filter_mask = ProjectionMask::leaves(&schema_descr, [0]); // only id + let row_filter = ArrowPredicateFn::new(filter_mask, |batch: RecordBatch| { + let id 
= batch.column(0).as_primitive::<Int64Type>();
+            gt(id, &Int64Array::new_scalar(50))
+        });
+
+        let projection = ProjectionMask::leaves(&schema_descr, projection_leaves.iter().copied());
+        let async_builder = async_builder
+            .with_projection(projection)
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]));
+
+        let metrics = ArrowReaderMetrics::enabled();
+        let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap();
+        let mut total_rows = 0;
+        while let Some(batch) = stream.next().await {
+            total_rows += batch.expect("Error").num_rows();
+        }
+
+        let cache_reads = metrics.records_read_from_cache().unwrap();
+        eprintln!(
+            "List struct filter id>50, projection {:?}: rows={total_rows}, cache={cache_reads} ({})",
+            projection_leaves, description
+        );
+
+        assert_eq!(
+            total_rows, EXPECTED_ROWS,
+            "Expected {EXPECTED_ROWS} rows for {description}"
+        );
+        assert_eq!(
+            cache_reads, *expected_cache,
+            "Cache reads mismatch for {description}"
+        );
+    }
+}
+
+/// Test that filtering on List->Struct fields also results in 0 cache hits.
+///
+/// Filter: on struct_field_b (field inside list->struct)
+/// Filter mask: [list_col] (includes leaves 1,2)
+///
+/// Since all leaves in the filter mask have rep_level > 0, nothing is cached.
+#[tokio::test]
+async fn test_list_struct_fields_not_cached_filter_on_struct_field() {
+    // Filter on struct_field_b > 500 (i.e., rows where i*10 + j > 500, so i > 50)
+    // This selects rows with id >= 51, so ~49 rows
+
+    // Test cases: (projection_leaves, description)
+    // All should have 0 cache reads because filter mask leaves have rep_level > 0
+    let test_cases: &[(&[usize], &str)] = &[
+        (&[0, 1, 2], "[id, list_col]: filter mask has rep_level > 0"),
+        (&[1, 2], "[list_col]: filter mask has rep_level > 0"),
+        (
+            &[0],
+            "[id]: id not in filter mask, filter leaves have rep_level > 0",
+        ),
+        (&[1], "[struct_field_a]: rep_level > 0"),
+        (&[2], "[struct_field_b]: rep_level > 0"),
+    ];
+
+    for (projection_leaves, description) in test_cases {
+        let test = ParquetPredicateCacheTest::new_list_struct();
+        let async_builder = test.async_builder().await;
+        let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
+
+        // Filter on struct_field_b: check if any element has struct_field_b > 500
+        // Filter mask includes the entire list_col (leaves 1, 2)
+        let filter_mask = ProjectionMask::leaves(&schema_descr, [1, 2]);
+        let row_filter = ArrowPredicateFn::new(filter_mask, |batch: RecordBatch| {
+            // batch.column(0) is list_col
+            let list = batch.column(0).as_list::<i32>();
+
+            let result: Vec<bool> = (0..batch.num_rows())
+                .map(|row| {
+                    if list.is_null(row) {
+                        return false;
+                    }
+                    let struct_array = list.value(row);
+                    let struct_arr = struct_array.as_struct();
+                    let field_b = struct_arr
+                        .column_by_name("struct_field_b")
+                        .unwrap()
+                        .as_primitive::<Int32Type>();
+
+                    // Check if any element has struct_field_b > 500
+                    (0..field_b.len()).any(|i| !field_b.is_null(i) && field_b.value(i) > 500)
+                })
+                .collect();
+
+            Ok(arrow_array::BooleanArray::from(result))
+        });
+
+        let projection = ProjectionMask::leaves(&schema_descr, projection_leaves.iter().copied());
+        let async_builder = async_builder
+            .with_projection(projection)
+            
.with_row_filter(RowFilter::new(vec![Box::new(row_filter)])); + + let metrics = ArrowReaderMetrics::enabled(); + let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap(); + let mut total_rows = 0; + while let Some(batch) = stream.next().await { + total_rows += batch.expect("Error").num_rows(); + } + + let cache_reads = metrics.records_read_from_cache().unwrap(); + eprintln!( + "List struct filter on struct_field_b, projection {:?}: rows={total_rows}, cache={cache_reads} ({})", + projection_leaves, description + ); + + // All cases should have 0 cache reads because filter mask leaves have rep_level > 0 + assert_eq!( + cache_reads, 0, + "Expected 0 cache reads for {description}, got {cache_reads}" + ); + + // Should have some rows (those where any struct_field_b > 500) + assert!( + total_rows > 0, + "Expected some rows for {description}, got {total_rows}" + ); + } +} + // -- Begin test infrastructure -- /// A test parquet file @@ -429,6 +594,16 @@ impl ParquetPredicateCacheTest { } } + /// Create a new test file with List containing Struct elements. + /// + /// See [`LIST_STRUCT_TEST_FILE_DATA`] for data details. + fn new_list_struct() -> Self { + Self { + bytes: LIST_STRUCT_TEST_FILE_DATA.clone(), + expected_records_read_from_cache: 0, + } + } + /// Set the expected number of records read from the cache fn with_expected_records_read_from_cache( mut self, @@ -643,6 +818,72 @@ static NESTED_TEST_FILE_DATA: LazyLock = LazyLock::new(|| { Bytes::from(output) }); +/// Build a ParquetFile with a List containing Struct elements. 
+///
+/// Schema:
+/// * `id: Int64` (leaf 0, rep_level=0 - CAN be cached)
+/// * `list_col: List<Struct>`
+///   - struct_field_a (leaf 1, rep_level=1 - NOT cached)
+///   - struct_field_b (leaf 2, rep_level=1 - NOT cached)
+///
+/// Data:
+/// - 100 rows (id 0..99)
+/// - Each row has (i % 3 + 1) list elements (1-3 elements per row)
+/// - struct_field_a = "val_{i}_{j}" where i=row, j=element index
+/// - struct_field_b = i * 10 + j
+static LIST_STRUCT_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+    use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder, StructBuilder};
+
+    const NUM_ROWS: usize = 100;
+
+    // id column
+    let id: Int64Array = (0..NUM_ROWS as i64).collect();
+
+    // Build list of structs
+    let struct_fields = vec![
+        Field::new("struct_field_a", DataType::Utf8, true),
+        Field::new("struct_field_b", DataType::Int32, true),
+    ];
+    let mut list_builder = ListBuilder::new(StructBuilder::new(
+        struct_fields.clone(),
+        vec![
+            Box::new(StringBuilder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>,
+            Box::new(Int32Builder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>,
+        ],
+    ));
+
+    for i in 0..NUM_ROWS {
+        let num_elements = (i % 3) + 1; // 1, 2, or 3 elements
+        for j in 0..num_elements {
+            let struct_builder = list_builder.values();
+            struct_builder
+                .field_builder::<StringBuilder>(0)
+                .unwrap()
+                .append_value(format!("val_{}_{}", i, j));
+            struct_builder
+                .field_builder::<Int32Builder>(1)
+                .unwrap()
+                .append_value((i * 10 + j) as i32);
+            struct_builder.append(true);
+        }
+        list_builder.append(true);
+    }
+
+    let list_col = list_builder.finish();
+
+    let input_batch = RecordBatch::try_from_iter([
+        ("id", Arc::new(id) as ArrayRef),
+        ("list_col", Arc::new(list_col) as ArrayRef),
+    ])
+    .unwrap();
+
+    let mut output = Vec::new();
+    let mut writer = ArrowWriter::try_new(&mut output, input_batch.schema(), None).unwrap();
+    writer.write(&input_batch).unwrap();
+    writer.close().unwrap();
+    Bytes::from(output)
+});
+
 trait ArrowReaderBuilderExt {
     /// Applies the following:
     /// 1. a projection selecting the "a" and "b" column