Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 202 additions & 71 deletions avro/src/serde/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
// under the License.

use crate::Schema;
use crate::schema::{FixedSchema, Name, Names, Namespace, UnionSchema, UuidSchema};
use serde_json::Map;
use crate::schema::{
FixedSchema, Name, Names, Namespace, RecordField, RecordSchema, UnionSchema, UuidSchema,
};
use std::borrow::Cow;
use std::collections::HashMap;

Expand Down Expand Up @@ -82,7 +83,134 @@ pub trait AvroSchema {
///}
/// ```
pub trait AvroSchemaComponent {
/// Get the schema for this component
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema;

/// Get the fields of this schema if it is a record.
///
/// This returns `None` if the schema is not a record.
///
/// The default implementation has to do a lot of extra work, so it is strongly recommended to
/// implement this function when manually implementing this trait.
fn get_record_fields_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Option<Vec<RecordField>> {
get_record_fields_in_ctxt(named_schemas, enclosing_namespace, Self::get_schema_in_ctxt)
}
}

/// Get the record fields from `schema_fn` without polluting `named_schemas` or causing duplicate names
///
/// This is public so the derive macro can use it for `#[avro(with = ||)]` and `#[avro(with = path)]`
pub fn get_record_fields_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
schema_fn: fn(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema,
) -> Option<Vec<RecordField>> {
let mut record = match schema_fn(named_schemas, enclosing_namespace) {
Schema::Record(record) => record,
Schema::Ref { name } => {
// This schema already exists in `named_schemas` so temporarily remove it so we can
// get the actual schema.
let temp = named_schemas
.remove(&name)
.expect("Name should exist in `named_schemas` otherwise Ref is invalid");
// Get the schema
let schema = schema_fn(named_schemas, enclosing_namespace);
// Reinsert the old value
named_schemas.insert(name, temp);

// Now check if we actually got a record and return the fields if that is the case
let Schema::Record(record) = schema else {
return None;
};
return Some(record.fields);
}
_ => return None,
};
// This schema did not yet exist in `named_schemas`, so we need to remove it if and only if
// it isn't used somewhere in the schema (recursive type).

// Find the first Schema::Ref that has the target name
fn find_first_ref<'a>(schema: &'a mut Schema, target: &Name) -> Option<&'a mut Schema> {
match schema {
Schema::Ref { name } if name == target => Some(schema),
Schema::Array(array) => find_first_ref(&mut array.items, target),
Schema::Map(map) => find_first_ref(&mut map.types, target),
Schema::Union(union) => {
for schema in &mut union.schemas {
if let Some(schema) = find_first_ref(schema, target) {
return Some(schema);
}
}
None
}
Schema::Record(record) => {
assert_ne!(
&record.name, target,
"Only expecting a Ref named {target:?}"
);
for field in &mut record.fields {
if let Some(schema) = find_first_ref(&mut field.schema, target) {
return Some(schema);
}
}
None
}
_ => None,
}
}

// Prepare the fields for the new record. All named types will become references.
let new_fields = record
.fields
.iter()
.map(|field| RecordField {
name: field.name.clone(),
doc: field.doc.clone(),
aliases: field.aliases.clone(),
default: field.default.clone(),
schema: if field.schema.is_named() {
Schema::Ref {
name: field.schema.name().expect("Schema is named").clone(),
}
} else {
field.schema.clone()
},
order: field.order.clone(),
position: field.position,
custom_attributes: field.custom_attributes.clone(),
})
.collect();

// Remove the name in case it is not used
named_schemas.remove(&record.name);

// Find the first reference to this schema so we can replace it with the actual schema
for field in &mut record.fields {
if let Some(schema) = find_first_ref(&mut field.schema, &record.name) {
let new_schema = RecordSchema {
name: record.name,
aliases: record.aliases,
doc: record.doc,
fields: new_fields,
lookup: record.lookup,
attributes: record.attributes,
};

let Schema::Ref { name } = std::mem::replace(schema, Schema::Record(new_schema)) else {
panic!("Expected only Refs from find_first_ref");
};

// The schema is used, so reinsert it
named_schemas.insert(name.clone(), Schema::Ref { name });

break;
}
}

Some(record.fields)
}

impl<T> AvroSchema for T
Expand All @@ -100,6 +228,10 @@ macro_rules! impl_schema (
fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
$variant_constructor
}

fn get_record_fields_in_ctxt(_: &mut Names, _: &Namespace) -> Option<Vec<RecordField>> {
None
}
}
);
);
Expand All @@ -118,32 +250,44 @@ impl_schema!(String, Schema::String);
impl_schema!(str, Schema::String);
impl_schema!(char, Schema::String);

impl<T> AvroSchemaComponent for &T
where
T: AvroSchemaComponent + ?Sized,
{
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
}
}
macro_rules! impl_passthrough_schema (
($type:ty where T: AvroSchemaComponent + ?Sized $(+ $bound:tt)*) => (
impl<T: AvroSchemaComponent $(+ $bound)* + ?Sized> AvroSchemaComponent for $type {
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
}

impl<T> AvroSchemaComponent for &mut T
where
T: AvroSchemaComponent + ?Sized,
{
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
}
}
fn get_record_fields_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Option<Vec<RecordField>> {
T::get_record_fields_in_ctxt(named_schemas, enclosing_namespace)
}
}
);
);

impl<T> AvroSchemaComponent for [T]
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
}
}
impl_passthrough_schema!(&T where T: AvroSchemaComponent + ?Sized);
impl_passthrough_schema!(&mut T where T: AvroSchemaComponent + ?Sized);
impl_passthrough_schema!(Box<T> where T: AvroSchemaComponent + ?Sized);
impl_passthrough_schema!(Cow<'_, T> where T: AvroSchemaComponent + ?Sized + ToOwned);
impl_passthrough_schema!(std::sync::Mutex<T> where T: AvroSchemaComponent + ?Sized);

macro_rules! impl_array_schema (
($type:ty where T: AvroSchemaComponent) => (
impl<T: AvroSchemaComponent> AvroSchemaComponent for $type {
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
}

fn get_record_fields_in_ctxt(_: &mut Names, _: &Namespace) -> Option<Vec<RecordField>> {
None
}
}
);
);

impl_array_schema!([T] where T: AvroSchemaComponent);
impl_array_schema!(Vec<T> where T: AvroSchemaComponent);
// This doesn't work as the macro doesn't allow specifying the N parameter
// impl_array_schema!([T; N] where T: AvroSchemaComponent);

impl<const N: usize, T> AvroSchemaComponent for [T; N]
where
Expand All @@ -152,14 +296,22 @@ where
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
}

fn get_record_fields_in_ctxt(_: &mut Names, _: &Namespace) -> Option<Vec<RecordField>> {
None
}
}

impl<T> AvroSchemaComponent for Vec<T>
impl<T> AvroSchemaComponent for HashMap<String, T>
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change drops the AvroSchemaComponent impl for serde_json::Map<String, T> (only HashMap<String, T> remains); if downstream users relied on serde_json::Map schema generation, this may be an unintended regression.

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback:The Augment AI reviewer is correct! The old implementation of AvroSchemaComponent for serde_json::Map<String, T> is lost in the refactoring. Prevents broken builds for external users of this implementation

where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
}

fn get_record_fields_in_ctxt(_: &mut Names, _: &Namespace) -> Option<Vec<RecordField>> {
None
}
}

Expand All @@ -177,50 +329,9 @@ where
UnionSchema::new(variants).expect("Option<T> must produce a valid (non-nested) union"),
)
}
}

impl<T> AvroSchemaComponent for Map<String, T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
}
}

impl<T> AvroSchemaComponent for HashMap<String, T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
}
}

impl<T> AvroSchemaComponent for Box<T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
}
}

impl<T> AvroSchemaComponent for std::sync::Mutex<T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
}
}

impl<T> AvroSchemaComponent for Cow<'_, T>
where
T: AvroSchemaComponent + Clone,
{
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
fn get_record_fields_in_ctxt(_: &mut Names, _: &Namespace) -> Option<Vec<RecordField>> {
None
}
}

Expand Down Expand Up @@ -248,6 +359,10 @@ impl AvroSchemaComponent for core::time::Duration {
schema
}
}

fn get_record_fields_in_ctxt(_: &mut Names, _: &Namespace) -> Option<Vec<RecordField>> {
None
}
}

impl AvroSchemaComponent for uuid::Uuid {
Expand All @@ -274,6 +389,10 @@ impl AvroSchemaComponent for uuid::Uuid {
schema
}
}

fn get_record_fields_in_ctxt(_: &mut Names, _: &Namespace) -> Option<Vec<RecordField>> {
None
}
}

impl AvroSchemaComponent for u64 {
Expand All @@ -298,6 +417,10 @@ impl AvroSchemaComponent for u64 {
schema
}
}

fn get_record_fields_in_ctxt(_: &mut Names, _: &Namespace) -> Option<Vec<RecordField>> {
None
}
}

impl AvroSchemaComponent for u128 {
Expand All @@ -322,6 +445,10 @@ impl AvroSchemaComponent for u128 {
schema
}
}

fn get_record_fields_in_ctxt(_: &mut Names, _: &Namespace) -> Option<Vec<RecordField>> {
None
}
}

impl AvroSchemaComponent for i128 {
Expand All @@ -346,6 +473,10 @@ impl AvroSchemaComponent for i128 {
schema
}
}

fn get_record_fields_in_ctxt(_: &mut Names, _: &Namespace) -> Option<Vec<RecordField>> {
None
}
}

#[cfg(test)]
Expand Down
3 changes: 3 additions & 0 deletions avro/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ pub use de::from_value;
pub use derive::{AvroSchema, AvroSchemaComponent};
pub use ser::to_value;
pub use with::{bytes, bytes_opt, fixed, fixed_opt, slice, slice_opt};

#[doc(hidden)]
pub use derive::get_record_fields_in_ctxt;
Loading