From f43b58760b1a5918426c32bb328db6731384e9bb Mon Sep 17 00:00:00 2001 From: alexey-troshkin-xpress Date: Fri, 21 Nov 2025 01:46:54 +0300 Subject: [PATCH 1/8] added ability to specify SessionId and ReplyToSessionId for the producer, along with support for ServiceBusSessionProcessor for the consumer --- .../Producers/ServiceBusMessageBuilder.cs | 10 ++- .../Producers/ServiceBusProduceOptions.cs | 10 +++ .../Subscriptions/ServiceBusSubscription.cs | 66 +++++++++++++++++-- .../ServiceBusSubscriptionOptions.cs | 41 ++++++++++++ 4 files changed, 122 insertions(+), 5 deletions(-) diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs index 171d8ba5..0264d9be 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs @@ -35,9 +35,17 @@ internal ServiceBusMessage CreateServiceBusMessage(ProducedMessage message) { TimeToLive = options?.TimeToLive ?? TimeSpan.MaxValue, CorrelationId = message.Metadata?.GetCorrelationId(), To = metadata?.GetValueOrDefault(attributes.To, options?.To)?.ToString(), - ReplyTo = metadata?.GetValueOrDefault(attributes.ReplyTo, options?.ReplyTo)?.ToString() + ReplyTo = metadata?.GetValueOrDefault(attributes.ReplyTo, options?.ReplyTo)?.ToString(), + ReplyToSessionId = options?.ReplyToSessionId, }; + // We set the SessionId only when a value is present because + // it overrides the PartitionKey, even if the SessionId is null. + + if (options?.SessionId is not null) { + serviceBusMessage.SessionId = options.SessionId; + } + var reservedAttributes = attributes.ReservedNames(); foreach (var property in GetCustomApplicationProperties(message, messageType, reservedAttributes)) { diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusProduceOptions.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusProduceOptions.cs index f82d8be1..119ea98d 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusProduceOptions.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusProduceOptions.cs @@ -22,6 +22,16 @@ public class ServiceBusProduceOptions { /// public string? ReplyTo { get; set; } + /// + /// Session ID to guarantee ordering on session-enabled entities. + /// + public string? SessionId { get; init; } + + /// + /// The reply-to session ID attribute name for request-reply over sessions. + /// + public string? ReplyToSessionId { get; init; } + /// /// Gets or sets the time interval after which the message expires. /// diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs index 3cc8c1c2..41bb3386 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs @@ -15,6 +15,7 @@ public class ServiceBusSubscription : EventSubscription _defaultErrorHandler; ServiceBusProcessor? _processor; + ServiceBusSessionProcessor? _sessionProcessor; /// /// Initializes a new instance of the class. @@ -37,12 +38,21 @@ public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOpt /// /// protected override ValueTask Subscribe(CancellationToken cancellationToken) { - _processor = Options.QueueOrTopic.MakeProcessor(_client, Options); + if (Options.SessionProcessorOptions is not null) { + _sessionProcessor = Options.QueueOrTopic.MakeSessionProcessor(_client, Options); - _processor.ProcessMessageAsync += HandleMessage; - _processor.ProcessErrorAsync += _defaultErrorHandler; + _sessionProcessor.ProcessMessageAsync += HandleSessionMessage; + _sessionProcessor.ProcessErrorAsync += _defaultErrorHandler; - return new(_processor.StartProcessingAsync(cancellationToken)); + return new ValueTask(_sessionProcessor.StartProcessingAsync(cancellationToken)); + } else { + _processor = Options.QueueOrTopic.MakeProcessor(_client, Options); + + _processor.ProcessMessageAsync += HandleMessage; + _processor.ProcessErrorAsync += _defaultErrorHandler; + + return new(_processor.StartProcessingAsync(cancellationToken)); + } async Task HandleMessage(ProcessMessageEventArgs arg) { var ct = arg.CancellationToken; @@ -91,6 +101,54 @@ async Task HandleMessage(ProcessMessageEventArgs arg) { Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId); } } + + async Task HandleSessionMessage(ProcessSessionMessageEventArgs arg) { + var ct = arg.CancellationToken; + + if (ct.IsCancellationRequested) return; + + var msg = arg.Message; + + var eventType = msg.ApplicationProperties[Options.AttributeNames.MessageType].ToString() + ?? throw new InvalidOperationException("Event type is missing in message properties"); + var contentType = msg.ContentType; + + // Should this be a stream name? or topic or something + var streamName = msg.ApplicationProperties[Options.AttributeNames.StreamName].ToString() + ?? throw new InvalidOperationException("Stream name is missing in message properties"); + + Logger.Current = Log; + + var evt = DeserializeData(contentType, eventType, msg.Body, streamName); + + var applicationProperties = msg.ApplicationProperties.Concat(MessageProperties(msg)); + + var ctx = new MessageConsumeContext( + msg.MessageId, + eventType, + contentType, + streamName, + 0, + 0, + 0, + Sequence++, + msg.EnqueuedTime.UtcDateTime, + evt, + AsMeta(applicationProperties), + SubscriptionId, + ct + ); + + try { + await Handler(ctx).NoContext(); + await arg.CompleteMessageAsync(msg, ct).NoContext(); + } catch (Exception ex) { + // Abandoning the message will make it available for reprocessing, or dead letter it? + await arg.AbandonMessageAsync(msg, null, ct).NoContext(); + await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, arg.FullyQualifiedNamespace, arg.EntityPath, arg.Identifier, arg.CancellationToken)).NoContext(); + Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId); + } + } } IEnumerable> MessageProperties(ServiceBusReceivedMessage msg) { diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscriptionOptions.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscriptionOptions.cs index 6ce02918..2659ca53 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscriptionOptions.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscriptionOptions.cs @@ -20,6 +20,12 @@ public record ServiceBusSubscriptionOptions : SubscriptionOptions { /// public ServiceBusProcessorOptions ProcessorOptions { get; set; } = new(); + /// + /// Gets or sets the options for the session Service Bus processor. + /// If these options are specified, they take priority over + /// + public ServiceBusSessionProcessorOptions? SessionProcessorOptions { get; set; } + /// /// Gets the message attributes for Service Bus messages. /// @@ -42,6 +48,14 @@ public interface IQueueOrTopic { /// The subscription options. /// A configured instance. ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options); + + /// + /// Creates a for the specfiied client and options. + /// + /// The Service Bus client. + /// The subscription options. + /// A configured instance. + ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options); } /// @@ -56,6 +70,15 @@ public record Queue(string Name) : IQueueOrTopic { /// A configured for the queue. public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) => client.CreateProcessor(Name, options.ProcessorOptions); + + /// + /// Creates a for the queue. + /// + /// The Service Bus client. + /// The subscription options. + /// A configured for the queue. + public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) + => client.CreateSessionProcessor(Name, options.SessionProcessorOptions); } /// @@ -70,6 +93,15 @@ public record Topic(string Name) : IQueueOrTopic { /// A configured for the topic. public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) => client.CreateProcessor(Name, options.SubscriptionId, options.ProcessorOptions); + + /// + /// Creates a for the topic and subscription ID from options. + /// + /// The Service Bus client. + /// The subscription options. + /// A configured for the topic. + public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) + => client.CreateSessionProcessor(Name, options.SubscriptionId, options.SessionProcessorOptions); } /// @@ -84,4 +116,13 @@ public record TopicAndSubscription(string Name, string Subscription) : IQueueOrT /// A configured for the topic and subscription. public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) => client.CreateProcessor(Name, Subscription, options.ProcessorOptions); + + /// + /// Creates a for the topic and specified subscription. + /// + /// The Service Bus client. + /// The subscription options. + /// A configured for the topic and subscription. + public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) + => client.CreateSessionProcessor(Name, Subscription, options.SessionProcessorOptions); } From adb53f42258260932de3fc7c76b6896a60744c05 Mon Sep 17 00:00:00 2001 From: alexey-troshkin-xpress Date: Fri, 21 Nov 2025 01:59:18 +0300 Subject: [PATCH 2/8] handle unsubscribe for session processor --- .../Subscriptions/ServiceBusSubscription.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs index 41bb3386..441f47e0 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs @@ -187,7 +187,11 @@ async Task DefaultErrorHandler(ProcessErrorEventArgs arg) { /// /// protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) { - if (_processor == null) return; - await _processor.StopProcessingAsync(cancellationToken).NoContext(); + if (_sessionProcessor is not null) { + await _sessionProcessor.StopProcessingAsync(cancellationToken).NoContext(); + } + else if (_processor is not null) { + await _processor.StopProcessingAsync(cancellationToken).NoContext(); + } } } From 191b92069d5510b8fc4cc1b1c2c8aab6a051241a Mon Sep 17 00:00:00 2001 From: Simon Cropp Date: Sun, 23 Nov 2025 01:28:23 +1100 Subject: [PATCH 3/8] make verified file paths shorter (#472) --- ...eCommandsTests.MapContractExplicitly.verified.txt} | 0 ...ts.MapContractExplicitlyWithoutRoute.verified.txt} | 0 ...xplicitlyWithoutRouteWithGenericAttr.verified.txt} | 0 .../AggregateCommandsTests.cs | 11 ++++++----- 4 files changed, 6 insertions(+), 5 deletions(-) rename src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/{AggregateCommandsTests.MapAggregateContractToCommandExplicitlyWithoutRouteWithGenericAttr_factory=Microsoft.AspNetCore.Mvc.Testing.WebApplicationFactory`1[Program].verified.txt => AggregateCommandsTests.MapContractExplicitly.verified.txt} (100%) rename src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/{AggregateCommandsTests.MapAggregateContractToCommandExplicitlyWithoutRoute_factory=Microsoft.AspNetCore.Mvc.Testing.WebApplicationFactory`1[Program].verified.txt => AggregateCommandsTests.MapContractExplicitlyWithoutRoute.verified.txt} (100%) rename src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/{AggregateCommandsTests.MapAggregateContractToCommandExplicitly_factory=Microsoft.AspNetCore.Mvc.Testing.WebApplicationFactory`1[Program].verified.txt => AggregateCommandsTests.MapContractExplicitlyWithoutRouteWithGenericAttr.verified.txt} (100%) diff --git a/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapAggregateContractToCommandExplicitlyWithoutRouteWithGenericAttr_factory=Microsoft.AspNetCore.Mvc.Testing.WebApplicationFactory`1[Program].verified.txt b/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapContractExplicitly.verified.txt similarity index 100% rename from src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapAggregateContractToCommandExplicitlyWithoutRouteWithGenericAttr_factory=Microsoft.AspNetCore.Mvc.Testing.WebApplicationFactory`1[Program].verified.txt rename to src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapContractExplicitly.verified.txt diff --git a/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapAggregateContractToCommandExplicitlyWithoutRoute_factory=Microsoft.AspNetCore.Mvc.Testing.WebApplicationFactory`1[Program].verified.txt b/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapContractExplicitlyWithoutRoute.verified.txt similarity index 100% rename from src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapAggregateContractToCommandExplicitlyWithoutRoute_factory=Microsoft.AspNetCore.Mvc.Testing.WebApplicationFactory`1[Program].verified.txt rename to src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapContractExplicitlyWithoutRoute.verified.txt diff --git a/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapAggregateContractToCommandExplicitly_factory=Microsoft.AspNetCore.Mvc.Testing.WebApplicationFactory`1[Program].verified.txt b/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapContractExplicitlyWithoutRouteWithGenericAttr.verified.txt similarity index 100% rename from src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapAggregateContractToCommandExplicitly_factory=Microsoft.AspNetCore.Mvc.Testing.WebApplicationFactory`1[Program].verified.txt rename to src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapContractExplicitlyWithoutRouteWithGenericAttr.verified.txt diff --git a/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.cs b/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.cs index 646dca2f..bd02fa94 100644 --- a/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.cs +++ b/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.cs @@ -9,7 +9,7 @@ namespace Eventuous.Tests.Extensions.AspNetCore; [ClassDataSource>] public class AggregateCommandsTests(WebApplicationFactory factory) : TestBaseWithLogs { [Test] - public async Task MapAggregateContractToCommandExplicitly() { + public async Task MapContractExplicitly() { var fixture = new ServerFixture( factory, _ => { }, @@ -22,7 +22,7 @@ public async Task MapAggregateContractToCommandExplicitly() { } [Test] - public async Task MapAggregateContractToCommandExplicitlyWithoutRoute() { + public async Task MapContractExplicitlyWithoutRoute() { var fixture = new ServerFixture( factory, _ => { }, @@ -35,7 +35,7 @@ public async Task MapAggregateContractToCommandExplicitlyWithoutRoute() { } [Test] - public async Task MapAggregateContractToCommandExplicitlyWithoutRouteWithGenericAttr() { + public async Task MapContractExplicitlyWithoutRouteWithGenericAttr() { var fixture = new ServerFixture( factory, _ => { }, @@ -48,7 +48,7 @@ public async Task MapAggregateContractToCommandExplicitlyWithoutRouteWithGeneric } [Test] - public void MapAggregateContractToCommandExplicitlyWithoutRouteWithWrongGenericAttr() { + public void MapContractExplicitlyWithoutRouteWithWrongGenericAttr() { Assert.Throws(Act); return; @@ -92,6 +92,7 @@ static async Task Execute(ServerFixture fixture, string route) { ); var content = await fixture.ExecuteRequest(import, route, bookRoom.BookingId); - await VerifyJson(content); + await VerifyJson(content) + .IgnoreParameters(); } } From eb1f925b1b3d3a1d37d80860778602d081c0d993 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Sat, 22 Nov 2025 15:38:57 +0100 Subject: [PATCH 4/8] Refactor one more test for shorter name --- ...> AggregateCommandsTests.MapEnrichedCommand.verified.txt} | 0 .../AggregateCommandsTests.cs | 5 ++--- 2 files changed, 2 insertions(+), 3 deletions(-) rename src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/{AggregateCommandsTests.MapEnrichedCommand_factory=Microsoft.AspNetCore.Mvc.Testing.WebApplicationFactory`1[Program].verified.txt => AggregateCommandsTests.MapEnrichedCommand.verified.txt} (100%) diff --git a/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapEnrichedCommand_factory=Microsoft.AspNetCore.Mvc.Testing.WebApplicationFactory`1[Program].verified.txt b/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapEnrichedCommand.verified.txt similarity index 100% rename from src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapEnrichedCommand_factory=Microsoft.AspNetCore.Mvc.Testing.WebApplicationFactory`1[Program].verified.txt rename to src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.MapEnrichedCommand.verified.txt diff --git a/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.cs b/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.cs index bd02fa94..a0c7449b 100644 --- a/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.cs +++ b/src/Extensions/test/Eventuous.Tests.Extensions.AspNetCore/AggregateCommandsTests.cs @@ -77,7 +77,7 @@ public async Task MapEnrichedCommand() { ); var cmd = ServerFixture.GetBookRoom(); var content = await fixture.ExecuteRequest(cmd, "book", cmd.BookingId); - await VerifyJson(content); + await VerifyJson(content).IgnoreParameters(); } static async Task Execute(ServerFixture fixture, string route) { @@ -92,7 +92,6 @@ static async Task Execute(ServerFixture fixture, string route) { ); var content = await fixture.ExecuteRequest(import, route, bookRoom.BookingId); - await VerifyJson(content) - .IgnoreParameters(); + await VerifyJson(content).IgnoreParameters(); } } From f832640b912cdfc5e4102c302c639cf19c9244ae Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Sat, 22 Nov 2025 16:04:11 +0100 Subject: [PATCH 5/8] * Update TestContainers for the emulator * Remove duplicate code * Check if properties exist and add a fall back --- Directory.Packages.props | 2 +- .../Subscriptions/ServiceBusSubscription.cs | 118 ++++++++---------- 2 files changed, 50 insertions(+), 70 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index dfcedb82..6621085a 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -15,7 +15,7 @@ 9.0.10 - 4.6.0 + 4.8.1 9.0.3 diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs index 441f47e0..bd9a44e2 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs @@ -44,83 +44,65 @@ protected override ValueTask Subscribe(CancellationToken cancellationToken) { _sessionProcessor.ProcessMessageAsync += HandleSessionMessage; _sessionProcessor.ProcessErrorAsync += _defaultErrorHandler; - return new ValueTask(_sessionProcessor.StartProcessingAsync(cancellationToken)); - } else { - _processor = Options.QueueOrTopic.MakeProcessor(_client, Options); - - _processor.ProcessMessageAsync += HandleMessage; - _processor.ProcessErrorAsync += _defaultErrorHandler; - - return new(_processor.StartProcessingAsync(cancellationToken)); + return new(_sessionProcessor.StartProcessingAsync(cancellationToken)); } - async Task HandleMessage(ProcessMessageEventArgs arg) { - var ct = arg.CancellationToken; - - if (ct.IsCancellationRequested) return; - - var msg = arg.Message; + _processor = Options.QueueOrTopic.MakeProcessor(_client, Options); - var eventType = msg.ApplicationProperties[Options.AttributeNames.MessageType].ToString() - ?? throw new InvalidOperationException("Event type is missing in message properties"); - var contentType = msg.ContentType; - - // Should this be a stream name? or topic or something - var streamName = msg.ApplicationProperties[Options.AttributeNames.StreamName].ToString() - ?? throw new InvalidOperationException("Stream name is missing in message properties"); - - Logger.Current = Log; + _processor.ProcessMessageAsync += HandleMessage; + _processor.ProcessErrorAsync += _defaultErrorHandler; - var evt = DeserializeData(contentType, eventType, msg.Body, streamName); + return new(_processor.StartProcessingAsync(cancellationToken)); - var applicationProperties = msg.ApplicationProperties.Concat(MessageProperties(msg)); - - var ctx = new MessageConsumeContext( - msg.MessageId, - eventType, - contentType, - streamName, - 0, - 0, - 0, - Sequence++, - msg.EnqueuedTime.UtcDateTime, - evt, - AsMeta(applicationProperties), - SubscriptionId, - ct + Task HandleMessage(ProcessMessageEventArgs arg) + => ProcessMessageAsync( + arg.Message, + arg.CancellationToken, + msg => arg.CompleteMessageAsync(msg, arg.CancellationToken), + msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken), + arg.FullyQualifiedNamespace, + arg.EntityPath, + arg.Identifier ); - try { - await Handler(ctx).NoContext(); - await arg.CompleteMessageAsync(msg, ct).NoContext(); - } catch (Exception ex) { - // Abandoning the message will make it available for reprocessing, or dead letter it? - await arg.AbandonMessageAsync(msg, null, ct).NoContext(); - await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, arg.FullyQualifiedNamespace, arg.EntityPath, arg.Identifier, arg.CancellationToken)).NoContext(); - Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId); - } - } - - async Task HandleSessionMessage(ProcessSessionMessageEventArgs arg) { - var ct = arg.CancellationToken; + Task HandleSessionMessage(ProcessSessionMessageEventArgs arg) + => ProcessMessageAsync( + arg.Message, + arg.CancellationToken, + msg => arg.CompleteMessageAsync(msg, arg.CancellationToken), + msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken), + arg.FullyQualifiedNamespace, + arg.EntityPath, + arg.Identifier + ); + async Task ProcessMessageAsync( + ServiceBusReceivedMessage msg, + CancellationToken ct, + Func completeMessage, + Func abandonMessage, + string fullyQualifiedNamespace, + string entityPath, + string identifier + ) { if (ct.IsCancellationRequested) return; - var msg = arg.Message; - - var eventType = msg.ApplicationProperties[Options.AttributeNames.MessageType].ToString() - ?? throw new InvalidOperationException("Event type is missing in message properties"); + var eventType = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.MessageType, out var messageType) + ? messageType.ToString() + : msg.Subject) ?? throw new InvalidOperationException("Message type is missing in message properties"); var contentType = msg.ContentType; // Should this be a stream name? or topic or something - var streamName = msg.ApplicationProperties[Options.AttributeNames.StreamName].ToString() - ?? throw new InvalidOperationException("Stream name is missing in message properties"); + var streamName = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.StreamName, out var stream) + ? stream.ToString() + : Options.QueueOrTopic switch { + Queue queue => queue.Name, + Topic topic => topic.Name, + _ => null + }) ?? throw new InvalidOperationException("Stream name is missing in message properties"); Logger.Current = Log; - - var evt = DeserializeData(contentType, eventType, msg.Body, streamName); - + var evt = DeserializeData(contentType, eventType, msg.Body, streamName); var applicationProperties = msg.ApplicationProperties.Concat(MessageProperties(msg)); var ctx = new MessageConsumeContext( @@ -141,11 +123,11 @@ async Task HandleSessionMessage(ProcessSessionMessageEventArgs arg) { try { await Handler(ctx).NoContext(); - await arg.CompleteMessageAsync(msg, ct).NoContext(); + await completeMessage(msg).NoContext(); } catch (Exception ex) { // Abandoning the message will make it available for reprocessing, or dead letter it? - await arg.AbandonMessageAsync(msg, null, ct).NoContext(); - await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, arg.FullyQualifiedNamespace, arg.EntityPath, arg.Identifier, arg.CancellationToken)).NoContext(); + await abandonMessage(msg).NoContext(); + await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, fullyQualifiedNamespace, entityPath, identifier, ct)).NoContext(); Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId); } } @@ -173,12 +155,10 @@ IEnumerable> MessageProperties(ServiceBusReceivedMe static Metadata AsMeta(IEnumerable> applicationProperties) => new(applicationProperties.ToDictionary(pair => pair.Key, object? (pair) => pair.Value)); - async Task DefaultErrorHandler(ProcessErrorEventArgs arg) { - // Log the error + Task DefaultErrorHandler(ProcessErrorEventArgs arg) { Log.ErrorLog?.Log(arg.Exception, "Error processing message: {Identifier}", arg.Identifier); - // Optionally, you can handle the error further, e.g., by sending to a dead-letter queue - await Task.CompletedTask; + return Task.CompletedTask; } /// From 35baa5a6f07cb3cc8e2602bc98ee23f0783fd98c Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Sat, 22 Nov 2025 16:16:10 +0100 Subject: [PATCH 6/8] Replace with composition --- .../Subscriptions/ServiceBusSubscription.cs | 234 ++++++++++-------- 1 file changed, 132 insertions(+), 102 deletions(-) diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs index bd9a44e2..5799d91c 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs @@ -12,10 +12,8 @@ namespace Eventuous.Azure.ServiceBus.Subscriptions; /// Represents a Service Bus subscription that processes messages from a queue or topic. /// public class ServiceBusSubscription : EventSubscription { - readonly ServiceBusClient _client; readonly Func _defaultErrorHandler; - ServiceBusProcessor? _processor; - ServiceBusSessionProcessor? _sessionProcessor; + readonly IServiceBusProcessorStrategy _processorStrategy; /// /// Initializes a new instance of the class. @@ -27,8 +25,11 @@ public class ServiceBusSubscription : EventSubscriptionEvent serializer (optional) public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOptions options, ConsumePipe consumePipe, ILoggerFactory? loggerFactory, IEventSerializer? eventSerializer) : base(options, consumePipe, loggerFactory, eventSerializer) { - _client = client; _defaultErrorHandler = Options.ErrorHandler ?? DefaultErrorHandler; + + _processorStrategy = Options.SessionProcessorOptions is not null + ? new SessionProcessorStrategy(client, Options, HandleSessionMessage, _defaultErrorHandler) + : new StandardProcessorStrategy(client, Options, HandleMessage, _defaultErrorHandler); } /// @@ -37,99 +38,84 @@ public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOpt /// /// /// - protected override ValueTask Subscribe(CancellationToken cancellationToken) { - if (Options.SessionProcessorOptions is not null) { - _sessionProcessor = Options.QueueOrTopic.MakeSessionProcessor(_client, Options); - - _sessionProcessor.ProcessMessageAsync += HandleSessionMessage; - _sessionProcessor.ProcessErrorAsync += _defaultErrorHandler; - - return new(_sessionProcessor.StartProcessingAsync(cancellationToken)); - } - - _processor = Options.QueueOrTopic.MakeProcessor(_client, Options); - - _processor.ProcessMessageAsync += HandleMessage; - _processor.ProcessErrorAsync += _defaultErrorHandler; - - return new(_processor.StartProcessingAsync(cancellationToken)); - - Task HandleMessage(ProcessMessageEventArgs arg) - => ProcessMessageAsync( - arg.Message, - arg.CancellationToken, - msg => arg.CompleteMessageAsync(msg, arg.CancellationToken), - msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken), - arg.FullyQualifiedNamespace, - arg.EntityPath, - arg.Identifier - ); - - Task HandleSessionMessage(ProcessSessionMessageEventArgs arg) - => ProcessMessageAsync( - arg.Message, - arg.CancellationToken, - msg => arg.CompleteMessageAsync(msg, arg.CancellationToken), - msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken), - arg.FullyQualifiedNamespace, - arg.EntityPath, - arg.Identifier - ); - - async Task ProcessMessageAsync( - ServiceBusReceivedMessage msg, - CancellationToken ct, - Func completeMessage, - Func abandonMessage, - string fullyQualifiedNamespace, - string entityPath, - string identifier - ) { - if (ct.IsCancellationRequested) return; - - var eventType = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.MessageType, out var messageType) - ? messageType.ToString() - : msg.Subject) ?? throw new InvalidOperationException("Message type is missing in message properties"); - var contentType = msg.ContentType; - - // Should this be a stream name? or topic or something - var streamName = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.StreamName, out var stream) - ? stream.ToString() - : Options.QueueOrTopic switch { - Queue queue => queue.Name, - Topic topic => topic.Name, - _ => null - }) ?? throw new InvalidOperationException("Stream name is missing in message properties"); - - Logger.Current = Log; - var evt = DeserializeData(contentType, eventType, msg.Body, streamName); - var applicationProperties = msg.ApplicationProperties.Concat(MessageProperties(msg)); - - var ctx = new MessageConsumeContext( - msg.MessageId, - eventType, - contentType, - streamName, - 0, - 0, - 0, - Sequence++, - msg.EnqueuedTime.UtcDateTime, - evt, - AsMeta(applicationProperties), - SubscriptionId, - ct - ); - - try { - await Handler(ctx).NoContext(); - await completeMessage(msg).NoContext(); - } catch (Exception ex) { - // Abandoning the message will make it available for reprocessing, or dead letter it? - await abandonMessage(msg).NoContext(); - await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, fullyQualifiedNamespace, entityPath, identifier, ct)).NoContext(); - Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId); - } + protected override ValueTask Subscribe(CancellationToken cancellationToken) + => _processorStrategy.Start(cancellationToken); + + Task HandleMessage(ProcessMessageEventArgs arg) + => ProcessMessageAsync( + arg.Message, + msg => arg.CompleteMessageAsync(msg, arg.CancellationToken), + msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken), + arg.FullyQualifiedNamespace, + arg.EntityPath, + arg.Identifier, + arg.CancellationToken + ); + + Task HandleSessionMessage(ProcessSessionMessageEventArgs arg) + => ProcessMessageAsync( + arg.Message, + msg => arg.CompleteMessageAsync(msg, arg.CancellationToken), + msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken), + arg.FullyQualifiedNamespace, + arg.EntityPath, + arg.Identifier, + arg.CancellationToken + ); + + async Task ProcessMessageAsync( + ServiceBusReceivedMessage msg, + Func completeMessage, + Func abandonMessage, + string fullyQualifiedNamespace, + string entityPath, + string identifier, + CancellationToken ct + ) { + if (ct.IsCancellationRequested) return; + + var eventType = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.MessageType, out var messageType) + ? messageType.ToString() + : msg.Subject) ?? throw new InvalidOperationException("Message type is missing in message properties"); + var contentType = msg.ContentType; + + // Should this be a stream name? or topic or something + var streamName = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.StreamName, out var stream) + ? stream.ToString() + : Options.QueueOrTopic switch { + Queue queue => queue.Name, + Topic topic => topic.Name, + _ => null + }) ?? throw new InvalidOperationException("Stream name is missing in message properties"); + + Logger.Current = Log; + var evt = DeserializeData(contentType, eventType, msg.Body, streamName); + var applicationProperties = msg.ApplicationProperties.Concat(MessageProperties(msg)); + + var ctx = new MessageConsumeContext( + msg.MessageId, + eventType, + contentType, + streamName, + 0, + 0, + 0, + Sequence++, + msg.EnqueuedTime.UtcDateTime, + evt, + AsMeta(applicationProperties), + SubscriptionId, + ct + ); + + try { + await Handler(ctx).NoContext(); + await completeMessage(msg).NoContext(); + } catch (Exception ex) { + // Abandoning the message will make it available for reprocessing, or dead letter it? + await abandonMessage(msg).NoContext(); + await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, fullyQualifiedNamespace, entityPath, identifier, ct)).NoContext(); + Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId); } } @@ -166,12 +152,56 @@ Task DefaultErrorHandler(ProcessErrorEventArgs arg) { /// /// /// - protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) { - if (_sessionProcessor is not null) { - await _sessionProcessor.StopProcessingAsync(cancellationToken).NoContext(); + protected override ValueTask Unsubscribe(CancellationToken cancellationToken) => _processorStrategy.Stop(cancellationToken); + + interface IServiceBusProcessorStrategy { + ValueTask Start(CancellationToken cancellationToken); + ValueTask Stop(CancellationToken cancellationToken); + } + + sealed class StandardProcessorStrategy( + ServiceBusClient client, + ServiceBusSubscriptionOptions options, + Func handleMessage, + Func handleError + ) + : IServiceBusProcessorStrategy { + ServiceBusProcessor? _processor; + + public ValueTask Start(CancellationToken cancellationToken) { + _processor = options.QueueOrTopic.MakeProcessor(client, options); + _processor.ProcessMessageAsync += handleMessage; + _processor.ProcessErrorAsync += handleError; + + return new(_processor.StartProcessingAsync(cancellationToken)); } - else if (_processor is not null) { - await _processor.StopProcessingAsync(cancellationToken).NoContext(); + + public ValueTask Stop(CancellationToken cancellationToken) + => _processor is not null + ? new(_processor.StopProcessingAsync(cancellationToken)) + : ValueTask.CompletedTask; + } + + sealed class SessionProcessorStrategy( + ServiceBusClient client, + ServiceBusSubscriptionOptions options, + Func handleSessionMessage, + Func handleError + ) + : IServiceBusProcessorStrategy { + ServiceBusSessionProcessor? _sessionProcessor; + + public ValueTask Start(CancellationToken cancellationToken) { + _sessionProcessor = options.QueueOrTopic.MakeSessionProcessor(client, options); + _sessionProcessor.ProcessMessageAsync += handleSessionMessage; + _sessionProcessor.ProcessErrorAsync += handleError; + + return new(_sessionProcessor.StartProcessingAsync(cancellationToken)); } + + public ValueTask Stop(CancellationToken cancellationToken) + => _sessionProcessor is not null + ? new(_sessionProcessor.StopProcessingAsync(cancellationToken)) + : ValueTask.CompletedTask; } } From 5b1331d1505dc2d1afd487045cdc661b53b14e09 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Sat, 22 Nov 2025 16:50:35 +0100 Subject: [PATCH 7/8] Update src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs Co-authored-by: qodo-merge-for-open-source[bot] <189517486+qodo-merge-for-open-source[bot]@users.noreply.github.com> --- .../Subscriptions/ServiceBusSubscription.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs index 5799d91c..141abfc3 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs @@ -74,7 +74,7 @@ CancellationToken ct ) { if (ct.IsCancellationRequested) return; - var eventType = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.MessageType, out var messageType) + var eventType = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.MessageType, out var messageType) && messageType is not null ? messageType.ToString() : msg.Subject) ?? throw new InvalidOperationException("Message type is missing in message properties"); var contentType = msg.ContentType; From c08bd14a0cf12aee3d514e4649d4a7cb65f922e3 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Sat, 22 Nov 2025 16:51:17 +0100 Subject: [PATCH 8/8] Update src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs Co-authored-by: qodo-merge-for-open-source[bot] <189517486+qodo-merge-for-open-source[bot]@users.noreply.github.com> --- .../Subscriptions/ServiceBusSubscription.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs index 141abfc3..daa30259 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs @@ -80,7 +80,7 @@ CancellationToken ct var contentType = msg.ContentType; // Should this be a stream name? or topic or something - var streamName = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.StreamName, out var stream) + var streamName = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.StreamName, out var stream) && stream is not null ? stream.ToString() : Options.QueueOrTopic switch { Queue queue => queue.Name,