Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion avro/benches/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ fn write_ser<T: Serialize>(schema: &Schema, records: &[T]) -> AvroResult<Vec<u8>
}

fn read(schema: &Schema, bytes: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
let reader = Reader::with_schema(schema, bytes)?;
let reader = Reader::builder(bytes).schema(schema).build()?;

for record in reader {
let _ = record?;
Expand Down
4 changes: 3 additions & 1 deletion avro/examples/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ fn benchmark(

for _ in 0..runs {
let start = Instant::now();
let reader = Reader::with_schema(schema, BufReader::new(&bytes[..]))?;
let reader = Reader::builder(BufReader::new(&bytes[..]))
.schema(schema)
.build()?;

let mut read_records = Vec::with_capacity(count);
for record in reader {
Expand Down
8 changes: 4 additions & 4 deletions avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ mod tests {
#[test]
fn test_decode_array_without_size() -> TestResult {
let mut input: &[u8] = &[6, 2, 4, 6, 0];
let result = decode(&Schema::array(Schema::Int), &mut input);
let result = decode(&Schema::array(Schema::Int).build(), &mut input);
assert_eq!(Array(vec!(Int(1), Int(2), Int(3))), result?);

Ok(())
Expand All @@ -385,7 +385,7 @@ mod tests {
#[test]
fn test_decode_array_with_size() -> TestResult {
let mut input: &[u8] = &[5, 6, 2, 4, 6, 0];
let result = decode(&Schema::array(Schema::Int), &mut input);
let result = decode(&Schema::array(Schema::Int).build(), &mut input);
assert_eq!(Array(vec!(Int(1), Int(2), Int(3))), result?);

Ok(())
Expand All @@ -394,7 +394,7 @@ mod tests {
#[test]
fn test_decode_map_without_size() -> TestResult {
let mut input: &[u8] = &[0x02, 0x08, 0x74, 0x65, 0x73, 0x74, 0x02, 0x00];
let result = decode(&Schema::map(Schema::Int), &mut input);
let result = decode(&Schema::map(Schema::Int).build(), &mut input);
let mut expected = HashMap::new();
expected.insert(String::from("test"), Int(1));
assert_eq!(Map(expected), result?);
Expand All @@ -405,7 +405,7 @@ mod tests {
#[test]
fn test_decode_map_with_size() -> TestResult {
let mut input: &[u8] = &[0x01, 0x0C, 0x08, 0x74, 0x65, 0x73, 0x74, 0x02, 0x00];
let result = decode(&Schema::map(Schema::Int), &mut input);
let result = decode(&Schema::map(Schema::Int).build(), &mut input);
let mut expected = HashMap::new();
expected.insert(String::from("test"), Int(1));
assert_eq!(Map(expected), result?);
Expand Down
4 changes: 2 additions & 2 deletions avro/src/documentation/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@
//! let reader_schema = Schema::parse_str(reader_raw_schema).unwrap();
//!
//! // reader creation can fail if the input is malformed or is not valid Avro data
//! let reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
//! let reader = Reader::builder(&input[..]).schema(&reader_schema).build().unwrap();
//!
//! // each value is a Result wrapping an Avro Value, because the read operation can fail
//! for value in reader {
Expand Down Expand Up @@ -268,7 +268,7 @@
//! writer.append_ser(test)?;
//!
//! let input = writer.into_inner()?;
//! let reader = Reader::with_schema(&schema, &input[..])?;
//! let reader = Reader::builder(&input[..]).schema(&schema).build()?;
//!
//! for record in reader {
//! println!("{:?}", from_value::<Test>(&record?));
Expand Down
14 changes: 10 additions & 4 deletions avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,10 +392,13 @@ pub(crate) mod tests {
let empty: Vec<Value> = Vec::new();
encode(
&Value::Array(empty.clone()),
&Schema::array(Schema::Int),
&Schema::array(Schema::Int).build(),
&mut buf,
)
.expect(&success(&Value::Array(empty), &Schema::array(Schema::Int)));
.expect(&success(
&Value::Array(empty),
&Schema::array(Schema::Int).build(),
));
assert_eq!(vec![0u8], buf);
}

Expand All @@ -405,10 +408,13 @@ pub(crate) mod tests {
let empty: HashMap<String, Value> = HashMap::new();
encode(
&Value::Map(empty.clone()),
&Schema::map(Schema::Int),
&Schema::map(Schema::Int).build(),
&mut buf,
)
.expect(&success(&Value::Map(empty), &Schema::map(Schema::Int)));
.expect(&success(
&Value::Map(empty),
&Schema::map(Schema::Int).build(),
));
assert_eq!(vec![0u8], buf);
}

Expand Down
7 changes: 5 additions & 2 deletions avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,10 @@ mod tests {
record.put("b", "foo");
writer.append_value(record).unwrap();
let input = writer.into_inner().unwrap();
let mut reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
let mut reader = Reader::builder(&input[..])
.schema(&reader_schema)
.build()
.unwrap();
assert_eq!(
reader.next().unwrap().unwrap(),
Value::Record(vec![
Expand Down Expand Up @@ -235,7 +238,7 @@ mod tests {
record.put("c", "clubs");
writer.append_value(record).unwrap();
let input = writer.into_inner().unwrap();
let mut reader = Reader::with_schema(&schema, &input[..]).unwrap();
let mut reader = Reader::builder(&input[..]).schema(&schema).build().unwrap();
assert_eq!(
reader.next().unwrap().unwrap(),
Value::Record(vec![
Expand Down
72 changes: 30 additions & 42 deletions avro/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
types::Value,
util,
};
use bon::bon;
use log::warn;
use serde::de::DeserializeOwned;
use serde_json::from_slice;
Expand Down Expand Up @@ -88,7 +89,7 @@ impl<'r, R: Read> Block<'r, R> {
return Err(Details::HeaderMagic.into());
}

let meta_schema = Schema::map(Schema::Bytes);
let meta_schema = Schema::map(Schema::Bytes).build();
match decode(&meta_schema, &mut self.reader)? {
Value::Map(metadata) => {
self.read_writer_schema(&metadata)?;
Expand Down Expand Up @@ -337,57 +338,39 @@ pub struct Reader<'a, R> {
should_resolve_schema: bool,
}

#[bon]
impl<'a, R: Read> Reader<'a, R> {
/// Creates a `Reader` given something implementing the `io::Read` trait to read from.
/// No reader `Schema` will be set.
///
/// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
let block = Block::new(reader, vec![])?;
let reader = Reader {
block,
reader_schema: None,
errored: false,
should_resolve_schema: false,
};
Ok(reader)
}

/// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
/// to read from.
///
/// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
let block = Block::new(reader, vec![schema])?;
let mut reader = Reader {
block,
reader_schema: Some(schema),
errored: false,
should_resolve_schema: false,
};
// Check if the reader and writer schemas disagree.
reader.should_resolve_schema = reader.writer_schema() != schema;
Ok(reader)
Reader::builder(reader).build()
}

/// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
/// to read from.
/// Creates a `Reader` given something implementing the `io::Read` trait to read from.
/// With an optional reader `Schema` and optional schemata to use for resolving schema
/// references.
///
/// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
pub fn with_schemata(
schema: &'a Schema,
schemata: Vec<&'a Schema>,
reader: R,
#[builder(finish_fn = build)]
pub fn builder(
#[builder(start_fn)] reader: R,
schema: Option<&'a Schema>,
schemata: Option<Vec<&'a Schema>>,
) -> AvroResult<Reader<'a, R>> {
let schemata = match schemata {
Some(schemata) => schemata,
None => match schema {
Some(schema) => vec![schema],
None => vec![],
},
};
Comment on lines +357 to +363
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This logic for determining the schemata can be expressed more concisely using unwrap_or_else and map, which is more idiomatic in Rust.

        let schemata = schemata.unwrap_or_else(|| schema.map(|s| vec![s]).unwrap_or_default());

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! The logic for determining the schemata could be simplified to a one-liner. This will make it easier to understand and to maintain.

let block = Block::new(reader, schemata)?;
let mut reader = Reader {
block,
reader_schema: Some(schema),
reader_schema: schema,
errored: false,
should_resolve_schema: false,
};
// Check if the reader and writer schemas disagree.
reader.should_resolve_schema = reader.writer_schema() != schema;
reader.should_resolve_schema =
schema.is_some_and(|reader_schema| reader.writer_schema() != reader_schema);
Ok(reader)
}

Expand Down Expand Up @@ -744,7 +727,7 @@ mod tests {
#[test]
fn test_reader_iterator() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
let reader = Reader::with_schema(&schema, ENCODED)?;
let reader = Reader::builder(ENCODED).schema(&schema).build()?;

let mut record1 = Record::new(&schema).unwrap();
record1.put("a", 27i64);
Expand All @@ -767,7 +750,12 @@ mod tests {
fn test_reader_invalid_header() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
let mut invalid = &ENCODED[1..];
assert!(Reader::with_schema(&schema, &mut invalid).is_err());
assert!(
Reader::builder(&mut invalid)
.schema(&schema)
.build()
.is_err()
);

Ok(())
}
Expand All @@ -776,7 +764,7 @@ mod tests {
fn test_reader_invalid_block() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
let mut invalid = &ENCODED[0..ENCODED.len() - 19];
let reader = Reader::with_schema(&schema, &mut invalid)?;
let reader = Reader::builder(&mut invalid).schema(&schema).build()?;
for value in reader {
assert!(value.is_err());
}
Expand Down
59 changes: 26 additions & 33 deletions avro/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub use crate::schema::{
resolve::ResolvedSchema,
union::UnionSchema,
};
use bon::bon;

/// Represents documentation for complex Avro schemas.
pub type Documentation = Option<String>;
Expand Down Expand Up @@ -383,6 +384,7 @@ type DecimalMetadata = usize;
pub(crate) type Precision = DecimalMetadata;
pub(crate) type Scale = DecimalMetadata;

#[bon]
impl Schema {
/// Converts `self` into its [Parsing Canonical Form].
///
Expand Down Expand Up @@ -645,35 +647,27 @@ impl Schema {
}
}

/// Returns a `Schema::Map` with the given types.
pub fn map(types: Schema) -> Self {
Schema::Map(MapSchema {
types: Box::new(types),
default: None,
attributes: Default::default(),
})
}

/// Returns a `Schema::Map` with the given types and custom attributes.
pub fn map_with_attributes(types: Schema, attributes: BTreeMap<String, JsonValue>) -> Self {
/// Returns a `Schema::Map` with the given types and optional custom attributes.
#[builder(finish_fn = build)]
pub fn map(
#[builder(start_fn)] types: Schema,
attributes: Option<BTreeMap<String, JsonValue>>,
) -> Self {
let attributes = attributes.unwrap_or_default();
Schema::Map(MapSchema {
types: Box::new(types),
default: None,
attributes,
})
}

/// Returns a `Schema::Array` with the given items.
pub fn array(items: Schema) -> Self {
Schema::Array(ArraySchema {
items: Box::new(items),
default: None,
attributes: Default::default(),
})
}

/// Returns a `Schema::Array` with the given items and custom attributes.
pub fn array_with_attributes(items: Schema, attributes: BTreeMap<String, JsonValue>) -> Self {
/// Returns a `Schema::Array` with the given items and optional custom attributes.
#[builder(finish_fn = build)]
pub fn array(
#[builder(start_fn)] items: Schema,
attributes: Option<BTreeMap<String, JsonValue>>,
) -> Self {
let attributes = attributes.unwrap_or_default();
Schema::Array(ArraySchema {
items: Box::new(items),
default: None,
Expand Down Expand Up @@ -1154,14 +1148,14 @@ mod tests {
#[test]
fn test_array_schema() -> TestResult {
let schema = Schema::parse_str(r#"{"type": "array", "items": "string"}"#)?;
assert_eq!(Schema::array(Schema::String), schema);
assert_eq!(Schema::array(Schema::String).build(), schema);
Ok(())
}

#[test]
fn test_map_schema() -> TestResult {
let schema = Schema::parse_str(r#"{"type": "map", "values": "double"}"#)?;
assert_eq!(Schema::map(Schema::Double), schema);
assert_eq!(Schema::map(Schema::Double).build(), schema);
Ok(())
}

Expand Down Expand Up @@ -1607,7 +1601,8 @@ mod tests {
aliases: None,
schema: Schema::array(Schema::Ref {
name: Name::new("Node")?,
}),
})
.build(),
order: RecordFieldOrder::Ascending,
position: 1,
custom_attributes: Default::default(),
Expand Down Expand Up @@ -4682,10 +4677,9 @@ mod tests {

#[test]
fn test_avro_3927_serialize_array_with_custom_attributes() -> TestResult {
let expected = Schema::array_with_attributes(
Schema::Long,
BTreeMap::from([("field-id".to_string(), "1".into())]),
);
let expected = Schema::array(Schema::Long)
.attributes(BTreeMap::from([("field-id".to_string(), "1".into())]))
.build();

let value = serde_json::to_value(&expected)?;
let serialized = serde_json::to_string(&value)?;
Expand All @@ -4705,10 +4699,9 @@ mod tests {

#[test]
fn test_avro_3927_serialize_map_with_custom_attributes() -> TestResult {
let expected = Schema::map_with_attributes(
Schema::Long,
BTreeMap::from([("field-id".to_string(), "1".into())]),
);
let expected = Schema::map(Schema::Long)
.attributes(BTreeMap::from([("field-id".to_string(), "1".into())]))
.build();

let value = serde_json::to_value(&expected)?;
let serialized = serde_json::to_string(&value)?;
Expand Down
Loading