Skip to content

Commit 677ceca

Browse files
committed
Add JSON reader/writer support for ListView arrays and implement union extend_nulls
Add a ListViewArrayDecoder so the JSON reader handles ListView/LargeListView columns, implement extend_nulls for dense and sparse union arrays instead of panicking, and promote object_store to a workspace dependency.
1 parent 368d86c commit 677ceca

File tree

13 files changed

+536
-161
lines changed

13 files changed

+536
-161
lines changed

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,14 @@ parquet-variant-compute = { version = "58.1.0", path = "./parquet-variant-comput
108108

109109
chrono = { version = "0.4.40", default-features = false, features = ["clock"] }
110110

111-
simdutf8 = { version = "0.1.5", default-features = false }
112-
113111
criterion = { version = "0.8.0", default-features = false }
114112

115113
insta = { version = "1.46.3", default-features = false }
116114

115+
object_store = { version = "0.13.2", default-features = false }
116+
117+
simdutf8 = { version = "0.1.5", default-features = false }
118+
117119
# release inherited profile keeping debug information and symbols
118120
# for mem/cpu profiling
119121
[profile.profiling]

arrow-avro/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ arrow-buffer = { workspace = true }
5656
arrow-array = { workspace = true }
5757
arrow-select = { workspace = true, optional = true }
5858

59-
object_store = { version = "0.13", default-features = false, optional = true }
59+
object_store = { workspace = true, optional = true }
6060

6161
bytes = { version = "1.11.0", default-features = false, features = ["std"] }
6262
serde_json = { version = "1.0", default-features = false, features = ["std"] }
@@ -75,7 +75,7 @@ uuid = "1.17"
7575
indexmap = "2.10"
7676
rand = "0.9"
7777
md5 = { version = "0.8", optional = true }
78-
sha2 = { version = "0.10", optional = true }
78+
sha2 = { version = "0.11", optional = true }
7979
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
8080

8181
[dev-dependencies]
@@ -93,7 +93,7 @@ futures = "0.3.31"
9393
async-stream = "0.3.6"
9494
apache-avro = "0.21.0"
9595
num-bigint = "0.4"
96-
object_store = { version = "0.13", default-features = false, features = ["fs"] }
96+
object_store = { workspace = true, features = ["fs"] }
9797
once_cell = "1.21.3"
9898
half = { version = "2.1", default-features = false }
9999
tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "io-util", "fs"] }

arrow-data/src/transform/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -813,8 +813,8 @@ impl<'a> MutableArrayData<'a> {
813813
};
814814

815815
let nulls = match data.data_type {
816-
// RunEndEncoded and Null arrays cannot have top-level null bitmasks
817-
DataType::RunEndEncoded(_, _) | DataType::Null => None,
816+
// RunEndEncoded, Null, and Union arrays cannot have top-level null bitmasks
817+
DataType::RunEndEncoded(_, _) | DataType::Null | DataType::Union(_, _) => None,
818818
_ => data
819819
.null_buffer
820820
.map(|nulls| {

arrow-data/src/transform/union.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use super::{_MutableArrayData, Extend};
1919
use crate::ArrayData;
20+
use arrow_schema::DataType;
2021

2122
pub(super) fn build_extend_sparse(array: &ArrayData) -> Extend<'_> {
2223
let type_ids = array.buffer::<i8>(0);
@@ -68,10 +69,42 @@ pub(super) fn build_extend_dense(array: &ArrayData) -> Extend<'_> {
6869
)
6970
}
7071

71-
pub(super) fn extend_nulls_dense(_mutable: &mut _MutableArrayData, _len: usize) {
72-
panic!("cannot call extend_nulls on UnionArray as cannot infer type");
72+
pub(super) fn extend_nulls_dense(mutable: &mut _MutableArrayData, len: usize) {
73+
let DataType::Union(fields, _) = &mutable.data_type else {
74+
unreachable!()
75+
};
76+
let first_type_id = fields
77+
.iter()
78+
.next()
79+
.expect("union must have at least one field")
80+
.0;
81+
82+
// Extend type_ids buffer
83+
mutable.buffer1.extend_from_slice(&vec![first_type_id; len]);
84+
85+
// Dense: extend offsets pointing into the first child, then extend nulls in that child
86+
let child_offset = mutable.child_data[0].len();
87+
let (start, end) = (child_offset as i32, (child_offset + len) as i32);
88+
mutable.buffer2.extend(start..end);
89+
mutable.child_data[0].extend_nulls(len);
7390
}
7491

75-
pub(super) fn extend_nulls_sparse(_mutable: &mut _MutableArrayData, _len: usize) {
76-
panic!("cannot call extend_nulls on UnionArray as cannot infer type");
92+
pub(super) fn extend_nulls_sparse(mutable: &mut _MutableArrayData, len: usize) {
93+
let DataType::Union(fields, _) = &mutable.data_type else {
94+
unreachable!()
95+
};
96+
let first_type_id = fields
97+
.iter()
98+
.next()
99+
.expect("union must have at least one field")
100+
.0;
101+
102+
// Extend type_ids buffer
103+
mutable.buffer1.extend_from_slice(&vec![first_type_id; len]);
104+
105+
// Sparse: extend nulls in ALL children
106+
mutable
107+
.child_data
108+
.iter_mut()
109+
.for_each(|child| child.extend_nulls(len));
77110
}

arrow-json/src/lib.rs

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -179,17 +179,17 @@ impl JsonSerializable for f64 {
179179

180180
#[cfg(test)]
181181
mod tests {
182-
use std::sync::Arc;
183-
184-
use crate::writer::JsonArray;
185-
186182
use super::*;
187-
183+
use crate::writer::JsonArray;
184+
use crate::writer::LineDelimited;
188185
use arrow_array::{
189-
ArrayRef, GenericBinaryArray, GenericByteViewArray, RecordBatch, RecordBatchWriter,
190-
builder::FixedSizeBinaryBuilder, types::BinaryViewType,
186+
ArrayRef, GenericBinaryArray, GenericByteViewArray, GenericListViewArray, RecordBatch,
187+
RecordBatchWriter, builder::FixedSizeBinaryBuilder, types::BinaryViewType,
191188
};
189+
use arrow_schema::{DataType, Field, Fields, Schema};
192190
use serde_json::Value::{Bool, Number as VNumber, String as VString};
191+
use std::io::Cursor;
192+
use std::sync::Arc;
193193

194194
#[test]
195195
fn test_arrow_native_type_to_json() {
@@ -216,13 +216,6 @@ mod tests {
216216

217217
#[test]
218218
fn test_json_roundtrip_structs() {
219-
use crate::writer::LineDelimited;
220-
use arrow_schema::DataType;
221-
use arrow_schema::Field;
222-
use arrow_schema::Fields;
223-
use arrow_schema::Schema;
224-
use std::sync::Arc;
225-
226219
let schema = Arc::new(Schema::new(vec![
227220
Field::new(
228221
"c1",
@@ -352,4 +345,49 @@ mod tests {
352345

353346
assert_eq!(batch, decoded);
354347
}
348+
349+
fn assert_list_view_roundtrip<O: arrow_array::OffsetSizeTrait>() {
350+
let flat_field = Arc::new(Field::new("item", DataType::Int32, true));
351+
let flat_dt = GenericListViewArray::<O>::DATA_TYPE_CONSTRUCTOR(flat_field);
352+
353+
let nested_inner = Arc::new(Field::new("item", DataType::Int32, false));
354+
let nested_inner_dt = GenericListViewArray::<O>::DATA_TYPE_CONSTRUCTOR(nested_inner);
355+
let nested_outer = Arc::new(Field::new("item", nested_inner_dt, true));
356+
let nested_dt = GenericListViewArray::<O>::DATA_TYPE_CONSTRUCTOR(nested_outer);
357+
358+
let schema = Arc::new(Schema::new(vec![
359+
Field::new("flat", flat_dt, true),
360+
Field::new("nested", nested_dt, true),
361+
]));
362+
363+
let input = r#"{"flat":[1,2,3],"nested":[[1,2],[3]]}
364+
{"flat":[4,null]}
365+
{}
366+
{"flat":[6],"nested":[[4,5,6]]}
367+
{"flat":[]}
368+
"#
369+
.as_bytes();
370+
371+
let batches: Vec<RecordBatch> = ReaderBuilder::new(schema.clone())
372+
.with_batch_size(1024)
373+
.build(Cursor::new(input))
374+
.unwrap()
375+
.collect::<Result<Vec<_>, _>>()
376+
.unwrap();
377+
378+
let mut output = Vec::new();
379+
let mut writer = WriterBuilder::new().build::<_, LineDelimited>(&mut output);
380+
for batch in &batches {
381+
writer.write(batch).unwrap();
382+
}
383+
writer.finish().unwrap();
384+
385+
assert_eq!(input, &output);
386+
}
387+
388+
#[test]
fn test_json_roundtrip_list_view() {
    // Cover both ListView (i32 offsets) and LargeListView (i64 offsets)
    assert_list_view_roundtrip::<i32>();
    assert_list_view_roundtrip::<i64>();
}
355393
}

arrow-json/src/reader/list_array.rs

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,33 @@
1818
use crate::reader::tape::{Tape, TapeElement};
1919
use crate::reader::{ArrayDecoder, DecoderContext};
2020
use arrow_array::OffsetSizeTrait;
21-
use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
22-
use arrow_buffer::buffer::NullBuffer;
21+
use arrow_array::builder::BooleanBufferBuilder;
22+
use arrow_buffer::{Buffer, buffer::NullBuffer};
2323
use arrow_data::{ArrayData, ArrayDataBuilder};
2424
use arrow_schema::{ArrowError, DataType};
2525
use std::marker::PhantomData;
2626

27-
pub struct ListArrayDecoder<O> {
27+
/// Decoder for [`DataType::List`] / [`DataType::LargeList`]
pub type ListArrayDecoder<O> = ListLikeArrayDecoder<O, false>;
/// Decoder for [`DataType::ListView`] / [`DataType::LargeListView`]
pub type ListViewArrayDecoder<O> = ListLikeArrayDecoder<O, true>;

/// Shared JSON decoder for list-like arrays.
///
/// `O` selects 32-bit vs 64-bit offsets; the `IS_VIEW` const parameter
/// selects between the plain list layout (single offsets buffer) and the
/// list-view layout (separate offsets and sizes buffers).
pub struct ListLikeArrayDecoder<O, const IS_VIEW: bool> {
    // Concrete list data type produced by this decoder
    data_type: DataType,
    // Decoder for the child element type
    decoder: Box<dyn ArrayDecoder>,
    phantom: PhantomData<O>,
    // Whether the column may contain nulls (drives null-buffer allocation)
    is_nullable: bool,
}
3336

34-
impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
37+
impl<O: OffsetSizeTrait, const IS_VIEW: bool> ListLikeArrayDecoder<O, IS_VIEW> {
3538
pub fn new(
3639
ctx: &DecoderContext,
3740
data_type: &DataType,
3841
is_nullable: bool,
3942
) -> Result<Self, ArrowError> {
40-
let field = match data_type {
41-
DataType::List(f) if !O::IS_LARGE => f,
42-
DataType::LargeList(f) if O::IS_LARGE => f,
43+
let field = match (IS_VIEW, data_type) {
44+
(false, DataType::List(f)) if !O::IS_LARGE => f,
45+
(false, DataType::LargeList(f)) if O::IS_LARGE => f,
46+
(true, DataType::ListView(f)) if !O::IS_LARGE => f,
47+
(true, DataType::LargeListView(f)) if O::IS_LARGE => f,
4348
_ => unreachable!(),
4449
};
4550
let decoder = ctx.make_decoder(field.data_type(), field.is_nullable())?;
@@ -53,11 +58,11 @@ impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
5358
}
5459
}
5560

56-
impl<O: OffsetSizeTrait> ArrayDecoder for ListArrayDecoder<O> {
61+
impl<O: OffsetSizeTrait, const IS_VIEW: bool> ArrayDecoder for ListLikeArrayDecoder<O, IS_VIEW> {
5762
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
5863
let mut child_pos = Vec::with_capacity(pos.len());
59-
let mut offsets = BufferBuilder::<O>::new(pos.len() + 1);
60-
offsets.append(O::from_usize(0).unwrap());
64+
let mut offsets = Vec::with_capacity(pos.len() + 1);
65+
offsets.push(O::from_usize(0).unwrap());
6166

6267
let mut nulls = self
6368
.is_nullable
@@ -88,18 +93,30 @@ impl<O: OffsetSizeTrait> ArrayDecoder for ListArrayDecoder<O> {
8893
let offset = O::from_usize(child_pos.len()).ok_or_else(|| {
8994
ArrowError::JsonError(format!("offset overflow decoding {}", self.data_type))
9095
})?;
91-
offsets.append(offset)
96+
offsets.push(offset);
9297
}
9398

9499
let child_data = self.decoder.decode(tape, &child_pos)?;
95100
let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish()));
96101

97-
let data = ArrayDataBuilder::new(self.data_type.clone())
102+
let mut data = ArrayDataBuilder::new(self.data_type.clone())
98103
.len(pos.len())
99104
.nulls(nulls)
100-
.add_buffer(offsets.finish())
101105
.child_data(vec![child_data]);
102106

107+
if IS_VIEW {
108+
let mut sizes = Vec::with_capacity(offsets.len() - 1);
109+
for i in 1..offsets.len() {
110+
sizes.push(offsets[i] - offsets[i - 1]);
111+
}
112+
offsets.pop();
113+
data = data
114+
.add_buffer(Buffer::from_vec(offsets))
115+
.add_buffer(Buffer::from_vec(sizes));
116+
} else {
117+
data = data.add_buffer(Buffer::from_vec(offsets));
118+
}
119+
103120
// Safety
104121
// Validated lengths above
105122
Ok(unsafe { data.build_unchecked() })

arrow-json/src/reader/mod.rs

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ pub use value_iter::ValueIter;
154154

155155
use crate::reader::boolean_array::BooleanArrayDecoder;
156156
use crate::reader::decimal_array::DecimalArrayDecoder;
157-
use crate::reader::list_array::ListArrayDecoder;
157+
use crate::reader::list_array::{ListArrayDecoder, ListViewArrayDecoder};
158158
use crate::reader::map_array::MapArrayDecoder;
159159
use crate::reader::null_array::NullArrayDecoder;
160160
use crate::reader::primitive_array::PrimitiveArrayDecoder;
@@ -792,6 +792,8 @@ fn make_decoder(
792792
DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
793793
DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
794794
DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
795+
DataType::ListView(_) => Ok(Box::new(ListViewArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
796+
DataType::LargeListView(_) => Ok(Box::new(ListViewArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
795797
DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
796798
DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
797799
DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
@@ -815,7 +817,10 @@ mod tests {
815817
use std::io::{BufReader, Cursor, Seek};
816818

817819
use arrow_array::cast::AsArray;
818-
use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
820+
use arrow_array::{
821+
Array, BooleanArray, Float64Array, GenericListViewArray, ListArray, OffsetSizeTrait,
822+
StringArray, StringViewArray,
823+
};
819824
use arrow_buffer::{ArrowNativeType, Buffer};
820825
use arrow_cast::display::{ArrayFormatter, FormatOptions};
821826
use arrow_data::ArrayDataBuilder;
@@ -2192,6 +2197,77 @@ mod tests {
21922197
assert_eq!(read, expected);
21932198
}
21942199

2200+
fn assert_read_list_view<O: OffsetSizeTrait>() {
2201+
let field = Arc::new(Field::new("item", DataType::Int32, true));
2202+
let data_type = GenericListViewArray::<O>::DATA_TYPE_CONSTRUCTOR(field.clone());
2203+
let schema = Arc::new(Schema::new(vec![Field::new("lv", data_type, true)]));
2204+
2205+
let buf = r#"
2206+
{"lv": [1, 2, 3]}
2207+
{"lv": [4, null]}
2208+
{"lv": null}
2209+
{"lv": [6]}
2210+
{"lv": []}
2211+
"#;
2212+
2213+
let batches = do_read(buf, 1024, false, false, schema);
2214+
assert_eq!(batches.len(), 1);
2215+
let batch = &batches[0];
2216+
let col = batch.column(0);
2217+
let list_view = col
2218+
.as_any()
2219+
.downcast_ref::<GenericListViewArray<O>>()
2220+
.unwrap();
2221+
2222+
assert_eq!(list_view.len(), 5);
2223+
2224+
// Check offsets and sizes
2225+
let expected_offsets: Vec<O> = vec![0, 3, 5, 5, 6]
2226+
.into_iter()
2227+
.map(|v| O::usize_as(v))
2228+
.collect();
2229+
let expected_sizes: Vec<O> = vec![3, 2, 0, 1, 0]
2230+
.into_iter()
2231+
.map(|v| O::usize_as(v))
2232+
.collect();
2233+
assert_eq!(list_view.value_offsets(), &expected_offsets);
2234+
assert_eq!(list_view.value_sizes(), &expected_sizes);
2235+
2236+
// Row 0: [1, 2, 3]
2237+
assert!(list_view.is_valid(0));
2238+
let vals = list_view.value(0);
2239+
let ints = vals.as_primitive::<Int32Type>();
2240+
assert_eq!(ints.values(), &[1, 2, 3]);
2241+
2242+
// Row 1: [4, null]
2243+
assert!(list_view.is_valid(1));
2244+
let vals = list_view.value(1);
2245+
let ints = vals.as_primitive::<Int32Type>();
2246+
assert_eq!(ints.len(), 2);
2247+
assert_eq!(ints.value(0), 4);
2248+
assert!(ints.is_null(1));
2249+
2250+
// Row 2: null
2251+
assert!(list_view.is_null(2));
2252+
2253+
// Row 3: [6]
2254+
assert!(list_view.is_valid(3));
2255+
let vals = list_view.value(3);
2256+
let ints = vals.as_primitive::<Int32Type>();
2257+
assert_eq!(ints.values(), &[6]);
2258+
2259+
// Row 4: []
2260+
assert!(list_view.is_valid(4));
2261+
let vals = list_view.value(4);
2262+
assert_eq!(vals.len(), 0);
2263+
}
2264+
2265+
#[test]
fn test_read_list_view() {
    // Cover both ListView (i32 offsets) and LargeListView (i64 offsets)
    assert_read_list_view::<i32>();
    assert_read_list_view::<i64>();
}
2270+
21952271
#[test]
21962272
fn test_skip_empty_lines() {
21972273
let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);

0 commit comments

Comments
 (0)