Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions arrow-array/src/array/run_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@ impl<R: RunEndIndexType> RunArray<R> {
&self.values
}

/// Returns the portion of the values array that backs this array's logical slice.
///
/// Unlike [`values`], which exposes the full values array regardless of any
/// slicing, this takes the range between the start and end physical indices of
/// the slice (both inclusive) and returns only that range.
///
/// [`values`]: Self::values
pub fn values_slice(&self) -> ArrayRef {
    let first_run = self.get_start_physical_index();
    let last_run = self.get_end_physical_index();
    // Both bounds are inclusive, so the number of runs covered is one more
    // than their difference.
    let run_count = last_run - first_run + 1;
    self.values.slice(first_run, run_count)
}

/// Returns the physical index at which the array slice starts.
///
/// See [`RunEndBuffer::get_start_physical_index`].
Expand Down Expand Up @@ -1132,4 +1142,35 @@ mod tests {

assert_eq!(array_i16_1, array_i16_2);
}

#[test]
fn test_run_array_values_slice() {
    // Logical contents: two 0s, three 1s, then fifteen 2s.
    let run_ends = PrimitiveArray::<Int32Type>::from(vec![2, 5, 20]);
    let values = PrimitiveArray::<Int32Type>::from(vec![0, 1, 2]);
    let run_array = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();

    // Case 1: a slice that straddles two runs.
    //   logical indices:  1, 2, 3, 4
    //   physical indices: 0, 1, 1, 1
    // The slice holds the last 0 followed by all three 1s, so the values
    // backing it are [0, 1].
    let straddling = run_array.slice(1, 4);
    assert_eq!(straddling.get_start_physical_index(), 0);
    assert_eq!(straddling.get_end_physical_index(), 1);

    let straddling_values = straddling.values_slice();
    let straddling_values = straddling_values.as_primitive::<Int32Type>();
    assert_eq!(straddling_values.values(), &[0, 1]);

    // Case 2: a slice that lies entirely within the middle run.
    //   logical indices:  2, 3, 4
    //   physical indices: 1, 1, 1
    let single_run = run_array.slice(2, 3);
    assert_eq!(single_run.get_start_physical_index(), 1);
    assert_eq!(single_run.get_end_physical_index(), 1);

    let single_run_values = single_run.values_slice();
    let single_run_values = single_run_values.as_primitive::<Int32Type>();
    assert_eq!(single_run_values.values(), &[1]);
}
}
40 changes: 40 additions & 0 deletions arrow-buffer/src/buffer/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,24 @@ where
&self.run_ends
}

/// Returns an iterator over the run ends as seen through the logical slice.
///
/// Only the physical runs between the start and end physical indices of the
/// slice are visited. Each yielded run end has the [`logical_offset`]
/// subtracted from it (saturating at zero) and is then capped at the
/// [`logical_length`].
///
/// # Panics
///
/// Panics if an adjusted run end cannot be converted back into `E`.
///
/// [`logical_offset`]: Self::offset
/// [`logical_length`]: Self::len
pub fn sliced_values(&self) -> impl Iterator<Item = E> + '_ {
    let logical_offset = self.logical_offset;
    let logical_length = self.logical_length;
    let first_run = self.get_start_physical_index();
    let last_run = self.get_end_physical_index();

    // Restrict iteration to the physical runs the slice actually touches.
    let covered_runs = &self.run_ends[first_run..=last_run];
    covered_runs.iter().copied().map(move |run_end| {
        // Re-base the run end so the slice starts at zero ...
        let rebased = run_end.as_usize().saturating_sub(logical_offset);
        // ... and truncate a final run that extends past the end of the slice.
        let capped = rebased.min(logical_length);
        E::from_usize(capped).unwrap()
    })
}

/// Returns the maximum run-end encoded in the underlying buffer; that is, the
/// last physical run of the buffer. This does not take into account any logical
/// slicing that may have occurred.
Expand Down Expand Up @@ -368,4 +386,26 @@ mod tests {
assert_eq!(buffer.get_start_physical_index(), 0);
assert_eq!(buffer.get_end_physical_index(), 0);
}

#[test]
fn test_sliced_values() {
    // Run ends [2, 3, 6] map the six logical slots to physical indices
    // [0, 0, 1, 2, 2, 2].
    let full = RunEndBuffer::new(vec![2i32, 3, 6].into(), 0, 6);

    // Offset 1, length 4 -> physical indices [0, 1, 2, 2].
    // Every run is touched, so each run end is shifted down by the offset and
    // the last one is capped at the slice length:
    //   [2 - 1, 3 - 1, min(6 - 1, 4)] = [1, 2, 4]
    let middle = full.slice(1, 4);
    let middle_run_ends = middle.sliced_values().collect::<Vec<i32>>();
    assert_eq!(middle_run_ends, &[1, 2, 4]);

    // Offset 4, length 2 -> physical indices [2, 2].
    // Only the final run (ending at 6) is touched:
    //   [min(6 - 4, 2)] = [2]
    let tail = full.slice(4, 2);
    let tail_run_ends = tail.sliced_values().collect::<Vec<i32>>();
    assert_eq!(tail_run_ends, &[2]);
}
}
3 changes: 0 additions & 3 deletions arrow-cast/src/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11746,9 +11746,6 @@ mod tests {
}

#[test]
#[should_panic = "assertion `left == right` failed\n left: ScalarBuffer([1, 1, 2])\n right: [2, 2, 3]"]
// TODO: fix cast of RunArrays to account for sliced RunArray's
// https://github.com/apache/arrow-rs/issues/9018
fn test_sliced_run_end_encoded_to_primitive() {
let run_ends = Int32Array::from(vec![2, 5, 6]);
let values = Int32Array::from(vec![1, 2, 3]);
Expand Down
16 changes: 10 additions & 6 deletions arrow-cast/src/cast/run_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,17 @@ pub(crate) fn run_end_encoded_cast<K: RunEndIndexType>(

// Expand to logical form
_ => {
let run_ends = run_array.run_ends().values().to_vec();
let mut indices = Vec::with_capacity(run_array.run_ends().len());
let mut physical_idx: usize = 0;
for logical_idx in 0..run_array.run_ends().len() {
// If the logical index is equal to the (next) run end, increment the physical index,
// since we are at the end of a run.
let len = run_array.len();
let offset = run_array.offset();
let run_ends = run_array.run_ends().values();

let mut indices = Vec::with_capacity(len);
let mut physical_idx = run_array.get_start_physical_index();

for logical_idx in offset..offset + len {
if logical_idx == run_ends[physical_idx].as_usize() {
// If the logical index is equal to the (next) run end, increment the physical index,
// since we are at the end of a run.
physical_idx += 1;
}
indices.push(physical_idx as i32);
Expand Down
2 changes: 1 addition & 1 deletion arrow-data/src/transform/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ pub fn build_extend(array: &ArrayData) -> Extend<'_> {
let (run_ends_bytes, values_range) = build_extend_arrays::<$run_end_type>(
source_buffer,
source_run_ends.len(),
start,
start + array.offset(),
len,
dest_last_run_end,
);
Expand Down
47 changes: 34 additions & 13 deletions arrow-select/src/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ where
run_arrays
.iter()
.scan(R::default_value(), |acc, run_array| {
*acc = *acc + *run_array.run_ends().values().last().unwrap();
*acc = *acc + R::Native::from_usize(run_array.len()).unwrap();
Some(*acc)
}),
)
Expand All @@ -379,18 +379,17 @@ where
let adjustment = needed_run_end_adjustments[i];
run_array
.run_ends()
.values()
.iter()
.map(move |run_end| *run_end + adjustment)
.sliced_values()
.map(move |run_end| run_end + adjustment)
},
));

let all_values = concat(
&run_arrays
.iter()
.map(|x| x.values().as_ref())
.collect::<Vec<_>>(),
)?;
let values_slices: Vec<ArrayRef> = run_arrays
.iter()
.map(|run_array| run_array.values_slice())
.collect();

let all_values = concat(&values_slices.iter().map(|x| x.as_ref()).collect::<Vec<_>>())?;

let builder = ArrayDataBuilder::new(run_arrays[0].data_type().clone())
.len(total_len)
Expand Down Expand Up @@ -1716,9 +1715,6 @@ mod tests {
}

#[test]
#[should_panic = "assertion `left == right` failed\n left: [20, 20, 40, 40, 40]\n right: [10, 10, 20, 20, 30, 40, 40, 40]"]
// TODO: fix concat of RunArrays to account for sliced RunArray's
// https://github.com/apache/arrow-rs/issues/9018
fn test_concat_sliced_run_array() {
// Slicing away first run in both arrays
let run_ends1 = Int32Array::from(vec![2, 4]);
Expand Down Expand Up @@ -1879,4 +1875,29 @@ mod tests {
assert_eq!(values.len(), 6);
assert_eq!(&[10, 20, 30, 40, 50, 60], values.values());
}

#[test]
fn test_concat_run_array_with_truncated_run() {
    // First input: run ends [2, 5] over values [10, 20], i.e. the logical
    // sequence [10, 10, 20, 20, 20]. Slicing to the first three elements cuts
    // the second run short, leaving [10, 10, 20].
    let first_run_ends = Int32Array::from(vec![2, 5]);
    let first_values = Int32Array::from(vec![10, 20]);
    let first = RunArray::try_new(&first_run_ends, &first_values).unwrap();
    let first_truncated = first.slice(0, 3);

    // Second input: a single unsliced run, logically [30, 30].
    let second_run_ends = Int32Array::from(vec![2]);
    let second_values = Int32Array::from(vec![30]);
    let second = RunArray::try_new(&second_run_ends, &second_values).unwrap();

    let concatenated = concat(&[&first_truncated, &second]).unwrap();
    let concatenated = concatenated.as_run::<Int32Type>();

    // Expected logical result: [10, 10, 20, 30, 30]
    // which is encoded as run ends [2, 3, 5] over values [10, 20, 30].
    assert_eq!(concatenated.len(), 5);
    let result_run_ends = concatenated.run_ends().values();
    let result_values = concatenated.values().as_primitive::<Int32Type>();
    assert_eq!(result_values.values(), &[10, 20, 30]);
    assert_eq!(&[2, 3, 5], result_run_ends);
}
}
18 changes: 10 additions & 8 deletions arrow-select/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,17 +487,22 @@ where
R::Native: AddAssign,
{
let run_ends: &RunEndBuffer<R::Native> = array.run_ends();
let mut new_run_ends = vec![R::default_value(); run_ends.len()];
let start_physical = run_ends.get_start_physical_index();
let end_physical = run_ends.get_end_physical_index();
let physical_len = end_physical - start_physical + 1;

let mut new_run_ends = vec![R::default_value(); physical_len];
let offset = run_ends.offset() as u64;

let mut start = 0u64;
let mut j = 0;
let mut count = R::default_value();
let filter_values = predicate.filter.values();
let run_ends = run_ends.inner();

let pred: BooleanArray = BooleanBuffer::collect_bool(run_ends.len(), |i| {
let pred: BooleanArray = BooleanBuffer::collect_bool(physical_len, |i| {
let mut keep = false;
let mut end = run_ends[i].into() as u64;
let mut end = (run_ends[i + start_physical].into() as u64).saturating_sub(offset);
let difference = end.saturating_sub(filter_values.len() as u64);
end -= difference;

Expand All @@ -517,8 +522,8 @@ where

new_run_ends.truncate(j);

let values = array.values();
let values = filter(&values, &pred)?;
let values = array.values_slice();
let values = filter(values.as_ref(), &pred)?;

let run_ends = PrimitiveArray::<R>::try_new(new_run_ends.into(), None)?;
RunArray::try_new(&run_ends, &values)
Expand Down Expand Up @@ -1355,9 +1360,6 @@ mod tests {
}

#[test]
#[should_panic = "assertion `left == right` failed\n left: [-2, 9]\n right: [7, -2]"]
// TODO: fix filter of RunArrays to account for sliced RunArray's
// https://github.com/apache/arrow-rs/issues/9018
fn test_filter_run_end_encoding_array_sliced() {
let run_ends = Int64Array::from(vec![2, 3, 8]);
let values = Int64Array::from(vec![7, -2, 9]);
Expand Down
5 changes: 1 addition & 4 deletions arrow-select/src/interleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1172,9 +1172,6 @@ mod tests {
}

#[test]
#[should_panic = "assertion `left == right` failed\n left: [1, 4, 2, 5, 6]\n right: [2, 5, 2, 5, 6]"]
// TODO: fix interleave of RunArrays to account for sliced RunArray's
// https://github.com/apache/arrow-rs/issues/9018
fn test_interleave_run_end_encoded_sliced() {
let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
Expand All @@ -1186,7 +1183,7 @@ mod tests {
let b = builder.finish();
let b = b.slice(1, 3); // [5, 5, 6]

let indices = &[(0, 1), (1, 0), (0, 3), (1, 2), (1, 3)];
let indices = &[(0, 1), (1, 0), (0, 2), (1, 1), (1, 2)];
let result = interleave(&[&a, &b], indices).unwrap();

let result = result.as_run::<Int32Type>();
Expand Down
Loading