Skip to content

Commit f4599fb

Browse files
committed
Add integration tests
1 parent 918a7fc commit f4599fb

File tree

3 files changed

+439
-18
lines changed

3 files changed

+439
-18
lines changed

parquet/src/arrow/array_reader/builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ impl<'a> ArrayReaderBuilder<'a> {
162162
col_idx,
163163
cache_options.role,
164164
self.metrics.clone(), // cheap clone
165+
field.def_level > 0, // needs_def_levels: true if the field itself or any ancestor is nullable
165166
))))
166167
} else {
167168
Ok(Some(reader))

parquet/src/arrow/array_reader/cached_array_reader.rs

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -91,16 +91,31 @@ pub struct CachedArrayReader {
9191
def_levels_buffer: Option<Vec<i16>>,
9292
/// Repetition levels for the last consume_batch() output
9393
rep_levels_buffer: Option<Vec<i16>>,
94+
/// Whether this reader needs definition levels (def_level > 0, i.e., the column or one of its ancestors is nullable)
95+
/// When false, we skip copying definition levels to avoid unnecessary allocations.
96+
needs_def_levels: bool,
9497
}
9598

9699
impl CachedArrayReader {
97100
/// Creates a new cached array reader with the specified role
101+
///
102+
/// # Arguments
103+
/// * `inner` - The underlying array reader
104+
/// * `cache` - Shared cache for this row group
105+
/// * `column_idx` - Column index for cache key generation
106+
/// * `role` - Producer or Consumer role
107+
/// * `metrics` - Statistics to report on cache behavior
108+
/// * `needs_def_levels` - Whether this column needs definition levels (def_level > 0).
109+
/// When false, definition levels are not copied to avoid unnecessary allocations.
110+
/// Note: repetition levels are never copied for cached columns since caching is
111+
/// only enabled for columns with rep_level == 0.
98112
pub fn new(
99113
inner: Box<dyn ArrayReader>,
100114
cache: Arc<Mutex<RowGroupCache>>,
101115
column_idx: usize,
102116
role: CacheRole,
103117
metrics: ArrowReaderMetrics,
118+
needs_def_levels: bool,
104119
) -> Self {
105120
let batch_size = cache.lock().unwrap().batch_size();
106121

@@ -117,6 +132,7 @@ impl CachedArrayReader {
117132
metrics,
118133
def_levels_buffer: None,
119134
rep_levels_buffer: None,
135+
needs_def_levels,
120136
}
121137
}
122138

@@ -152,9 +168,17 @@ impl CachedArrayReader {
152168

153169
let array = self.inner.consume_batch()?;
154170

155-
// Capture definition and repetition levels from inner reader before they are cleared
156-
let def_levels = self.inner.get_def_levels().map(|l| l.to_vec());
157-
let rep_levels = self.inner.get_rep_levels().map(|l| l.to_vec());
171+
// Capture definition levels from inner reader only when needed (def_level > 0).
172+
// This avoids unnecessary allocations for columns whose max definition level is 0.
173+
// Repetition levels are never copied because caching is only enabled for
174+
// columns with rep_level == 0 (columns that are not inside a repeated field).
175+
let def_levels = if self.needs_def_levels {
176+
self.inner.get_def_levels().map(|l| l.to_vec())
177+
} else {
178+
None
179+
};
180+
// rep_levels always None for cached columns (rep_level == 0 per builder.rs)
181+
let rep_levels = None;
158182
let cached_batch = CachedBatch::with_levels(array, def_levels, rep_levels);
159183

160184
// Store in both shared cache and local cache
@@ -506,6 +530,7 @@ mod tests {
506530
0,
507531
CacheRole::Producer,
508532
metrics,
533+
false, // needs_def_levels: basic test doesn't use levels
509534
);
510535

511536
// Read 3 records
@@ -534,6 +559,7 @@ mod tests {
534559
0,
535560
CacheRole::Consumer,
536561
metrics,
562+
false, // needs_def_levels: basic test doesn't use levels
537563
);
538564

539565
let read1 = cached_reader.read_records(2).unwrap();
@@ -568,6 +594,7 @@ mod tests {
568594
0,
569595
CacheRole::Consumer,
570596
metrics,
597+
false, // needs_def_levels
571598
);
572599

573600
// Multiple reads should accumulate
@@ -595,6 +622,7 @@ mod tests {
595622
0,
596623
CacheRole::Consumer,
597624
metrics,
625+
false, // needs_def_levels
598626
);
599627

600628
// Try to read more than available
@@ -625,6 +653,7 @@ mod tests {
625653
0,
626654
CacheRole::Producer,
627655
metrics.clone(),
656+
false, // needs_def_levels
628657
);
629658

630659
cached_reader1.read_records(3).unwrap();
@@ -639,6 +668,7 @@ mod tests {
639668
1,
640669
CacheRole::Consumer,
641670
metrics.clone(),
671+
false, // needs_def_levels
642672
);
643673

644674
cached_reader2.read_records(2).unwrap();
@@ -661,6 +691,7 @@ mod tests {
661691
0,
662692
CacheRole::Consumer,
663693
metrics,
694+
false, // needs_def_levels
664695
);
665696

666697
// Read first batch (positions 0-2, batch 0)
@@ -714,6 +745,7 @@ mod tests {
714745
0,
715746
CacheRole::Producer,
716747
metrics,
748+
false, // needs_def_levels
717749
);
718750

719751
// Read first batch (positions 0-2)
@@ -747,6 +779,7 @@ mod tests {
747779
0,
748780
CacheRole::Consumer,
749781
metrics,
782+
false, // needs_def_levels
750783
);
751784

752785
// Read records which should populate both shared and local cache
@@ -784,6 +817,7 @@ mod tests {
784817
0,
785818
CacheRole::Consumer,
786819
metrics,
820+
false, // needs_def_levels
787821
);
788822

789823
// Read records which should populate both shared and local cache
@@ -811,6 +845,7 @@ mod tests {
811845
0,
812846
CacheRole::Producer,
813847
metrics.clone(),
848+
false, // needs_def_levels
814849
);
815850

816851
// Populate cache with first batch (1, 2, 3)
@@ -824,6 +859,7 @@ mod tests {
824859
0,
825860
CacheRole::Consumer,
826861
metrics,
862+
false, // needs_def_levels
827863
);
828864

829865
// - We want to read 4 records starting from position 0
@@ -928,6 +964,7 @@ mod tests {
928964
0,
929965
CacheRole::Producer,
930966
metrics,
967+
true, // needs_def_levels: test level propagation
931968
);
932969

933970
// Read all 5 records
@@ -937,14 +974,15 @@ mod tests {
937974
let array = cached_reader.consume_batch().unwrap();
938975
assert_eq!(array.len(), 5);
939976

940-
// Verify levels are captured
977+
// Verify definition levels are captured
941978
let def_levels = cached_reader.get_def_levels();
942979
assert!(def_levels.is_some());
943980
assert_eq!(def_levels.unwrap(), &[1, 1, 0, 1, 1]);
944981

982+
// Repetition levels are not copied because caching is only enabled
983+
// for columns with rep_level == 0 (columns not inside a repeated field)
945984
let rep_levels = cached_reader.get_rep_levels();
946-
assert!(rep_levels.is_some());
947-
assert_eq!(rep_levels.unwrap(), &[0, 1, 1, 0, 1]);
985+
assert!(rep_levels.is_none());
948986
}
949987

950988
#[test]
@@ -963,6 +1001,7 @@ mod tests {
9631001
0,
9641002
CacheRole::Producer,
9651003
metrics,
1004+
true, // needs_def_levels: test level propagation
9661005
);
9671006

9681007
// Read 2 records
@@ -985,14 +1024,14 @@ mod tests {
9851024
let int32_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
9861025
assert_eq!(int32_array.values(), &[1, 2, 5, 6]);
9871026

988-
// Verify levels match the selected values
1027+
// Verify definition levels match the selected values
9891028
let def_levels = cached_reader.get_def_levels();
9901029
assert!(def_levels.is_some());
9911030
assert_eq!(def_levels.unwrap(), &[1, 1, 1, 0]); // def_levels for positions 0, 1, 4, 5
9921031

1032+
// Repetition levels are not copied (rep_level == 0 for cached columns)
9931033
let rep_levels = cached_reader.get_rep_levels();
994-
assert!(rep_levels.is_some());
995-
assert_eq!(rep_levels.unwrap(), &[0, 1, 1, 1]); // rep_levels for positions 0, 1, 4, 5
1034+
assert!(rep_levels.is_none());
9961035
}
9971036

9981037
#[test]
@@ -1011,6 +1050,7 @@ mod tests {
10111050
0,
10121051
CacheRole::Producer,
10131052
metrics,
1053+
true, // needs_def_levels: test level propagation
10141054
);
10151055

10161056
// Read all 6 records (spanning 2 batches)
@@ -1020,14 +1060,14 @@ mod tests {
10201060
let array = cached_reader.consume_batch().unwrap();
10211061
assert_eq!(array.len(), 6);
10221062

1023-
// Verify levels are correctly concatenated from both batches
1063+
// Verify definition levels are correctly concatenated from both batches
10241064
let def_levels = cached_reader.get_def_levels();
10251065
assert!(def_levels.is_some());
10261066
assert_eq!(def_levels.unwrap(), &[1, 0, 1, 1, 0, 1]);
10271067

1068+
// Repetition levels are not copied (rep_level == 0 for cached columns)
10281069
let rep_levels = cached_reader.get_rep_levels();
1029-
assert!(rep_levels.is_some());
1030-
assert_eq!(rep_levels.unwrap(), &[0, 0, 1, 0, 0, 1]);
1070+
assert!(rep_levels.is_none());
10311071
}
10321072

10331073
#[test]
@@ -1042,14 +1082,15 @@ mod tests {
10421082
0,
10431083
CacheRole::Producer,
10441084
metrics,
1085+
false, // needs_def_levels: false since inner reader has no levels
10451086
);
10461087

10471088
let records_read = cached_reader.read_records(3).unwrap();
10481089
assert_eq!(records_read, 3);
10491090

10501091
let _array = cached_reader.consume_batch().unwrap();
10511092

1052-
// Should return None since inner reader has no levels
1093+
// Should return None since inner reader has no levels and needs_def_levels is false
10531094
assert!(cached_reader.get_def_levels().is_none());
10541095
assert!(cached_reader.get_rep_levels().is_none());
10551096
}
@@ -1072,6 +1113,7 @@ mod tests {
10721113
0,
10731114
CacheRole::Producer,
10741115
metrics.clone(),
1116+
true, // needs_def_levels: test level propagation
10751117
);
10761118

10771119
// Producer reads and populates cache
@@ -1090,6 +1132,7 @@ mod tests {
10901132
0, // Same column index
10911133
CacheRole::Consumer,
10921134
metrics,
1135+
true, // needs_def_levels: test level propagation
10931136
);
10941137

10951138
consumer.read_records(5).unwrap();
@@ -1099,9 +1142,10 @@ mod tests {
10991142
let int32_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
11001143
assert_eq!(int32_array.values(), &[1, 2, 3, 4, 5]);
11011144

1102-
// Should get original levels from cache
1145+
// Should get original definition levels from cache
11031146
assert_eq!(consumer.get_def_levels().unwrap(), &[1, 0, 1, 1, 0]);
1104-
assert_eq!(consumer.get_rep_levels().unwrap(), &[0, 1, 0, 1, 1]);
1147+
// Repetition levels are not cached (rep_level == 0 for cached columns)
1148+
assert!(consumer.get_rep_levels().is_none());
11051149
}
11061150

11071151
#[test]
@@ -1121,6 +1165,7 @@ mod tests {
11211165
0,
11221166
CacheRole::Producer,
11231167
metrics.clone(),
1168+
true, // needs_def_levels: test level propagation
11241169
);
11251170

11261171
producer.read_records(4).unwrap();
@@ -1138,6 +1183,7 @@ mod tests {
11381183
0,
11391184
CacheRole::Consumer,
11401185
metrics,
1186+
true, // needs_def_levels: test level propagation
11411187
);
11421188

11431189
let skipped = consumer.skip_records(4).unwrap();
@@ -1146,9 +1192,11 @@ mod tests {
11461192
let array = consumer.consume_batch().unwrap();
11471193
assert_eq!(array.len(), 0);
11481194

1195+
// Definition levels should be empty (not None) after skip
11491196
let def_levels = consumer.get_def_levels().map(|l| l.to_vec());
11501197
assert_eq!(def_levels, Some(vec![]));
1151-
let rep_levels = consumer.get_rep_levels().map(|l| l.to_vec());
1152-
assert_eq!(rep_levels, Some(vec![]));
1198+
// Repetition levels are not cached (rep_level == 0 for cached columns)
1199+
let rep_levels = consumer.get_rep_levels();
1200+
assert!(rep_levels.is_none());
11531201
}
11541202
}

0 commit comments

Comments
 (0)