Skip to content

Commit 9175874

Browse files
committed
fix: handle Avro reader schema with no fields
In the degenerate case when the Avro reader schema has no fields, the RecordDecoder should be able to produce empty record batches with the number of rows counted from the data.
1 parent 471f6c3 commit 9175874

File tree

2 files changed

+64
-1
lines changed

2 files changed

+64
-1
lines changed

arrow-avro/src/reader/async_reader/mod.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1532,6 +1532,38 @@ mod tests {
15321532
assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);
15331533
}
15341534

1535+
#[tokio::test]
1536+
async fn test_alltypes_with_empty_schema_large_batch() {
1537+
// With an empty reader schema -- should count rows but produce no columns
1538+
let file = arrow_test_data("avro/alltypes_plain.avro");
1539+
let schema = Arc::new(Schema::new(Vec::<Field>::new()));
1540+
let batches = read_async_file(&file, 1024, None, Some(schema), None)
1541+
.await
1542+
.unwrap();
1543+
assert_eq!(batches.len(), 1);
1544+
let batch = &batches[0];
1545+
1546+
assert_eq!(batch.num_rows(), 8);
1547+
assert_eq!(batch.num_columns(), 0);
1548+
}
1549+
1550+
#[tokio::test]
1551+
async fn test_alltypes_with_empty_schema_small_batch() {
1552+
// With an empty reader schema -- should count rows but produce no columns
1553+
let file = arrow_test_data("avro/alltypes_plain.avro");
1554+
let schema = Arc::new(Schema::new(Vec::<Field>::new()));
1555+
let batches = read_async_file(&file, 5, None, Some(schema), None)
1556+
.await
1557+
.unwrap();
1558+
1559+
assert_eq!(batches.len(), 2);
1560+
1561+
assert_eq!(batches[0].num_rows(), 5);
1562+
assert_eq!(batches[0].num_columns(), 0);
1563+
assert_eq!(batches[1].num_rows(), 3);
1564+
assert_eq!(batches[1].num_columns(), 0);
1565+
}
1566+
15351567
#[tokio::test]
15361568
async fn test_nested_no_schema_no_projection() {
15371569
// No reader schema, no projection
@@ -1597,6 +1629,31 @@ mod tests {
15971629
assert_eq!(batch.schema().field(2).name(), "f1");
15981630
}
15991631

1632+
#[tokio::test]
1633+
async fn test_nested_with_empty_schema() {
1634+
// With an empty reader schema -- should count rows but produce no columns
1635+
let file = arrow_test_data("avro/nested_records.avro");
1636+
let schema = Arc::new(
1637+
Schema::new(Vec::<Field>::new()).with_metadata(HashMap::from([(
1638+
SCHEMA_METADATA_KEY.into(),
1639+
r#"{
1640+
"type": "record",
1641+
"namespace": "ns1",
1642+
"name": "record1",
1643+
"fields": []
1644+
}"#
1645+
.to_owned(),
1646+
)])),
1647+
);
1648+
let batches = read_async_file(&file, 1024, None, Some(schema), None)
1649+
.await
1650+
.unwrap();
1651+
let batch = &batches[0];
1652+
1653+
assert_eq!(batch.num_rows(), 2);
1654+
assert_eq!(batch.num_columns(), 0);
1655+
}
1656+
16001657
#[tokio::test]
16011658
async fn test_projection_error_out_of_bounds() {
16021659
let file = arrow_test_data("avro/alltypes_plain.avro");

arrow-avro/src/reader/record.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ pub(crate) struct RecordDecoder {
9494
schema: SchemaRef,
9595
fields: Vec<Decoder>,
9696
projector: Option<Projector>,
97+
row_count: usize,
9798
}
9899

99100
impl RecordDecoder {
@@ -136,6 +137,7 @@ impl RecordDecoder {
136137
schema: Arc::new(ArrowSchema::new(arrow_fields)),
137138
fields: encodings,
138139
projector,
140+
row_count: 0,
139141
})
140142
}
141143
other => Err(AvroError::ParseError(format!(
@@ -166,6 +168,7 @@ impl RecordDecoder {
166168
}
167169
}
168170
}
171+
self.row_count += count;
169172
Ok(cursor.position())
170173
}
171174

@@ -176,7 +179,10 @@ impl RecordDecoder {
176179
.iter_mut()
177180
.map(|x| x.flush(None))
178181
.collect::<Result<Vec<_>, _>>()?;
179-
RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into)
182+
let batch_options = RecordBatchOptions::new().with_row_count(Some(self.row_count));
183+
self.row_count = 0;
184+
RecordBatch::try_new_with_options(self.schema.clone(), arrays, &batch_options)
185+
.map_err(Into::into)
180186
}
181187
}
182188

0 commit comments

Comments
 (0)