
fix(application): avro-to-json handle JSON-string-encoded schema and Confluent wire format prefix #336

Open
katriendg wants to merge 2 commits into main from fix/335-avro-encoding

Conversation


@katriendg katriendg commented Apr 2, 2026

Description

The avro-to-json operator had two encoding-related failures preventing it from processing Avro messages in production.

JSON-string-encoded schema configuration

The operator failed to initialize when the AIO pipeline runtime delivered the avroSchema configuration value as a JSON-string-wrapped object rather than a raw JSON object. The operator logged Failed to parse provided Avro schema: Invalid schema name and returned false from avro_init, preventing the dataflow graph from loading.

The root cause: Schema::parse_str received the entire schema JSON wrapped in an outer string literal (double-encoded). The Avro parser deserialized the outer string and then attempted to match the full JSON text as a type name, which failed the Avro name-validation regex.
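To make the failure mode concrete, here is an illustrative snippet (not code from the PR) showing what "double-encoded" means: the entire schema text is serialized as one JSON string, so the parser receives a quoted blob instead of an object.

```rust
// Hypothetical helper showing how a double-encoded value arises: the schema
// JSON is itself serialized as a JSON string, escaping the inner quotes.
fn double_encode(raw: &str) -> String {
    format!("\"{}\"", raw.replace('\\', "\\\\").replace('"', "\\\""))
}

// Schema::parse_str on the double-encoded form deserializes the outer string
// and then tries to match the full text as an Avro type name, which fails
// the name-validation regex.
```

For example, `double_encode` turns `{"type":"null"}` into `"{\"type\":\"null\"}"`, which is a JSON string, not a JSON object.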

A new unwrap_schema_value function detects and transparently unwraps JSON-string-encoded configuration values before passing them to the schema parser.

Confluent Schema Registry wire format prefix

After the schema initialization fix, the operator failed at runtime with Cannot convert i64 to usize: -56 when parsing Avro messages from Kafka. The messages included a 5-byte Confluent Schema Registry wire format prefix (magic byte 0x00 + 4-byte schema ID) before the actual Avro payload. The Avro parser interpreted the prefix bytes as part of the record data, producing a negative zigzag-decoded length.

The root cause: Kafka producers using Confluent serializers prepend a 5-byte header that is not part of the Avro encoding. The from_avro_datum call consumed these extra bytes, corrupting the field-length parsing.

A new strip_confluent_header function detects the prefix, and parse_with_schema now retries parsing after stripping the prefix when the initial attempt fails.
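For reference, the framing described above can be sketched as follows (illustrative helper, not from the PR):

```rust
// Confluent wire format: [0x00 magic][4-byte big-endian schema ID][Avro datum].
// The 5-byte header is framing metadata, not part of the Avro encoding.
fn confluent_frame(schema_id: u32, avro_payload: &[u8]) -> Vec<u8> {
    let mut message = vec![0x00u8];                       // magic byte
    message.extend_from_slice(&schema_id.to_be_bytes());  // schema ID, big-endian
    message.extend_from_slice(avro_payload);              // the actual Avro bytes
    message
}
```

Feeding such a `message` directly to an Avro decoder makes it consume the header bytes as record data, which is how a negative zigzag-decoded length like `-56` can arise.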

Related Issue

Fixes #335

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Blueprint modification or addition
  • Component modification or addition
  • Documentation update
  • CI/CD pipeline change
  • Other (please describe):

Implementation Details

All changes are in src/500-application/512-avro-to-json/operators/avro-to-json/src/lib.rs.

Schema configuration unwrapping

Extracted unwrap_schema_value(raw: &str) -> Cow<'_, str> which:

  • Parses the raw configuration value using serde_json::from_str::<serde_json::Value>
  • Returns the inner string content when the result is Value::String (JSON-string-encoded schema)
  • Falls through to the raw value unchanged for any other case (raw JSON objects, non-JSON values)
  • Uses Cow<'_, str> to avoid heap allocation when no unwrapping is needed

The avro_init function calls unwrap_schema_value(v) before Schema::parse_str().
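A dependency-free sketch of the behavior in the bullets above; per the PR, the real version uses serde_json::from_str and matches on Value::String, whereas this simplified stand-in only handles quote and backslash escapes:

```rust
use std::borrow::Cow;

// Simplified stand-in for the serde_json-based unwrapping: if the value looks
// like a JSON string literal, return its unescaped content; otherwise pass the
// raw value through without allocating.
fn unwrap_schema_value(raw: &str) -> Cow<'_, str> {
    let t = raw.trim();
    if t.len() >= 2 && t.starts_with('"') && t.ends_with('"') {
        let inner = &t[1..t.len() - 1];
        // Only the escapes needed for illustration; serde_json handles all of them.
        Cow::Owned(inner.replace("\\\"", "\"").replace("\\\\", "\\"))
    } else {
        Cow::Borrowed(raw) // raw JSON object or non-JSON value: unchanged
    }
}
```

With this in place, initialization would call `Schema::parse_str(&unwrap_schema_value(v))` so both raw and string-wrapped configurations parse.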

Confluent wire format handling

Added strip_confluent_header(data: &[u8]) -> Option<&[u8]> which detects the Confluent Schema Registry wire format prefix (magic byte 0x00 + 4-byte big-endian schema ID) and returns the payload after the 5-byte header.

Rewrote parse_with_schema to attempt raw Avro parsing first. On failure, if a Confluent header is detected, it retries parsing with the prefix stripped. This handles both raw Avro and Confluent-prefixed messages transparently.
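The two pieces of this subsection can be sketched together as follows; this is a minimal stand-in, with `parse` representing the real from_avro_datum call:

```rust
// Detect the Confluent wire format prefix: 0x00 magic byte plus a 4-byte
// big-endian schema ID. Returns the Avro payload after the 5-byte header,
// requiring at least one payload byte beyond it.
fn strip_confluent_header(data: &[u8]) -> Option<&[u8]> {
    if data.len() > 5 && data[0] == 0x00 {
        Some(&data[5..])
    } else {
        None
    }
}

// Retry wrapper mirroring the rewritten parse_with_schema: try the raw bytes
// first; only when that fails and a Confluent header is present, retry on the
// stripped payload. Raw Avro that parses on the first attempt never reaches
// the fallback path.
fn parse_with_fallback<T, E>(
    data: &[u8],
    parse: impl Fn(&[u8]) -> Result<T, E>,
) -> Result<T, E> {
    match parse(data) {
        Ok(value) => Ok(value),
        Err(first_err) => match strip_confluent_header(data) {
            Some(payload) => parse(payload),
            None => Err(first_err),
        },
    }
}
```

Returning the first error when no header is detected preserves the original diagnostics for genuinely malformed payloads.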

Testing Performed

  • Terraform plan/apply
  • Blueprint deployment test
  • Unit tests
  • Integration tests
  • Bug fix includes regression test (see Test Policy)
  • Manual validation
  • Other:

Thirteen tests were added across both fixes:

  • unwrap_schema_value unit tests (4): raw JSON object passthrough, JSON-string-encoded unwrap, primitive schema unwrap, non-JSON passthrough
  • Schema parsing integration tests (4): raw and encoded simple records, raw and encoded nested records with namespaces
  • Regression test (1): schema_parse_json_string_encoded_fails_without_unwrap confirms Schema::parse_str fails on double-encoded input without the fix
  • Confluent wire format tests (4): header detection when present, absent, and too-short payloads; end-to-end parsing of a Confluent-prefixed multi-field record with automatic prefix stripping and retry

All 41 tests pass (28 existing + 13 new) via cargo test --target x86_64-unknown-linux-gnu.

Validation Steps

  1. Apply the changes to src/500-application/512-avro-to-json/operators/avro-to-json/src/lib.rs
  2. Run cd src/500-application/512-avro-to-json/operators/avro-to-json && cargo test --target x86_64-unknown-linux-gnu
  3. Verify all 41 tests pass with zero warnings
  4. Run cargo build --release to confirm the WASM binary compiles cleanly
  5. Optionally deploy the operator with Confluent-serialized Kafka messages and a JSON-string-encoded avroSchema configuration to confirm end-to-end processing succeeds

Checklist

  • I have updated the documentation accordingly
  • I have added tests to cover my changes
  • All new and existing tests passed
  • I have run terraform fmt on all Terraform code
  • I have run terraform validate on all Terraform code
  • I have run az bicep format on all Bicep code
  • I have run az bicep build to validate all Bicep code
  • I have checked for any sensitive data/tokens that should not be committed
  • Lint checks pass (run applicable linters for changed file types)

Security Review

  • No credentials, secrets, or tokens are hardcoded or logged
  • RBAC and identity changes follow least-privilege principles
  • No new network exposure or public endpoints introduced without justification
  • Dependency additions or updates have been reviewed for known vulnerabilities
  • Container image changes use pinned digests or SHA references

Additional Notes

  • No new dependencies were added; serde_json was already present in Cargo.toml
  • Both fixes are backward-compatible: raw JSON schemas and non-prefixed Avro payloads continue to work unchanged
  • The Confluent prefix retry is transparent: raw Avro data that parses successfully on the first attempt never triggers the fallback path
  • No Terraform, Bicep, or infrastructure changes are included in this PR

…coded schemas

- implement unwrap_schema_value to handle JSON string wrapping
- add tests for unwrap_schema_value covering various cases
- ensure schema parsing works correctly with unwrapped values
… in Avro parser

- add strip_confluent_header to detect and remove 5-byte Confluent prefix
- retry Avro parsing after stripping prefix when initial parse fails
- add tests for Confluent header stripping and prefixed record parsing

🔧 - Generated by Copilot
@katriendg katriendg changed the title fix(application): add unwrap_schema_value function for JSON-string-en… fix(application): avro-to-json handle JSON-string-encoded schema and Confluent wire format prefix Apr 2, 2026
@katriendg katriendg marked this pull request as ready for review April 2, 2026 13:02
@katriendg katriendg requested a review from a team as a code owner April 2, 2026 13:02

Development

Successfully merging this pull request may close these issues.

fix: avro-to-json operator fails to start when avroSchema property is JSON-string-encoded
