From 2ec455041a57a8135a0eb19e6e88cc93e4b86ed5 Mon Sep 17 00:00:00 2001 From: Valerio Balsamo Date: Mon, 8 Dec 2025 16:32:36 +0000 Subject: [PATCH] Map JSON metadata to Kafka message headers, add unit tests --- Kurrent.Replicator.slnx | 1 + .../KafkaHeadersBuilder.cs | 39 ++++ src/Kurrent.Replicator.Kafka/KafkaWriter.cs | 8 +- .../KafkaHeaderBuilderTests.cs | 74 ++++++ .../KafkaWriterTests.cs | 219 ++++++++++++++++++ .../Kurrent.Replicator.Kafka.Tests.csproj | 30 +++ 6 files changed, 369 insertions(+), 2 deletions(-) create mode 100644 src/Kurrent.Replicator.Kafka/KafkaHeadersBuilder.cs create mode 100644 test/Kurrent.Replicator.Kafka.Tests/KafkaHeaderBuilderTests.cs create mode 100644 test/Kurrent.Replicator.Kafka.Tests/KafkaWriterTests.cs create mode 100644 test/Kurrent.Replicator.Kafka.Tests/Kurrent.Replicator.Kafka.Tests.csproj diff --git a/Kurrent.Replicator.slnx b/Kurrent.Replicator.slnx index 145ea9c5..a1643eb6 100644 --- a/Kurrent.Replicator.slnx +++ b/Kurrent.Replicator.slnx @@ -1,5 +1,6 @@ + diff --git a/src/Kurrent.Replicator.Kafka/KafkaHeadersBuilder.cs b/src/Kurrent.Replicator.Kafka/KafkaHeadersBuilder.cs new file mode 100644 index 00000000..af284bc9 --- /dev/null +++ b/src/Kurrent.Replicator.Kafka/KafkaHeadersBuilder.cs @@ -0,0 +1,39 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using System.Text; +using System.Text.Json; +using Kurrent.Replicator.Shared.Contracts; +using Kurrent.Replicator.Shared.Logging; + +namespace Kurrent.Replicator.Kafka; + +public static class KafkaHeadersBuilder { + static readonly ILog Log = LogProvider.GetCurrentClassLogger(); + + public static Headers? BuildHeaders(string contentType, byte[]? metadata) { + if (contentType != ContentTypes.Json || metadata is not { Length: > 0 }) + return null; + + try { + using var doc = JsonDocument.Parse(metadata); + var root = doc.RootElement; + if (root.ValueKind != JsonValueKind.Object) + return null; + + var headers = new Headers(); + foreach (var prop in root.EnumerateObject()) { + var valueBytes = prop.Value.ValueKind switch { + JsonValueKind.String => Encoding.UTF8.GetBytes(prop.Value.GetString() ?? string.Empty), + _ => Encoding.UTF8.GetBytes(prop.Value.GetRawText()) + }; + headers.Add(prop.Name, valueBytes); + } + + return headers.Count > 0 ? headers : null; + } catch (JsonException e) { + Log.Warn("Malformed json, skipping metadata to header mapping", e); + return null; + } + } +} diff --git a/src/Kurrent.Replicator.Kafka/KafkaWriter.cs b/src/Kurrent.Replicator.Kafka/KafkaWriter.cs index a60426a7..592ac936 100644 --- a/src/Kurrent.Replicator.Kafka/KafkaWriter.cs +++ b/src/Kurrent.Replicator.Kafka/KafkaWriter.cs @@ -50,12 +50,16 @@ async Task Append(ProposedEvent p) { ] ); - // TODO: Map meta to headers, but only for JSON var message = new Message { Key = partitionKey, Value = p.Data }; - + + // Map metadata to Kafka headers when the payload is JSON + var headers = KafkaHeadersBuilder.BuildHeaders(p.EventDetails.ContentType, p.Metadata); + if (headers is { Count: > 0 }) + message.Headers = headers; + var result = await _producer.ProduceAsync(topic, message, cancellationToken).ConfigureAwait(false); return result.Offset.Value; diff --git a/test/Kurrent.Replicator.Kafka.Tests/KafkaHeaderBuilderTests.cs b/test/Kurrent.Replicator.Kafka.Tests/KafkaHeaderBuilderTests.cs new file mode 100644 index 00000000..1da997f5 --- /dev/null +++ b/test/Kurrent.Replicator.Kafka.Tests/KafkaHeaderBuilderTests.cs @@ -0,0 +1,74 @@ +using System.Text.Json; +using Kurrent.Replicator.Shared.Contracts; +using NUnit.Framework; + +namespace Kurrent.Replicator.Kafka.Tests; + +[TestFixture] +public class KafkaHeadersBuilderMetadataHeaderTests { + [Test] + public void Should_map_top_level_json_properties_to_headers() { + var metadataObj = new { + s = "hello", + n = 123, + b = true, + obj = new { a = 1 }, + arr = new[] { 1, 2 } + }; + + var metadataBytes = JsonSerializer.SerializeToUtf8Bytes(metadataObj); + + var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, metadataBytes); + + Assert.That(headers, Is.Not.Null); + Assert.That(headers!.Count, Is.EqualTo(5)); + + var sHeader = headers.First(h => h.Key == "s"); + var nHeader = headers.First(h => h.Key == "n"); + var bHeader = headers.First(h => h.Key == "b"); + var objHeader = headers.First(h => h.Key == "obj"); + var arrHeader = headers.First(h => h.Key == "arr"); + + Assert.That(sHeader.GetValueBytes(), Is.EqualTo("hello"u8.ToArray())); + Assert.That(nHeader.GetValueBytes(), Is.EqualTo("123"u8.ToArray())); + Assert.That(bHeader.GetValueBytes(), Is.EqualTo("true"u8.ToArray())); + Assert.That(objHeader.GetValueBytes(), Is.EqualTo("{\"a\":1}"u8.ToArray())); + Assert.That(arrHeader.GetValueBytes(), Is.EqualTo("[1,2]"u8.ToArray())); + } + + [Test] + public void Should_ignore_when_content_type_not_json() { + var metadata = "{\"x\":1}"u8.ToArray(); + var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Binary, metadata); + Assert.That(headers, Is.Null); + } + + [Test] + public void Should_ignore_when_metadata_is_null_or_empty() { + var nullHeaders = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, null); + var emptyHeaders = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, Array.Empty()); + Assert.That(nullHeaders, Is.Null); + Assert.That(emptyHeaders, Is.Null); + } + + [Test] + public void Should_ignore_when_malformed_json() { + var bad = "{not json}"u8.ToArray(); + var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, bad); + Assert.That(headers, Is.Null); + } + + [Test] + public void Should_ignore_when_root_is_not_object() { + var arr = "[1,2,3]"u8.ToArray(); + var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, arr); + Assert.That(headers, Is.Null); + } + + [Test] + public void Should_ignore_when_object_has_no_properties() { + var emptyObj = "{}"u8.ToArray(); + var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, emptyObj); + Assert.That(headers, Is.Null); + } +} diff --git a/test/Kurrent.Replicator.Kafka.Tests/KafkaWriterTests.cs b/test/Kurrent.Replicator.Kafka.Tests/KafkaWriterTests.cs new file mode 100644 index 00000000..586d846b --- /dev/null +++ b/test/Kurrent.Replicator.Kafka.Tests/KafkaWriterTests.cs @@ -0,0 +1,219 @@ +#nullable enable +using System.Reflection; +using System.Text.Json; +using Confluent.Kafka; +using Kurrent.Replicator.Shared.Observe; +using Kurrent.Replicator.Shared.Contracts; +using NUnit.Framework; +using Ubiquitous.Metrics; + +namespace Kurrent.Replicator.Kafka.Tests; + +[TestFixture] +public class KafkaWriterHeadersTests { + [OneTimeSetUp] + public void OneTimeSetup() { + // Configure metrics to avoid NREs in Metrics.Measure during tests + ReplicationMetrics.Configure(Metrics.CreateUsing()); + } + + [Test] + public async Task Should_map_metadata_to_headers_for_json_events() { + var writer = CreateWriterWithFakeProducer(out var producer); + + var metadataObj = new { + s = "hello", + n = 123, + b = true, + obj = new { a = 1 }, + arr = new[] { 1, 2 } + }; + + var metadataBytes = JsonSerializer.SerializeToUtf8Bytes(metadataObj); + + var proposedEvent = new ProposedEvent( + new("category-stream", Guid.NewGuid(), "TestEvent", ContentTypes.Json), + "data"u8.ToArray(), + metadataBytes, + new(0L, 0UL), + 0L + ); + + var result = await writer.WriteEvent(proposedEvent, CancellationToken.None); + + Assert.That(result, Is.EqualTo(42)); + + Assert.That(producer.LastTopic, Is.EqualTo("category")); + Assert.That(producer.LastMessage, Is.Not.Null); + + var headers = producer.LastMessage!.Headers; + Assert.That(headers, Is.Not.Null); + Assert.That(headers!.Count, Is.EqualTo(5)); + + var sHeader = headers.First(h => h.Key == "s"); + var nHeader = headers.First(h => h.Key == "n"); + var bHeader = headers.First(h => h.Key == "b"); + var objHeader = headers.First(h => h.Key == "obj"); + var arrHeader = headers.First(h => h.Key == "arr"); + + Assert.That(sHeader.GetValueBytes(), Is.EqualTo("hello"u8.ToArray())); + Assert.That(nHeader.GetValueBytes(), Is.EqualTo("123"u8.ToArray())); + Assert.That(bHeader.GetValueBytes(), Is.EqualTo("true"u8.ToArray())); + Assert.That(objHeader.GetValueBytes(), Is.EqualTo("{\"a\":1}"u8.ToArray())); + Assert.That(arrHeader.GetValueBytes(), Is.EqualTo("[1,2]"u8.ToArray())); + } + + [Test] + public async Task Should_not_set_headers_for_non_json_content_type() { + var writer = CreateWriterWithFakeProducer(out var producer); + + var metadata = "{\"x\":1}"u8.ToArray(); + + var proposedEvent = new ProposedEvent( + new("stream-1", Guid.NewGuid(), "TestEvent", ContentTypes.Binary), + "data"u8.ToArray(), + metadata, + new(0L, 0UL), + 0L + ); + + await writer.WriteEvent(proposedEvent, CancellationToken.None); + + Assert.That(producer.LastMessage, Is.Not.Null); + Assert.That(producer.LastMessage!.Headers, Is.Null); + } + + [Test] + public async Task Should_not_set_headers_when_metadata_is_null_or_empty() { + var writer = CreateWriterWithFakeProducer(out var producer); + + var withNullMetadata = new ProposedEvent( + new("stream-1", Guid.NewGuid(), "TestEvent", ContentTypes.Json), + "data"u8.ToArray(), + null, + new(0L, 0UL), + 0L + ); + + await writer.WriteEvent(withNullMetadata, CancellationToken.None); + + Assert.That(producer.LastMessage, Is.Not.Null); + Assert.That(producer.LastMessage!.Headers, Is.Null); + + var withEmptyMetadata = new ProposedEvent( + new("stream-1", Guid.NewGuid(), "TestEvent", ContentTypes.Json), + "data"u8.ToArray(), + Array.Empty(), + new(0L, 0UL), + 0L + ); + + await writer.WriteEvent(withEmptyMetadata, CancellationToken.None); + + Assert.That(producer.LastMessage, Is.Not.Null); + Assert.That(producer.LastMessage!.Headers, Is.Null); + } + + [Test] + public async Task Should_not_set_headers_when_metadata_is_malformed_json() { + var writer = CreateWriterWithFakeProducer(out var producer); + + var badMetadata = "{not json}"u8.ToArray(); + + var proposedEvent = new ProposedEvent( + new("stream-1", Guid.NewGuid(), "TestEvent", ContentTypes.Json), + "data"u8.ToArray(), + badMetadata, + new(0L, 0UL), + 0L + ); + + await writer.WriteEvent(proposedEvent, CancellationToken.None); + + Assert.That(producer.LastMessage, Is.Not.Null); + Assert.That(producer.LastMessage!.Headers, Is.Null); + } + + static KafkaWriter CreateWriterWithFakeProducer(out CapturingProducer producer) { + var config = new ProducerConfig { + BootstrapServers = "dummy:9092" + }; + + var writer = new KafkaWriter(config, null); + + var field = typeof(KafkaWriter).GetField("_producer", BindingFlags.Instance | BindingFlags.NonPublic); + Assert.That(field, Is.Not.Null); + + producer = new(); + field!.SetValue(writer, producer); + + return writer; + } + + class CapturingProducer : IProducer { + public string? LastTopic { get; private set; } + public Message? LastMessage { get; private set; } + + public string Name => "capturing-producer"; + + public Handle Handle => throw new NotSupportedException(); + + public Task> ProduceAsync(string topic, Message message, CancellationToken cancellationToken = default) { + LastTopic = topic; + LastMessage = message; + + var result = new DeliveryResult { + Topic = topic, + Message = message, + Offset = new(42), + Partition = new(0), + Status = PersistenceStatus.Persisted + }; + + return Task.FromResult(result); + } + + public Task> ProduceAsync(TopicPartition topicPartition, Message message, CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public void Produce(string topic, Message message, Action>? deliveryHandler = null) + => throw new NotSupportedException(); + + public void Produce(TopicPartition topicPartition, Message message, Action>? deliveryHandler = null) + => throw new NotSupportedException(); + + public int Flush(TimeSpan timeout) => 0; + + public void Flush(CancellationToken cancellationToken) { } + + public int Poll(TimeSpan timeout) => 0; + + public int AddBrokers(string brokers) => 0; + + public Metadata GetMetadata(string topic, TimeSpan timeout) => throw new NotSupportedException(); + + public Metadata GetMetadata(TimeSpan timeout) => throw new NotSupportedException(); + + public void InitTransactions(TimeSpan timeout) => throw new NotSupportedException(); + + public void BeginTransaction() => throw new NotSupportedException(); + + public void CommitTransaction(TimeSpan timeout) => throw new NotSupportedException(); + + public void CommitTransaction() => throw new NotSupportedException(); + + public void AbortTransaction(TimeSpan timeout) => throw new NotSupportedException(); + + public void AbortTransaction() => throw new NotSupportedException(); + + public void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout) + => throw new NotSupportedException(); + + public void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public void SetSaslCredentials(string username, string password) => throw new NotSupportedException(); + + public void Dispose() { } + } +} diff --git a/test/Kurrent.Replicator.Kafka.Tests/Kurrent.Replicator.Kafka.Tests.csproj b/test/Kurrent.Replicator.Kafka.Tests/Kurrent.Replicator.Kafka.Tests.csproj new file mode 100644 index 00000000..6dc0d36f --- /dev/null +++ b/test/Kurrent.Replicator.Kafka.Tests/Kurrent.Replicator.Kafka.Tests.csproj @@ -0,0 +1,30 @@ + + + + $([System.IO.Directory]::GetParent($(MSBuildThisFileDirectory)).Parent.FullName) + net9.0 + latest + false + enable + true + true + true + true + --report-trx --results-directory $(RepoRoot)/test-results/$(TargetFramework) + false + + + + + + + + + + + + + + + +