diff --git a/arrow-cast/Cargo.toml b/arrow-cast/Cargo.toml index fb5ad1af3d3a..1e698833eaa0 100644 --- a/arrow-cast/Cargo.toml +++ b/arrow-cast/Cargo.toml @@ -43,7 +43,6 @@ force_validate = [] arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-data = { workspace = true } -arrow-ord = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } chrono = { workspace = true } diff --git a/arrow-cast/src/cast/run_array.rs b/arrow-cast/src/cast/run_array.rs index 8d70afef3ab6..ee8260048735 100644 --- a/arrow-cast/src/cast/run_array.rs +++ b/arrow-cast/src/cast/run_array.rs @@ -16,7 +16,20 @@ // under the License. use crate::cast::*; -use arrow_ord::partition::partition; +use arrow_array::cast::AsArray; +use arrow_array::types::{ + ArrowDictionaryKeyType, ArrowPrimitiveType, Date32Type, Date64Type, Decimal128Type, + Decimal256Type, DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, + DurationSecondType, Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, + Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, + Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type, +}; +use arrow_array::{ + Array, ArrayRef, BinaryViewArray, BooleanArray, DictionaryArray, FixedSizeBinaryArray, + GenericBinaryArray, GenericStringArray, PrimitiveArray, StringViewArray, +}; /// Attempts to cast a `RunArray` with index type K into /// `to_type` for supported types. 
@@ -134,16 +147,8 @@ pub(crate) fn cast_to_run_end_encoded<K: RunEndIndexType>(
         ));
     }
 
-    // Partition the array to identify runs of consecutive equal values
-    let partitions = partition(&[Arc::clone(cast_array)])?;
-    let mut run_ends = Vec::new();
-    let mut values_indexes = Vec::new();
-    let mut last_partition_end = 0;
-    for partition in partitions.ranges() {
-        values_indexes.push(last_partition_end);
-        run_ends.push(partition.end);
-        last_partition_end = partition.end;
-    }
+    // Identify run boundaries by comparing consecutive values
+    let (run_ends, values_indexes) = compute_run_boundaries(cast_array);
 
     // Build the run_ends array
     for run_end in run_ends {
@@ -162,3 +167,475 @@ pub(crate) fn cast_to_run_end_encoded<K: RunEndIndexType>(
     let run_array = RunArray::<K>::try_new(&run_ends_array, values_array.as_ref())?;
     Ok(Arc::new(run_array))
 }
+
+fn compute_run_boundaries(array: &ArrayRef) -> (Vec<usize>, Vec<usize>) {
+    if array.is_empty() {
+        return (Vec::new(), Vec::new());
+    }
+
+    use arrow_schema::{DataType::*, IntervalUnit, TimeUnit};
+
+    match array.data_type() {
+        Null => (vec![array.len()], vec![0]),
+        Boolean => runs_for_boolean(array.as_boolean()),
+        Int8 => runs_for_primitive(array.as_primitive::<Int8Type>()),
+        Int16 => runs_for_primitive(array.as_primitive::<Int16Type>()),
+        Int32 => runs_for_primitive(array.as_primitive::<Int32Type>()),
+        Int64 => runs_for_primitive(array.as_primitive::<Int64Type>()),
+        UInt8 => runs_for_primitive(array.as_primitive::<UInt8Type>()),
+        UInt16 => runs_for_primitive(array.as_primitive::<UInt16Type>()),
+        UInt32 => runs_for_primitive(array.as_primitive::<UInt32Type>()),
+        UInt64 => runs_for_primitive(array.as_primitive::<UInt64Type>()),
+        Float16 => runs_for_primitive(array.as_primitive::<Float16Type>()),
+        Float32 => runs_for_primitive(array.as_primitive::<Float32Type>()),
+        Float64 => runs_for_primitive(array.as_primitive::<Float64Type>()),
+        Date32 => runs_for_primitive(array.as_primitive::<Date32Type>()),
+        Date64 => runs_for_primitive(array.as_primitive::<Date64Type>()),
+        Time32(TimeUnit::Second) => runs_for_primitive(array.as_primitive::<Time32SecondType>()),
+        Time32(TimeUnit::Millisecond) => {
+            runs_for_primitive(array.as_primitive::<Time32MillisecondType>())
+        }
+        Time64(TimeUnit::Microsecond) => {
+            runs_for_primitive(array.as_primitive::<Time64MicrosecondType>())
+        }
+        Time64(TimeUnit::Nanosecond) => {
+            runs_for_primitive(array.as_primitive::<Time64NanosecondType>())
+        }
+        Duration(TimeUnit::Second) => {
+            runs_for_primitive(array.as_primitive::<DurationSecondType>())
+        }
+        Duration(TimeUnit::Millisecond) => {
+            runs_for_primitive(array.as_primitive::<DurationMillisecondType>())
+        }
+        Duration(TimeUnit::Microsecond) => {
+            runs_for_primitive(array.as_primitive::<DurationMicrosecondType>())
+        }
+        Duration(TimeUnit::Nanosecond) => {
+            runs_for_primitive(array.as_primitive::<DurationNanosecondType>())
+        }
+        Timestamp(TimeUnit::Second, _) => {
+            runs_for_primitive(array.as_primitive::<TimestampSecondType>())
+        }
+        Timestamp(TimeUnit::Millisecond, _) => {
+            runs_for_primitive(array.as_primitive::<TimestampMillisecondType>())
+        }
+        Timestamp(TimeUnit::Microsecond, _) => {
+            runs_for_primitive(array.as_primitive::<TimestampMicrosecondType>())
+        }
+        Timestamp(TimeUnit::Nanosecond, _) => {
+            runs_for_primitive(array.as_primitive::<TimestampNanosecondType>())
+        }
+        Interval(IntervalUnit::YearMonth) => {
+            runs_for_primitive(array.as_primitive::<IntervalYearMonthType>())
+        }
+        Interval(IntervalUnit::DayTime) => {
+            runs_for_primitive(array.as_primitive::<IntervalDayTimeType>())
+        }
+        Interval(IntervalUnit::MonthDayNano) => {
+            runs_for_primitive(array.as_primitive::<IntervalMonthDayNanoType>())
+        }
+        Decimal128(_, _) => runs_for_primitive(array.as_primitive::<Decimal128Type>()),
+        Decimal256(_, _) => runs_for_primitive(array.as_primitive::<Decimal256Type>()),
+        Utf8 => runs_for_string_i32(array.as_string::<i32>()),
+        LargeUtf8 => runs_for_string_i64(array.as_string::<i64>()),
+        Utf8View => runs_for_string_view(array.as_string_view()),
+        Binary => runs_for_binary_i32(array.as_binary::<i32>()),
+        LargeBinary => runs_for_binary_i64(array.as_binary::<i64>()),
+        BinaryView => runs_for_binary_view(array.as_binary_view()),
+        FixedSizeBinary(_) => runs_for_fixed_size_binary(array.as_fixed_size_binary()),
+        Dictionary(key_type, _) => match key_type.as_ref() {
+            Int8 => runs_for_dictionary::<Int8Type>(array.as_dictionary()),
+            Int16 => runs_for_dictionary::<Int16Type>(array.as_dictionary()),
+            Int32 => runs_for_dictionary::<Int32Type>(array.as_dictionary()),
+            Int64 => runs_for_dictionary::<Int64Type>(array.as_dictionary()),
+            UInt8 => runs_for_dictionary::<UInt8Type>(array.as_dictionary()),
+            UInt16 => runs_for_dictionary::<UInt16Type>(array.as_dictionary()),
+            UInt32 => runs_for_dictionary::<UInt32Type>(array.as_dictionary()),
+            UInt64 => runs_for_dictionary::<UInt64Type>(array.as_dictionary()),
+            _ => runs_generic(array.as_ref()),
+        },
+        _ => runs_generic(array.as_ref()),
+    }
+}
+
+fn runs_for_boolean(array: &BooleanArray) -> (Vec<usize>, Vec<usize>) {
+    let len = array.len();
+    if let Some(runs) = trivial_runs(len) {
+        return runs;
+    }
+
+    let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+    let mut current_valid = array.is_valid(0);
+    let mut current_value = if current_valid { array.value(0) } else { false };
+
+    for idx in 1..len {
+        let valid = array.is_valid(idx);
+        let mut boundary = false;
+        if current_valid && valid {
+            let value = array.value(idx);
+            if value != current_value {
+                current_value = value;
+                boundary = true;
+            }
+        } else if current_valid != valid {
+            boundary = true;
+            if valid {
+                current_value = array.value(idx);
+            }
+        }
+
+        if boundary {
+            ensure_capacity(&mut run_boundaries, len);
+            run_boundaries.push(idx);
+        }
+        current_valid = valid;
+    }
+
+    finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_primitive<T: ArrowPrimitiveType>(
+    array: &PrimitiveArray<T>,
+) -> (Vec<usize>, Vec<usize>) {
+    let len = array.len();
+    if let Some(runs) = trivial_runs(len) {
+        return runs;
+    }
+
+    let values = array.values();
+    let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+
+    if array.null_count() == 0 {
+        let mut current = unsafe { *values.get_unchecked(0) };
+        let mut idx = 1;
+        while idx < len {
+            let boundary = scan_run_end::<T>(values, current, idx);
+            if boundary == len {
+                break;
+            }
+            ensure_capacity(&mut run_boundaries, len);
+            run_boundaries.push(boundary);
+            current = unsafe { *values.get_unchecked(boundary) };
+            idx = boundary + 1;
+        }
+        return finalize_runs(run_boundaries, len);
+    }
+
+    let nulls = array
+        .nulls()
+        .expect("null_count > 0 implies a null buffer is present");
+    let mut current_valid = nulls.is_valid(0);
+    let mut current_value = unsafe { *values.get_unchecked(0) };
+    for idx in 1..len {
+        let valid = nulls.is_valid(idx);
+        let mut boundary = false;
+        if current_valid && valid {
+            let value = unsafe { *values.get_unchecked(idx) };
+            if value != current_value {
+                current_value = value;
+                boundary = true;
+            }
+        } else if current_valid != valid {
+            boundary = true;
+            if valid {
+                current_value = unsafe { *values.get_unchecked(idx) };
+            }
+        }
+        if boundary {
+            ensure_capacity(&mut run_boundaries, len);
+            run_boundaries.push(idx);
+        }
+        current_valid = valid;
+    }
+    finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_binary_i32(array: &GenericBinaryArray<i32>) -> (Vec<usize>, Vec<usize>) {
+    let mut to_usize = |v: i32| v as usize;
+    runs_for_binary_like(
+        array.len(),
+        array.null_count(),
+        array.value_offsets(),
+        array.value_data(),
+        |idx| array.is_valid(idx),
+        &mut to_usize,
+    )
+}
+
+fn runs_for_binary_i64(array: &GenericBinaryArray<i64>) -> (Vec<usize>, Vec<usize>) {
+    let mut to_usize = |v: i64| v as usize;
+    runs_for_binary_like(
+        array.len(),
+        array.null_count(),
+        array.value_offsets(),
+        array.value_data(),
+        |idx| array.is_valid(idx),
+        &mut to_usize,
+    )
+}
+
+fn runs_for_binary_like<T: Copy>(
+    len: usize,
+    null_count: usize,
+    offsets: &[T],
+    values: &[u8],
+    mut is_valid: impl FnMut(usize) -> bool,
+    to_usize: &mut impl FnMut(T) -> usize,
+) -> (Vec<usize>, Vec<usize>) {
+    if let Some(runs) = trivial_runs(len) {
+        return runs;
+    }
+
+    let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+
+    if null_count == 0 {
+        let mut current_start = to_usize(offsets[0]);
+        let mut current_end = to_usize(offsets[1]);
+        for idx in 1..len {
+            let start = to_usize(offsets[idx]);
+            let end = to_usize(offsets[idx + 1]);
+            if (end - start) != (current_end - current_start)
+                || values[start..end] != values[current_start..current_end]
+            {
+                ensure_capacity(&mut run_boundaries, len);
+                run_boundaries.push(idx);
+                current_start = start;
+                current_end = end;
+            }
+        }
+    } else {
+        let mut current_valid = is_valid(0);
+        let mut current_range = (to_usize(offsets[0]), to_usize(offsets[1]));
+        for idx in 1..len {
+            let valid = is_valid(idx);
+            let mut boundary = false;
+            if current_valid && valid {
+                let start = to_usize(offsets[idx]);
+                let end = to_usize(offsets[idx + 1]);
+                let (current_start, current_end) = current_range;
+                if (end - start) != (current_end - current_start)
+                    || values[start..end] != values[current_start..current_end]
+                {
+                    boundary = true;
+                    current_range = (start, end);
+                }
+            } else if current_valid != valid {
+                boundary = true;
+                if valid {
+                    current_range = (to_usize(offsets[idx]), to_usize(offsets[idx + 1]));
+                }
+            }
+            if boundary {
+                ensure_capacity(&mut run_boundaries, len);
+                run_boundaries.push(idx);
+            }
+            current_valid = valid;
+        }
+    }
+
+    finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_string_i32(array: &GenericStringArray<i32>) -> (Vec<usize>, Vec<usize>) {
+    let mut to_usize = |v: i32| v as usize;
+    runs_for_binary_like(
+        array.len(),
+        array.null_count(),
+        array.value_offsets(),
+        array.value_data(),
+        |idx| array.is_valid(idx),
+        &mut to_usize,
+    )
+}
+
+fn runs_for_string_i64(array: &GenericStringArray<i64>) -> (Vec<usize>, Vec<usize>) {
+    let mut to_usize = |v: i64| v as usize;
+    runs_for_binary_like(
+        array.len(),
+        array.null_count(),
+        array.value_offsets(),
+        array.value_data(),
+        |idx| array.is_valid(idx),
+        &mut to_usize,
+    )
+}
+
+fn runs_for_string_view(array: &StringViewArray) -> (Vec<usize>, Vec<usize>) {
+    runs_generic(array)
+}
+
+fn runs_for_binary_view(array: &BinaryViewArray) -> (Vec<usize>, Vec<usize>) {
+    runs_generic(array)
+}
+
+fn runs_for_fixed_size_binary(array: &FixedSizeBinaryArray) -> (Vec<usize>, Vec<usize>) {
+    let len = array.len();
+    if let Some(runs) = trivial_runs(len) {
+        return runs;
+    }
+
+    let width = array.value_length() as usize;
+    let values = array.value_data();
+    let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+    if array.null_count() == 0 {
+        let mut current_slice = &values[0..width];
+        for idx in 1..len {
+            let start = idx * width;
+            let slice = &values[start..start + width];
+            if slice != current_slice {
+                ensure_capacity(&mut run_boundaries, len);
+                run_boundaries.push(idx);
+                current_slice = slice;
+            }
+        }
+    } else {
+        let nulls = array
+            .nulls()
+            .expect("null_count > 0 implies a null buffer is present");
+        let mut current_valid = nulls.is_valid(0);
+        let mut current_slice = &values[0..width];
+        for idx in 1..len {
+            let valid = nulls.is_valid(idx);
+            let mut boundary = false;
+            if current_valid && valid {
+                let start = idx * width;
+                let slice = &values[start..start + width];
+                if slice != current_slice {
+                    boundary = true;
+                    current_slice = slice;
+                }
+            } else if current_valid != valid {
+                boundary = true;
+                if valid {
+                    let start = idx * width;
+                    current_slice = &values[start..start + width];
+                }
+            }
+            if boundary {
+                ensure_capacity(&mut run_boundaries, len);
+                run_boundaries.push(idx);
+            }
+            current_valid = valid;
+        }
+    }
+
+    finalize_runs(run_boundaries, len)
+}
+
+fn runs_for_dictionary<K: ArrowDictionaryKeyType>(
+    array: &DictionaryArray<K>,
+) -> (Vec<usize>, Vec<usize>) {
+    runs_for_primitive(array.keys())
+}
+
+fn runs_generic(array: &dyn Array) -> (Vec<usize>, Vec<usize>) {
+    let len = array.len();
+    if let Some(runs) = trivial_runs(len) {
+        return runs;
+    }
+
+    let mut run_boundaries = Vec::with_capacity(len / 64 + 2);
+    let mut current_data = array.slice(0, 1).to_data();
+    for idx in 1..len {
+        let next_data = array.slice(idx, 1).to_data();
+        if current_data != next_data {
+            ensure_capacity(&mut run_boundaries, len);
+            run_boundaries.push(idx);
+            current_data = next_data;
+        }
+    }
+
+    finalize_runs(run_boundaries, len)
+}
+
+fn trivial_runs(len: usize) -> Option<(Vec<usize>, Vec<usize>)> {
+    match len {
+        0 => Some((Vec::new(), Vec::new())),
+        1 => Some((vec![1], vec![0])),
+        _ => None,
+    }
+}
+
+#[inline]
+fn ensure_capacity<T>(vec: &mut Vec<T>, total_len: usize) {
+    if vec.len() == vec.capacity() {
+        let remaining = total_len.saturating_sub(vec.len());
+        vec.reserve(remaining.max(1));
+    }
+}
+
+fn finalize_runs(mut run_boundaries: Vec<usize>, len: usize) -> (Vec<usize>, Vec<usize>) {
+    let mut values_indexes = Vec::with_capacity(run_boundaries.len() + 1);
+    values_indexes.push(0);
+    values_indexes.extend_from_slice(&run_boundaries);
+    run_boundaries.push(len);
+    (run_boundaries, values_indexes)
+}
+
+#[inline]
+fn scan_run_end<T: ArrowPrimitiveType>(
+    values: &[T::Native],
+    current: T::Native,
+    start: usize,
+) -> usize {
+    let element_size = std::mem::size_of::<T::Native>();
+    if element_size <= 8 && 16 % element_size == 0 {
+        let elements_per_chunk = 16 / element_size;
+        return scan_run_end_chunk::<T>(values, current, start, elements_per_chunk, element_size);
+    }
+    scan_run_end_scalar::<T>(values, current, start)
+}
+
+#[inline]
+fn scan_run_end_chunk<T: ArrowPrimitiveType>(
+    values: &[T::Native],
+    current: T::Native,
+    start: usize,
+    elements_per_chunk: usize,
+    element_size: usize,
+) -> usize {
+    let len = values.len();
+    let mut idx = start;
+    if idx >= len {
+        return len;
+    }
+
+    let mut pattern_bytes = [0u8; 16];
+    unsafe {
+        let value_bytes =
+            std::slice::from_raw_parts(&current as *const T::Native as *const u8, element_size);
+        for chunk in pattern_bytes.chunks_mut(element_size) {
+            chunk.copy_from_slice(value_bytes);
+        }
+    }
+    let pattern = u128::from_ne_bytes(pattern_bytes);
+
+    while idx + elements_per_chunk <= len {
+        let chunk = unsafe { (values.as_ptr().add(idx) as *const u128).read_unaligned() };
+        if chunk != pattern {
+            for offset in 0..elements_per_chunk {
+                let value = unsafe { *values.get_unchecked(idx + offset) };
+                if value != current {
+                    return idx + offset;
+                }
+            }
+            unreachable!("chunk mismatch without locating differing element");
+        }
+        idx += elements_per_chunk;
+    }
+
+    scan_run_end_scalar::<T>(values, current, idx)
+}
+
+#[inline]
+fn scan_run_end_scalar<T: ArrowPrimitiveType>(
+    values: &[T::Native],
+    current: T::Native,
+    mut idx: usize,
+) -> usize {
+    let len = values.len();
+    while idx < len && unsafe { *values.get_unchecked(idx) } == current {
+        idx += 1;
+    }
+    idx
+}