Skip to content

Commit 57e6a3c

Browse files
committed
fix:[9018]Fixed RunArray slice offsets
1 parent 814ee42 commit 57e6a3c

File tree

8 files changed

+216
-36
lines changed

8 files changed

+216
-36
lines changed

arrow-array/src/array/run_array.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,16 @@ impl<R: RunEndIndexType> RunArray<R> {
136136
&self.values
137137
}
138138

139+
/// Similar to [`values`] but accounts for logical slicing, returning only the values
140+
/// that are part of the logical slice of this array.
141+
///
142+
/// [`values`]: Self::values
143+
pub fn values_slice(&self) -> ArrayRef {
144+
let start = self.get_start_physical_index();
145+
let end = self.get_end_physical_index();
146+
self.values.slice(start, end - start + 1)
147+
}
148+
139149
/// Returns the physical index at which the array slice starts.
140150
///
141151
/// See [`RunEndBuffer::get_start_physical_index`].
@@ -1128,4 +1138,35 @@ mod tests {
11281138

11291139
assert_eq!(array_i16_1, array_i16_2);
11301140
}
1141+
1142+
#[test]
1143+
fn test_run_array_values_slice() {
1144+
// 0, 0, 1, 1, 1, 2...2 (15 2s)
1145+
let run_ends: PrimitiveArray<Int32Type> = vec![2, 5, 20].into();
1146+
let values: PrimitiveArray<Int32Type> = vec![0, 1, 2].into();
1147+
let array = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
1148+
1149+
let slice = array.slice(1, 4); // 0 | 1, 1, 1 |
1150+
// logical indices: 1, 2, 3, 4
1151+
// physical indices: 0, 1, 1, 1
1152+
// values at 0 is 0
1153+
// values at 1 is 1
1154+
// values slice should be [0, 1]
1155+
assert_eq!(slice.get_start_physical_index(), 0);
1156+
assert_eq!(slice.get_end_physical_index(), 1);
1157+
1158+
let values_slice = slice.values_slice();
1159+
let values_slice = values_slice.as_primitive::<Int32Type>();
1160+
assert_eq!(values_slice.values(), &[0, 1]);
1161+
1162+
let slice2 = array.slice(2, 3); // 1, 1, 1
1163+
// logical indices: 2, 3, 4
1164+
// physical indices: 1, 1, 1
1165+
assert_eq!(slice2.get_start_physical_index(), 1);
1166+
assert_eq!(slice2.get_end_physical_index(), 1);
1167+
1168+
let values_slice2 = slice2.values_slice();
1169+
let values_slice2 = values_slice2.as_primitive::<Int32Type>();
1170+
assert_eq!(values_slice2.values(), &[1]);
1171+
}
11311172
}

arrow-buffer/src/buffer/run.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,39 @@ where
189189
&self.run_ends
190190
}
191191

192+
/// Similar to [`values`] but accounts for logical slicing, returning only the values
193+
/// that are part of the logical slice of this buffer.
194+
///
195+
/// > [!WARNING]
196+
/// > The returned values are the raw physical run ends and do not account for logical
197+
/// > truncation at the slice boundaries. For example, if a run of length 10 is sliced
198+
/// > to only include the last 3 elements, the returned run end will still be the original
199+
/// > physical end. Use [`logical_run_ends`] for truncated, slice-relative run ends.
200+
///
201+
/// [`values`]: Self::values
202+
/// [`logical_run_ends`]: Self::logical_run_ends
203+
pub fn values_slice(&self) -> &[E] {
204+
let start = self.get_start_physical_index();
205+
let end = self.get_end_physical_index();
206+
&self.run_ends[start..=end]
207+
}
208+
209+
/// Returns an iterator yielding run ends adjusted for the logical slice.
210+
///
211+
/// Each yielded run end has the [`logical_offset`] subtracted from it and is capped
212+
/// at the [`logical_length`].
213+
///
214+
/// [`logical_offset`]: Self::offset
215+
/// [`logical_length`]: Self::len
216+
pub fn logical_run_ends(&self) -> impl Iterator<Item = E> + '_ {
217+
let offset = self.logical_offset;
218+
let len = self.logical_length;
219+
self.values_slice().iter().map(move |&val| {
220+
let val = val.as_usize().saturating_sub(offset).min(len);
221+
E::from_usize(val).unwrap()
222+
})
223+
}
224+
192225
/// Returns the maximum run-end encoded in the underlying buffer; that is, the
193226
/// last physical run of the buffer. This does not take into account any logical
194227
/// slicing that may have occurred.
@@ -368,4 +401,65 @@ mod tests {
368401
assert_eq!(buffer.get_start_physical_index(), 0);
369402
assert_eq!(buffer.get_end_physical_index(), 0);
370403
}
404+
#[test]
405+
fn test_sliced_buffer_access() {
406+
// [0, 0, 1, 2, 2, 2]
407+
let buffer = RunEndBuffer::new(vec![2i32, 3, 6].into(), 0, 6);
408+
409+
// Slice: [0, 1, 2, 2] start: 1, len: 4
410+
// Logical indices: 1, 2, 3, 4
411+
// Physical indices: 0, 1, 2, 2
412+
let sliced = buffer.slice(1, 4);
413+
assert_eq!(sliced.get_start_physical_index(), 0);
414+
assert_eq!(sliced.get_end_physical_index(), 2);
415+
416+
assert_eq!(sliced.values_slice(), &[2, 3, 6]);
417+
418+
// Slice: [2, 2] start: 4, len: 2 (relative to original)
419+
// Original indices: 4, 5
420+
// Physical indices: 2, 2
421+
let sliced2 = buffer.slice(4, 2);
422+
assert_eq!(sliced2.get_start_physical_index(), 2);
423+
assert_eq!(sliced2.get_end_physical_index(), 2);
424+
assert_eq!(sliced2.values_slice(), &[6]);
425+
426+
// Test with offset slice
427+
// buffer: [2, 3, 6] (logical length 6)
428+
// slice(1, 4) -> [0, 1, 2, 2] (logical length 4). Offset 1.
429+
// Indices: 0 (orig 1) -> phys 0
430+
// Indices: 1 (orig 2) -> phys 1
431+
// Indices: 2 (orig 3) -> phys 2
432+
// Indices: 3 (orig 4) -> phys 2
433+
434+
// slice(3, 1) on sliced buffer.
435+
// indices 3 (orig 4). phys 2.
436+
let sliced3 = sliced.slice(3, 1);
437+
assert_eq!(sliced3.get_start_physical_index(), 2);
438+
assert_eq!(sliced3.get_end_physical_index(), 2);
439+
assert_eq!(sliced3.values_slice(), &[6]);
440+
}
441+
442+
#[test]
443+
fn test_logical_run_ends() {
444+
// [0, 0, 1, 2, 2, 2]
445+
let buffer = RunEndBuffer::new(vec![2i32, 3, 6].into(), 0, 6);
446+
447+
// Slice: [0, 1, 2, 2] start: 1, len: 4
448+
// Logical indices: 1, 2, 3, 4
449+
// Original run ends: [2, 3, 6]
450+
// Adjusted: [2-1, 3-1, 6-1] capped at 4 -> [1, 2, 4]
451+
let sliced = buffer.slice(1, 4);
452+
let logical_run_ends: Vec<i32> = sliced.logical_run_ends().collect();
453+
assert_eq!(logical_run_ends, &[1, 2, 4]);
454+
assert_eq!(sliced.values_slice(), &[2, 3, 6]); //This shows how values_slice are different than logical run ends
455+
456+
// Slice: [2, 2] start: 4, len: 2
457+
// Original run ends: [2, 3, 6]
458+
// Slicing at 4 means we only have the last run (physical index 2, which ends at 6)
459+
// Adjusted: [6-4] capped at 2 -> [2]
460+
let sliced = buffer.slice(4, 2);
461+
let logical_run_ends: Vec<i32> = sliced.logical_run_ends().collect();
462+
assert_eq!(logical_run_ends, &[2]);
463+
assert_eq!(sliced.values_slice(), &[6]); // This shows how values_slice are different than logical run ends
464+
}
371465
}

arrow-cast/src/cast/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11746,9 +11746,6 @@ mod tests {
1174611746
}
1174711747

1174811748
#[test]
11749-
#[should_panic = "assertion `left == right` failed\n left: ScalarBuffer([1, 1, 2])\n right: [2, 2, 3]"]
11750-
// TODO: fix cast of RunArrays to account for sliced RunArray's
11751-
// https://github.com/apache/arrow-rs/issues/9018
1175211749
fn test_sliced_run_end_encoded_to_primitive() {
1175311750
let run_ends = Int32Array::from(vec![2, 5, 6]);
1175411751
let values = Int32Array::from(vec![1, 2, 3]);

arrow-cast/src/cast/run_array.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,17 @@ pub(crate) fn run_end_encoded_cast<K: RunEndIndexType>(
7272

7373
// Expand to logical form
7474
_ => {
75-
let run_ends = run_array.run_ends().values().to_vec();
76-
let mut indices = Vec::with_capacity(run_array.run_ends().len());
77-
let mut physical_idx: usize = 0;
78-
for logical_idx in 0..run_array.run_ends().len() {
79-
// If the logical index is equal to the (next) run end, increment the physical index,
80-
// since we are at the end of a run.
75+
let len = run_array.len();
76+
let offset = run_array.offset();
77+
let run_ends = run_array.run_ends().values();
78+
79+
let mut indices = Vec::with_capacity(len);
80+
let mut physical_idx = run_array.get_start_physical_index();
81+
82+
for logical_idx in offset..offset + len {
8183
if logical_idx == run_ends[physical_idx].as_usize() {
84+
// If the logical index is equal to the (next) run end, increment the physical index,
85+
// since we are at the end of a run.
8286
physical_idx += 1;
8387
}
8488
indices.push(physical_idx as i32);

arrow-data/src/transform/run.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ pub fn build_extend(array: &ArrayData) -> Extend<'_> {
206206
let (run_ends_bytes, values_range) = build_extend_arrays::<$run_end_type>(
207207
source_buffer,
208208
source_run_ends.len(),
209-
start,
209+
start + array.offset(),
210210
len,
211211
dest_last_run_end,
212212
);

arrow-select/src/concat.rs

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,11 @@ use arrow_buffer::{
4343
use arrow_data::ArrayDataBuilder;
4444
use arrow_data::transform::{Capacities, MutableArrayData};
4545
use arrow_schema::{ArrowError, DataType, FieldRef, Fields, SchemaRef};
46-
use std::{collections::HashSet, ops::Add, sync::Arc};
46+
use std::{
47+
collections::HashSet,
48+
ops::{Add, Sub},
49+
sync::Arc,
50+
};
4751

4852
fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
4953
let mut item_capacity = 0;
@@ -350,7 +354,7 @@ fn concat_structs(arrays: &[&dyn Array], fields: &Fields) -> Result<ArrayRef, Ar
350354
/// 3. Creating a new RunArray with the combined data
351355
fn concat_run_arrays<R: RunEndIndexType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError>
352356
where
353-
R::Native: Add<Output = R::Native>,
357+
R::Native: Add<Output = R::Native> + Sub<Output = R::Native> + Ord,
354358
{
355359
let run_arrays: Vec<_> = arrays
356360
.iter()
@@ -364,7 +368,7 @@ where
364368
run_arrays
365369
.iter()
366370
.scan(R::default_value(), |acc, run_array| {
367-
*acc = *acc + *run_array.run_ends().values().last().unwrap();
371+
*acc = *acc + R::Native::from_usize(run_array.len()).unwrap();
368372
Some(*acc)
369373
}),
370374
)
@@ -377,20 +381,27 @@ where
377381
PrimitiveArray::<R>::from_iter_values(run_arrays.iter().enumerate().flat_map(
378382
move |(i, run_array)| {
379383
let adjustment = needed_run_end_adjustments[i];
384+
let offset = R::Native::from_usize(run_array.offset()).unwrap();
385+
let length = R::Native::from_usize(run_array.len()).unwrap();
386+
380387
run_array
381388
.run_ends()
382-
.values()
389+
.values_slice()
383390
.iter()
384-
.map(move |run_end| *run_end + adjustment)
391+
.map(move |run_end| {
392+
let value = *run_end - offset;
393+
// `min` is required when the slice ends partway through its final run.
394+
value.min(length) + adjustment
395+
})
385396
},
386397
));
387398

388-
let all_values = concat(
389-
&run_arrays
390-
.iter()
391-
.map(|x| x.values().as_ref())
392-
.collect::<Vec<_>>(),
393-
)?;
399+
let values_slices: Vec<ArrayRef> = run_arrays
400+
.iter()
401+
.map(|run_array| run_array.values_slice())
402+
.collect();
403+
404+
let all_values = concat(&values_slices.iter().map(|x| x.as_ref()).collect::<Vec<_>>())?;
394405

395406
let builder = ArrayDataBuilder::new(run_arrays[0].data_type().clone())
396407
.len(total_len)
@@ -1716,9 +1727,6 @@ mod tests {
17161727
}
17171728

17181729
#[test]
1719-
#[should_panic = "assertion `left == right` failed\n left: [20, 20, 40, 40, 40]\n right: [10, 10, 20, 20, 30, 40, 40, 40]"]
1720-
// TODO: fix concat of RunArrays to account for sliced RunArray's
1721-
// https://github.com/apache/arrow-rs/issues/9018
17221730
fn test_concat_sliced_run_array() {
17231731
// Slicing away first run in both arrays
17241732
let run_ends1 = Int32Array::from(vec![2, 4]);
@@ -1879,4 +1887,42 @@ mod tests {
18791887
assert_eq!(values.len(), 6);
18801888
assert_eq!(&[10, 20, 30, 40, 50, 60], values.values());
18811889
}
1890+
1891+
#[test]
1892+
fn test_concat_run_array_with_truncated_run() {
1893+
// Create a run array with run ends [2, 5] and values [10, 20]
1894+
// Logical: [10, 10, 20, 20, 20]
1895+
let run_ends1 = Int32Array::from(vec![2, 5]);
1896+
let values1 = Int32Array::from(vec![10, 20]);
1897+
let array1 = RunArray::try_new(&run_ends1, &values1).unwrap();
1898+
1899+
// This test case handles the scenario where a slice ends partway through its final run.
1900+
// It validates the logic in `concat_run_arrays` that uses `.min(length)` to truncate
1901+
// the last run end to the slice's logical length:
1902+
//
1903+
// .map(move |run_end| {
1904+
// let value = *run_end - offset;
1905+
// value.min(length) + adjustment
1906+
// })
1907+
//
1908+
// For array1.slice(0, 3) (logical length 3), the last physical run ends at 5.
1909+
// Without min(length), its run end would be 5 - 0 = 5.
1910+
// With min(length), it becomes min(5 - 0, 3) = 3.
1911+
let array1_sliced = array1.slice(0, 3);
1912+
1913+
let run_ends2 = Int32Array::from(vec![2]);
1914+
let values2 = Int32Array::from(vec![30]);
1915+
let array2 = RunArray::try_new(&run_ends2, &values2).unwrap();
1916+
1917+
let result = concat(&[&array1_sliced, &array2]).unwrap();
1918+
let result_run_array = result.as_run::<Int32Type>();
1919+
1920+
// Result should be [10, 10, 20, 30, 30]
1921+
// Run ends should be [2, 3, 5]
1922+
assert_eq!(result_run_array.len(), 5);
1923+
let run_ends = result_run_array.run_ends().values();
1924+
let values = result_run_array.values().as_primitive::<Int32Type>();
1925+
assert_eq!(values.values(), &[10, 20, 30]);
1926+
assert_eq!(&[2, 3, 5], run_ends);
1927+
}
18821928
}

arrow-select/src/filter.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -487,17 +487,21 @@ where
487487
R::Native: AddAssign,
488488
{
489489
let run_ends: &RunEndBuffer<R::Native> = array.run_ends();
490-
let mut new_run_ends = vec![R::default_value(); run_ends.len()];
490+
let start_physical = run_ends.get_start_physical_index();
491+
let physical_len = run_ends.values_slice().len();
492+
493+
let mut new_run_ends = vec![R::default_value(); physical_len];
494+
let offset = run_ends.offset() as u64;
491495

492496
let mut start = 0u64;
493497
let mut j = 0;
494498
let mut count = R::default_value();
495499
let filter_values = predicate.filter.values();
496500
let run_ends = run_ends.inner();
497501

498-
let pred: BooleanArray = BooleanBuffer::collect_bool(run_ends.len(), |i| {
502+
let pred: BooleanArray = BooleanBuffer::collect_bool(physical_len, |i| {
499503
let mut keep = false;
500-
let mut end = run_ends[i].into() as u64;
504+
let mut end = (run_ends[i + start_physical].into() as u64).saturating_sub(offset);
501505
let difference = end.saturating_sub(filter_values.len() as u64);
502506
end -= difference;
503507

@@ -517,8 +521,8 @@ where
517521

518522
new_run_ends.truncate(j);
519523

520-
let values = array.values();
521-
let values = filter(&values, &pred)?;
524+
let values = array.values_slice();
525+
let values = filter(values.as_ref(), &pred)?;
522526

523527
let run_ends = PrimitiveArray::<R>::try_new(new_run_ends.into(), None)?;
524528
RunArray::try_new(&run_ends, &values)
@@ -1355,9 +1359,6 @@ mod tests {
13551359
}
13561360

13571361
#[test]
1358-
#[should_panic = "assertion `left == right` failed\n left: [-2, 9]\n right: [7, -2]"]
1359-
// TODO: fix filter of RunArrays to account for sliced RunArray's
1360-
// https://github.com/apache/arrow-rs/issues/9018
13611362
fn test_filter_run_end_encoding_array_sliced() {
13621363
let run_ends = Int64Array::from(vec![2, 3, 8]);
13631364
let values = Int64Array::from(vec![7, -2, 9]);

arrow-select/src/interleave.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,9 +1172,6 @@ mod tests {
11721172
}
11731173

11741174
#[test]
1175-
#[should_panic = "assertion `left == right` failed\n left: [1, 4, 2, 5, 6]\n right: [2, 5, 2, 5, 6]"]
1176-
// TODO: fix interleave of RunArrays to account for sliced RunArray's
1177-
// https://github.com/apache/arrow-rs/issues/9018
11781175
fn test_interleave_run_end_encoded_sliced() {
11791176
let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
11801177
builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
@@ -1186,7 +1183,7 @@ mod tests {
11861183
let b = builder.finish();
11871184
let b = b.slice(1, 3); // [5, 5, 6]
11881185

1189-
let indices = &[(0, 1), (1, 0), (0, 3), (1, 2), (1, 3)];
1186+
let indices = &[(0, 1), (1, 0), (0, 2), (1, 1), (1, 2)];
11901187
let result = interleave(&[&a, &b], indices).unwrap();
11911188

11921189
let result = result.as_run::<Int32Type>();

0 commit comments

Comments
 (0)