Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions arrow-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,8 @@ harness = false

[[bench]]
name = "record_batch"
harness = false

# NOTE(review): no `benches/list_iterator.rs` is visible in this change (the list
# benchmark is added to arrow/benches/array_iter.rs) - confirm this target's source
# file exists, otherwise building benches for this crate will presumably fail.
[[bench]]
name = "list_iterator"
harness = false
30 changes: 30 additions & 0 deletions arrow-array/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,36 @@ impl<OffsetSize: OffsetSizeTrait> GenericListArray<OffsetSize> {
GenericListArrayIter::<'a, OffsetSize>::new(self)
}

/// Builds an iterator over the lists of this array that yields every list as a
/// concrete `ValueArray` instead of a type-erased array reference.
///
/// The child values array is downcast to `ValueArray` once, while the iterator is
/// being constructed. If the child values array is not a `ValueArray`, the downcast
/// fails and `None` is returned.
///
/// # Example
/// ```
/// # use arrow_array::{ListArray, Int64Array, types::Int64Type};
/// # use arrow_array::array::typed_list_iter::GenericListTypedIter;
/// let list_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
///     Some(vec![Some(1), Some(2)]),
///     None,
///     Some(vec![Some(3)]),
/// ]);
///
/// let typed_iter: Option<GenericListTypedIter<i32, Int64Array>> = list_array.typed_iter();
/// let mut iter = typed_iter.unwrap();
///
/// assert!(iter.next().unwrap().is_some()); // First element
/// assert!(iter.next().unwrap().is_none()); // Null element
/// assert!(iter.next().unwrap().is_some()); // Third element
/// ```
pub fn typed_iter<ValueArray>(
    &self,
) -> Option<crate::array::typed_list_iter::GenericListTypedIter<OffsetSize, ValueArray>>
where
    ValueArray: crate::array::typed_list_iter::SliceableArray + Clone + 'static,
{
    // `GenericListTypedIter::new` takes the list array by value, so it is handed a
    // clone of `self` rather than a borrow.
    let owned_list = self.clone();
    crate::array::typed_list_iter::GenericListTypedIter::new(owned_list)
}

#[inline]
fn get_type(data_type: &DataType) -> Option<&DataType> {
match (OffsetSize::IS_LARGE, data_type) {
Expand Down
1 change: 1 addition & 0 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ mod byte_view_array;
pub use byte_view_array::*;

mod list_view_array;
/// Typed iteration over list arrays: the [`typed_list_iter::SliceableArray`] trait and
/// the [`typed_list_iter::GenericListTypedIter`] iterator.
///
/// The module is public so that callers can name the iterator type returned by
/// `GenericListArray::typed_iter` and the trait used in its bound.
pub mod typed_list_iter;

pub use list_view_array::*;

Expand Down
291 changes: 291 additions & 0 deletions arrow-array/src/array/typed_list_iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
use arrow_buffer::{NullBuffer, OffsetBuffer};
use crate::{Array, ArrowPrimitiveType, BooleanArray, DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray, GenericByteArray, GenericByteViewArray, GenericListArray, GenericListViewArray, MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, RunArray, StructArray, UnionArray};
use crate::types::{ArrowDictionaryKeyType, ByteArrayType, ByteViewType, RunEndIndexType};

/// Arrays that can be sliced in a zero copy, zero allocation way.
///
/// The implementations in this file all delegate to the array type's own
/// `slice` method.
pub trait SliceableArray {
/// Returns a slice of `self` as a value of the same array type.
///
/// The typed list iterator in this file calls this with the start offset of a
/// list element as `offset` and the element's item count as `length`.
fn slice(&self, offset: usize, length: usize) -> Self;
}

/// Generates a `SliceableArray` impl that forwards to the target type's own `slice`.
///
/// Two invocation forms are accepted:
/// * `impl_sliceable!(Type)` for an array type without generic parameters;
/// * `impl_sliceable!(Type, Bound)` for an array type with a single generic
///   parameter, where `Bound` is the bound placed on that parameter.
macro_rules! impl_sliceable {
    // Non-generic form: `impl_sliceable!(Type)`
    ($ty:ty) => {
        impl SliceableArray for $ty {
            fn slice(&self, offset: usize, length: usize) -> Self {
                <$ty>::slice(self, offset, length)
            }
        }
    };
    // Single-parameter generic form: `impl_sliceable!(Type, Bound)`
    ($name:ident, $($bound:tt)+) => {
        impl<T: $($bound)+> SliceableArray for $name<T> {
            fn slice(&self, offset: usize, length: usize) -> Self {
                <$name<T>>::slice(self, offset, length)
            }
        }
    };
}

// `SliceableArray` implementations for the array types imported above. Types with a
// generic parameter use the two-argument form of the macro, passing the bound for
// that parameter.
impl_sliceable!(BooleanArray);
impl_sliceable!(DictionaryArray, ArrowDictionaryKeyType);
impl_sliceable!(FixedSizeBinaryArray);
impl_sliceable!(FixedSizeListArray);
impl_sliceable!(GenericByteArray, ByteArrayType);
impl_sliceable!(GenericByteViewArray, ByteViewType + ?Sized);
impl_sliceable!(GenericListArray, OffsetSizeTrait);
impl_sliceable!(GenericListViewArray, OffsetSizeTrait);
impl_sliceable!(MapArray);
impl_sliceable!(NullArray);
impl_sliceable!(PrimitiveArray, ArrowPrimitiveType);
impl_sliceable!(RunArray, RunEndIndexType);
impl_sliceable!(StructArray);
impl_sliceable!(UnionArray);

/// A typed iterator over the elements of a [`GenericListArray`].
///
/// The child values array is downcast to `ValueArray` once, when the iterator is
/// created. Each non-null list element is then yielded directly as a `ValueArray`
/// slice rather than as a type-erased `Arc<dyn Array>`, and null list elements are
/// yielded as `None`.
#[derive(Debug, Clone)]
pub struct GenericListTypedIter<OffsetSize: OffsetSizeTrait, ValueArray: SliceableArray> {
    /// Validity of the list elements; `None` when the source list has no null buffer.
    nulls: Option<NullBuffer>,
    /// The child values array, already downcast to its concrete type.
    values: ValueArray,
    /// Offsets into `values`; element `i` spans `value_offsets[i]..value_offsets[i + 1]`.
    value_offsets: OffsetBuffer<OffsetSize>,
    /// Index of the next list element to yield.
    current: usize,
}

impl<OffsetSize: OffsetSizeTrait, ValueArray: SliceableArray + Clone + 'static>
    GenericListTypedIter<OffsetSize, ValueArray>
{
    /// Creates a typed iterator positioned at the first element of `list`.
    ///
    /// Returns `None` when the child values array of `list` cannot be downcast to
    /// `ValueArray`. Otherwise the iterator keeps its own clones of the list's null
    /// buffer, downcast values array and offsets.
    pub fn new(list: GenericListArray<OffsetSize>) -> Option<Self> {
        let typed_values = list.values().as_any().downcast_ref::<ValueArray>()?;
        Some(Self {
            nulls: list.nulls().cloned(),
            values: typed_values.clone(),
            value_offsets: list.offsets().clone(),
            current: 0,
        })
    }
}

impl<OffsetSize: OffsetSizeTrait, ValueArray: SliceableArray> Iterator
    for GenericListTypedIter<OffsetSize, ValueArray>
{
    /// `None` for a null list element, otherwise the slice of values it contains.
    type Item = Option<ValueArray>;

    /// Yields the next list element, or `None` once every element has been visited.
    fn next(&mut self) -> Option<Self::Item> {
        // The offsets buffer holds one more entry than there are list elements.
        let row_count = self.value_offsets.len() - 1;
        let row = self.current;
        if row >= row_count {
            return None;
        }

        let item = match &self.nulls {
            // Null list element: do not touch the values array at all.
            Some(validity) if validity.is_null(row) => None,
            _ => {
                // Consecutive offsets delimit this element inside the values array.
                let start = self.value_offsets[row].as_usize();
                let end = self.value_offsets[row + 1].as_usize();
                Some(self.values.slice(start, end - start))
            }
        };

        self.current += 1;
        Some(item)
    }

    /// Exact number of list elements still to be yielded, as both bounds.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let row_count = self.value_offsets.len() - 1;
        let left = row_count - self.current;
        (left, Some(left))
    }
}

impl<OffsetSize: OffsetSizeTrait, ValueArray: SliceableArray> ExactSizeIterator
    for GenericListTypedIter<OffsetSize, ValueArray>
{
    /// Number of list elements that have not been yielded yet.
    fn len(&self) -> usize {
        // One fewer list element than there are offsets.
        let row_count = self.value_offsets.len() - 1;
        row_count - self.current
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Int32Array, Int64Array, ListArray, StringArray, types::{Int32Type, Int64Type}};

    /// Every list is non-null, including an empty one; values must round-trip.
    #[test]
    fn test_primitive_array_no_nulls() {
        let list_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            Some(vec![Some(4)]),
            Some(vec![]),
        ]);

        let typed_iter: Option<GenericListTypedIter<i32, Int64Array>> = list_array.typed_iter();
        let mut iter = typed_iter.unwrap();

        // First element
        let arr = iter.next().unwrap().unwrap();
        assert_eq!(arr.len(), 3);
        assert_eq!(arr.value(0), 1);
        assert_eq!(arr.value(1), 2);
        assert_eq!(arr.value(2), 3);

        // Second element
        let arr = iter.next().unwrap().unwrap();
        assert_eq!(arr.len(), 1);
        assert_eq!(arr.value(0), 4);

        // Third element (empty list)
        let arr = iter.next().unwrap().unwrap();
        assert_eq!(arr.len(), 0);

        // No more elements
        assert!(iter.next().is_none());
    }

    /// Null lists are yielded as `Some(None)`, not skipped.
    #[test]
    fn test_primitive_array_with_nulls() {
        let list_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            None,
            Some(vec![Some(3)]),
            None,
        ]);

        let typed_iter: Option<GenericListTypedIter<i32, Int64Array>> = list_array.typed_iter();
        let mut iter = typed_iter.unwrap();

        // First element
        assert!(iter.next().unwrap().is_some());

        // Null element
        assert!(iter.next().unwrap().is_none());

        // Third element
        assert!(iter.next().unwrap().is_some());

        // Another null element
        assert!(iter.next().unwrap().is_none());

        // No more elements
        assert!(iter.next().is_none());
    }

    /// A non-primitive child array (strings) is sliced per list element.
    #[test]
    fn test_string_array() {
        let list_array = ListArray::new(
            arrow_schema::Field::new("item", arrow_schema::DataType::Utf8, true).into(),
            arrow_buffer::OffsetBuffer::from_lengths([2, 1, 3]),
            std::sync::Arc::new(StringArray::from(vec![
                Some("a"), Some("b"), // First list
                Some("c"), // Second list
                Some("d"), Some("e"), Some("f"), // Third list
            ])),
            None,
        );

        let typed_iter: Option<GenericListTypedIter<i32, StringArray>> = list_array.typed_iter();
        let mut iter = typed_iter.unwrap();

        // First element
        let arr = iter.next().unwrap().unwrap();
        assert_eq!(arr.len(), 2);
        assert_eq!(arr.value(0), "a");
        assert_eq!(arr.value(1), "b");

        // Second element
        let arr = iter.next().unwrap().unwrap();
        assert_eq!(arr.len(), 1);
        assert_eq!(arr.value(0), "c");

        // Third element
        let arr = iter.next().unwrap().unwrap();
        assert_eq!(arr.len(), 3);
        assert_eq!(arr.value(0), "d");
        assert_eq!(arr.value(1), "e");
        assert_eq!(arr.value(2), "f");

        // No more elements
        assert!(iter.next().is_none());
    }

    /// Requesting a value array type that does not match the child yields `None`.
    #[test]
    fn test_wrong_type_returns_none() {
        let list_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
        ]);

        // Try to create iterator with wrong type - should return None
        let typed_iter: Option<GenericListTypedIter<i32, Int32Array>> = list_array.typed_iter();
        assert!(typed_iter.is_none());
    }

    /// `size_hint` and `len` must track the remaining elements as the iterator advances.
    #[test]
    fn test_iterator_size_hint() {
        let list_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
            Some(vec![Some(1)]),
            Some(vec![Some(2)]),
            Some(vec![Some(3)]),
        ]);

        let typed_iter: Option<GenericListTypedIter<i32, Int64Array>> = list_array.typed_iter();
        let mut iter = typed_iter.unwrap();

        assert_eq!(iter.size_hint(), (3, Some(3)));
        assert_eq!(iter.len(), 3);

        // After consuming one element the hint shrinks accordingly.
        assert!(iter.next().is_some());
        assert_eq!(iter.size_hint(), (2, Some(2)));
        assert_eq!(iter.len(), 2);

        // Drain the rest; an exhausted iterator reports zero and stays exhausted.
        assert!(iter.next().is_some());
        assert!(iter.next().is_some());
        assert_eq!(iter.size_hint(), (0, Some(0)));
        assert_eq!(iter.len(), 0);
        assert!(iter.next().is_none());
        assert_eq!(iter.len(), 0);
    }

    /// The iterator composes with `enumerate` and visits every list exactly once.
    #[test]
    fn test_iterator_with_enumerate() {
        let list_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
            Some(vec![Some(10)]),
            None,
            Some(vec![Some(20), Some(30)]),
        ]);

        let typed_iter: Option<GenericListTypedIter<i32, Int64Array>> = list_array.typed_iter();
        let iter = typed_iter.unwrap();

        // Count visited elements so the test cannot pass vacuously on a short iterator.
        let mut visited = 0;
        for (idx, arr) in iter.enumerate() {
            match idx {
                0 => {
                    let a = arr.unwrap();
                    assert_eq!(a.len(), 1);
                    assert_eq!(a.value(0), 10);
                }
                1 => assert!(arr.is_none()),
                2 => {
                    let a = arr.unwrap();
                    assert_eq!(a.len(), 2);
                    assert_eq!(a.value(0), 20);
                    assert_eq!(a.value(1), 30);
                }
                _ => panic!("Unexpected index"),
            }
            visited += 1;
        }
        assert_eq!(visited, 3);
    }

    /// Two typed iterators can be zipped and stay aligned element by element.
    #[test]
    fn test_iterator_with_zip() {
        let list1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![Some(3)]),
        ]);
        let list2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(10)]),
            Some(vec![Some(20), Some(30)]),
        ]);

        let iter1: GenericListTypedIter<i32, Int32Array> = list1.typed_iter().unwrap();
        let iter2: GenericListTypedIter<i32, Int32Array> = list2.typed_iter().unwrap();

        // Count the pairs so the test fails if either iterator ends early.
        let mut pairs = 0;
        for (arr1, arr2) in iter1.zip(iter2) {
            let a1 = arr1.unwrap();
            let a2 = arr2.unwrap();
            // Just verify they're not empty
            assert!(!a1.is_empty());
            assert!(!a2.is_empty());
            pairs += 1;
        }
        assert_eq!(pairs, 2);
    }
}
14 changes: 14 additions & 0 deletions arrow/benches/array_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,20 @@ fn add_benchmark(c: &mut Criterion) {
// Must use black_box here as this can be optimized away
|_item| hint::black_box(false),
);

benchmark_array_iter(
c,
"int list array with len 16",
&create_primitive_list_array_with_seed::<i64, Int64Type>(BATCH_SIZE, 0.0, 0.0, 16, 0),
&create_primitive_list_array_with_seed::<i64, Int64Type>(BATCH_SIZE, 0.5, 0.0, 16, 0),
// fold init
0_usize,
// fold function
|acc, item| acc.wrapping_add(item.map(|item| item.len()).unwrap_or_default()),
// predicate that will always evaluate to false while allowing us to avoid using hint::black_box and let the compiler optimize more
|item| item.is_some_and(|item| item.len() > 100),
);

}

criterion_group!(benches, add_benchmark);
Expand Down