From 49e539e771ac70eae5e6c4e580e3213e8d6102c0 Mon Sep 17 00:00:00 2001
From: Jonas Dedden
Date: Thu, 12 Feb 2026 22:32:00 +0100
Subject: [PATCH 1/2] Fix `ArrowArrayStreamReader` for 0-column record batch
 streams

---
 arrow-array/src/ffi_stream.rs | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/arrow-array/src/ffi_stream.rs b/arrow-array/src/ffi_stream.rs
index c46943682914..692da0d44737 100644
--- a/arrow-array/src/ffi_stream.rs
+++ b/arrow-array/src/ffi_stream.rs
@@ -66,6 +66,7 @@ use std::{
 use arrow_data::ffi::FFI_ArrowArray;
 use arrow_schema::{ArrowError, Schema, SchemaRef, ffi::FFI_ArrowSchema};
 
+use crate::RecordBatchOptions;
 use crate::array::Array;
 use crate::array::StructArray;
 use crate::ffi::from_ffi_and_data_type;
@@ -365,7 +366,12 @@ impl Iterator for ArrowArrayStreamReader {
                 from_ffi_and_data_type(array, DataType::Struct(self.schema().fields().clone()))
             };
             Some(result.and_then(|data| {
-                RecordBatch::try_new(self.schema.clone(), StructArray::from(data).into_parts().1)
+                let len = data.len();
+                RecordBatch::try_new_with_options(
+                    self.schema.clone(),
+                    StructArray::from(data).into_parts().1,
+                    &RecordBatchOptions::new().with_row_count(Some(len)),
+                )
             }))
         } else {
             let last_error = self.get_stream_last_error();
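
A note on why the fix works: `RecordBatch::try_new` infers the row count from
its first column, so a batch with zero columns has nothing to infer from and
the constructor rejects it, while the FFI-imported data still carries the
length via `data.len()`. Below is a minimal standalone sketch of that
behavior, assuming only the public `arrow_array` API (the `main` harness is
illustrative, not part of the patch):

    use std::sync::Arc;

    use arrow_array::{RecordBatch, RecordBatchOptions};
    use arrow_schema::Schema;

    fn main() {
        let schema = Arc::new(Schema::empty());

        // With zero columns there is no column to infer a row count from,
        // so the plain constructor rejects the batch.
        assert!(RecordBatch::try_new(schema.clone(), vec![]).is_err());

        // Passing the row count explicitly keeps it alongside the (empty)
        // column list, which is what the patched reader now does.
        let batch = RecordBatch::try_new_with_options(
            schema,
            vec![],
            &RecordBatchOptions::new().with_row_count(Some(10)),
        )
        .unwrap();
        assert_eq!(batch.num_rows(), 10);
    }
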
From 1f5dd795d743f10620710584924e1826d2a649ca Mon Sep 17 00:00:00 2001
From: Jonas Dedden
Date: Thu, 12 Feb 2026 22:32:29 +0100
Subject: [PATCH 2/2] Refactor existing `stream_round_trip_tests` and include
 new `[...]_no_columns` test

---
 arrow-array/src/ffi_stream.rs | 67 +++++++++++++++++------------------
 1 file changed, 32 insertions(+), 35 deletions(-)

diff --git a/arrow-array/src/ffi_stream.rs b/arrow-array/src/ffi_stream.rs
index 692da0d44737..815d7c57605d 100644
--- a/arrow-array/src/ffi_stream.rs
+++ b/arrow-array/src/ffi_stream.rs
@@ -425,20 +425,7 @@ mod tests {
         }
     }
 
-    fn _test_round_trip_export(arrays: Vec<Arc<dyn Array>>) -> Result<()> {
-        let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]);
-        let schema = Arc::new(Schema::new_with_metadata(
-            vec![
-                Field::new("a", arrays[0].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-                Field::new("b", arrays[1].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-                Field::new("c", arrays[2].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-            ],
-            metadata,
-        ));
-        let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
+    fn _test_round_trip_export(batch: RecordBatch, schema: Arc<Schema>) -> Result<()> {
         let iter = Box::new(vec![batch.clone(), batch.clone()].into_iter().map(Ok)) as _;
 
         let reader = TestRecordBatchReader::new(schema.clone(), iter);
@@ -467,10 +454,12 @@
             }
 
             let array = unsafe { from_ffi(ffi_array, &ffi_schema) }.unwrap();
+            let len = array.len();
 
-            let record_batch = RecordBatch::try_new(
+            let record_batch = RecordBatch::try_new_with_options(
                 SchemaRef::from(exported_schema.clone()),
                 StructArray::from(array).into_parts().1,
+                &RecordBatchOptions::new().with_row_count(Some(len)),
             )
             .unwrap();
             produced_batches.push(record_batch);
@@ -481,20 +470,7 @@
         Ok(())
     }
 
-    fn _test_round_trip_import(arrays: Vec<Arc<dyn Array>>) -> Result<()> {
-        let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]);
-        let schema = Arc::new(Schema::new_with_metadata(
-            vec![
-                Field::new("a", arrays[0].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-                Field::new("b", arrays[1].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-                Field::new("c", arrays[2].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-            ],
-            metadata,
-        ));
-        let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
+    fn _test_round_trip_import(batch: RecordBatch, schema: Arc<Schema>) -> Result<()> {
         let iter = Box::new(vec![batch.clone(), batch.clone()].into_iter().map(Ok)) as _;
 
         let reader = TestRecordBatchReader::new(schema.clone(), iter);
@@ -517,19 +493,40 @@
     }
 
     #[test]
-    fn test_stream_round_trip_export() -> Result<()> {
+    fn test_stream_round_trip() {
         let array = Int32Array::from(vec![Some(2), None, Some(1), None]);
         let array: Arc<dyn Array> = Arc::new(array);
+        let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]);
+
+        let schema = Arc::new(Schema::new_with_metadata(
+            vec![
+                Field::new("a", array.data_type().clone(), true).with_metadata(metadata.clone()),
+                Field::new("b", array.data_type().clone(), true).with_metadata(metadata.clone()),
+                Field::new("c", array.data_type().clone(), true).with_metadata(metadata.clone()),
+            ],
+            metadata,
+        ));
+        let batch = RecordBatch::try_new(schema.clone(), vec![array.clone(), array.clone(), array])
+            .unwrap();
 
-        _test_round_trip_export(vec![array.clone(), array.clone(), array])
+        _test_round_trip_export(batch.clone(), schema.clone()).unwrap();
+        _test_round_trip_import(batch, schema).unwrap();
     }
 
     #[test]
-    fn test_stream_round_trip_import() -> Result<()> {
-        let array = Int32Array::from(vec![Some(2), None, Some(1), None]);
-        let array: Arc<dyn Array> = Arc::new(array);
+    fn test_stream_round_trip_no_columns() {
+        let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]);
+
+        let schema = Arc::new(Schema::new_with_metadata(Vec::<Field>::new(), metadata));
+        let batch = RecordBatch::try_new_with_options(
+            schema.clone(),
+            Vec::<Arc<dyn Array>>::new(),
+            &RecordBatchOptions::new().with_row_count(Some(10)),
+        )
+        .unwrap();
 
-        _test_round_trip_import(vec![array.clone(), array.clone(), array])
+        _test_round_trip_export(batch.clone(), schema.clone()).unwrap();
+        _test_round_trip_import(batch, schema).unwrap();
     }
 
     #[test]
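
Taken together, the two patches let a zero-column stream round-trip with its
row count intact. Below is an end-to-end sketch of the scenario that
`test_stream_round_trip_no_columns` covers, written against the public
`arrow_array::ffi_stream` API (a hedged illustration, not part of the patch;
the `main` harness is assumed):

    use std::sync::Arc;

    use arrow_array::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
    use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchOptions};
    use arrow_schema::Schema;

    fn main() {
        // A batch with no columns but an explicit row count of 3.
        let schema = Arc::new(Schema::empty());
        let batch = RecordBatch::try_new_with_options(
            schema.clone(),
            vec![],
            &RecordBatchOptions::new().with_row_count(Some(3)),
        )
        .unwrap();

        // Export through the Arrow C stream interface and import it back.
        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
        let stream = FFI_ArrowArrayStream::new(Box::new(reader));
        let imported = ArrowArrayStreamReader::try_new(stream).unwrap();

        for batch in imported {
            // Before the fix the imported batch reported 0 rows; with it,
            // the explicit row count survives the FFI round trip.
            assert_eq!(batch.unwrap().num_rows(), 3);
        }
    }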