Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions arrow-avro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ md5 = ["dep:md5"]
sha256 = ["dep:sha2"]
small_decimals = []
avro_custom_types = ["dep:arrow-select"]
avro_1_12 = []

# Enable async APIs
async = ["futures", "tokio"]
Expand Down
78 changes: 64 additions & 14 deletions arrow-avro/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,52 @@ impl AvroDataType {
self.nullability
}

// Returns `Ok` if this data type accepts a JSON null default value,
// according to Avro schema rules prior to spec version 1.12, otherwise
// returns an `Err` with a schema error.
// Prior to 1.12, Avro only allowed default values matching the first branch of a union.
#[cfg(not(feature = "avro_1_12"))]
fn validate_null_default(&self) -> Result<(), ArrowError> {
match self.codec() {
Codec::Null => Ok(()),
Codec::Union(encodings, _, _)
if encodings
.first()
.map_or(false, |enc| matches!(enc.codec(), Codec::Null)) =>
{
Ok(())
}
_ if self.nullability() == Some(Nullability::NullFirst) => Ok(()),
_ => Err(ArrowError::SchemaError(
"JSON null default is only valid for `null` type or for a union whose first branch is `null`"
.to_string(),
)),
}
}

// Returns `Ok` if this data type accepts a JSON null default value,
// according to Avro schema rules for spec version 1.12 and later, otherwise
// returns an `Err` with a schema error.
// Since 1.12, Avro allows default values matching any branch of a union.
#[cfg(feature = "avro_1_12")]
fn validate_null_default(&self) -> Result<(), ArrowError> {
match self.codec() {
Codec::Null => Ok(()),
Codec::Union(encodings, _, _)
if encodings
.iter()
.any(|enc| matches!(enc.codec(), Codec::Null)) =>
{
Ok(())
}
_ if self.nullability().is_some() => Ok(()),
_ => Err(ArrowError::SchemaError(
"JSON null default is only valid for `null` type or for a union with a `null` branch"
.to_string(),
)),
}
}

#[inline]
fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
fn expect_string<'v>(
Expand Down Expand Up @@ -313,21 +359,10 @@ impl AvroDataType {
}
}

// Handle JSON nulls per-spec: allowed only for `null` type or unions with null FIRST
// Handle JSON nulls per the spec rules
if default_json.is_null() {
return match self.codec() {
Codec::Null => Ok(AvroLiteral::Null),
Codec::Union(encodings, _, _) if !encodings.is_empty()
&& matches!(encodings[0].codec(), Codec::Null) =>
{
Ok(AvroLiteral::Null)
}
_ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
_ => Err(ArrowError::SchemaError(
"JSON null default is only valid for `null` type or for a union whose first branch is `null`"
.to_string(),
)),
};
self.validate_null_default()?;
return Ok(AvroLiteral::Null);
}
let lit = match self.codec() {
Codec::Null => {
Expand Down Expand Up @@ -3282,6 +3317,11 @@ mod tests {
let lit2 = dt_int_nf.parse_and_store_default(&Value::Null).unwrap();
assert_eq!(lit2, AvroLiteral::Null);
assert_default_stored(&dt_int_nf, &Value::Null);
}

#[cfg(not(feature = "avro_1_12"))]
#[test]
fn test_validate_and_store_default_null_and_nullability_rules_avro_1_11() {
let mut dt_int_ns =
AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
let err2 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap_err();
Expand All @@ -3292,6 +3332,16 @@ mod tests {
);
}

#[cfg(feature = "avro_1_12")]
#[test]
fn test_validate_and_store_default_null_and_nullability_rules_avro_1_12() {
let mut dt_int_ns =
AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
let lit3 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap();
assert_eq!(lit3, AvroLiteral::Null);
assert_default_stored(&dt_int_ns, &Value::Null);
}

#[test]
fn test_validate_and_store_default_primitives_and_temporal() {
let mut dt_bool = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
Expand Down
30 changes: 30 additions & 0 deletions arrow-avro/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3091,6 +3091,36 @@ mod test {
);
}

#[cfg(feature = "avro_1_12")]
#[test]
fn test_schema_resolution_defaults_cases_supported_by_avro_1_12() {
let path = "test/data/skippable_types.avro";
let reader_schema = make_reader_schema_with_default_fields(
path,
vec![
serde_json::json!({"name":"d_nullable_null_second","type":["int","null"],"default":null}),
],
);
let actual = read_alltypes_with_reader_schema(path, reader_schema);
let num_rows = actual.num_rows();
assert!(num_rows > 0, "skippable_types.avro should contain rows");
assert_eq!(
actual.num_columns(),
1,
"expected exactly our defaulted fields"
);
let mut arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(22);
arrays.push(Arc::new(Int32Array::from_iter(std::iter::repeat_n(
None::<i32>,
num_rows,
))));
let expected = RecordBatch::try_new(actual.schema(), arrays).unwrap();
assert_eq!(
actual, expected,
"defaults should materialize correctly for all fields"
);
}

#[test]
fn test_schema_resolution_default_enum_invalid_symbol_errors() {
let path = "test/data/skippable_types.avro";
Expand Down
Loading