feat!: Rework SpecificSingleObjectWriter #42

martin-augment wants to merge 2 commits into main

Conversation
`ResolvedOwnedSchema` now uses a self-referential struct when resolving the schema, so it does not need to clone all named schemas contained in it. This also fixes a bug in `reader::Block::read_writer_schema` where the schemata were unnecessarily parsed twice.

`SpecificSingleObjectWriter` now resolves the schema once and caches it. It also no longer uses `GenericSingleObjectWriter` for the header but writes it directly, removing the need for an internal buffer. This is a breaking change:

- `SpecificSingleObjectWriter::with_capacity` has been removed.
- `write_avro_datum_ref` now also takes a `names: &NamesRef` argument.
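For context, the Avro single-object encoding header that the writer now produces directly is small: the two magic bytes `0xC3 0x01` followed by the schema's 8-byte little-endian Rabin fingerprint. A minimal sketch of that layout (the fingerprint value below is made up purely for illustration):

```rust
// Builds an Avro single-object encoding header: 2 magic bytes + 8-byte
// little-endian schema fingerprint. The fingerprint here is a placeholder,
// not a real CRC-64-AVRO value.
fn single_object_header(fingerprint: u64) -> Vec<u8> {
    let mut header = Vec::with_capacity(10);
    header.extend_from_slice(&[0xC3, 0x01]); // single-object magic marker
    header.extend_from_slice(&fingerprint.to_le_bytes()); // LE fingerprint
    header
}

fn main() {
    let h = single_object_header(0x0123_4567_89AB_CDEF);
    assert_eq!(h.len(), 10);
    assert_eq!(&h[..2], &[0xC3, 0x01]);
    assert_eq!(h[2], 0xEF); // least-significant fingerprint byte comes first
    println!("{:02X?}", h);
}
```

Because the header is a fixed 10 bytes per schema, caching it once (as this PR does) and writing it before each datum avoids the round-trip through a general-purpose writer and its buffer.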
Walkthrough

This pull request refactors the Avro schema resolution and single-object writer implementations.
Pull Request Review: #42 - Rework SpecificSingleObjectWriter

Summary: This PR refactors the SpecificSingleObjectWriter to eliminate the internal buffer and improve the API design. The changes also introduce the ouroboros crate to handle self-referential structures in ResolvedOwnedSchema.

Code Quality & Best Practices ✅

Strengths:

Potential Issues & Concerns 🔍

1. Breaking API Change
Code Review
This pull request introduces the ouroboros crate as a dependency and modifies the SpecificSingleObjectWriter to use ResolvedOwnedSchema for schema resolution. The with_capacity function is replaced with new and new_with_header_builder functions, and the write functions are updated to use the resolved schema and header. The changes also include updates to the resolve_names function to handle known schemata and use NamesRef.
```rust
                &None,
                &HashMap::new(),
            )?;
            self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
```
names_refs is a Names (owned Schema values), but here names is a NamesRef (&Schema values), so (n, s.clone()) looks like it would keep references rather than cloning the underlying Schema (and may not type-check). This matters because decode_internal later relies on names_refs to resolve Schema::Ref during decoding.
Other Locations
avro/src/reader.rs:242
value:annoying; category:bug; feedback:The Augment AI reviewer is not correct! There is no need to dereference a variable to clone it. Calling the method on the reference will still do the right thing and return an owned clone of that object.
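To illustrate this feedback: Rust's method resolution on a `&T` receiver finds `T::clone` (whose `&self` receiver matches the `&T` value exactly) before the blanket `impl Clone for &T`, so calling `.clone()` on a map of references yields owned values. A minimal, self-contained sketch using `String` in place of `Schema`:

```rust
use std::collections::HashMap;

fn main() {
    let owned = String::from("record_schema");
    // Simulates a `NamesRef`-style map whose values are borrowed.
    let names_ref: HashMap<&str, &String> = HashMap::from([("rec", &owned)]);

    // `s` is a `&String`; `.clone()` resolves to `String::clone` (receiver
    // `&String`), so it produces an owned `String`, not a copied reference.
    let names: HashMap<&str, String> = names_ref
        .into_iter()
        .map(|(n, s)| (n, s.clone()))
        .collect();

    assert_eq!(names["rec"], "record_schema");
    println!("{}", names["rec"]);
}
```

The explicit `HashMap<&str, String>` annotation also guides type inference: `collect()` requires owned values, so the code would fail to compile if `s.clone()` returned only a reference.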
```rust
    /// if you don't know what you are doing, instead.
    pub fn write_avro_datum_ref<T: Serialize, W: Write>(
        schema: &Schema,
        names: &NamesRef,
```
Now that write_avro_datum_ref takes an explicit names map, it seems worth documenting that callers must provide all named schemas needed to resolve any Schema::Ref (e.g., recursive types), otherwise serialization can fail at runtime. Without refs, callers can still pass an empty map.
value:good-to-have; category:documentation; feedback:The Augment AI reviewer is correct! The changes proposed in this Pull Request add a new parameter to the write_avro_datum_ref() function, so it has to be documented in the function's docstring. This prevents confusion for a developer trying to use the method and wondering what the parameter is all about.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@avro/src/reader.rs`:
- Around line 228-243: When schemata is non-empty the code only fills names_refs
from the external schemata and skips named types declared inside the parsed
writer_schema; update the if-branch after calling
Schema::parse_with_names(&json, self.names_refs.clone())? to also call
resolve_names(&self.writer_schema, &mut names, &None, &HashMap::new())? (or a
merge variant) and then merge those discovered names into self.names_refs
(ensuring existing schemata entries are preserved or overridden as intended), so
resolve_names_with_schemata, self.writer_schema, names_refs, resolve_names and
Schema::parse_with_names are all involved in the merge step.
In `@avro/src/writer.rs`:
- Around line 633-671: Both write_value
(SpecificSingleObjectWriter::write_value) and write_ref
(SpecificSingleObjectWriter::write_ref) currently write self.header to the
writer before attempting validation/serialization, which can leave a partial
message on error; change each to first serialize/validate into a temporary
buffer (e.g., Vec<u8>) or validate the Value against self.resolved before
writing the header, using write_value_ref_owned_resolved and
write_avro_datum_ref to produce the payload in-memory, and only after successful
serialization write the header and then write the payload to the provided writer
so errors are returned without mutating the output stream.
🧹 Nitpick comments (1)
avro/src/writer.rs (1)
753-761: Document the new `names` requirement. The doc comment should state that `names` must include all named types referenced by `schema` (e.g., from `ResolvedSchema::get_names()`), otherwise resolution can fail for refs.

✏️ Suggested doc tweak

```diff
 /// Write the referenced [Serialize]able object to the provided [Write] object.
 /// Returns a result with the number of bytes written.
+/// `names` must contain all named types referenced by `schema`
+/// (e.g., from `ResolvedSchema::get_names()`).
```
```diff
         if !self.schemata.is_empty() {
-            let rs = ResolvedSchema::try_from(self.schemata.clone())?;
-            let names: Names = rs
-                .get_names()
-                .iter()
-                .map(|(name, schema)| (name.clone(), (*schema).clone()))
-                .collect();
-            self.writer_schema = Schema::parse_with_names(&json, names)?;
+            let mut names = HashMap::new();
             resolve_names_with_schemata(
                 self.schemata.iter().copied(),
-                &mut self.names_refs,
+                &mut names,
                 &None,
                 &HashMap::new(),
             )?;
+            self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
+            self.writer_schema = Schema::parse_with_names(&json, self.names_refs.clone())?;
         } else {
             self.writer_schema = Schema::parse(&json)?;
-            resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
+            let mut names = HashMap::new();
+            resolve_names(&self.writer_schema, &mut names, &None, &HashMap::new())?;
+            self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
         }
```
Writer schema names are not merged when schemata is provided.
In the non-empty schemata branch, names_refs is populated only from the provided schemata (Line 229+). Named types defined inside the writer schema itself won’t be added, so decode_internal can fail to resolve Schema::Ref (e.g., recursive records). Consider resolving names from self.writer_schema after parsing and merging them with the known schemata.
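The merge this comment proposes comes down to `HashMap::extend` semantics: entries inserted later override earlier ones on key collision, so writer-schema definitions would win over the externally supplied schemata. A small stand-alone illustration (the name keys and values are made up):

```rust
use std::collections::HashMap;

fn main() {
    // Names discovered from externally supplied schemata.
    let mut names: HashMap<&str, &str> =
        HashMap::from([("ns.External", "record"), ("ns.Shared", "from_schemata")]);

    // Names resolved from the writer schema itself.
    let writer_names: HashMap<&str, &str> =
        HashMap::from([("ns.Inner", "record"), ("ns.Shared", "from_writer")]);

    // `extend` inserts every entry; on collision the writer-schema
    // definition replaces the one from the external schemata.
    names.extend(writer_names);

    assert_eq!(names.len(), 3);
    assert_eq!(names["ns.Shared"], "from_writer");
    println!("{}", names["ns.Shared"]);
}
```

Whether the writer schema should override or be overridden is the "preserved or overridden as intended" question the prompt raises; swapping the order of the two maps flips the precedence.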
🛠️ Suggested merge of writer schema names

```diff
-            let mut names = HashMap::new();
+            let mut known = HashMap::new();
             resolve_names_with_schemata(
                 self.schemata.iter().copied(),
-                &mut names,
+                &mut known,
                 &None,
                 &HashMap::new(),
             )?;
-            self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
-            self.writer_schema = Schema::parse_with_names(&json, self.names_refs.clone())?;
+            let mut names_refs: Names =
+                known.iter().map(|(n, s)| (n.clone(), (*s).clone())).collect();
+            self.writer_schema = Schema::parse_with_names(&json, names_refs.clone())?;
+            let mut writer_names = HashMap::new();
+            resolve_names(&self.writer_schema, &mut writer_names, &None, &known)?;
+            names_refs.extend(writer_names.into_iter().map(|(n, s)| (n, s.clone())));
+            self.names_refs = names_refs;
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
        if !self.schemata.is_empty() {
            let mut known = HashMap::new();
            resolve_names_with_schemata(
                self.schemata.iter().copied(),
                &mut known,
                &None,
                &HashMap::new(),
            )?;
            let mut names_refs: Names =
                known.iter().map(|(n, s)| (n.clone(), (*s).clone())).collect();
            self.writer_schema = Schema::parse_with_names(&json, names_refs.clone())?;
            let mut writer_names = HashMap::new();
            resolve_names(&self.writer_schema, &mut writer_names, &None, &known)?;
            names_refs.extend(writer_names.into_iter().map(|(n, s)| (n, s.clone())));
            self.names_refs = names_refs;
        } else {
            self.writer_schema = Schema::parse(&json)?;
            let mut names = HashMap::new();
            resolve_names(&self.writer_schema, &mut names, &None, &HashMap::new())?;
            self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
        }
```
value:incorrect-but-reasonable; category:bug; feedback:The CodeRabbit AI reviewer is not correct! The names of the parsed schemas are added to an internal data structure of the parser itself, so they are also visible to any schema that is parsed later.
```diff
     /// Write the value to the writer
     ///
     /// Returns the number of bytes written.
     ///
     /// Each call writes a complete single-object encoded message (header + data),
     /// making each message independently decodable.
     pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
         writer
             .write_all(&self.header)
             .map_err(Details::WriteBytes)?;
         let value: Value = data.into();
         let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
         Ok(bytes + self.header.len())
     }
 }

 impl<T> SpecificSingleObjectWriter<T>
 where
     T: AvroSchema + Serialize,
 {
-    /// Write the referenced `Serialize` object to the provided `Write` object.
+    /// Write the object to the writer.
     ///
     /// Returns the number of bytes written.
     ///
     /// Each call writes a complete single-object encoded message (header + data),
     /// making each message independently decodable.
-    pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
-        // Always write the header for each message (single object encoding requires
-        // each message to be independently decodable)
+    pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
         writer
-            .write_all(self.inner.buffer.as_slice())
+            .write_all(&self.header)
             .map_err(Details::WriteBytes)?;

-        let bytes_written =
-            self.inner.buffer.len() + write_avro_datum_ref(&self.schema, data, writer)?;
+        let bytes = write_avro_datum_ref(
+            self.resolved.get_root_schema(),
+            self.resolved.get_names(),
+            data,
+            writer,
+        )?;

-        Ok(bytes_written)
+        Ok(bytes + self.header.len())
```
Avoid partial writes on validation/serialization failure.
Line 639/661 writes the header before validation/serialization. If validation fails (Value path) or serialization errors (serde path), the stream is left with a header-only or partially written message. Previously the buffer avoided this. Consider buffering per write (or at least validating before header for Value).
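The buffering pattern suggested here can be sketched independently of the Avro types: serialize into a `Vec<u8>` first, and only touch the caller's writer once serialization has succeeded. The `serialize` function below is a hypothetical stand-in for the real encoder, included only to demonstrate the failure path:

```rust
use std::io::Write;

// Hypothetical serializer that can fail before producing any output.
fn serialize(data: &str, out: &mut impl Write) -> std::io::Result<usize> {
    if data.is_empty() {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "nothing to serialize",
        ));
    }
    out.write_all(data.as_bytes())?;
    Ok(data.len())
}

const HEADER: &[u8] = &[0xC3, 0x01]; // Avro single-object magic bytes

// Buffer the payload first; write header + payload only on success, so a
// serialization error never leaves a partial message in `writer`.
fn write_message(data: &str, writer: &mut impl Write) -> std::io::Result<usize> {
    let mut buffer = Vec::new();
    let bytes = serialize(data, &mut buffer)?; // may fail without touching `writer`
    writer.write_all(HEADER)?;
    writer.write_all(&buffer)?;
    Ok(bytes + HEADER.len())
}

fn main() {
    let mut out = Vec::new();
    // A failing serialization leaves the output untouched.
    assert!(write_message("", &mut out).is_err());
    assert!(out.is_empty());
    // A successful one emits header + payload together.
    let n = write_message("hi", &mut out).unwrap();
    assert_eq!(n, 4);
    assert_eq!(&out[..2], &[0xC3, 0x01]);
    println!("wrote {n} bytes");
}
```

The trade-off is one extra allocation and copy per message; as the feedback below notes, a failure while flushing to the underlying writer can still leave partial bytes, which no amount of buffering can prevent.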
🛠️ One way to preserve atomic writes

```diff
 pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
-    writer
-        .write_all(&self.header)
-        .map_err(Details::WriteBytes)?;
     let value: Value = data.into();
-    let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
-    Ok(bytes + self.header.len())
+    let mut buffer = Vec::new();
+    let bytes = write_value_ref_owned_resolved(&self.resolved, &value, &mut buffer)?;
+    writer.write_all(&self.header).map_err(Details::WriteBytes)?;
+    writer.write_all(&buffer).map_err(Details::WriteBytes)?;
+    Ok(bytes + self.header.len())
 }
 ...
 pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
-    writer
-        .write_all(&self.header)
-        .map_err(Details::WriteBytes)?;
-
-    let bytes = write_avro_datum_ref(
-        self.resolved.get_root_schema(),
-        self.resolved.get_names(),
-        data,
-        writer,
-    )?;
+    let mut buffer = Vec::new();
+    let bytes = write_avro_datum_ref(
+        self.resolved.get_root_schema(),
+        self.resolved.get_names(),
+        data,
+        &mut buffer,
+    )?;
+    writer.write_all(&self.header).map_err(Details::WriteBytes)?;
+    writer.write_all(&buffer).map_err(Details::WriteBytes)?;
     Ok(bytes + self.header.len())
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
    /// Write the value to the writer
    ///
    /// Returns the number of bytes written.
    ///
    /// Each call writes a complete single-object encoded message (header + data),
    /// making each message independently decodable.
    pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
        let value: Value = data.into();
        let mut buffer = Vec::new();
        let bytes = write_value_ref_owned_resolved(&self.resolved, &value, &mut buffer)?;
        writer.write_all(&self.header).map_err(Details::WriteBytes)?;
        writer.write_all(&buffer).map_err(Details::WriteBytes)?;
        Ok(bytes + self.header.len())
    }
}

impl<T> SpecificSingleObjectWriter<T>
where
    T: AvroSchema + Serialize,
{
    /// Write the object to the writer.
    ///
    /// Returns the number of bytes written.
    ///
    /// Each call writes a complete single-object encoded message (header + data),
    /// making each message independently decodable.
    pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
        let mut buffer = Vec::new();
        let bytes = write_avro_datum_ref(
            self.resolved.get_root_schema(),
            self.resolved.get_names(),
            data,
            &mut buffer,
        )?;
        writer.write_all(&self.header).map_err(Details::WriteBytes)?;
        writer.write_all(&buffer).map_err(Details::WriteBytes)?;
        Ok(bytes + self.header.len())
    }
```
value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct! The header bytes write could be postponed until the object is validated and serialized and then both could be written to make the operation more atomic. It could still fail while writing to the writer but there is nothing that could be done about this.
value:good-to-have; category:documentation; feedback:The CodeRabbit AI reviewer is correct! The changes proposed in this Pull Request add a new parameter to the write_avro_datum_ref() function, so it has to be documented in the function's docstring.