Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 0 additions & 196 deletions parquet/src/arrow/arrow_writer/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
//!
//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)

use crate::column::chunker::CdcChunk;
use crate::errors::{ParquetError, Result};
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
Expand Down Expand Up @@ -802,58 +801,11 @@ impl ArrayLevels {
/// Returns the positions (within `array`) of the values that are non-null.
pub fn non_null_indices(&self) -> &[usize] {
    self.non_null_indices.as_slice()
}

/// Create a sliced view of this `ArrayLevels` for a CDC chunk.
///
/// Note: `def_levels`, `rep_levels`, and `non_null_indices` are copied (not zero-copy),
/// while `array` is sliced without copying.
pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
let level_offset = chunk.level_offset;
let num_levels = chunk.num_levels;
let value_offset = chunk.value_offset;
let num_values = chunk.num_values;
let def_levels = self
.def_levels
.as_ref()
.map(|levels| levels[level_offset..level_offset + num_levels].to_vec());
let rep_levels = self
.rep_levels
.as_ref()
.map(|levels| levels[level_offset..level_offset + num_levels].to_vec());

// Filter non_null_indices to [value_offset, value_offset + num_values)
// and shift by -value_offset. Use binary search since the slice is sorted.
let value_end = value_offset + num_values;
let start = self
.non_null_indices
.partition_point(|&idx| idx < value_offset);
let end = self
.non_null_indices
.partition_point(|&idx| idx < value_end);
let non_null_indices: Vec<usize> = self.non_null_indices[start..end]
.iter()
.map(|&idx| idx - value_offset)
.collect();

let array = self.array.slice(value_offset, num_values);
let logical_nulls = array.logical_nulls();

Self {
def_levels,
rep_levels,
non_null_indices,
max_def_level: self.max_def_level,
max_rep_level: self.max_rep_level,
array,
logical_nulls,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::column::chunker::CdcChunk;

use arrow_array::builder::*;
use arrow_array::types::Int32Type;
Expand Down Expand Up @@ -2144,152 +2096,4 @@ mod tests {
let v = Arc::new(array) as ArrayRef;
LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
}

#[test]
fn test_slice_for_chunk_flat() {
    // Required column: no def/rep levels are stored, every position is
    // non-null. Slicing values [2, 5) keeps a 3-element sub-array and
    // rebases the surviving non-null indices by -2 → [0, 1, 2].
    let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
    let logical_nulls = array.logical_nulls();
    let levels = ArrayLevels {
        def_levels: None,
        rep_levels: None,
        non_null_indices: (0..6).collect(),
        max_def_level: 0,
        max_rep_level: 0,
        array,
        logical_nulls,
    };
    let chunk = CdcChunk {
        level_offset: 0,
        num_levels: 0,
        value_offset: 2,
        num_values: 3,
    };
    let sliced = levels.slice_for_chunk(&chunk);
    assert!(sliced.def_levels.is_none());
    assert!(sliced.rep_levels.is_none());
    assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
    assert_eq!(sliced.array.len(), 3);

    // Optional column: [1, null, 3, null, 5, 6] with def_levels
    // [1, 0, 1, 0, 1, 1] and non_null_indices [0, 2, 4, 5].
    // Selecting level_offset=1, num_levels=3, value_offset=1, num_values=3
    // yields def [0, 1, 0], sub-array [null, 3, null], and only index 2
    // survives the value window, rebased by -1 → [1].
    let array: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1),
        None,
        Some(3),
        None,
        Some(5),
        Some(6),
    ]));
    let logical_nulls = array.logical_nulls();
    let levels = ArrayLevels {
        def_levels: Some(vec![1, 0, 1, 0, 1, 1]),
        rep_levels: None,
        non_null_indices: vec![0, 2, 4, 5],
        max_def_level: 1,
        max_rep_level: 0,
        array,
        logical_nulls,
    };
    let chunk = CdcChunk {
        level_offset: 1,
        num_levels: 3,
        value_offset: 1,
        num_values: 3,
    };
    let sliced = levels.slice_for_chunk(&chunk);
    assert_eq!(sliced.def_levels, Some(vec![0, 1, 0]));
    assert!(sliced.rep_levels.is_none());
    assert_eq!(sliced.non_null_indices, vec![1]);
    assert_eq!(sliced.array.len(), 3);
}

#[test]
fn test_slice_for_chunk_nested() {
    // List column [[1,2],[3],[4,5]] record-shredded to
    // def = [2,2,2,2,2], rep = [0,1,0,0,1]; take levels and values [2, 5).
    let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let logical_nulls = array.logical_nulls();
    let levels = ArrayLevels {
        def_levels: Some(vec![2; 5]),
        rep_levels: Some(vec![0, 1, 0, 0, 1]),
        non_null_indices: (0..5).collect(),
        max_def_level: 2,
        max_rep_level: 1,
        array,
        logical_nulls,
    };
    let chunk = CdcChunk {
        level_offset: 2,
        num_levels: 3,
        value_offset: 2,
        num_values: 3,
    };
    let sliced = levels.slice_for_chunk(&chunk);
    assert_eq!(sliced.def_levels, Some(vec![2, 2, 2]));
    assert_eq!(sliced.rep_levels, Some(vec![0, 0, 1]));
    // Indices [2, 3, 4] fall inside the window and are rebased to [0, 1, 2].
    assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
    assert_eq!(sliced.array.len(), 3);
}

#[test]
fn test_slice_for_chunk_non_null_indices_boundary() {
    // [1, null, 3] → non_null_indices = [0, 2]. Verify the value window is
    // inclusive at its lower bound and exclusive at its upper bound.
    let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
    let logical_nulls = array.logical_nulls();
    let levels = ArrayLevels {
        def_levels: Some(vec![1, 0, 1]),
        rep_levels: None,
        non_null_indices: vec![0, 2],
        max_def_level: 1,
        max_rep_level: 0,
        array,
        logical_nulls,
    };

    // Helper: slice and return just the rebased non-null indices.
    let slice = |level_offset, num_levels, value_offset, num_values| {
        levels
            .slice_for_chunk(&CdcChunk {
                level_offset,
                num_levels,
                value_offset,
                num_values,
            })
            .non_null_indices
    };

    // Window [0, 1): index 0 sits on the inclusive lower bound.
    assert_eq!(slice(0, 1, 0, 1), vec![0]);
    // Window [1, 3): index 2 qualifies and is rebased by -1 → 1.
    assert_eq!(slice(1, 2, 1, 2), vec![1]);
    // Window [1, 2): index 2 falls on the exclusive upper bound → empty.
    assert_eq!(slice(1, 1, 1, 1), Vec::<usize>::new());
}
}
68 changes: 1 addition & 67 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

//! Contains writer which writes arrow data into parquet data.

use crate::column::chunker::ContentDefinedChunker;

use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
Expand Down Expand Up @@ -194,9 +192,6 @@ pub struct ArrowWriter<W: Write> {

/// The maximum size in bytes for a row group, or None for unlimited
max_row_group_bytes: Option<usize>,

/// CDC chunkers persisted across row groups (one per leaf column).
cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
}

impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
Expand Down Expand Up @@ -266,26 +261,13 @@ impl<W: Write + Send> ArrowWriter<W> {
let row_group_writer_factory =
ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());

let cdc_chunkers = props_ptr
.content_defined_chunking()
.map(|opts| {
file_writer
.schema_descr()
.columns()
.iter()
.map(|desc| ContentDefinedChunker::new(desc, opts))
.collect::<Result<Vec<_>>>()
})
.transpose()?;

Ok(Self {
writer: file_writer,
in_progress: None,
arrow_schema,
row_group_writer_factory,
max_row_group_row_count,
max_row_group_bytes,
cdc_chunkers,
})
}

Expand Down Expand Up @@ -401,10 +383,7 @@ impl<W: Write + Send> ArrowWriter<W> {
}
}

match self.cdc_chunkers.as_mut() {
Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
None => in_progress.write(batch)?,
}
in_progress.write(batch)?;

let should_flush = self
.max_row_group_row_count
Expand Down Expand Up @@ -893,32 +872,6 @@ impl ArrowColumnWriter {
self.write_internal(&col.0)
}

/// Write a leaf column with content-defined chunking, forcing a data-page
/// flush at every chunk boundary except after the final chunk.
fn write_with_chunker(
    &mut self,
    col: &ArrowLeafColumn,
    chunker: &mut ContentDefinedChunker,
) -> Result<()> {
    let levels = &col.0;
    let chunks =
        chunker.get_arrow_chunks(levels.def_levels(), levels.rep_levels(), levels.array())?;

    let mut remaining = chunks.len();
    for chunk in &chunks {
        self.write_internal(&levels.slice_for_chunk(chunk))?;
        remaining -= 1;

        // Insert a page break between chunks, but not after the last one.
        if remaining > 0 {
            match &mut self.writer {
                ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
                ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
            }
        }
    }
    Ok(())
}

fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
match &mut self.writer {
ArrowColumnWriterImpl::Column(c) => {
Expand Down Expand Up @@ -1015,25 +968,6 @@ impl ArrowRowGroupWriter {
Ok(())
}

/// Write a record batch, routing each leaf column through its persistent
/// content-defined chunker. `chunkers` must hold one chunker per leaf,
/// in schema order.
fn write_with_chunkers(
    &mut self,
    batch: &RecordBatch,
    chunkers: &mut [ContentDefinedChunker],
) -> Result<()> {
    self.buffered_rows += batch.num_rows();
    let mut writer_iter = self.writers.iter_mut();
    let mut chunker_iter = chunkers.iter_mut();
    for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
        for leaf in compute_leaves(field.as_ref(), column)? {
            let writer = writer_iter.next().unwrap();
            let chunker = chunker_iter.next().unwrap();
            writer.write_with_chunker(&leaf, chunker)?;
        }
    }
    Ok(())
}

/// Returns the estimated total encoded bytes for this row group
fn get_estimated_total_bytes(&self) -> usize {
self.writers
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/column/chunker/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl ContentDefinedChunker {
Ok(Self {
max_def_level: desc.max_def_level(),
max_rep_level: desc.max_rep_level(),
repeated_ancestor_def_level: desc.repeated_ancestor_def_level(),
repeated_ancestor_def_level: 0,
min_chunk_size: options.min_chunk_size as i64,
max_chunk_size: options.max_chunk_size as i64,
rolling_hash_mask,
Expand Down
2 changes: 0 additions & 2 deletions parquet/src/column/chunker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
mod cdc;
mod cdc_generated;

pub(crate) use cdc::ContentDefinedChunker;

/// A chunk of data with level and value offsets for record-shredded nested data.
#[derive(Debug, Clone, Copy)]
pub(crate) struct CdcChunk {
Expand Down
Loading