Skip to content

Commit 7219290

Browse files
authored
Merge branch 'main' into issue-9497-list-json-reader-bench
2 parents 42f3833 + 8c89814 commit 7219290

File tree

4 files changed

+150
-99
lines changed

4 files changed

+150
-99
lines changed

arrow-avro/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ zstd = { version = "0.13", default-features = false, optional = true }
7070
bzip2 = { version = "0.6.0", optional = true }
7171
xz = { package = "liblzma", version = "0.4", default-features = false, optional = true }
7272
crc = { version = "3.0", optional = true }
73-
strum_macros = "0.27"
73+
strum_macros = "0.28"
7474
uuid = "1.17"
7575
indexmap = "2.10"
7676
rand = "0.9"

arrow-avro/benches/project_record.rs

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,22 @@ fn gen_double(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) ->
121121
)
122122
}
123123

124-
const READER_SCHEMA: &str = r#"
124+
fn gen_mixed(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
125+
encode_records_with_prefix(
126+
sc,
127+
prefix,
128+
(0..n).map(|i| {
129+
Value::Record(vec![
130+
("f1".into(), Value::Int(rng.random())),
131+
("f2".into(), Value::Long(rng.random())),
132+
("f3".into(), Value::String(format!("name-{i}"))),
133+
("f4".into(), Value::Double(rng.random())),
134+
])
135+
}),
136+
)
137+
}
138+
139+
const SKIP_READER_SCHEMA: &str = r#"
125140
{
126141
"type":"record",
127142
"name":"table",
@@ -175,11 +190,42 @@ const DOUBLE_SCHEMA: &str = r#"
175190
}
176191
"#;
177192

178-
fn new_decoder(schema_json: &'static str, batch_size: usize) -> Decoder {
193+
const MIX_SCHEMA: &str = r#"
194+
{
195+
"type":"record",
196+
"name":"Mix",
197+
"fields": [
198+
{ "name": "f1", "type": "int" },
199+
{ "name": "f2", "type": "long" },
200+
{ "name": "f3", "type": "string" },
201+
{ "name": "f4", "type": "double" }
202+
]
203+
}
204+
"#;
205+
206+
// Project the record type written to MIX_SCHEMA:
207+
// skip "f2" and "f4", add "f5" with a default
208+
const PROJECT_READER_SCHEMA: &str = r#"
209+
{
210+
"type":"record",
211+
"name":"Mix",
212+
"fields": [
213+
{ "name": "f1", "type": "int" },
214+
{ "name": "f3", "type": "string" },
215+
{ "name": "f5", "type": "long", "default": 0 }
216+
]
217+
}
218+
"#;
219+
220+
fn new_decoder(
221+
schema_json: &'static str,
222+
reader_schema_json: &'static str,
223+
batch_size: usize,
224+
) -> Decoder {
179225
let schema = AvroSchema::new(schema_json.to_owned());
180226
let mut store = SchemaStore::new();
181227
store.register(schema).unwrap();
182-
let reader_schema = AvroSchema::new(READER_SCHEMA.to_owned());
228+
let reader_schema = AvroSchema::new(reader_schema_json.to_owned());
183229
ReaderBuilder::new()
184230
.with_writer_schema_store(store)
185231
.with_batch_size(batch_size)
@@ -215,19 +261,24 @@ fn bench_with_decoder<F>(
215261
fn criterion_benches(c: &mut Criterion) {
216262
let data = gen_avro_data_with(INT_SCHEMA, NUM_ROWS, gen_int);
217263
bench_with_decoder(c, "skip_int", &data, NUM_ROWS, || {
218-
new_decoder(INT_SCHEMA, BATCH_SIZE)
264+
new_decoder(INT_SCHEMA, SKIP_READER_SCHEMA, BATCH_SIZE)
219265
});
220266
let data = gen_avro_data_with(LONG_SCHEMA, NUM_ROWS, gen_long);
221267
bench_with_decoder(c, "skip_long", &data, NUM_ROWS, || {
222-
new_decoder(LONG_SCHEMA, BATCH_SIZE)
268+
new_decoder(LONG_SCHEMA, SKIP_READER_SCHEMA, BATCH_SIZE)
223269
});
224270
let data = gen_avro_data_with(FLOAT_SCHEMA, NUM_ROWS, gen_float);
225271
bench_with_decoder(c, "skip_float", &data, NUM_ROWS, || {
226-
new_decoder(FLOAT_SCHEMA, BATCH_SIZE)
272+
new_decoder(FLOAT_SCHEMA, SKIP_READER_SCHEMA, BATCH_SIZE)
227273
});
228274
let data = gen_avro_data_with(DOUBLE_SCHEMA, NUM_ROWS, gen_double);
229275
bench_with_decoder(c, "skip_double", &data, NUM_ROWS, || {
230-
new_decoder(DOUBLE_SCHEMA, BATCH_SIZE)
276+
new_decoder(DOUBLE_SCHEMA, SKIP_READER_SCHEMA, BATCH_SIZE)
277+
});
278+
279+
let data = gen_avro_data_with(MIX_SCHEMA, NUM_ROWS, gen_mixed);
280+
bench_with_decoder(c, "project_primitives", &data, NUM_ROWS, || {
281+
new_decoder(MIX_SCHEMA, PROJECT_READER_SCHEMA, BATCH_SIZE)
231282
});
232283
}
233284

arrow-avro/src/codec.rs

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,20 @@ pub(crate) enum AvroLiteral {
8484
/// Contains the necessary information to resolve a writer's record against a reader's record schema.
8585
#[derive(Debug, Clone, PartialEq)]
8686
pub(crate) struct ResolvedRecord {
87-
/// Maps a writer's field index to the corresponding reader's field index.
88-
/// `None` if the writer's field is not present in the reader's schema.
89-
pub(crate) writer_to_reader: Arc<[Option<usize>]>,
87+
/// Maps a writer's field index to the field's resolution against the reader's schema.
88+
pub(crate) writer_fields: Arc<[ResolvedField]>,
9089
/// A list of indices in the reader's schema for fields that have a default value.
9190
pub(crate) default_fields: Arc<[usize]>,
91+
}
92+
93+
/// Resolution information for record fields in the writer schema.
94+
#[derive(Debug, Clone, PartialEq)]
95+
pub(crate) enum ResolvedField {
96+
/// Resolves to a field indexed in the reader schema.
97+
ToReader(usize),
9298
/// For fields present in the writer's schema but not the reader's, this stores their data type.
9399
/// This is needed to correctly skip over these fields during deserialization.
94-
pub(crate) skip_fields: Arc<[Option<AvroDataType>]>,
100+
Skip(AvroDataType),
95101
}
96102

97103
/// Defines the type of promotion to be applied during schema resolution.
@@ -2281,24 +2287,27 @@ impl<'a> Maker<'a> {
22812287
data_type: dt,
22822288
});
22832289
}
2284-
// Build skip_fields in writer order; pre-size and push.
2285-
let mut skip_fields: Vec<Option<AvroDataType>> =
2286-
Vec::with_capacity(writer_record.fields.len());
2287-
for (writer_index, writer_field) in writer_record.fields.iter().enumerate() {
2288-
if writer_to_reader[writer_index].is_some() {
2289-
skip_fields.push(None);
2290-
} else {
2291-
skip_fields.push(Some(self.parse_type(&writer_field.r#type, writer_ns)?));
2292-
}
2293-
}
2290+
// Build writer field map.
2291+
let writer_fields = writer_record
2292+
.fields
2293+
.iter()
2294+
.enumerate()
2295+
.map(|(writer_index, writer_field)| {
2296+
if let Some(reader_index) = writer_to_reader[writer_index] {
2297+
Ok(ResolvedField::ToReader(reader_index))
2298+
} else {
2299+
let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
2300+
Ok(ResolvedField::Skip(dt))
2301+
}
2302+
})
2303+
.collect::<Result<_, ArrowError>>()?;
22942304
let resolved = AvroDataType::new_with_resolution(
22952305
Codec::Struct(Arc::from(reader_fields)),
22962306
reader_md,
22972307
None,
22982308
Some(ResolutionInfo::Record(ResolvedRecord {
2299-
writer_to_reader: Arc::from(writer_to_reader),
2309+
writer_fields,
23002310
default_fields: Arc::from(default_fields),
2301-
skip_fields: Arc::from(skip_fields),
23022311
})),
23032312
);
23042313
// Register a resolved record by reader name+namespace for potential named type refs.
@@ -2792,16 +2801,13 @@ mod tests {
27922801
};
27932802
match resolution {
27942803
ResolutionInfo::Record(ResolvedRecord {
2795-
writer_to_reader,
2804+
writer_fields,
27962805
default_fields,
2797-
skip_fields,
27982806
}) => {
2799-
assert_eq!(writer_to_reader.len(), 1);
2800-
assert_eq!(writer_to_reader[0], Some(0));
2807+
assert_eq!(writer_fields.len(), 1);
2808+
assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
28012809
assert_eq!(default_fields.len(), 1);
28022810
assert_eq!(default_fields[0], 1);
2803-
assert_eq!(skip_fields.len(), 1);
2804-
assert_eq!(skip_fields[0], None);
28052811
}
28062812
other => panic!("unexpected resolution {other:?}"),
28072813
}
@@ -2888,16 +2894,13 @@ mod tests {
28882894
};
28892895
match resolution {
28902896
ResolutionInfo::Record(ResolvedRecord {
2891-
writer_to_reader,
2897+
writer_fields,
28922898
default_fields,
2893-
skip_fields,
28942899
}) => {
2895-
assert_eq!(writer_to_reader.len(), 1);
2896-
assert_eq!(writer_to_reader[0], Some(0));
2900+
assert_eq!(writer_fields.len(), 1);
2901+
assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
28972902
assert_eq!(default_fields.len(), 1);
28982903
assert_eq!(default_fields[0], 1);
2899-
assert_eq!(skip_fields.len(), 1);
2900-
assert_eq!(skip_fields[0], None);
29012904
}
29022905
other => panic!("unexpected resolution {other:?}"),
29032906
}
@@ -3714,11 +3717,18 @@ mod tests {
37143717
Some(ResolutionInfo::Record(ref r)) => r.clone(),
37153718
other => panic!("expected record resolution, got {other:?}"),
37163719
};
3717-
assert_eq!(rec.writer_to_reader.as_ref(), &[Some(1), None, Some(0)]);
3720+
assert!(matches!(
3721+
&rec.writer_fields[..],
3722+
&[
3723+
ResolvedField::ToReader(1),
3724+
ResolvedField::Skip(_),
3725+
ResolvedField::ToReader(0),
3726+
]
3727+
));
37183728
assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
3719-
assert!(rec.skip_fields[0].is_none());
3720-
assert!(rec.skip_fields[2].is_none());
3721-
let skip1 = rec.skip_fields[1].as_ref().expect("skip field present");
3729+
let ResolvedField::Skip(skip1) = &rec.writer_fields[1] else {
3730+
panic!("should skip field 1")
3731+
};
37223732
assert!(matches!(skip1.codec(), Codec::Utf8));
37233733
let name_md = &fields[2].data_type().metadata;
37243734
assert_eq!(

0 commit comments

Comments
 (0)