From d97fe01966ccb4f432a7a7df14d208e5379ac4e4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 14 Nov 2025 10:04:35 +0000 Subject: [PATCH 1/3] Initial plan From 020944b81d43dfd83f25a4811abc7ead11e55325 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 14 Nov 2025 10:11:41 +0000 Subject: [PATCH 2/3] Fix AddCompositionEventHandler to properly invoke handler factory function Co-authored-by: alexeyzimarev <2821205+alexeyzimarev@users.noreply.github.com> --- .../Registrations/SubscriptionBuilder.cs | 2 +- .../CompositionHandlerTests.cs | 124 ++++++++++++++++++ 2 files changed, 125 insertions(+), 1 deletion(-) create mode 100644 src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs diff --git a/src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionBuilder.cs b/src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionBuilder.cs index edefe999..57dc4c4a 100644 --- a/src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionBuilder.cs +++ b/src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionBuilder.cs @@ -76,7 +76,7 @@ public SubscriptionBuilder AddCompositionEventHandler getInnerHandler, Func getWrappingHandler ) where THandler : class, IEventHandler where TWrappingHandler : class, IEventHandler { - Services.TryAddKeyedSingleton(SubscriptionId, getInnerHandler); + Services.TryAddKeyedSingleton(SubscriptionId, (sp, _) => getInnerHandler(sp)); AddHandlerResolve(sp => getWrappingHandler(sp.GetRequiredKeyedService(SubscriptionId))); return this; diff --git a/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs b/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs new file mode 100644 index 00000000..b8cb03df --- /dev/null +++ b/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs @@ -0,0 +1,124 @@ +using Eventuous.Subscriptions; +using Eventuous.Subscriptions.Context; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.TestHost; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; + +namespace Eventuous.Tests.Subscriptions; + +public class CompositionHandlerTests { + TestServer _server = null!; + IHost _host = null!; + + [Before(Test)] + public async Task Setup() { + _host = new HostBuilder() + .ConfigureWebHost(webHostBuilder => webHostBuilder + .UseTestServer() + .UseStartup() + ) + .Build(); + await _host.StartAsync(); + + _server = _host.GetTestServer(); + } + + [After(Test)] + public async Task Teardown() { + _server.Dispose(); + await _host.StopAsync(); + _host.Dispose(); + } + + [Test] + public void ShouldResolveCompositionHandlerWithFactory() { + // This test validates that AddCompositionEventHandler correctly registers + // handlers when using a factory function + var handler = _server.Services.GetRequiredKeyedService("sub-with-factory"); + handler.ShouldNotBeNull(); + handler.Dependency.ShouldNotBeNull(); + } + + [Test] + public async Task ShouldHandleEventWithCompositionHandler() { + var logger = _server.Services.GetRequiredService(); + var subs = _server.Services.GetServices().ToArray(); + var sub = subs.FirstOrDefault(x => x.SubscriptionId == "sub-with-factory"); + + sub.ShouldNotBeNull(); + + var ctx = new MessageConsumeContext( + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString(), + 0, + 0, + 0, + 0, + DateTime.UtcNow, + new TestEvent(), + new(), + sub.SubscriptionId, + default + ) { LogContext = new(sub.SubscriptionId, NullLoggerFactory.Instance) }; + + await sub.Pipe.Send(ctx); + + var handled = logger.Records.Where(x => x.Context.SubscriptionId == sub.SubscriptionId).ToArray(); + handled.Length.ShouldBe(1); + handled[0].HandlerType.ShouldBe(typeof(CompositionWrapper)); + handled[0].Context.MessageId.ShouldBe(ctx.MessageId); + } + + class Startup { + public static void ConfigureServices(IServiceCollection services) { + services.AddSingleton(new TestHandlerLogger()); + services.AddSingleton(); + + // Test the AddCompositionEventHandler with a factory function + services.AddSubscription( + "sub-with-factory", + builder => builder.AddCompositionEventHandler( + sp => new TestHandler(sp.GetRequiredService(), sp.GetRequiredService()), + handler => new CompositionWrapper(handler, sp => sp.GetRequiredService()) + ) + ); + } + + public void Configure(IApplicationBuilder app) { } + } + + record TestOptions : SubscriptionOptions; + + class TestSub(TestOptions options, ConsumePipe consumePipe) + : EventSubscription(options, consumePipe, NullLoggerFactory.Instance, null) { + protected override ValueTask Subscribe(CancellationToken cancellationToken) => default; + + protected override ValueTask Unsubscribe(CancellationToken cancellationToken) => default; + } + + public class TestDependency { + public string Value { get; } = "test-value"; + } + + public class TestHandler(TestDependency dependency, TestHandlerLogger logger) : BaseEventHandler { + public TestDependency Dependency { get; } = dependency; + + public override ValueTask HandleEvent(IMessageConsumeContext ctx) + => logger.EventReceived(GetType(), ctx); + } + + public class CompositionWrapper(IEventHandler innerHandler, Func getLogger) : BaseEventHandler { + public override ValueTask HandleEvent(IMessageConsumeContext ctx) { + // Wrap the inner handler call - this simulates what PollyEventHandler does + return getLogger(ctx.Services).EventReceived(GetType(), ctx); + } + } + + record TestEvent; +} From 6295dc54f1d4fad5adf7c7eae92a1294b1c2748a Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 14 Nov 2025 11:59:15 +0100 Subject: [PATCH 3/3] Cleanup --- .../Producers/ServiceBusMessageBuilder.cs | 48 ++++++--------- .../Shared/ServiceBusHelper.cs | 51 ++++++++-------- .../IsSerialisableByServiceBus.cs | 35 ++++++----- .../Registrations/SubscriptionBuilder.cs | 58 ++++++++++++++++++- .../CompositionHandlerTests.cs | 24 ++++---- .../Eventuous.EventStore/EsdbEventStore.cs | 3 +- 6 files changed, 129 insertions(+), 90 deletions(-) diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs index 42078a73..171d8ba5 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs @@ -8,27 +8,13 @@ namespace Eventuous.Azure.ServiceBus.Producers; using Shared; -class ServiceBusMessageBuilder { - readonly IEventSerializer _serializer; - readonly string _streamName; - readonly ServiceBusProduceOptions? _options; - readonly ServiceBusMessageAttributeNames _attributes; - readonly Action? _setActivityMessageType; - - public ServiceBusMessageBuilder( - IEventSerializer serializer, - string streamName, - ServiceBusMessageAttributeNames attributes, - ServiceBusProduceOptions? options = null, - Action? setActivityMessageType = null - ) { - _serializer = serializer; - _streamName = streamName; - _options = options; - _attributes = attributes; - _setActivityMessageType = setActivityMessageType; - } - +class ServiceBusMessageBuilder( + IEventSerializer serializer, + string streamName, + ServiceBusMessageAttributeNames attributes, + ServiceBusProduceOptions? options = null, + Action? setActivityMessageType = null + ) { /// /// Creates a from the provided . /// This method serializes the event, sets the necessary properties, and adds custom application properties @@ -37,22 +23,22 @@ public ServiceBusMessageBuilder( /// /// internal ServiceBusMessage CreateServiceBusMessage(ProducedMessage message) { - var (messageType, contentType, payload) = _serializer.SerializeEvent(message.Message); - _setActivityMessageType?.Invoke(messageType); + var (messageType, contentType, payload) = serializer.SerializeEvent(message.Message); + setActivityMessageType?.Invoke(messageType); var metadata = message.Metadata; var serviceBusMessage = new ServiceBusMessage(payload) { ContentType = contentType, - MessageId = metadata?.GetValueOrDefault(_attributes.MessageId, message.MessageId)?.ToString(), - Subject = metadata?.GetValueOrDefault(_attributes.Subject, _options?.Subject)?.ToString(), - TimeToLive = _options?.TimeToLive ?? TimeSpan.MaxValue, + MessageId = metadata?.GetValueOrDefault(attributes.MessageId, message.MessageId)?.ToString(), + Subject = metadata?.GetValueOrDefault(attributes.Subject, options?.Subject)?.ToString(), + TimeToLive = options?.TimeToLive ?? TimeSpan.MaxValue, CorrelationId = message.Metadata?.GetCorrelationId(), - To = metadata?.GetValueOrDefault(_attributes.To, _options?.To)?.ToString(), - ReplyTo = metadata?.GetValueOrDefault(_attributes.ReplyTo, _options?.ReplyTo)?.ToString() + To = metadata?.GetValueOrDefault(attributes.To, options?.To)?.ToString(), + ReplyTo = metadata?.GetValueOrDefault(attributes.ReplyTo, options?.ReplyTo)?.ToString() }; - var reservedAttributes = _attributes.ReservedNames(); + var reservedAttributes = attributes.ReservedNames(); foreach (var property in GetCustomApplicationProperties(message, messageType, reservedAttributes)) { serviceBusMessage.ApplicationProperties.Add(property); @@ -64,8 +50,8 @@ internal ServiceBusMessage CreateServiceBusMessage(ProducedMessage message) { IEnumerable> GetCustomApplicationProperties(ProducedMessage message, string messageType, HashSet reservedAttributes) => (message.Metadata ?? []) .Concat(message.AdditionalHeaders ?? []) - .Concat([new(_attributes.MessageType, messageType), new(_attributes.StreamName, _streamName)]) + .Concat([new(attributes.MessageType, messageType), new(attributes.StreamName, streamName)]) .Where(pair => !reservedAttributes.Contains(pair.Key)) .Where(pair => IsSerialisableByServiceBus(pair.Value)) - .Select(pair => new KeyValuePair(pair.Key, pair.Value!)); + .Select(pair => new KeyValuePair(pair.Key, pair.Value!)); } \ No newline at end of file diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Shared/ServiceBusHelper.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Shared/ServiceBusHelper.cs index adf45973..b857e885 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Shared/ServiceBusHelper.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Shared/ServiceBusHelper.cs @@ -1,31 +1,28 @@ -using System; -using System.IO; +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. namespace Eventuous.Azure.ServiceBus.Shared; -public static class ServiceBusHelper -{ - public static bool IsSerialisableByServiceBus(object? value) => - value is not null && ( - value is string || - value is bool || - value is byte || - value is sbyte || - value is short || - value is ushort || - value is int || - value is uint || - value is long || - value is ulong || - value is float || - value is double || - value is decimal || - value is char || - value is Guid || - value is DateTime || - value is DateTimeOffset || - value is Stream || - value is Uri || - value is TimeSpan - ); +public static class ServiceBusHelper { + public static bool IsSerialisableByServiceBus(object? value) => + value is string + or bool + or byte + or sbyte + or short + or ushort + or int + or uint + or long + or ulong + or float + or double + or decimal + or char + or Guid + or DateTime + or DateTimeOffset + or Stream + or Uri + or TimeSpan; } diff --git a/src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs b/src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs index f191d560..8d03c0b0 100644 --- a/src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs +++ b/src/Azure/test/Eventuous.Tests.Azure.ServiceBus/IsSerialisableByServiceBus.cs @@ -1,39 +1,38 @@ using static Eventuous.Azure.ServiceBus.Shared.ServiceBusHelper; -using System.Threading.Tasks; namespace Eventuous.Tests.Azure.ServiceBus; public class IsSerialisableByServiceBus { public static IEnumerable> PassingTestData() { yield return () => "string"; - yield return () => 123; // int - yield return () => 123L; // long + yield return () => 123; + yield return () => 123L; yield return () => (short)1; yield return () => (byte)1; - yield return () => 123u; // uint - yield return () => 123UL; // ulong - yield return () => 1.23f; // float - yield return () => 1.23d; // double - yield return () => 12.34m; // decimal - yield return () => true; // bool - yield return () => 'c'; // char + yield return () => 123u; + yield return () => 123UL; + yield return () => 1.23f; + yield return () => 1.23d; + yield return () => 12.34m; + yield return () => true; + yield return () => 'c'; yield return () => Guid.NewGuid(); yield return () => DateTime.UtcNow; yield return () => DateTimeOffset.UtcNow; yield return () => TimeSpan.FromMinutes(5); yield return () => new Uri("https://example.com"); - yield return () => new MemoryStream(); // Stream + yield return () => new MemoryStream(); } public static IEnumerable> FailingTestData() { yield return () => null; - yield return () => new object(); // plain object - yield return () => new Dictionary { [new object()] = "v" }; // dictionary with non-string key - yield return () => new Dictionary { ["k"] = new object() }; // dictionary with non-serializable value - yield return () => new List { new() }; // list with non-serializable item - yield return () => Task.CompletedTask; // Task - yield return () => new Action(() => { }); // delegate - yield return () => new WeakReference(new object()); // complex type + yield return () => new(); // plain object + yield return () => new Dictionary { [new()] = "v" }; // dictionary with non-string key + yield return () => new Dictionary { ["k"] = new() }; // dictionary with non-serializable value + yield return () => new List { new() }; // list with non-serializable item + yield return () => Task.CompletedTask; // Task + yield return () => new Action(() => { }); // delegate + yield return () => new WeakReference(new()); // complex type } [Test] diff --git a/src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionBuilder.cs b/src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionBuilder.cs index 57dc4c4a..a04f40a7 100644 --- a/src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionBuilder.cs +++ b/src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionBuilder.cs @@ -63,8 +63,19 @@ public SubscriptionBuilder AddEventHandler(THandler handler) where THa return this; } - public SubscriptionBuilder AddCompositionEventHandler - <[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] THandler, TWrappingHandler>(Func getWrappingHandler) + /// + /// Adds a composition event handler to the subscription. + /// The inner handler of type will be resolved from the container + /// (keyed by ), and then wrapped by + /// using the provided factory. + /// + /// Inner event handler type to be resolved from the service provider + /// Wrapping event handler type produced by the factory + /// Factory that takes the resolved inner handler and returns the wrapping handler + /// The current instance + public SubscriptionBuilder AddCompositionEventHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] THandler, TWrappingHandler>( + Func getWrappingHandler + ) where THandler : class, IEventHandler where TWrappingHandler : class, IEventHandler { Services.TryAddKeyedSingleton(SubscriptionId); AddHandlerResolve(sp => getWrappingHandler(sp.GetRequiredKeyedService(SubscriptionId))); @@ -72,6 +83,16 @@ public SubscriptionBuilder AddCompositionEventHandler return this; } + /// + /// Adds a composition event handler to the subscription with a custom inner handler resolver. + /// The inner handler is created via and then wrapped into + /// using . + /// + /// Inner event handler type + /// Wrapping event handler type + /// Function that resolves or creates the inner handler using the service provider + /// Factory that produces the wrapping handler from the inner handler + /// The current instance public SubscriptionBuilder AddCompositionEventHandler( Func getInnerHandler, Func getWrappingHandler @@ -82,6 +103,26 @@ Func getWrappingHandler return this; } + /// + /// Adds a composition event handler to the subscription with a custom inner handler resolver. + /// The inner handler is created via and then wrapped into + /// using . + /// + /// Inner event handler type + /// Wrapping event handler type + /// Function that resolves or creates the inner handler using the service provider + /// Factory that produces the wrapping handler from the inner handler + /// The current instance + public SubscriptionBuilder AddCompositionEventHandler( + Func getInnerHandler, + Func getWrappingHandler + ) where THandler : class, IEventHandler where TWrappingHandler : class, IEventHandler { + Services.TryAddKeyedSingleton(SubscriptionId, (sp, _) => getInnerHandler(sp)); + AddHandlerResolve(sp => getWrappingHandler(sp.GetRequiredKeyedService(SubscriptionId), sp)); + + return this; + } + /// /// Allows using a custom consumer instead of the one. /// Can also be used to change the default consumer instantiation. @@ -141,6 +182,11 @@ public class SubscriptionBuilder TOptions> : SubscriptionBuilder where T : EventSubscription where TOptions : SubscriptionOptions { + /// + /// Creates a new subscription builder for a specific subscription id. + /// + /// The service collection to register handlers and dependencies with + /// The subscription identifier used to key registrations public SubscriptionBuilder(IServiceCollection services, string subscriptionId) : base(services, subscriptionId) { ResolveConsumer = ResolveDefaultConsumer; ConfigureOptions = options => options.SubscriptionId = subscriptionId; @@ -182,6 +228,14 @@ IMessageConsumer ResolveDefaultConsumer(IServiceProvider sp) { return _resolvedConsumer; } + /// + /// Resolves and builds the subscription instance of type . + /// Applies tracing and consumer filters to the consume pipe when diagnostics are enabled, + /// resolves the configured consumer, and creates the subscription using options keyed by + /// . + /// + /// Service provider used to resolve dependencies + /// The resolved and configured subscription instance public T ResolveSubscription(IServiceProvider sp) { if (_resolvedSubscription != null) { return _resolvedSubscription; diff --git a/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs b/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs index b8cb03df..8fba7024 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs @@ -1,5 +1,6 @@ using Eventuous.Subscriptions; using Eventuous.Subscriptions.Context; +using Eventuous.Subscriptions.Filters; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.TestHost; @@ -35,12 +36,12 @@ public async Task Teardown() { } [Test] - public void ShouldResolveCompositionHandlerWithFactory() { + public async Task ShouldResolveCompositionHandlerWithFactory() { // This test validates that AddCompositionEventHandler correctly registers // handlers when using a factory function var handler = _server.Services.GetRequiredKeyedService("sub-with-factory"); - handler.ShouldNotBeNull(); - handler.Dependency.ShouldNotBeNull(); + await Assert.That(handler).IsNotNull(); + await Assert.That(handler.Dependency.Value).IsEqualTo("test-value"); } [Test] @@ -70,9 +71,10 @@ public async Task ShouldHandleEventWithCompositionHandler() { await sub.Pipe.Send(ctx); var handled = logger.Records.Where(x => x.Context.SubscriptionId == sub.SubscriptionId).ToArray(); - handled.Length.ShouldBe(1); - handled[0].HandlerType.ShouldBe(typeof(CompositionWrapper)); - handled[0].Context.MessageId.ShouldBe(ctx.MessageId); + await Assert.That(handled.Length).IsEqualTo(1); + var handledMessage = handled[0]; + await Assert.That(handledMessage.HandlerType).IsEqualTo(typeof(CompositionWrapper)); + await Assert.That(handledMessage.Context.MessageId).IsEqualTo(ctx.MessageId); } class Startup { @@ -84,8 +86,8 @@ public static void ConfigureServices(IServiceCollection services) { services.AddSubscription( "sub-with-factory", builder => builder.AddCompositionEventHandler( - sp => new TestHandler(sp.GetRequiredService(), sp.GetRequiredService()), - handler => new CompositionWrapper(handler, sp => sp.GetRequiredService()) + sp => new(sp.GetRequiredService(), sp.GetRequiredService()), + (handler, sp) => new(handler, sp.GetRequiredService()) ) ); } @@ -109,14 +111,14 @@ public class TestDependency { public class TestHandler(TestDependency dependency, TestHandlerLogger logger) : BaseEventHandler { public TestDependency Dependency { get; } = dependency; - public override ValueTask HandleEvent(IMessageConsumeContext ctx) + public override ValueTask HandleEvent(IMessageConsumeContext ctx) => logger.EventReceived(GetType(), ctx); } - public class CompositionWrapper(IEventHandler innerHandler, Func getLogger) : BaseEventHandler { + public class CompositionWrapper(IEventHandler innerHandler, TestHandlerLogger logger) : BaseEventHandler { public override ValueTask HandleEvent(IMessageConsumeContext ctx) { // Wrap the inner handler call - this simulates what PollyEventHandler does - return getLogger(ctx.Services).EventReceived(GetType(), ctx); + return logger.EventReceived(GetType(), ctx); } } diff --git a/src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs b/src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs index d014d53e..5004ad08 100644 --- a/src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs +++ b/src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs @@ -123,7 +123,8 @@ public async Task ReadEvents(StreamName stream, StreamReadPositio try { return await TryExecute( async () => { - var resolvedEvents = await read.ToArrayAsync(cancellationToken).NoContext(); + var resolvedEvents = await read.ToArrayAsync(cancellationToken).NoContext(); + bool x = true; return ToStreamEvents(resolvedEvents); },