From e371251b57cfc4bd52d567b9b1eeea36f3724d0b Mon Sep 17 00:00:00 2001 From: Connor Sanders <170039284+jecsand838@users.noreply.github.com> Date: Fri, 10 Oct 2025 01:50:48 -0500 Subject: [PATCH 1/5] Add support for run-end encoded (REE) arrays in arrow-avro --- arrow-avro/Cargo.toml | 3 +- arrow-avro/src/codec.rs | 59 +++++ arrow-avro/src/reader/record.rs | 352 ++++++++++++++++++++++++++ arrow-avro/src/schema.rs | 50 +++- arrow-avro/src/writer/encoder.rs | 200 ++++++++++++++- arrow-avro/src/writer/mod.rs | 407 +++++++++++++++++++++++++++++++ 6 files changed, 1052 insertions(+), 19 deletions(-) diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index 975bcbe5e8ff..d5f105ae7b8b 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -43,12 +43,13 @@ canonical_extension_types = ["arrow-schema/canonical_extension_types"] md5 = ["dep:md5"] sha256 = ["dep:sha2"] small_decimals = [] -avro_custom_types = [] +avro_custom_types = ["arrow-select"] [dependencies] arrow-schema = { workspace = true } arrow-buffer = { workspace = true } arrow-array = { workspace = true } +arrow-select = { workspace = true, optional = true } serde_json = { version = "1.0", default-features = false, features = ["std"] } serde = { version = "1.0.188", features = ["derive"] } flate2 = { version = "1.0", default-features = false, features = [ diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index f946a5a1b95a..29464e8bf910 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -14,6 +14,9 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + +//! Codec for Mapping Avro and Arrow types. + use crate::schema::{ AVRO_ENUM_SYMBOLS_METADATA_KEY, AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_NAME_METADATA_KEY, AVRO_NAMESPACE_METADATA_KEY, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability, @@ -460,6 +463,8 @@ impl AvroDataType { }; default_encoding.parse_default_literal(default_json)? } + #[cfg(feature = "avro_custom_types")] + Codec::RunEndEncoded(values, _) => values.parse_default_literal(default_json)?, }; Ok(lit) } @@ -685,6 +690,8 @@ pub(crate) enum Codec { /// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Second) #[cfg(feature = "avro_custom_types")] DurationSeconds, + #[cfg(feature = "avro_custom_types")] + RunEndEncoded(Arc, u8), } impl Codec { @@ -765,6 +772,19 @@ impl Codec { Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond), #[cfg(feature = "avro_custom_types")] Self::DurationSeconds => DataType::Duration(TimeUnit::Second), + #[cfg(feature = "avro_custom_types")] + Self::RunEndEncoded(values, bits) => { + let run_ends_dt = match *bits { + 16 => DataType::Int16, + 32 => DataType::Int32, + 64 => DataType::Int64, + _ => DataType::Int32, + }; + DataType::RunEndEncoded( + Arc::new(Field::new("run_ends", run_ends_dt, false)), + Arc::new(Field::new("values", values.codec().data_type(), true)), + ) + } } } @@ -936,6 +956,8 @@ impl From<&Codec> for UnionFieldKind { Codec::Uuid => Self::Uuid, Codec::Union(..) => Self::Union, #[cfg(feature = "avro_custom_types")] + Codec::RunEndEncoded(values, _) => UnionFieldKind::from(values.codec()), + #[cfg(feature = "avro_custom_types")] Codec::DurationNanos | Codec::DurationMicros | Codec::DurationMillis @@ -1141,6 +1163,16 @@ impl<'a> Maker<'a> { } } + #[cfg(feature = "avro_custom_types")] + #[inline] + fn propagate_nullability_into_ree(dt: &mut AvroDataType, nb: Nullability) { + if let Codec::RunEndEncoded(values, bits) = dt.codec.clone() { + let mut inner = (*values).clone(); + inner.nullability = Some(nb); + dt.codec = Codec::RunEndEncoded(Arc::new(inner), bits); + } + } + fn make_data_type<'s>( &mut self, writer_schema: &'s Schema<'a>, @@ -1185,6 +1217,8 @@ impl<'a> Maker<'a> { (true, Some(0)) => { let mut field = self.parse_type(&f[1], namespace)?; field.nullability = Some(Nullability::NullFirst); + #[cfg(feature = "avro_custom_types")] + Self::propagate_nullability_into_ree(&mut field, Nullability::NullFirst); return Ok(field); } (true, Some(1)) => { @@ -1196,6 +1230,8 @@ impl<'a> Maker<'a> { } let mut field = self.parse_type(&f[0], namespace)?; field.nullability = Some(Nullability::NullSecond); + #[cfg(feature = "avro_custom_types")] + Self::propagate_nullability_into_ree(&mut field, Nullability::NullSecond); return Ok(field); } _ => {} @@ -1374,6 +1410,27 @@ impl<'a> Maker<'a> { (Some("arrow.duration-seconds"), c @ Codec::Int64) => { *c = Codec::DurationSeconds } + #[cfg(feature = "avro_custom_types")] + (Some("arrow.run-end-encoded"), _) => { + let bits_u8: u8 = t + .attributes + .additional + .get("arrow.runEndIndexBits") + .and_then(|v| v.as_u64()) + .and_then(|n| u8::try_from(n).ok()) + .ok_or_else(|| ArrowError::ParseError( + "arrow.run-end-encoded requires 'arrow.runEndIndexBits' (one of 16, 32, or 64)" + .to_string(), + ))?; + if bits_u8 != 16 && bits_u8 != 32 && bits_u8 != 64 { + return Err(ArrowError::ParseError(format!( + "Invalid 'arrow.runEndIndexBits' value {bits_u8}; must be 16, 32, or 64" + ))); + } + // Wrap the parsed underlying site as REE + let values_site = field.clone(); + field.codec = Codec::RunEndEncoded(Arc::new(values_site), bits_u8); + } (Some(logical), _) => { // Insert unrecognized logical type into metadata map field.metadata.insert("logicalType".into(), logical.into()); @@ -1412,6 +1469,8 @@ impl<'a> Maker<'a> { (Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => { let mut dt = self.make_data_type(w_nonnull, Some(r_nonnull), namespace)?; dt.nullability = Some(w_nb); + #[cfg(feature = "avro_custom_types")] + Self::propagate_nullability_into_ree(&mut dt, w_nb); Ok(dt) } _ => self.resolve_unions(writer_variants, reader_variants, namespace), diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 99e782b0fd93..68097fa85a6a 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! Avro Decoder for Arrow types. + use crate::codec::{ AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord, ResolvedUnion, @@ -33,6 +35,8 @@ use arrow_schema::{ }; #[cfg(feature = "small_decimals")] use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION}; +#[cfg(feature = "avro_custom_types")] +use arrow_select::take::{TakeOptions, take}; use std::cmp::Ordering; use std::sync::Arc; use strum_macros::AsRefStr; @@ -234,6 +238,8 @@ enum Decoder { Decimal64(usize, Option, Option, Decimal64Builder), Decimal128(usize, Option, Option, Decimal128Builder), Decimal256(usize, Option, Option, Decimal256Builder), + #[cfg(feature = "avro_custom_types")] + RunEndEncoded(u8, usize, Box), Union(UnionDecoder), Nullable(Nullability, NullBufferBuilder, Box, NullablePlan), } @@ -474,6 +480,23 @@ impl Decoder { "Sparse Arrow unions are not yet supported".to_string(), )); } + #[cfg(feature = "avro_custom_types")] + (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => { + let inner = Self::try_new(values_dt)?; + let byte_width: u8 = match *width_bits_or_bytes { + 2 | 4 | 8 => *width_bits_or_bytes, + 16 => 2, + 32 => 4, + 64 => 8, + other => { + return Err(ArrowError::InvalidArgumentError(format!( + "Unsupported run-end width {other} for RunEndEncoded; \ + expected 16/32/64 bits or 2/4/8 bytes" + ))); + } + }; + Self::RunEndEncoded(byte_width, 0, Box::new(inner)) + } }; Ok(match data_type.nullability() { Some(nullability) => { @@ -550,6 +573,11 @@ impl Decoder { Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO), Self::Enum(indices, _, _) => indices.push(0), Self::Duration(builder) => builder.append_null(), + #[cfg(feature = "avro_custom_types")] + Self::RunEndEncoded(_, len, inner) => { + *len += 1; + inner.append_null()?; + } Self::Union(u) => u.append_null()?, Self::Nullable(_, null_buffer, inner, _) => { null_buffer.append(false); @@ -778,6 +806,11 @@ impl Decoder { "Default for enum must be a symbol".to_string(), )), }, + #[cfg(feature = "avro_custom_types")] + Self::RunEndEncoded(_, len, inner) => { + *len += 1; + inner.append_default(lit) + } Self::Union(u) => u.append_default(lit), Self::Record(field_meta, decoders, projector) => match lit { AvroLiteral::Map(entries) => { @@ -918,6 +951,11 @@ impl Decoder { let nanos = (millis as i64) * 1_000_000; builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos)); } + #[cfg(feature = "avro_custom_types")] + Self::RunEndEncoded(_, len, inner) => { + *len += 1; + inner.decode(buf)?; + } Self::Union(u) => u.decode(buf)?, Self::Nullable(order, nb, encoding, plan) => { match *plan { @@ -950,6 +988,12 @@ impl Decoder { buf: &mut AvroCursor<'_>, promotion: Promotion, ) -> Result<(), ArrowError> { + #[cfg(feature = "avro_custom_types")] + if let Self::RunEndEncoded(_, len, inner) = self { + *len += 1; + return inner.decode_with_promotion(buf, promotion); + } + macro_rules! promote_numeric_to { ($variant:ident, $getter:ident, $to:ty) => {{ match self { @@ -1158,6 +1202,73 @@ impl Decoder { .map_err(|e| ArrowError::ParseError(e.to_string()))?; Arc::new(vals) } + #[cfg(feature = "avro_custom_types")] + Self::RunEndEncoded(width, len, inner) => { + let values = inner.flush(nulls)?; + let n = *len; + let arr = values.as_ref(); + let mut run_starts: Vec = Vec::with_capacity(n); + if n > 0 { + run_starts.push(0); + for i in 1..n { + if !values_equal_at(arr, i - 1, i) { + run_starts.push(i); + } + } + } + if n > (u32::MAX as usize) { + return Err(ArrowError::InvalidArgumentError(format!( + "RunEndEncoded length {} exceeds maximum supported by UInt32 indices for take", + n + ))); + } + let run_count = run_starts.len(); + let take_idx: PrimitiveArray = + run_starts.iter().map(|&s| s as u32).collect(); + let per_run_values = if run_count == 0 { + values.slice(0, 0) + } else { + take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| { + ArrowError::ParseError(format!("take() for REE values failed: {e}")) + })? + }; + + macro_rules! build_run_array { + ($Native:ty, $ArrowTy:ty) => {{ + let mut ends: Vec<$Native> = Vec::with_capacity(run_count); + for (idx, &_start) in run_starts.iter().enumerate() { + let end = if idx + 1 < run_count { + run_starts[idx + 1] + } else { + n + }; + ends.push(end as $Native); + } + let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect(); + let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref()) + .map_err(|e| ArrowError::ParseError(e.to_string()))?; + Arc::new(run_arr) as ArrayRef + }}; + } + match *width { + 2 => { + if n > i16::MAX as usize { + return Err(ArrowError::InvalidArgumentError(format!( + "RunEndEncoded length {} exceeds i16::MAX for run end width 2", + n + ))); + } + build_run_array!(i16, Int16Type) + } + 4 => build_run_array!(i32, Int32Type), + 8 => build_run_array!(i64, Int64Type), + other => { + return Err(ArrowError::InvalidArgumentError(format!( + "Unsupported run-end width {other} for RunEndEncoded" + ))); + } + } + } Self::Union(u) => u.flush(nulls)?, }) } @@ -1705,6 +1816,20 @@ fn sign_cast_to(raw: &[u8]) -> Result<[u8; N], ArrowError> { Ok(out) } +#[cfg(feature = "avro_custom_types")] +#[inline] +fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool { + match (arr.is_null(i), arr.is_null(j)) { + (true, true) => true, + (true, false) | (false, true) => false, + (false, false) => { + let a = arr.slice(i, 1); + let b = arr.slice(j, 1); + a == b + } + } +} + #[derive(Debug)] struct Projector { writer_to_reader: Arc<[Option]>, @@ -1846,6 +1971,8 @@ enum Skipper { Struct(Vec), Union(Vec), Nullable(Nullability, Box), + #[cfg(feature = "avro_custom_types")] + RunEndEncoded(Box), } impl Skipper { @@ -1897,6 +2024,10 @@ impl Skipper { .collect::>()?, ) } + #[cfg(feature = "avro_custom_types")] + Codec::RunEndEncoded(inner, _w) => { + Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?)) + } }; if let Some(n) = dt.nullability() { base = Self::Nullable(n, Box::new(base)); @@ -2001,6 +2132,8 @@ impl Skipper { } Ok(()) } + #[cfg(feature = "avro_custom_types")] + Self::RunEndEncoded(inner) => inner.skip(buf), } } } @@ -4297,4 +4430,223 @@ mod tests { assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes"); Ok(()) } + + #[cfg(feature = "avro_custom_types")] + #[test] + fn test_run_end_encoded_width16_int32_basic_grouping() { + use arrow_array::RunArray; + use std::sync::Arc; + let inner = avro_from_codec(Codec::Int32); + let ree = AvroDataType::new( + Codec::RunEndEncoded(Arc::new(inner), 16), + Default::default(), + None, + ); + let mut dec = Decoder::try_new(&ree).expect("create REE decoder"); + for v in [1, 1, 1, 2, 2, 3, 3, 3, 3] { + let bytes = encode_avro_int(v); + dec.decode(&mut AvroCursor::new(&bytes)).expect("decode"); + } + let arr = dec.flush(None).expect("flush"); + let ra = arr + .as_any() + .downcast_ref::>() + .expect("RunArray"); + assert_eq!(ra.len(), 9); + assert_eq!(ra.run_ends().values(), &[3, 5, 9]); + let vals = ra + .values() + .as_ref() + .as_any() + .downcast_ref::() + .expect("values Int32"); + assert_eq!(vals.values(), &[1, 2, 3]); + } + + #[cfg(feature = "avro_custom_types")] + #[test] + fn test_run_end_encoded_width32_nullable_values_group_nulls() { + use arrow_array::RunArray; + use std::sync::Arc; + let inner = AvroDataType::new( + Codec::Int32, + Default::default(), + Some(Nullability::NullSecond), + ); + let ree = AvroDataType::new( + Codec::RunEndEncoded(Arc::new(inner), 32), + Default::default(), + None, + ); + let mut dec = Decoder::try_new(&ree).expect("create REE decoder"); + let seq: [Option; 8] = [ + None, + None, + Some(7), + Some(7), + Some(7), + None, + Some(5), + Some(5), + ]; + for item in seq { + let mut bytes = Vec::new(); + match item { + None => bytes.extend_from_slice(&encode_vlq_u64(1)), + Some(v) => { + bytes.extend_from_slice(&encode_vlq_u64(0)); + bytes.extend_from_slice(&encode_avro_int(v)); + } + } + dec.decode(&mut AvroCursor::new(&bytes)).expect("decode"); + } + let arr = dec.flush(None).expect("flush"); + let ra = arr + .as_any() + .downcast_ref::>() + .expect("RunArray"); + assert_eq!(ra.len(), 8); + assert_eq!(ra.run_ends().values(), &[2, 5, 6, 8]); + let vals = ra + .values() + .as_ref() + .as_any() + .downcast_ref::() + .expect("values Int32 (nullable)"); + assert_eq!(vals.len(), 4); + assert!(vals.is_null(0)); + assert_eq!(vals.value(1), 7); + assert!(vals.is_null(2)); + assert_eq!(vals.value(3), 5); + } + + #[cfg(feature = "avro_custom_types")] + #[test] + fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() { + use arrow_array::RunArray; + let inner_values = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY)); + let ree = Decoder::RunEndEncoded( + 8, /* bytes => Int64 run-ends */ + 0, + Box::new(inner_values), + ); + let mut dec = Decoder::Nullable( + Nullability::NullSecond, + NullBufferBuilder::new(DEFAULT_CAPACITY), + Box::new(ree), + NullablePlan::FromSingle { + promotion: Promotion::IntToDouble, + }, + ); + for v in [1, 1, 2, 2, 2] { + let bytes = encode_avro_int(v); + dec.decode(&mut AvroCursor::new(&bytes)).expect("decode"); + } + let arr = dec.flush(None).expect("flush"); + let ra = arr + .as_any() + .downcast_ref::>() + .expect("RunArray"); + assert_eq!(ra.len(), 5); + assert_eq!(ra.run_ends().values(), &[2, 5]); + let vals = ra + .values() + .as_ref() + .as_any() + .downcast_ref::() + .expect("values Float64"); + assert_eq!(vals.values(), &[1.0, 2.0]); + } + + #[cfg(feature = "avro_custom_types")] + #[test] + fn test_run_end_encoded_unsupported_run_end_width_errors() { + use std::sync::Arc; + let inner = avro_from_codec(Codec::Int32); + let dt = AvroDataType::new( + Codec::RunEndEncoded(Arc::new(inner), 3), + Default::default(), + None, + ); + let err = Decoder::try_new(&dt).expect_err("must reject unsupported width"); + let msg = err.to_string(); + assert!( + msg.contains("Unsupported run-end width") + && msg.contains("16/32/64 bits or 2/4/8 bytes"), + "unexpected error message: {msg}" + ); + } + + #[cfg(feature = "avro_custom_types")] + #[test] + fn test_run_end_encoded_empty_input_is_empty_runarray() { + use arrow_array::RunArray; + use std::sync::Arc; + let inner = avro_from_codec(Codec::Utf8); + let dt = AvroDataType::new( + Codec::RunEndEncoded(Arc::new(inner), 4), + Default::default(), + None, + ); + let mut dec = Decoder::try_new(&dt).expect("create REE decoder"); + let arr = dec.flush(None).expect("flush"); + let ra = arr + .as_any() + .downcast_ref::>() + .expect("RunArray"); + assert_eq!(ra.len(), 0); + assert_eq!(ra.run_ends().len(), 0); + assert_eq!(ra.values().len(), 0); + } + + #[cfg(feature = "avro_custom_types")] + #[test] + fn test_run_end_encoded_strings_grouping_width32_bits() { + use arrow_array::RunArray; + use std::sync::Arc; + let inner = avro_from_codec(Codec::Utf8); + let dt = AvroDataType::new( + Codec::RunEndEncoded(Arc::new(inner), 32), + Default::default(), + None, + ); + let mut dec = Decoder::try_new(&dt).expect("create REE decoder"); + for s in ["a", "a", "bb", "bb", "bb", "a"] { + let bytes = encode_avro_bytes(s.as_bytes()); + dec.decode(&mut AvroCursor::new(&bytes)).expect("decode"); + } + let arr = dec.flush(None).expect("flush"); + let ra = arr + .as_any() + .downcast_ref::>() + .expect("RunArray"); + assert_eq!(ra.run_ends().values(), &[2, 5, 6]); + let vals = ra + .values() + .as_ref() + .as_any() + .downcast_ref::() + .expect("values String"); + assert_eq!(vals.len(), 3); + assert_eq!(vals.value(0), "a"); + assert_eq!(vals.value(1), "bb"); + assert_eq!(vals.value(2), "a"); + } + + #[cfg(not(feature = "avro_custom_types"))] + #[test] + fn test_no_custom_types_feature_smoke_decodes_plain_int32() { + let dt = avro_from_codec(Codec::Int32); + let mut dec = Decoder::try_new(&dt).expect("create Int32 decoder"); + for v in [1, 2, 3] { + let bytes = encode_avro_int(v); + dec.decode(&mut AvroCursor::new(&bytes)).expect("decode"); + } + let arr = dec.flush(None).expect("flush"); + let a = arr + .as_any() + .downcast_ref::() + .expect("Int32Array"); + assert_eq!(a.values(), &[1, 2, 3]); + } } diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs index b3081bbd09ab..45ad5822a814 100644 --- a/arrow-avro/src/schema.rs +++ b/arrow-avro/src/schema.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! Avro Schema representations for Arrow. + #[cfg(feature = "canonical_extension_types")] use arrow_schema::extension::ExtensionType; use arrow_schema::{ @@ -1497,14 +1499,46 @@ fn datatype_to_avro( )? } } - DataType::RunEndEncoded(_, values) => process_datatype( - values.data_type(), - values.name(), - values.metadata(), - name_gen, - null_order, - false, - )?, + #[cfg(feature = "avro_custom_types")] + DataType::RunEndEncoded(run_ends, values) => { + let bits = match run_ends.data_type() { + DataType::Int16 => 16, + DataType::Int32 => 32, + DataType::Int64 => 64, + other => { + return Err(ArrowError::SchemaError(format!( + "RunEndEncoded requires Int16/Int32/Int64 for run_ends, found: {other:?}" + ))); + } + }; + // Build the value site schema, preserving its own nullability + let (value_schema, value_extras) = datatype_to_avro( + values.data_type(), + values.name(), + values.metadata(), + name_gen, + null_order, + )?; + let mut merged = merge_extras(value_schema, value_extras); + if values.is_nullable() { + merged = wrap_nullable(merged, null_order); + } + let mut extras = JsonMap::new(); + extras.insert("logicalType".into(), json!("arrow.run-end-encoded")); + extras.insert("arrow.runEndIndexBits".into(), json!(bits)); + return Ok((merged, extras)); + } + #[cfg(not(feature = "avro_custom_types"))] + DataType::RunEndEncoded(_run_ends, values) => { + let (value_schema, _extras) = datatype_to_avro( + values.data_type(), + values.name(), + values.metadata(), + name_gen, + null_order, + )?; + return Ok((value_schema, JsonMap::new())); + } DataType::Union(fields, mode) => { let mut branches: Vec = Vec::with_capacity(fields.len()); let mut type_ids: Vec = Vec::with_capacity(fields.len()); diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs index 0fa217e89488..5ddb798846c9 100644 --- a/arrow-avro/src/writer/encoder.rs +++ b/arrow-avro/src/writer/encoder.rs @@ -20,20 +20,22 @@ use crate::codec::{AvroDataType, AvroField, Codec}; use crate::schema::{Fingerprint, Nullability, Prefix}; use arrow_array::cast::AsArray; +use arrow_array::types::RunEndIndexType; use arrow_array::types::{ ArrowPrimitiveType, Date32Type, DurationMicrosecondType, DurationMillisecondType, - DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int32Type, Int64Type, - IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, Time32MillisecondType, - Time64MicrosecondType, TimestampMicrosecondType, TimestampMillisecondType, + DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int16Type, Int32Type, + Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, + Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType, + TimestampMillisecondType, }; use arrow_array::{ Array, Decimal128Array, Decimal256Array, DictionaryArray, FixedSizeBinaryArray, GenericBinaryArray, GenericListArray, GenericStringArray, LargeListArray, ListArray, MapArray, - OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray, UnionArray, + OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray, StructArray, UnionArray, }; #[cfg(feature = "small_decimals")] use arrow_array::{Decimal32Array, Decimal64Array}; -use arrow_buffer::NullBuffer; +use arrow_buffer::{ArrowNativeType, NullBuffer}; use arrow_schema::{ ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode, }; @@ -423,7 +425,6 @@ impl<'a> FieldEncoder<'a> { .ok_or_else(|| { ArrowError::SchemaError("Expected DictionaryArray".into()) })?; - let values = dict .values() .as_any() @@ -464,6 +465,67 @@ impl<'a> FieldEncoder<'a> { Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?)) } + FieldPlan::RunEndEncoded { + values_nullability, + value_plan, + } => { + let dt = array.data_type(); + let values_field = match dt { + DataType::RunEndEncoded(_re_field, v_field) => v_field.as_ref(), + other => { + return Err(ArrowError::SchemaError(format!( + "Avro RunEndEncoded site requires Arrow DataType::RunEndEncoded, found: {other:?}" + ))); + } + }; + // Helper closure to build a typed RunEncodedEncoder + let build = |run_arr_any: &'a dyn Array| -> Result, ArrowError> { + if let Some(arr) = run_arr_any.as_any().downcast_ref::>() { + let values_enc = prepare_value_site_encoder( + arr.values().as_ref(), + values_field, + *values_nullability, + value_plan.as_ref(), + )?; + return Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::< + Int16Type, + >::new( + arr, values_enc + )))); + } + if let Some(arr) = run_arr_any.as_any().downcast_ref::>() { + let values_enc = prepare_value_site_encoder( + arr.values().as_ref(), + values_field, + *values_nullability, + value_plan.as_ref(), + )?; + return Ok(Encoder::RunEncoded32(Box::new(RunEncodedEncoder::< + Int32Type, + >::new( + arr, values_enc + )))); + } + if let Some(arr) = run_arr_any.as_any().downcast_ref::>() { + let values_enc = prepare_value_site_encoder( + arr.values().as_ref(), + values_field, + *values_nullability, + value_plan.as_ref(), + )?; + return Ok(Encoder::RunEncoded64(Box::new(RunEncodedEncoder::< + Int64Type, + >::new( + arr, values_enc + )))); + } + Err(ArrowError::SchemaError( + "Unsupported run-ends index type for RunEndEncoded; expected Int16/Int32/Int64" + .into(), + )) + }; + build(array)? + } }; // Compute the effective null state from writer-declared nullability and data nulls. let null_state = match (nullability, array.null_count() > 0) { @@ -545,6 +607,12 @@ enum FieldPlan { Enum { symbols: Arc<[String]> }, /// Avro union, maps to Arrow Union. Union { bindings: Vec }, + /// Avro RunEndEncoded site. Values are encoded per logical row by mapping the + /// row index to its containing run and emitting that run's value with `value_plan`. + RunEndEncoded { + values_nullability: Option, + value_plan: Box, + }, } #[derive(Debug, Clone)] @@ -638,10 +706,17 @@ impl RecordEncoder { ArrowError::SchemaError(format!("Column index {arrow_index} out of range")) })?; let field = fields[arrow_index].as_ref(); + #[cfg(not(feature = "avro_custom_types"))] + let site_nullability = match &col_plan.plan { + FieldPlan::RunEndEncoded { .. } => None, + _ => col_plan.nullability, + }; + #[cfg(feature = "avro_custom_types")] + let site_nullability = col_plan.nullability; let encoder = prepare_value_site_encoder( array.as_ref(), field, - col_plan.nullability, + site_nullability, &col_plan.plan, )?; out.push(encoder); @@ -694,6 +769,25 @@ fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option { impl FieldPlan { fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result { + #[cfg(not(feature = "avro_custom_types"))] + if let DataType::RunEndEncoded(_re_field, values_field) = arrow_field.data_type() { + let values_nullability = avro_dt.nullability(); + let value_site_dt: &AvroDataType = match avro_dt.codec() { + Codec::Union(branches, _, _) => branches + .iter() + .find(|b| !matches!(b.codec(), Codec::Null)) + .ok_or_else(|| { + ArrowError::SchemaError( + "Avro union at RunEndEncoded site has no non-null branch".into(), + ) + })?, + _ => avro_dt, + }; + return Ok(FieldPlan::RunEndEncoded { + values_nullability, + value_plan: Box::new(FieldPlan::build(value_site_dt, values_field.as_ref())?), + }); + } if let DataType::FixedSizeBinary(len) = arrow_field.data_type() { // Extension-based detection (only when the feature is enabled) let ext_is_uuid = { @@ -860,7 +954,6 @@ impl FieldPlan { ))); } }; - if avro_branches.len() != arrow_union_fields.len() { return Err(ArrowError::SchemaError(format!( "Mismatched number of branches between Avro union ({}) and Arrow union ({}) for field '{}'", @@ -869,7 +962,6 @@ impl FieldPlan { arrow_field.name() ))); } - let bindings = avro_branches .iter() .zip(arrow_union_fields.iter()) @@ -882,12 +974,26 @@ impl FieldPlan { }) }) .collect::, ArrowError>>()?; - Ok(FieldPlan::Union { bindings }) } Codec::Union(_, _, UnionMode::Sparse) => Err(ArrowError::NotYetImplemented( "Sparse Arrow unions are not yet supported".to_string(), )), + #[cfg(feature = "avro_custom_types")] + Codec::RunEndEncoded(values_dt, _width_code) => { + let values_field = match arrow_field.data_type() { + DataType::RunEndEncoded(_run_ends_field, values_field) => values_field.as_ref(), + other => { + return Err(ArrowError::SchemaError(format!( + "Avro RunEndEncoded maps to Arrow DataType::RunEndEncoded, found: {other:?}" + ))); + } + }; + Ok(FieldPlan::RunEndEncoded { + values_nullability: values_dt.nullability(), + value_plan: Box::new(FieldPlan::build(values_dt.as_ref(), values_field)?), + }) + } _ => Ok(FieldPlan::Scalar), } } @@ -935,6 +1041,10 @@ enum Encoder<'a> { Enum(EnumEncoder<'a>), Map(Box>), Union(Box>), + /// Run-end encoded values with specific run-end index widths + RunEncoded16(Box>), + RunEncoded32(Box>), + RunEncoded64(Box>), Null, } @@ -977,6 +1087,9 @@ impl<'a> Encoder<'a> { Encoder::Map(e) => (e).encode(out, idx), Encoder::Enum(e) => (e).encode(out, idx), Encoder::Union(e) => (e).encode(out, idx), + Encoder::RunEncoded16(e) => (e).encode(out, idx), + Encoder::RunEncoded32(e) => (e).encode(out, idx), + Encoder::RunEncoded64(e) => (e).encode(out, idx), Encoder::Null => Ok(()), } } @@ -1460,6 +1573,7 @@ impl IntervalToDurationParts for IntervalDayTimeType { }) } } + /// Single generic encoder used for all three interval units. /// Writes Avro `fixed(12)` as three little-endian u32 values in one call. struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray

); @@ -1554,6 +1668,72 @@ type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>; type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>; type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>; +/// Generic encoder for Arrow `RunArray`-based sites (run-end encoded). +/// Follows the pattern used by other generic encoders (i.e., `ListEncoder`), +/// avoiding runtime branching on run-end width. +struct RunEncodedEncoder<'a, R: RunEndIndexType> { + ends_slice: &'a [::Native], + base: usize, + len: usize, + values: FieldEncoder<'a>, + // Cached run index used for sequential scans of rows [0..n) + cur_run: usize, + // Cached end (logical index, 1-based per spec) for the current run. + cur_end: usize, +} + +type RunEncodedEncoder16<'a> = RunEncodedEncoder<'a, Int16Type>; +type RunEncodedEncoder32<'a> = RunEncodedEncoder<'a, Int32Type>; +type RunEncodedEncoder64<'a> = RunEncodedEncoder<'a, Int64Type>; + +impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> { + fn new(arr: &'a RunArray, values: FieldEncoder<'a>) -> Self { + let ends = arr.run_ends(); + let base = ends.get_start_physical_index(); + let slice = ends.values(); + let len = ends.len(); + let cur_end = if len == 0 { 0 } else { slice[base].as_usize() }; + Self { + ends_slice: slice, + base, + len, + values, + cur_run: 0, + cur_end, + } + } + + /// Advance `cur_run` so that `idx` is within the run ending at `cur_end`. + /// Uses the REE invariant: run ends are strictly increasing, positive, and 1-based. + #[inline(always)] + fn advance_to_row(&mut self, idx: usize) -> Result<(), ArrowError> { + if idx < self.cur_end { + return Ok(()); + } + // Move forward across run boundaries until idx falls within cur_end + while self.cur_run + 1 < self.len && idx >= self.cur_end { + self.cur_run += 1; + self.cur_end = self.ends_slice[self.base + self.cur_run].as_usize(); + } + if idx < self.cur_end { + Ok(()) + } else { + Err(ArrowError::InvalidArgumentError(format!( + "row index {idx} out of bounds for run-ends ({} runs)", + self.len + ))) + } + } + + #[inline(always)] + fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + self.advance_to_row(idx)?; + // For REE values, the value for any logical row within a run is at + // the physical index of that run. + self.values.encode(out, self.cur_run) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs index 2e89312cc240..a5dc7e5103eb 100644 --- a/arrow-avro/src/writer/mod.rs +++ b/arrow-avro/src/writer/mod.rs @@ -394,9 +394,13 @@ mod tests { use crate::reader::ReaderBuilder; use crate::schema::{AvroSchema, SchemaStore}; use crate::test_util::arrow_test_data; + #[cfg(feature = "avro_custom_types")] + use arrow_array::types::{Int16Type, Int32Type, Int64Type}; use arrow_array::{ Array, ArrayRef, BinaryArray, Int32Array, RecordBatch, StructArray, UnionArray, }; + #[cfg(feature = "avro_custom_types")] + use arrow_array::{Int16Array, Int64Array, RunArray, StringArray}; #[cfg(not(feature = "avro_custom_types"))] use arrow_schema::{DataType, Field, Schema}; #[cfg(feature = "avro_custom_types")] @@ -1417,4 +1421,407 @@ mod tests { assert_eq!(round_trip, input); Ok(()) } + + #[cfg(feature = "avro_custom_types")] + #[test] + fn test_run_end_encoded_roundtrip_writer() -> Result<(), ArrowError> { + let run_ends = Int32Array::from(vec![3, 5, 7, 8]); + let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]); + let ree = RunArray::::try_new(&run_ends, &run_values)?; + let field = Field::new("x", ree.data_type().clone(), true); + let schema = Schema::new(vec![field]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(ree.clone()) as ArrayRef], + )?; + let mut writer = AvroWriter::new(Vec::::new(), schema.clone())?; + writer.write(&batch)?; + writer.finish()?; + let bytes = writer.into_inner(); + let reader = ReaderBuilder::new().build(Cursor::new(bytes))?; + let out_schema = reader.schema(); + let batches = reader.collect::, _>>()?; + let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output"); + assert_eq!(out.num_columns(), 1); + assert_eq!(out.num_rows(), 8); + match out.schema().field(0).data_type() { + DataType::RunEndEncoded(run_ends_field, values_field) => { + assert_eq!(run_ends_field.name(), "run_ends"); + assert_eq!(run_ends_field.data_type(), &DataType::Int32); + assert_eq!(values_field.name(), "values"); + assert_eq!(values_field.data_type(), &DataType::Int32); + assert!(values_field.is_nullable()); + let got_ree = out + .column(0) + .as_any() + .downcast_ref::>() + .expect("RunArray"); + assert_eq!(got_ree, &ree); + } + other => panic!( + "Unexpected DataType for round-tripped RunEndEncoded column: {:?}", + other + ), + } + Ok(()) + } + + #[cfg(feature = "avro_custom_types")] + #[test] + fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> Result<(), ArrowError> + { + let run_ends = Int16Array::from(vec![2, 5, 7]); // end indices + let run_values = StringArray::from(vec![Some("a"), None, Some("c")]); + let ree = RunArray::::try_new(&run_ends, &run_values)?; + let field = Field::new("s", ree.data_type().clone(), true); + let schema = Schema::new(vec![field]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(ree.clone()) as ArrayRef], + )?; + let mut writer = AvroWriter::new(Vec::::new(), schema.clone())?; + writer.write(&batch)?; + writer.finish()?; + let bytes = writer.into_inner(); + let reader = ReaderBuilder::new().build(Cursor::new(bytes))?; + let out_schema = reader.schema(); + let batches = reader.collect::, _>>()?; + let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output"); + assert_eq!(out.num_columns(), 1); + assert_eq!(out.num_rows(), 7); + match out.schema().field(0).data_type() { + DataType::RunEndEncoded(run_ends_field, values_field) => { + assert_eq!(run_ends_field.data_type(), &DataType::Int16); + assert_eq!(values_field.data_type(), &DataType::Utf8); + assert!( + values_field.is_nullable(), + "REE 'values' child should be nullable" + ); + let got = out + .column(0) + .as_any() + .downcast_ref::>() + .expect("RunArray"); + assert_eq!(got, &ree); + } + other => panic!("Unexpected DataType: {:?}", other), + } + Ok(()) + } + + #[cfg(feature = "avro_custom_types")] + #[test] + fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer() + -> Result<(), ArrowError> { + let run_ends = Int64Array::from(vec![4_i64, 8_i64]); + let run_values = Int32Array::from(vec![Some(999), Some(-5)]); + let ree = RunArray::::try_new(&run_ends, &run_values)?; + let field = Field::new("y", ree.data_type().clone(), true); + let schema = Schema::new(vec![field]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(ree.clone()) as ArrayRef], + )?; + let mut writer = AvroWriter::new(Vec::::new(), schema.clone())?; + writer.write(&batch)?; + writer.finish()?; + let bytes = writer.into_inner(); + let reader = ReaderBuilder::new().build(Cursor::new(bytes))?; + let out_schema = reader.schema(); + let batches = reader.collect::, _>>()?; + let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output"); + assert_eq!(out.num_columns(), 1); + assert_eq!(out.num_rows(), 8); + match out.schema().field(0).data_type() { + DataType::RunEndEncoded(run_ends_field, values_field) => { + assert_eq!(run_ends_field.data_type(), &DataType::Int64); + assert_eq!(values_field.data_type(), &DataType::Int32); + assert!(values_field.is_nullable()); + let got = out + .column(0) + .as_any() + .downcast_ref::>() + .expect("RunArray"); + assert_eq!(got, &ree); + } + other => panic!("Unexpected DataType for REE column: {:?}", other), + } + Ok(()) + } + + #[cfg(feature = "avro_custom_types")] + #[test] + fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), ArrowError> { + let run_ends = Int32Array::from(vec![3, 5, 7, 8]); + let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]); + let base = RunArray::::try_new(&run_ends, &run_values)?; + let offset = 1usize; + let length = 6usize; + let base_values = base + .values() + .as_any() + .downcast_ref::() + .expect("REE values as Int32Array"); + let mut logical_window: Vec> = Vec::with_capacity(length); + for i in offset..offset + length { + let phys = base.get_physical_index(i); + let v = if base_values.is_null(phys) { + None + } else { + Some(base_values.value(phys)) + }; + logical_window.push(v); + } + + fn compress_run_ends_i32(vals: &[Option]) -> (Int32Array, Int32Array) { + if vals.is_empty() { + return (Int32Array::new_null(0), Int32Array::new_null(0)); + } + let mut run_ends_out: Vec = Vec::new(); + let mut run_vals_out: Vec> = Vec::new(); + let mut cur = vals[0]; + let mut len = 1i32; + for v in &vals[1..] { + if *v == cur { + len += 1; + } else { + let last_end = run_ends_out.last().copied().unwrap_or(0); + run_ends_out.push(last_end + len); + run_vals_out.push(cur); + cur = *v; + len = 1; + } + } + let last_end = run_ends_out.last().copied().unwrap_or(0); + run_ends_out.push(last_end + len); + run_vals_out.push(cur); + ( + Int32Array::from(run_ends_out), + Int32Array::from(run_vals_out), + ) + } + let (owned_run_ends, owned_run_values) = compress_run_ends_i32(&logical_window); + let owned_slice = RunArray::::try_new(&owned_run_ends, &owned_run_values)?; + let field = Field::new("x", owned_slice.data_type().clone(), true); + let schema = Schema::new(vec![field]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(owned_slice.clone()) as ArrayRef], + )?; + let mut writer = AvroWriter::new(Vec::::new(), schema.clone())?; + writer.write(&batch)?; + writer.finish()?; + let bytes = writer.into_inner(); + let reader = ReaderBuilder::new().build(Cursor::new(bytes))?; + let out_schema = reader.schema(); + let batches = reader.collect::, _>>()?; + let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output"); + assert_eq!(out.num_columns(), 1); + assert_eq!(out.num_rows(), length); + match out.schema().field(0).data_type() { + DataType::RunEndEncoded(run_ends_field, values_field) => { + assert_eq!(run_ends_field.data_type(), &DataType::Int32); + assert_eq!(values_field.data_type(), &DataType::Int32); + assert!(values_field.is_nullable()); + let got = out + .column(0) + .as_any() + .downcast_ref::>() + .expect("RunArray"); + fn expand_ree_to_int32(a: &RunArray) -> Int32Array { + let vals = a + .values() + .as_any() + .downcast_ref::() + .expect("REE values as Int32Array"); + let mut out: Vec> = Vec::with_capacity(a.len()); + for i in 0..a.len() { + let phys = a.get_physical_index(i); + out.push(if vals.is_null(phys) { + None + } else { + Some(vals.value(phys)) + }); + } + Int32Array::from(out) + } + let got_logical = expand_ree_to_int32(got); + let expected_logical = Int32Array::from(logical_window); + assert_eq!( + got_logical, expected_logical, + "Logical values differ after REE slice round-trip" + ); + } + other => panic!("Unexpected DataType for REE column: {:?}", other), + } + Ok(()) + } + + #[cfg(not(feature = "avro_custom_types"))] + #[test] + fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), ArrowError> { + use arrow_schema::{DataType, Field, Schema}; + let run_ends = arrow_array::Int32Array::from(vec![3, 5, 7, 8]); + let run_values = arrow_array::Int32Array::from(vec![Some(1), Some(2), None, Some(3)]); + let ree = arrow_array::RunArray::::try_new( + &run_ends, + &run_values, + )?; + let field = Field::new("x", ree.data_type().clone(), true); + let schema = Schema::new(vec![field]); + let batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?; + let mut writer = AvroWriter::new(Vec::::new(), schema.clone())?; + writer.write(&batch)?; + writer.finish()?; + let bytes = writer.into_inner(); + let reader = ReaderBuilder::new().build(Cursor::new(bytes))?; + let out_schema = reader.schema(); + let batches = reader.collect::, _>>()?; + let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output"); + assert_eq!(out.num_columns(), 1); + assert_eq!(out.num_rows(), 8); + assert_eq!(out.schema().field(0).data_type(), &DataType::Int32); + let got = out + .column(0) + .as_any() + .downcast_ref::() + .expect("Int32Array"); + let expected = Int32Array::from(vec![ + Some(1), + Some(1), + Some(1), + Some(2), + Some(2), + None, + None, + Some(3), + ]); + assert_eq!(got, &expected); + Ok(()) + } + + #[cfg(not(feature = "avro_custom_types"))] + #[test] + fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer_feature_off() + -> Result<(), ArrowError> { + use arrow_schema::{DataType, Field, Schema}; + let run_ends = arrow_array::Int16Array::from(vec![2, 5, 7]); + let run_values = arrow_array::StringArray::from(vec![Some("a"), None, Some("c")]); + let ree = arrow_array::RunArray::::try_new( + &run_ends, + &run_values, + )?; + let field = Field::new("s", ree.data_type().clone(), true); + let schema = Schema::new(vec![field]); + let batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?; + let mut writer = AvroWriter::new(Vec::::new(), schema.clone())?; + writer.write(&batch)?; + writer.finish()?; + let bytes = writer.into_inner(); + let reader = ReaderBuilder::new().build(Cursor::new(bytes))?; + let out_schema = reader.schema(); + let batches = reader.collect::, _>>()?; + let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output"); + assert_eq!(out.num_columns(), 1); + assert_eq!(out.num_rows(), 7); + assert_eq!(out.schema().field(0).data_type(), &DataType::Utf8); + let got = out + .column(0) + .as_any() + .downcast_ref::() + .expect("StringArray"); + let expected = arrow_array::StringArray::from(vec![ + Some("a"), + Some("a"), + None, + None, + None, + Some("c"), + Some("c"), + ]); + assert_eq!(got, &expected); + Ok(()) + } + + #[cfg(not(feature = "avro_custom_types"))] + #[test] + fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer_feature_off() + -> Result<(), ArrowError> { + use arrow_schema::{DataType, Field, Schema}; + let run_ends = arrow_array::Int64Array::from(vec![4_i64, 8_i64]); + let run_values = Int32Array::from(vec![Some(999), Some(-5)]); + let ree = arrow_array::RunArray::::try_new( + &run_ends, + &run_values, + )?; + let field = Field::new("y", ree.data_type().clone(), true); + let schema = Schema::new(vec![field]); + let batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?; + let mut writer = AvroWriter::new(Vec::::new(), schema.clone())?; + writer.write(&batch)?; + writer.finish()?; + let bytes = writer.into_inner(); + let reader = ReaderBuilder::new().build(Cursor::new(bytes))?; + let out_schema = reader.schema(); + let batches = reader.collect::, _>>()?; + let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output"); + assert_eq!(out.num_columns(), 1); + assert_eq!(out.num_rows(), 8); + assert_eq!(out.schema().field(0).data_type(), &DataType::Int32); + let got = out + .column(0) + .as_any() + .downcast_ref::() + .expect("Int32Array"); + let expected = Int32Array::from(vec![ + Some(999), + Some(999), + Some(999), + Some(999), + Some(-5), + Some(-5), + Some(-5), + Some(-5), + ]); + assert_eq!(got, &expected); + Ok(()) + } + + #[cfg(not(feature = "avro_custom_types"))] + #[test] + fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> Result<(), ArrowError> { + use arrow_schema::{DataType, Field, Schema}; + let run_ends = Int32Array::from(vec![2, 4, 6]); + let run_values = Int32Array::from(vec![Some(1), Some(2), None]); + let ree = arrow_array::RunArray::::try_new( + &run_ends, + &run_values, + )?; + let field = Field::new("x", ree.data_type().clone(), true); + let schema = Schema::new(vec![field]); + let batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?; + let mut writer = AvroWriter::new(Vec::::new(), schema.clone())?; + writer.write(&batch)?; + writer.finish()?; + let bytes = writer.into_inner(); + let reader = ReaderBuilder::new().build(Cursor::new(bytes))?; + let out_schema = reader.schema(); + let batches = reader.collect::, _>>()?; + let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output"); + assert_eq!(out.num_columns(), 1); + assert_eq!(out.num_rows(), 6); + assert_eq!(out.schema().field(0).data_type(), &DataType::Int32); + let got = out + .column(0) + .as_any() + .downcast_ref::() + .expect("Int32Array"); + let expected = Int32Array::from(vec![Some(1), Some(1), Some(2), Some(2), None, None]); + assert_eq!(got, &expected); + Ok(()) + } } From 57329cc89bb79f176a22a930d38e7bb91e35825b Mon Sep 17 00:00:00 2001 From: Connor Sanders <170039284+jecsand838@users.noreply.github.com> Date: Mon, 13 Oct 2025 03:05:30 -0500 Subject: [PATCH 2/5] Update arrow-avro/src/reader/record.rs Co-authored-by: Matthijs Brobbel --- arrow-avro/src/reader/record.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 68097fa85a6a..68ba9ea0d649 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -1218,8 +1218,7 @@ impl Decoder { } if n > (u32::MAX as usize) { return Err(ArrowError::InvalidArgumentError(format!( - "RunEndEncoded length {} exceeds maximum supported by UInt32 indices for take", - n + "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take", ))); } let run_count = run_starts.len(); From ec4eb366a987b49876ec66c9b9e7b657ef5c3fe1 Mon Sep 17 00:00:00 2001 From: Connor Sanders <170039284+jecsand838@users.noreply.github.com> Date: Mon, 13 Oct 2025 03:05:36 -0500 Subject: [PATCH 3/5] Update arrow-avro/src/reader/record.rs Co-authored-by: Matthijs Brobbel --- arrow-avro/src/reader/record.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 68ba9ea0d649..7eac382d9f1b 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -1253,8 +1253,7 @@ impl Decoder { 2 => { if n > i16::MAX as usize { return Err(ArrowError::InvalidArgumentError(format!( - "RunEndEncoded length {} exceeds i16::MAX for run end width 2", - n + "RunEndEncoded length {n} exceeds i16::MAX for run end width 2" ))); } build_run_array!(i16, Int16Type) From 343813f46c17ad4014c2903b41e503a83e2f2d51 Mon Sep 17 00:00:00 2001 From: Connor Sanders <170039284+jecsand838@users.noreply.github.com> Date: Mon, 13 Oct 2025 03:05:42 -0500 Subject: [PATCH 4/5] Update arrow-avro/Cargo.toml Co-authored-by: Matthijs Brobbel --- arrow-avro/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index 4a416ed38982..dc59d337a0f4 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -43,7 +43,7 @@ canonical_extension_types = ["arrow-schema/canonical_extension_types"] md5 = ["dep:md5"] sha256 = ["dep:sha2"] small_decimals = [] -avro_custom_types = ["arrow-select"] +avro_custom_types = ["dep:arrow-select"] [dependencies] arrow-schema = { workspace = true } From ade92b8e46cec41dc7f71581886163419ea9d99a Mon Sep 17 00:00:00 2001 From: Connor Sanders <170039284+jecsand838@users.noreply.github.com> Date: Mon, 13 Oct 2025 03:09:12 -0500 Subject: [PATCH 5/5] Address PR Comments --- arrow-avro/src/codec.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index 29464e8bf910..a6a495bdf34d 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -778,7 +778,7 @@ impl Codec { 16 => DataType::Int16, 32 => DataType::Int32, 64 => DataType::Int64, - _ => DataType::Int32, + _ => unreachable!(), }; DataType::RunEndEncoded( Arc::new(Field::new("run_ends", run_ends_dt, false)),