Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion avro/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,11 +516,14 @@ pub enum Details {

#[error("Failed to serialize field '{field_name}' for record {record_schema:?}: {error}")]
SerializeRecordFieldWithSchema {
field_name: &'static str,
field_name: String,
record_schema: Schema,
error: Box<Error>,
},

#[error("Missing default for skipped field '{field_name}' for schema {schema:?}")]
MissingDefaultForSkippedField { field_name: String, schema: Schema },

#[error("Failed to deserialize Avro value into value: {0}")]
DeserializeValue(String),

Expand Down
168 changes: 146 additions & 22 deletions avro/src/ser_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
schema::{Name, NamesRef, Namespace, RecordField, RecordSchema, Schema},
};
use bigdecimal::BigDecimal;
use serde::{Serialize, ser};
use serde::ser;
use std::{borrow::Cow, io::Write, str::FromStr};

const COLLECTION_SERIALIZER_ITEM_LIMIT: usize = 1024;
Expand Down Expand Up @@ -249,6 +249,9 @@ impl<W: Write> ser::SerializeMap for SchemaAwareWriteSerializeMap<'_, '_, W> {
pub struct SchemaAwareWriteSerializeStruct<'a, 's, W: Write> {
ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
record_schema: &'s RecordSchema,
/// Fields we received in the wrong order
field_cache: Vec<(usize, Vec<u8>)>,
next_field: usize,
bytes_written: usize,
}

Expand All @@ -260,6 +263,8 @@ impl<'a, 's, W: Write> SchemaAwareWriteSerializeStruct<'a, 's, W> {
SchemaAwareWriteSerializeStruct {
ser,
record_schema,
field_cache: Vec::new(),
next_field: 0,
bytes_written: 0,
}
}
Expand All @@ -268,19 +273,86 @@ impl<'a, 's, W: Write> SchemaAwareWriteSerializeStruct<'a, 's, W> {
where
T: ?Sized + ser::Serialize,
{
// If we receive fields in order, write them directly to the main writer
let mut value_ser = SchemaAwareWriteSerializer::new(
&mut *self.ser.writer,
&field.schema,
self.ser.names,
self.ser.enclosing_namespace.clone(),
);
self.bytes_written += value.serialize(&mut value_ser)?;
if field.position == self.next_field {
// If we receive fields in order, write them directly to the main writer
let mut value_ser = SchemaAwareWriteSerializer::new(
&mut *self.ser.writer,
&field.schema,
self.ser.names,
self.ser.enclosing_namespace.clone(),
);
self.bytes_written += value.serialize(&mut value_ser)?;

self.next_field += 1;
while let Some(index) = self
.field_cache
.iter()
.position(|(pos, _)| pos == &self.next_field)
{
let (_, bytes) = self.field_cache.remove(index);
self.ser
.writer
.write_all(&bytes)
.map_err(Details::WriteBytes)?;
self.bytes_written += bytes.len();
self.next_field += 1;
}
} else {
// This field is in the wrong order, write it to a temporary buffer so we can add it at the right time
let mut bytes = Vec::new();
let mut value_ser = SchemaAwareWriteSerializer::new(
&mut bytes,
&field.schema,
self.ser.names,
self.ser.enclosing_namespace.clone(),
);
value.serialize(&mut value_ser)?;
self.field_cache.push((field.position, bytes));
}
Ok(())
}

fn end(self) -> Result<usize, Error> {
fn end(mut self) -> Result<usize, Error> {
// Write any fields that are `serde(skip)` or `serde(skip_serializing)`
while self.next_field != self.record_schema.fields.len() {
let field_info = &self.record_schema.fields[self.next_field];
if let Some(index) = self
.field_cache
.iter()
.position(|(pos, _)| pos == &self.next_field)
{
let (_, bytes) = self.field_cache.remove(index);
self.ser
.writer
.write_all(&bytes)
.map_err(Details::WriteBytes)?;
self.bytes_written += bytes.len();
self.next_field += 1;
} else if let Some(default) = &field_info.default {
self.serialize_next_field(field_info, default)
.map_err(|e| Details::SerializeRecordFieldWithSchema {
field_name: field_info.name.clone(),
record_schema: Schema::Record(self.record_schema.clone()),
error: Box::new(e),
})?;
} else {
return Err(Details::MissingDefaultForSkippedField {
field_name: field_info.name.clone(),
schema: Schema::Record(self.record_schema.clone()),
}
.into());
}
}

// Check if all fields were written
if self.next_field != self.record_schema.fields.len() {
let name = self.record_schema.fields[self.next_field].name.clone();
return Err(Details::GetField(name).into());
}
Comment on lines +347 to +351
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Dead code: This check can never trigger.

The while loop at line 317 only exits when self.next_field == self.record_schema.fields.len(). Therefore, the condition at line 348 is always false and this block is unreachable.

Remove the redundant check:

         }
 
-        // Check if all fields were written
-        if self.next_field != self.record_schema.fields.len() {
-            let name = self.record_schema.fields[self.next_field].name.clone();
-            return Err(Details::GetField(name).into());
-        }
         assert!(
             self.field_cache.is_empty(),
             "There should be no more unwritten fields at this point"
         );
         Ok(self.bytes_written)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Check if all fields were written
if self.next_field != self.record_schema.fields.len() {
let name = self.record_schema.fields[self.next_field].name.clone();
return Err(Details::GetField(name).into());
}
}
assert!(
self.field_cache.is_empty(),
"There should be no more unwritten fields at this point"
);
Ok(self.bytes_written)
🤖 Prompt for AI Agents
In avro/src/ser_schema.rs around lines 347 to 351, the final check that returns
Err(Details::GetField(...)) is dead code because the preceding while loop always
exits only when self.next_field == self.record_schema.fields.len(); remove this
redundant if-block entirely (delete the lines that fetch the field name and
return the error) and ensure any trailing code and formatting remain consistent
after its removal.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct! There is no need to check that the tracked position and the number of fields are the same because this is already checked in the earlier whilez condition. Prevents adding unused code.

assert!(
self.field_cache.is_empty(),
"There should be no more unwritten fields at this point"
);
Ok(self.bytes_written)
}
}
Expand All @@ -304,7 +376,7 @@ impl<W: Write> ser::SerializeStruct for SchemaAwareWriteSerializeStruct<'_, '_,
// self.item_count += 1;
self.serialize_next_field(field, value).map_err(|e| {
Details::SerializeRecordFieldWithSchema {
field_name: key,
field_name: key.to_string(),
record_schema: Schema::Record(self.record_schema.clone()),
error: Box::new(e),
}
Expand All @@ -323,15 +395,20 @@ impl<W: Write> ser::SerializeStruct for SchemaAwareWriteSerializeStruct<'_, '_,
.and_then(|idx| self.record_schema.fields.get(*idx));

if let Some(skipped_field) = skipped_field {
// self.item_count += 1;
skipped_field
.default
.serialize(&mut SchemaAwareWriteSerializer::new(
self.ser.writer,
&skipped_field.schema,
self.ser.names,
self.ser.enclosing_namespace.clone(),
))?;
if let Some(default) = &skipped_field.default {
self.serialize_next_field(skipped_field, default)
.map_err(|e| Details::SerializeRecordFieldWithSchema {
field_name: key.to_string(),
record_schema: Schema::Record(self.record_schema.clone()),
error: Box::new(e),
})?;
} else {
return Err(Details::MissingDefaultForSkippedField {
field_name: key.to_string(),
schema: Schema::Record(self.record_schema.clone()),
}
.into());
}
} else {
return Err(Details::GetField(key.to_string()).into());
}
Expand Down Expand Up @@ -1741,12 +1818,13 @@ impl<'a, 's, W: Write> ser::Serializer for &'a mut SchemaAwareWriteSerializer<'s
mod tests {
use super::*;
use crate::{
Days, Duration, Millis, Months, decimal::Decimal, error::Details, schema::ResolvedSchema,
Days, Duration, Millis, Months, Reader, Writer, decimal::Decimal, error::Details,
from_value, schema::ResolvedSchema,
};
use apache_avro_test_helper::TestResult;
use bigdecimal::BigDecimal;
use num_bigint::{BigInt, Sign};
use serde::Serialize;
use serde::{Deserialize, Serialize};
use serde_bytes::{ByteArray, Bytes};
use std::{
collections::{BTreeMap, HashMap},
Expand Down Expand Up @@ -2900,4 +2978,50 @@ mod tests {
string_record.serialize(&mut serializer)?;
Ok(())
}

#[test]
fn different_field_order_serde_vs_schema() -> TestResult {
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct Foo {
a: String,
b: String,
}
let schema = Schema::parse_str(
r#"
{
"type":"record",
"name":"Foo",
"fields": [
{
"name":"b",
"type":"string"
},
{
"name":"a",
"type":"string"
}
]
}
"#,
)?;

let mut writer = Writer::new(&schema, Vec::new())?;
if let Err(e) = writer.append_ser(Foo {
a: "Hello".into(),
b: "World".into(),
}) {
panic!("{e:?}");
}
let encoded = writer.into_inner()?;
let mut reader = Reader::with_schema(&schema, &encoded[..])?;
let decoded = from_value::<Foo>(&reader.next().unwrap()?)?;
assert_eq!(
decoded,
Foo {
a: "Hello".into(),
b: "World".into(),
}
);
Ok(())
}
}
73 changes: 72 additions & 1 deletion avro/tests/avro-rs-226.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ fn avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_middle_field
struct T {
x: Option<i8>,
#[serde(skip_serializing_if = "Option::is_none")]
#[avro(default = "null")]
y: Option<String>,
z: Option<i8>,
}
Expand All @@ -64,6 +65,7 @@ fn avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_first_field(
#[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)]
struct T {
#[serde(skip_serializing_if = "Option::is_none")]
#[avro(default = "null")]
x: Option<i8>,
y: Option<String>,
z: Option<i8>,
Expand All @@ -86,6 +88,7 @@ fn avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_last_field()
x: Option<i8>,
y: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[avro(default = "null")]
z: Option<i8>,
}

Expand All @@ -100,18 +103,20 @@ fn avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_last_field()
}

#[test]
#[ignore = "This test should be re-enabled once the serde-driven deserialization is implemented! PR #227"]
fn avro_rs_226_index_out_of_bounds_with_serde_skip_multiple_fields() -> TestResult {
#[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)]
struct T {
no_skip1: Option<i8>,
#[serde(skip_serializing)]
#[avro(default = "null")]
skip_serializing: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[avro(default = "null")]
skip_serializing_if: Option<i8>,
#[serde(skip_deserializing)]
skip_deserializing: Option<String>,
#[serde(skip)]
#[avro(skip)]
skip: Option<String>,
no_skip2: Option<i8>,
}
Expand All @@ -128,3 +133,69 @@ fn avro_rs_226_index_out_of_bounds_with_serde_skip_multiple_fields() -> TestResu
},
)
}

#[test]
#[should_panic(expected = "Missing default for skipped field 'y' for schema")]
fn avro_rs_351_no_default_for_serde_skip_serializing_if_should_panic() {
#[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)]
struct T {
x: Option<i8>,
#[serde(skip_serializing_if = "Option::is_none")]
y: Option<String>,
z: Option<i8>,
}

ser_deser::<T>(
&T::get_schema(),
T {
x: None,
y: None,
z: Some(1),
},
)
.unwrap()
}

#[test]
#[should_panic(expected = "Missing default for skipped field 'x' for schema")]
fn avro_rs_351_no_default_for_serde_skip_should_panic() {
#[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)]
struct T {
#[serde(skip)]
x: Option<i8>,
y: Option<String>,
z: Option<i8>,
}

ser_deser::<T>(
&T::get_schema(),
T {
x: None,
y: None,
z: Some(1),
},
)
.unwrap()
}

#[test]
#[should_panic(expected = "Missing default for skipped field 'z' for schema")]
fn avro_rs_351_no_default_for_serde_skip_serializing_should_panic() {
#[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)]
struct T {
x: Option<i8>,
y: Option<String>,
#[serde(skip_serializing)]
z: Option<i8>,
}

ser_deser::<T>(
&T::get_schema(),
T {
x: Some(0),
y: None,
z: None,
},
)
.unwrap()
}
Loading