From f0b918fe070ed7213c6d38e7f43a43fb99f915a0 Mon Sep 17 00:00:00 2001 From: katriendg Date: Thu, 2 Apr 2026 10:29:26 +0200 Subject: [PATCH 1/2] fix(application): add unwrap_schema_value function for JSON-string-encoded schemas - implement unwrap_schema_value to handle JSON string wrapping - add tests for unwrap_schema_value covering various cases - ensure schema parsing works correctly with unwrapped values --- .../operators/avro-to-json/src/lib.rs | 94 ++++++++++++++++++- 1 file changed, 93 insertions(+), 1 deletion(-) diff --git a/src/500-application/512-avro-to-json/operators/avro-to-json/src/lib.rs b/src/500-application/512-avro-to-json/operators/avro-to-json/src/lib.rs index d3baea3e..3f9f2c77 100644 --- a/src/500-application/512-avro-to-json/operators/avro-to-json/src/lib.rs +++ b/src/500-application/512-avro-to-json/operators/avro-to-json/src/lib.rs @@ -199,6 +199,18 @@ fn try_parse_container_file(data: &[u8]) -> Option> { ) } +/// Unwraps a JSON-string-encoded schema value if needed. +/// +/// The AIO runtime may deliver configuration values as JSON strings. When a user +/// sets `avroSchema` to a JSON object, the runtime can wrap it in an extra string +/// literal. This function detects that case and returns the inner string. 
+fn unwrap_schema_value(raw: &str) -> std::borrow::Cow<'_, str> { + match serde_json::from_str::<serde_json::Value>(raw) { + Ok(serde_json::Value::String(inner)) => std::borrow::Cow::Owned(inner), + _ => std::borrow::Cow::Borrowed(raw), + } +} + /// Initialize the operator with optional schema configuration fn avro_init(configuration: ModuleConfiguration) -> bool { logger::log( @@ -221,7 +233,8 @@ fn avro_init(configuration: ModuleConfiguration) -> bool { .iter() .find(|(k, _)| k == "avroSchema") .map(|(_, v)| { - Schema::parse_str(v).map_err(|e| { + let schema_str = unwrap_schema_value(v); + Schema::parse_str(&schema_str).map_err(|e| { logger::log( Level::Error, "avro-to-json", @@ -641,4 +654,83 @@ mod tests { let result = avro_to_json(&AvroValue::BigDecimal(bd)); assert_eq!(result, json!("-42.5")); } + + // --- unwrap_schema_value tests --- + + #[test] + fn unwrap_schema_value_raw_json_object() { + let raw = r#"{"type":"record","name":"Example","fields":[{"name":"id","type":"string"}]}"#; + let result = unwrap_schema_value(raw); + assert_eq!(result, raw); + } + + #[test] + fn unwrap_schema_value_json_string_encoded() { + let inner = r#"{"type":"record","name":"Example","fields":[{"name":"id","type":"string"}]}"#; + let encoded = serde_json::to_string(inner).unwrap(); + let result = unwrap_schema_value(&encoded); + assert_eq!(result, inner); + } + + #[test] + fn unwrap_schema_value_primitive_schema() { + let raw = r#""string""#; + let result = unwrap_schema_value(raw); + assert_eq!(result, "string"); + } + + #[test] + fn unwrap_schema_value_non_json() { + let raw = "not valid json at all"; + let result = unwrap_schema_value(raw); + assert_eq!(result, raw); + } + + // --- Schema parsing with unwrap_schema_value --- + + #[test] + fn schema_parse_raw_json_object() { + let raw = r#"{"type":"record","name":"TestRecord","namespace":"com.test","fields":[{"name":"value","type":"string"}]}"#; + let schema_str = unwrap_schema_value(raw); + let schema = Schema::parse_str(&schema_str); + 
assert!(schema.is_ok(), "Raw JSON object schema should parse: {:?}", schema.err()); + } + + #[test] + fn schema_parse_json_string_encoded() { + let inner = r#"{"type":"record","name":"TestRecord","namespace":"com.test","fields":[{"name":"value","type":"string"}]}"#; + let encoded = serde_json::to_string(inner).unwrap(); + let schema_str = unwrap_schema_value(&encoded); + let schema = Schema::parse_str(&schema_str); + assert!(schema.is_ok(), "JSON-string-encoded schema should parse after unwrap: {:?}", schema.err()); + } + + #[test] + fn schema_parse_nested_record_raw() { + let raw = r#"{"type":"record","name":"Outer","namespace":"com.test","fields":[{"name":"Header","type":{"type":"record","name":"Header","namespace":"com.test.inner","fields":[{"name":"Source","type":"string"},{"name":"Timestamp","type":"long"}]}},{"name":"Payload","type":"string"}]}"#; + let schema_str = unwrap_schema_value(raw); + let schema = Schema::parse_str(&schema_str); + assert!(schema.is_ok(), "Nested record raw schema should parse: {:?}", schema.err()); + } + + #[test] + fn schema_parse_nested_record_json_string_encoded() { + let inner = r#"{"type":"record","name":"Outer","namespace":"com.test","fields":[{"name":"Header","type":{"type":"record","name":"Header","namespace":"com.test.inner","fields":[{"name":"Source","type":"string"},{"name":"Timestamp","type":"long"}]}},{"name":"Payload","type":"string"}]}"#; + let encoded = serde_json::to_string(inner).unwrap(); + + // Without unwrap, this would fail with InvalidSchemaName + let schema_str = unwrap_schema_value(&encoded); + let schema = Schema::parse_str(&schema_str); + assert!(schema.is_ok(), "Nested record JSON-string-encoded schema should parse after unwrap: {:?}", schema.err()); + } + + #[test] + fn schema_parse_json_string_encoded_fails_without_unwrap() { + let inner = r#"{"type":"record","name":"TestRecord","namespace":"com.test","fields":[{"name":"value","type":"string"}]}"#; + let encoded = serde_json::to_string(inner).unwrap(); + 
+ // Parsing the encoded string directly should fail (the original bug) + let result = Schema::parse_str(&encoded); + assert!(result.is_err(), "Parsing JSON-string-encoded schema without unwrap should fail"); + } } From d3499885123c960a9d96342e917cae4a39eeddf3 Mon Sep 17 00:00:00 2001 From: katriendg Date: Thu, 2 Apr 2026 14:32:36 +0200 Subject: [PATCH 2/2] fix(application): handle Confluent Schema Registry wire format prefix in Avro parser MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - add strip_confluent_header to detect and remove 5-byte Confluent prefix - retry Avro parsing after stripping prefix when initial parse fails - add tests for Confluent header stripping and prefixed record parsing 🔧 - Generated by Copilot --- Cargo.lock | 3 + .../operators/avro-to-json/src/lib.rs | 94 ++++++++++++++++++- 2 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 Cargo.lock diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 00000000..8e157bcd --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,3 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 diff --git a/src/500-application/512-avro-to-json/operators/avro-to-json/src/lib.rs b/src/500-application/512-avro-to-json/operators/avro-to-json/src/lib.rs index 3f9f2c77..18f8d703 100644 --- a/src/500-application/512-avro-to-json/operators/avro-to-json/src/lib.rs +++ b/src/500-application/512-avro-to-json/operators/avro-to-json/src/lib.rs @@ -5,6 +5,7 @@ //! - Detection of single-object encoding with embedded schema //! (schema lookup is not implemented; decoding is not supported) //! - Schema-based decoding using a provided Avro schema +//! - Confluent Schema Registry wire format (magic byte 0x00 + 4-byte schema ID prefix) //! //! This operator is designed to be generic and reusable across different Avro schemas. 
@@ -142,10 +143,38 @@ fn base64_encode(bytes: &[u8]) -> String { result } -/// Attempts to parse Avro data with the provided schema -fn parse_with_schema(mut data: &[u8], schema: &Schema) -> Result<AvroValue, String> { - from_avro_datum(schema, &mut data, None) - .map_err(|e| format!("Failed to parse Avro with schema: {}", e)) +/// Strips the Confluent Schema Registry wire format prefix if present. +/// Wire format: [0x00] [4-byte big-endian schema ID] [avro payload] +fn strip_confluent_header(data: &[u8]) -> Option<&[u8]> { + if data.len() > 5 && data[0] == 0x00 { + Some(&data[5..]) + } else { + None + } +} + +/// Attempts to parse Avro data with the provided schema. +/// Tries raw data first; on failure, retries after stripping a Confluent +/// Schema Registry wire format prefix (magic byte 0x00 + 4-byte schema ID). +fn parse_with_schema(data: &[u8], schema: &Schema) -> Result<AvroValue, String> { + let mut cursor = data; + match from_avro_datum(schema, &mut cursor, None) { + Ok(value) => Ok(value), + Err(first_err) => { + if let Some(stripped) = strip_confluent_header(data) { + let mut cursor2 = stripped; + from_avro_datum(schema, &mut cursor2, None).map_err(|e| { + format!( + "Failed to parse Avro with schema (also tried stripping \ + Confluent wire format prefix): {}", + e + ) + }) + } else { + Err(format!("Failed to parse Avro with schema: {}", first_err)) + } + } + } } /// Attempts to detect and parse single-object encoded Avro @@ -311,7 +340,7 @@ fn transform(input: DataModel) -> Result { // 4. 
If all strategies fail, return an error and log a warning let avro_value = if let Some(schema) = AVRO_SCHEMA.get().and_then(|s| s.as_ref()) { - // Use configured schema + // Use configured schema (handles raw Avro and Confluent wire format) match parse_with_schema(&payload, schema) { Ok(value) => { logger::log( @@ -733,4 +762,59 @@ mod tests { let result = Schema::parse_str(&encoded); assert!(result.is_err(), "Parsing JSON-string-encoded schema without unwrap should fail"); } + + // --- Confluent wire format tests --- + + #[test] + fn strip_confluent_header_present() { + // Magic byte 0x00 + 4-byte schema ID + some payload + let data = [0x00, 0x00, 0x00, 0x00, 0x01, 0xAA, 0xBB]; + let stripped = strip_confluent_header(&data); + assert_eq!(stripped, Some(&[0xAA, 0xBB][..])); + } + + #[test] + fn strip_confluent_header_not_present() { + let data = [0x01, 0x00, 0x00, 0x00, 0x01, 0xAA]; + assert!(strip_confluent_header(&data).is_none()); + } + + #[test] + fn strip_confluent_header_too_short() { + let data = [0x00, 0x01, 0x02]; + assert!(strip_confluent_header(&data).is_none()); + } + + #[test] + fn parse_with_schema_confluent_prefix() { + // Schema with 5 string fields: the Confluent prefix [0x00 x4, 0x07] causes + // the 5th field to read 0x07 as a zigzag varint (-4), producing a negative + // string length that triggers the retry path + let schema_json = r#"{"type":"record","name":"Msg","fields":[ + {"name":"a","type":"string"},{"name":"b","type":"string"}, + {"name":"c","type":"string"},{"name":"d","type":"string"}, + {"name":"e","type":"string"} + ]}"#; + let schema = Schema::parse_str(schema_json).unwrap(); + + let mut record = apache_avro::types::Record::new(&schema).unwrap(); + record.put("a", AvroValue::String("v1".to_string())); + record.put("b", AvroValue::String("v2".to_string())); + record.put("c", AvroValue::String("v3".to_string())); + record.put("d", AvroValue::String("v4".to_string())); + record.put("e", AvroValue::String("v5".to_string())); + let 
raw_avro = apache_avro::to_avro_datum(&schema, record).unwrap(); + + // Prepend Confluent wire format header (magic 0x00 + 4-byte schema ID) + let mut confluent_data = vec![0x00, 0x00, 0x00, 0x00, 0x07]; + confluent_data.extend_from_slice(&raw_avro); + + // parse_with_schema should succeed by stripping the prefix on retry + let result = parse_with_schema(&confluent_data, &schema); + assert!(result.is_ok(), "parse_with_schema should handle Confluent prefix: {:?}", result.err()); + + let json = avro_to_json(&result.unwrap()); + assert_eq!(json["a"], "v1"); + assert_eq!(json["e"], "v5"); + } }