Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 105 additions & 98 deletions parquet/src/arrow/arrow_writer/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,37 +805,29 @@ impl ArrayLevels {

/// Create a sliced view of this `ArrayLevels` for a CDC chunk.
///
/// Note: `def_levels`, `rep_levels`, and `non_null_indices` are copied (not zero-copy),
/// while `array` is sliced without copying.
/// The chunk's `value_offset`/`num_values` select the relevant slice of
/// `non_null_indices`. The array is sliced to the range covered by
/// those indices, and they are shifted to be relative to the slice.
pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
let level_offset = chunk.level_offset;
let num_levels = chunk.num_levels;
let value_offset = chunk.value_offset;
let num_values = chunk.num_values;
let def_levels = self
.def_levels
.as_ref()
.map(|levels| levels[level_offset..level_offset + num_levels].to_vec());
let rep_levels = self
.rep_levels
.as_ref()
.map(|levels| levels[level_offset..level_offset + num_levels].to_vec());

// Filter non_null_indices to [value_offset, value_offset + num_values)
// and shift by -value_offset. Use binary search since the slice is sorted.
let value_end = value_offset + num_values;
let start = self
.non_null_indices
.partition_point(|&idx| idx < value_offset);
let end = self
.non_null_indices
.partition_point(|&idx| idx < value_end);
let non_null_indices: Vec<usize> = self.non_null_indices[start..end]
.iter()
.map(|&idx| idx - value_offset)
.collect();
let def_levels = self.def_levels.as_ref().map(|levels| {
levels[chunk.level_offset..chunk.level_offset + chunk.num_levels].to_vec()
});
let rep_levels = self.rep_levels.as_ref().map(|levels| {
levels[chunk.level_offset..chunk.level_offset + chunk.num_levels].to_vec()
});

let array = self.array.slice(value_offset, num_values);
// Select the non-null indices for this chunk.
let nni = &self.non_null_indices[chunk.value_offset..chunk.value_offset + chunk.num_values];
// Compute the array range spanned by the non-null indices.
// When nni is empty (all-null chunk), start=0, end=0 → zero-length
// array slice; write_batch_internal will process only the def/rep
// levels and write no values.
let start = nni.first().copied().unwrap_or(0);
let end = nni.last().map_or(0, |&i| i + 1);
// Shift indices to be relative to the sliced array.
let non_null_indices = nni.iter().map(|&idx| idx - start).collect();
// Slice the array to the computed range.
let array = self.array.slice(start, end - start);
let logical_nulls = array.logical_nulls();

Self {
Expand Down Expand Up @@ -2149,9 +2141,8 @@ mod tests {
fn test_slice_for_chunk_flat() {
// Case 1: required field (max_def_level=0, no def/rep levels stored).
// Array has 6 values; all are non-null so non_null_indices covers every position.
// The chunk selects value_offset=2, num_values=3 → the sub-array [3, 4, 5].
// Since there are no levels, num_levels=0 and level_offset are irrelevant.
// non_null_indices [0,1,2,3,4,5] filtered to [2,4) and shifted by -2 → [0,1,2].
// value_offset=2, num_values=3 → non_null_indices[2..5] = [2,3,4].
// Array is sliced (no def_levels → write_batch_internal uses values.len()).
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
Expand All @@ -2176,14 +2167,9 @@ mod tests {

// Case 2: optional field (max_def_level=1, def levels present, no rep levels).
// Array: [Some(1), None, Some(3), None, Some(5), Some(6)]
// def_levels: [1, 0, 1, 0, 1, 1] (1=non-null, 0=null)
// non_null_indices: [0, 2, 4, 5] (array positions of the four non-null values)
//
// The chunk selects level_offset=1, num_levels=3, value_offset=1, num_values=3:
// - def_levels[1..4] = [0, 1, 0] → null, non-null, null
// - sub-array slice(1, 3) = [None, Some(3), None]
// - non_null_indices filtered to [value_offset=1, value_end=4): only index 2 qualifies,
// shifted by -1 → [1] (position of Some(3) within the sliced sub-array)
// non_null_indices: [0, 2, 4, 5]
// value_offset=1, num_values=1 → non_null_indices[1..2] = [2].
// Array is not sliced (def_levels present → num_levels from def_levels.len()).
let array: ArrayRef = Arc::new(Int32Array::from(vec![
Some(1),
None,
Expand All @@ -2206,90 +2192,111 @@ mod tests {
level_offset: 1,
num_levels: 3,
value_offset: 1,
num_values: 3,
num_values: 1,
});
assert_eq!(sliced.def_levels, Some(vec![0, 1, 0]));
assert!(sliced.rep_levels.is_none());
assert_eq!(sliced.non_null_indices, vec![1]);
assert_eq!(sliced.array.len(), 3);
assert_eq!(sliced.non_null_indices, vec![0]); // [2] shifted by -2 (nni[0])
assert_eq!(sliced.array.len(), 1);
}

#[test]
fn test_slice_for_chunk_nested() {
// [[1,2],[3],[4,5]]: def=[2,2,2,2,2], rep=[0,1,0,0,1]
// Slice levels 2..5 (def=[2,2,2], rep=[0,0,1]), values 2..5
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
fn test_slice_for_chunk_nested_with_nulls() {
// Regression test for https://github.com/apache/arrow-rs/issues/9637
//
// Simulates a List<Int32?> where null list entries have non-zero child
// ranges (valid per Arrow spec: "a null value may correspond to a
// non-empty segment in the child array"). This creates gaps in the
// leaf array that don't correspond to any levels.
//
// 5 rows with 2 null list entries owning non-empty child ranges:
// row 0: [1] → leaf[0]
// row 1: null list → owns leaf[1..3] (gap of 2)
// row 2: [2, null] → leaf[3], leaf[4]=null element
// row 3: null list → owns leaf[5..8] (gap of 3)
// row 4: [4, 5] → leaf[8], leaf[9]
//
// def_levels: [3, 0, 3, 2, 0, 3, 3]
// rep_levels: [0, 0, 0, 1, 0, 0, 1]
// non_null_indices: [0, 3, 8, 9]
// gaps in array: 0→3 (skip 1,2), 3→8 (skip 5,6,7)
let array: ArrayRef = Arc::new(Int32Array::from(vec![
Some(1), // 0: row 0
None, // 1: gap (null list row 1)
None, // 2: gap (null list row 1)
Some(2), // 3: row 2
None, // 4: row 2, null element
None, // 5: gap (null list row 3)
None, // 6: gap (null list row 3)
None, // 7: gap (null list row 3)
Some(4), // 8: row 4
Some(5), // 9: row 4
]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
def_levels: Some(vec![2, 2, 2, 2, 2]),
rep_levels: Some(vec![0, 1, 0, 0, 1]),
non_null_indices: vec![0, 1, 2, 3, 4],
max_def_level: 2,
def_levels: Some(vec![3, 0, 3, 2, 0, 3, 3]),
rep_levels: Some(vec![0, 0, 0, 1, 0, 0, 1]),
non_null_indices: vec![0, 3, 8, 9],
max_def_level: 3,
max_rep_level: 1,
array,
logical_nulls,
};
let sliced = levels.slice_for_chunk(&CdcChunk {

// Chunk 0: rows 0-1, nni=[0] → array sliced to [0..1]
let chunk0 = levels.slice_for_chunk(&CdcChunk {
level_offset: 0,
num_levels: 2,
value_offset: 0,
num_values: 1,
});
assert_eq!(chunk0.non_null_indices, vec![0]);
assert_eq!(chunk0.array.len(), 1);

// Chunk 1: rows 2-3, nni=[3] → array sliced to [3..4]
let chunk1 = levels.slice_for_chunk(&CdcChunk {
level_offset: 2,
num_levels: 3,
value_offset: 1,
num_values: 1,
});
assert_eq!(chunk1.non_null_indices, vec![0]);
assert_eq!(chunk1.array.len(), 1);

// Chunk 2: row 4, nni=[8, 9] → array sliced to [8..10]
let chunk2 = levels.slice_for_chunk(&CdcChunk {
level_offset: 5,
num_levels: 2,
value_offset: 2,
num_values: 3,
num_values: 2,
});
assert_eq!(sliced.def_levels, Some(vec![2, 2, 2]));
assert_eq!(sliced.rep_levels, Some(vec![0, 0, 1]));
// [0,1,2,3,4] filtered to [2,5) → [2,3,4] → shifted -2 → [0,1,2]
assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
assert_eq!(sliced.array.len(), 3);
assert_eq!(chunk2.non_null_indices, vec![0, 1]);
assert_eq!(chunk2.array.len(), 2);
}

#[test]
fn test_slice_for_chunk_non_null_indices_boundary() {
// [1, null, 3]: non_null_indices=[0, 2]; test inclusive lower / exclusive upper bounds
let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
fn test_slice_for_chunk_all_null() {
// All-null chunk: num_values=0 → empty nni slice → zero-length array.
let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(4)]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
def_levels: Some(vec![1, 0, 1]),
def_levels: Some(vec![1, 0, 0, 1]),
rep_levels: None,
non_null_indices: vec![0, 2],
non_null_indices: vec![0, 3],
max_def_level: 1,
max_rep_level: 0,
array,
logical_nulls,
};
assert_eq!(
levels
.slice_for_chunk(&CdcChunk {
level_offset: 0,
num_levels: 1,
value_offset: 0,
num_values: 1
})
.non_null_indices,
vec![0]
);
// idx 2 in range [1,3), shifted -1 → 1
assert_eq!(
levels
.slice_for_chunk(&CdcChunk {
level_offset: 1,
num_levels: 2,
value_offset: 1,
num_values: 2
})
.non_null_indices,
vec![1]
);
// idx 2 excluded from [1,2)
assert_eq!(
levels
.slice_for_chunk(&CdcChunk {
level_offset: 1,
num_levels: 1,
value_offset: 1,
num_values: 1
})
.non_null_indices,
Vec::<usize>::new()
);
// Chunk covering only the two null rows (levels 1..3), zero non-null values.
let sliced = levels.slice_for_chunk(&CdcChunk {
level_offset: 1,
num_levels: 2,
value_offset: 1,
num_values: 0,
});
assert_eq!(sliced.def_levels, Some(vec![0, 0]));
assert_eq!(sliced.non_null_indices, Vec::<usize>::new());
assert_eq!(sliced.array.len(), 0);
}
}
Loading
Loading