Skip to content

Commit a1bf90c

Browse files
fix: use writer types in Skipper for resolved named record types (#9605)
# Which issue does this PR close? #9655 # Rationale for this change When a writer-only field references a named Avro type that was previously resolved against a reader schema, `parse_type` returns the registered reader-resolved type from the shared resolver. This caused two problems: 1. The Skipper built its struct sub-skippers from the reader's field list, which omits writer-only fields. Their bytes were never consumed, leaving the cursor at the wrong position for all subsequent records. 2. Reader fields carry resolution-induced nullability (e.g. a plain writer `long` matched against a reader `["null", "long"]` gains `nullability = Some(NullFirst)`). The Skipper read a union-tag byte that was never written, causing "Unexpected EOF" errors. Fix: store the writer's data type in `ResolvedField::ToReader` alongside the reader index. The Skipper's `Codec::Struct` arm now iterates `rec.writer_fields` and uses the writer type from every entry — both `ToReader(_, wdt)` and `Skip(wdt)` — so it always follows the writer's wire format. # What changes are included in this PR? # Are these changes tested? Yes, added unit tests. # Are there any user-facing changes? No
1 parent 48727b3 commit a1bf90c

File tree

3 files changed

+246
-14
lines changed

3 files changed

+246
-14
lines changed

arrow-avro/src/codec.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,9 @@ pub(crate) struct ResolvedRecord {
9494
#[derive(Debug, Clone, PartialEq)]
9595
pub(crate) enum ResolvedField {
9696
/// Resolves to a field indexed in the reader schema.
97-
ToReader(usize),
97+
/// The `AvroDataType` is the writer's type for this field, used by the Skipper
98+
/// to correctly consume writer bytes when the whole record is being skipped.
99+
ToReader(usize, AvroDataType),
98100
/// For fields present in the writer's schema but not the reader's, this stores their data type.
99101
/// This is needed to correctly skip over these fields during deserialization.
100102
Skip(AvroDataType),
@@ -2341,10 +2343,10 @@ impl<'a> Maker<'a> {
23412343
.iter()
23422344
.enumerate()
23432345
.map(|(writer_index, writer_field)| {
2346+
let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
23442347
if let Some(reader_index) = writer_to_reader[writer_index] {
2345-
Ok(ResolvedField::ToReader(reader_index))
2348+
Ok(ResolvedField::ToReader(reader_index, dt))
23462349
} else {
2347-
let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
23482350
Ok(ResolvedField::Skip(dt))
23492351
}
23502352
})
@@ -2888,7 +2890,7 @@ mod tests {
28882890
default_fields,
28892891
}) => {
28902892
assert_eq!(writer_fields.len(), 1);
2891-
assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
2893+
assert!(matches!(writer_fields[0], ResolvedField::ToReader(0, _)));
28922894
assert_eq!(default_fields.len(), 1);
28932895
assert_eq!(default_fields[0], 1);
28942896
}
@@ -2981,7 +2983,7 @@ mod tests {
29812983
default_fields,
29822984
}) => {
29832985
assert_eq!(writer_fields.len(), 1);
2984-
assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
2986+
assert!(matches!(writer_fields[0], ResolvedField::ToReader(0, _)));
29852987
assert_eq!(default_fields.len(), 1);
29862988
assert_eq!(default_fields[0], 1);
29872989
}
@@ -3802,9 +3804,9 @@ mod tests {
38023804
assert!(matches!(
38033805
&rec.writer_fields[..],
38043806
&[
3805-
ResolvedField::ToReader(1),
3807+
ResolvedField::ToReader(1, _),
38063808
ResolvedField::Skip(_),
3807-
ResolvedField::ToReader(0),
3809+
ResolvedField::ToReader(0, _),
38083810
]
38093811
));
38103812
assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);

arrow-avro/src/reader/mod.rs

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9590,4 +9590,219 @@ mod test {
95909590
"entire RecordBatch mismatch (schema, all columns, all rows)"
95919591
);
95929592
}
9593+
9594+
// Build Avro OCF bytes whose schema contains a TypeName::Ref
9595+
//
9596+
// Schema written to the OCF header verbatim:
9597+
// ```text
9598+
// Root {
9599+
// ts: Timestamp { seconds: long, nanos: int },
9600+
// extra: Event { time: "Timestamp" } <- TypeName::Ref
9601+
// }
9602+
// ```
9603+
fn make_type_ref_ocf() -> Vec<u8> {
9604+
use apache_avro::{Schema as ApacheSchema, Writer as ApacheWriter, types::Value};
9605+
let schema_json = r#"{
9606+
"type": "record", "name": "Root",
9607+
"fields": [
9608+
{"name": "ts", "type": {"type": "record", "name": "Timestamp", "fields": [
9609+
{"name": "seconds", "type": "long"},
9610+
{"name": "nanos", "type": "int"}
9611+
]}},
9612+
{"name": "extra", "type": {"type": "record", "name": "Event", "fields": [
9613+
{"name": "time", "type": "Timestamp"}
9614+
]}}
9615+
]
9616+
}"#;
9617+
let schema = ApacheSchema::parse_str(schema_json).expect("valid schema");
9618+
let mut out = Vec::new();
9619+
{
9620+
let mut writer = ApacheWriter::new(&schema, &mut out);
9621+
let ts_val = |s: i64, n: i32| {
9622+
Value::Record(vec![
9623+
("seconds".into(), Value::Long(s)),
9624+
("nanos".into(), Value::Int(n)),
9625+
])
9626+
};
9627+
// Two rows: ts={1000,100}/extra.time={-1,-1} and ts={2000,200}/extra.time={-2,-2}.
9628+
for (ts_s, ts_n, ex_s, ex_n) in [(1000i64, 100i32, -1i64, -1i32), (2000, 200, -2, -2)] {
9629+
let row = Value::Record(vec![
9630+
("ts".into(), ts_val(ts_s, ts_n)),
9631+
(
9632+
"extra".into(),
9633+
Value::Record(vec![("time".into(), ts_val(ex_s, ex_n))]),
9634+
),
9635+
]);
9636+
writer.append_value_ref(&row).expect("append row");
9637+
}
9638+
writer.flush().expect("flush");
9639+
}
9640+
out
9641+
}
9642+
9643+
// writer-plain / reader-nullable mismatch.
9644+
//
9645+
// The writer schema uses a TypeName::Ref ("Timestamp" referenced in `extra.time`).
9646+
// The reader wraps `ts` in `["null", T]` unions and omits `extra`.
9647+
// The Skipper for `extra.time` resolves "Timestamp" via the resolver and must use
9648+
// the writer's plain field types (long, int) — not the nullable reader types — when
9649+
// consuming bytes. Without the fix, it skips union-encoded fields from plain data,
9650+
// reads the wrong number of bytes, and corrupts row 2's `ts.seconds`.
9651+
#[test]
9652+
fn test_nullable_reader_schema_vs_plain_writer_nested_struct() {
9653+
let bytes = make_type_ref_ocf();
9654+
let reader_schema = AvroSchema::new(
9655+
r#"{"type":"record","name":"Root","fields":[
9656+
{"name":"ts","type":["null",{"type":"record","name":"Timestamp","fields":[
9657+
{"name":"seconds","type":["null","long"]},
9658+
{"name":"nanos", "type":["null","int"]}
9659+
]}]}
9660+
]}"#
9661+
.to_string(),
9662+
);
9663+
let mut reader = ReaderBuilder::new()
9664+
.with_reader_schema(reader_schema)
9665+
.build(Cursor::new(bytes))
9666+
.expect("reader should build");
9667+
let batch = reader
9668+
.next()
9669+
.expect("should have a batch")
9670+
.expect("reading should succeed");
9671+
assert_eq!(batch.num_rows(), 2);
9672+
let ts = batch
9673+
.column(0)
9674+
.as_any()
9675+
.downcast_ref::<StructArray>()
9676+
.unwrap();
9677+
let seconds = ts
9678+
.column_by_name("seconds")
9679+
.unwrap()
9680+
.as_any()
9681+
.downcast_ref::<Int64Array>()
9682+
.unwrap();
9683+
assert_eq!(seconds.value(0), 1000);
9684+
assert_eq!(seconds.value(1), 2000);
9685+
}
9686+
9687+
// Skipper must consume all writer fields, including writer-only ones.
9688+
//
9689+
// The writer schema uses a TypeName::Ref ("Timestamp" referenced in `extra.time`).
9690+
// The reader requests only `ts.seconds` (no `nanos`, no `extra`).
9691+
// The Skipper for `extra.time` resolves "Timestamp" and must skip both `seconds`
9692+
// and `nanos` bytes. Without the fix it skips only `seconds`, leaving the `nanos`
9693+
// bytes in the buffer and corrupting row 2's `ts.seconds` read.
9694+
#[test]
9695+
fn test_skipper_consumes_writer_only_struct_fields() {
9696+
let bytes = make_type_ref_ocf();
9697+
let reader_schema = AvroSchema::new(
9698+
r#"{"type":"record","name":"Root","fields":[
9699+
{"name":"ts","type":{"type":"record","name":"Timestamp","fields":[
9700+
{"name":"seconds","type":"long"}
9701+
]}}
9702+
]}"#
9703+
.to_string(),
9704+
);
9705+
let mut reader = ReaderBuilder::new()
9706+
.with_reader_schema(reader_schema)
9707+
.build(Cursor::new(bytes))
9708+
.expect("reader should build");
9709+
let batch = reader
9710+
.next()
9711+
.expect("should have a batch")
9712+
.expect("Skipper must consume both seconds and nanos for extra.time");
9713+
assert_eq!(batch.num_rows(), 2);
9714+
let ts = batch
9715+
.column(0)
9716+
.as_any()
9717+
.downcast_ref::<StructArray>()
9718+
.unwrap();
9719+
let seconds = ts
9720+
.column_by_name("seconds")
9721+
.unwrap()
9722+
.as_any()
9723+
.downcast_ref::<Int64Array>()
9724+
.unwrap();
9725+
assert_eq!(seconds.value(0), 1000);
9726+
assert_eq!(seconds.value(1), 2000);
9727+
}
9728+
9729+
// The Skipper for a skipped array field must consume all bytes of each element,
9730+
// including every field of a nested struct resolved via a TypeName::Ref.
9731+
//
9732+
// Writer: `Root { ts: Timestamp{seconds,nanos}, events: array<Event{time:"Timestamp"}> }`
9733+
// Reader: only `ts` with nullable wrappers; `events` is absent (forces a Skip).
9734+
// The Skipper for `events` resolves each element's `time` field as "Timestamp"
9735+
// and must use the writer's plain {seconds,nanos} definition — not the
9736+
// nullable-wrapped reader type — when consuming bytes.
9737+
#[test]
9738+
fn test_skip_array_of_structs_uses_writer_schema_not_resolved() {
9739+
use apache_avro::{Schema as ApacheSchema, Writer as ApacheWriter, types::Value};
9740+
let schema_json = r#"{
9741+
"type": "record", "name": "Root",
9742+
"fields": [
9743+
{"name": "ts", "type": {"type": "record", "name": "Timestamp", "fields": [
9744+
{"name": "seconds", "type": "long"},
9745+
{"name": "nanos", "type": "int"}
9746+
]}},
9747+
{"name": "events", "type": {"type": "array", "items": {
9748+
"type": "record", "name": "Event", "fields": [
9749+
{"name": "time", "type": "Timestamp"}
9750+
]
9751+
}}}
9752+
]
9753+
}"#;
9754+
let schema = ApacheSchema::parse_str(schema_json).expect("valid schema");
9755+
let mut bytes = Vec::new();
9756+
{
9757+
let mut writer = ApacheWriter::new(&schema, &mut bytes);
9758+
// One row: ts={100, 5}, events=[{time={200, 1}}]
9759+
let ts_val = |s: i64, n: i32| {
9760+
Value::Record(vec![
9761+
("seconds".into(), Value::Long(s)),
9762+
("nanos".into(), Value::Int(n)),
9763+
])
9764+
};
9765+
let row = Value::Record(vec![
9766+
("ts".into(), ts_val(100, 5)),
9767+
(
9768+
"events".into(),
9769+
Value::Array(vec![Value::Record(vec![("time".into(), ts_val(200, 1))])]),
9770+
),
9771+
]);
9772+
writer.append_value_ref(&row).expect("append row");
9773+
writer.flush().expect("flush");
9774+
}
9775+
9776+
// Reader omits `events` (forces Skip) and wraps `ts` fields in nullable unions.
9777+
let reader_schema = AvroSchema::new(
9778+
r#"{"type":"record","name":"Root","fields":[
9779+
{"name":"ts","type":["null",{"type":"record","name":"Timestamp","fields":[
9780+
{"name":"seconds","type":["null","long"]},
9781+
{"name":"nanos", "type":["null","int"]}
9782+
]}]}
9783+
]}"#
9784+
.to_string(),
9785+
);
9786+
let mut reader = ReaderBuilder::new()
9787+
.with_reader_schema(reader_schema)
9788+
.build(Cursor::new(bytes))
9789+
.expect("reader should build");
9790+
let batch = reader
9791+
.next()
9792+
.expect("should have a batch")
9793+
.expect("Skipper must consume all events bytes using writer field types");
9794+
assert_eq!(batch.num_rows(), 1);
9795+
let ts = batch
9796+
.column(0)
9797+
.as_any()
9798+
.downcast_ref::<StructArray>()
9799+
.unwrap();
9800+
let seconds = ts
9801+
.column_by_name("seconds")
9802+
.unwrap()
9803+
.as_any()
9804+
.downcast_ref::<Int64Array>()
9805+
.unwrap();
9806+
assert_eq!(seconds.value(0), 100);
9807+
}
95939808
}

arrow-avro/src/reader/record.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2458,7 +2458,7 @@ impl<'a> ProjectorBuilder<'a> {
24582458
.writer_fields
24592459
.iter()
24602460
.map(|field| match field {
2461-
ResolvedField::ToReader(index) => Ok(FieldProjection::ToReader(*index)),
2461+
ResolvedField::ToReader(index, _) => Ok(FieldProjection::ToReader(*index)),
24622462
ResolvedField::Skip(datatype) => {
24632463
let skipper = Skipper::from_avro(datatype)?;
24642464
Ok(FieldProjection::Skip(skipper))
@@ -2568,12 +2568,27 @@ impl Skipper {
25682568
Codec::Uuid => Self::UuidString, // encoded as string
25692569
Codec::Enum(_) => Self::Enum,
25702570
Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2571-
Codec::Struct(fields) => Self::Struct(
2572-
fields
2573-
.iter()
2574-
.map(|f| Skipper::from_avro(f.data_type()))
2575-
.collect::<Result<_, _>>()?,
2576-
),
2571+
Codec::Struct(fields) => {
2572+
if let Some(ResolutionInfo::Record(rec)) = dt.resolution.as_ref() {
2573+
Self::Struct(
2574+
rec.writer_fields
2575+
.iter()
2576+
.map(|wf| match wf {
2577+
ResolvedField::ToReader(_, wdt) | ResolvedField::Skip(wdt) => {
2578+
Skipper::from_avro(wdt)
2579+
}
2580+
})
2581+
.collect::<Result<_, _>>()?,
2582+
)
2583+
} else {
2584+
Self::Struct(
2585+
fields
2586+
.iter()
2587+
.map(|f| Skipper::from_avro(f.data_type()))
2588+
.collect::<Result<_, _>>()?,
2589+
)
2590+
}
2591+
}
25772592
Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
25782593
Codec::Interval => Self::DurationFixed12,
25792594
Codec::Union(encodings, _, _) => {

0 commit comments

Comments
 (0)