Merged
Changes from 3 commits
2 changes: 2 additions & 0 deletions arrow-json/Cargo.toml
@@ -40,7 +40,9 @@ arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-data = { workspace = true }
arrow-ord = { workspace = true }
Contributor:
These are fairly non trivial crates (ord and select) so it is sad to see the dependencies being added here

That being said, I think it is becoming clear that anything involving REE benefits from those two

Maybe we could split them up or something into new crates with the "core" parts (specifically partition and take) 🤔

arrow-take and arrow-partition maybe 🤔

I'll file a ticket to consider this

Contributor:
Worth noting that arrow-cast already brings in ord & select in its dependencies, so I don't think this actually affects the build too much (compared to the previous state)

Contributor:
> Worth noting that arrow-cast already brings in ord & select in its dependencies, so I don't think this actually affects the build too much (compared to the previous state)

Yeah, adding REE to cast required those new dependencies. We have some ideas of how to avoid it, but they aren't great
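For context on why REE keeps pulling in `arrow-ord`: the core service `partition` provides here is grouping consecutive equal values into ranges. A rough stdlib-only sketch of that idea (a hypothetical helper, not the actual `arrow_ord::partition::partition`, which operates on `ArrayRef`s and also handles nulls and multi-column input):

```rust
use std::ops::Range;

// Sketch: split a slice into maximal runs of consecutive equal values,
// returning one half-open range per run (what partition's ranges() yields).
fn partition_ranges<T: PartialEq>(values: &[T]) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    for i in 1..values.len() {
        if values[i] != values[i - 1] {
            // Value changed: close the current run at i and start a new one.
            ranges.push(start..i);
            start = i;
        }
    }
    if !values.is_empty() {
        // Close the final run at the end of the slice.
        ranges.push(start..values.len());
    }
    ranges
}

fn main() {
    let vals = ["a", "a", "b", "b", "b", "c"];
    // Three runs: "a" x2, "b" x3, "c" x1.
    assert_eq!(partition_ranges(&vals), vec![0..2, 2..5, 5..6]);
}
```

With ranges in hand, building a run-end-encoded array reduces to taking each range's `end` as a run end and each range's `start` as a representative index, which is exactly what the diff below does with `partition` plus `take`.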

arrow-schema = { workspace = true }
arrow-select = { workspace = true }
half = { version = "2.1", default-features = false }
indexmap = { version = "2.0", default-features = false, features = ["std"] }
num-traits = { version = "0.2.19", default-features = false, features = ["std"] }
80 changes: 29 additions & 51 deletions arrow-json/src/reader/run_end_array.rs
@@ -16,13 +16,16 @@
// under the License.

use std::marker::PhantomData;
use std::ops::Range;
use std::slice::from_ref;
use std::sync::Arc;

use arrow_array::types::RunEndIndexType;
use arrow_array::{Array, ArrayRef, PrimitiveArray, make_array, new_empty_array};
use arrow_buffer::{ArrowNativeType, ScalarBuffer};
use arrow_data::transform::MutableArrayData;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_array::{ArrayRef, RunArray, UInt32Array, new_empty_array};
use arrow_buffer::{ArrowNativeType, RunEndBuffer, ScalarBuffer};
use arrow_ord::partition::partition;
use arrow_schema::{ArrowError, DataType};
use arrow_select::take::take;

use crate::reader::tape::Tape;
use crate::reader::{ArrayDecoder, DecoderContext};
@@ -63,58 +66,33 @@ impl<R: RunEndIndexType + Send> ArrayDecoder for RunEndEncodedArrayDecoder<R> {
return Ok(new_empty_array(&self.data_type));
}

let flat_data = self.decoder.decode(tape, pos)?.to_data();
let flat_array = self.decoder.decode(tape, pos)?;

let mut run_ends: Vec<R::Native> = Vec::new();
let mut mutable = MutableArrayData::new(vec![&flat_data], false, len);
let partitions = partition(from_ref(&flat_array))?;
let size = partitions.len();
let mut run_ends = Vec::with_capacity(size);
let mut indices = Vec::with_capacity(size);

let mut run_start = 0;
for i in 1..len {
if !same_run(&flat_data, run_start, i) {
let run_end = R::Native::from_usize(i).ok_or_else(|| {
ArrowError::JsonError(format!(
"Run end value {i} exceeds {:?} range",
R::DATA_TYPE
))
})?;
run_ends.push(run_end);
mutable.extend(0, run_start, run_start + 1);
run_start = i;
}
for Range { start, end } in partitions.ranges() {
let run_end = R::Native::from_usize(end).ok_or_else(|| {
ArrowError::JsonError(format!(
"Run end value {end} exceeds {:?} range",
R::DATA_TYPE
))
})?;
run_ends.push(run_end);
indices.push(start);
}
let run_end = R::Native::from_usize(len).ok_or_else(|| {
ArrowError::JsonError(format!(
"Run end value {len} exceeds {:?} range",
R::DATA_TYPE
))
})?;
run_ends.push(run_end);
mutable.extend(0, run_start, run_start + 1);

let values_data = mutable.freeze();
let run_ends_data =
PrimitiveArray::<R>::new(ScalarBuffer::from(run_ends), None).into_data();
let indices = UInt32Array::from_iter_values(indices.into_iter().map(|i| i as u32));
Contributor:
In theory the old code could also handle usize indices, not just u32, but I think in practice it won't matter

Contributor (Author):
Yes, also indices are bounded by the tape pos (which is &[u32]), so they should never exceed u32::MAX IIUC

let values = take(flat_array.as_ref(), &indices, None)?;

let data = ArrayDataBuilder::new(self.data_type.clone())
.len(len)
.add_child_data(run_ends_data)
.add_child_data(values_data);
// SAFETY: run_ends are strictly increasing with the last value equal to len
let run_ends = unsafe { RunEndBuffer::new_unchecked(ScalarBuffer::from(run_ends), 0, len) };

// Safety:
// run_ends are strictly increasing with the last value equal to len,
// and values has the same length as run_ends
Ok(make_array(unsafe { data.build_unchecked() }))
// SAFETY: run_ends are valid and values has the same length as run_ends
let array =
unsafe { RunArray::<R>::new_unchecked(self.data_type.clone(), run_ends, values) };
Ok(Arc::new(array))
}
}

fn same_run(data: &ArrayData, i: usize, j: usize) -> bool {
let null_i = data.is_null(i);
let null_j = data.is_null(j);
if null_i != null_j {
return false;
}
if null_i {
return true;
}
data.slice(i, 1) == data.slice(j, 1)
}