1818//! Logic for serde-compatible schema-aware serialization
1919//! which writes directly to a `Write` stream
2020
21+ use crate :: schema:: RecordField ;
2122use crate :: {
2223 bigdecimal:: big_decimal_as_bytes,
2324 encode:: { encode_int, encode_long} ,
2425 error:: { Details , Error } ,
2526 schema:: { Name , NamesRef , Namespace , RecordSchema , Schema } ,
2627} ;
2728use bigdecimal:: BigDecimal ;
28- use serde:: ser;
29+ use serde:: { Serialize , ser} ;
2930use std:: { borrow:: Cow , io:: Write , str:: FromStr } ;
3031
3132const RECORD_FIELD_INIT_BUFFER_SIZE : usize = 64 ;
@@ -328,12 +329,7 @@ impl<W: Write> ser::SerializeStruct for SchemaAwareWriteSerializeStruct<'_, '_,
328329 }
329330
330331 let next_field = & self . record_schema . fields [ self . item_count ] ;
331- let next_field_matches = match & next_field. aliases {
332- Some ( aliases) => {
333- key == next_field. name . as_str ( ) || aliases. iter ( ) . any ( |a| key == a. as_str ( ) )
334- }
335- None => key == next_field. name . as_str ( ) ,
336- } ;
332+ let next_field_matches = field_matches ( next_field, key) ;
337333
338334 if next_field_matches {
339335 self . serialize_next_field ( & value) . map_err ( |e| {
@@ -342,18 +338,13 @@ impl<W: Write> ser::SerializeStruct for SchemaAwareWriteSerializeStruct<'_, '_,
342338 record_schema : Schema :: Record ( self . record_schema . clone ( ) ) ,
343339 error : Box :: new ( e) ,
344340 }
345- } ) ? ;
346- Ok ( ( ) )
341+ . into ( )
342+ } )
347343 } else {
348344 if self . item_count < self . record_schema . fields . len ( ) {
349345 for i in self . item_count ..self . record_schema . fields . len ( ) {
350346 let field = & self . record_schema . fields [ i] ;
351- let field_matches = match & field. aliases {
352- Some ( aliases) => {
353- key == field. name . as_str ( ) || aliases. iter ( ) . any ( |a| key == a. as_str ( ) )
354- }
355- None => key == field. name . as_str ( ) ,
356- } ;
347+ let field_matches = field_matches ( field, key) ;
357348
358349 if field_matches {
359350 let mut buffer: Vec < u8 > = Vec :: with_capacity ( RECORD_FIELD_INIT_BUFFER_SIZE ) ;
@@ -382,8 +373,27 @@ impl<W: Write> ser::SerializeStruct for SchemaAwareWriteSerializeStruct<'_, '_,
382373 }
383374 }
384375
385- fn skip_field ( & mut self , _key : & ' static str ) -> Result < ( ) , Self :: Error > {
386- self . item_count += 1 ;
376+ fn skip_field ( & mut self , key : & ' static str ) -> Result < ( ) , Self :: Error > {
377+ match self . record_schema . fields . get ( self . item_count ) {
378+ Some ( skipped_field) => {
379+ if field_matches ( skipped_field, key) {
380+ self . item_count += 1 ;
381+ skipped_field
382+ . default
383+ . serialize ( & mut SchemaAwareWriteSerializer :: new (
384+ self . ser . writer ,
385+ & skipped_field. schema ,
386+ self . ser . names ,
387+ self . ser . enclosing_namespace . clone ( ) ,
388+ ) ) ?;
389+ } else {
390+ return Err ( Details :: GetField ( key. to_string ( ) ) . into ( ) ) ;
391+ }
392+ }
393+ None => {
394+ return Err ( Details :: GetField ( key. to_string ( ) ) . into ( ) ) ;
395+ }
396+ }
387397 Ok ( ( ) )
388398 }
389399
@@ -392,6 +402,16 @@ impl<W: Write> ser::SerializeStruct for SchemaAwareWriteSerializeStruct<'_, '_,
392402 }
393403}
394404
405+ fn field_matches ( record_field : & RecordField , expected_name : & str ) -> bool {
406+ let field_name = record_field. name . as_str ( ) ;
407+ match & record_field. aliases {
408+ Some ( aliases) => {
409+ expected_name == field_name || aliases. iter ( ) . any ( |a| expected_name == a. as_str ( ) )
410+ }
411+ None => expected_name == field_name,
412+ }
413+ }
414+
395415impl < W : Write > ser:: SerializeStructVariant for SchemaAwareWriteSerializeStruct < ' _ , ' _ , W > {
396416 type Ok = usize ;
397417 type Error = Error ;
@@ -1132,7 +1152,7 @@ impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> {
11321152 match variant_schema {
11331153 Schema :: Null => { /* skip */ }
11341154 _ => {
1135- encode_int ( i as i32 , & mut * self . writer ) ?;
1155+ encode_long ( i as i64 , & mut * self . writer ) ?;
11361156 let mut variant_ser = SchemaAwareWriteSerializer :: new (
11371157 & mut * self . writer ,
11381158 variant_schema,
0 commit comments