From bfbe55428c816fb8a23f1d7472bac3180e5ae93b Mon Sep 17 00:00:00 2001 From: Manish Kumar Date: Tue, 23 Dec 2025 14:35:29 +0530 Subject: [PATCH] fix:[9018]Fixed RunArray slice offsets --- arrow-array/src/array/run_array.rs | 41 ++++++++++++++++++++++++++ arrow-buffer/src/buffer/run.rs | 40 +++++++++++++++++++++++++ arrow-cast/src/cast/mod.rs | 3 -- arrow-cast/src/cast/run_array.rs | 16 ++++++---- arrow-data/src/transform/run.rs | 2 +- arrow-select/src/concat.rs | 47 +++++++++++++++++++++--------- arrow-select/src/filter.rs | 18 +++++++----- arrow-select/src/interleave.rs | 5 +--- 8 files changed, 137 insertions(+), 35 deletions(-) diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs index 5254a0ed3cdc..9ca1af943d27 100644 --- a/arrow-array/src/array/run_array.rs +++ b/arrow-array/src/array/run_array.rs @@ -136,6 +136,16 @@ impl RunArray { &self.values } + /// Similar to [`values`] but accounts for logical slicing, returning only the values + /// that are part of the logical slice of this array. + /// + /// [`values`]: Self::values + pub fn values_slice(&self) -> ArrayRef { + let start = self.get_start_physical_index(); + let end = self.get_end_physical_index(); + self.values.slice(start, end - start + 1) + } + /// Returns the physical index at which the array slice starts. /// /// See [`RunEndBuffer::get_start_physical_index`]. @@ -1132,4 +1142,35 @@ mod tests { assert_eq!(array_i16_1, array_i16_2); } + + #[test] + fn test_run_array_values_slice() { + // 0, 0, 1, 1, 1, 2...2 (15 2s) + let run_ends: PrimitiveArray = vec![2, 5, 20].into(); + let values: PrimitiveArray = vec![0, 1, 2].into(); + let array = RunArray::::try_new(&run_ends, &values).unwrap(); + + let slice = array.slice(1, 4); // 0 | 1, 1, 1 | + // logical indices: 1, 2, 3, 4 + // physical indices: 0, 1, 1, 1 + // values at 0 is 0 + // values at 1 is 1 + // values slice should be [0, 1] + assert_eq!(slice.get_start_physical_index(), 0); + assert_eq!(slice.get_end_physical_index(), 1); + + let values_slice = slice.values_slice(); + let values_slice = values_slice.as_primitive::(); + assert_eq!(values_slice.values(), &[0, 1]); + + let slice2 = array.slice(2, 3); // 1, 1, 1 + // logical indices: 2, 3, 4 + // physical indices: 1, 1, 1 + assert_eq!(slice2.get_start_physical_index(), 1); + assert_eq!(slice2.get_end_physical_index(), 1); + + let values_slice2 = slice2.values_slice(); + let values_slice2 = values_slice2.as_primitive::(); + assert_eq!(values_slice2.values(), &[1]); + } } diff --git a/arrow-buffer/src/buffer/run.rs b/arrow-buffer/src/buffer/run.rs index 9458291ef8fc..6603dec1bac1 100644 --- a/arrow-buffer/src/buffer/run.rs +++ b/arrow-buffer/src/buffer/run.rs @@ -189,6 +189,24 @@ where &self.run_ends } + /// Returns an iterator yielding run ends adjusted for the logical slice. + /// + /// Each yielded value is subtracted by the [`logical_offset`] and capped + /// at the [`logical_length`]. + /// + /// [`logical_offset`]: Self::offset + /// [`logical_length`]: Self::len + pub fn sliced_values(&self) -> impl Iterator + '_ { + let offset = self.logical_offset; + let len = self.logical_length; + let start = self.get_start_physical_index(); + let end = self.get_end_physical_index(); + self.run_ends[start..=end].iter().map(move |&val| { + let val = val.as_usize().saturating_sub(offset).min(len); + E::from_usize(val).unwrap() + }) + } + /// Returns the maximum run-end encoded in the underlying buffer; that is, the /// last physical run of the buffer. This does not take into account any logical /// slicing that may have occurred. @@ -368,4 +386,26 @@ mod tests { assert_eq!(buffer.get_start_physical_index(), 0); assert_eq!(buffer.get_end_physical_index(), 0); } + + #[test] + fn test_sliced_values() { + // [0, 0, 1, 2, 2, 2] + let buffer = RunEndBuffer::new(vec![2i32, 3, 6].into(), 0, 6); + + // Slice: [0, 1, 2, 2] start: 1, len: 4 + // Logical indices: 1, 2, 3, 4 + // Original run ends: [2, 3, 6] + // Adjusted: [2-1, 3-1, 6-1] capped at 4 -> [1, 2, 4] + let sliced = buffer.slice(1, 4); + let sliced_values: Vec = sliced.sliced_values().collect(); + assert_eq!(sliced_values, &[1, 2, 4]); + + // Slice: [2, 2] start: 4, len: 2 + // Original run ends: [2, 3, 6] + // Slicing at 4 means we only have the last run (physical index 2, which ends at 6) + // Adjusted: [6-4] capped at 2 -> [2] + let sliced = buffer.slice(4, 2); + let sliced_values: Vec = sliced.sliced_values().collect(); + assert_eq!(sliced_values, &[2]); + } } diff --git a/arrow-cast/src/cast/mod.rs b/arrow-cast/src/cast/mod.rs index 67fd02837d65..34393b3b3114 100644 --- a/arrow-cast/src/cast/mod.rs +++ b/arrow-cast/src/cast/mod.rs @@ -11746,9 +11746,6 @@ mod tests { } #[test] - #[should_panic = "assertion `left == right` failed\n left: ScalarBuffer([1, 1, 2])\n right: [2, 2, 3]"] - // TODO: fix cast of RunArrays to account for sliced RunArray's - // https://github.com/apache/arrow-rs/issues/9018 fn test_sliced_run_end_encoded_to_primitive() { let run_ends = Int32Array::from(vec![2, 5, 6]); let values = Int32Array::from(vec![1, 2, 3]); diff --git a/arrow-cast/src/cast/run_array.rs b/arrow-cast/src/cast/run_array.rs index 0d4679d9f3f5..3e14804dc824 100644 --- a/arrow-cast/src/cast/run_array.rs +++ b/arrow-cast/src/cast/run_array.rs @@ -72,13 +72,17 @@ pub(crate) fn run_end_encoded_cast( // Expand to logical form _ => { - let run_ends = run_array.run_ends().values().to_vec(); - let mut indices = Vec::with_capacity(run_array.run_ends().len()); - let mut physical_idx: usize = 0; - for logical_idx in 0..run_array.run_ends().len() { - // If the logical index is equal to the (next) run end, increment the physical index, - // since we are at the end of a run. + let len = run_array.len(); + let offset = run_array.offset(); + let run_ends = run_array.run_ends().values(); + + let mut indices = Vec::with_capacity(len); + let mut physical_idx = run_array.get_start_physical_index(); + + for logical_idx in offset..offset + len { if logical_idx == run_ends[physical_idx].as_usize() { + // If the logical index is equal to the (next) run end, increment the physical index, + // since we are at the end of a run. physical_idx += 1; } indices.push(physical_idx as i32); diff --git a/arrow-data/src/transform/run.rs b/arrow-data/src/transform/run.rs index 89daa00516c2..6ae3a034f340 100644 --- a/arrow-data/src/transform/run.rs +++ b/arrow-data/src/transform/run.rs @@ -206,7 +206,7 @@ pub fn build_extend(array: &ArrayData) -> Extend<'_> { let (run_ends_bytes, values_range) = build_extend_arrays::<$run_end_type>( source_buffer, source_run_ends.len(), - start, + start + array.offset(), len, dest_last_run_end, ); diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 84c41b6e16f3..a6e3083a6ee7 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -364,7 +364,7 @@ where run_arrays .iter() .scan(R::default_value(), |acc, run_array| { - *acc = *acc + *run_array.run_ends().values().last().unwrap(); + *acc = *acc + R::Native::from_usize(run_array.len()).unwrap(); Some(*acc) }), ) @@ -379,18 +379,17 @@ where let adjustment = needed_run_end_adjustments[i]; run_array .run_ends() - .values() - .iter() - .map(move |run_end| *run_end + adjustment) + .sliced_values() + .map(move |run_end| run_end + adjustment) }, )); - let all_values = concat( - &run_arrays - .iter() - .map(|x| x.values().as_ref()) - .collect::>(), - )?; + let values_slices: Vec = run_arrays + .iter() + .map(|run_array| run_array.values_slice()) + .collect(); + + let all_values = concat(&values_slices.iter().map(|x| x.as_ref()).collect::>())?; let builder = ArrayDataBuilder::new(run_arrays[0].data_type().clone()) .len(total_len) @@ -1716,9 +1715,6 @@ mod tests { } #[test] - #[should_panic = "assertion `left == right` failed\n left: [20, 20, 40, 40, 40]\n right: [10, 10, 20, 20, 30, 40, 40, 40]"] - // TODO: fix concat of RunArrays to account for sliced RunArray's - // https://github.com/apache/arrow-rs/issues/9018 fn test_concat_sliced_run_array() { // Slicing away first run in both arrays let run_ends1 = Int32Array::from(vec![2, 4]); @@ -1879,4 +1875,29 @@ mod tests { assert_eq!(values.len(), 6); assert_eq!(&[10, 20, 30, 40, 50, 60], values.values()); } + + #[test] + fn test_concat_run_array_with_truncated_run() { + // Create a run array with run ends [2, 5] and values [10, 20] + // Logical: [10, 10, 20, 20, 20] + let run_ends1 = Int32Array::from(vec![2, 5]); + let values1 = Int32Array::from(vec![10, 20]); + let array1 = RunArray::try_new(&run_ends1, &values1).unwrap(); + let array1_sliced = array1.slice(0, 3); + + let run_ends2 = Int32Array::from(vec![2]); + let values2 = Int32Array::from(vec![30]); + let array2 = RunArray::try_new(&run_ends2, &values2).unwrap(); + + let result = concat(&[&array1_sliced, &array2]).unwrap(); + let result_run_array = result.as_run::(); + + // Result should be [10, 10, 20, 30, 30] + // Run ends should be [2, 3, 5] + assert_eq!(result_run_array.len(), 5); + let run_ends = result_run_array.run_ends().values(); + let values = result_run_array.values().as_primitive::(); + assert_eq!(values.values(), &[10, 20, 30]); + assert_eq!(&[2, 3, 5], run_ends); + } } diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 1aa933ae18f5..07ce16de9646 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -487,7 +487,12 @@ where R::Native: AddAssign, { let run_ends: &RunEndBuffer = array.run_ends(); - let mut new_run_ends = vec![R::default_value(); run_ends.len()]; + let start_physical = run_ends.get_start_physical_index(); + let end_physical = run_ends.get_end_physical_index(); + let physical_len = end_physical - start_physical + 1; + + let mut new_run_ends = vec![R::default_value(); physical_len]; + let offset = run_ends.offset() as u64; let mut start = 0u64; let mut j = 0; @@ -495,9 +500,9 @@ where let filter_values = predicate.filter.values(); let run_ends = run_ends.inner(); - let pred: BooleanArray = BooleanBuffer::collect_bool(run_ends.len(), |i| { + let pred: BooleanArray = BooleanBuffer::collect_bool(physical_len, |i| { let mut keep = false; - let mut end = run_ends[i].into() as u64; + let mut end = (run_ends[i + start_physical].into() as u64).saturating_sub(offset); let difference = end.saturating_sub(filter_values.len() as u64); end -= difference; @@ -517,8 +522,8 @@ where new_run_ends.truncate(j); - let values = array.values(); - let values = filter(&values, &pred)?; + let values = array.values_slice(); + let values = filter(values.as_ref(), &pred)?; let run_ends = PrimitiveArray::::try_new(new_run_ends.into(), None)?; RunArray::try_new(&run_ends, &values) @@ -1355,9 +1360,6 @@ mod tests { } #[test] - #[should_panic = "assertion `left == right` failed\n left: [-2, 9]\n right: [7, -2]"] - // TODO: fix filter of RunArrays to account for sliced RunArray's - // https://github.com/apache/arrow-rs/issues/9018 fn test_filter_run_end_encoding_array_sliced() { let run_ends = Int64Array::from(vec![2, 3, 8]); let values = Int64Array::from(vec![7, -2, 9]); diff --git a/arrow-select/src/interleave.rs b/arrow-select/src/interleave.rs index 7a04c83f79e9..d4303e8d85eb 100644 --- a/arrow-select/src/interleave.rs +++ b/arrow-select/src/interleave.rs @@ -1172,9 +1172,6 @@ mod tests { } #[test] - #[should_panic = "assertion `left == right` failed\n left: [1, 4, 2, 5, 6]\n right: [2, 5, 2, 5, 6]"] - // TODO: fix interleave of RunArrays to account for sliced RunArray's - // https://github.com/apache/arrow-rs/issues/9018 fn test_interleave_run_end_encoded_sliced() { let mut builder = PrimitiveRunBuilder::::new(); builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some)); @@ -1186,7 +1183,7 @@ mod tests { let b = builder.finish(); let b = b.slice(1, 3); // [5, 5, 6] - let indices = &[(0, 1), (1, 0), (0, 3), (1, 2), (1, 3)]; + let indices = &[(0, 1), (1, 0), (0, 2), (1, 1), (1, 2)]; let result = interleave(&[&a, &b], indices).unwrap(); let result = result.as_run::();