Skip to content

Commit f8796fd

Browse files
fabianmurariu and alamb authored
Fix ipc errors for LargeList containing sliced StringViews (#8979)
# Which issue does this PR close?

- Closes apache/arrow#48428

# Rationale for this change

A LargeList containing a StringView child whose offsets do not start at zero is encoded incorrectly in IPC.

# What changes are included in this PR?

Before writing a StringView array, slice its views buffer according to the array's offset and length.

# Are these changes tested?

Yes, unit tests were added.

# Are there any user-facing changes?

No

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent b25a93b commit f8796fd

File tree

1 file changed

+114
-15
lines changed

1 file changed

+114
-15
lines changed

arrow-ipc/src/writer.rs

Lines changed: 114 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -1705,6 +1705,28 @@ fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Arra
17051705
(offsets, child_data)
17061706
}
17071707

1708+
/// Returns the first buffer of `array_data` as bytes, sliced to the array's range if needed.
1709+
///
1710+
/// The buffer is sliced to only include elements in the valid range based on
1711+
/// the array's offset and length. This helps reduce the encoded size of sliced
1712+
/// arrays.
1713+
///
/// For BinaryView/Utf8View arrays the first buffer holds the views. The same
/// helper is used where `write_array_data` truncates a single values buffer.
/// When no truncation is needed the whole buffer is returned unchanged.
1714+
fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] {
1715+
// Only the first buffer is ever inspected; callers write any remaining buffers themselves.
let buffer = &array_data.buffers()[0];
1716+
let layout = layout(array_data.data_type());
1717+
let spec = &layout.buffers[0];
1718+
1719+
// Size in bytes of one element of this buffer, taken from the type's layout spec.
let byte_width = get_buffer_element_width(spec);
1720+
// Number of bytes needed to hold exactly `len` elements.
let min_length = array_data.len() * byte_width;
1721+
if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1722+
// Skip the bytes that precede the array's logical offset.
let byte_offset = array_data.offset() * byte_width;
1723+
// Never read past the end of the underlying buffer.
let buffer_length = min(min_length, buffer.len() - byte_offset);
1724+
&buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
1725+
} else {
1726+
// Nothing to trim: hand back the buffer as-is.
buffer.as_slice()
1727+
}
1728+
}
1729+
17081730
/// Write array data to a vector of bytes
17091731
#[allow(clippy::too_many_arguments)]
17101732
fn write_array_data(
@@ -1772,7 +1794,18 @@ fn write_array_data(
17721794
// The current implementation just serializes the raw arrays as given and does not try to optimize anything.
17731795
// If users want to "compact" the arrays prior to sending them over IPC,
17741796
// they should consider the gc API suggested in #5513
1775-
for buffer in array_data.buffers() {
1797+
let views = get_or_truncate_buffer(array_data);
1798+
offset = write_buffer(
1799+
views,
1800+
buffers,
1801+
arrow_data,
1802+
offset,
1803+
compression_codec,
1804+
compression_context,
1805+
write_options.alignment,
1806+
)?;
1807+
1808+
for buffer in array_data.buffers().iter().skip(1) {
17761809
offset = write_buffer(
17771810
buffer.as_slice(),
17781811
buffers,
@@ -1806,21 +1839,9 @@ fn write_array_data(
18061839
// Truncate values
18071840
assert_eq!(array_data.buffers().len(), 1);
18081841

1809-
let buffer = &array_data.buffers()[0];
1810-
let layout = layout(data_type);
1811-
let spec = &layout.buffers[0];
1812-
1813-
let byte_width = get_buffer_element_width(spec);
1814-
let min_length = array_data.len() * byte_width;
1815-
let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1816-
let byte_offset = array_data.offset() * byte_width;
1817-
let buffer_length = min(min_length, buffer.len() - byte_offset);
1818-
&buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
1819-
} else {
1820-
buffer.as_slice()
1821-
};
1842+
let buffer = get_or_truncate_buffer(array_data);
18221843
offset = write_buffer(
1823-
buffer_slice,
1844+
buffer,
18241845
buffers,
18251846
arrow_data,
18261847
offset,
@@ -2020,6 +2041,7 @@ mod tests {
20202041
use arrow_array::builder::Float32Builder;
20212042
use arrow_array::builder::Int64Builder;
20222043
use arrow_array::builder::MapBuilder;
2044+
use arrow_array::builder::StringViewBuilder;
20232045
use arrow_array::builder::UnionBuilder;
20242046
use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
20252047
use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
@@ -2915,6 +2937,40 @@ mod tests {
29152937
ls.finish()
29162938
}
29172939

2940+
/// Builds a list array of 100,000 non-null lists whose child values are
/// produced by a `StringViewBuilder`.
///
/// List `i` contains three copies of the string `value{i}`.
fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2941+
let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
2942+
2943+
for i in 0..100_000 {
2944+
// Three identical strings per list entry.
for value in [
2945+
format!("value{}", i),
2946+
format!("value{}", i),
2947+
format!("value{}", i),
2948+
] {
2949+
ls.values().append_value(&value);
2950+
}
2951+
// Close the current list as a valid (non-null) entry.
ls.append(true)
2952+
}
2953+
2954+
ls.finish()
2955+
}
2956+
2957+
/// Builds a list array of 100,000 non-null lists whose child values are
/// produced by a `StringBuilder`.
///
/// List `i` contains three copies of the string `value{i}`; the data mirrors
/// `generate_utf8view_list_data` but uses the plain string builder.
fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2958+
let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
2959+
2960+
for i in 0..100_000 {
2961+
// Three identical strings per list entry.
for value in [
2962+
format!("value{}", i),
2963+
format!("value{}", i),
2964+
format!("value{}", i),
2965+
] {
2966+
ls.values().append_value(&value);
2967+
}
2968+
// Close the current list as a valid (non-null) entry.
ls.append(true)
2969+
}
2970+
2971+
ls.finish()
2972+
}
2973+
29182974
fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
29192975
let mut ls =
29202976
GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
@@ -3074,6 +3130,49 @@ mod tests {
30743130
roundtrip_ensure_sliced_smaller(in_batch, 1000);
30753131
}
30763132

3133+
// Checks that sliced windows of a LargeList<UInt32> column named "val"
// survive the round trip performed by `check_sliced_list_array`.
#[test]
3134+
fn encode_large_lists_non_zero_offset() {
3135+
let val_inner = Field::new_list_field(DataType::UInt32, true);
3136+
let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3137+
let schema = Arc::new(Schema::new(vec![val_list_field]));
3138+
3139+
// `i64` offsets make the generated array a LargeList.
let values = Arc::new(generate_list_data::<i64>());
3140+
3141+
check_sliced_list_array(schema, values);
3142+
}
3143+
3144+
// Checks that sliced windows of a LargeList<Utf8> column named "val"
// survive the round trip performed by `check_sliced_list_array`.
#[test]
3145+
fn encode_large_lists_string_non_zero_offset() {
3146+
let val_inner = Field::new_list_field(DataType::Utf8, true);
3147+
let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3148+
let schema = Arc::new(Schema::new(vec![val_list_field]));
3149+
3150+
// `i64` offsets make the generated array a LargeList.
let values = Arc::new(generate_string_list_data::<i64>());
3151+
3152+
check_sliced_list_array(schema, values);
3153+
}
3154+
3155+
// Checks that sliced windows of a LargeList<Utf8View> column named "val"
// survive the round trip performed by `check_sliced_list_array`. This is the
// LargeList-of-sliced-StringView case named in the commit title.
#[test]
3156+
fn encode_large_list_string_view_non_zero_offset() {
3157+
let val_inner = Field::new_list_field(DataType::Utf8View, true);
3158+
let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3159+
let schema = Arc::new(Schema::new(vec![val_list_field]));
3160+
3161+
// `i64` offsets make the generated array a LargeList.
let values = Arc::new(generate_utf8view_list_data::<i64>());
3162+
3163+
check_sliced_list_array(schema, values);
3164+
}
3165+
3166+
/// Slices a single-column batch built from `values` at several
/// `(offset, len)` windows and asserts that each slice is unchanged after
/// going through `serialize_file` followed by `deserialize_file`.
///
/// The windows cover a one-row slice at offset 999, a slice starting at
/// offset 0, a slice at offset 47, and the last 13 rows.
// NOTE(review): assumes `values` has at least 1000 rows, otherwise the
// windows fall outside the array — confirm against the generators used.
fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3167+
for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3168+
// Rebuild the full batch each time, then take the window under test.
let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3169+
.unwrap()
3170+
.slice(offset, len);
3171+
let out_batch = deserialize_file(serialize_file(&in_batch));
3172+
assert_eq!(in_batch, out_batch);
3173+
}
3174+
}
3175+
30773176
#[test]
30783177
fn encode_nested_lists() {
30793178
let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));

0 commit comments

Comments
 (0)