feat: Replace to_datum* functions with GenericDatumWriter #60
martin-augment wants to merge 2 commits into main from

Conversation
Walkthrough: This change refactors the Avro writing API by introducing a new public
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly refactors the Avro data writing mechanism by introducing a new

Highlights
Code Review
This pull request is a great feature enhancement, replacing the standalone to_datum* functions with a more idiomatic and performant GenericDatumWriter. The introduction of new benchmarks to showcase the performance improvements is also a valuable addition. I've found a few potential issues related to namespace handling during serialization which could cause problems with schemas that use relative name resolution. My comments below provide specific suggestions to address these.
Note: Security Review did not run due to the size of the PR.
```rust
pub fn write_value_ref<W: Write>(&self, writer: &mut W, value: &Value) -> AvroResult<usize> {
    if self.validate
        && self.resolved.get_schemata().iter().all(|s| {
            value
                .validate_internal(s, self.resolved.get_names(), None)
                .is_some()
        })
    {
        return Err(Details::Validation.into());
    }
    encode_internal(value, self.schema, self.resolved.get_names(), None, writer)
}
```
The calls to validate_internal and encode_internal are missing the enclosing namespace, which is hardcoded to None. This can lead to errors when resolving nested schemas that use relative names. The namespace of the schema being used should be passed to ensure correct name resolution.
Suggested change:

```diff
 pub fn write_value_ref<W: Write>(&self, writer: &mut W, value: &Value) -> AvroResult<usize> {
     if self.validate
         && self.resolved.get_schemata().iter().all(|s| {
             value
-                .validate_internal(s, self.resolved.get_names(), None)
+                .validate_internal(s, self.resolved.get_names(), s.namespace())
                 .is_some()
         })
     {
         return Err(Details::Validation.into());
     }
-    encode_internal(value, self.schema, self.resolved.get_names(), None, writer)
+    encode_internal(value, self.schema, self.resolved.get_names(), self.schema.namespace(), writer)
 }
```
value:useful; category:bug; feedback: The Gemini AI reviewer is correct! Instead of passing None as the enclosing namespace, the main schema's namespace should be used. This will help resolve any schema references used in record fields.
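To make the stakes concrete, Avro's name-resolution rule can be sketched outside the library: an unqualified type name inherits the enclosing namespace, so passing None for that namespace makes a relative reference resolve to the wrong fullname. The helper below is a hypothetical standalone sketch of the rule, not apache-avro's actual implementation:

```rust
/// Hypothetical sketch of Avro's name-resolution rule: a name containing a
/// dot is already fully qualified; otherwise it inherits the enclosing
/// namespace (when one is known).
fn resolve_fullname(name: &str, enclosing_namespace: Option<&str>) -> String {
    if name.contains('.') {
        name.to_string()
    } else {
        match enclosing_namespace {
            Some(ns) if !ns.is_empty() => format!("{ns}.{name}"),
            _ => name.to_string(),
        }
    }
}

fn main() {
    // With the schema's namespace, the relative reference resolves correctly.
    assert_eq!(resolve_fullname("Child", Some("com.example")), "com.example.Child");
    // With None (the bug), the lookup searches for the bare name and misses
    // a type registered as "com.example.Child".
    assert_eq!(resolve_fullname("Child", None), "Child");
    // Fully qualified names are unaffected either way.
    assert_eq!(resolve_fullname("other.ns.Full", None), "other.ns.Full");
}
```

This is why the fix threads `self.schema.namespace()` through instead of `None`: the names table is keyed by fullnames, and a bare name only matches once the enclosing namespace is applied.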
```rust
let mut serializer =
    SchemaAwareWriteSerializer::new(writer, self.schema, self.resolved.get_names(), None);
value.serialize(&mut serializer)
```
The enclosing_namespace parameter to SchemaAwareWriteSerializer::new is hardcoded to None. It should be self.schema.namespace() to correctly resolve relative names in schemas.
Suggested change:

```diff
 let mut serializer =
-    SchemaAwareWriteSerializer::new(writer, self.schema, self.resolved.get_names(), None);
+    SchemaAwareWriteSerializer::new(writer, self.schema, self.resolved.get_names(), self.schema.namespace());
 value.serialize(&mut serializer)
```
value:useful; category:bug; feedback: The Gemini AI reviewer is correct! Instead of passing None as the enclosing namespace, the main schema's namespace should be used. This will help resolve any schema references used in record fields.
```diff
 let mut serializer = SchemaAwareWriteSerializer::new(
     writer,
     self.resolved.get_root_schema(),
     self.resolved.get_names(),
-    data,
-    writer,
-)?;
+    None,
+);
 let bytes = data.serialize(&mut serializer)?;
```
The enclosing_namespace parameter to SchemaAwareWriteSerializer::new is hardcoded to None. It should be self.resolved.get_root_schema().namespace() to correctly resolve relative names in schemas.
Suggested change:

```diff
 let mut serializer = SchemaAwareWriteSerializer::new(
     writer,
     self.resolved.get_root_schema(),
     self.resolved.get_names(),
-    None,
+    self.resolved.get_root_schema().namespace(),
 );
 let bytes = data.serialize(&mut serializer)?;
```
value:useful; category:bug; feedback: The Gemini AI reviewer is correct! Instead of passing None as the enclosing namespace, the main schema's namespace should be used. This will help resolve any schema references used in record fields.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
```rust
        })
    {
        return Err(Details::Validation.into());
    }
```
Validation checks all schemata instead of primary schema
Medium Severity
The write_value_ref validation iterates over self.resolved.get_schemata() (all schemata) with .all(), meaning it only rejects a value if it's invalid for every schema. The old write_avro_datum_schemata validated only against the primary schema (self.schema). When auxiliary schemata are provided (via .schemata()), a value that's invalid for the primary schema but coincidentally valid for an auxiliary schema will bypass validation and be encoded with the wrong schema, potentially producing corrupt data. The check needs to validate against self.schema, not all resolved schemata.
value:useful; category:bug; feedback: The Bugbot AI reviewer is correct! The value should be validated against the main schema, using the names of all additional schemata, not against all schemata. This returns the correct error reason when there is a problem and avoids extra work validating against unrelated schemata.
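The logic error is easy to reproduce in isolation: with validators that return Some(reason) on failure, an `.all(...)` guard only fires when every schema rejects the value. The toy model below (hypothetical predicate "schemas", not the crate's real types) shows a value failing the primary schema yet slipping past the guard:

```rust
// Toy model of the guard: each "schema" is a validator that returns
// Some(reason) when the value is invalid and None when it is valid,
// mirroring validate_internal's convention.
fn validate_even(v: i32) -> Option<&'static str> {
    if v % 2 == 0 { None } else { Some("not even") }
}
fn validate_positive(v: i32) -> Option<&'static str> {
    if v > 0 { None } else { Some("not positive") }
}

fn main() {
    let schemata: Vec<fn(i32) -> Option<&'static str>> =
        vec![validate_even, validate_positive];
    // Invalid for the "primary" schema (not even), valid for the auxiliary one.
    let value = 3;

    // The reviewed guard: rejects only if the value is invalid for EVERY schema.
    let all_guard_rejects = schemata.iter().all(|s| s(value).is_some());
    assert!(!all_guard_rejects); // 3 slips through despite failing the primary schema

    // The fix: validate against the primary schema only.
    let primary_rejects = schemata[0](value).is_some();
    assert!(primary_rejects);
}
```

The same shape explains the wrong-error-reason complaint: even when the `.all(...)` guard does fire, the reason it could report comes from whichever schema was checked, not necessarily the writer schema the value is about to be encoded with.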
PR Review: Replace to_datum* functions with GenericDatumWriter
| Category | Issue |
|---|---|
| 🐛 Bug | enclosing_namespace not passed to encode_internal/validate_internal |
| 🐛 Bug | Validation may skip the primary schema when using .schemata() |
| 📉 Regression | Details::ValidationWithReason dropped, only Details::Validation returned |
| 📝 Style | Missing doc comments on 3 public methods |
| 💡 Minor | GenericDatumWriter not re-exported from crate root, inconsistent with other writer types |
The core abstraction is well-designed and the refactor is otherwise clean. Fixing the namespace and primary-schema validation issues is the main blocker before this is safe to merge.
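The builder-style API exercised in the PR's benchmarks (`GenericDatumWriter::builder(&schema).validate(false).build()`) follows the standard Rust builder pattern. The standalone sketch below mirrors that call shape with hypothetical types; it is an illustration of the pattern, not the crate's actual implementation:

```rust
// Hypothetical mini-builder mirroring the usage shown in the benchmarks:
// GenericDatumWriter::builder(&schema).validate(false).build()
struct DatumWriterBuilder<'s> {
    schema: &'s str, // stand-in for &Schema
    validate: bool,
}

struct DatumWriter<'s> {
    schema: &'s str,
    validate: bool,
}

impl<'s> DatumWriter<'s> {
    fn builder(schema: &'s str) -> DatumWriterBuilder<'s> {
        // Validation is on by default; callers opt out explicitly.
        DatumWriterBuilder { schema, validate: true }
    }
}

impl<'s> DatumWriterBuilder<'s> {
    fn validate(mut self, validate: bool) -> Self {
        self.validate = validate;
        self
    }
    fn build(self) -> DatumWriter<'s> {
        DatumWriter { schema: self.schema, validate: self.validate }
    }
}

fn main() {
    let default_writer = DatumWriter::builder("schema").build();
    assert!(default_writer.validate);

    let fast_writer = DatumWriter::builder("schema").validate(false).build();
    assert!(!fast_writer.validate);
    assert_eq!(fast_writer.schema, "schema");
}
```

The design choice the reviewers implicitly endorse: expensive setup (schema resolution) happens once in `build()`, so a writer can be constructed once and reused across many `write_value_ref` calls.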
Actionable comments posted: 1
🧹 Nitpick comments (1)
avro/benches/single.rs (1)
199-234: Reduce allocation noise in reused-writer benchmarks. The reused/no-validation paths still allocate a fresh Vec each iteration, which can dominate the measurement and blur writer-reuse gains. Reusing a buffer per benchmark gives cleaner writer-focused numbers.

♻️ Suggested benchmark adjustment

```diff
 fn bench_small_schema_write_record_reuse_datum_writer(c: &mut Criterion) {
     let (schema, record) = make_small_record().unwrap();
     let writer = GenericDatumWriter::builder(&schema).build().unwrap();
+    let mut buf = Vec::new();
     c.bench_function("small record (reused writer)", |b| {
-        b.iter(|| writer.write_value_ref(&mut Vec::new(), &record))
+        b.iter(|| {
+            buf.clear();
+            writer.write_value_ref(&mut buf, &record).unwrap()
+        })
     });
 }

 fn bench_big_schema_write_record_reuse_datum_writer(c: &mut Criterion) {
     let (schema, record) = make_big_record().unwrap();
     let writer = GenericDatumWriter::builder(&schema).build().unwrap();
+    let mut buf = Vec::new();
     c.bench_function("big record (reused writer)", |b| {
-        b.iter(|| writer.write_value_ref(&mut Vec::new(), &record))
+        b.iter(|| {
+            buf.clear();
+            writer.write_value_ref(&mut buf, &record).unwrap()
+        })
     });
 }

 fn bench_small_schema_write_record_no_validation(c: &mut Criterion) {
     let (schema, record) = make_small_record().unwrap();
     let writer = GenericDatumWriter::builder(&schema)
         .validate(false)
         .build()
         .unwrap();
+    let mut buf = Vec::new();
     c.bench_function("small record (no validation)", |b| {
-        b.iter(|| writer.write_value_ref(&mut Vec::new(), &record))
+        b.iter(|| {
+            buf.clear();
+            writer.write_value_ref(&mut buf, &record).unwrap()
+        })
     });
 }

 fn bench_big_schema_write_record_no_validation(c: &mut Criterion) {
     let (schema, record) = make_big_record().unwrap();
     let writer = GenericDatumWriter::builder(&schema)
         .validate(false)
         .build()
         .unwrap();
+    let mut buf = Vec::new();
     c.bench_function("big record (no validation)", |b| {
-        b.iter(|| writer.write_value_ref(&mut Vec::new(), &record))
+        b.iter(|| {
+            buf.clear();
+            writer.write_value_ref(&mut buf, &record).unwrap()
+        })
     });
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@avro/benches/single.rs` around lines 199 - 234, Benchmarks allocate a fresh Vec in each iteration which adds allocation noise; for the four functions bench_small_schema_write_record_reuse_datum_writer, bench_big_schema_write_record_reuse_datum_writer, bench_small_schema_write_record_no_validation, and bench_big_schema_write_record_no_validation, create a single mutable buffer (e.g., let mut buf = Vec::new()) outside the b.iter closure and inside the closure call buf.clear() then pass &mut buf to writer.write_value_ref so the buffer is reused across iterations and removes allocation noise from the measurement.
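The buffer-reuse pattern in the suggestion generalizes beyond criterion: hoist the Vec out of the hot loop and clear() it each iteration, so the allocation's capacity is kept and the loop measures the writer rather than the allocator. A minimal standalone sketch, where write_record is a hypothetical stand-in for write_value_ref:

```rust
// Sketch of buffer reuse: Vec::clear() resets the length but keeps the
// allocated capacity, so after the first iteration no reallocation occurs.
fn write_record(buf: &mut Vec<u8>, payload: &[u8]) -> usize {
    buf.extend_from_slice(payload);
    buf.len()
}

fn main() {
    let payload = [1u8, 2, 3, 4];
    let mut buf = Vec::new();
    let mut cap_after_first = 0;
    for i in 0..1_000 {
        buf.clear(); // keeps capacity, unlike allocating Vec::new() each time
        let written = write_record(&mut buf, &payload);
        assert_eq!(written, payload.len());
        if i == 0 {
            cap_after_first = buf.capacity();
        } else {
            // Capacity is stable: no per-iteration allocation.
            assert_eq!(buf.capacity(), cap_after_first);
        }
    }
}
```

With `&mut Vec::new()` in the closure, every iteration pays an allocation and a drop, which is exactly the noise the nitpick wants out of the writer-reuse measurement.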
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@avro/src/writer/datum.rs`:
- Around line 88-99: The validation in write_value_ref is using
self.resolved.get_schemata().iter().all(...) which lets values pass if any
auxiliary schema accepts them; instead validate only against the writer schema.
Replace the current all(...) check with a direct call to
value.validate_internal(self.schema, self.resolved.get_names(), None) and, if
that returns Some(...), return the Validation error; leave the subsequent call
to encode_internal(value, self.schema, ...) unchanged. This targets
write_value_ref, self.schema, self.resolved.get_schemata(), and
value.validate_internal to ensure validation is performed against the writer
schema only.
---
Nitpick comments:
In `@avro/benches/single.rs`:
- Around line 199-234: Benchmarks allocate a fresh Vec in each iteration which
adds allocation noise; for the four functions
bench_small_schema_write_record_reuse_datum_writer,
bench_big_schema_write_record_reuse_datum_writer,
bench_small_schema_write_record_no_validation, and
bench_big_schema_write_record_no_validation, create a single mutable buffer
(e.g., let mut buf = Vec::new()) outside the b.iter closure and inside the
closure call buf.clear() then pass &mut buf to writer.write_value_ref so the
buffer is reused across iterations and removes allocation noise from the
measurement.
ℹ️ Review info
Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5827b7eb-13bc-4b90-a71d-ff126deedf23
📒 Files selected for processing (11)
avro/benches/single.rs
avro/src/lib.rs
avro/src/schema/mod.rs
avro/src/serde/de.rs
avro/src/writer/datum.rs
avro/src/writer/single_object.rs
avro/tests/avro-3786.rs
avro/tests/avro-3787.rs
avro/tests/io.rs
avro/tests/schema.rs
avro/tests/to_from_avro_datum_schemata.rs
```rust
pub fn write_value_ref<W: Write>(&self, writer: &mut W, value: &Value) -> AvroResult<usize> {
    if self.validate
        && self.resolved.get_schemata().iter().all(|s| {
            value
                .validate_internal(s, self.resolved.get_names(), None)
                .is_some()
        })
    {
        return Err(Details::Validation.into());
    }
    encode_internal(value, self.schema, self.resolved.get_names(), None, writer)
}
```
Validate against the writer schema only; current all(...) check can bypass validation.
Line 90 currently checks every resolved schema and only fails if all validations fail. That means a value valid for any auxiliary schema can pass, even when invalid for self.schema, then Line 98 encodes against self.schema anyway.
🐛 Suggested fix

```diff
 pub fn write_value_ref<W: Write>(&self, writer: &mut W, value: &Value) -> AvroResult<usize> {
-    if self.validate
-        && self.resolved.get_schemata().iter().all(|s| {
-            value
-                .validate_internal(s, self.resolved.get_names(), None)
-                .is_some()
-        })
-    {
+    if self.validate
+        && value
+            .validate_internal(
+                self.schema,
+                self.resolved.get_names(),
+                self.schema.namespace(),
+            )
+            .is_some()
+    {
         return Err(Details::Validation.into());
     }
-    encode_internal(value, self.schema, self.resolved.get_names(), None, writer)
+    encode_internal(
+        value,
+        self.schema,
+        self.resolved.get_names(),
+        self.schema.namespace(),
+        writer,
+    )
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
// Original
pub fn write_value_ref<W: Write>(&self, writer: &mut W, value: &Value) -> AvroResult<usize> {
    if self.validate
        && self.resolved.get_schemata().iter().all(|s| {
            value
                .validate_internal(s, self.resolved.get_names(), None)
                .is_some()
        })
    {
        return Err(Details::Validation.into());
    }
    encode_internal(value, self.schema, self.resolved.get_names(), None, writer)
}
```

```rust
// Suggested
pub fn write_value_ref<W: Write>(&self, writer: &mut W, value: &Value) -> AvroResult<usize> {
    if self.validate
        && value
            .validate_internal(
                self.schema,
                self.resolved.get_names(),
                self.schema.namespace(),
            )
            .is_some()
    {
        return Err(Details::Validation.into());
    }
    encode_internal(
        value,
        self.schema,
        self.resolved.get_names(),
        self.schema.namespace(),
        writer,
    )
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@avro/src/writer/datum.rs` around lines 88 - 99, The validation in
write_value_ref is using self.resolved.get_schemata().iter().all(...) which lets
values pass if any auxiliary schema accepts them; instead validate only against
the writer schema. Replace the current all(...) check with a direct call to
value.validate_internal(self.schema, self.resolved.get_names(), None) and, if
that returns Some(...), return the Validation error; leave the subsequent call
to encode_internal(value, self.schema, ...) unchanged. This targets
write_value_ref, self.schema, self.resolved.get_schemata(), and
value.validate_internal to ensure validation is performed against the writer
schema only.
value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! The value should be validated against the main schema, using the names of all additional schemata, not against all schemata. This returns the correct error reason when there is a problem and avoids extra work validating against unrelated schemata.
🤖 Augment PR Summary

Summary: This PR introduces
Changes:
Technical Notes: The
```rust
}

pub fn write_value_ref<W: Write>(&self, writer: &mut W, value: &Value) -> AvroResult<usize> {
    if self.validate
```
Value::validate_internal returns Some(reason) when the value is invalid, but this check only errors when all schemata return Some (and it validates against every schema in ResolvedSchema, including dependencies). That seems likely to let values that are invalid for the writer's root schema slip through validation (or validate against the wrong schema).
Severity: high
value:useful; category:bug; feedback: The Augment AI reviewer is correct! The value should be validated against the main schema, using the names of all additional schemata, not against all schemata. This returns the correct error reason when there is a problem and avoids extra work validating against unrelated schemata.

value:useful; category:bug; feedback: The Claude AI reviewer is correct! The value should be validated against the main schema, using the names of all additional schemata, not against all schemata. This returns the correct error reason when there is a problem and avoids extra work validating against unrelated schemata.


499: To review by AI