diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml index 851f0a244f5..2ab1af1fd09 100644 --- a/arrow-json/Cargo.toml +++ b/arrow-json/Cargo.toml @@ -39,8 +39,9 @@ all-features = true arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-cast = { workspace = true } -arrow-data = { workspace = true } +arrow-ord = { workspace = true } arrow-schema = { workspace = true } +arrow-select = { workspace = true } half = { version = "2.1", default-features = false } indexmap = { version = "2.0", default-features = false, features = ["std"] } num-traits = { version = "0.2.19", default-features = false, features = ["std"] } @@ -54,6 +55,7 @@ ryu = "1.0" itoa = "1.0" [dev-dependencies] +arrow-data = { workspace = true } flate2 = { version = "1", default-features = false, features = ["rust_backend"] } serde = { version = "1.0", default-features = false, features = ["derive"] } futures = "0.3" diff --git a/arrow-json/src/reader/run_end_array.rs b/arrow-json/src/reader/run_end_array.rs index 1a007ccbb63..26ade9c8ed7 100644 --- a/arrow-json/src/reader/run_end_array.rs +++ b/arrow-json/src/reader/run_end_array.rs @@ -16,13 +16,16 @@ // under the License. use std::marker::PhantomData; +use std::ops::Range; +use std::slice::from_ref; +use std::sync::Arc; use arrow_array::types::RunEndIndexType; -use arrow_array::{Array, ArrayRef, PrimitiveArray, make_array, new_empty_array}; -use arrow_buffer::{ArrowNativeType, ScalarBuffer}; -use arrow_data::transform::MutableArrayData; -use arrow_data::{ArrayData, ArrayDataBuilder}; +use arrow_array::{ArrayRef, RunArray, UInt32Array, new_empty_array}; +use arrow_buffer::{ArrowNativeType, RunEndBuffer, ScalarBuffer}; +use arrow_ord::partition::partition; use arrow_schema::{ArrowError, DataType}; +use arrow_select::take::take; use crate::reader::tape::Tape; use crate::reader::{ArrayDecoder, DecoderContext}; @@ -63,58 +66,33 @@ impl ArrayDecoder for RunEndEncodedArrayDecoder { return Ok(new_empty_array(&self.data_type)); } - let flat_data = self.decoder.decode(tape, pos)?.to_data(); + let flat_array = self.decoder.decode(tape, pos)?; - let mut run_ends: Vec = Vec::new(); - let mut mutable = MutableArrayData::new(vec![&flat_data], false, len); + let partitions = partition(from_ref(&flat_array))?; + let size = partitions.len(); + let mut run_ends = Vec::with_capacity(size); + let mut indices = Vec::with_capacity(size); - let mut run_start = 0; - for i in 1..len { - if !same_run(&flat_data, run_start, i) { - let run_end = R::Native::from_usize(i).ok_or_else(|| { - ArrowError::JsonError(format!( - "Run end value {i} exceeds {:?} range", - R::DATA_TYPE - )) - })?; - run_ends.push(run_end); - mutable.extend(0, run_start, run_start + 1); - run_start = i; - } + for Range { start, end } in partitions.ranges() { + let run_end = R::Native::from_usize(end).ok_or_else(|| { + ArrowError::JsonError(format!( + "Run end value {end} exceeds {:?} range", + R::DATA_TYPE + )) + })?; + run_ends.push(run_end); + indices.push(start); } - let run_end = R::Native::from_usize(len).ok_or_else(|| { - ArrowError::JsonError(format!( - "Run end value {len} exceeds {:?} range", - R::DATA_TYPE - )) - })?; - run_ends.push(run_end); - mutable.extend(0, run_start, run_start + 1); - let values_data = mutable.freeze(); - let run_ends_data = - PrimitiveArray::::new(ScalarBuffer::from(run_ends), None).into_data(); + let indices = UInt32Array::from_iter_values(indices.into_iter().map(|i| i as u32)); + let values = take(flat_array.as_ref(), &indices, None)?; - let data = ArrayDataBuilder::new(self.data_type.clone()) - .len(len) - .add_child_data(run_ends_data) - .add_child_data(values_data); + // SAFETY: run_ends are strictly increasing with the last value equal to len + let run_ends = unsafe { RunEndBuffer::new_unchecked(ScalarBuffer::from(run_ends), 0, len) }; - // Safety: - // run_ends are strictly increasing with the last value equal to len, - // and values has the same length as run_ends - Ok(make_array(unsafe { data.build_unchecked() })) + // SAFETY: run_ends are valid and values has the same length as run_ends + let array = + unsafe { RunArray::::new_unchecked(self.data_type.clone(), run_ends, values) }; + Ok(Arc::new(array)) } } - -fn same_run(data: &ArrayData, i: usize, j: usize) -> bool { - let null_i = data.is_null(i); - let null_j = data.is_null(j); - if null_i != null_j { - return false; - } - if null_i { - return true; - } - data.slice(i, 1) == data.slice(j, 1) -}