Skip to content

Commit fab8e75

Browse files
jecsand838 and alamb authored
Add BinaryFormatSupport and Row Encoder to arrow-avro Writer (#9171)
# Which issue does this PR close? - Closes #8701. # Rationale for this change `arrow-avro` already supports writing Avro Object Container Files (OCF) and framed streaming encodings (e.g. Single-Object Encoding / registry wire formats). However, many systems exchange **raw Avro binary datum payloads** (i.e. *only* the Avro record body bytes) while supplying the schema out-of-band (configuration, RPC contract, topic metadata, etc.). Without first-class support for unframed datum output, users must either: - accept framing overhead that downstream systems don’t expect, or - re-implement datum encoding themselves. This PR adds the missing unframed write path and exposes a row-by-row encoding API to make it easy to embed Avro datums into other transport protocols. # What changes are included in this PR? - Added `AvroBinaryFormat` (unframed) as an `AvroFormat` implementation to emit **raw Avro record body bytes** (no SOE prefix and no OCF header) and to explicitly reject container-level compression for this format. - Added `RecordEncoder::encode_rows` to encode a `RecordBatch` into a single contiguous buffer while tracking per-row boundaries via appended offsets. - Introduced a higher-level `Encoder` + `EncodedRows` API for row-by-row streaming use cases, providing zero-copy access to individual row slices (via `Bytes`). - Updated the writer API to provide `build_encoder` for stream formats (e.g. SOE) and added row-capacity configuration to better support incremental/streaming workflows. - Added the `bytes` crate as a dependency to support efficient buffering and slicing in the row encoder, and adjusted dev-dependencies to support the new tests/docs. # Are these changes tested? Yes. This PR adds unit tests that cover: - single- and multi-column row encoding - nullable columns - prefix-based vs. unprefixed row encoding behavior - empty batch encoding - appending to existing output buffers and validating offset invariants # Are there any user-facing changes? 
Yes, these changes are additive (no breaking public API changes expected). - New writer format support for **unframed Avro binary datum** output (`AvroBinaryFormat`). - New row-by-row encoding APIs (`RecordEncoder::encode_rows`, `Encoder`, `EncodedRows`) to support zero-copy access to per-row encoded bytes. - New `WriterBuilder` functionality (`build_encoder` + row-capacity configuration) to enable encoder construction without committing to a specific `Write` sink. --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent fe126d0 commit fab8e75

File tree

5 files changed

+1485
-126
lines changed

5 files changed

+1485
-126
lines changed

arrow-avro/Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ indexmap = "2.10"
6666
rand = "0.9"
6767
md5 = { version = "0.8", optional = true }
6868
sha2 = { version = "0.10", optional = true }
69+
bytes = "1.11"
6970

7071
[dev-dependencies]
7172
arrow-data = { workspace = true }
@@ -76,9 +77,8 @@ rand = { version = "0.9.1", default-features = false, features = [
7677
] }
7778
criterion = { workspace = true, default-features = false }
7879
tempfile = "3.3"
79-
arrow = { workspace = true }
80+
arrow = { workspace = true, features = ["prettyprint"] }
8081
futures = "0.3.31"
81-
bytes = "1.10.1"
8282
async-stream = "0.3.6"
8383
apache-avro = "0.21.0"
8484
num-bigint = "0.4"
@@ -95,3 +95,7 @@ harness = false
9595
[[bench]]
9696
name = "avro_writer"
9797
harness = false
98+
99+
[[bench]]
100+
name = "encoder"
101+
harness = false

arrow-avro/benches/encoder.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks for the `arrow-avro` Encoder
19+
use std::hint::black_box;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_avro::writer::format::AvroSoeFormat;
use arrow_avro::writer::{EncodedRows, WriterBuilder};
use arrow_schema::{DataType, Field, Schema};
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use once_cell::sync::Lazy;
29+
30+
const SIZES: [usize; 4] = [1_000, 10_000, 100_000, 1_000_000];
31+
32+
/// Pre-generate EncodedRows for each size to avoid setup overhead in benchmarks.
33+
static ENCODED_DATA: Lazy<Vec<EncodedRows>> =
34+
Lazy::new(|| SIZES.iter().map(|&n| make_encoded_rows(n)).collect());
35+
36+
/// Create an EncodedRows with `n` rows of Int32 data.
37+
fn make_encoded_rows(n: usize) -> EncodedRows {
38+
let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
39+
let values: Vec<i32> = (0..n as i32).collect();
40+
let batch = RecordBatch::try_new(
41+
Arc::new(schema.clone()),
42+
vec![Arc::new(Int32Array::from(values)) as ArrayRef],
43+
)
44+
.unwrap();
45+
let mut encoder = WriterBuilder::new(schema)
46+
.build_encoder::<AvroSoeFormat>()
47+
.unwrap();
48+
encoder.encode(&batch).unwrap();
49+
encoder.flush()
50+
}
51+
52+
fn bench_row_access(c: &mut Criterion) {
53+
let mut group = c.benchmark_group("row_access");
54+
for (idx, &size) in SIZES.iter().enumerate() {
55+
let encoded = &ENCODED_DATA[idx];
56+
let num_rows = encoded.len();
57+
// Configure sampling based on data size
58+
match size {
59+
100_000 | 1_000_000 => {
60+
group
61+
.sample_size(20)
62+
.measurement_time(Duration::from_secs(10))
63+
.warm_up_time(Duration::from_secs(3));
64+
}
65+
_ => {
66+
group.sample_size(100);
67+
}
68+
}
69+
group.throughput(Throughput::Elements(num_rows as u64));
70+
group.bench_function(BenchmarkId::from_parameter(size), |b| {
71+
b.iter(|| {
72+
for i in 0..num_rows {
73+
black_box(encoded.row(i).unwrap());
74+
}
75+
})
76+
});
77+
}
78+
group.finish();
79+
}
80+
81+
// Register the benchmark group. The default Criterion configuration is used,
// but `configure_from_args()` lets command-line flags (e.g. --sample-size)
// override it at run time.
criterion_group! {
    name = encoder;
    config = Criterion::default().configure_from_args();
    targets = bench_row_access
}

// Entry point generated by Criterion; runs the `encoder` group above.
criterion_main!(encoder);

0 commit comments

Comments
 (0)