From ce72c5e501661325acfc2a1f38a93cda3b92bc57 Mon Sep 17 00:00:00 2001 From: Connor Sanders <170039284+jecsand838@users.noreply.github.com> Date: Wed, 14 Jan 2026 01:48:08 -0600 Subject: [PATCH 1/4] Add row-by-row encoding support and tests to arrow-avro - Introduced `RecordEncoder::encode_rows` to buffer encoded rows as contiguous slices with per-row offsets using `BytesMut`. - Added `Encoder` for row-by-row Avro encoding, including zero-copy `Bytes` row access via `EncodedRows`. - Integrated `bytes` crate for efficient encoding operations. - Updated writer API to offer `build_encoder` for stream formats (e.g., SOE) alongside row-capacity configuration support. - Adjusted docs to highlight new encoder capabilities. - Comprehensive tests added to validate single/multi-column, nullable, prefix-based, and empty batch encoding scenarios. --- arrow-avro/Cargo.toml | 4 +- arrow-avro/src/writer/encoder.rs | 288 +++++++++++ arrow-avro/src/writer/format.rs | 34 ++ arrow-avro/src/writer/mod.rs | 849 +++++++++++++++++++++++++++++-- 4 files changed, 1126 insertions(+), 49 deletions(-) diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index 48cea8467eb7..edcce33ac71f 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -66,6 +66,7 @@ indexmap = "2.10" rand = "0.9" md5 = { version = "0.8", optional = true } sha2 = { version = "0.10", optional = true } +bytes = "1.11" [dev-dependencies] arrow-data = { workspace = true } @@ -76,9 +77,8 @@ rand = { version = "0.9.1", default-features = false, features = [ ] } criterion = { workspace = true, default-features = false } tempfile = "3.3" -arrow = { workspace = true } +arrow = { workspace = true, features = ["prettyprint"] } futures = "0.3.31" -bytes = "1.10.1" async-stream = "0.3.6" apache-avro = "0.21.0" num-bigint = "0.4" diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs index ef9e02c8faf1..5b306959bed5 100644 --- a/arrow-avro/src/writer/encoder.rs +++ b/arrow-avro/src/writer/encoder.rs @@ -43,6 +43,7 @@ use arrow_buffer::{ArrowNativeType, NullBuffer}; use arrow_schema::{ ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode, }; +use bytes::{BufMut, BytesMut}; use std::io::Write; use std::sync::Arc; use uuid::Uuid; @@ -826,6 +827,69 @@ impl RecordEncoder { } Ok(()) } + + /// Encode rows into a single contiguous `BytesMut` and append row-end offsets. + /// + /// # Invariants + /// + /// * `offsets` must be non-empty and seeded with `0` at index 0. + /// * `offsets.last()` must equal `out.len()` on entry. + /// * On success, exactly `batch.num_rows()` additional offsets are pushed, and + /// `offsets.last()` equals the new `out.len()`. + pub(crate) fn encode_rows( + &self, + batch: &RecordBatch, + row_capacity: usize, + out: &mut BytesMut, + offsets: &mut Vec, + ) -> Result<(), ArrowError> { + // Validate invariants once per call (cheap vs. per-row allocations). 
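+        // Worked example of the invariant (hypothetical sizes): after encoding
+        // two rows of 3 and 5 bytes into an empty buffer, `offsets == [0, 3, 8]`
+        // and `out.len() == 8`; a follow-up call must start from that state.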
+ if offsets.is_empty() { + return Err(ArrowError::InvalidArgumentError( + "encode_rows requires offsets to be seeded with a 0 sentinel".to_string(), + )); + } + if offsets[0] != 0 { + return Err(ArrowError::InvalidArgumentError( + "encode_rows requires offsets[0] == 0".to_string(), + )); + } + let expected_last = out.len() as u64; + if *offsets.last().unwrap() != expected_last { + return Err(ArrowError::InvalidArgumentError(format!( + "encode_rows requires offsets.last() == out.len() ({} != {})", + offsets.last().unwrap(), + expected_last + ))); + } + let mut column_encoders = self.prepare_for_batch(batch)?; + let n = batch.num_rows(); + offsets.reserve(n); + out.reserve(n.saturating_mul(row_capacity)); + let mut w = out.writer(); + match &self.prefix { + Some(prefix) => { + let prefix_bytes = prefix.as_slice(); + for row in 0..n { + w.write_all(prefix_bytes) + .map_err(|e| ArrowError::IoError(format!("write prefix: {e}"), e))?; + for enc in column_encoders.iter_mut() { + enc.encode(&mut w, row)?; + } + offsets.push((*w.get_ref()).len() as u64); + } + } + None => { + for row in 0..n { + for enc in column_encoders.iter_mut() { + enc.encode(&mut w, row)?; + } + offsets.push((*w.get_ref()).len() as u64); + } + } + } + Ok(()) + } } fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option { @@ -1977,6 +2041,12 @@ mod tests { } } + fn row_slice<'a>(buf: &'a [u8], offsets: &[u64], row: usize) -> &'a [u8] { + let start = offsets[row] as usize; + let end = offsets[row + 1] as usize; + &buf[start..end] + } + #[test] fn binary_encoder() { let values: Vec<&[u8]> = vec![b"", b"ab", b"\x00\xFF"]; @@ -3045,4 +3115,222 @@ mod tests { other => panic!("expected NullableNoNulls, got {other:?}"), } } + + #[test] + fn encode_rows_single_column_int32() { + let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]); + let arr = Int32Array::from(vec![1, 2, 3]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap(); + let encoder = RecordEncoder { + columns: vec![FieldBinding { + arrow_index: 0, + nullability: None, + plan: FieldPlan::Scalar, + }], + prefix: None, + }; + let mut out = BytesMut::new(); + let mut offsets: Vec = vec![0]; + encoder + .encode_rows(&batch, 16, &mut out, &mut offsets) + .unwrap(); + assert_eq!(offsets.len(), 4); + assert_eq!(*offsets.last().unwrap(), out.len() as u64); + assert_bytes_eq(row_slice(&out, &offsets, 0), &avro_long_bytes(1)); + assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(2)); + assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(3)); + } + + #[test] + fn encode_rows_multiple_columns() { + let schema = ArrowSchema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ]); + let int_arr = Int32Array::from(vec![10, 20]); + let str_arr = StringArray::from(vec!["hello", "world"]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(int_arr), Arc::new(str_arr)], + ) + .unwrap(); + let encoder = RecordEncoder { + columns: vec![ + FieldBinding { + arrow_index: 0, + nullability: None, + plan: FieldPlan::Scalar, + }, + FieldBinding { + arrow_index: 1, + nullability: None, + plan: FieldPlan::Scalar, + }, + ], + prefix: None, + }; + let mut out = BytesMut::new(); + let mut offsets: Vec = vec![0]; + encoder + .encode_rows(&batch, 32, &mut out, &mut offsets) + .unwrap(); + assert_eq!(offsets.len(), 3); + assert_eq!(*offsets.last().unwrap(), out.len() as u64); + let mut expected_row0 = Vec::new(); + 
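+        // Expected per-row layout: the Avro int 10 is zig-zag varint encoded
+        // as 0x14, and "hello" is written as a zig-zag length (0x0A) followed
+        // by the UTF-8 bytes.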
expected_row0.extend(avro_long_bytes(10)); + expected_row0.extend(avro_len_prefixed_bytes(b"hello")); + assert_bytes_eq(row_slice(&out, &offsets, 0), &expected_row0); + let mut expected_row1 = Vec::new(); + expected_row1.extend(avro_long_bytes(20)); + expected_row1.extend(avro_len_prefixed_bytes(b"world")); + assert_bytes_eq(row_slice(&out, &offsets, 1), &expected_row1); + } + + #[test] + fn encode_rows_with_prefix() { + use crate::codec::AvroFieldBuilder; + use crate::schema::AvroSchema; + let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]); + let arr = Int32Array::from(vec![42]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap(); + let avro_schema = AvroSchema::try_from(&schema).unwrap(); + let fingerprint = avro_schema + .fingerprint(crate::schema::FingerprintAlgorithm::Rabin) + .unwrap(); + let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap()) + .build() + .unwrap(); + let encoder = RecordEncoderBuilder::new(&avro_root, &schema) + .with_fingerprint(Some(fingerprint)) + .build() + .unwrap(); + let mut out = BytesMut::new(); + let mut offsets: Vec = vec![0]; + encoder + .encode_rows(&batch, 32, &mut out, &mut offsets) + .unwrap(); + assert_eq!(offsets.len(), 2); + let row0 = row_slice(&out, &offsets, 0); + assert!(row0.len() > 10, "Row should contain prefix + encoded value"); + assert_eq!(row0[0], 0xC3); + assert_eq!(row0[1], 0x01); + } + + #[test] + fn encode_rows_empty_batch() { + let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]); + let arr = Int32Array::from(Vec::::new()); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap(); + let encoder = RecordEncoder { + columns: vec![FieldBinding { + arrow_index: 0, + nullability: None, + plan: FieldPlan::Scalar, + }], + prefix: None, + }; + let mut out = BytesMut::new(); + let mut offsets: Vec = vec![0]; + encoder + .encode_rows(&batch, 16, &mut out, &mut offsets) + .unwrap(); + assert_eq!(offsets, vec![0u64]); + assert!(out.is_empty()); + } + + #[test] + fn encode_rows_matches_encode_output() { + let schema = ArrowSchema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Float64, false), + ]); + let int_arr = Int64Array::from(vec![100i64, 200, 300]); + let float_arr = Float64Array::from(vec![1.5, 2.5, 3.5]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(int_arr), Arc::new(float_arr)], + ) + .unwrap(); + let encoder = RecordEncoder { + columns: vec![ + FieldBinding { + arrow_index: 0, + nullability: None, + plan: FieldPlan::Scalar, + }, + FieldBinding { + arrow_index: 1, + nullability: None, + plan: FieldPlan::Scalar, + }, + ], + prefix: None, + }; + let mut stream_buf = Vec::new(); + encoder.encode(&mut stream_buf, &batch).unwrap(); + let mut out = BytesMut::new(); + let mut offsets: Vec = vec![0]; + encoder + .encode_rows(&batch, 32, &mut out, &mut offsets) + .unwrap(); + assert_eq!(offsets.len(), 1 + batch.num_rows()); + assert_bytes_eq(&out[..], &stream_buf); + } + + #[test] + fn encode_rows_appends_to_existing_buffer() { + let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]); + let arr = Int32Array::from(vec![5, 6]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap(); + let encoder = RecordEncoder { + columns: vec![FieldBinding { + arrow_index: 0, + nullability: None, + plan: FieldPlan::Scalar, + }], + prefix: None, + }; + let mut out = BytesMut::new(); + 
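+        // Seed the buffer with two sentinel bytes and record them as a prior
+        // "row" (offsets [0, 2]) so the encoder must append after them while
+        // preserving the entry invariant offsets.last() == out.len().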
out.extend_from_slice(&[0xAA, 0xBB]); + let mut offsets: Vec = vec![0, out.len() as u64]; + encoder + .encode_rows(&batch, 16, &mut out, &mut offsets) + .unwrap(); + assert_eq!(offsets.len(), 4); + assert_eq!(*offsets.last().unwrap(), out.len() as u64); + assert_bytes_eq(row_slice(&out, &offsets, 0), &[0xAA, 0xBB]); + assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(5)); + assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(6)); + } + + #[test] + fn encode_rows_nullable_column() { + let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, true)]); + let arr = Int32Array::from(vec![Some(1), None, Some(3)]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap(); + let encoder = RecordEncoder { + columns: vec![FieldBinding { + arrow_index: 0, + nullability: Some(Nullability::NullFirst), + plan: FieldPlan::Scalar, + }], + prefix: None, + }; + let mut out = BytesMut::new(); + let mut offsets: Vec = vec![0]; + encoder + .encode_rows(&batch, 16, &mut out, &mut offsets) + .unwrap(); + assert_eq!(offsets.len(), 4); + let mut expected_row0 = Vec::new(); + expected_row0.extend(avro_long_bytes(1)); // union branch for value + expected_row0.extend(avro_long_bytes(1)); // value + assert_bytes_eq(row_slice(&out, &offsets, 0), &expected_row0); + let expected_row1 = avro_long_bytes(0); // union branch for null + assert_bytes_eq(row_slice(&out, &offsets, 1), &expected_row1); + let mut expected_row2 = Vec::new(); + expected_row2.extend(avro_long_bytes(1)); // union branch for value + expected_row2.extend(avro_long_bytes(3)); // value + assert_bytes_eq(row_slice(&out, &offsets, 2), &expected_row2); + } } diff --git a/arrow-avro/src/writer/format.rs b/arrow-avro/src/writer/format.rs index ba2a0b8564b2..b8d69787b03c 100644 --- a/arrow-avro/src/writer/format.rs +++ b/arrow-avro/src/writer/format.rs @@ -135,6 +135,40 @@ impl AvroFormat for AvroSoeFormat { } } +/// Unframed Avro binary streaming format ("raw Avro record body bytes (no prefix, no OCF header)"). +/// +/// Each record written by the stream writer contains only the raw Avro +/// record body bytes (i.e., the Avro binary encoding of the datum) with **no** +/// per-record prefix and **no** Object Container File (OCF) header. +/// +/// This format is useful when another transport provides framing (for example, +/// length-delimited buffers) or when embedding Avro record payloads inside a +/// larger envelope. +#[derive(Debug, Default)] +pub struct AvroBinaryFormat; + +impl AvroFormat for AvroBinaryFormat { + const NEEDS_PREFIX: bool = false; + + fn start_stream( + &mut self, + _writer: &mut W, + _schema: &Schema, + compression: Option, + ) -> Result<(), ArrowError> { + if compression.is_some() { + return Err(ArrowError::InvalidArgumentError( + "Compression not supported for Avro binary streaming".to_string(), + )); + } + Ok(()) + } + + fn sync_marker(&self) -> Option<&[u8; 16]> { + None + } +} + #[inline] fn write_string(writer: &mut W, s: &str) -> Result<(), ArrowError> { write_bytes(writer, s.as_bytes()) diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs index f4a2e60ed57f..6f088550f16f 100644 --- a/arrow-avro/src/writer/mod.rs +++ b/arrow-avro/src/writer/mod.rs @@ -19,47 +19,129 @@ //! //! # Overview //! -//! Use this module to serialize Arrow `RecordBatch` values into Avro. Two output -//! formats are supported: +//! Use this module to serialize Arrow [`arrow_array::RecordBatch`] values into Avro. Three output +//! modes are supported: //! -//! 
* **[`AvroWriter`](crate::writer::AvroWriter)** — writes an **Object Container File (OCF)**: a self‑describing -//! file with header (schema JSON + metadata), optional compression, data blocks, and -//! sync markers. See Avro 1.11.1 “Object Container Files.” +//! * **[`crate::writer::AvroWriter`]** — writes an **Object Container File (OCF)**: a self‑describing +//! file with header (schema JSON and metadata), optional compression, data blocks, and +//! sync markers. See Avro 1.11.1 "Object Container Files." //! -//! * **[`AvroStreamWriter`](crate::writer::AvroStreamWriter)** — writes a **Single Object Encoding (SOE) Stream** (“datum” bytes) without +//! +//! * **[`crate::writer::AvroStreamWriter`]** — writes a **Single Object Encoding (SOE) Stream** without //! any container framing. This is useful when the schema is known out‑of‑band (i.e., //! via a registry) and you want minimal overhead. //! -//! ## Which format should you use? +//! * **[`crate::writer::Encoder`]** — a row-by-row encoder that buffers encoded records into a single +//! contiguous byte buffer and returns per-row [`bytes::Bytes`] slices. +//! Ideal for publishing individual messages to Kafka, Pulsar, or other message queues +//! where each message must be a self-contained Avro payload. +//! +//! ## Which writer should you use? +//! +//! | Use Case | Recommended Type | +//! |----------|------------------| +//! | Write an OCF file to disk | [`crate::writer::AvroWriter`] | +//! | Stream records continuously to a file/socket | [`crate::writer::AvroStreamWriter`] | +//! | Publish individual records to Kafka/Pulsar | [`crate::writer::Encoder`] | +//! | Need per-row byte slices for custom framing | [`crate::writer::Encoder`] | +//! +//! ## Per-Record Prefix Formats //! -//! * Use **OCF** when you need a portable, self‑contained file. The schema travels with -//! the data, making it easy to read elsewhere. -//! * Use the **SOE stream** when your surrounding protocol supplies schema information -//! (i.e., a schema registry). The writer automatically adds the per‑record prefix: -//! - **SOE**: Each record is prefixed with the 2-byte header (`0xC3 0x01`) followed by -//! an 8‑byte little‑endian CRC‑64‑AVRO fingerprint, then the Avro body. -//! See Avro 1.11.1 "Single object encoding". -//! -//! - **Confluent wire format**: Each record is prefixed with magic byte `0x00` followed by -//! a **big‑endian** 4‑byte schema ID, then the Avro body. Use `FingerprintStrategy::Id(schema_id)`. -//! -//! - **Apicurio wire format**: Each record is prefixed with magic byte `0x00` followed by -//! a **big‑endian** 8‑byte schema ID, then the Avro body. Use `FingerprintStrategy::Id64(schema_id)`. -//! +//! For [`crate::writer::AvroStreamWriter`] and [`crate::writer::Encoder`], each record is automatically prefixed +//! based on the fingerprint strategy: //! -//! ## Choosing the Avro schema +//! | Strategy | Prefix | Use Case | +//! |----------|--------|----------| +//! | `FingerprintStrategy::Rabin` (default) | `0xC3 0x01` + 8-byte LE Rabin fingerprint | Standard Avro SOE | +//! | `FingerprintStrategy::Id(id)` | `0x00` + 4-byte BE schema ID | [Confluent Schema Registry] | +//! | `FingerprintStrategy::Id64(id)` | `0x00` + 8-byte BE schema ID | [Apicurio Registry] | +//! +//! [Confluent Schema Registry]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format +//! 
[Apicurio Registry]: https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry +//! +//! ## Choosing the Avro Schema //! //! By default, the writer converts your Arrow schema to Avro (including a top‑level record //! name). If you already have an Avro schema JSON you want to use verbatim, put it into the -//! Arrow schema metadata under the `avro.schema` key before constructing the writer. The -//! builder will use that schema instead of generating a new one (unless `strip_metadata` is -//! set to true in the options). +//! Arrow schema metadata under the [`SCHEMA_METADATA_KEY`](crate::schema::SCHEMA_METADATA_KEY) +//! key before constructing the writer. The builder will use that schema instead of generating +//! a new one. //! //! ## Compression //! -//! For OCF, you may enable a compression codec via `WriterBuilder::with_compression`. The -//! chosen codec is written into the file header and used for subsequent blocks. SOE stream -//! writing doesn’t apply container‑level compression. +//! For OCF ([`crate::writer::AvroWriter`]), you may enable a compression codec via +//! [`crate::writer::WriterBuilder::with_compression`]. The chosen codec is written into the file header +//! and used for subsequent blocks. SOE stream writing ([`crate::writer::AvroStreamWriter`], [`crate::writer::Encoder`]) +//! does not apply container‑level compression. +//! +//! # Examples +//! +//! ## Writing an OCF File +//! +//! ``` +//! use std::sync::Arc; +//! use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch}; +//! use arrow_schema::{DataType, Field, Schema}; +//! use arrow_avro::writer::AvroWriter; +//! +//! # fn main() -> Result<(), Box> { +//! let schema = Schema::new(vec![ +//! Field::new("id", DataType::Int64, false), +//! Field::new("name", DataType::Utf8, false), +//! ]); +//! +//! let batch = RecordBatch::try_new( +//! Arc::new(schema.clone()), +//! vec![ +//! Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef, +//! Arc::new(StringArray::from(vec!["alice", "bob"])) as ArrayRef, +//! ], +//! )?; +//! +//! let mut writer = AvroWriter::new(Vec::::new(), schema)?; +//! writer.write(&batch)?; +//! writer.finish()?; +//! let bytes = writer.into_inner(); +//! assert!(!bytes.is_empty()); +//! # Ok(()) +//! # } +//! ``` +//! +//! ## Using the Row-by-Row Encoder for Message Queues +//! +//! ``` +//! use std::sync::Arc; +//! use arrow_array::{ArrayRef, Int32Array, RecordBatch}; +//! use arrow_schema::{DataType, Field, Schema}; +//! use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat}; +//! use arrow_avro::schema::FingerprintStrategy; +//! +//! # fn main() -> Result<(), Box> { +//! let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]); +//! let batch = RecordBatch::try_new( +//! Arc::new(schema.clone()), +//! vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef], +//! )?; +//! +//! // Build an Encoder with Confluent wire format (schema ID = 42) +//! let mut encoder = WriterBuilder::new(schema) +//! .with_fingerprint_strategy(FingerprintStrategy::Id(42)) +//! .build_encoder::()?; +//! +//! encoder.encode(&batch)?; +//! +//! // Get the buffered rows (zero-copy views into a single backing buffer) +//! let rows = encoder.flush(); +//! assert_eq!(rows.len(), 3); +//! +//! // Each row has Confluent wire format: magic byte + 4-byte schema ID + body +//! for row in rows.rows() { +//! let row = row?; +//! assert_eq!(row[0], 0x00); // Confluent magic byte +//! } +//! 
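+//! // Individual rows can also be addressed directly as zero-copy `Bytes`:
+//! let first = rows.row(0)?;
+//! assert_eq!(first[0], 0x00);
+//!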
+//! # Ok(())
+//! # }
+//! ```
 //!
 //! ---
 use crate::codec::AvroFieldBuilder;
 use crate::schema::{
@@ -70,7 +152,8 @@ use crate::schema::{
 use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long};
 use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat};
 use arrow_array::RecordBatch;
-use arrow_schema::{ArrowError, Schema};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use bytes::{Bytes, BytesMut};
 use std::io::Write;
 use std::sync::Arc;
@@ -79,11 +162,100 @@ mod encoder;
 /// Logic for different Avro container file formats.
 pub mod format;
 
+/// A contiguous set of encoded rows.
+///
+/// `EncodedRows` stores:
+/// - a single backing byte buffer (`bytes::Bytes`)
+/// - a `Vec<u64>` of row boundary offsets (length = `rows + 1`)
+///
+/// This lets callers obtain per-row payloads as zero-copy `Bytes` slices.
+///
+/// For compatibility with APIs that require owned `Vec<u8>`, use [`EncodedRows::to_vecs`].
+#[derive(Debug, Clone)]
+pub struct EncodedRows {
+    data: Bytes,
+    offsets: Vec<u64>,
+}
+
+impl EncodedRows {
+    /// Create a new `EncodedRows` from a backing buffer and row boundary offsets.
+    ///
+    /// `offsets` must have length `rows + 1`, and be monotonically non-decreasing.
+    /// The last offset should equal `data.len()`.
+    pub fn new(data: Bytes, offsets: Vec<u64>) -> Self {
+        Self { data, offsets }
+    }
+
+    /// Number of rows in this buffer.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.offsets.len().saturating_sub(1)
+    }
+
+    /// Returns `true` if there are no rows.
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Returns the backing buffer.
+    ///
+    /// Note: individual rows should typically be accessed via [`Self::row`] or [`Self::rows`].
+    #[inline]
+    pub fn bytes(&self) -> &Bytes {
+        &self.data
+    }
+
+    /// Returns the row boundary offsets (length = `len() + 1`).
+    #[inline]
+    pub fn offsets(&self) -> &[u64] {
+        &self.offsets
+    }
+
+    /// Return the `i`th row as a zero-copy `Bytes` slice.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the row offsets are invalid (e.g. exceed `usize::MAX`).
+    ///
+    /// # Panics
+    ///
+    /// Panics if `i >= self.len()`.
+    pub fn row(&self, i: usize) -> Result<Bytes, ArrowError> {
+        let start_u64 = self.offsets[i];
+        let end_u64 = self.offsets[i + 1];
+        let start = usize::try_from(start_u64).map_err(|_| {
+            ArrowError::ParseError("row start offset does not fit in usize".to_string())
+        })?;
+        let end = usize::try_from(end_u64).map_err(|_| {
+            ArrowError::ParseError("row end offset does not fit in usize".to_string())
+        })?;
+        Ok(self.data.slice(start..end))
+    }
+
+    /// Iterate over rows as zero-copy `Bytes` slices.
+    pub fn rows(&self) -> impl Iterator<Item = Result<Bytes, ArrowError>> + '_ {
+        (0..self.len()).map(|i| self.row(i))
+    }
+
+    /// Copy all rows into independent `Vec<u8>` buffers.
+    ///
+    /// This is useful for compatibility with APIs that require owned, mutable byte vectors.
+    pub fn to_vecs(&self) -> Result<Vec<Vec<u8>>, ArrowError> {
+        let mut out = Vec::with_capacity(self.len());
+        for i in 0..self.len() {
+            out.push(self.row(i)?.to_vec());
+        }
+        Ok(out)
+    }
+}
+
 /// Builder to configure and create a `Writer`.
 #[derive(Debug, Clone)]
 pub struct WriterBuilder {
     schema: Schema,
     codec: Option<CompressionCodec>,
+    row_capacity: Option<usize>,
     capacity: usize,
     fingerprint_strategy: Option<FingerprintStrategy>,
 }
@@ -99,6 +271,7 @@ impl WriterBuilder {
         Self {
             schema,
             codec: None,
+            row_capacity: None,
             capacity: 1024,
             fingerprint_strategy: None,
         }
@@ -117,30 +290,34 @@ impl WriterBuilder {
         self
     }
 
-    /// Sets the capacity for the given object and returns the modified instance.
+    /// Sets the expected capacity (in bytes) for internal buffers.
+    ///
+    /// This is used as a hint to pre-allocate staging buffers for writing.
     pub fn with_capacity(mut self, capacity: usize) -> Self {
         self.capacity = capacity;
         self
     }
 
-    /// Create a new `Writer` with specified `AvroFormat` and builder options.
-    /// Performs one‑time startup (header/stream init, encoder plan).
-    pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, ArrowError>
-    where
-        W: Write,
-        F: AvroFormat,
-    {
-        let mut format = F::default();
+    /// Sets the expected byte size for each encoded row.
+    ///
+    /// This setting affects [`Encoder`] created via [`build_encoder`](Self::build_encoder).
+    /// It is used as a hint to reduce reallocations when the typical encoded row size is known.
+    pub fn with_row_capacity(mut self, capacity: usize) -> Self {
+        self.row_capacity = Some(capacity);
+        self
+    }
+
+    fn prepare_encoder<F: AvroFormat>(&self) -> Result<(Arc<Schema>, RecordEncoder), ArrowError> {
         let avro_schema = match self.schema.metadata.get(SCHEMA_METADATA_KEY) {
             Some(json) => AvroSchema::new(json.clone()),
             None => AvroSchema::try_from(&self.schema)?,
         };
         let maybe_fingerprint = if F::NEEDS_PREFIX {
-            match self.fingerprint_strategy {
-                Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(id)),
-                Some(FingerprintStrategy::Id64(id)) => Some(Fingerprint::Id64(id)),
+            match &self.fingerprint_strategy {
+                Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(*id)),
+                Some(FingerprintStrategy::Id64(id)) => Some(Fingerprint::Id64(*id)),
                 Some(strategy) => {
-                    Some(avro_schema.fingerprint(FingerprintAlgorithm::from(strategy))?)
+                    Some(avro_schema.fingerprint(FingerprintAlgorithm::from(*strategy))?)
                 }
                 None => Some(
                     avro_schema
@@ -156,11 +333,48 @@ impl WriterBuilder {
             avro_schema.clone().json_string,
         );
         let schema = Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md));
-        format.start_stream(&mut writer, &schema, self.codec)?;
         let avro_root = AvroFieldBuilder::new(&avro_schema.schema()?).build()?;
         let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref())
             .with_fingerprint(maybe_fingerprint)
             .build()?;
+        Ok((schema, encoder))
+    }
+
+    /// Build a new [`Encoder`] for the given [`AvroFormat`].
+    ///
+    /// `Encoder` only supports stream formats (no OCF sync markers). Attempting to build an
+    /// encoder with an OCF format (e.g. [`AvroOcfFormat`]) will return an error.
+    pub fn build_encoder<F: AvroFormat>(self) -> Result<Encoder, ArrowError> {
+        if F::default().sync_marker().is_some() {
+            return Err(ArrowError::InvalidArgumentError(
+                "Encoder only supports stream formats (no OCF header/sync marker)".to_string(),
+            ));
+        }
+        let (schema, encoder) = self.prepare_encoder::<F>()?;
+        Ok(Encoder {
+            schema,
+            encoder,
+            row_capacity: self.row_capacity,
+            buffer: BytesMut::with_capacity(self.capacity),
+            offsets: vec![0],
+        })
+    }
+
+    /// Build a new [`Writer`] with the specified [`AvroFormat`] and builder options.
+    pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, ArrowError>
+    where
+        W: Write,
+        F: AvroFormat,
+    {
+        let mut format = F::default();
+        if format.sync_marker().is_none() && !F::NEEDS_PREFIX {
+            return Err(ArrowError::InvalidArgumentError(
+                "AvroBinaryFormat is only supported with Encoder, use build_encoder instead"
                    .to_string(),
+            ));
+        }
+        let (schema, encoder) = self.prepare_encoder::<F>()?;
+        format.start_stream(&mut writer, &schema, self.codec)?;
         Ok(Writer {
             writer,
             schema,
@@ -172,6 +386,72 @@ impl WriterBuilder {
     }
 }
 
+/// A row-by-row streaming encoder for Avro **Single Object Encoding** (SOE) streams.
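+///
+/// A minimal end-to-end sketch (mirroring the module-level example, with the
+/// default Rabin SOE prefix):
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+/// # use arrow_schema::{DataType, Field, Schema};
+/// use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
+///
+/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+/// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
+/// let batch = RecordBatch::try_new(
+///     Arc::new(schema.clone()),
+///     vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
+/// )?;
+/// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
+/// encoder.encode(&batch)?;
+/// let rows = encoder.flush();
+/// assert_eq!(rows.len(), 2);
+/// # Ok(())
+/// # }
+/// ```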
+///
+/// Unlike [`Writer`], which writes directly to an output sink, `Encoder` buffers rows in a
+/// single contiguous backing buffer and returns per-row payloads as zero-copy [`bytes::Bytes`]
+/// slices via [`EncodedRows`].
+///
+/// To get owned `Vec<u8>` payloads for compatibility with APIs that require owned buffers,
+/// call [`EncodedRows::to_vecs`].
+#[derive(Debug)]
+pub struct Encoder {
+    schema: SchemaRef,
+    encoder: RecordEncoder,
+    row_capacity: Option<usize>,
+    buffer: BytesMut,
+    offsets: Vec<u64>,
+}
+
+impl Encoder {
+    /// Serialize one [`RecordBatch`] into the internal buffer.
+    pub fn encode(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
+        if batch.schema().fields() != self.schema.fields() {
+            return Err(ArrowError::SchemaError(
+                "Schema of RecordBatch differs from Encoder schema".to_string(),
+            ));
+        }
+        self.encoder.encode_rows(
+            batch,
+            self.row_capacity.unwrap_or(0),
+            &mut self.buffer,
+            &mut self.offsets,
+        )?;
+        Ok(())
+    }
+
+    /// A convenience method to encode a slice of [`RecordBatch`] values.
+    pub fn encode_batches(&mut self, batches: &[RecordBatch]) -> Result<(), ArrowError> {
+        for b in batches {
+            self.encode(b)?;
+        }
+        Ok(())
+    }
+
+    /// Drain and return all currently buffered encoded rows.
+    ///
+    /// The returned [`EncodedRows`] provides per-row payloads as `Bytes` slices.
+    /// For owned buffers, use [`EncodedRows::to_vecs`].
+    pub fn flush(&mut self) -> EncodedRows {
+        let data = self.buffer.split().freeze();
+        let offsets = std::mem::replace(&mut self.offsets, vec![0]);
+        EncodedRows::new(data, offsets)
+    }
+
+    /// Returns the Arrow schema used by this encoder.
+    ///
+    /// The returned schema includes metadata with the Avro schema JSON under
+    /// the `avro.schema` key.
+    pub fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Returns the number of encoded rows currently buffered.
+    pub fn buffered_len(&self) -> usize {
+        self.offsets.len().saturating_sub(1)
+    }
+}
+
 /// Generic Avro writer.
 ///
 /// This type is generic over the output Write sink (`W`) and the Avro format (`F`).
@@ -182,7 +462,7 @@ impl WriterBuilder { #[derive(Debug)] pub struct Writer { writer: W, - schema: Arc, + schema: SchemaRef, format: F, compression: Option, capacity: usize, @@ -401,6 +681,8 @@ mod tests { use crate::schema::{AvroSchema, SchemaStore}; use crate::test_util::arrow_test_data; use arrow::datatypes::TimeUnit; + use arrow::util::pretty::pretty_format_batches; + use arrow_array::builder::{Int32Builder, ListBuilder}; #[cfg(feature = "avro_custom_types")] use arrow_array::types::{Int16Type, Int32Type, Int64Type}; use arrow_array::types::{ @@ -408,16 +690,17 @@ mod tests { TimestampMillisecondType, TimestampNanosecondType, }; use arrow_array::{ - Array, ArrayRef, BinaryArray, Date32Array, Int32Array, PrimitiveArray, RecordBatch, - StringArray, StructArray, UnionArray, + Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Int32Array, Int64Array, + PrimitiveArray, RecordBatch, StringArray, StructArray, UnionArray, }; #[cfg(feature = "avro_custom_types")] - use arrow_array::{Int16Array, Int64Array, RunArray}; + use arrow_array::{Int16Array, RunArray}; use arrow_schema::UnionMode; #[cfg(not(feature = "avro_custom_types"))] use arrow_schema::{DataType, Field, Schema}; #[cfg(feature = "avro_custom_types")] use arrow_schema::{DataType, Field, Schema}; + use bytes::BytesMut; use std::collections::HashMap; use std::collections::HashSet; use std::fs::File; @@ -2410,4 +2693,476 @@ mod tests { assert_eq!(roundtrip, batch); Ok(()) } + + fn make_encoder_schema() -> Schema { + Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]) + } + + fn make_encoder_batch(schema: &Schema) -> RecordBatch { + let a = Int32Array::from(vec![1, 2, 3]); + let b = Int32Array::from(vec![10, 20, 30]); + RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef], + ) + .expect("failed to build test RecordBatch") + } + + fn make_real_avro_schema_and_batch() -> Result<(Schema, RecordBatch, AvroSchema), ArrowError> { + let avro_json = r#" + { + "type": "record", + "name": "User", + "fields": [ + { "name": "id", "type": "long" }, + { "name": "name", "type": "string" }, + { "name": "active", "type": "boolean" }, + { "name": "tags", "type": { "type": "array", "items": "int" } }, + { "name": "opt", "type": ["null", "string"], "default": null } + ] + }"#; + let avro_schema = AvroSchema::new(avro_json.to_string()); + let mut md = HashMap::new(); + md.insert( + SCHEMA_METADATA_KEY.to_string(), + avro_schema.json_string.clone(), + ); + let item_field = Arc::new(Field::new( + Field::LIST_FIELD_DEFAULT_NAME, + DataType::Int32, + false, + )); + let schema = Schema::new_with_metadata( + vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + Field::new("active", DataType::Boolean, false), + Field::new("tags", DataType::List(item_field.clone()), false), + Field::new("opt", DataType::Utf8, true), + ], + md, + ); + let id = Int64Array::from(vec![1, 2, 3]); + let name = StringArray::from(vec!["alice", "bob", "carol"]); + let active = BooleanArray::from(vec![true, false, true]); + let mut tags_builder = ListBuilder::new(Int32Builder::new()).with_field(item_field); + tags_builder.values().append_value(1); + tags_builder.values().append_value(2); + tags_builder.append(true); + tags_builder.append(true); + tags_builder.values().append_value(3); + tags_builder.append(true); + let tags = tags_builder.finish(); + let opt = StringArray::from(vec![Some("x"), None, Some("z")]); + let batch = 
RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(id) as ArrayRef, + Arc::new(name) as ArrayRef, + Arc::new(active) as ArrayRef, + Arc::new(tags) as ArrayRef, + Arc::new(opt) as ArrayRef, + ], + )?; + Ok((schema, batch, avro_schema)) + } + + #[test] + fn test_row_writer_matches_stream_writer_soe() -> Result<(), ArrowError> { + let schema = make_encoder_schema(); + let batch = make_encoder_batch(&schema); + let mut stream = AvroStreamWriter::new(Vec::::new(), schema.clone())?; + stream.write(&batch)?; + stream.finish()?; + let stream_bytes = stream.into_inner(); + let mut row_writer = WriterBuilder::new(schema).build_encoder::()?; + row_writer.encode(&batch)?; + let rows = row_writer.flush(); + // NOTE: EncodedRows is now zero-copy; `to_vecs()` is an explicit compatibility copy. + let row_bytes: Vec = rows.to_vecs()?.concat(); + assert_eq!(stream_bytes, row_bytes); + Ok(()) + } + + #[test] + fn test_row_writer_flush_clears_buffer() -> Result<(), ArrowError> { + let schema = make_encoder_schema(); + let batch = make_encoder_batch(&schema); + let mut row_writer = WriterBuilder::new(schema).build_encoder::()?; + row_writer.encode(&batch)?; + assert_eq!(row_writer.buffered_len(), batch.num_rows()); + let out1 = row_writer.flush(); + assert_eq!(out1.len(), batch.num_rows()); + assert_eq!(row_writer.buffered_len(), 0); + let out2 = row_writer.flush(); + assert_eq!(out2.len(), 0); + Ok(()) + } + + #[test] + fn test_row_writer_roundtrip_decoder_soe_real_avro_data() -> Result<(), ArrowError> { + let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?; + let mut store = SchemaStore::new(); + store.register(avro_schema.clone())?; + let mut row_writer = WriterBuilder::new(schema).build_encoder::()?; + row_writer.encode(&batch)?; + let rows = row_writer.flush(); + let mut decoder = ReaderBuilder::new() + .with_writer_schema_store(store) + .with_batch_size(1024) + .build_decoder()?; + for row in rows.rows() { + let row = row?; + let consumed = decoder.decode(row.as_ref())?; + assert_eq!( + consumed, + row.len(), + "decoder should consume the full row frame" + ); + } + let out = decoder.flush()?.expect("decoded batch"); + let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string(); + let actual = pretty_format_batches(&[out])?.to_string(); + assert_eq!(expected, actual); + Ok(()) + } + + #[test] + fn test_row_writer_roundtrip_decoder_soe_streaming_chunks() -> Result<(), ArrowError> { + let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?; + let mut store = SchemaStore::new(); + store.register(avro_schema.clone())?; + let mut row_writer = WriterBuilder::new(schema).build_encoder::()?; + row_writer.encode(&batch)?; + let rows = row_writer.flush(); + // Build a contiguous stream and frame boundaries (prefix sums) from EncodedRows. 
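+        // e.g. with hypothetical row sizes of 12 and 7 bytes this yields
+        // boundaries == [0, 12, 19], so rows [i, j) occupy the byte range
+        // boundaries[i]..boundaries[j] of the concatenated stream.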
+ let mut stream: Vec = Vec::new(); + let mut boundaries: Vec = Vec::with_capacity(rows.len() + 1); + boundaries.push(0usize); + for row in rows.rows() { + let row = row?; + stream.extend_from_slice(row.as_ref()); + boundaries.push(stream.len()); + } + let mut decoder = ReaderBuilder::new() + .with_writer_schema_store(store) + .with_batch_size(1024) + .build_decoder()?; + let mut buffered = BytesMut::new(); + let chunk_rows = [1usize, 2, 3, 1, 4, 2]; + let mut row_idx = 0usize; + let mut i = 0usize; + let n_rows = rows.len(); + while row_idx < n_rows { + let take = chunk_rows[i % chunk_rows.len()]; + i += 1; + let end_row = (row_idx + take).min(n_rows); + let byte_start = boundaries[row_idx]; + let byte_end = boundaries[end_row]; + buffered.extend_from_slice(&stream[byte_start..byte_end]); + loop { + let consumed = decoder.decode(&buffered)?; + if consumed == 0 { + break; + } + let _ = buffered.split_to(consumed); + } + assert!( + buffered.is_empty(), + "expected decoder to consume the entire frame-aligned chunk" + ); + row_idx = end_row; + } + let out = decoder.flush()?.expect("decoded batch"); + let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string(); + let actual = pretty_format_batches(&[out])?.to_string(); + assert_eq!(expected, actual); + Ok(()) + } + + #[test] + fn test_row_writer_roundtrip_decoder_confluent_wire_format_id() -> Result<(), ArrowError> { + let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?; + let schema_id: u32 = 42; + let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id); + store.set(Fingerprint::Id(schema_id), avro_schema.clone())?; + let mut row_writer = WriterBuilder::new(schema) + .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id)) + .build_encoder::()?; + row_writer.encode(&batch)?; + let rows = row_writer.flush(); + let mut decoder = ReaderBuilder::new() + .with_writer_schema_store(store) + .with_batch_size(1024) + .build_decoder()?; + for row in rows.rows() { + let row = row?; + let consumed = decoder.decode(row.as_ref())?; + assert_eq!(consumed, row.len()); + } + let out = decoder.flush()?.expect("decoded batch"); + let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string(); + let actual = pretty_format_batches(&[out])?.to_string(); + assert_eq!(expected, actual); + Ok(()) + } + + #[test] + fn test_encoder_encode_batches_flush_and_encoded_rows_methods_with_avro_binary_format() + -> Result<(), ArrowError> { + use crate::writer::format::AvroBinaryFormat; + use arrow_array::{ArrayRef, Int32Array, RecordBatch}; + use arrow_schema::{DataType, Field, Schema}; + use std::sync::Arc; + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + let schema_ref = Arc::new(schema.clone()); + let batch1 = RecordBatch::try_new( + schema_ref.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef, + ], + )?; + let batch2 = RecordBatch::try_new( + schema_ref, + vec![ + Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef, + Arc::new(Int32Array::from(vec![40, 50])) as ArrayRef, + ], + )?; + let mut encoder = WriterBuilder::new(schema).build_encoder::()?; + let empty = Encoder::flush(&mut encoder); + assert_eq!(EncodedRows::len(&empty), 0); + assert!(EncodedRows::is_empty(&empty)); + assert_eq!(EncodedRows::bytes(&empty).as_ref(), &[] as &[u8]); + assert_eq!(EncodedRows::offsets(&empty), &[0_u64]); + assert_eq!(EncodedRows::rows(&empty).count(), 0); + 
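+        // A fresh encoder flushes to an empty EncodedRows whose offsets still
+        // hold the single 0 sentinel, as asserted above.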
assert!(EncodedRows::to_vecs(&empty)?.is_empty()); + let batches = vec![batch1, batch2]; + Encoder::encode_batches(&mut encoder, &batches)?; + assert_eq!(encoder.buffered_len(), 5); + let rows = Encoder::flush(&mut encoder); + assert_eq!( + encoder.buffered_len(), + 0, + "Encoder::flush should reset the internal offsets" + ); + assert_eq!(EncodedRows::len(&rows), 5); + assert!(!EncodedRows::is_empty(&rows)); + let expected_offsets: &[u64] = &[0, 2, 4, 6, 8, 10]; + assert_eq!(EncodedRows::offsets(&rows), expected_offsets); + let expected_rows: Vec> = vec![ + vec![2, 20], + vec![4, 40], + vec![6, 60], + vec![8, 80], + vec![10, 100], + ]; + let expected_stream: Vec = expected_rows.concat(); + assert_eq!( + EncodedRows::bytes(&rows).as_ref(), + expected_stream.as_slice() + ); + for (i, expected) in expected_rows.iter().enumerate() { + assert_eq!(EncodedRows::row(&rows, i)?.as_ref(), expected.as_slice()); + } + let iter_rows: Vec> = EncodedRows::rows(&rows) + .collect::, _>>()? + .into_iter() + .map(|b| b.to_vec()) + .collect(); + assert_eq!(iter_rows, expected_rows); + assert_eq!(EncodedRows::to_vecs(&rows)?, expected_rows); + let recreated = EncodedRows::new( + EncodedRows::bytes(&rows).clone(), + EncodedRows::offsets(&rows).to_vec(), + ); + assert_eq!(EncodedRows::len(&recreated), EncodedRows::len(&rows)); + assert_eq!(EncodedRows::bytes(&recreated), EncodedRows::bytes(&rows)); + assert_eq!( + EncodedRows::offsets(&recreated), + EncodedRows::offsets(&rows) + ); + assert_eq!( + EncodedRows::to_vecs(&recreated)?, + EncodedRows::to_vecs(&rows)? + ); + let empty_again = Encoder::flush(&mut encoder); + assert!(EncodedRows::is_empty(&empty_again)); + Ok(()) + } + + #[test] + fn test_writer_builder_build_rejects_avro_binary_format() { + use crate::writer::format::AvroBinaryFormat; + use arrow_schema::{DataType, Field, Schema}; + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let err = WriterBuilder::new(schema) + .build::<_, AvroBinaryFormat>(Vec::::new()) + .unwrap_err(); + match err { + ArrowError::InvalidArgumentError(msg) => assert_eq!( + msg, + "AvroBinaryFormat is only supported with Encoder, use build_encoder instead" + ), + other => panic!("expected InvalidArgumentError, got {:?}", other), + } + } + #[test] + fn test_row_encoder_avro_binary_format_roundtrip_decoder_with_soe_framing() + -> Result<(), ArrowError> { + use crate::writer::format::AvroBinaryFormat; + let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?; + let batches: Vec = vec![batch.clone(), batch.slice(1, 2)]; + let expected = arrow::compute::concat_batches(&batch.schema(), &batches)?; + let mut binary_encoder = + WriterBuilder::new(schema.clone()).build_encoder::()?; + binary_encoder.encode_batches(&batches)?; + let binary_rows = binary_encoder.flush(); + assert_eq!( + binary_rows.len(), + expected.num_rows(), + "binary encoder row count mismatch" + ); + let mut soe_encoder = WriterBuilder::new(schema).build_encoder::()?; + soe_encoder.encode_batches(&batches)?; + let soe_rows = soe_encoder.flush(); + assert_eq!( + soe_rows.len(), + binary_rows.len(), + "SOE vs binary row count mismatch" + ); + let mut store = SchemaStore::new(); // Rabin by default + let fp = store.register(avro_schema)?; + let fp_le_bytes = match fp { + Fingerprint::Rabin(v) => v.to_le_bytes(), + other => panic!("expected Rabin fingerprint from SchemaStore::new(), got {other:?}"), + }; + const SOE_MAGIC: [u8; 2] = [0xC3, 0x01]; + const SOE_PREFIX_LEN: usize = 2 + 8; + for i in 0..binary_rows.len() { + let body = 
binary_rows.row(i)?; + let soe = soe_rows.row(i)?; + assert!( + soe.len() >= SOE_PREFIX_LEN, + "expected SOE row to include prefix" + ); + assert_eq!(&soe.as_ref()[..2], &SOE_MAGIC); + assert_eq!(&soe.as_ref()[2..SOE_PREFIX_LEN], &fp_le_bytes); + assert_eq!( + &soe.as_ref()[SOE_PREFIX_LEN..], + body.as_ref(), + "SOE body bytes differ from AvroBinaryFormat body bytes (row {i})" + ); + } + let mut decoder = ReaderBuilder::new() + .with_writer_schema_store(store) + .with_batch_size(1024) + .build_decoder()?; + for body in binary_rows.rows() { + let body = body?; + let mut framed = Vec::with_capacity(SOE_PREFIX_LEN + body.len()); + framed.extend_from_slice(&SOE_MAGIC); + framed.extend_from_slice(&fp_le_bytes); + framed.extend_from_slice(body.as_ref()); + let consumed = decoder.decode(&framed)?; + assert_eq!( + consumed, + framed.len(), + "decoder should consume the full SOE-framed message" + ); + } + let out = decoder.flush()?.expect("expected a decoded RecordBatch"); + let expected_str = pretty_format_batches(&[expected])?.to_string(); + let actual_str = pretty_format_batches(&[out])?.to_string(); + assert_eq!(expected_str, actual_str); + Ok(()) + } + + #[test] + fn test_row_encoder_avro_binary_format_roundtrip_decoder_streaming_chunks() + -> Result<(), ArrowError> { + use crate::writer::format::AvroBinaryFormat; + let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?; + let mut encoder = WriterBuilder::new(schema).build_encoder::()?; + encoder.encode(&batch)?; + let rows = encoder.flush(); + let mut store = SchemaStore::new(); + let fp = store.register(avro_schema)?; + let fp_le_bytes = match fp { + Fingerprint::Rabin(v) => v.to_le_bytes(), + other => panic!("expected Rabin fingerprint from SchemaStore::new(), got {other:?}"), + }; + const SOE_MAGIC: [u8; 2] = [0xC3, 0x01]; + const SOE_PREFIX_LEN: usize = 2 + 8; + let mut stream: Vec = Vec::new(); + for body in rows.rows() { + let body = body?; + let msg_len: u32 = (SOE_PREFIX_LEN + body.len()) + .try_into() + .expect("message length must fit in u32"); + stream.extend_from_slice(&msg_len.to_le_bytes()); + stream.extend_from_slice(&SOE_MAGIC); + stream.extend_from_slice(&fp_le_bytes); + stream.extend_from_slice(body.as_ref()); + } + let mut decoder = ReaderBuilder::new() + .with_writer_schema_store(store) + .with_batch_size(1024) + .build_decoder()?; + let chunk_sizes = [1usize, 2, 3, 5, 8, 13, 21, 34]; + let mut pos = 0usize; + let mut i = 0usize; + let mut buffered = BytesMut::new(); + let mut decoded_frames = 0usize; + while pos < stream.len() { + let take = chunk_sizes[i % chunk_sizes.len()]; + i += 1; + let end = (pos + take).min(stream.len()); + buffered.extend_from_slice(&stream[pos..end]); + pos = end; + loop { + if buffered.len() < 4 { + break; + } + let msg_len = + u32::from_le_bytes([buffered[0], buffered[1], buffered[2], buffered[3]]) + as usize; + if buffered.len() < 4 + msg_len { + break; + } + let frame = buffered.split_to(4 + msg_len); + let payload = &frame[4..]; + let consumed = decoder.decode(payload)?; + assert_eq!( + consumed, + payload.len(), + "decoder should consume the full SOE-framed message" + ); + + decoded_frames += 1; + } + } + assert!( + buffered.is_empty(), + "expected transport framer to consume all bytes; leftover = {}", + buffered.len() + ); + assert_eq!( + decoded_frames, + rows.len(), + "expected to decode exactly one frame per encoded row" + ); + let out = decoder.flush()?.expect("expected decoded RecordBatch"); + let expected_str = 
pretty_format_batches(std::slice::from_ref(&batch))?.to_string(); + let actual_str = pretty_format_batches(&[out])?.to_string(); + assert_eq!(expected_str, actual_str); + Ok(()) + } } From f41ffb08ec2dc80538460a7b333b8195d9378d99 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 23 Jan 2026 20:35:07 -0500 Subject: [PATCH 2/4] Add additional test coverage --- arrow-avro/src/writer/encoder.rs | 46 ++++++++++++++++++++++++++++++++ arrow-avro/src/writer/format.rs | 44 ++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs index 97825b1bd895..a53e540de9b0 100644 --- a/arrow-avro/src/writer/encoder.rs +++ b/arrow-avro/src/writer/encoder.rs @@ -3380,4 +3380,50 @@ mod tests { expected_row2.extend(avro_long_bytes(3)); // value assert_bytes_eq(row_slice(&out, &offsets, 2), &expected_row2); } + + #[test] + fn encode_prefix_write_error() { + use crate::codec::AvroFieldBuilder; + use crate::schema::{AvroSchema, FingerprintAlgorithm}; + use std::io; + + struct FailWriter { + failed: bool, + } + + impl io::Write for FailWriter { + fn write(&mut self, _buf: &[u8]) -> io::Result { + if !self.failed { + self.failed = true; + Err(io::Error::new(io::ErrorKind::Other, "fail write")) + } else { + Ok(0) + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]); + let arr = Int32Array::from(vec![42]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap(); + let avro_schema = AvroSchema::try_from(&schema).unwrap(); + let fingerprint = avro_schema + .fingerprint(FingerprintAlgorithm::Rabin) + .unwrap(); + let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap()) + .build() + .unwrap(); + let encoder = RecordEncoderBuilder::new(&avro_root, &schema) + .with_fingerprint(Some(fingerprint)) + .build() + .unwrap(); + + let mut writer = FailWriter { failed: false }; + let err = encoder.encode(&mut writer, &batch).unwrap_err(); + let msg = format!("{err}"); + assert!(msg.contains("write prefix"), "unexpected error: {msg}"); + } } diff --git a/arrow-avro/src/writer/format.rs b/arrow-avro/src/writer/format.rs index b8d69787b03c..9abb8755ab8c 100644 --- a/arrow-avro/src/writer/format.rs +++ b/arrow-avro/src/writer/format.rs @@ -169,6 +169,50 @@ impl AvroFormat for AvroBinaryFormat { } } +#[cfg(test)] +mod tests { + use super::*; + use arrow_schema::{DataType, Field, Schema}; + + fn test_schema() -> Schema { + Schema::new(vec![Field::new("x", DataType::Int32, false)]) + } + + #[test] + fn avro_binary_format_rejects_compression() { + let mut format = AvroBinaryFormat::default(); + let schema = test_schema(); + let err = format + .start_stream( + &mut Vec::::new(), + &schema, + Some(CompressionCodec::Snappy), + ) + .unwrap_err(); + assert!( + err.to_string() + .contains("Compression not supported for Avro binary streaming") + ); + } + + #[test] + fn avro_soe_format_rejects_compression() { + let mut format = AvroSoeFormat::default(); + let schema = test_schema(); + let err = format + .start_stream( + &mut Vec::::new(), + &schema, + Some(CompressionCodec::Snappy), + ) + .unwrap_err(); + assert!( + err.to_string() + .contains("Compression not supported for Avro SOE streaming") + ); + } +} + #[inline] fn write_string(writer: &mut W, s: &str) -> Result<(), ArrowError> { write_bytes(writer, s.as_bytes()) From 172566b3e567ed34ae0597eb23fee4f228a8ff8c Mon Sep 17 00:00:00 2001 From: Connor Sanders 
<170039284+jecsand838@users.noreply.github.com> Date: Sat, 24 Jan 2026 01:18:31 -0600 Subject: [PATCH 3/4] Address PR Comments --- arrow-avro/Cargo.toml | 4 + arrow-avro/benches/encoder.rs | 87 ++++++++++++ arrow-avro/src/writer/encoder.rs | 116 ++++++---------- arrow-avro/src/writer/mod.rs | 220 +++++++++++++++---------------- 4 files changed, 239 insertions(+), 188 deletions(-) create mode 100644 arrow-avro/benches/encoder.rs diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index edcce33ac71f..a4699a2efa80 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -95,3 +95,7 @@ harness = false [[bench]] name = "avro_writer" harness = false + +[[bench]] +name = "encoder" +harness = false diff --git a/arrow-avro/benches/encoder.rs b/arrow-avro/benches/encoder.rs new file mode 100644 index 000000000000..2e0a7d1a3eea --- /dev/null +++ b/arrow-avro/benches/encoder.rs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for the `arrow-avro` Encoder + +use arrow_array::{ArrayRef, Int32Array, RecordBatch}; +use arrow_avro::writer::format::AvroSoeFormat; +use arrow_avro::writer::{EncodedRows, WriterBuilder}; +use arrow_schema::{DataType, Field, Schema}; +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +use once_cell::sync::Lazy; +use std::hint::black_box; +use std::sync::Arc; +use std::time::Duration; + +const SIZES: [usize; 4] = [1_000, 10_000, 100_000, 1_000_000]; + +/// Pre-generate EncodedRows for each size to avoid setup overhead in benchmarks. +static ENCODED_DATA: Lazy> = + Lazy::new(|| SIZES.iter().map(|&n| make_encoded_rows(n)).collect()); + +/// Create an EncodedRows with `n` rows of Int32 data. 
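+///
+/// Uses the builder's default fingerprint strategy, so each encoded row carries
+/// the 10-byte SOE prefix (`0xC3 0x01` magic plus the 8-byte Rabin fingerprint)
+/// ahead of the Avro body.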
+fn make_encoded_rows(n: usize) -> EncodedRows { + let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]); + let values: Vec = (0..n as i32).collect(); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(Int32Array::from(values)) as ArrayRef], + ) + .unwrap(); + let mut encoder = WriterBuilder::new(schema) + .build_encoder::() + .unwrap(); + encoder.encode(&batch).unwrap(); + encoder.flush() +} + +fn bench_row_access(c: &mut Criterion) { + let mut group = c.benchmark_group("row_access"); + for (idx, &size) in SIZES.iter().enumerate() { + let encoded = &ENCODED_DATA[idx]; + let num_rows = encoded.len(); + // Configure sampling based on data size + match size { + 100_000 | 1_000_000 => { + group + .sample_size(20) + .measurement_time(Duration::from_secs(10)) + .warm_up_time(Duration::from_secs(3)); + } + _ => { + group.sample_size(100); + } + } + group.throughput(Throughput::Elements(num_rows as u64)); + group.bench_function(BenchmarkId::from_parameter(size), |b| { + b.iter(|| { + for i in 0..num_rows { + black_box(encoded.row(i).unwrap()); + } + }) + }); + } + group.finish(); +} + +criterion_group! { + name = encoder; + config = Criterion::default().configure_from_args(); + targets = bench_row_access +} + +criterion_main!(encoder); diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs index 58b9d388f9c7..c0806b8d0ebc 100644 --- a/arrow-avro/src/writer/encoder.rs +++ b/arrow-avro/src/writer/encoder.rs @@ -637,17 +637,13 @@ impl<'a> FieldEncoder<'a> { // Compute the effective null state from writer-declared nullability and data nulls. let null_state = match nullability { None => NullState::NonNullable, - Some(null_order) => { - match array.nulls() { - Some(nulls) if array.null_count() > 0 => { - NullState::Nullable { nulls, null_order } - } - _ => NullState::NullableNoNulls { - // Nullable site with no null buffer for this view - union_value_byte: union_value_branch_byte(null_order, false), - }, - } - } + Some(null_order) => match array.nulls() { + Some(nulls) if array.null_count() > 0 => NullState::Nullable { nulls, null_order }, + _ => NullState::NullableNoNulls { + // Nullable site with no null buffer for this view + union_value_byte: union_value_branch_byte(null_order, false), + }, + }, }; Ok(Self { encoder, @@ -848,26 +844,14 @@ impl RecordEncoder { batch: &RecordBatch, row_capacity: usize, out: &mut BytesMut, - offsets: &mut Vec, + offsets: &mut Vec, ) -> Result<(), ArrowError> { - let last_offset = *offsets.last().ok_or_else(|| { - ArrowError::AvroError( - "encode_rows requires offsets to be non-empty and seeded with a 0 sentinel" - .to_string(), - ) - })?; - // The first offset must be 0. (Safe index [0] because last() exists ensures len >= 1). 
- if offsets[0] != 0 { + let out_len = out.len(); + if offsets.first() != Some(&0) || offsets.last() != Some(&out_len) { return Err(ArrowError::AvroError( - "encode_rows requires offsets[0] == 0".to_string(), + "encode_rows requires offsets to start with 0 and end at out.len()".to_string(), )); } - let out_len_u64 = out.len() as u64; - if last_offset != out_len_u64 { - return Err(ArrowError::AvroError(format!( - "encode_rows requires offsets.last() == out.len() ({last_offset} != {out_len_u64})", - ))); - } let n = batch.num_rows(); if n == 0 { return Ok(()); @@ -880,49 +864,31 @@ impl RecordEncoder { let mut column_encoders = self.prepare_for_batch(batch)?; offsets.reserve(n); let prefix_bytes = self.prefix.as_ref().map(|p| p.as_slice()); - let prefix_len = prefix_bytes.map_or(0usize, |p| p.len()); - let per_row_hint = match row_capacity { - 0 => prefix_len, - _ => row_capacity.max(prefix_len), - }; - if per_row_hint != 0 { - if let Some(additional) = n.checked_mul(per_row_hint) { - if out.len().checked_add(additional).is_some() { - out.reserve(additional); - } - } + let prefix_len = prefix_bytes.map_or(0, |p| p.len()); + let per_row_hint = row_capacity.max(prefix_len); + if let Some(additional) = n + .checked_mul(per_row_hint) + .filter(|&a| out_len.checked_add(a).is_some()) + { + out.reserve(additional); } let start_out_len = out.len(); let start_offsets_len = offsets.len(); let res = (|| -> Result<(), ArrowError> { - if column_encoders.is_empty() { - if let Some(prefix) = prefix_bytes { - let inc = prefix.len() as u64; - let mut cur = out_len_u64; - for _ in 0..n { - out.extend_from_slice(prefix); - cur += inc; - offsets.push(cur); - } - } else { - offsets.extend(std::iter::repeat_n(out_len_u64, n)); - } - return Ok(()); - } let mut w = out.writer(); if let [enc0] = column_encoders.as_mut_slice() { for_rows_with_prefix!(n, prefix_bytes, w, |row| { enc0.encode(&mut w, row)?; - offsets.push(w.get_ref().len() as u64); + offsets.push(w.get_ref().len()); + }); + } else { + for_rows_with_prefix!(n, prefix_bytes, w, |row| { + for enc in column_encoders.iter_mut() { + enc.encode(&mut w, row)?; + } + offsets.push(w.get_ref().len()); }); - return Ok(()); } - for_rows_with_prefix!(n, prefix_bytes, w, |row| { - for enc in column_encoders.iter_mut() { - enc.encode(&mut w, row)?; - } - offsets.push(w.get_ref().len() as u64); - }); Ok(()) })(); if res.is_err() { @@ -931,7 +897,7 @@ impl RecordEncoder { } else { debug_assert_eq!( *offsets.last().unwrap(), - out.len() as u64, + out.len(), "encode_rows: offsets/out length mismatch after successful encode" ); } @@ -2088,9 +2054,9 @@ mod tests { } } - fn row_slice<'a>(buf: &'a [u8], offsets: &[u64], row: usize) -> &'a [u8] { - let start = offsets[row] as usize; - let end = offsets[row + 1] as usize; + fn row_slice<'a>(buf: &'a [u8], offsets: &[usize], row: usize) -> &'a [u8] { + let start = offsets[row]; + let end = offsets[row + 1]; &buf[start..end] } @@ -3177,12 +3143,12 @@ mod tests { prefix: None, }; let mut out = BytesMut::new(); - let mut offsets: Vec = vec![0]; + let mut offsets: Vec = vec![0]; encoder .encode_rows(&batch, 16, &mut out, &mut offsets) .unwrap(); assert_eq!(offsets.len(), 4); - assert_eq!(*offsets.last().unwrap(), out.len() as u64); + assert_eq!(*offsets.last().unwrap(), out.len()); assert_bytes_eq(row_slice(&out, &offsets, 0), &avro_long_bytes(1)); assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(2)); assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(3)); @@ -3217,12 +3183,12 @@ mod tests { prefix: None, }; 
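+        // Two column encoders per row, so this exercises the general
+        // multi-column loop in `encode_rows` rather than the single-encoder
+        // fast path introduced above.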
         let mut out = BytesMut::new();
-        let mut offsets: Vec<u64> = vec![0];
+        let mut offsets: Vec<usize> = vec![0];
         encoder
             .encode_rows(&batch, 32, &mut out, &mut offsets)
             .unwrap();
         assert_eq!(offsets.len(), 3);
-        assert_eq!(*offsets.last().unwrap(), out.len() as u64);
+        assert_eq!(*offsets.last().unwrap(), out.len());
         let mut expected_row0 = Vec::new();
         expected_row0.extend(avro_long_bytes(10));
         expected_row0.extend(avro_len_prefixed_bytes(b"hello"));
@@ -3252,7 +3218,7 @@ mod tests {
             .build()
             .unwrap();
         let mut out = BytesMut::new();
-        let mut offsets: Vec<u64> = vec![0];
+        let mut offsets: Vec<usize> = vec![0];
         encoder
             .encode_rows(&batch, 32, &mut out, &mut offsets)
             .unwrap();
@@ -3277,11 +3243,11 @@ mod tests {
             prefix: None,
         };
         let mut out = BytesMut::new();
-        let mut offsets: Vec<u64> = vec![0];
+        let mut offsets: Vec<usize> = vec![0];
         encoder
             .encode_rows(&batch, 16, &mut out, &mut offsets)
             .unwrap();
-        assert_eq!(offsets, vec![0u64]);
+        assert_eq!(offsets, vec![0]);
         assert!(out.is_empty());
     }
 
@@ -3316,7 +3282,7 @@ mod tests {
         let mut stream_buf = Vec::new();
         encoder.encode(&mut stream_buf, &batch).unwrap();
         let mut out = BytesMut::new();
-        let mut offsets: Vec<u64> = vec![0];
+        let mut offsets: Vec<usize> = vec![0];
         encoder
             .encode_rows(&batch, 32, &mut out, &mut offsets)
             .unwrap();
@@ -3339,12 +3305,12 @@ mod tests {
         };
         let mut out = BytesMut::new();
         out.extend_from_slice(&[0xAA, 0xBB]);
-        let mut offsets: Vec<u64> = vec![0, out.len() as u64];
+        let mut offsets: Vec<usize> = vec![0, out.len()];
         encoder
             .encode_rows(&batch, 16, &mut out, &mut offsets)
             .unwrap();
         assert_eq!(offsets.len(), 4);
-        assert_eq!(*offsets.last().unwrap(), out.len() as u64);
+        assert_eq!(*offsets.last().unwrap(), out.len());
         assert_bytes_eq(row_slice(&out, &offsets, 0), &[0xAA, 0xBB]);
         assert_bytes_eq(row_slice(&out, &offsets, 1), &avro_long_bytes(5));
         assert_bytes_eq(row_slice(&out, &offsets, 2), &avro_long_bytes(6));
@@ -3364,7 +3330,7 @@ mod tests {
             prefix: None,
         };
         let mut out = BytesMut::new();
-        let mut offsets: Vec<u64> = vec![0];
+        let mut offsets: Vec<usize> = vec![0];
         encoder
             .encode_rows(&batch, 16, &mut out, &mut offsets)
             .unwrap();
diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index 124cc536f327..205ae11759a1 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -135,8 +135,7 @@
 //! assert_eq!(rows.len(), 3);
 //!
 //! // Each row has Confluent wire format: magic byte + 4-byte schema ID + body
-//! for row in rows.rows() {
-//!     let row = row?;
+//! for row in rows.iter() {
 //!     assert_eq!(row[0], 0x00); // Confluent magic byte
 //! }
 //! # Ok(())
@@ -162,19 +161,20 @@ mod encoder;
 /// Logic for different Avro container file formats.
 pub mod format;
 
-/// A contiguous set of encoded rows.
+/// A contiguous set of Avro-encoded rows.
 ///
 /// `EncodedRows` stores:
 /// - a single backing byte buffer (`bytes::Bytes`)
-/// - a `Vec<u64>` of row boundary offsets (length = `rows + 1`)
+/// - a `Vec<usize>` of row boundary offsets (length = `rows + 1`)
 ///
 /// This lets callers get per-row payloads as zero-copy `Bytes` slices.
 ///
-/// For compatibility with APIs that require owned `Vec<u8>`, use [`EncodedRows::to_vecs`].
+/// For compatibility with APIs that require owned `Vec<u8>`, use:
+/// `let vecs: Vec<Vec<u8>> = rows.iter().map(|b| b.to_vec()).collect();`
 #[derive(Debug, Clone)]
 pub struct EncodedRows {
     data: Bytes,
-    offsets: Vec<u64>,
+    offsets: Vec<usize>,
 }
 
 impl EncodedRows {
@@ -182,41 +182,50 @@ impl EncodedRows {
     ///
     /// `offsets` must have length `rows + 1`, and be monotonically non-decreasing.
     /// The last offset should equal `data.len()`.
-    pub fn new(data: Bytes, offsets: Vec<u64>) -> Self {
+    pub fn new(data: Bytes, offsets: Vec<usize>) -> Self {
         Self { data, offsets }
     }
 
-    /// Number of rows in this buffer.
+    /// Returns the number of encoded rows stored in this container.
     #[inline]
     pub fn len(&self) -> usize {
         self.offsets.len().saturating_sub(1)
     }
 
-    /// Returns `true` if there are no rows.
+    /// Returns `true` if this container holds no encoded rows.
     #[inline]
     pub fn is_empty(&self) -> bool {
         self.len() == 0
     }
 
-    /// Returns the backing buffer.
+    /// Returns a reference to the single contiguous backing buffer.
     ///
-    /// Note: individual rows should typically be accessed via [`Self::row`] or [`Self::rows`].
+    /// This buffer contains the payloads of all rows concatenated together.
+    ///
+    /// # Note
+    ///
+    /// To access individual row payloads, prefer using [`Self::row`] or [`Self::iter`]
+    /// rather than slicing this buffer manually.
     #[inline]
     pub fn bytes(&self) -> &Bytes {
         &self.data
     }
 
-    /// Returns the row boundary offsets (length = `len() + 1`).
+    /// Returns the row boundary offsets.
+    ///
+    /// The returned slice always has length `self.len() + 1`. The `n`th row payload
+    /// corresponds to `bytes[offsets[n]..offsets[n + 1]]`.
     #[inline]
-    pub fn offsets(&self) -> &[u64] {
+    pub fn offsets(&self) -> &[usize] {
         &self.offsets
     }
 
-    /// Return the `i`th row as a zero-copy `Bytes` slice.
+    /// Return the `n`th row as a zero-copy `Bytes` slice.
     ///
     /// # Errors
     ///
-    /// Returns an error if the row offsets are invalid (e.g. exceed `usize::MAX`).
+    /// Returns an error if `n` is out of bounds or if the internal offsets are invalid
+    /// (e.g., offsets are not within the backing buffer).
     ///
     /// # Examples
     ///
@@ -238,76 +247,40 @@ impl EncodedRows {
     /// encoder.encode(&batch)?;
     /// let rows = encoder.flush();
     ///
-    /// // Access the first row (index 0)
-    /// let row0 = rows.row(0)?;
-    /// assert!(!row0.is_empty());
+    /// assert_eq!(rows.iter().count(), 2);
    /// # Ok(())
     /// # }
     /// ```
-    pub fn row(&self, i: usize) -> Result<Bytes, ArrowError> {
-        if i >= self.len() {
+    pub fn row(&self, n: usize) -> Result<Bytes, ArrowError> {
+        if n >= self.len() {
             return Err(ArrowError::AvroError(format!(
-                "Row index {i} out of bounds for len {}",
+                "Row index {n} out of bounds for len {}",
                 self.len()
             )));
         }
         // SAFETY:
         // self.len() is defined as self.offsets.len().saturating_sub(1).
-        // The check `i >= self.len()` above ensures that `i < self.offsets.len() - 1`.
-        // Therefore, both `i` and `i + 1` are strictly within the bounds of `self.offsets`.
-        let (start_u64, end_u64) = unsafe {
+        // The check `n >= self.len()` above ensures that `n < self.offsets.len() - 1`.
+        // Therefore, both `n` and `n + 1` are strictly within the bounds of `self.offsets`.
+        let (start, end) = unsafe {
             (
-                *self.offsets.get_unchecked(i),
-                *self.offsets.get_unchecked(i + 1),
+                *self.offsets.get_unchecked(n),
+                *self.offsets.get_unchecked(n + 1),
             )
         };
-        let start = usize::try_from(start_u64).map_err(|e| {
-            ArrowError::AvroError(format!("row start offset does not fit in usize: {e}"))
-        })?;
-        let end = usize::try_from(end_u64).map_err(|e| {
-            ArrowError::AvroError(format!("row end offset does not fit in usize: {e}"))
-        })?;
+        if start > end || end > self.data.len() {
+            return Err(ArrowError::AvroError(format!(
+                "Invalid row offsets for row {n}: start={start}, end={end}, data_len={}",
+                self.data.len()
+            )));
+        }
         Ok(self.data.slice(start..end))
     }
 
     /// Iterate over rows as zero-copy `Bytes` slices.
     ///
-    /// # Examples
-    ///
-    /// ```
-    /// use std::sync::Arc;
-    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
-    /// use arrow_schema::{DataType, Field, Schema};
-    /// use arrow_avro::writer::WriterBuilder;
-    /// use arrow_avro::writer::format::AvroSoeFormat;
-    ///
-    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
-    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
-    /// let batch = RecordBatch::try_new(
-    ///     Arc::new(schema.clone()),
-    ///     vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
-    /// )?;
-    ///
-    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
-    /// encoder.encode(&batch)?;
-    /// let rows = encoder.flush();
-    ///
-    /// let mut count = 0;
-    /// for row in rows.rows() {
-    ///     let _bytes = row?;
-    ///     count += 1;
-    /// }
-    /// assert_eq!(count, 2);
-    /// # Ok(())
-    /// # }
-    /// ```
-    pub fn rows(&self) -> impl Iterator<Item = Result<Bytes, ArrowError>> + '_ {
-        (0..self.len()).map(|i| self.row(i))
-    }
-
-    /// Copy all rows into independent `Vec<u8>` buffers.
-    ///
-    /// This is useful for compatibility with APIs that require owned, mutable byte vectors.
+    /// This iterator is infallible and is intended for the common case where
+    /// `EncodedRows` is produced by [`Encoder::flush`], which guarantees valid offsets.
     ///
     /// # Examples
     ///
@@ -322,25 +295,23 @@ impl EncodedRows {
     /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
     /// let batch = RecordBatch::try_new(
     ///     Arc::new(schema.clone()),
-    ///     vec![Arc::new(Int32Array::from(vec![100])) as ArrayRef],
+    ///     vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
     /// )?;
     ///
     /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
     /// encoder.encode(&batch)?;
     /// let rows = encoder.flush();
     ///
-    /// let vecs = rows.to_vecs()?;
-    /// assert_eq!(vecs.len(), 1);
-    /// assert!(!vecs[0].is_empty());
+    /// assert_eq!(rows.iter().count(), 2);
     /// # Ok(())
     /// # }
     /// ```
-    pub fn to_vecs(&self) -> Result<Vec<Vec<u8>>, ArrowError> {
-        let mut out = Vec::with_capacity(self.len());
-        for i in 0..self.len() {
-            out.push(self.row(i)?.to_vec());
-        }
-        Ok(out)
+    #[inline]
+    pub fn iter(&self) -> impl ExactSizeIterator<Item = Bytes> + '_ {
+        self.offsets.windows(2).map(|w| {
+            debug_assert!(w[0] <= w[1] && w[1] <= self.data.len());
+            self.data.slice(w[0]..w[1])
+        })
     }
 }
 
@@ -480,21 +451,58 @@ impl WriterBuilder {
     }
 }
 
-/// A row-by-row streaming encoder for Avro **Single Object Encoding** (SOE) streams.
+/// A row-by-row encoder for Avro *stream/message* formats (SOE / registry wire formats / raw binary).
+///
+/// Unlike [`Writer`], which emits a single continuous byte stream to a [`std::io::Write`] sink,
+/// `Encoder` tracks row boundaries during encoding and returns an [`EncodedRows`] containing:
+/// - one backing buffer (`Bytes`)
+/// - row boundary offsets
+///
+/// This enables zero-copy per-row payloads (for instance, one Kafka message per Arrow row) without
+/// re-encoding or decoding the byte stream to recover record boundaries.
+///
+/// ### Example
+///
+/// ```
+/// use std::sync::Arc;
+/// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+/// use arrow_schema::{DataType, Field, Schema};
+/// use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
+/// use arrow_avro::schema::FingerprintStrategy;
+///
+/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+/// let schema = Schema::new(vec![Field::new("value", DataType::Int32, false)]);
+/// let batch = RecordBatch::try_new(
+///     Arc::new(schema.clone()),
+///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+/// )?;
+///
+/// // Configure the encoder (here: Confluent Wire Format with schema ID 100)
+/// let mut encoder = WriterBuilder::new(schema)
+///     .with_fingerprint_strategy(FingerprintStrategy::Id(100))
+///     .build_encoder::<AvroSoeFormat>()?;
 ///
-/// Unlike [`Writer`], which writes directly to an output sink, `Encoder` buffers rows in a
-/// single contiguous backing buffer and returns per-row payloads as zero-copy [`bytes::Bytes`]
-/// slices via [`EncodedRows`].
+/// // Encode the batch
+/// encoder.encode(&batch)?;
 ///
-/// To get owned `Vec<u8>` payloads for compatibility with APIs that require owned buffers,
-/// call [`EncodedRows::to_vecs`].
+/// // Get the encoded rows
+/// let rows = encoder.flush();
+///
+/// // Convert to owned Vec<u8> payloads (e.g., for a Kafka producer)
+/// let payloads: Vec<Vec<u8>> = rows.iter().map(|row| row.to_vec()).collect();
+///
+/// assert_eq!(payloads.len(), 3);
+/// assert_eq!(payloads[0][0], 0x00); // Magic byte
+/// # Ok(())
+/// # }
+/// ```
 #[derive(Debug)]
 pub struct Encoder {
     schema: SchemaRef,
     encoder: RecordEncoder,
     row_capacity: Option<usize>,
     buffer: BytesMut,
-    offsets: Vec<u64>,
+    offsets: Vec<usize>,
 }
 
 impl Encoder {
@@ -525,7 +533,6 @@ impl Encoder {
     /// Drain and return all currently buffered encoded rows.
     ///
     /// The returned [`EncodedRows`] provides per-row payloads as `Bytes` slices.
-    /// For owned buffers, use [`EncodedRows::to_vecs`].
     pub fn flush(&mut self) -> EncodedRows {
         let data = self.buffer.split().freeze();
         let mut offsets = Vec::with_capacity(self.offsets.len());
@@ -2877,8 +2884,7 @@ mod tests {
         let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
         row_writer.encode(&batch)?;
         let rows = row_writer.flush();
-        // NOTE: EncodedRows is now zero-copy; `to_vecs()` is an explicit compatibility copy.
-        let row_bytes: Vec<u8> = rows.to_vecs()?.concat();
+        let row_bytes: Vec<u8> = rows.bytes().to_vec();
         assert_eq!(stream_bytes, row_bytes);
         Ok(())
     }
@@ -2910,8 +2916,7 @@ mod tests {
             .with_writer_schema_store(store)
             .with_batch_size(1024)
             .build_decoder()?;
-        for row in rows.rows() {
-            let row = row?;
+        for row in rows.iter() {
             let consumed = decoder.decode(row.as_ref())?;
             assert_eq!(
                 consumed,
@@ -2938,8 +2943,7 @@ mod tests {
         let mut stream: Vec<u8> = Vec::new();
         let mut boundaries: Vec<usize> = Vec::with_capacity(rows.len() + 1);
         boundaries.push(0usize);
-        for row in rows.rows() {
-            let row = row?;
+        for row in rows.iter() {
             stream.extend_from_slice(row.as_ref());
             boundaries.push(stream.len());
         }
@@ -2994,8 +2998,7 @@ mod tests {
             .with_writer_schema_store(store)
             .with_batch_size(1024)
             .build_decoder()?;
-        for row in rows.rows() {
-            let row = row?;
+        for row in rows.iter() {
             let consumed = decoder.decode(row.as_ref())?;
             assert_eq!(consumed, row.len());
         }
@@ -3005,7 +3008,6 @@ mod tests {
         assert_eq!(expected, actual);
         Ok(())
     }
-
     #[test]
     fn test_encoder_encode_batches_flush_and_encoded_rows_methods_with_avro_binary_format()
     -> Result<(), ArrowError> {
         assert_eq!(EncodedRows::len(&empty), 0);
         assert!(EncodedRows::is_empty(&empty));
         assert_eq!(EncodedRows::bytes(&empty).as_ref(), &[] as &[u8]);
-        assert_eq!(EncodedRows::offsets(&empty), &[0_u64]);
-        assert_eq!(EncodedRows::rows(&empty).count(), 0);
-        assert!(EncodedRows::to_vecs(&empty)?.is_empty());
+        assert_eq!(EncodedRows::offsets(&empty), &[0usize]);
+        assert_eq!(EncodedRows::iter(&empty).count(), 0);
+        let empty_vecs: Vec<Vec<u8>> = empty.iter().map(|b| b.to_vec()).collect();
+        assert!(empty_vecs.is_empty());
         let batches = vec![batch1, batch2];
         Encoder::encode_batches(&mut encoder, &batches)?;
         assert_eq!(encoder.buffered_len(), 5);
         let rows = Encoder::flush(&mut encoder);
         assert_eq!(
             *EncodedRows::offsets(&rows).last().unwrap(),
             EncodedRows::bytes(&rows).len()
         );
         assert_eq!(EncodedRows::len(&rows), 5);
         assert!(!EncodedRows::is_empty(&rows));
-        let expected_offsets: &[u64] = &[0, 2, 4, 6, 8, 10];
+        let expected_offsets: &[usize] = &[0, 2, 4, 6, 8, 10];
         assert_eq!(EncodedRows::offsets(&rows), expected_offsets);
         let expected_rows: Vec<Vec<u8>> = vec![
             vec![2, 20],
             vec![4, 40],
             vec![6, 60],
             vec![8, 80],
             vec![10, 100],
         ];
         for (i, expected) in expected_rows.iter().enumerate() {
             assert_eq!(EncodedRows::row(&rows, i)?.as_ref(), expected.as_slice());
         }
-        let iter_rows: Vec<Vec<u8>> = EncodedRows::rows(&rows)
-            .collect::<Result<Vec<_>, _>>()?
-            .into_iter()
-            .map(|b| b.to_vec())
-            .collect();
+        let iter_rows: Vec<Vec<u8>> = EncodedRows::iter(&rows).map(|b| b.to_vec()).collect();
         assert_eq!(iter_rows, expected_rows);
-        assert_eq!(EncodedRows::to_vecs(&rows)?, expected_rows);
         let recreated = EncodedRows::new(
             EncodedRows::bytes(&rows).clone(),
             EncodedRows::offsets(&rows).to_vec(),
         );
         assert_eq!(
             EncodedRows::offsets(&recreated),
             EncodedRows::offsets(&rows)
         );
-        assert_eq!(
-            EncodedRows::to_vecs(&recreated)?,
-            EncodedRows::to_vecs(&rows)?
-        );
+        let rec_vecs: Vec<Vec<u8>> = recreated.iter().map(|b| b.to_vec()).collect();
+        assert_eq!(rec_vecs, iter_rows);
         let empty_again = Encoder::flush(&mut encoder);
         assert!(EncodedRows::is_empty(&empty_again));
         Ok(())
     }
@@ -3161,8 +3157,7 @@ mod tests {
             .with_writer_schema_store(store)
             .with_batch_size(1024)
             .build_decoder()?;
-        for body in binary_rows.rows() {
-            let body = body?;
+        for body in binary_rows.iter() {
             let mut framed = Vec::with_capacity(SOE_PREFIX_LEN + body.len());
             framed.extend_from_slice(&SOE_MAGIC);
             framed.extend_from_slice(&fp_le_bytes);
@@ -3198,8 +3193,7 @@ mod tests {
         const SOE_MAGIC: [u8; 2] = [0xC3, 0x01];
         const SOE_PREFIX_LEN: usize = 2 + 8;
         let mut stream: Vec<u8> = Vec::new();
-        for body in rows.rows() {
-            let body = body?;
+        for body in rows.iter() {
             let msg_len: u32 = (SOE_PREFIX_LEN + body.len())
                 .try_into()
                 .expect("message length must fit in u32");

From b51f14ff003572b8c3a600a527c655a2f661c089 Mon Sep 17 00:00:00 2001
From: Connor Sanders <170039284+jecsand838@users.noreply.github.com>
Date: Sat, 24 Jan 2026 17:37:52 -0600
Subject: [PATCH 4/4] Address PR Comments

---
 arrow-avro/src/writer/mod.rs | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index 205ae11759a1..eedc82a04c49 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -308,10 +308,7 @@ impl EncodedRows {
     /// ```
     #[inline]
     pub fn iter(&self) -> impl ExactSizeIterator<Item = Bytes> + '_ {
-        self.offsets.windows(2).map(|w| {
-            debug_assert!(w[0] <= w[1] && w[1] <= self.data.len());
-            self.data.slice(w[0]..w[1])
-        })
+        self.offsets.windows(2).map(|w| self.data.slice(w[0]..w[1]))
    }
 }
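The `EncodedRows` contract this series settles on (offsets always have length `len() + 1`, and row `n` spans `bytes()[offsets[n]..offsets[n + 1]]`) is enough for a caller to verify framing independently of the accessors. Below is a minimal sketch of that round trip; it assumes only the API introduced in this patch series (`WriterBuilder::build_encoder`, `AvroSoeFormat`, `EncodedRows`), and the schema with field `x` is illustrative rather than taken from the tests:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_avro::writer::{format::AvroSoeFormat, WriterBuilder};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build a small batch; the field name `x` is arbitrary for this sketch.
    let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Encode row-by-row and drain the buffered rows.
    let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
    encoder.encode(&batch)?;
    let rows = encoder.flush();

    // offsets() always has one more entry than len(); row n is the slice
    // bytes()[offsets[n]..offsets[n + 1]] of the single backing buffer.
    let (bytes, offsets) = (rows.bytes(), rows.offsets());
    assert_eq!(offsets.len(), rows.len() + 1);
    for n in 0..rows.len() {
        let manual = &bytes[offsets[n]..offsets[n + 1]];
        // The zero-copy accessors must agree with manual slicing.
        assert_eq!(manual, rows.row(n)?.as_ref());
    }
    assert_eq!(rows.iter().count(), 3);
    Ok(())
}
```

With a stream format such as SOE, each payload already carries its per-row prefix, so the slices produced this way can be handed to a message broker as-is.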