Skip to content

Commit 016d0f6

Browse files
Kriskras99martin-g
andauthored
feat!: Support enums and tuples in SchemaAwareSerializer and implement SchemaAwareDeserializer (#512)
* doc: Document the mapping between the Serde and Avro data models * feat: `SchemaAwareDeserializer` (no tests) * feat: Use `SchemaAwareDeserializer` in the readers and add tests * feat: Rework `SchemaAwareSerializer` to be more strict and follow the documented model (no tests) * feat: Update tests and provide support for old ways of `BigDecimal` and `[T; N]` * fix: Add missing license header and set PR number for tests * fix: Don't call `T::field_default()` if the schema is overwritten using `#[avro(with)]` * fix: Enable `rustdoc_internal` feature when `cfg(docsrs)` so we can use `fake_variadic` * fix: Review feedback --------- Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com> * fix: Apply suggestions from code review Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com> * fix: Second round of code review Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com> * fix: Use longs when serializing block item count and byte size * fix: third round of review feedback Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com> * fix: Don't allow serializing a unit variant inside the UnionSerializer as that would require a nested union * chore: Enable and fix Clippy lints * fix: fourth round of review feedback * fix: Typo Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com> --------- Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
1 parent e2901d3 commit 016d0f6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+7633
-3150
lines changed

Cargo.toml

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,13 @@ clippy.too_long_first_doc_paragraph = "warn"
5858
clippy.doc_markdown = "warn"
5959
# TODO: Needs more work
6060
#clippy.missing_errors_doc = "warn"
61-
#clippy.missing_panics_doc = "warn"
61+
clippy.missing_panics_doc = "warn"
6262
rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(nightly)'] }
6363
clippy.cargo = { level = "warn", priority = -1 }
6464
clippy.multiple_crate_versions = "allow"
65-
# clippy.needless_raw_string_hashes = "warn"
66-
# clippy.missing_panics_doc = "warn"
67-
# clippy.semicolon_if_nothing_returned = "warn"
68-
# clippy.manual_assert = "warn"
69-
# clippy.enum_glob_use = "warn"
70-
# clippy.needless_pass_by_value = "warn"
71-
# clippy.single_match_else = "warn"
65+
clippy.needless_raw_string_hashes = "warn"
66+
clippy.semicolon_if_nothing_returned = "warn"
67+
clippy.manual_assert = "warn"
68+
clippy.enum_glob_use = "warn"
69+
clippy.needless_pass_by_value = "warn"
70+
clippy.single_match_else = "warn"

avro/benches/serde.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ fn bench_small_schema_write_10_000_record_ser(c: &mut Criterion) {
360360
10_000,
361361
"small schema, write 10k records (serde way)",
362362
)
363-
.unwrap()
363+
.unwrap();
364364
}
365365

366366
fn bench_small_schema_read_1_record(c: &mut Criterion) {

avro/benches/single.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ fn bench_small_schema_write_record(c: &mut Criterion) {
180180
.build()
181181
.unwrap()
182182
.write_value_to_vec(record.clone())
183-
})
183+
});
184184
});
185185
}
186186

@@ -192,23 +192,23 @@ fn bench_big_schema_write_record(c: &mut Criterion) {
192192
.build()
193193
.unwrap()
194194
.write_value_to_vec(record.clone())
195-
})
195+
});
196196
});
197197
}
198198

199199
fn bench_small_schema_write_record_reuse_datum_writer(c: &mut Criterion) {
200200
let (schema, record) = make_small_record().unwrap();
201201
let writer = GenericDatumWriter::builder(&schema).build().unwrap();
202202
c.bench_function("small record (reused writer)", |b| {
203-
b.iter(|| writer.write_value_ref(&mut Vec::new(), &record))
203+
b.iter(|| writer.write_value_ref(&mut Vec::new(), &record));
204204
});
205205
}
206206

207207
fn bench_big_schema_write_record_reuse_datum_writer(c: &mut Criterion) {
208208
let (schema, record) = make_big_record().unwrap();
209209
let writer = GenericDatumWriter::builder(&schema).build().unwrap();
210210
c.bench_function("big record (reused writer)", |b| {
211-
b.iter(|| writer.write_value_ref(&mut Vec::new(), &record))
211+
b.iter(|| writer.write_value_ref(&mut Vec::new(), &record));
212212
});
213213
}
214214

@@ -219,7 +219,7 @@ fn bench_small_schema_write_record_no_validation(c: &mut Criterion) {
219219
.build()
220220
.unwrap();
221221
c.bench_function("small record (no validation)", |b| {
222-
b.iter(|| writer.write_value_ref(&mut Vec::new(), &record))
222+
b.iter(|| writer.write_value_ref(&mut Vec::new(), &record));
223223
});
224224
}
225225

@@ -230,7 +230,7 @@ fn bench_big_schema_write_record_no_validation(c: &mut Criterion) {
230230
.build()
231231
.unwrap();
232232
c.bench_function("big record (no validation)", |b| {
233-
b.iter(|| writer.write_value_ref(&mut Vec::new(), &record))
233+
b.iter(|| writer.write_value_ref(&mut Vec::new(), &record));
234234
});
235235
}
236236

avro/examples/test_interop_single_object_encoding.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ fn main() -> Result<(), Box<dyn Error>> {
5858
let single_object = std::fs::read(format!("{resource_folder}/test_message.bin"))
5959
.expect("File with single object not found or error occurred while reading it.");
6060
test_write(&single_object);
61-
test_read(single_object);
61+
test_read(&single_object);
6262

6363
Ok(())
6464
}
@@ -69,15 +69,16 @@ fn test_write(expected: &[u8]) {
6969
.expect("Resolving failed")
7070
.write_value(InteropMessage, &mut encoded)
7171
.expect("Encoding failed");
72-
assert_eq!(expected, &encoded)
72+
assert_eq!(expected, &encoded);
7373
}
7474

75-
fn test_read(encoded: Vec<u8>) {
76-
let mut encoded = &encoded[..];
77-
let read_message = apache_avro::GenericSingleObjectReader::new(InteropMessage::get_schema())
75+
fn test_read(mut encoded: &[u8]) {
76+
let read_message = apache_avro::GenericSingleObjectReader::builder()
77+
.schema(InteropMessage::get_schema())
78+
.build()
7879
.expect("Resolving failed")
7980
.read_value(&mut encoded)
8081
.expect("Decoding failed");
8182
let expected_value: Value = InteropMessage.into();
82-
assert_eq!(expected_value, read_message)
83+
assert_eq!(expected_value, read_message);
8384
}

avro/src/bigdecimal.rs

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,23 @@ pub(crate) fn deserialize_big_decimal(mut bytes: &[u8]) -> AvroResult<BigDecimal
6969

7070
#[cfg(test)]
7171
mod tests {
72-
use super::*;
73-
use crate::{Codec, Reader, Schema, Writer, error::Error, from_value, types::Record};
74-
use apache_avro_test_helper::TestResult;
75-
use bigdecimal::{One, Zero};
76-
use pretty_assertions::assert_eq;
7772
use std::{
7873
fs::File,
7974
io::BufReader,
8075
ops::{Div, Mul},
8176
str::FromStr,
8277
};
8378

79+
use apache_avro_test_helper::TestResult;
80+
use bigdecimal::{One, Zero};
81+
use pretty_assertions::assert_eq;
82+
83+
use super::*;
84+
use crate::{
85+
Codec, Reader, Schema, Writer, error::Error, reader::datum::GenericDatumReader,
86+
types::Record, writer::datum::GenericDatumWriter,
87+
};
88+
8489
#[test]
8590
fn test_avro_3779_bigdecimal_serial() -> TestResult {
8691
let value: BigDecimal =
@@ -190,6 +195,43 @@ mod tests {
190195
big_decimal: BigDecimal,
191196
}
192197

198+
let schema_str = r#"
199+
{
200+
"type": "record",
201+
"name": "Test",
202+
"fields": [
203+
{
204+
"name": "big_decimal",
205+
"type": "string"
206+
}
207+
]
208+
}
209+
"#;
210+
let schema = Schema::parse_str(schema_str)?;
211+
212+
let test = Test::default();
213+
214+
let serialized = GenericDatumWriter::builder(&schema)
215+
.build()?
216+
.write_ser_to_vec(&test)?;
217+
let value: Test = GenericDatumReader::builder(&schema)
218+
.build()?
219+
.read_deser(&mut &serialized[..])?;
220+
221+
assert_eq!(value, test);
222+
223+
Ok(())
224+
}
225+
226+
#[test]
227+
fn avro_rs_338_deserialize_serde_way_with_bigdecimal() -> TestResult {
228+
#[derive(Clone, PartialEq, Eq, Debug, Default, serde::Deserialize, serde::Serialize)]
229+
#[serde(rename = "test")]
230+
struct Test {
231+
#[serde(with = "crate::serde::bigdecimal")]
232+
big_decimal: BigDecimal,
233+
}
234+
193235
let schema_str = r#"
194236
{
195237
"type": "record",
@@ -216,11 +258,11 @@ mod tests {
216258
let wrote_data = writer.into_inner()?;
217259

218260
// read record
219-
let mut reader = Reader::new(&wrote_data[..])?;
261+
let mut reader = Reader::new(&wrote_data[..])?.into_deser_iter();
220262

221263
let value = reader.next().unwrap()?;
222264

223-
assert_eq!(test, from_value::<Test>(&value)?);
265+
assert_eq!(test, value);
224266

225267
Ok(())
226268
}

avro/src/codec.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,10 @@ impl Codec {
108108
#[cfg(feature = "zstandard")]
109109
Codec::Zstandard(settings) => {
110110
use std::io::Write;
111-
let mut encoder =
112-
zstd::Encoder::new(Vec::new(), settings.compression_level as i32).unwrap();
111+
let mut encoder = zstd::Encoder::new(Vec::new(), settings.compression_level as i32)
112+
.map_err(Details::ZstdCompress)?;
113113
encoder.write_all(stream).map_err(Details::ZstdCompress)?;
114-
*stream = encoder.finish().unwrap();
114+
*stream = encoder.finish().map_err(Details::ZstdCompress)?;
115115
}
116116
#[cfg(feature = "bzip")]
117117
Codec::Bzip2(settings) => {
@@ -120,7 +120,9 @@ impl Codec {
120120

121121
let mut encoder = BzEncoder::new(&stream[..], settings.compression());
122122
let mut buffer = Vec::new();
123-
encoder.read_to_end(&mut buffer).unwrap();
123+
encoder
124+
.read_to_end(&mut buffer)
125+
.unwrap_or_else(|_| unreachable!("No I/O errors possible with Vec<u8>"));
124126
*stream = buffer;
125127
}
126128
#[cfg(feature = "xz")]
@@ -130,7 +132,9 @@ impl Codec {
130132

131133
let mut encoder = XzEncoder::new(&stream[..], settings.compression_level as u32);
132134
let mut buffer = Vec::new();
133-
encoder.read_to_end(&mut buffer).unwrap();
135+
encoder
136+
.read_to_end(&mut buffer)
137+
.unwrap_or_else(|_| unreachable!("No I/O errors possible with Vec<u8>"));
134138
*stream = buffer;
135139
}
136140
};
@@ -144,7 +148,7 @@ impl Codec {
144148
Codec::Null => return Ok(()),
145149
Codec::Deflate(_settings) => miniz_oxide::inflate::decompress_to_vec(stream).map_err(|e| {
146150
let err = {
147-
use miniz_oxide::inflate::TINFLStatus::*;
151+
use miniz_oxide::inflate::TINFLStatus::{FailedCannotMakeProgress, BadParam, Adler32Mismatch, Failed, Done, NeedsMoreInput, HasMoreOutput};
148152
use std::io::{Error,ErrorKind};
149153
match e.status {
150154
FailedCannotMakeProgress => Error::from(ErrorKind::UnexpectedEof),
@@ -189,7 +193,7 @@ impl Codec {
189193
let mut decoded = Vec::new();
190194
let buffer_size = zstd_safe::DCtx::in_size();
191195
let buffer = BufReader::with_capacity(buffer_size, &stream[..]);
192-
let mut decoder = zstd::Decoder::new(buffer).unwrap();
196+
let mut decoder = zstd::Decoder::new(buffer).map_err(Details::ZstdDecompress)?;
193197
std::io::copy(&mut decoder, &mut decoded).map_err(Details::ZstdDecompress)?;
194198
decoded
195199
}
@@ -200,7 +204,7 @@ impl Codec {
200204

201205
let mut decoder = BzDecoder::new(&stream[..]);
202206
let mut decoded = Vec::new();
203-
decoder.read_to_end(&mut decoded).unwrap();
207+
decoder.read_to_end(&mut decoded).unwrap_or_else(|_| unreachable!("No I/O errors possible with Vec<u8>"));
204208
decoded
205209
}
206210
#[cfg(feature = "xz")]
@@ -210,7 +214,7 @@ impl Codec {
210214

211215
let mut decoder = XzDecoder::new(&stream[..]);
212216
let mut decoded: Vec<u8> = Vec::new();
213-
decoder.read_to_end(&mut decoded).unwrap();
217+
decoder.read_to_end(&mut decoded).unwrap_or_else(|_| unreachable!("No I/O errors possible with Vec<u8>"));
214218
decoded
215219
}
216220
};
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! # Mapping the Avro data model to the Serde data model
19+
//!
20+
//! When manually mapping an Avro schema to Rust types it is important to understand
21+
//! how the different data models are mapped. When mapping from Rust types to an Avro schema,
22+
//! see [the documentation for the reverse](super::serde_data_model_to_avro).
23+
//!
24+
//! Only the mapping as defined here is supported. Any other behavior might change in a minor version.
25+
//!
26+
//! ## Primitive Types
27+
//! - `null`: `()`
28+
//! - `boolean`: [`bool`]
29+
//! - `int`: [`i32`] (or [`i16`], [`i8`], [`u16`], [`u8`])
30+
//! - `long`: [`i64`] (or [`u32`])
31+
//! - `float`: [`f32`]
32+
//! - `double`: [`f64`]
33+
//! - `bytes`: [`Vec<u8>`](std::vec::Vec) (or any type that uses [`Serializer::serialize_bytes`](serde::Serializer), [`Deserializer::deserialize_bytes`](serde::Deserializer), [`Deserializer::deserialize_byte_buf`](serde::Deserializer))
34+
//! - It is required to use [`apache_avro::serde::bytes`] as otherwise Serde will (de)serialize a `Vec` as an array of integers instead.
35+
//! - `string`: [`String`] (or any type that uses [`Serializer::serialize_str`](serde::Serializer), [`Deserializer::deserialize_str`](serde::Deserializer), [`Deserializer::deserialize_string`](serde::Deserializer))
36+
//!
37+
//! ## Complex Types
38+
//! - `records`: A struct with the same name and fields or a tuple with the same fields.
39+
//! - Extra fields can be added to the struct if they are marked with `#[serde(skip)]`
40+
//! - `enums`: A enum with the same name and unit variants for every symbol
41+
//! - The index of the symbol must match the index of the variant
42+
//! - `arrays`: [`Vec`] (or any type that uses [`Serializer::serialize_seq`](serde::Serializer), [`Deserializer::deserialize_seq`](serde::Deserializer))
43+
//! - `[T; N]` is (de)serialized as a tuple by Serde, to (de)serialize them as an `array` use [`apache_avro::serde::array`]
44+
//! - `maps`: [`HashMap<String, _>`](std::collections::HashMap) (or any type that uses [`Serializer::serialize_map`](serde::Serializer), [`Deserializer::deserialize_map`](serde::Deserializer))
45+
//! - `unions`: An enum with a variant for each union variant
46+
//! - The index of the union variant must match the enum variant
47+
//! - A `null` can be a unit variant or a newtype variant with a unit type
48+
//! - All other variants must be newtype variants, struct variants, or tuple variants.
49+
//! - `fixed`: [`Vec<u8>`](std::vec::Vec) (or any type that uses [`Serializer::serialize_bytes`](serde::Serializer), [`Deserializer::deserialize_bytes`](serde::Deserializer), [`Deserializer::deserialize_byte_buf`](serde::Deserializer))
50+
//! - It is required to use [`apache_avro::serde::fixed`] as otherwise Serde will (de)serialize a `Vec` as an array of integers instead.
51+
//!
52+
//! [`apache_avro::serde::array`]: crate::serde::array
53+
//! [`apache_avro::serde::bytes`]: crate::serde::bytes
54+
//! [`apache_avro::serde::fixed`]: crate::serde::fixed

avro/src/documentation/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,7 @@
2020
//! This module does not contain any code, and is only available during `rustdoc` builds.
2121
//!
2222
23+
pub mod avro_data_model_to_serde;
2324
pub mod dynamic;
2425
pub mod primer;
26+
pub mod serde_data_model_to_avro;

0 commit comments

Comments
 (0)