Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! - Detection of single-object encoding with embedded schema
//! (schema lookup is not implemented; decoding is not supported)
//! - Schema-based decoding using a provided Avro schema
//! - Confluent Schema Registry wire format (magic byte 0x00 + 4-byte schema ID prefix)
//!
//! This operator is designed to be generic and reusable across different Avro schemas.

Expand Down Expand Up @@ -142,10 +143,38 @@ fn base64_encode(bytes: &[u8]) -> String {
result
}

/// Attempts to parse Avro data with the provided schema
fn parse_with_schema(mut data: &[u8], schema: &Schema) -> Result<AvroValue, String> {
from_avro_datum(schema, &mut data, None)
.map_err(|e| format!("Failed to parse Avro with schema: {}", e))
/// Strips the Confluent Schema Registry wire format prefix if present.
/// Wire format: [0x00] [4-byte big-endian schema ID] [avro payload]
fn strip_confluent_header(data: &[u8]) -> Option<&[u8]> {
if data.len() > 5 && data[0] == 0x00 {
Some(&data[5..])
} else {
None
}
}

/// Attempts to parse Avro data with the provided schema.
/// Tries raw data first; on failure, retries after stripping a Confluent
/// Schema Registry wire format prefix (magic byte 0x00 + 4-byte schema ID).
fn parse_with_schema(data: &[u8], schema: &Schema) -> Result<AvroValue, String> {
let mut cursor = data;
match from_avro_datum(schema, &mut cursor, None) {
Ok(value) => Ok(value),
Err(first_err) => {
if let Some(stripped) = strip_confluent_header(data) {
let mut cursor2 = stripped;
from_avro_datum(schema, &mut cursor2, None).map_err(|e| {
format!(
"Failed to parse Avro with schema (also tried stripping \
Confluent wire format prefix): {}",
e
)
})
} else {
Err(format!("Failed to parse Avro with schema: {}", first_err))
}
}
}
}

/// Attempts to detect and parse single-object encoded Avro
Expand Down Expand Up @@ -199,6 +228,18 @@ fn try_parse_container_file(data: &[u8]) -> Option<Result<AvroValue, String>> {
)
}

/// Unwraps a JSON-string-encoded schema value if needed.
///
/// The AIO runtime may deliver configuration values as JSON strings. When a user
/// sets `avroSchema` to a JSON object, the runtime can wrap it in an extra string
/// literal. This function detects that case and returns the inner string.
fn unwrap_schema_value(raw: &str) -> std::borrow::Cow<'_, str> {
match serde_json::from_str::<serde_json::Value>(raw) {
Ok(serde_json::Value::String(inner)) => std::borrow::Cow::Owned(inner),
_ => std::borrow::Cow::Borrowed(raw),
}
}

/// Initialize the operator with optional schema configuration
fn avro_init(configuration: ModuleConfiguration) -> bool {
logger::log(
Expand All @@ -221,7 +262,8 @@ fn avro_init(configuration: ModuleConfiguration) -> bool {
.iter()
.find(|(k, _)| k == "avroSchema")
.map(|(_, v)| {
Schema::parse_str(v).map_err(|e| {
let schema_str = unwrap_schema_value(v);
Schema::parse_str(&schema_str).map_err(|e| {
logger::log(
Level::Error,
"avro-to-json",
Expand Down Expand Up @@ -298,7 +340,7 @@ fn transform(input: DataModel) -> Result<DataModel, Error> {
// 4. If all strategies fail, return an error and log a warning

let avro_value = if let Some(schema) = AVRO_SCHEMA.get().and_then(|s| s.as_ref()) {
// Use configured schema
// Use configured schema (handles raw Avro and Confluent wire format)
match parse_with_schema(&payload, schema) {
Ok(value) => {
logger::log(
Expand Down Expand Up @@ -641,4 +683,138 @@ mod tests {
let result = avro_to_json(&AvroValue::BigDecimal(bd));
assert_eq!(result, json!("-42.5"));
}

// --- unwrap_schema_value tests ---

#[test]
fn unwrap_schema_value_raw_json_object() {
let raw = r#"{"type":"record","name":"Example","fields":[{"name":"id","type":"string"}]}"#;
let result = unwrap_schema_value(raw);
assert_eq!(result, raw);
}

#[test]
fn unwrap_schema_value_json_string_encoded() {
let inner = r#"{"type":"record","name":"Example","fields":[{"name":"id","type":"string"}]}"#;
let encoded = serde_json::to_string(inner).unwrap();
let result = unwrap_schema_value(&encoded);
assert_eq!(result, inner);
}

#[test]
fn unwrap_schema_value_primitive_schema() {
let raw = r#""string""#;
let result = unwrap_schema_value(raw);
assert_eq!(result, "string");
}

#[test]
fn unwrap_schema_value_non_json() {
let raw = "not valid json at all";
let result = unwrap_schema_value(raw);
assert_eq!(result, raw);
}

// --- Schema parsing with unwrap_schema_value ---

#[test]
fn schema_parse_raw_json_object() {
let raw = r#"{"type":"record","name":"TestRecord","namespace":"com.test","fields":[{"name":"value","type":"string"}]}"#;
let schema_str = unwrap_schema_value(raw);
let schema = Schema::parse_str(&schema_str);
assert!(schema.is_ok(), "Raw JSON object schema should parse: {:?}", schema.err());
}

#[test]
fn schema_parse_json_string_encoded() {
let inner = r#"{"type":"record","name":"TestRecord","namespace":"com.test","fields":[{"name":"value","type":"string"}]}"#;
let encoded = serde_json::to_string(inner).unwrap();
let schema_str = unwrap_schema_value(&encoded);
let schema = Schema::parse_str(&schema_str);
assert!(schema.is_ok(), "JSON-string-encoded schema should parse after unwrap: {:?}", schema.err());
}

#[test]
fn schema_parse_nested_record_raw() {
let raw = r#"{"type":"record","name":"Outer","namespace":"com.test","fields":[{"name":"Header","type":{"type":"record","name":"Header","namespace":"com.test.inner","fields":[{"name":"Source","type":"string"},{"name":"Timestamp","type":"long"}]}},{"name":"Payload","type":"string"}]}"#;
let schema_str = unwrap_schema_value(raw);
let schema = Schema::parse_str(&schema_str);
assert!(schema.is_ok(), "Nested record raw schema should parse: {:?}", schema.err());
}

#[test]
fn schema_parse_nested_record_json_string_encoded() {
let inner = r#"{"type":"record","name":"Outer","namespace":"com.test","fields":[{"name":"Header","type":{"type":"record","name":"Header","namespace":"com.test.inner","fields":[{"name":"Source","type":"string"},{"name":"Timestamp","type":"long"}]}},{"name":"Payload","type":"string"}]}"#;
let encoded = serde_json::to_string(inner).unwrap();

// Without unwrap, this would fail with InvalidSchemaName
let schema_str = unwrap_schema_value(&encoded);
let schema = Schema::parse_str(&schema_str);
assert!(schema.is_ok(), "Nested record JSON-string-encoded schema should parse after unwrap: {:?}", schema.err());
}

#[test]
fn schema_parse_json_string_encoded_fails_without_unwrap() {
let inner = r#"{"type":"record","name":"TestRecord","namespace":"com.test","fields":[{"name":"value","type":"string"}]}"#;
let encoded = serde_json::to_string(inner).unwrap();

// Parsing the encoded string directly should fail (the original bug)
let result = Schema::parse_str(&encoded);
assert!(result.is_err(), "Parsing JSON-string-encoded schema without unwrap should fail");
}

// --- Confluent wire format tests ---

#[test]
fn strip_confluent_header_present() {
// Magic byte 0x00 + 4-byte schema ID + some payload
let data = [0x00, 0x00, 0x00, 0x00, 0x01, 0xAA, 0xBB];
let stripped = strip_confluent_header(&data);
assert_eq!(stripped, Some(&[0xAA, 0xBB][..]));
}

#[test]
fn strip_confluent_header_not_present() {
let data = [0x01, 0x00, 0x00, 0x00, 0x01, 0xAA];
assert!(strip_confluent_header(&data).is_none());
}

#[test]
fn strip_confluent_header_too_short() {
let data = [0x00, 0x01, 0x02];
assert!(strip_confluent_header(&data).is_none());
}

#[test]
fn parse_with_schema_confluent_prefix() {
// Schema with 5 string fields: the Confluent prefix [0x00 x4, 0x07] causes
// the 5th field to read 0x07 as a zigzag varint (-4), producing a negative
// string length that triggers the retry path
let schema_json = r#"{"type":"record","name":"Msg","fields":[
{"name":"a","type":"string"},{"name":"b","type":"string"},
{"name":"c","type":"string"},{"name":"d","type":"string"},
{"name":"e","type":"string"}
]}"#;
let schema = Schema::parse_str(schema_json).unwrap();

let mut record = apache_avro::types::Record::new(&schema).unwrap();
record.put("a", AvroValue::String("v1".to_string()));
record.put("b", AvroValue::String("v2".to_string()));
record.put("c", AvroValue::String("v3".to_string()));
record.put("d", AvroValue::String("v4".to_string()));
record.put("e", AvroValue::String("v5".to_string()));
let raw_avro = apache_avro::to_avro_datum(&schema, record).unwrap();

// Prepend Confluent wire format header (magic 0x00 + 4-byte schema ID)
let mut confluent_data = vec![0x00, 0x00, 0x00, 0x00, 0x07];
confluent_data.extend_from_slice(&raw_avro);

// parse_with_schema should succeed by stripping the prefix on retry
let result = parse_with_schema(&confluent_data, &schema);
assert!(result.is_ok(), "parse_with_schema should handle Confluent prefix: {:?}", result.err());

let json = avro_to_json(&result.unwrap());
assert_eq!(json["a"], "v1");
assert_eq!(json["e"], "v5");
}
}
Loading