@@ -22,13 +22,12 @@ use crate::{
2222 bigdecimal:: big_decimal_as_bytes,
2323 encode:: { encode_int, encode_long} ,
2424 error:: { Details , Error } ,
25- schema:: { Name , NamesRef , Namespace , RecordSchema , Schema } ,
25+ schema:: { Name , NamesRef , Namespace , RecordField , RecordSchema , Schema } ,
2626} ;
2727use bigdecimal:: BigDecimal ;
28- use serde:: ser;
28+ use serde:: { Serialize , ser} ;
2929use std:: { borrow:: Cow , io:: Write , str:: FromStr } ;
3030
31- const RECORD_FIELD_INIT_BUFFER_SIZE : usize = 64 ;
3231const COLLECTION_SERIALIZER_ITEM_LIMIT : usize = 1024 ;
3332const COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY : usize = 32 ;
3433const SINGLE_VALUE_INIT_BUFFER_SIZE : usize = 128 ;
@@ -250,68 +249,39 @@ impl<W: Write> ser::SerializeMap for SchemaAwareWriteSerializeMap<'_, '_, W> {
250249pub struct SchemaAwareWriteSerializeStruct < ' a , ' s , W : Write > {
251250 ser : & ' a mut SchemaAwareWriteSerializer < ' s , W > ,
252251 record_schema : & ' s RecordSchema ,
253- item_count : usize ,
254- buffered_fields : Vec < Option < Vec < u8 > > > ,
255252 bytes_written : usize ,
256253}
257254
258255impl < ' a , ' s , W : Write > SchemaAwareWriteSerializeStruct < ' a , ' s , W > {
259256 fn new (
260257 ser : & ' a mut SchemaAwareWriteSerializer < ' s , W > ,
261258 record_schema : & ' s RecordSchema ,
262- len : usize ,
263259 ) -> SchemaAwareWriteSerializeStruct < ' a , ' s , W > {
264260 SchemaAwareWriteSerializeStruct {
265261 ser,
266262 record_schema,
267- item_count : 0 ,
268- buffered_fields : vec ! [ None ; len] ,
269263 bytes_written : 0 ,
270264 }
271265 }
272266
273- fn serialize_next_field < T > ( & mut self , value : & T ) -> Result < ( ) , Error >
267+ fn serialize_next_field < T > ( & mut self , field : & RecordField , value : & T ) -> Result < ( ) , Error >
274268 where
275269 T : ?Sized + ser:: Serialize ,
276270 {
277- let next_field = self . record_schema . fields . get ( self . item_count ) . expect (
278- "Validity of the next field index was expected to have been checked by the caller" ,
279- ) ;
280-
281271 // If we receive fields in order, write them directly to the main writer
282272 let mut value_ser = SchemaAwareWriteSerializer :: new (
283273 & mut * self . ser . writer ,
284- & next_field . schema ,
274+ & field . schema ,
285275 self . ser . names ,
286276 self . ser . enclosing_namespace . clone ( ) ,
287277 ) ;
288278 self . bytes_written += value. serialize ( & mut value_ser) ?;
289279
290- self . item_count += 1 ;
291-
292- // Write any buffered data to the stream that has now become next in line
293- while let Some ( buffer) = self
294- . buffered_fields
295- . get_mut ( self . item_count )
296- . and_then ( |b| b. take ( ) )
297- {
298- self . bytes_written += self
299- . ser
300- . writer
301- . write ( buffer. as_slice ( ) )
302- . map_err ( Details :: WriteBytes ) ?;
303- self . item_count += 1 ;
304- }
305-
306280 Ok ( ( ) )
307281 }
308282
309283 fn end ( self ) -> Result < usize , Error > {
310- if self . item_count != self . record_schema . fields . len ( ) {
311- Err ( Details :: GetField ( self . record_schema . fields [ self . item_count ] . name . clone ( ) ) . into ( ) )
312- } else {
313- Ok ( self . bytes_written )
314- }
284+ Ok ( self . bytes_written )
315285 }
316286}
317287
@@ -323,63 +293,50 @@ impl<W: Write> ser::SerializeStruct for SchemaAwareWriteSerializeStruct<'_, '_,
323293 where
324294 T : ?Sized + ser:: Serialize ,
325295 {
326- if self . item_count >= self . record_schema . fields . len ( ) {
327- return Err ( Details :: FieldName ( String :: from ( key) ) . into ( ) ) ;
328- }
329-
330- let next_field = & self . record_schema . fields [ self . item_count ] ;
331- let next_field_matches = match & next_field. aliases {
332- Some ( aliases) => {
333- key == next_field. name . as_str ( ) || aliases. iter ( ) . any ( |a| key == a. as_str ( ) )
334- }
335- None => key == next_field. name . as_str ( ) ,
336- } ;
337-
338- if next_field_matches {
339- self . serialize_next_field ( & value) . map_err ( |e| {
340- Details :: SerializeRecordFieldWithSchema {
341- field_name : key,
342- record_schema : Schema :: Record ( self . record_schema . clone ( ) ) ,
343- error : Box :: new ( e) ,
344- }
345- } ) ?;
346- Ok ( ( ) )
347- } else {
348- if self . item_count < self . record_schema . fields . len ( ) {
349- for i in self . item_count ..self . record_schema . fields . len ( ) {
350- let field = & self . record_schema . fields [ i] ;
351- let field_matches = match & field. aliases {
352- Some ( aliases) => {
353- key == field. name . as_str ( ) || aliases. iter ( ) . any ( |a| key == a. as_str ( ) )
354- }
355- None => key == field. name . as_str ( ) ,
356- } ;
357-
358- if field_matches {
359- let mut buffer: Vec < u8 > = Vec :: with_capacity ( RECORD_FIELD_INIT_BUFFER_SIZE ) ;
360- let mut value_ser = SchemaAwareWriteSerializer :: new (
361- & mut buffer,
362- & field. schema ,
363- self . ser . names ,
364- self . ser . enclosing_namespace . clone ( ) ,
365- ) ;
366- value. serialize ( & mut value_ser) . map_err ( |e| {
367- Details :: SerializeRecordFieldWithSchema {
368- field_name : key,
369- record_schema : Schema :: Record ( self . record_schema . clone ( ) ) ,
370- error : Box :: new ( e) ,
371- }
372- } ) ?;
373-
374- self . buffered_fields [ i] = Some ( buffer) ;
375-
376- return Ok ( ( ) ) ;
296+ let record_field = self
297+ . record_schema
298+ . lookup
299+ . get ( key)
300+ . and_then ( |idx| self . record_schema . fields . get ( * idx) ) ;
301+
302+ match record_field {
303+ Some ( field) => {
304+ // self.item_count += 1;
305+ self . serialize_next_field ( field, value) . map_err ( |e| {
306+ Details :: SerializeRecordFieldWithSchema {
307+ field_name : key,
308+ record_schema : Schema :: Record ( self . record_schema . clone ( ) ) ,
309+ error : Box :: new ( e) ,
377310 }
378- }
311+ . into ( )
312+ } )
379313 }
314+ None => Err ( Details :: FieldName ( String :: from ( key) ) . into ( ) ) ,
315+ }
316+ }
380317
381- Err ( Details :: FieldName ( String :: from ( key) ) . into ( ) )
318+ fn skip_field ( & mut self , key : & ' static str ) -> Result < ( ) , Self :: Error > {
319+ let skipped_field = self
320+ . record_schema
321+ . lookup
322+ . get ( key)
323+ . and_then ( |idx| self . record_schema . fields . get ( * idx) ) ;
324+
325+ if let Some ( skipped_field) = skipped_field {
326+ // self.item_count += 1;
327+ skipped_field
328+ . default
329+ . serialize ( & mut SchemaAwareWriteSerializer :: new (
330+ self . ser . writer ,
331+ & skipped_field. schema ,
332+ self . ser . names ,
333+ self . ser . enclosing_namespace . clone ( ) ,
334+ ) ) ?;
335+ } else {
336+ return Err ( Details :: GetField ( key. to_string ( ) ) . into ( ) ) ;
382337 }
338+
339+ Ok ( ( ) )
383340 }
384341
385342 fn end ( self ) -> Result < Self :: Ok , Self :: Error > {
@@ -418,7 +375,9 @@ impl<W: Write> SchemaAwareWriteSerializeTupleStruct<'_, '_, W> {
418375 {
419376 use SchemaAwareWriteSerializeTupleStruct :: * ;
420377 match self {
421- Record ( record_ser) => record_ser. serialize_next_field ( & value) ,
378+ Record ( _record_ser) => {
379+ unimplemented ! ( "Tuple struct serialization to record is not supported!" ) ;
380+ }
422381 Array ( array_ser) => array_ser. serialize_element ( & value) ,
423382 }
424383 }
@@ -1127,7 +1086,7 @@ impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> {
11271086 match variant_schema {
11281087 Schema :: Null => { /* skip */ }
11291088 _ => {
1130- encode_int ( i as i32 , & mut * self . writer ) ?;
1089+ encode_long ( i as i64 , & mut * self . writer ) ?;
11311090 let mut variant_ser = SchemaAwareWriteSerializer :: new (
11321091 & mut * self . writer ,
11331092 variant_schema,
@@ -1406,7 +1365,7 @@ impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> {
14061365 SchemaAwareWriteSerializeSeq :: new ( self , & sch. items , Some ( len) ) ,
14071366 ) ) ,
14081367 Schema :: Record ( sch) => Ok ( SchemaAwareWriteSerializeTupleStruct :: Record (
1409- SchemaAwareWriteSerializeStruct :: new ( self , sch, len ) ,
1368+ SchemaAwareWriteSerializeStruct :: new ( self , sch) ,
14101369 ) ) ,
14111370 Schema :: Ref { name : ref_name } => {
14121371 let ref_schema = self . get_ref_schema ( ref_name) ?;
@@ -1543,11 +1502,9 @@ impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> {
15431502 } ;
15441503
15451504 match schema {
1546- Schema :: Record ( record_schema) => Ok ( SchemaAwareWriteSerializeStruct :: new (
1547- self ,
1548- record_schema,
1549- len,
1550- ) ) ,
1505+ Schema :: Record ( record_schema) => {
1506+ Ok ( SchemaAwareWriteSerializeStruct :: new ( self , record_schema) )
1507+ }
15511508 Schema :: Ref { name : ref_name } => {
15521509 let ref_schema = self . get_ref_schema ( ref_name) ?;
15531510 self . serialize_struct_with_schema ( name, len, ref_schema)
0 commit comments