Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion arrow-ipc/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,20 @@ pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> Dat
}
DataType::LargeList(Arc::new(children.get(0).into()))
}
crate::Type::ListView => {
let children = field.children().unwrap();
if children.len() != 1 {
panic!("expect a listview to have one child")
}
DataType::ListView(Arc::new(children.get(0).into()))
}
crate::Type::LargeListView => {
let children = field.children().unwrap();
if children.len() != 1 {
panic!("expect a large listview to have one child")
}
DataType::LargeListView(Arc::new(children.get(0).into()))
}
crate::Type::FixedSizeList => {
let children = field.children().unwrap();
if children.len() != 1 {
Expand Down Expand Up @@ -769,7 +783,24 @@ pub(crate) fn get_fb_field_type<'a>(
children: Some(fbb.create_vector(&[child])),
}
}
ListView(_) | LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"),
ListView(list_type) => {
let child = build_field(fbb, dictionary_tracker, list_type);
FBFieldType {
type_type: crate::Type::ListView,
type_: crate::ListViewBuilder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&[child])),
}
}
LargeListView(list_type) => {
let child = build_field(fbb, dictionary_tracker, list_type);
FBFieldType {
type_type: crate::Type::LargeListView,
type_: crate::LargeListViewBuilder::new(fbb)
.finish()
.as_union_value(),
children: Some(fbb.create_vector(&[child])),
}
}
LargeList(list_type) => {
let child = build_field(fbb, dictionary_tracker, list_type);
FBFieldType {
Expand Down
34 changes: 34 additions & 0 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ impl RecordBatchDecoder<'_> {
let values = self.create_array(list_field, variadic_counts)?;
self.create_list_array(list_node, data_type, &list_buffers, values)
}
ListView(list_field) | LargeListView(list_field) => {
let list_node = self.next_node(field)?;
let list_buffers = [
self.next_buffer()?, // null buffer
self.next_buffer()?, // offsets
self.next_buffer()?, // sizes
];
let values = self.create_array(list_field, variadic_counts)?;
self.create_list_view_array(list_node, data_type, &list_buffers, values)
}
FixedSizeList(list_field, _) => {
let list_node = self.next_node(field)?;
let list_buffers = [self.next_buffer()?];
Expand Down Expand Up @@ -322,6 +332,30 @@ impl RecordBatchDecoder<'_> {
self.create_array_from_builder(builder)
}

/// Reads the next ListView/LargeListView array from the given buffers.
///
/// `buffers` must contain, in order: the validity bitmap, the offsets
/// buffer, and the sizes buffer. `child_array` supplies the values child.
fn create_list_view_array(
    &self,
    field_node: &FieldNode,
    data_type: &DataType,
    buffers: &[Buffer],
    child_array: ArrayRef,
) -> Result<ArrayRef, ArrowError> {
    assert!(matches!(data_type, ListView(_) | LargeListView(_)));

    let null_count = field_node.null_count() as usize;
    // Only attach a validity bitmap when there is at least one null.
    let nulls = if null_count > 0 {
        Some(buffers[0].clone())
    } else {
        None
    };

    let builder = ArrayData::builder(data_type.clone())
        .len(field_node.length() as usize)
        .add_buffer(buffers[1].clone()) // offsets
        .add_buffer(buffers[2].clone()) // sizes
        .add_child_data(child_array.into_data())
        .null_bit_buffer(nulls)
        .null_count(null_count);

    self.create_array_from_builder(builder)
}

fn create_struct_array(
&self,
struct_node: &FieldNode,
Expand Down
192 changes: 191 additions & 1 deletion arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1705,6 +1705,36 @@ fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Arra
(offsets, child_data)
}

/// Returns the offsets, sizes, and child data buffers for a ListView array.
///
/// Unlike List arrays, ListView arrays store both offsets and sizes explicitly,
/// and offsets can be non-monotonic. When slicing, we simply pass through the
/// offsets and sizes without re-encoding, and do not slice the child data.
fn get_list_view_array_buffers<O: OffsetSizeTrait>(
    data: &ArrayData,
) -> (Buffer, Buffer, ArrayData) {
    // An empty slice carries no offsets or sizes and an empty child.
    if data.is_empty() {
        let empty_child = data.child_data()[0].slice(0, 0);
        return (
            MutableBuffer::new(0).into(),
            MutableBuffer::new(0).into(),
            empty_child,
        );
    }

    // Offsets and sizes are both `O`-typed, so the same byte window
    // (shifted by the array's slice offset) applies to each buffer.
    let byte_offset = data.offset() * std::mem::size_of::<O>();
    let byte_len = data.len() * std::mem::size_of::<O>();

    let offsets = data.buffers()[0].slice_with_length(byte_offset, byte_len);
    let sizes = data.buffers()[1].slice_with_length(byte_offset, byte_len);

    // Child data is passed through untouched; the offsets remain valid
    // because they index into the full child array.
    (offsets, sizes, data.child_data()[0].clone())
}

/// Returns the sliced views [`Buffer`] for a BinaryView/Utf8View array.
///
/// The views buffer is sliced to only include views in the valid range based on
Expand Down Expand Up @@ -1901,6 +1931,52 @@ fn write_array_data(
write_options,
)?;
return Ok(offset);
} else if matches!(
data_type,
DataType::ListView(_) | DataType::LargeListView(_)
) {
assert_eq!(array_data.buffers().len(), 2); // offsets + sizes
assert_eq!(array_data.child_data().len(), 1);

let (offsets, sizes, child_data) = match data_type {
DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
_ => unreachable!(),
};

offset = write_buffer(
offsets.as_slice(),
buffers,
arrow_data,
offset,
compression_codec,
compression_context,
write_options.alignment,
)?;

offset = write_buffer(
sizes.as_slice(),
buffers,
arrow_data,
offset,
compression_codec,
compression_context,
write_options.alignment,
)?;

offset = write_array_data(
&child_data,
buffers,
arrow_data,
nodes,
offset,
child_data.len(),
child_data.null_count(),
compression_codec,
compression_context,
write_options,
)?;
return Ok(offset);
} else if let DataType::FixedSizeList(_, fixed_size) = data_type {
assert_eq!(array_data.child_data().len(), 1);
let fixed_size = *fixed_size as usize;
Expand Down Expand Up @@ -2043,7 +2119,9 @@ mod tests {
use arrow_array::builder::MapBuilder;
use arrow_array::builder::StringViewBuilder;
use arrow_array::builder::UnionBuilder;
use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
use arrow_array::builder::{
GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
};
use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
use arrow_array::types::*;
use arrow_buffer::ScalarBuffer;
Expand Down Expand Up @@ -3212,6 +3290,118 @@ mod tests {
roundtrip_ensure_sliced_smaller(in_batch, 1000);
}

/// Builds a 100k-element ListView test array: every 10,000th entry is
/// null, and every other entry is the triple `[i, i, i]`.
fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
    let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());

    for i in 0u32..100_000 {
        if i % 10_000 == 0 {
            builder.append(false);
        } else {
            for value in [i; 3] {
                builder.values().append_value(value);
            }
            builder.append(true);
        }
    }

    builder.finish()
}

#[test]
fn encode_list_view_arrays() {
    // Round-trip a ListView<UInt32> column through the IPC file format.
    let item_field = Field::new_list_field(DataType::UInt32, true);
    let list_field = Field::new("val", DataType::ListView(Arc::new(item_field)), true);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let array = Arc::new(generate_list_view_data::<i32>());
    let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

    assert_eq!(batch, deserialize_file(serialize_file(&batch)));
}

#[test]
fn encode_large_list_view_arrays() {
    // Round-trip a LargeListView<UInt32> column through the IPC file format.
    let item_field = Field::new_list_field(DataType::UInt32, true);
    let list_field = Field::new("val", DataType::LargeListView(Arc::new(item_field)), true);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let array = Arc::new(generate_list_view_data::<i64>());
    let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

    assert_eq!(batch, deserialize_file(serialize_file(&batch)));
}

#[test]
fn check_sliced_list_view_array() {
    // Sliced ListView batches must round-trip through the IPC file format.
    let item = Field::new_list_field(DataType::UInt32, true);
    let field = Field::new("val", DataType::ListView(Arc::new(item)), true);
    let schema = Arc::new(Schema::new(vec![field]));
    let array = Arc::new(generate_list_view_data::<i32>());

    // Cover a single-row slice, a prefix, a mid-array window, and a suffix.
    let windows = [(999, 1), (0, 13), (47, 12), (array.len() - 13, 13)];
    for (offset, len) in windows {
        let sliced = RecordBatch::try_new(schema.clone(), vec![array.clone()])
            .unwrap()
            .slice(offset, len);
        assert_eq!(sliced, deserialize_file(serialize_file(&sliced)));
    }
}

#[test]
fn check_sliced_large_list_view_array() {
    // Sliced LargeListView batches must round-trip through the IPC file format.
    let item = Field::new_list_field(DataType::UInt32, true);
    let field = Field::new("val", DataType::LargeListView(Arc::new(item)), true);
    let schema = Arc::new(Schema::new(vec![field]));
    let array = Arc::new(generate_list_view_data::<i64>());

    // Cover a single-row slice, a prefix, a mid-array window, and a suffix.
    let windows = [(999, 1), (0, 13), (47, 12), (array.len() - 13, 13)];
    for (offset, len) in windows {
        let sliced = RecordBatch::try_new(schema.clone(), vec![array.clone()])
            .unwrap()
            .slice(offset, len);
        assert_eq!(sliced, deserialize_file(serialize_file(&sliced)));
    }
}

fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

let inner_builder = UInt32Builder::new();
let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);

for i in 0u32..10_000 {
if i.is_multiple_of(1_000) {
outer_builder.append(false);
continue;
}

for _ in 0..3 {
for value in [i, i + 1, i + 2] {
outer_builder.values().values().append_value(value);
}
outer_builder.values().append(true);
}
outer_builder.append(true);
}

outer_builder.finish()
}

#[test]
fn encode_nested_list_views() {
    // Round-trip a ListView<ListView<UInt32>> column through the IPC file format.
    let leaf = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let inner = Arc::new(Field::new_list_field(DataType::ListView(leaf), true));
    let outer = Field::new("val", DataType::ListView(inner), true);
    let schema = Arc::new(Schema::new(vec![outer]));

    let array = Arc::new(generate_nested_list_view_data::<i32>());
    let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

    assert_eq!(batch, deserialize_file(serialize_file(&batch)));
}

#[test]
fn test_decimal128_alignment16_is_sufficient() {
const IPC_ALIGNMENT: usize = 16;
Expand Down
Loading