Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion arrow-json/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ all-features = true
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-data = { workspace = true }
arrow-ord = { workspace = true }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are fairly non trivial crates (ord and select) so it is sad to see the dependencies being added here

That being said, I think it is becoming clear that anything involving REE benefits from those two

Maybe we could split them up or osmething into new crates with the "core" parts (specifically partition and take) 🤔

arrow-take and arrow-partition maybe 🤔

I'll file a ticket to consider this

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arrow-schema = { workspace = true }
arrow-select = { workspace = true }
half = { version = "2.1", default-features = false }
indexmap = { version = "2.0", default-features = false, features = ["std"] }
num-traits = { version = "0.2.19", default-features = false, features = ["std"] }
Expand All @@ -54,6 +55,7 @@ ryu = "1.0"
itoa = "1.0"

[dev-dependencies]
arrow-data = { workspace = true }
flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
futures = "0.3"
Expand Down
80 changes: 29 additions & 51 deletions arrow-json/src/reader/run_end_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
// under the License.

use std::marker::PhantomData;
use std::ops::Range;
use std::slice::from_ref;
use std::sync::Arc;

use arrow_array::types::RunEndIndexType;
use arrow_array::{Array, ArrayRef, PrimitiveArray, make_array, new_empty_array};
use arrow_buffer::{ArrowNativeType, ScalarBuffer};
use arrow_data::transform::MutableArrayData;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_array::{ArrayRef, RunArray, UInt32Array, new_empty_array};
use arrow_buffer::{ArrowNativeType, RunEndBuffer, ScalarBuffer};
use arrow_ord::partition::partition;
use arrow_schema::{ArrowError, DataType};
use arrow_select::take::take;

use crate::reader::tape::Tape;
use crate::reader::{ArrayDecoder, DecoderContext};
Expand Down Expand Up @@ -63,58 +66,33 @@ impl<R: RunEndIndexType + Send> ArrayDecoder for RunEndEncodedArrayDecoder<R> {
return Ok(new_empty_array(&self.data_type));
}

let flat_data = self.decoder.decode(tape, pos)?.to_data();
let flat_array = self.decoder.decode(tape, pos)?;

let mut run_ends: Vec<R::Native> = Vec::new();
let mut mutable = MutableArrayData::new(vec![&flat_data], false, len);
let partitions = partition(from_ref(&flat_array))?;
let size = partitions.len();
let mut run_ends = Vec::with_capacity(size);
let mut indices = Vec::with_capacity(size);

let mut run_start = 0;
for i in 1..len {
if !same_run(&flat_data, run_start, i) {
let run_end = R::Native::from_usize(i).ok_or_else(|| {
ArrowError::JsonError(format!(
"Run end value {i} exceeds {:?} range",
R::DATA_TYPE
))
})?;
run_ends.push(run_end);
mutable.extend(0, run_start, run_start + 1);
run_start = i;
}
for Range { start, end } in partitions.ranges() {
let run_end = R::Native::from_usize(end).ok_or_else(|| {
ArrowError::JsonError(format!(
"Run end value {end} exceeds {:?} range",
R::DATA_TYPE
))
})?;
run_ends.push(run_end);
indices.push(start);
}
let run_end = R::Native::from_usize(len).ok_or_else(|| {
ArrowError::JsonError(format!(
"Run end value {len} exceeds {:?} range",
R::DATA_TYPE
))
})?;
run_ends.push(run_end);
mutable.extend(0, run_start, run_start + 1);

let values_data = mutable.freeze();
let run_ends_data =
PrimitiveArray::<R>::new(ScalarBuffer::from(run_ends), None).into_data();
let indices = UInt32Array::from_iter_values(indices.into_iter().map(|i| i as u32));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory the old code could also handle usize indices not ust u32 but I think in practice it won't matter

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, also indices are bounded by the tape pos (which is &[u32]), so they should never exceed u32::MAX IIUC

let values = take(flat_array.as_ref(), &indices, None)?;

let data = ArrayDataBuilder::new(self.data_type.clone())
.len(len)
.add_child_data(run_ends_data)
.add_child_data(values_data);
// SAFETY: run_ends are strictly increasing with the last value equal to len
let run_ends = unsafe { RunEndBuffer::new_unchecked(ScalarBuffer::from(run_ends), 0, len) };

// Safety:
// run_ends are strictly increasing with the last value equal to len,
// and values has the same length as run_ends
Ok(make_array(unsafe { data.build_unchecked() }))
// SAFETY: run_ends are valid and values has the same length as run_ends
let array =
unsafe { RunArray::<R>::new_unchecked(self.data_type.clone(), run_ends, values) };
Ok(Arc::new(array))
}
}

fn same_run(data: &ArrayData, i: usize, j: usize) -> bool {
let null_i = data.is_null(i);
let null_j = data.is_null(j);
if null_i != null_j {
return false;
}
if null_i {
return true;
}
data.slice(i, 1) == data.slice(j, 1)
}
Loading