Skip to content

Commit 568cceb

Browse files
committed
[Json] Use partition and take in RunEndEncoded decoder
1 parent e1e06b2 commit 568cceb

File tree

2 files changed

+31
-51
lines changed

2 files changed

+31
-51
lines changed

arrow-json/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ arrow-array = { workspace = true }
4040
arrow-buffer = { workspace = true }
4141
arrow-cast = { workspace = true }
4242
arrow-data = { workspace = true }
43+
arrow-ord = { workspace = true }
4344
arrow-schema = { workspace = true }
45+
arrow-select = { workspace = true }
4446
half = { version = "2.1", default-features = false }
4547
indexmap = { version = "2.0", default-features = false, features = ["std"] }
4648
num-traits = { version = "0.2.19", default-features = false, features = ["std"] }

arrow-json/src/reader/run_end_array.rs

Lines changed: 29 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616
// under the License.
1717

1818
use std::marker::PhantomData;
19+
use std::ops::Range;
20+
use std::slice::from_ref;
21+
use std::sync::Arc;
1922

2023
use arrow_array::types::RunEndIndexType;
21-
use arrow_array::{Array, ArrayRef, PrimitiveArray, make_array, new_empty_array};
22-
use arrow_buffer::{ArrowNativeType, ScalarBuffer};
23-
use arrow_data::transform::MutableArrayData;
24-
use arrow_data::{ArrayData, ArrayDataBuilder};
24+
use arrow_array::{ArrayRef, RunArray, UInt32Array, new_empty_array};
25+
use arrow_buffer::{ArrowNativeType, RunEndBuffer, ScalarBuffer};
26+
use arrow_ord::partition::partition;
2527
use arrow_schema::{ArrowError, DataType};
28+
use arrow_select::take::take;
2629

2730
use crate::reader::tape::Tape;
2831
use crate::reader::{ArrayDecoder, DecoderContext};
@@ -63,58 +66,33 @@ impl<R: RunEndIndexType + Send> ArrayDecoder for RunEndEncodedArrayDecoder<R> {
6366
return Ok(new_empty_array(&self.data_type));
6467
}
6568

66-
let flat_data = self.decoder.decode(tape, pos)?.to_data();
69+
let flat_array = self.decoder.decode(tape, pos)?;
6770

68-
let mut run_ends: Vec<R::Native> = Vec::new();
69-
let mut mutable = MutableArrayData::new(vec![&flat_data], false, len);
71+
let partitions = partition(from_ref(&flat_array))?;
72+
let size = partitions.len();
73+
let mut run_ends = Vec::with_capacity(size);
74+
let mut indices = Vec::with_capacity(size);
7075

71-
let mut run_start = 0;
72-
for i in 1..len {
73-
if !same_run(&flat_data, run_start, i) {
74-
let run_end = R::Native::from_usize(i).ok_or_else(|| {
75-
ArrowError::JsonError(format!(
76-
"Run end value {i} exceeds {:?} range",
77-
R::DATA_TYPE
78-
))
79-
})?;
80-
run_ends.push(run_end);
81-
mutable.extend(0, run_start, run_start + 1);
82-
run_start = i;
83-
}
76+
for Range { start, end } in partitions.ranges() {
77+
let run_end = R::Native::from_usize(end).ok_or_else(|| {
78+
ArrowError::JsonError(format!(
79+
"Run end value {end} exceeds {:?} range",
80+
R::DATA_TYPE
81+
))
82+
})?;
83+
run_ends.push(run_end);
84+
indices.push(start);
8485
}
85-
let run_end = R::Native::from_usize(len).ok_or_else(|| {
86-
ArrowError::JsonError(format!(
87-
"Run end value {len} exceeds {:?} range",
88-
R::DATA_TYPE
89-
))
90-
})?;
91-
run_ends.push(run_end);
92-
mutable.extend(0, run_start, run_start + 1);
9386

94-
let values_data = mutable.freeze();
95-
let run_ends_data =
96-
PrimitiveArray::<R>::new(ScalarBuffer::from(run_ends), None).into_data();
87+
let indices = UInt32Array::from_iter_values(indices.into_iter().map(|i| i as u32));
88+
let values = take(flat_array.as_ref(), &indices, None)?;
9789

98-
let data = ArrayDataBuilder::new(self.data_type.clone())
99-
.len(len)
100-
.add_child_data(run_ends_data)
101-
.add_child_data(values_data);
90+
// SAFETY: run_ends are strictly increasing with the last value equal to len
91+
let run_ends = unsafe { RunEndBuffer::new_unchecked(ScalarBuffer::from(run_ends), 0, len) };
10292

103-
// Safety:
104-
// run_ends are strictly increasing with the last value equal to len,
105-
// and values has the same length as run_ends
106-
Ok(make_array(unsafe { data.build_unchecked() }))
93+
// SAFETY: run_ends are valid and values has the same length as run_ends
94+
let array =
95+
unsafe { RunArray::<R>::new_unchecked(self.data_type.clone(), run_ends, values) };
96+
Ok(Arc::new(array))
10797
}
10898
}
109-
110-
fn same_run(data: &ArrayData, i: usize, j: usize) -> bool {
111-
let null_i = data.is_null(i);
112-
let null_j = data.is_null(j);
113-
if null_i != null_j {
114-
return false;
115-
}
116-
if null_i {
117-
return true;
118-
}
119-
data.slice(i, 1) == data.slice(j, 1)
120-
}

0 commit comments

Comments
 (0)