Skip to content

Commit 58a95fc

Browse files
committed
[Json] Support FixedSizeList in json decoder
1 parent 711fac8 commit 58a95fc

File tree

3 files changed

+292
-2
lines changed

3 files changed

+292
-2
lines changed

arrow-json/src/lib.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,4 +390,46 @@ mod tests {
390390
assert_list_view_roundtrip::<i32>();
391391
assert_list_view_roundtrip::<i64>();
392392
}
393+
394+
#[test]
395+
fn test_json_roundtrip_fixed_size_list() {
396+
let inner = Arc::new(Field::new("item", DataType::Int32, true));
397+
let schema = Arc::new(Schema::new(vec![
398+
Field::new("flat", DataType::FixedSizeList(inner.clone(), 3), true),
399+
Field::new(
400+
"nested",
401+
DataType::FixedSizeList(
402+
Arc::new(Field::new(
403+
"item",
404+
DataType::FixedSizeList(inner, 2),
405+
true,
406+
)),
407+
2,
408+
),
409+
true,
410+
),
411+
]));
412+
413+
let input = r#"{"flat":[1,2,3],"nested":[[1,2],[3,4]]}
414+
{"flat":[4,null,5]}
415+
{"flat":[6,7,8],"nested":[[null,5],[6,null]]}
416+
"#
417+
.as_bytes();
418+
419+
let batches: Vec<RecordBatch> = ReaderBuilder::new(schema.clone())
420+
.with_batch_size(1024)
421+
.build(Cursor::new(input))
422+
.unwrap()
423+
.collect::<Result<Vec<_>, _>>()
424+
.unwrap();
425+
426+
let mut output = Vec::new();
427+
let mut writer = WriterBuilder::new().build::<_, LineDelimited>(&mut output);
428+
for batch in &batches {
429+
writer.write(batch).unwrap();
430+
}
431+
writer.finish().unwrap();
432+
433+
assert_eq!(input, &output);
434+
}
393435
}

arrow-json/src/reader/list_array.rs

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::marker::PhantomData;
1919
use std::sync::Arc;
2020

2121
use arrow_array::builder::BooleanBufferBuilder;
22-
use arrow_array::{ArrayRef, GenericListArray, OffsetSizeTrait, make_array};
22+
use arrow_array::{ArrayRef, FixedSizeListArray, GenericListArray, OffsetSizeTrait, make_array};
2323
use arrow_buffer::buffer::NullBuffer;
2424
use arrow_buffer::{Buffer, OffsetBuffer, ScalarBuffer};
2525
use arrow_data::ArrayDataBuilder;
@@ -140,3 +140,86 @@ impl<O: OffsetSizeTrait, const IS_VIEW: bool> ArrayDecoder for ListLikeArrayDeco
140140
}
141141
}
142142
}
143+
144+
pub struct FixedSizeListArrayDecoder {
145+
field: FieldRef,
146+
size: i32,
147+
decoder: Box<dyn ArrayDecoder>,
148+
ignore_type_conflicts: bool,
149+
is_nullable: bool,
150+
}
151+
152+
impl FixedSizeListArrayDecoder {
153+
pub fn new(
154+
ctx: &DecoderContext,
155+
data_type: &DataType,
156+
is_nullable: bool,
157+
) -> Result<Self, ArrowError> {
158+
let (field, size) = match data_type {
159+
DataType::FixedSizeList(f, s) => (f, *s),
160+
_ => unreachable!(),
161+
};
162+
let decoder = ctx.make_decoder(field.data_type(), field.is_nullable())?;
163+
164+
Ok(Self {
165+
field: field.clone(),
166+
size,
167+
decoder,
168+
ignore_type_conflicts: ctx.ignore_type_conflicts(),
169+
is_nullable,
170+
})
171+
}
172+
}
173+
174+
impl ArrayDecoder for FixedSizeListArrayDecoder {
175+
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
176+
let expected = self.size as usize;
177+
let mut child_pos = Vec::with_capacity(pos.len() * expected);
178+
179+
let mut nulls = self
180+
.is_nullable
181+
.then(|| BooleanBufferBuilder::new(pos.len()));
182+
183+
for p in pos {
184+
let end_idx = match (tape.get(*p), nulls.as_mut()) {
185+
(TapeElement::StartList(end_idx), None) => end_idx,
186+
(TapeElement::StartList(end_idx), Some(nulls)) => {
187+
nulls.append(true);
188+
end_idx
189+
}
190+
(TapeElement::Null, Some(nulls)) => {
191+
nulls.append(false);
192+
child_pos.resize(child_pos.len() + expected, 0);
193+
continue;
194+
}
195+
(_, Some(nulls)) if self.ignore_type_conflicts => {
196+
nulls.append(false);
197+
child_pos.resize(child_pos.len() + expected, 0);
198+
continue;
199+
}
200+
_ => return Err(tape.error(*p, "[")),
201+
};
202+
203+
let child_start = child_pos.len();
204+
let mut cur_idx = *p + 1;
205+
while cur_idx < end_idx {
206+
child_pos.push(cur_idx);
207+
cur_idx = tape.next(cur_idx, "fixed-size list value")?;
208+
}
209+
210+
let actual = child_pos.len() - child_start;
211+
if actual != expected {
212+
return Err(ArrowError::JsonError(format!(
213+
"Incorrect number of elements for FixedSizeList, \
214+
expected {expected} but got {actual}"
215+
)));
216+
}
217+
}
218+
219+
let values = self.decoder.decode(tape, &child_pos)?;
220+
let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish()));
221+
222+
let array = FixedSizeListArray::try_new(self.field.clone(), self.size, values, nulls)?;
223+
Ok(Arc::new(array))
224+
}
225+
}

arrow-json/src/reader/mod.rs

Lines changed: 166 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,9 @@ use crate::reader::binary_array::{
151151
};
152152
use crate::reader::boolean_array::BooleanArrayDecoder;
153153
use crate::reader::decimal_array::DecimalArrayDecoder;
154-
use crate::reader::list_array::{ListArrayDecoder, ListViewArrayDecoder};
154+
use crate::reader::list_array::{
155+
FixedSizeListArrayDecoder, ListArrayDecoder, ListViewArrayDecoder,
156+
};
155157
use crate::reader::map_array::MapArrayDecoder;
156158
use crate::reader::null_array::NullArrayDecoder;
157159
use crate::reader::primitive_array::PrimitiveArrayDecoder;
@@ -835,6 +837,7 @@ fn make_decoder(
835837
DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
836838
DataType::ListView(_) => Ok(Box::new(ListViewArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
837839
DataType::LargeListView(_) => Ok(Box::new(ListViewArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
840+
DataType::FixedSizeList(_, _) => Ok(Box::new(FixedSizeListArrayDecoder::new(ctx, data_type, is_nullable)?)),
838841
DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
839842
DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
840843
DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
@@ -2308,6 +2311,152 @@ mod tests {
23082311
assert_read_list_view::<i64>();
23092312
}
23102313

2314+
#[test]
2315+
fn test_fixed_size_list() {
2316+
let buf = r#"
2317+
{"a": [1, 2, 3]}
2318+
{"a": [4, 5, 6]}
2319+
{"a": [7, 8, 9]}
2320+
"#;
2321+
2322+
let field = Field::new_list_field(DataType::Int32, true);
2323+
let schema = Arc::new(Schema::new(vec![Field::new(
2324+
"a",
2325+
DataType::FixedSizeList(Arc::new(field), 3),
2326+
false,
2327+
)]));
2328+
2329+
let batches = do_read(buf, 1024, false, false, schema);
2330+
assert_eq!(batches.len(), 1);
2331+
2332+
let col = batches[0].column(0).as_fixed_size_list();
2333+
assert_eq!(col.len(), 3);
2334+
assert_eq!(col.value_length(), 3);
2335+
2336+
let values = col.values().as_primitive::<Int32Type>();
2337+
assert_eq!(values.values(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
2338+
}
2339+
2340+
#[test]
2341+
fn test_fixed_size_list_nullable() {
2342+
let buf = r#"
2343+
{"a": [1, 2]}
2344+
{"a": null}
2345+
{"a": [3, null]}
2346+
"#;
2347+
2348+
let field = Field::new_list_field(DataType::Int32, true);
2349+
let schema = Arc::new(Schema::new(vec![Field::new(
2350+
"a",
2351+
DataType::FixedSizeList(Arc::new(field), 2),
2352+
true,
2353+
)]));
2354+
2355+
let batches = do_read(buf, 1024, false, false, schema);
2356+
assert_eq!(batches.len(), 1);
2357+
2358+
let col = batches[0].column(0).as_fixed_size_list();
2359+
assert_eq!(col.len(), 3);
2360+
assert!(col.is_valid(0));
2361+
assert!(col.is_null(1));
2362+
assert!(col.is_valid(2));
2363+
2364+
let values = col.values().as_primitive::<Int32Type>();
2365+
assert_eq!(values.value(0), 1);
2366+
assert_eq!(values.value(1), 2);
2367+
assert_eq!(values.value(4), 3);
2368+
assert!(values.is_null(5));
2369+
}
2370+
2371+
#[test]
2372+
fn test_fixed_size_list_wrong_size() {
2373+
let buf = r#"{"a": [1, 2, 3]}"#;
2374+
2375+
let field = Field::new_list_field(DataType::Int32, true);
2376+
let schema = Arc::new(Schema::new(vec![Field::new(
2377+
"a",
2378+
DataType::FixedSizeList(Arc::new(field), 2),
2379+
false,
2380+
)]));
2381+
2382+
let err = ReaderBuilder::new(schema)
2383+
.build(Cursor::new(buf.as_bytes()))
2384+
.unwrap()
2385+
.next()
2386+
.unwrap()
2387+
.unwrap_err();
2388+
2389+
assert!(err.to_string().contains("expected 2 but got 3"), "{}", err);
2390+
}
2391+
2392+
#[test]
2393+
fn test_fixed_size_list_nested() {
2394+
let buf = r#"
2395+
{"a": [[1, 2], [3, 4]]}
2396+
{"a": [[5, 6], [7, 8]]}
2397+
"#;
2398+
2399+
let inner_field = Field::new_list_field(DataType::Int32, true);
2400+
let inner_type = DataType::FixedSizeList(Arc::new(inner_field), 2);
2401+
let outer_field = Arc::new(Field::new_list_field(inner_type.clone(), true));
2402+
let schema = Arc::new(Schema::new(vec![Field::new(
2403+
"a",
2404+
DataType::FixedSizeList(outer_field, 2),
2405+
false,
2406+
)]));
2407+
2408+
let batches = do_read(buf, 1024, false, false, schema);
2409+
assert_eq!(batches.len(), 1);
2410+
2411+
let col = batches[0].column(0).as_fixed_size_list();
2412+
assert_eq!(col.len(), 2);
2413+
assert_eq!(col.value_length(), 2);
2414+
2415+
let inner = col.values().as_fixed_size_list();
2416+
assert_eq!(inner.len(), 4);
2417+
assert_eq!(inner.value_length(), 2);
2418+
2419+
let values = inner.values().as_primitive::<Int32Type>();
2420+
assert_eq!(values.values(), &[1, 2, 3, 4, 5, 6, 7, 8]);
2421+
}
2422+
2423+
#[test]
2424+
fn test_fixed_size_list_ignore_type_conflicts() {
2425+
let field = Field::new("item", DataType::Int32, true);
2426+
let schema = Arc::new(Schema::new(vec![Field::new(
2427+
"a",
2428+
DataType::FixedSizeList(Arc::new(field), 2),
2429+
true,
2430+
)]));
2431+
2432+
let json = vec![
2433+
json!({"a": [1, 2]}),
2434+
json!({"a": "not a list"}),
2435+
json!({"a": 42}),
2436+
json!({"a": [6, 7]}),
2437+
];
2438+
2439+
let mut decoder = ReaderBuilder::new(schema)
2440+
.with_ignore_type_conflicts(true)
2441+
.build_decoder()
2442+
.unwrap();
2443+
decoder.serialize(&json).unwrap();
2444+
let batch = decoder.flush().unwrap().unwrap();
2445+
2446+
let col = batch.column(0).as_fixed_size_list();
2447+
assert_eq!(col.len(), 4);
2448+
assert!(col.is_valid(0));
2449+
assert!(col.is_null(1)); // string -> null
2450+
assert!(col.is_null(2)); // number -> null
2451+
assert!(col.is_valid(3));
2452+
2453+
let values = col.values().as_primitive::<Int32Type>();
2454+
assert_eq!(values.value(0), 1);
2455+
assert_eq!(values.value(1), 2);
2456+
assert_eq!(values.value(6), 6);
2457+
assert_eq!(values.value(7), 7);
2458+
}
2459+
23112460
#[test]
23122461
fn test_skip_empty_lines() {
23132462
let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
@@ -3256,6 +3405,14 @@ mod tests {
32563405
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
32573406
false,
32583407
),
3408+
Field::new(
3409+
"fixed_size_list",
3410+
DataType::FixedSizeList(
3411+
Arc::new(Field::new("item", DataType::Int32, true)),
3412+
2,
3413+
),
3414+
false,
3415+
),
32593416
Field::new(
32603417
"map",
32613418
DataType::Map(
@@ -3312,6 +3469,14 @@ mod tests {
33123469
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
33133470
true,
33143471
),
3472+
Field::new(
3473+
"fixed_size_list",
3474+
DataType::FixedSizeList(
3475+
Arc::new(Field::new("item", DataType::Int32, true)),
3476+
2,
3477+
),
3478+
true,
3479+
),
33153480
Field::new(
33163481
"map",
33173482
DataType::Map(

0 commit comments

Comments
 (0)