Skip to content

Commit 260f092

Browse files
adriangb and claude committed
Add integration tests for list->struct cache behavior
Add tests demonstrating that nested fields inside List structures (with rep_level > 0) are NOT cached, while top-level columns work correctly: - test_list_struct_fields_not_cached_filter_on_id: filter on id (rep_level=0), shows id is cached but list struct fields are not - test_list_struct_fields_not_cached_filter_on_struct_field: filter on struct_field_b inside the list, shows 0 cache reads for all projections since filter mask leaves have rep_level > 0 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 4e6c1ea commit 260f092

File tree

1 file changed

+242
-1
lines changed

1 file changed

+242
-1
lines changed

parquet/tests/arrow_reader/predicate_cache.rs

Lines changed: 242 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use arrow::compute::and;
2424
use arrow::compute::kernels::cmp::{eq, gt, lt};
2525
use arrow_array::cast::AsArray;
2626
use arrow_array::types::Int64Type;
27-
use arrow_array::{RecordBatch, StringArray, StringViewArray, StructArray};
27+
use arrow_array::{Array, RecordBatch, StringArray, StringViewArray, StructArray};
2828
use arrow_schema::{DataType, Field};
2929
use bytes::Bytes;
3030
use futures::StreamExt;
@@ -383,6 +383,171 @@ async fn test_struct_field_projections_filter_on_bb() {
383383
}
384384
}
385385

386+
/// Test that List->Struct fields are NOT cached (rep_level > 0).
387+
///
388+
/// Schema:
389+
/// - id: Int64 (leaf 0, rep_level=0 - CAN be cached)
390+
/// - list_col: List<Struct { struct_field_a: String, struct_field_b: Int32 }>
391+
/// - struct_field_a (leaf 1, rep_level=1 - NOT cached)
392+
/// - struct_field_b (leaf 2, rep_level=1 - NOT cached)
393+
///
394+
/// Filter: `id > 50` (filter mask includes only `id`)
395+
/// Expected: 49 rows (id 51-99)
396+
///
397+
/// Cache behavior:
398+
/// - Only `id` (leaf 0) is cached because rep_level=0
399+
/// - List struct fields (leaves 1,2) are NOT cached because rep_level > 0
400+
#[tokio::test]
401+
async fn test_list_struct_fields_not_cached_filter_on_id() {
402+
const EXPECTED_ROWS: usize = 49;
403+
404+
// Test cases: (projection_leaves, expected_cache_reads, description)
405+
// Filter mask = [id] (leaf 0), so only id can be cached
406+
let test_cases: &[(&[usize], usize, &str)] = &[
407+
(
408+
&[0, 1, 2],
409+
EXPECTED_ROWS,
410+
"[id, list_col]: only id cached (rep_level=0)",
411+
),
412+
(
413+
&[1, 2],
414+
0,
415+
"[list_col]: nothing cached (rep_level > 0 for list fields)",
416+
),
417+
(&[0], EXPECTED_ROWS, "[id]: id cached (rep_level=0)"),
418+
(&[1], 0, "[struct_field_a]: not cached (rep_level > 0)"),
419+
(&[2], 0, "[struct_field_b]: not cached (rep_level > 0)"),
420+
];
421+
422+
for (projection_leaves, expected_cache, description) in test_cases {
423+
let test = ParquetPredicateCacheTest::new_list_struct();
424+
let async_builder = test.async_builder().await;
425+
let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
426+
427+
// Filter on id only: id > 50
428+
let filter_mask = ProjectionMask::leaves(&schema_descr, [0]); // only id
429+
let row_filter = ArrowPredicateFn::new(filter_mask, |batch: RecordBatch| {
430+
let id = batch.column(0).as_primitive::<Int64Type>();
431+
gt(id, &Int64Array::new_scalar(50))
432+
});
433+
434+
let projection = ProjectionMask::leaves(&schema_descr, projection_leaves.iter().copied());
435+
let async_builder = async_builder
436+
.with_projection(projection)
437+
.with_row_filter(RowFilter::new(vec![Box::new(row_filter)]));
438+
439+
let metrics = ArrowReaderMetrics::enabled();
440+
let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap();
441+
let mut total_rows = 0;
442+
while let Some(batch) = stream.next().await {
443+
total_rows += batch.expect("Error").num_rows();
444+
}
445+
446+
let cache_reads = metrics.records_read_from_cache().unwrap();
447+
eprintln!(
448+
"List struct filter id>50, projection {:?}: rows={total_rows}, cache={cache_reads} ({})",
449+
projection_leaves, description
450+
);
451+
452+
assert_eq!(
453+
total_rows, EXPECTED_ROWS,
454+
"Expected {EXPECTED_ROWS} rows for {description}"
455+
);
456+
assert_eq!(
457+
cache_reads, *expected_cache,
458+
"Cache reads mismatch for {description}"
459+
);
460+
}
461+
}
462+
463+
/// Test that filtering on List->Struct fields also results in 0 cache hits.
464+
///
465+
/// Filter: on struct_field_b (field inside list->struct)
466+
/// Filter mask: [list_col] (includes leaves 1,2)
467+
///
468+
/// Since all leaves in the filter mask have rep_level > 0, nothing is cached.
469+
#[tokio::test]
470+
async fn test_list_struct_fields_not_cached_filter_on_struct_field() {
471+
// Filter on struct_field_b > 500 (i.e., rows where i*10 + j > 500, so i > 50)
472+
// This selects rows with id >= 51, so ~49 rows
473+
474+
// Test cases: (projection_leaves, description)
475+
// All should have 0 cache reads because filter mask leaves have rep_level > 0
476+
let test_cases: &[(&[usize], &str)] = &[
477+
(&[0, 1, 2], "[id, list_col]: filter mask has rep_level > 0"),
478+
(&[1, 2], "[list_col]: filter mask has rep_level > 0"),
479+
(
480+
&[0],
481+
"[id]: id not in filter mask, filter leaves have rep_level > 0",
482+
),
483+
(&[1], "[struct_field_a]: rep_level > 0"),
484+
(&[2], "[struct_field_b]: rep_level > 0"),
485+
];
486+
487+
for (projection_leaves, description) in test_cases {
488+
let test = ParquetPredicateCacheTest::new_list_struct();
489+
let async_builder = test.async_builder().await;
490+
let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
491+
492+
// Filter on struct_field_b: check if any element has struct_field_b > 500
493+
// Filter mask includes the entire list_col (leaves 1, 2)
494+
let filter_mask = ProjectionMask::leaves(&schema_descr, [1, 2]);
495+
let row_filter = ArrowPredicateFn::new(filter_mask, |batch: RecordBatch| {
496+
// batch.column(0) is list_col
497+
let list = batch.column(0).as_list::<i32>();
498+
499+
let result: Vec<bool> = (0..batch.num_rows())
500+
.map(|row| {
501+
if list.is_null(row) {
502+
return false;
503+
}
504+
let struct_array = list.value(row);
505+
let struct_arr = struct_array.as_struct();
506+
let field_b = struct_arr
507+
.column_by_name("struct_field_b")
508+
.unwrap()
509+
.as_primitive::<arrow_array::types::Int32Type>();
510+
511+
// Check if any element has struct_field_b > 500
512+
(0..field_b.len()).any(|i| !field_b.is_null(i) && field_b.value(i) > 500)
513+
})
514+
.collect();
515+
516+
Ok(arrow_array::BooleanArray::from(result))
517+
});
518+
519+
let projection = ProjectionMask::leaves(&schema_descr, projection_leaves.iter().copied());
520+
let async_builder = async_builder
521+
.with_projection(projection)
522+
.with_row_filter(RowFilter::new(vec![Box::new(row_filter)]));
523+
524+
let metrics = ArrowReaderMetrics::enabled();
525+
let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap();
526+
let mut total_rows = 0;
527+
while let Some(batch) = stream.next().await {
528+
total_rows += batch.expect("Error").num_rows();
529+
}
530+
531+
let cache_reads = metrics.records_read_from_cache().unwrap();
532+
eprintln!(
533+
"List struct filter on struct_field_b, projection {:?}: rows={total_rows}, cache={cache_reads} ({})",
534+
projection_leaves, description
535+
);
536+
537+
// All cases should have 0 cache reads because filter mask leaves have rep_level > 0
538+
assert_eq!(
539+
cache_reads, 0,
540+
"Expected 0 cache reads for {description}, got {cache_reads}"
541+
);
542+
543+
// Should have some rows (those where any struct_field_b > 500)
544+
assert!(
545+
total_rows > 0,
546+
"Expected some rows for {description}, got {total_rows}"
547+
);
548+
}
549+
}
550+
386551
// -- Begin test infrastructure --
387552

388553
/// A test parquet file
@@ -429,6 +594,16 @@ impl ParquetPredicateCacheTest {
429594
}
430595
}
431596

597+
/// Create a new test file with List containing Struct elements.
598+
///
599+
/// See [`LIST_STRUCT_TEST_FILE_DATA`] for data details.
600+
fn new_list_struct() -> Self {
601+
Self {
602+
bytes: LIST_STRUCT_TEST_FILE_DATA.clone(),
603+
expected_records_read_from_cache: 0,
604+
}
605+
}
606+
432607
/// Set the expected number of records read from the cache
433608
fn with_expected_records_read_from_cache(
434609
mut self,
@@ -643,6 +818,72 @@ static NESTED_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
643818
Bytes::from(output)
644819
});
645820

821+
/// Build a ParquetFile with a List containing Struct elements.
822+
///
823+
/// Schema:
824+
/// * `id: Int64` (leaf 0, rep_level=0 - CAN be cached)
825+
/// * `list_col: List<Struct { struct_field_a: String, struct_field_b: Int32 }>`
826+
/// - struct_field_a (leaf 1, rep_level=1 - NOT cached)
827+
/// - struct_field_b (leaf 2, rep_level=1 - NOT cached)
828+
///
829+
/// Data:
830+
/// - 100 rows (id 0..99)
831+
/// - Each row has (i % 3 + 1) list elements (1-3 elements per row)
832+
/// - struct_field_a = "val_{i}_{j}" where i=row, j=element index
833+
/// - struct_field_b = i * 10 + j
834+
static LIST_STRUCT_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
835+
use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder, StructBuilder};
836+
837+
const NUM_ROWS: usize = 100;
838+
839+
// id column
840+
let id: Int64Array = (0..NUM_ROWS as i64).collect();
841+
842+
// Build list of structs
843+
let struct_fields = vec![
844+
Field::new("struct_field_a", DataType::Utf8, true),
845+
Field::new("struct_field_b", DataType::Int32, true),
846+
];
847+
let mut list_builder = ListBuilder::new(StructBuilder::new(
848+
struct_fields.clone(),
849+
vec![
850+
Box::new(StringBuilder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>,
851+
Box::new(Int32Builder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>,
852+
],
853+
));
854+
855+
for i in 0..NUM_ROWS {
856+
let num_elements = (i % 3) + 1; // 1, 2, or 3 elements
857+
for j in 0..num_elements {
858+
let struct_builder = list_builder.values();
859+
struct_builder
860+
.field_builder::<StringBuilder>(0)
861+
.unwrap()
862+
.append_value(format!("val_{}_{}", i, j));
863+
struct_builder
864+
.field_builder::<Int32Builder>(1)
865+
.unwrap()
866+
.append_value((i * 10 + j) as i32);
867+
struct_builder.append(true);
868+
}
869+
list_builder.append(true);
870+
}
871+
872+
let list_col = list_builder.finish();
873+
874+
let input_batch = RecordBatch::try_from_iter([
875+
("id", Arc::new(id) as ArrayRef),
876+
("list_col", Arc::new(list_col) as ArrayRef),
877+
])
878+
.unwrap();
879+
880+
let mut output = Vec::new();
881+
let mut writer = ArrowWriter::try_new(&mut output, input_batch.schema(), None).unwrap();
882+
writer.write(&input_batch).unwrap();
883+
writer.close().unwrap();
884+
Bytes::from(output)
885+
});
886+
646887
trait ArrowReaderBuilderExt {
647888
/// Applies the following:
648889
/// 1. a projection selecting the "a" and "b" column

0 commit comments

Comments
 (0)