Skip to content

Commit 14f94b2

Browse files
committed
fix: [9018] Fixed RunArray slice offsets (row, cast, eq)
1 parent ac640da commit 14f94b2

File tree

7 files changed

+213
-65
lines changed

7 files changed

+213
-65
lines changed

arrow-array/src/array/run_array.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,9 @@ impl<R: RunEndIndexType> RunArray<R> {
141141
///
142142
/// [`values`]: Self::values
143143
pub fn values_slice(&self) -> ArrayRef {
144+
if self.len() == 0 {
145+
return self.values.slice(0, 0);
146+
}
144147
let start = self.get_start_physical_index();
145148
let end = self.get_end_physical_index();
146149
self.values.slice(start, end - start + 1)
@@ -1173,4 +1176,56 @@ mod tests {
11731176
let values_slice2 = values_slice2.as_primitive::<Int32Type>();
11741177
assert_eq!(values_slice2.values(), &[1]);
11751178
}
1179+
1180+
#[test]
1181+
fn test_run_array_eq_diff_physical_same_logical() {
1182+
let run_ends1 = Int32Array::from(vec![1, 3, 6]);
1183+
let values1 = StringArray::from(vec!["a", "b", "c"]);
1184+
let array1 = RunArray::<Int32Type>::try_new(&run_ends1, &values1).unwrap();
1185+
1186+
let run_ends2 = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1187+
let values2 = StringArray::from(vec!["a", "b", "b", "c", "c", "c"]);
1188+
let array2 = RunArray::<Int32Type>::try_new(&run_ends2, &values2).unwrap();
1189+
1190+
assert_eq!(array1, array2);
1191+
}
1192+
1193+
#[test]
1194+
fn test_run_array_eq_sliced() {
1195+
let run_ends1 = Int32Array::from(vec![2, 5, 10]);
1196+
let values1 = StringArray::from(vec!["a", "b", "c"]);
1197+
let array1 = RunArray::<Int32Type>::try_new(&run_ends1, &values1).unwrap();
1198+
// Logical: a, a, b, b, b, c, c, c, c, c
1199+
1200+
let slice1 = array1.slice(1, 6);
1201+
// Logical: a, b, b, b, c, c
1202+
1203+
let run_ends2 = Int32Array::from(vec![1, 4, 6]);
1204+
let values2 = StringArray::from(vec!["a", "b", "c"]);
1205+
let array2 = RunArray::<Int32Type>::try_new(&run_ends2, &values2).unwrap();
1206+
// Logical: a, b, b, b, c, c
1207+
1208+
assert_eq!(slice1, array2);
1209+
1210+
let slice2 = array1.slice(2, 3);
1211+
// Logical: b, b, b
1212+
let run_ends3 = Int32Array::from(vec![3]);
1213+
let values3 = StringArray::from(vec!["b"]);
1214+
let array3 = RunArray::<Int32Type>::try_new(&run_ends3, &values3).unwrap();
1215+
assert_eq!(slice2, array3);
1216+
}
1217+
1218+
#[test]
1219+
fn test_run_array_eq_sliced_different_offsets() {
1220+
let run_ends1 = Int32Array::from(vec![2, 5, 10]);
1221+
let values1 = StringArray::from(vec!["a", "b", "c"]);
1222+
let array1 = RunArray::<Int32Type>::try_new(&run_ends1, &values1).unwrap();
1223+
1224+
let slice1 = array1.slice(1, 4); // a, b, b, b
1225+
let slice2 = array1.slice(1, 4);
1226+
assert_eq!(slice1, slice2);
1227+
1228+
let slice3 = array1.slice(0, 4); // a, a, b, b
1229+
assert_ne!(slice1, slice3);
1230+
}
11761231
}

arrow-buffer/src/buffer/run.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,14 @@ where
199199
pub fn sliced_values(&self) -> impl Iterator<Item = E> + '_ {
200200
let offset = self.logical_offset;
201201
let len = self.logical_length;
202-
let start = self.get_start_physical_index();
203-
let end = self.get_end_physical_index();
204-
self.run_ends[start..=end].iter().map(move |&val| {
202+
let physical_slice = if len == 0 {
203+
&self.run_ends[0..0]
204+
} else {
205+
let start = self.get_start_physical_index();
206+
let end = self.get_end_physical_index();
207+
&self.run_ends[start..=end]
208+
};
209+
physical_slice.iter().map(move |&val| {
205210
let val = val.as_usize().saturating_sub(offset).min(len);
206211
E::from_usize(val).unwrap()
207212
})

arrow-cast/src/cast/mod.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12451,4 +12451,32 @@ mod tests {
1245112451
assert_eq!(casted.as_ref(), &expected);
1245212452
}
1245312453
}
12454+
12455+
#[test]
12456+
fn test_cast_between_sliced_run_end_encoded() {
12457+
let run_ends = Int16Array::from(vec![2, 5, 8]);
12458+
let values = StringArray::from(vec!["a", "b", "c"]);
12459+
12460+
let ree_array = RunArray::<Int16Type>::try_new(&run_ends, &values).unwrap();
12461+
let ree_array = ree_array.slice(1, 2);
12462+
let array_ref = Arc::new(ree_array) as ArrayRef;
12463+
12464+
let target_type = DataType::RunEndEncoded(
12465+
Arc::new(Field::new("run_ends", DataType::Int64, false)),
12466+
Arc::new(Field::new("values", DataType::Utf8, true)),
12467+
);
12468+
let cast_options = CastOptions {
12469+
safe: false,
12470+
format_options: FormatOptions::default(),
12471+
};
12472+
12473+
let result = cast_with_options(&array_ref, &target_type, &cast_options).unwrap();
12474+
let run_array = result.as_run::<Int64Type>();
12475+
let run_array = run_array.downcast::<StringArray>().unwrap();
12476+
12477+
let expected = vec!["a", "b"];
12478+
let actual = run_array.into_iter().flatten().collect::<Vec<_>>();
12479+
12480+
assert_eq!(expected, actual);
12481+
}
1245412482
}

arrow-cast/src/cast/run_array.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,18 @@ pub(crate) fn run_end_encoded_cast<K: RunEndIndexType>(
3232
.downcast_ref::<RunArray<K>>()
3333
.ok_or_else(|| ArrowError::CastError("Expected RunArray".to_string()))?;
3434

35-
let values = run_array.values();
36-
3735
match to_type {
3836
// Stay as RunEndEncoded, cast only the values
3937
DataType::RunEndEncoded(target_index_field, target_value_field) => {
40-
let cast_values =
41-
cast_with_options(values, target_value_field.data_type(), cast_options)?;
38+
let values = run_array.values_slice();
39+
let cast_values = cast_with_options(
40+
values.as_ref(),
41+
target_value_field.data_type(),
42+
cast_options,
43+
)?;
4244

43-
let run_ends_array = PrimitiveArray::<K>::from_iter_values(
44-
run_array.run_ends().values().iter().copied(),
45-
);
45+
let run_ends_array =
46+
PrimitiveArray::<K>::from_iter_values(run_array.run_ends().sliced_values());
4647
let cast_run_ends = cast_with_options(
4748
&run_ends_array,
4849
target_index_field.data_type(),
@@ -72,6 +73,7 @@ pub(crate) fn run_end_encoded_cast<K: RunEndIndexType>(
7273

7374
// Expand to logical form
7475
_ => {
76+
let values = run_array.values();
7577
let len = run_array.len();
7678
let offset = run_array.offset();
7779
let run_ends = run_array.run_ends().values();

arrow-data/src/equal/run.rs

Lines changed: 93 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,71 +16,118 @@
1616
// under the License.
1717

1818
use crate::data::ArrayData;
19+
use arrow_buffer::ArrowNativeType;
20+
use arrow_schema::DataType;
21+
use num_traits::ToPrimitive;
1922

2023
use super::equal_range;
2124

22-
/// The current implementation of comparison of run array support physical comparison.
23-
/// Comparing run encoded array based on logical indices (`lhs_start`, `rhs_start`) will
24-
/// be time consuming as converting from logical index to physical index cannot be done
25-
/// in constant time. The current comparison compares the underlying physical arrays.
2625
pub(super) fn run_equal(
2726
lhs: &ArrayData,
2827
rhs: &ArrayData,
2928
lhs_start: usize,
3029
rhs_start: usize,
3130
len: usize,
3231
) -> bool {
33-
if lhs_start != 0
34-
|| rhs_start != 0
35-
|| (lhs.len() != len && rhs.len() != len)
36-
|| lhs.offset() > 0
37-
|| rhs.offset() > 0
38-
{
39-
unimplemented!("Logical comparison for run array not supported.")
32+
let lhs_index_type = match lhs.data_type() {
33+
DataType::RunEndEncoded(f, _) => f.data_type(),
34+
_ => unreachable!(),
35+
};
36+
37+
match lhs_index_type {
38+
DataType::Int16 => run_equal_inner::<i16>(lhs, rhs, lhs_start, rhs_start, len),
39+
DataType::Int32 => run_equal_inner::<i32>(lhs, rhs, lhs_start, rhs_start, len),
40+
DataType::Int64 => run_equal_inner::<i64>(lhs, rhs, lhs_start, rhs_start, len),
41+
_ => unreachable!(),
4042
}
43+
}
4144

42-
if lhs.len() != rhs.len() {
43-
return false;
45+
fn run_equal_inner<T: ArrowNativeType + ToPrimitive>(
46+
lhs: &ArrayData,
47+
rhs: &ArrayData,
48+
lhs_start: usize,
49+
rhs_start: usize,
50+
len: usize,
51+
) -> bool {
52+
if len == 0 {
53+
return true;
4454
}
55+
// RunEndEncoded arrays are guaranteed to have at least 2 children [run_ends, values]
56+
let lhs_run_ends_data = &lhs.child_data()[0];
57+
let rhs_run_ends_data = &rhs.child_data()[0];
58+
let lhs_values = &lhs.child_data()[1];
59+
let rhs_values = &rhs.child_data()[1];
4560

46-
let lhs_child_data = lhs.child_data();
47-
let lhs_run_ends_array = &lhs_child_data[0];
48-
let lhs_values_array = &lhs_child_data[1];
61+
let lhs_run_ends = &lhs_run_ends_data.buffers()[0].typed_data::<T>()
62+
[lhs_run_ends_data.offset()..lhs_run_ends_data.offset() + lhs_run_ends_data.len()];
63+
let rhs_run_ends = &rhs_run_ends_data.buffers()[0].typed_data::<T>()
64+
[rhs_run_ends_data.offset()..rhs_run_ends_data.offset() + rhs_run_ends_data.len()];
4965

50-
let rhs_child_data = rhs.child_data();
51-
let rhs_run_ends_array = &rhs_child_data[0];
52-
let rhs_values_array = &rhs_child_data[1];
66+
let lhs_abs_start = lhs.offset() + lhs_start;
67+
let rhs_abs_start = rhs.offset() + rhs_start;
68+
let lhs_abs_end = lhs_abs_start + len;
69+
let rhs_abs_end = rhs_abs_start + len;
5370

54-
if lhs_run_ends_array.len() != rhs_run_ends_array.len() {
55-
return false;
56-
}
71+
let l_start_phys = find_physical_index(lhs_run_ends, lhs_abs_start);
72+
let r_start_phys = find_physical_index(rhs_run_ends, rhs_abs_start);
73+
74+
let l_end_phys = find_physical_index(lhs_run_ends, lhs_abs_end - 1);
75+
let r_end_phys = find_physical_index(rhs_run_ends, rhs_abs_end - 1);
76+
77+
let mut l_phys = l_start_phys;
78+
let mut r_phys = r_start_phys;
5779

58-
if lhs_values_array.len() != rhs_values_array.len() {
59-
return false;
80+
let l_runs = l_end_phys - l_start_phys + 1;
81+
let r_runs = r_end_phys - r_start_phys + 1;
82+
83+
if l_runs == r_runs {
84+
let physical_match = lhs_run_ends[l_start_phys..l_end_phys]
85+
.iter()
86+
.zip(&rhs_run_ends[r_start_phys..r_end_phys])
87+
.all(|(l, r)| l.as_usize() - lhs_abs_start == r.as_usize() - rhs_abs_start);
88+
89+
if physical_match {
90+
return equal_range(lhs_values, rhs_values, l_start_phys, r_start_phys, l_runs);
91+
}
6092
}
6193

62-
// check run ends array are equal. The length of the physical array
63-
// is used to validate the child arrays.
64-
let run_ends_equal = equal_range(
65-
lhs_run_ends_array,
66-
rhs_run_ends_array,
67-
lhs_start,
68-
rhs_start,
69-
lhs_run_ends_array.len(),
70-
);
71-
72-
// if run ends array are not the same return early without validating
73-
// values array.
74-
if !run_ends_equal {
75-
return false;
94+
let mut processed = 0;
95+
while processed < len {
96+
if !equal_range(lhs_values, rhs_values, l_phys, r_phys, 1) {
97+
return false;
98+
}
99+
100+
let l_run_end = lhs_run_ends[l_phys].as_usize();
101+
let r_run_end = rhs_run_ends[r_phys].as_usize();
102+
103+
let l_remaining = l_run_end - (lhs_abs_start + processed);
104+
let r_remaining = r_run_end - (rhs_abs_start + processed);
105+
106+
let step = l_remaining.min(r_remaining).min(len - processed);
107+
processed += step;
108+
109+
if processed < len {
110+
if lhs_abs_start + processed == l_run_end {
111+
l_phys += 1;
112+
}
113+
if rhs_abs_start + processed == r_run_end {
114+
r_phys += 1;
115+
}
116+
}
76117
}
77118

78-
// check values array are equal
79-
equal_range(
80-
lhs_values_array,
81-
rhs_values_array,
82-
lhs_start,
83-
rhs_start,
84-
rhs_values_array.len(),
85-
)
119+
true
120+
}
121+
122+
fn find_physical_index<T: ArrowNativeType + ToPrimitive>(
123+
run_ends: &[T],
124+
logical_index: usize,
125+
) -> usize {
126+
if logical_index == 0 {
127+
return 0;
128+
}
129+
match run_ends.binary_search_by(|val| val.as_usize().cmp(&logical_index)) {
130+
Ok(idx) => idx + 1,
131+
Err(idx) => idx,
132+
}
86133
}

arrow-row/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -657,14 +657,14 @@ impl Codec {
657657
Codec::RunEndEncoded(converter) => {
658658
let values = match array.data_type() {
659659
DataType::RunEndEncoded(r, _) => match r.data_type() {
660-
DataType::Int16 => array.as_run::<Int16Type>().values(),
661-
DataType::Int32 => array.as_run::<Int32Type>().values(),
662-
DataType::Int64 => array.as_run::<Int64Type>().values(),
660+
DataType::Int16 => array.as_run::<Int16Type>().values_slice(),
661+
DataType::Int32 => array.as_run::<Int32Type>().values_slice(),
662+
DataType::Int64 => array.as_run::<Int64Type>().values_slice(),
663663
_ => unreachable!("Unsupported run end index type: {r:?}"),
664664
},
665665
_ => unreachable!(),
666666
};
667-
let rows = converter.convert_columns(std::slice::from_ref(values))?;
667+
let rows = converter.convert_columns(std::slice::from_ref(&values))?;
668668
Ok(Encoder::RunEndEncoded(rows))
669669
}
670670
Codec::Union(converters, _) => {

arrow-row/src/run.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ pub fn compute_lengths<R: RunEndIndexType>(
2727
rows: &Rows,
2828
array: &RunArray<R>,
2929
) {
30-
let run_ends = array.run_ends().values();
30+
let run_ends = array.run_ends().sliced_values();
3131
let mut logical_start = 0;
3232

3333
// Iterate over each run and apply the same length to all logical positions in the run
34-
for (physical_idx, &run_end) in run_ends.iter().enumerate() {
34+
for (physical_idx, run_end) in run_ends.enumerate() {
3535
let logical_end = run_end.as_usize();
3636
let row_len = rows.row_len(physical_idx);
3737
let encoded_len = variable::padded_length(Some(row_len));
@@ -55,14 +55,14 @@ pub fn encode<R: RunEndIndexType>(
5555
opts: SortOptions,
5656
array: &RunArray<R>,
5757
) {
58-
let run_ends = array.run_ends();
58+
let run_ends = array.run_ends().sliced_values();
5959

6060
let mut logical_idx = 0;
6161
let mut offset_idx = 1; // Skip first offset
6262

6363
// Iterate over each run
64-
for physical_idx in 0..run_ends.values().len() {
65-
let run_end = run_ends.values()[physical_idx].as_usize();
64+
for (physical_idx, run_end) in run_ends.enumerate() {
65+
let run_end = run_end.as_usize();
6666

6767
// Process all elements in this run
6868
while logical_idx < run_end && offset_idx < offsets.len() {
@@ -639,4 +639,15 @@ mod tests {
639639
let result_ree = arrays[0].as_run::<Int32Type>();
640640
assert_eq!(result_ree.len(), 0);
641641
}
642+
643+
#[test]
644+
fn test_run_end_encoded_round_trip_sliced() {
645+
let values = Int64Array::from(vec![100, 200, 100, 300]);
646+
let run_ends = vec![2, 3, 5, 6];
647+
let array: RunArray<Int16Type> =
648+
RunArray::try_new(&PrimitiveArray::from(run_ends), &values).unwrap();
649+
let array = array.slice(2, 3);
650+
651+
assert_roundtrip(&array, DataType::Int16, DataType::Int64, None);
652+
}
642653
}

0 commit comments

Comments
 (0)