Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Kurrent.Replicator.slnx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<Solution>
<Folder Name="/test/">
<Project Path="test\Kurrent.Replicator.Kafka.Tests\Kurrent.Replicator.Kafka.Tests.csproj" />
<Project Path="test\Kurrent.Replicator.Tests\Kurrent.Replicator.Tests.csproj" />
</Folder>
<Project Path="src\Kurrent.Replicator.EventStore\Kurrent.Replicator.EventStore.csproj" />
Expand Down
39 changes: 39 additions & 0 deletions src/Kurrent.Replicator.Kafka/KafkaHeadersBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using System.Text;
using System.Text.Json;
using Kurrent.Replicator.Shared.Contracts;
using Kurrent.Replicator.Shared.Logging;

namespace Kurrent.Replicator.Kafka;

public static class KafkaHeadersBuilder {
static readonly ILog Log = LogProvider.GetCurrentClassLogger();

public static Headers? BuildHeaders(string contentType, byte[]? metadata) {
if (contentType != ContentTypes.Json || metadata is not { Length: > 0 })
return null;

try {
using var doc = JsonDocument.Parse(metadata);
var root = doc.RootElement;
if (root.ValueKind != JsonValueKind.Object)
return null;

var headers = new Headers();
foreach (var prop in root.EnumerateObject()) {
var valueBytes = prop.Value.ValueKind switch {
JsonValueKind.String => Encoding.UTF8.GetBytes(prop.Value.GetString() ?? string.Empty),
_ => Encoding.UTF8.GetBytes(prop.Value.GetRawText())
};
headers.Add(prop.Name, valueBytes);
}

return headers.Count > 0 ? headers : null;
} catch (JsonException e) {
Log.Warn("Malformed json, skipping metadata to header mapping", e);
return null;
}
}
}
8 changes: 6 additions & 2 deletions src/Kurrent.Replicator.Kafka/KafkaWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@ async Task<long> Append(ProposedEvent p) {
]
);

// TODO: Map meta to headers, but only for JSON
var message = new Message<string, byte[]> {
Key = partitionKey,
Value = p.Data
};


// Map metadata to Kafka headers when the payload is JSON
var headers = KafkaHeadersBuilder.BuildHeaders(p.EventDetails.ContentType, p.Metadata);
if (headers is { Count: > 0 })
message.Headers = headers;

var result = await _producer.ProduceAsync(topic, message, cancellationToken).ConfigureAwait(false);

return result.Offset.Value;
Expand Down
74 changes: 74 additions & 0 deletions test/Kurrent.Replicator.Kafka.Tests/KafkaHeaderBuilderTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System.Text.Json;
using Kurrent.Replicator.Shared.Contracts;
using NUnit.Framework;

namespace Kurrent.Replicator.Kafka.Tests;

[TestFixture]
public class KafkaHeadersBuilderMetadataHeaderTests {
[Test]
public void Should_map_top_level_json_properties_to_headers() {
var metadataObj = new {
s = "hello",
n = 123,
b = true,
obj = new { a = 1 },
arr = new[] { 1, 2 }
};

var metadataBytes = JsonSerializer.SerializeToUtf8Bytes(metadataObj);

var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, metadataBytes);

Assert.That(headers, Is.Not.Null);
Assert.That(headers!.Count, Is.EqualTo(5));

var sHeader = headers.First(h => h.Key == "s");
var nHeader = headers.First(h => h.Key == "n");
var bHeader = headers.First(h => h.Key == "b");
var objHeader = headers.First(h => h.Key == "obj");
var arrHeader = headers.First(h => h.Key == "arr");

Assert.That(sHeader.GetValueBytes(), Is.EqualTo("hello"u8.ToArray()));
Assert.That(nHeader.GetValueBytes(), Is.EqualTo("123"u8.ToArray()));
Assert.That(bHeader.GetValueBytes(), Is.EqualTo("true"u8.ToArray()));
Assert.That(objHeader.GetValueBytes(), Is.EqualTo("{\"a\":1}"u8.ToArray()));
Assert.That(arrHeader.GetValueBytes(), Is.EqualTo("[1,2]"u8.ToArray()));
}

[Test]
public void Should_ignore_when_content_type_not_json() {
var metadata = "{\"x\":1}"u8.ToArray();
var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Binary, metadata);
Assert.That(headers, Is.Null);
}

[Test]
public void Should_ignore_when_metadata_is_null_or_empty() {
var nullHeaders = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, null);
var emptyHeaders = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, Array.Empty<byte>());
Assert.That(nullHeaders, Is.Null);
Assert.That(emptyHeaders, Is.Null);
}

[Test]
public void Should_ignore_when_malformed_json() {
var bad = "{not json}"u8.ToArray();
var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, bad);
Assert.That(headers, Is.Null);
}

[Test]
public void Should_ignore_when_root_is_not_object() {
var arr = "[1,2,3]"u8.ToArray();
var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, arr);
Assert.That(headers, Is.Null);
}

[Test]
public void Should_ignore_when_object_has_no_properties() {
var emptyObj = "{}"u8.ToArray();
var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, emptyObj);
Assert.That(headers, Is.Null);
}
}
219 changes: 219 additions & 0 deletions test/Kurrent.Replicator.Kafka.Tests/KafkaWriterTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
#nullable enable
using System.Reflection;
using System.Text.Json;
using Confluent.Kafka;
using Kurrent.Replicator.Shared.Observe;
using Kurrent.Replicator.Shared.Contracts;
using NUnit.Framework;
using Ubiquitous.Metrics;

namespace Kurrent.Replicator.Kafka.Tests;

[TestFixture]
public class KafkaWriterHeadersTests {
[OneTimeSetUp]
public void OneTimeSetup() {
// Configure metrics to avoid NREs in Metrics.Measure during tests
ReplicationMetrics.Configure(Metrics.CreateUsing());
}

[Test]
public async Task Should_map_metadata_to_headers_for_json_events() {
var writer = CreateWriterWithFakeProducer(out var producer);

var metadataObj = new {
s = "hello",
n = 123,
b = true,
obj = new { a = 1 },
arr = new[] { 1, 2 }
};

var metadataBytes = JsonSerializer.SerializeToUtf8Bytes(metadataObj);

var proposedEvent = new ProposedEvent(
new("category-stream", Guid.NewGuid(), "TestEvent", ContentTypes.Json),
"data"u8.ToArray(),
metadataBytes,
new(0L, 0UL),
0L
);

var result = await writer.WriteEvent(proposedEvent, CancellationToken.None);

Assert.That(result, Is.EqualTo(42));

Assert.That(producer.LastTopic, Is.EqualTo("category"));
Assert.That(producer.LastMessage, Is.Not.Null);

var headers = producer.LastMessage!.Headers;
Assert.That(headers, Is.Not.Null);
Assert.That(headers!.Count, Is.EqualTo(5));

var sHeader = headers.First(h => h.Key == "s");
var nHeader = headers.First(h => h.Key == "n");
var bHeader = headers.First(h => h.Key == "b");
var objHeader = headers.First(h => h.Key == "obj");
var arrHeader = headers.First(h => h.Key == "arr");

Assert.That(sHeader.GetValueBytes(), Is.EqualTo("hello"u8.ToArray()));
Assert.That(nHeader.GetValueBytes(), Is.EqualTo("123"u8.ToArray()));
Assert.That(bHeader.GetValueBytes(), Is.EqualTo("true"u8.ToArray()));
Assert.That(objHeader.GetValueBytes(), Is.EqualTo("{\"a\":1}"u8.ToArray()));
Assert.That(arrHeader.GetValueBytes(), Is.EqualTo("[1,2]"u8.ToArray()));
}

[Test]
public async Task Should_not_set_headers_for_non_json_content_type() {
var writer = CreateWriterWithFakeProducer(out var producer);

var metadata = "{\"x\":1}"u8.ToArray();

var proposedEvent = new ProposedEvent(
new("stream-1", Guid.NewGuid(), "TestEvent", ContentTypes.Binary),
"data"u8.ToArray(),
metadata,
new(0L, 0UL),
0L
);

await writer.WriteEvent(proposedEvent, CancellationToken.None);

Assert.That(producer.LastMessage, Is.Not.Null);
Assert.That(producer.LastMessage!.Headers, Is.Null);
}

[Test]
public async Task Should_not_set_headers_when_metadata_is_null_or_empty() {
var writer = CreateWriterWithFakeProducer(out var producer);

var withNullMetadata = new ProposedEvent(
new("stream-1", Guid.NewGuid(), "TestEvent", ContentTypes.Json),
"data"u8.ToArray(),
null,
new(0L, 0UL),
0L
);

await writer.WriteEvent(withNullMetadata, CancellationToken.None);

Assert.That(producer.LastMessage, Is.Not.Null);
Assert.That(producer.LastMessage!.Headers, Is.Null);

var withEmptyMetadata = new ProposedEvent(
new("stream-1", Guid.NewGuid(), "TestEvent", ContentTypes.Json),
"data"u8.ToArray(),
Array.Empty<byte>(),
new(0L, 0UL),
0L
);

await writer.WriteEvent(withEmptyMetadata, CancellationToken.None);

Assert.That(producer.LastMessage, Is.Not.Null);
Assert.That(producer.LastMessage!.Headers, Is.Null);
}

[Test]
public async Task Should_not_set_headers_when_metadata_is_malformed_json() {
var writer = CreateWriterWithFakeProducer(out var producer);

var badMetadata = "{not json}"u8.ToArray();

var proposedEvent = new ProposedEvent(
new("stream-1", Guid.NewGuid(), "TestEvent", ContentTypes.Json),
"data"u8.ToArray(),
badMetadata,
new(0L, 0UL),
0L
);

await writer.WriteEvent(proposedEvent, CancellationToken.None);

Assert.That(producer.LastMessage, Is.Not.Null);
Assert.That(producer.LastMessage!.Headers, Is.Null);
}

static KafkaWriter CreateWriterWithFakeProducer(out CapturingProducer producer) {
var config = new ProducerConfig {
BootstrapServers = "dummy:9092"
};

var writer = new KafkaWriter(config, null);

var field = typeof(KafkaWriter).GetField("_producer", BindingFlags.Instance | BindingFlags.NonPublic);
Assert.That(field, Is.Not.Null);

producer = new();
field!.SetValue(writer, producer);

return writer;
}

class CapturingProducer : IProducer<string, byte[]> {
public string? LastTopic { get; private set; }
public Message<string, byte[]>? LastMessage { get; private set; }

public string Name => "capturing-producer";

public Handle Handle => throw new NotSupportedException();

public Task<DeliveryResult<string, byte[]>> ProduceAsync(string topic, Message<string, byte[]> message, CancellationToken cancellationToken = default) {
LastTopic = topic;
LastMessage = message;

var result = new DeliveryResult<string, byte[]> {
Topic = topic,
Message = message,
Offset = new(42),
Partition = new(0),
Status = PersistenceStatus.Persisted
};

return Task.FromResult(result);
}

public Task<DeliveryResult<string, byte[]>> ProduceAsync(TopicPartition topicPartition, Message<string, byte[]> message, CancellationToken cancellationToken = default)
=> throw new NotSupportedException();

public void Produce(string topic, Message<string, byte[]> message, Action<DeliveryReport<string, byte[]>>? deliveryHandler = null)
=> throw new NotSupportedException();

public void Produce(TopicPartition topicPartition, Message<string, byte[]> message, Action<DeliveryReport<string, byte[]>>? deliveryHandler = null)
=> throw new NotSupportedException();

public int Flush(TimeSpan timeout) => 0;

public void Flush(CancellationToken cancellationToken) { }

public int Poll(TimeSpan timeout) => 0;

public int AddBrokers(string brokers) => 0;

public Metadata GetMetadata(string topic, TimeSpan timeout) => throw new NotSupportedException();

public Metadata GetMetadata(TimeSpan timeout) => throw new NotSupportedException();

public void InitTransactions(TimeSpan timeout) => throw new NotSupportedException();

public void BeginTransaction() => throw new NotSupportedException();

public void CommitTransaction(TimeSpan timeout) => throw new NotSupportedException();

public void CommitTransaction() => throw new NotSupportedException();

public void AbortTransaction(TimeSpan timeout) => throw new NotSupportedException();

public void AbortTransaction() => throw new NotSupportedException();

public void SendOffsetsToTransaction(IEnumerable<TopicPartitionOffset> offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout)
=> throw new NotSupportedException();

public void SendOffsetsToTransaction(IEnumerable<TopicPartitionOffset> offsets, IConsumerGroupMetadata groupMetadata, CancellationToken cancellationToken = default)
=> throw new NotSupportedException();

public void SetSaslCredentials(string username, string password) => throw new NotSupportedException();

public void Dispose() { }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<RepoRoot>$([System.IO.Directory]::GetParent($(MSBuildThisFileDirectory)).Parent.FullName)</RepoRoot>
<TargetFramework>net9.0</TargetFramework>
<LangVersion>latest</LangVersion>
<IsPackable>false</IsPackable>
<ImplicitUsings>enable</ImplicitUsings>
<IsTestProject>true</IsTestProject>
<UseMicrosoftTestingPlatformRunner>true</UseMicrosoftTestingPlatformRunner>
<EnableMSTestRunner>true</EnableMSTestRunner>
<TestingPlatformDotnetTestSupport>true</TestingPlatformDotnetTestSupport>
<TestingPlatformCommandLineArguments>--report-trx --results-directory $(RepoRoot)/test-results/$(TargetFramework)</TestingPlatformCommandLineArguments>
<TestingPlatformCaptureOutput>false</TestingPlatformCaptureOutput>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="coverlet.collector" Version="6.0.2"/>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0"/>
<PackageReference Include="NUnit" Version="4.2.2"/>
<PackageReference Include="NUnit.Analyzers" Version="4.4.0"/>
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0"/>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="../../src/Kurrent.Replicator.Kafka/Kurrent.Replicator.Kafka.csproj" />
<ProjectReference Include="../../src/Kurrent.Replicator.Shared/Kurrent.Replicator.Shared.csproj" />
</ItemGroup>

</Project>