-
Notifications
You must be signed in to change notification settings - Fork 0
458: feat: Support (De)Serialization for different representations of Nullable Unions #46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
de7a6a6
d75ce1e
aac27b5
94a32ca
ae10abc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ use std::{ | |
|
|
||
| pub struct Deserializer<'de> { | ||
| input: &'de Value, | ||
| deserializing_some: bool, | ||
| } | ||
|
|
||
| struct SeqDeserializer<'de> { | ||
|
|
@@ -75,7 +76,17 @@ struct UnionDeserializer<'de> { | |
|
|
||
| impl<'de> Deserializer<'de> { | ||
| pub fn new(input: &'de Value) -> Self { | ||
| Deserializer { input } | ||
| Deserializer { | ||
| input, | ||
| deserializing_some: false, | ||
| } | ||
| } | ||
|
|
||
| pub fn new_deserializing_some(input: &'de Value) -> Self { | ||
| Deserializer { | ||
| input, | ||
| deserializing_some: true, | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -616,6 +627,21 @@ impl<'de> de::Deserializer<'de> for Deserializer<'de> { | |
| let d_bytes: [u8; 12] = d.into(); | ||
| visitor.visit_bytes(&d_bytes[..]) | ||
| } | ||
| Value::Union(i, x) => { | ||
| if matches!(x.deref(), Value::Union(_, _)) { | ||
| Err(de::Error::custom(format!( | ||
| "Directly nested union types are not supported. Got Value::Union({i}, {x:?})" | ||
| ))) | ||
| } else { | ||
| Self::new(x.deref()) | ||
| .deserialize_bytes(visitor) | ||
| .map_err(|e| { | ||
| de::Error::custom(format!( | ||
| "Attempted to deserialize Value::Union({i}, {x:?}) as bytes: {e:?}" | ||
| )) | ||
| }) | ||
| } | ||
| } | ||
| _ => Err(de::Error::custom(format!( | ||
| "Expected a String|Bytes|Fixed|Uuid|Decimal|Duration, but got {:?}", | ||
| self.input | ||
|
|
@@ -638,6 +664,21 @@ impl<'de> de::Deserializer<'de> for Deserializer<'de> { | |
| let d_bytes: [u8; 12] = d.into(); | ||
| visitor.visit_byte_buf(Vec::from(d_bytes)) | ||
| } | ||
| Value::Union(i, x) => { | ||
| if matches!(x.deref(), Value::Union(_, _)) { | ||
| Err(de::Error::custom(format!( | ||
| "Directly nested union types are not supported. Got Value::Union({i}, {x:?})" | ||
| ))) | ||
| } else { | ||
| Self::new(x.deref()) | ||
| .deserialize_byte_buf(visitor) | ||
| .map_err(|e| { | ||
| de::Error::custom(format!( | ||
| "Attempted to deserialize Value::Union({i}, {x:?}) as bytes: {e:?}" | ||
| )) | ||
| }) | ||
| } | ||
| } | ||
| _ => Err(de::Error::custom(format!( | ||
| "Expected a String|Bytes|Fixed|Uuid|Decimal|Duration, but got {:?}", | ||
| self.input | ||
|
|
@@ -651,7 +692,9 @@ impl<'de> de::Deserializer<'de> for Deserializer<'de> { | |
| { | ||
| match self.input { | ||
| Value::Union(_i, inner) if inner.as_ref() == &Value::Null => visitor.visit_none(), | ||
| Value::Union(_i, inner) => visitor.visit_some(Deserializer::new(inner)), | ||
| Value::Union(_i, _inner) => { | ||
| visitor.visit_some(Deserializer::new_deserializing_some(self.input)) | ||
| } | ||
| _ => Err(de::Error::custom(format!( | ||
| "Expected a Union, but got {:?}", | ||
| self.input | ||
|
|
@@ -753,6 +796,14 @@ impl<'de> de::Deserializer<'de> for Deserializer<'de> { | |
| match self.input { | ||
| Value::Map(items) => visitor.visit_map(MapDeserializer::new(items)), | ||
| Value::Record(fields) => visitor.visit_map(RecordDeserializer::new(fields)), | ||
| Value::Union(_i, inner) => match inner.deref() { | ||
| Value::Map(items) => visitor.visit_map(MapDeserializer::new(items)), | ||
| Value::Record(fields) => visitor.visit_map(RecordDeserializer::new(fields)), | ||
| Value::Null => visitor.visit_map(RecordDeserializer::new(&[])), | ||
| _ => Err(de::Error::custom(format!( | ||
| "Expected a Map, Record or Null, but got: {inner:?}" | ||
| ))), | ||
| }, | ||
| _ => Err(de::Error::custom(format_args!( | ||
| "Expected a record or a map. Got: {:?}", | ||
| &self.input | ||
|
|
@@ -799,9 +850,19 @@ impl<'de> de::Deserializer<'de> for Deserializer<'de> { | |
| Value::Record(fields) => visitor.visit_enum(EnumDeserializer::new(fields)), | ||
| Value::String(field) => visitor.visit_enum(EnumUnitDeserializer::new(field)), | ||
| Value::Union(idx, inner) => { | ||
| if (*idx as usize) < variants.len() { | ||
| // if we came here from a some, we need to check if we are deserializing a | ||
| // non-newtype enum | ||
| if self.deserializing_some | ||
| && let Value::Enum(_index, field) = inner.deref() | ||
| && variants.contains(&&**field) | ||
| { | ||
| return visitor.visit_enum(EnumUnitDeserializer::new(field)); | ||
| } | ||
| // Assume `null` is the first branch if deserializing some so decrement the variant index | ||
| let variant_idx = *idx as usize - usize::from(self.deserializing_some); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code performs an unchecked subtraction This occurs when the Avro schema for a nullable union has the non-null variant at index 0 (e.g., Furthermore, the logic hardcodes the assumption that
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:useful; category:bug; feedback: The Gemini AI reviewer is correct! The calculation of the branch_index may lead to usize underflow when the null variant is not the first in the union. The Avro specification recommends listing the null first, but it allows to use it in a non-first position too. Prevents an application crash at production due to invalid variant index. The calculation should be done with checked operations and an Err should be returned instead with recommendation to put the null at the first position. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Severity: high 🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:useful; category:bug; feedback: The Augment AI reviewer is correct! The calculation of the branch_index may lead to usize underflow when the null variant is not the first in the union. The Avro specification recommends listing the null first, but it allows to use it in a non-first position too. Prevents an application crash at production due to invalid variant index. The calculation should be done with checked operations and an Err should be returned instead with recommendation to put the null at the first position. |
||
| if (variant_idx) < variants.len() { | ||
| visitor.visit_enum(UnionDeserializer::new( | ||
| variants[*idx as usize], | ||
| variants[variant_idx], | ||
| inner.as_ref(), | ||
| )) | ||
| } else { | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -606,6 +606,7 @@ pub struct SchemaAwareWriteSerializer<'s, W: Write> { | |||||||||||||||||||||||||||||
| root_schema: &'s Schema, | ||||||||||||||||||||||||||||||
| names: &'s NamesRef<'s>, | ||||||||||||||||||||||||||||||
| enclosing_namespace: Namespace, | ||||||||||||||||||||||||||||||
| serializing_some: bool, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> { | ||||||||||||||||||||||||||||||
|
|
@@ -629,6 +630,22 @@ impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> { | |||||||||||||||||||||||||||||
| root_schema: schema, | ||||||||||||||||||||||||||||||
| names, | ||||||||||||||||||||||||||||||
| enclosing_namespace, | ||||||||||||||||||||||||||||||
| serializing_some: false, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| pub fn new_serializing_some( | ||||||||||||||||||||||||||||||
| writer: &'s mut W, | ||||||||||||||||||||||||||||||
| schema: &'s Schema, | ||||||||||||||||||||||||||||||
| names: &'s NamesRef<'s>, | ||||||||||||||||||||||||||||||
| enclosing_namespace: Namespace, | ||||||||||||||||||||||||||||||
| ) -> SchemaAwareWriteSerializer<'s, W> { | ||||||||||||||||||||||||||||||
| SchemaAwareWriteSerializer { | ||||||||||||||||||||||||||||||
| writer, | ||||||||||||||||||||||||||||||
| root_schema: schema, | ||||||||||||||||||||||||||||||
| names, | ||||||||||||||||||||||||||||||
| enclosing_namespace, | ||||||||||||||||||||||||||||||
| serializing_some: true, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
|
|
@@ -1374,7 +1391,7 @@ impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> { | |||||||||||||||||||||||||||||
| where | ||||||||||||||||||||||||||||||
| T: ?Sized + ser::Serialize, | ||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||
| let mut inner_ser = SchemaAwareWriteSerializer::new( | ||||||||||||||||||||||||||||||
| let mut inner_ser = SchemaAwareWriteSerializer::new_serializing_some( | ||||||||||||||||||||||||||||||
| &mut *self.writer, | ||||||||||||||||||||||||||||||
| schema, | ||||||||||||||||||||||||||||||
| self.names, | ||||||||||||||||||||||||||||||
|
|
@@ -1460,19 +1477,37 @@ impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> { | |||||||||||||||||||||||||||||
| encode_int(variant_index as i32, &mut self.writer) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| Schema::Union(union_schema) => { | ||||||||||||||||||||||||||||||
| if variant_index as usize >= union_schema.schemas.len() { | ||||||||||||||||||||||||||||||
| // If we came here from a some, we need to check if we are serializing a | ||||||||||||||||||||||||||||||
| // non-newtype enum | ||||||||||||||||||||||||||||||
| if self.serializing_some { | ||||||||||||||||||||||||||||||
| for (i, variant_schema) in union_schema.schemas.iter().enumerate() { | ||||||||||||||||||||||||||||||
| match variant_schema { | ||||||||||||||||||||||||||||||
| Schema::Enum(enum_schema) if enum_schema.name.name == name => { | ||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the Severity: high 🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:useful; category:bug; feedback: The Augment AI reviewer is correct! The new logic should add a support for Schema::Ref too because the reference may point to a Schema::Enum. Prevents an error while resolving the variant if it is a reference to an enum schema |
||||||||||||||||||||||||||||||
| if variant_index as usize >= enum_schema.symbols.len() { | ||||||||||||||||||||||||||||||
| return Err(create_error(format!( | ||||||||||||||||||||||||||||||
| "Variant index out of bounds: {}. The Enum schema has '{}' symbols", | ||||||||||||||||||||||||||||||
| variant_index, | ||||||||||||||||||||||||||||||
| enum_schema.symbols.len() | ||||||||||||||||||||||||||||||
| ))); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| encode_int(i as i32, &mut self.writer)?; | ||||||||||||||||||||||||||||||
| return encode_int(variant_index as i32, &mut self.writer); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| _ => { /* skip */ } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| let branch_index = variant_index as usize + usize::from(self.serializing_some); | ||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The serialization logic hardcodes the assumption that the
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:useful; category:bug; feedback: The Gemini AI reviewer is correct! The calculation of the branch_index may lead to usize underflow when the null variant is not the first in the union. The Avro specification recommends listing the null first, but it allows to use it in a non-first position too. Prevents an application crash at production due to invalid variant index. The calculation should be done with checked operations and an Err should be returned instead with recommendation to put the null at the first position. |
||||||||||||||||||||||||||||||
| if branch_index >= union_schema.schemas.len() { | ||||||||||||||||||||||||||||||
| return Err(create_error(format!( | ||||||||||||||||||||||||||||||
| "Variant index out of bounds: {}. The union schema has '{}' schemas", | ||||||||||||||||||||||||||||||
| variant_index, | ||||||||||||||||||||||||||||||
| union_schema.schemas.len() | ||||||||||||||||||||||||||||||
| ))); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| encode_int(variant_index as i32, &mut self.writer)?; | ||||||||||||||||||||||||||||||
| self.serialize_unit_struct_with_schema( | ||||||||||||||||||||||||||||||
| name, | ||||||||||||||||||||||||||||||
| &union_schema.schemas[variant_index as usize], | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
| encode_int(branch_index as i32, &mut self.writer)?; | ||||||||||||||||||||||||||||||
| self.serialize_unit_struct_with_schema(name, &union_schema.schemas[branch_index]) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| Schema::Ref { name: ref_name } => { | ||||||||||||||||||||||||||||||
| let ref_schema = self.get_ref_schema(ref_name)?; | ||||||||||||||||||||||||||||||
|
|
@@ -1524,17 +1559,21 @@ impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> { | |||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| match schema { | ||||||||||||||||||||||||||||||
| Schema::Union(union_schema) => { | ||||||||||||||||||||||||||||||
| let variant_schema = union_schema | ||||||||||||||||||||||||||||||
| .schemas | ||||||||||||||||||||||||||||||
| .get(variant_index as usize) | ||||||||||||||||||||||||||||||
| .ok_or_else(|| { | ||||||||||||||||||||||||||||||
| create_error(format!( | ||||||||||||||||||||||||||||||
| "No variant schema at position {variant_index} for {union_schema:?}" | ||||||||||||||||||||||||||||||
| )) | ||||||||||||||||||||||||||||||
| })?; | ||||||||||||||||||||||||||||||
| let branch_index = variant_index as usize + usize::from(self.serializing_some); | ||||||||||||||||||||||||||||||
| if branch_index >= union_schema.schemas.len() { | ||||||||||||||||||||||||||||||
| return Err(create_error(format!( | ||||||||||||||||||||||||||||||
| "Variant index out of bounds: {}. The union schema has '{}' schemas", | ||||||||||||||||||||||||||||||
| variant_index, | ||||||||||||||||||||||||||||||
| union_schema.schemas.len() | ||||||||||||||||||||||||||||||
| ))); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
Comment on lines
+1563
to
+1569
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In
Suggested change
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| encode_int(variant_index as i32, &mut self.writer)?; | ||||||||||||||||||||||||||||||
| self.serialize_newtype_struct_with_schema(variant, value, variant_schema) | ||||||||||||||||||||||||||||||
| encode_int(branch_index as i32, &mut self.writer)?; | ||||||||||||||||||||||||||||||
| self.serialize_newtype_struct_with_schema( | ||||||||||||||||||||||||||||||
| name, | ||||||||||||||||||||||||||||||
| value, | ||||||||||||||||||||||||||||||
| &union_schema.schemas[branch_index], | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| _ => Err(create_error(format!( | ||||||||||||||||||||||||||||||
| "Expected Union schema. Got: {schema}" | ||||||||||||||||||||||||||||||
|
|
@@ -1703,17 +1742,21 @@ impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> { | |||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| match schema { | ||||||||||||||||||||||||||||||
| Schema::Union(union_schema) => { | ||||||||||||||||||||||||||||||
| let variant_schema = union_schema | ||||||||||||||||||||||||||||||
| .schemas | ||||||||||||||||||||||||||||||
| .get(variant_index as usize) | ||||||||||||||||||||||||||||||
| .ok_or_else(|| { | ||||||||||||||||||||||||||||||
| create_error(format!( | ||||||||||||||||||||||||||||||
| "Cannot find a variant at position {variant_index} in {union_schema:?}" | ||||||||||||||||||||||||||||||
| )) | ||||||||||||||||||||||||||||||
| })?; | ||||||||||||||||||||||||||||||
| let branch_index = variant_index as usize + usize::from(self.serializing_some); | ||||||||||||||||||||||||||||||
| if branch_index >= union_schema.schemas.len() { | ||||||||||||||||||||||||||||||
| return Err(create_error(format!( | ||||||||||||||||||||||||||||||
| "Variant index out of bounds: {}. The union schema has '{}' schemas", | ||||||||||||||||||||||||||||||
| variant_index, | ||||||||||||||||||||||||||||||
| union_schema.schemas.len() | ||||||||||||||||||||||||||||||
| ))); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
Comment on lines
+1746
to
+1752
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In
Suggested change
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| encode_int(variant_index as i32, &mut self.writer)?; | ||||||||||||||||||||||||||||||
| self.serialize_tuple_struct_with_schema(variant, len, variant_schema) | ||||||||||||||||||||||||||||||
| encode_int(branch_index as i32, &mut self.writer)?; | ||||||||||||||||||||||||||||||
| self.serialize_tuple_struct_with_schema( | ||||||||||||||||||||||||||||||
| name, | ||||||||||||||||||||||||||||||
| len, | ||||||||||||||||||||||||||||||
| &union_schema.schemas[branch_index], | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| _ => Err(create_error(format!( | ||||||||||||||||||||||||||||||
| "Expected Union schema. Got: {schema}" | ||||||||||||||||||||||||||||||
|
|
@@ -1835,17 +1878,17 @@ impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> { | |||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| match schema { | ||||||||||||||||||||||||||||||
| Schema::Union(union_schema) => { | ||||||||||||||||||||||||||||||
| let variant_schema = union_schema | ||||||||||||||||||||||||||||||
| .schemas | ||||||||||||||||||||||||||||||
| .get(variant_index as usize) | ||||||||||||||||||||||||||||||
| .ok_or_else(|| { | ||||||||||||||||||||||||||||||
| create_error(format!( | ||||||||||||||||||||||||||||||
| "Cannot find variant at position {variant_index} in {union_schema:?}" | ||||||||||||||||||||||||||||||
| )) | ||||||||||||||||||||||||||||||
| })?; | ||||||||||||||||||||||||||||||
| let branch_index = variant_index as usize + usize::from(self.serializing_some); | ||||||||||||||||||||||||||||||
| if branch_index >= union_schema.schemas.len() { | ||||||||||||||||||||||||||||||
| return Err(create_error(format!( | ||||||||||||||||||||||||||||||
| "Variant index out of bounds: {}. The union schema has '{}' schemas", | ||||||||||||||||||||||||||||||
| variant_index, | ||||||||||||||||||||||||||||||
| union_schema.schemas.len() | ||||||||||||||||||||||||||||||
| ))); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
Comment on lines
+1882
to
+1888
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In
Suggested change
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| encode_int(variant_index as i32, &mut self.writer)?; | ||||||||||||||||||||||||||||||
| self.serialize_struct_with_schema(variant, len, variant_schema) | ||||||||||||||||||||||||||||||
| encode_int(branch_index as i32, &mut self.writer)?; | ||||||||||||||||||||||||||||||
| self.serialize_struct_with_schema(name, len, &union_schema.schemas[branch_index]) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| _ => Err(create_error(format!( | ||||||||||||||||||||||||||||||
| "Expected Union schema. Got: {schema}" | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message here seems to be a copy-paste from
deserialize_bytes. It says "as bytes", but this is insidedeserialize_byte_buf. This could be confusing when debugging. It should probably be updated to "as byte_buf".