Skip to content

Commit f8f421e

Browse files
committed
add nested nullable field support for CachedArrayReader
1 parent ca47c5c commit f8f421e

File tree

4 files changed

+22
-69
lines changed

4 files changed

+22
-69
lines changed

parquet/src/arrow/array_reader/cached_array_reader.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,15 @@ impl ArrayReader for CachedArrayReader {
281281
fn consume_batch(&mut self) -> Result<ArrayRef> {
282282
let row_count = self.selections.len();
283283
if row_count == 0 {
284-
self.def_levels_buffer = None;
285-
self.rep_levels_buffer = None;
284+
// When there's no data to consume, set empty level buffers if we
285+
// previously had levels. This ensures the levels match the empty array.
286+
// We keep Some([]) rather than None to indicate this reader provides levels.
287+
if self.def_levels_buffer.is_some() {
288+
self.def_levels_buffer = Some(Vec::new());
289+
}
290+
if self.rep_levels_buffer.is_some() {
291+
self.rep_levels_buffer = Some(Vec::new());
292+
}
286293
return Ok(new_empty_array(self.inner.get_data_type()));
287294
}
288295

parquet/src/arrow/mod.rs

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -419,51 +419,6 @@ impl ProjectionMask {
419419
}
420420
}
421421
}
422-
423-
/// Return a new [`ProjectionMask`] that excludes any leaf columns that are
424-
/// part of a nested type, such as struct, list, or map
425-
///
426-
/// If there are no non-nested columns in the mask, returns `None`
427-
pub(crate) fn without_nested_types(&self, schema: &SchemaDescriptor) -> Option<Self> {
428-
let num_leaves = schema.num_columns();
429-
430-
// Count how many leaves each root column has
431-
let num_roots = schema.root_schema().get_fields().len();
432-
let mut root_leaf_counts = vec![0usize; num_roots];
433-
for leaf_idx in 0..num_leaves {
434-
let root_idx = schema.get_column_root_idx(leaf_idx);
435-
root_leaf_counts[root_idx] += 1;
436-
}
437-
438-
// Keep only leaves whose root has exactly one leaf (non-nested) and is not a
439-
// LIST. LIST is encoded as a wrapped logical type with a single leaf, e.g.
440-
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
441-
//
442-
// ```text
443-
// // List<String> (list non-null, elements nullable)
444-
// required group my_list (LIST) {
445-
// repeated group list {
446-
// optional binary element (STRING);
447-
// }
448-
// }
449-
// ```
450-
let mut included_leaves = Vec::new();
451-
for leaf_idx in 0..num_leaves {
452-
if self.leaf_included(leaf_idx) {
453-
let root = schema.get_column_root(leaf_idx);
454-
let root_idx = schema.get_column_root_idx(leaf_idx);
455-
if root_leaf_counts[root_idx] == 1 && !root.is_list() {
456-
included_leaves.push(leaf_idx);
457-
}
458-
}
459-
}
460-
461-
if included_leaves.is_empty() {
462-
None
463-
} else {
464-
Some(ProjectionMask::leaves(schema, included_leaves))
465-
}
466-
}
467422
}
468423

469424
/// Looks up the parquet column by name

parquet/src/arrow/push_decoder/reader_builder/mod.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -636,12 +636,7 @@ impl RowGroupReaderBuilder {
636636
cache_projection.union(predicate.projection());
637637
}
638638
cache_projection.intersect(&self.projection);
639-
self.exclude_nested_columns_from_cache(&cache_projection)
640-
}
641-
642-
/// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
643-
fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
644-
mask.without_nested_types(self.metadata.file_metadata().schema_descr())
639+
Some(cache_projection)
645640
}
646641

647642
/// Get the offset index for the specified row group, if any

parquet/tests/arrow_reader/predicate_cache.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,11 @@ async fn test_cache_disabled_with_filters() {
8585
}
8686

8787
#[tokio::test]
88-
async fn test_cache_projection_excludes_nested_columns() {
89-
let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(0);
90-
91-
let sync_builder = test.sync_builder().add_nested_filter();
92-
test.run_sync(sync_builder);
93-
94-
let async_builder = test.async_builder().await.add_nested_filter();
88+
async fn test_async_cache_with_nested_columns() {
89+
// Nested columns now work with cache - expect records from cache
90+
// 100 rows × 2 leaf columns (b.aa, b.bb) = 200 records
91+
let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(200);
92+
let async_builder = test.async_builder().await.add_nested_root_filter();
9593
test.run_async(async_builder).await;
9694
}
9795

@@ -282,9 +280,9 @@ trait ArrowReaderBuilderExt {
282280
/// 2. a row_filter applied to "b": 575 < "b" < 625 (select 1 data page from each row group)
283281
fn add_project_ab_and_filter_b(self) -> Self;
284282

285-
/// Adds a row filter that projects the nested leaf column "b.aa" and
283+
/// Adds a row filter that projects the nested ROOT column "b" and
286284
/// returns true for all rows.
287-
fn add_nested_filter(self) -> Self;
285+
fn add_nested_root_filter(self) -> Self;
288286
}
289287

290288
impl<T> ArrowReaderBuilderExt for ArrowReaderBuilder<T> {
@@ -306,23 +304,21 @@ impl<T> ArrowReaderBuilderExt for ArrowReaderBuilder<T> {
306304
.with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
307305
}
308306

309-
fn add_nested_filter(self) -> Self {
307+
fn add_nested_root_filter(self) -> Self {
310308
let schema_descr = self.metadata().file_metadata().schema_descr_ptr();
311309

312-
// Build a RowFilter whose predicate projects a leaf under the nested root `b`
313-
// Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
314-
let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
310+
// Project the ROOT struct column "b", not just leaf "b.aa"
311+
let root_mask = ProjectionMask::roots(&schema_descr, [1]); // column index 1 = "b"
315312

316-
let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
313+
let always_true = ArrowPredicateFn::new(root_mask.clone(), |batch: RecordBatch| {
317314
Ok(arrow_array::BooleanArray::from(vec![
318315
true;
319316
batch.num_rows()
320317
]))
321318
});
322319
let row_filter = RowFilter::new(vec![Box::new(always_true)]);
323320

324-
self.with_projection(nested_leaf_mask)
325-
.with_row_filter(row_filter)
321+
self.with_projection(root_mask).with_row_filter(row_filter)
326322
}
327323
}
328324

0 commit comments

Comments
 (0)