From 37b46be73ec402e1e46a45245dae8a2c290496ae Mon Sep 17 00:00:00 2001 From: Jonas Dedden Date: Wed, 5 Nov 2025 22:14:01 +0100 Subject: [PATCH 1/3] Implement a `Vec` wrapper for `pyarrow.Table` convenience CQ fixes CQ fix CQ fix Let `Table` be a combination of `Vec` and `SchemaRef` instead `cargo fmt` Overhauled `Table` definition, Added tests Add empty `Table` integration test Update `arrow-pyarrow`'s crate documentation Overhaul documentation even more Typo fix --- arrow-pyarrow-integration-testing/src/lib.rs | 26 ++- .../tests/test_sql.py | 74 +++++++++ arrow-pyarrow/src/lib.rs | 150 +++++++++++++++++- 3 files changed, 241 insertions(+), 9 deletions(-) diff --git a/arrow-pyarrow-integration-testing/src/lib.rs b/arrow-pyarrow-integration-testing/src/lib.rs index 7d5d63c1d50d..e276f91c050a 100644 --- a/arrow-pyarrow-integration-testing/src/lib.rs +++ b/arrow-pyarrow-integration-testing/src/lib.rs @@ -32,7 +32,7 @@ use arrow::compute::kernels; use arrow::datatypes::{DataType, Field, Schema}; use arrow::error::ArrowError; use arrow::ffi_stream::ArrowArrayStreamReader; -use arrow::pyarrow::{FromPyArrow, PyArrowException, PyArrowType, ToPyArrow}; +use arrow::pyarrow::{FromPyArrow, PyArrowException, PyArrowType, Table, ToPyArrow}; use arrow::record_batch::RecordBatch; fn to_py_err(err: ArrowError) -> PyErr { @@ -140,6 +140,28 @@ fn round_trip_record_batch_reader( Ok(obj) } +#[pyfunction] +fn round_trip_table(obj: PyArrowType) -> PyResult> { + Ok(obj) +} + +/// Function for testing whether a `Vec` is exportable as `pyarrow.Table`, with or +/// without explicitly providing a schema +#[pyfunction] +#[pyo3(signature = (record_batches, *, schema=None))] +pub fn build_table( + record_batches: Vec>, + schema: Option>, +) -> PyResult> { + Ok(PyArrowType( + Table::try_new( + record_batches.into_iter().map(|rb| rb.0).collect(), + schema.map(|s| Arc::new(s.0)), + ) + .map_err(to_py_err)?, + )) +} + #[pyfunction] fn reader_return_errors(obj: PyArrowType) -> PyResult<()> { // This makes sure we can correctly consume a RBR and return the error, @@ -178,6 +200,8 @@ fn arrow_pyarrow_integration_testing(_py: Python, m: &Bound) -> PyResu m.add_wrapped(wrap_pyfunction!(round_trip_array))?; m.add_wrapped(wrap_pyfunction!(round_trip_record_batch))?; m.add_wrapped(wrap_pyfunction!(round_trip_record_batch_reader))?; + m.add_wrapped(wrap_pyfunction!(round_trip_table))?; + m.add_wrapped(wrap_pyfunction!(build_table))?; m.add_wrapped(wrap_pyfunction!(reader_return_errors))?; m.add_wrapped(wrap_pyfunction!(boxed_reader_roundtrip))?; Ok(()) diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py index 3b46d5729a1f..1970d4e170aa 100644 --- a/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -613,6 +613,80 @@ def test_table_pycapsule(): assert len(table.to_batches()) == len(new_table.to_batches()) +def test_table_empty(): + """ + Python -> Rust -> Python + """ + schema = pa.schema([('ints', pa.list_(pa.int32()))], metadata={b'key1': b'value1'}) + table = pa.Table.from_batches([], schema=schema) + new_table = rust.build_table([], schema=schema) + + assert table.schema == new_table.schema + assert table == new_table + assert len(table.to_batches()) == len(new_table.to_batches()) + + +def test_table_roundtrip(): + """ + Python -> Rust -> Python + """ + schema = pa.schema([('ints', pa.list_(pa.int32()))], metadata={b'key1': b'value1'}) + batches = [ + pa.record_batch([[[1], [2, 42]]], schema), + pa.record_batch([[None, [], [5, 6]]], schema), + ] + table = pa.Table.from_batches(batches) + new_table = rust.round_trip_table(table) + + assert table.schema == new_table.schema + assert table == new_table + assert len(table.to_batches()) == len(new_table.to_batches()) + + +@pytest.mark.parametrize("set_schema", (True, False)) +def test_table_from_batches(set_schema: bool): + """ + Python -> Rust -> Python + """ + schema = pa.schema([('ints', pa.list_(pa.int32()))], metadata={b'key1': b'value1'}) + batches = [ + pa.record_batch([[[1], [2, 42]]], schema), + pa.record_batch([[None, [], [5, 6]]], schema), + ] + table = pa.Table.from_batches(batches) + new_table = rust.build_table(batches, schema=schema if set_schema else None) + + assert table.schema == new_table.schema + assert table == new_table + assert len(table.to_batches()) == len(new_table.to_batches()) + + +def test_table_error_inconsistent_schema(): + """ + Python -> Rust -> Python + """ + schema_1 = pa.schema([('ints', pa.list_(pa.int32()))]) + schema_2 = pa.schema([('floats', pa.list_(pa.float32()))]) + batches = [ + pa.record_batch([[[1], [2, 42]]], schema_1), + pa.record_batch([[None, [], [5.6, 6.4]]], schema_2), + ] + with pytest.raises(pa.ArrowException, match="Schema error: All record batches must have the same schema."): + rust.build_table(batches) + + +def test_table_error_no_schema(): + """ + Python -> Rust -> Python + """ + batches = [] + with pytest.raises( + pa.ArrowException, + match="Schema error: If no schema is supplied explicitly, there must be at least one RecordBatch!" + ): + rust.build_table(batches) + + def test_reject_other_classes(): # Arbitrary type that is not a PyArrow type not_pyarrow = ["hello"] diff --git a/arrow-pyarrow/src/lib.rs b/arrow-pyarrow/src/lib.rs index d4bbb201f027..647e4a069716 100644 --- a/arrow-pyarrow/src/lib.rs +++ b/arrow-pyarrow/src/lib.rs @@ -44,17 +44,20 @@ //! | `pyarrow.Array` | [ArrayData] | //! | `pyarrow.RecordBatch` | [RecordBatch] | //! | `pyarrow.RecordBatchReader` | [ArrowArrayStreamReader] / `Box` (1) | +//! | `pyarrow.Table` | [Table] (2) | //! //! (1) `pyarrow.RecordBatchReader` can be imported as [ArrowArrayStreamReader]. Either //! [ArrowArrayStreamReader] or `Box` can be exported //! as `pyarrow.RecordBatchReader`. (`Box` is typically //! easier to create.) //! -//! PyArrow has the notion of chunked arrays and tables, but arrow-rs doesn't -//! have these same concepts. A chunked table is instead represented with -//! `Vec`. A `pyarrow.Table` can be imported to Rust by calling -//! [pyarrow.Table.to_reader()](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_reader) -//! and then importing the reader as a [ArrowArrayStreamReader]. +//! (2) Although arrow-rs offers a [pyarrow.Table](https://arrow.apache.org/docs/python/generated/pyarrow.Table) +//! convenience wrapper [Table] (which internally holds `Vec`), this is more meant for +//! use cases where you already have `Vec` on the Rust side and want to export that in +//! bulk as a `pyarrow.Table`. In general, it is recommended to use streaming approaches instead of +//! dealing with bulk data. +//! For example, a `pyarrow.Table` can be imported to Rust through `PyArrowType` +//! instead (since `pyarrow.Table` implements the ArrayStream PyCapsule interface). use std::convert::{From, TryFrom}; use std::ptr::{addr_of, addr_of_mut}; @@ -68,13 +71,13 @@ use arrow_array::{ make_array, }; use arrow_data::ArrayData; -use arrow_schema::{ArrowError, DataType, Field, Schema}; +use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef}; use pyo3::exceptions::{PyTypeError, PyValueError}; use pyo3::ffi::Py_uintptr_t; -use pyo3::import_exception; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; -use pyo3::types::{PyCapsule, PyList, PyTuple}; +use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple}; +use pyo3::{import_exception, intern}; import_exception!(pyarrow, ArrowException); /// Represents an exception raised by PyArrow. @@ -484,6 +487,137 @@ impl IntoPyArrow for ArrowArrayStreamReader { } } +/// This is a convenience wrapper around `Vec` that tries to simplify conversion from +/// and to `pyarrow.Table`. +/// +/// This could be used in circumstances where you either want to consume a `pyarrow.Table` directly +/// (although technically, since `pyarrow.Table` implements the ArrayStreamReader PyCapsule +/// interface, one could also consume a `PyArrowType` instead) or, more +/// importantly, where one wants to export a `pyarrow.Table` from a `Vec` from the Rust +/// side. +/// +/// ```ignore +/// #[pyfunction] +/// fn return_table(...) -> PyResult> { +/// let batches: Vec; +/// let schema: SchemaRef; +/// PyArrowType(Table::try_new(batches, schema).map_err(|err| err.into_py_err(py))?) +/// } +/// ``` +#[derive(Clone)] +pub struct Table { + record_batches: Vec, + schema: SchemaRef, +} + +impl Table { + pub unsafe fn new_unchecked(record_batches: Vec, schema: SchemaRef) -> Self { + Self { + record_batches, + schema, + } + } + + pub fn try_new( + record_batches: Vec, + schema: Option, + ) -> Result { + let schema = match schema { + Some(s) => s, + None => { + record_batches + .get(0) + .ok_or_else(|| ArrowError::SchemaError( + "If no schema is supplied explicitly, there must be at least one RecordBatch!".to_owned() + ))? + .schema() + .clone() + } + }; + for record_batch in &record_batches { + if schema != record_batch.schema() { + return Err(ArrowError::SchemaError( + "All record batches must have the same schema.".to_owned(), + )); + } + } + Ok(Self { + record_batches, + schema, + }) + } + + pub fn record_batches(&self) -> &[RecordBatch] { + &self.record_batches + } + + pub fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + pub fn into_inner(self) -> (Vec, SchemaRef) { + (self.record_batches, self.schema) + } +} + +impl TryFrom for Table { + type Error = ArrowError; + + fn try_from(value: ArrowArrayStreamReader) -> Result { + let schema = value.schema(); + let batches = value.collect::, _>>()?; + // We assume all batches have the same schema here. + unsafe { Ok(Self::new_unchecked(batches, schema)) } + } +} + +impl FromPyArrow for Table { + fn from_pyarrow_bound(ob: &Bound) -> PyResult { + let array_stream_reader: PyResult = { + // First, try whether the object implements the Arrow ArrayStreamReader protocol directly + // (which `pyarrow.Table` does) or test whether it is a RecordBatchReader. + let reader_result = if let Ok(reader) = ArrowArrayStreamReader::from_pyarrow_bound(ob) { + Some(reader) + } + // If that is not the case, test whether it has a `to_reader` method (which + // `pyarrow.Table` does) whose return value implements the Arrow ArrayStreamReader + // protocol or is a RecordBatchReader. + else if ob.hasattr(intern!(ob.py(), "to_reader"))? { + let py_reader = ob.getattr(intern!(ob.py(), "to_reader"))?.call0()?; + ArrowArrayStreamReader::from_pyarrow_bound(&py_reader).ok() + } else { + None + }; + + match reader_result { + Some(reader) => Ok(reader), + None => Err(PyTypeError::new_err( + "Expected Arrow Table, Arrow RecordBatchReader or other object which conforms to the Arrow ArrayStreamReader protocol.", + )), + } + }; + Self::try_from(array_stream_reader?) + .map_err(|err| PyErr::new::(err.to_string())) + } +} + +impl IntoPyArrow for Table { + fn into_pyarrow(self, py: Python) -> PyResult> { + let module = py.import(intern!(py, "pyarrow"))?; + let class = module.getattr(intern!(py, "Table"))?; + + let py_batches = PyList::new(py, self.record_batches.into_iter().map(PyArrowType))?; + let py_schema = PyArrowType(Arc::unwrap_or_clone(self.schema)); + + let kwargs = PyDict::new(py); + kwargs.set_item("schema", py_schema)?; + + let reader = class.call_method("from_batches", (py_batches,), Some(&kwargs))?; + + Ok(reader) + } +} + /// A newtype wrapper for types implementing [`FromPyArrow`] or [`IntoPyArrow`]. /// /// When wrapped around a type `T: FromPyArrow`, it From 7695898311d156444bddc5cec0ce8b34bca12384 Mon Sep 17 00:00:00 2001 From: Jonas Dedden Date: Fri, 7 Nov 2025 21:23:26 +0100 Subject: [PATCH 2/3] Solve review remarks; current status still broken --- arrow-pyarrow-integration-testing/src/lib.rs | 7 +- .../tests/test_sql.py | 27 +++---- arrow-pyarrow/src/lib.rs | 80 ++++++------------- 3 files changed, 34 insertions(+), 80 deletions(-) diff --git a/arrow-pyarrow-integration-testing/src/lib.rs b/arrow-pyarrow-integration-testing/src/lib.rs index e276f91c050a..365850e39454 100644 --- a/arrow-pyarrow-integration-testing/src/lib.rs +++ b/arrow-pyarrow-integration-testing/src/lib.rs @@ -145,18 +145,15 @@ fn round_trip_table(obj: PyArrowType
) -> PyResult> { Ok(obj) } -/// Function for testing whether a `Vec` is exportable as `pyarrow.Table`, with or -/// without explicitly providing a schema #[pyfunction] -#[pyo3(signature = (record_batches, *, schema=None))] pub fn build_table( record_batches: Vec>, - schema: Option>, + schema: PyArrowType, ) -> PyResult> { Ok(PyArrowType( Table::try_new( record_batches.into_iter().map(|rb| rb.0).collect(), - schema.map(|s| Arc::new(s.0)), + Arc::new(schema.0), ) .map_err(to_py_err)?, )) diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py index 1970d4e170aa..52821d381cba 100644 --- a/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -630,12 +630,16 @@ def test_table_roundtrip(): """ Python -> Rust -> Python """ - schema = pa.schema([('ints', pa.list_(pa.int32()))], metadata={b'key1': b'value1'}) + metadata = {b'key1': b'value1'} + schema = pa.schema([('ints', pa.list_(pa.int32()))], metadata=metadata) batches = [ pa.record_batch([[[1], [2, 42]]], schema), pa.record_batch([[None, [], [5, 6]]], schema), ] - table = pa.Table.from_batches(batches) + table = pa.Table.from_batches(batches, schema=schema) + # TODO: Remove these `assert`s as soon as the metadata issue is solved in Rust + assert table.schema.metadata == metadata + assert all(batch.schema.metadata == metadata for batch in table.to_batches()) new_table = rust.round_trip_table(table) assert table.schema == new_table.schema @@ -643,8 +647,7 @@ def test_table_roundtrip(): assert len(table.to_batches()) == len(new_table.to_batches()) -@pytest.mark.parametrize("set_schema", (True, False)) -def test_table_from_batches(set_schema: bool): +def test_table_from_batches(): """ Python -> Rust -> Python """ @@ -654,7 +657,7 @@ def test_table_from_batches(set_schema: bool): pa.record_batch([[None, [], [5, 6]]], schema), ] table = pa.Table.from_batches(batches) - new_table = rust.build_table(batches, schema=schema if set_schema else None) + new_table = rust.build_table(batches, schema) assert table.schema == new_table.schema assert table == new_table @@ -672,19 +675,7 @@ def test_table_error_inconsistent_schema(): pa.record_batch([[None, [], [5.6, 6.4]]], schema_2), ] with pytest.raises(pa.ArrowException, match="Schema error: All record batches must have the same schema."): - rust.build_table(batches) - - -def test_table_error_no_schema(): - """ - Python -> Rust -> Python - """ - batches = [] - with pytest.raises( - pa.ArrowException, - match="Schema error: If no schema is supplied explicitly, there must be at least one RecordBatch!" - ): - rust.build_table(batches) + rust.build_table(batches, schema_1) def test_reject_other_classes(): diff --git a/arrow-pyarrow/src/lib.rs b/arrow-pyarrow/src/lib.rs index 647e4a069716..e3ed8dd3d386 100644 --- a/arrow-pyarrow/src/lib.rs +++ b/arrow-pyarrow/src/lib.rs @@ -51,13 +51,13 @@ //! as `pyarrow.RecordBatchReader`. (`Box` is typically //! easier to create.) //! -//! (2) Although arrow-rs offers a [pyarrow.Table](https://arrow.apache.org/docs/python/generated/pyarrow.Table) -//! convenience wrapper [Table] (which internally holds `Vec`), this is more meant for -//! use cases where you already have `Vec` on the Rust side and want to export that in -//! bulk as a `pyarrow.Table`. In general, it is recommended to use streaming approaches instead of -//! dealing with bulk data. -//! For example, a `pyarrow.Table` can be imported to Rust through `PyArrowType` -//! instead (since `pyarrow.Table` implements the ArrayStream PyCapsule interface). +//! (2) Although arrow-rs offers [Table], a convenience wrapper for [pyarrow.Table](https://arrow.apache.org/docs/python/generated/pyarrow.Table) +//! that internally holds `Vec`, it is meant primarily for use cases where you already +//! have `Vec` on the Rust side and want to export that in bulk as a `pyarrow.Table`. +//! In general, it is recommended to use streaming approaches instead of dealing with data in bulk. +//! For example, a `pyarrow.Table` (or any other object that implements the ArrayStream PyCapsule +//! interface) can be imported to Rust through `PyArrowType>` instead of +//! forcing eager reading into `Vec`. use std::convert::{From, TryFrom}; use std::ptr::{addr_of, addr_of_mut}; @@ -511,33 +511,20 @@ pub struct Table { } impl Table { - pub unsafe fn new_unchecked(record_batches: Vec, schema: SchemaRef) -> Self { - Self { - record_batches, - schema, - } - } - pub fn try_new( record_batches: Vec, - schema: Option, + schema: SchemaRef, ) -> Result { - let schema = match schema { - Some(s) => s, - None => { - record_batches - .get(0) - .ok_or_else(|| ArrowError::SchemaError( - "If no schema is supplied explicitly, there must be at least one RecordBatch!".to_owned() - ))? - .schema() - .clone() - } - }; for record_batch in &record_batches { if schema != record_batch.schema() { return Err(ArrowError::SchemaError( - "All record batches must have the same schema.".to_owned(), + //"All record batches must have the same schema.".to_owned(), + format!( + "All record batches must have the same schema. \ + Expected schema: {:?}, got schema: {:?}", + schema, + record_batch.schema() + ), )); } } @@ -560,47 +547,26 @@ impl Table { } } -impl TryFrom for Table { +impl TryFrom> for Table { type Error = ArrowError; - fn try_from(value: ArrowArrayStreamReader) -> Result { + fn try_from(value: Box) -> Result { let schema = value.schema(); let batches = value.collect::, _>>()?; - // We assume all batches have the same schema here. - unsafe { Ok(Self::new_unchecked(batches, schema)) } + Self::try_new(batches, schema) } } +/// Convert a `pyarrow.Table` (or any other ArrowArrayStream compliant object) into [`Table`] impl FromPyArrow for Table { fn from_pyarrow_bound(ob: &Bound) -> PyResult { - let array_stream_reader: PyResult = { - // First, try whether the object implements the Arrow ArrayStreamReader protocol directly - // (which `pyarrow.Table` does) or test whether it is a RecordBatchReader. - let reader_result = if let Ok(reader) = ArrowArrayStreamReader::from_pyarrow_bound(ob) { - Some(reader) - } - // If that is not the case, test whether it has a `to_reader` method (which - // `pyarrow.Table` does) whose return value implements the Arrow ArrayStreamReader - // protocol or is a RecordBatchReader. - else if ob.hasattr(intern!(ob.py(), "to_reader"))? { - let py_reader = ob.getattr(intern!(ob.py(), "to_reader"))?.call0()?; - ArrowArrayStreamReader::from_pyarrow_bound(&py_reader).ok() - } else { - None - }; - - match reader_result { - Some(reader) => Ok(reader), - None => Err(PyTypeError::new_err( - "Expected Arrow Table, Arrow RecordBatchReader or other object which conforms to the Arrow ArrayStreamReader protocol.", - )), - } - }; - Self::try_from(array_stream_reader?) - .map_err(|err| PyErr::new::(err.to_string())) + let reader: Box = + Box::new(ArrowArrayStreamReader::from_pyarrow_bound(ob)?); + Self::try_from(reader).map_err(|err| PyErr::new::(err.to_string())) } } +/// Convert a [`Table`] into `pyarrow.Table`. impl IntoPyArrow for Table { fn into_pyarrow(self, py: Python) -> PyResult> { let module = py.import(intern!(py, "pyarrow"))?; From af2156d12490a972c74aac68a72ac598b4d2df1b Mon Sep 17 00:00:00 2001 From: Jonas Dedden Date: Fri, 7 Nov 2025 21:54:36 +0100 Subject: [PATCH 3/3] Use `pyo3_arrow`'s `schema_equals` function for now --- arrow-pyarrow/src/lib.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/arrow-pyarrow/src/lib.rs b/arrow-pyarrow/src/lib.rs index e3ed8dd3d386..f21541755cf4 100644 --- a/arrow-pyarrow/src/lib.rs +++ b/arrow-pyarrow/src/lib.rs @@ -515,8 +515,25 @@ impl Table { record_batches: Vec, schema: SchemaRef, ) -> Result { + /// This function was copied from `pyo3_arrow/utils.rs` for now. I don't understand yet why + /// this is required instead of a "normal" `schema == record_batch.schema()` check. + /// + /// TODO: Either remove this check, replace it with something already existing in `arrow-rs` + /// or move it to a central `utils` location. + fn schema_equals(left: &SchemaRef, right: &SchemaRef) -> bool { + left.fields + .iter() + .zip(right.fields.iter()) + .all(|(left_field, right_field)| { + left_field.name() == right_field.name() + && left_field + .data_type() + .equals_datatype(right_field.data_type()) + }) + } + for record_batch in &record_batches { - if schema != record_batch.schema() { + if !schema_equals(&schema, &record_batch.schema()) { return Err(ArrowError::SchemaError( //"All record batches must have the same schema.".to_owned(), format!(