Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::schema::{InnerDecimalSchema, UuidSchema};
use crate::schema::{InnerDecimalSchema, NamespaceRef, UuidSchema};
use crate::{
AvroResult, Error,
bigdecimal::deserialize_big_decimal,
decimal::Decimal,
duration::Duration,
error::Details,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, RecordSchema, ResolvedSchema,
Schema,
},
schema::{DecimalSchema, EnumSchema, FixedSchema, Name, RecordSchema, ResolvedSchema, Schema},
types::Value,
util::{safe_len, zag_i32, zag_i64},
};
Expand Down Expand Up @@ -74,13 +71,13 @@ fn decode_seq_len<R: Read>(reader: &mut R) -> AvroResult<usize> {
/// Decode a `Value` from avro format given its `Schema`.
pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
let rs = ResolvedSchema::try_from(schema)?;
decode_internal(schema, rs.get_names(), &None, reader)
decode_internal(schema, rs.get_names(), None, reader)
}

pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
schema: &Schema,
names: &HashMap<Name, S>,
enclosing_namespace: &Namespace,
enclosing_namespace: NamespaceRef,
reader: &mut R,
) -> AvroResult<Value> {
match schema {
Expand Down Expand Up @@ -313,7 +310,7 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
decode_internal(
&field.schema,
names,
&fully_qualified_name.namespace,
fully_qualified_name.namespace(),
reader,
)?,
));
Expand Down Expand Up @@ -344,11 +341,11 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
decode_internal(
resolved.borrow(),
names,
&fully_qualified_name.namespace,
fully_qualified_name.namespace(),
reader,
)
} else {
Err(Details::SchemaResolutionError(fully_qualified_name).into())
Err(Details::SchemaResolutionError(fully_qualified_name.into_owned()).into())
}
}
}
Expand Down
18 changes: 10 additions & 8 deletions avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::schema::{InnerDecimalSchema, UuidSchema};
use crate::schema::{InnerDecimalSchema, NamespaceRef, UuidSchema};
use crate::{
AvroResult,
bigdecimal::serialize_big_decimal,
error::Details,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, RecordSchema, ResolvedSchema,
Schema, SchemaKind, UnionSchema,
DecimalSchema, EnumSchema, FixedSchema, Name, RecordSchema, ResolvedSchema, Schema,
SchemaKind, UnionSchema,
},
types::{Value, ValueKind},
util::{zig_i32, zig_i64},
Expand All @@ -37,7 +37,7 @@ use std::{borrow::Borrow, collections::HashMap, io::Write};
/// encoding for complex type values.
pub fn encode<W: Write>(value: &Value, schema: &Schema, writer: &mut W) -> AvroResult<usize> {
let rs = ResolvedSchema::try_from(schema)?;
encode_internal(value, schema, rs.get_names(), &None, writer)
encode_internal(value, schema, rs.get_names(), None, writer)
}

/// Encode `s` as the _bytes_ primitive type.
Expand Down Expand Up @@ -66,14 +66,16 @@ pub(crate) fn encode_internal<W: Write, S: Borrow<Schema>>(
value: &Value,
schema: &Schema,
names: &HashMap<Name, S>,
enclosing_namespace: &Namespace,
enclosing_namespace: NamespaceRef,
writer: &mut W,
) -> AvroResult<usize> {
if let Schema::Ref { name } = schema {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
let resolved = names
.get(&fully_qualified_name)
.ok_or(Details::SchemaResolutionError(fully_qualified_name))?;
.ok_or(Details::SchemaResolutionError(
fully_qualified_name.into_owned(),
))?;
return encode_internal(value, resolved.borrow(), names, enclosing_namespace, writer);
}

Expand Down Expand Up @@ -290,7 +292,7 @@ pub(crate) fn encode_internal<W: Write, S: Borrow<Schema>>(
..
}) = *schema
{
let record_namespace = name.fully_qualified_name(enclosing_namespace).namespace;
let record_namespace = name.namespace().or(enclosing_namespace);

let mut lookup = HashMap::new();
value_fields.iter().for_each(|(name, field)| {
Expand All @@ -313,7 +315,7 @@ pub(crate) fn encode_internal<W: Write, S: Borrow<Schema>>(
value,
&schema_field.schema,
names,
&record_namespace,
record_namespace,
writer,
)?;
} else {
Expand Down
6 changes: 3 additions & 3 deletions avro/src/reader/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl<'r, R: Read> Block<'r, R> {
let item = decode_internal(
&self.writer_schema,
&self.names_refs,
&None,
None,
&mut block_bytes,
)?;
let item = match read_schema {
Expand Down Expand Up @@ -221,15 +221,15 @@ impl<'r, R: Read> Block<'r, R> {
resolve_names_with_schemata(
self.schemata.iter().copied(),
&mut names,
&None,
None,
&HashMap::new(),
)?;
self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
self.writer_schema = Schema::parse_with_names(&json, self.names_refs.clone())?;
} else {
self.writer_schema = Schema::parse(&json)?;
let mut names = HashMap::new();
resolve_names(&self.writer_schema, &mut names, &None, &HashMap::new())?;
resolve_names(&self.writer_schema, &mut names, None, &HashMap::new())?;
self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
}
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion avro/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ pub fn from_avro_datum_reader_schemata<R: Read>(
reader_schemata: Vec<&Schema>,
) -> AvroResult<Value> {
let rs = ResolvedSchema::try_from(writer_schemata)?;
let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
let value = decode_internal(writer_schema, rs.get_names(), None, reader)?;
match reader_schema {
Some(schema) => {
if reader_schemata.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion avro/src/reader/single_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl GenericSingleObjectReader {
decode_internal(
self.write_schema.get_root_schema(),
self.write_schema.get_names(),
&None,
None,
reader,
)
} else {
Expand Down
Loading