diff --git a/Directory.Packages.props b/Directory.Packages.props index dfcedb82..6621085a 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -15,7 +15,7 @@ 9.0.10 - 4.6.0 + 4.8.1 9.0.3 diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs index 171d8ba5..0264d9be 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs @@ -35,9 +35,17 @@ internal ServiceBusMessage CreateServiceBusMessage(ProducedMessage message) { TimeToLive = options?.TimeToLive ?? TimeSpan.MaxValue, CorrelationId = message.Metadata?.GetCorrelationId(), To = metadata?.GetValueOrDefault(attributes.To, options?.To)?.ToString(), - ReplyTo = metadata?.GetValueOrDefault(attributes.ReplyTo, options?.ReplyTo)?.ToString() + ReplyTo = metadata?.GetValueOrDefault(attributes.ReplyTo, options?.ReplyTo)?.ToString(), + ReplyToSessionId = options?.ReplyToSessionId, }; + // We set the SessionId only when a value is present because + // it overrides the PartitionKey, even if the SessionId is null. + + if (options?.SessionId is not null) { + serviceBusMessage.SessionId = options.SessionId; + } + var reservedAttributes = attributes.ReservedNames(); foreach (var property in GetCustomApplicationProperties(message, messageType, reservedAttributes)) { diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusProduceOptions.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusProduceOptions.cs index f82d8be1..119ea98d 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusProduceOptions.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusProduceOptions.cs @@ -22,6 +22,16 @@ public class ServiceBusProduceOptions { /// public string? ReplyTo { get; set; } + /// + /// Session ID to guarantee ordering on session-enabled entities. + /// + public string? SessionId { get; init; } + + /// + /// The reply-to session ID attribute name for request-reply over sessions. + /// + public string? ReplyToSessionId { get; init; } + /// /// Gets or sets the time interval after which the message expires. /// diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs index 3cc8c1c2..daa30259 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs @@ -12,9 +12,8 @@ namespace Eventuous.Azure.ServiceBus.Subscriptions; /// Represents a Service Bus subscription that processes messages from a queue or topic. /// public class ServiceBusSubscription : EventSubscription { - readonly ServiceBusClient _client; readonly Func _defaultErrorHandler; - ServiceBusProcessor? _processor; + readonly IServiceBusProcessorStrategy _processorStrategy; /// /// Initializes a new instance of the class. @@ -26,8 +25,11 @@ public class ServiceBusSubscription : EventSubscriptionEvent serializer (optional) public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOptions options, ConsumePipe consumePipe, ILoggerFactory? loggerFactory, IEventSerializer? eventSerializer) : base(options, consumePipe, loggerFactory, eventSerializer) { - _client = client; _defaultErrorHandler = Options.ErrorHandler ?? DefaultErrorHandler; + + _processorStrategy = Options.SessionProcessorOptions is not null + ? new SessionProcessorStrategy(client, Options, HandleSessionMessage, _defaultErrorHandler) + : new StandardProcessorStrategy(client, Options, HandleMessage, _defaultErrorHandler); } /// @@ -36,60 +38,84 @@ public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOpt /// /// /// - protected override ValueTask Subscribe(CancellationToken cancellationToken) { - _processor = Options.QueueOrTopic.MakeProcessor(_client, Options); - - _processor.ProcessMessageAsync += HandleMessage; - _processor.ProcessErrorAsync += _defaultErrorHandler; - - return new(_processor.StartProcessingAsync(cancellationToken)); - - async Task HandleMessage(ProcessMessageEventArgs arg) { - var ct = arg.CancellationToken; - - if (ct.IsCancellationRequested) return; - - var msg = arg.Message; - - var eventType = msg.ApplicationProperties[Options.AttributeNames.MessageType].ToString() - ?? throw new InvalidOperationException("Event type is missing in message properties"); - var contentType = msg.ContentType; - - // Should this be a stream name? or topic or something - var streamName = msg.ApplicationProperties[Options.AttributeNames.StreamName].ToString() - ?? throw new InvalidOperationException("Stream name is missing in message properties"); - - Logger.Current = Log; - - var evt = DeserializeData(contentType, eventType, msg.Body, streamName); - - var applicationProperties = msg.ApplicationProperties.Concat(MessageProperties(msg)); - - var ctx = new MessageConsumeContext( - msg.MessageId, - eventType, - contentType, - streamName, - 0, - 0, - 0, - Sequence++, - msg.EnqueuedTime.UtcDateTime, - evt, - AsMeta(applicationProperties), - SubscriptionId, - ct - ); - - try { - await Handler(ctx).NoContext(); - await arg.CompleteMessageAsync(msg, ct).NoContext(); - } catch (Exception ex) { - // Abandoning the message will make it available for reprocessing, or dead letter it? - await arg.AbandonMessageAsync(msg, null, ct).NoContext(); - await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, arg.FullyQualifiedNamespace, arg.EntityPath, arg.Identifier, arg.CancellationToken)).NoContext(); - Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId); - } + protected override ValueTask Subscribe(CancellationToken cancellationToken) + => _processorStrategy.Start(cancellationToken); + + Task HandleMessage(ProcessMessageEventArgs arg) + => ProcessMessageAsync( + arg.Message, + msg => arg.CompleteMessageAsync(msg, arg.CancellationToken), + msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken), + arg.FullyQualifiedNamespace, + arg.EntityPath, + arg.Identifier, + arg.CancellationToken + ); + + Task HandleSessionMessage(ProcessSessionMessageEventArgs arg) + => ProcessMessageAsync( + arg.Message, + msg => arg.CompleteMessageAsync(msg, arg.CancellationToken), + msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken), + arg.FullyQualifiedNamespace, + arg.EntityPath, + arg.Identifier, + arg.CancellationToken + ); + + async Task ProcessMessageAsync( + ServiceBusReceivedMessage msg, + Func completeMessage, + Func abandonMessage, + string fullyQualifiedNamespace, + string entityPath, + string identifier, + CancellationToken ct + ) { + if (ct.IsCancellationRequested) return; + + var eventType = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.MessageType, out var messageType) && messageType is not null + ? messageType.ToString() + : msg.Subject) ?? throw new InvalidOperationException("Message type is missing in message properties"); + var contentType = msg.ContentType; + + // Should this be a stream name? or topic or something + var streamName = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.StreamName, out var stream) && stream is not null + ? stream.ToString() + : Options.QueueOrTopic switch { + Queue queue => queue.Name, + Topic topic => topic.Name, + _ => null + }) ?? throw new InvalidOperationException("Stream name is missing in message properties"); + + Logger.Current = Log; + var evt = DeserializeData(contentType, eventType, msg.Body, streamName); + var applicationProperties = msg.ApplicationProperties.Concat(MessageProperties(msg)); + + var ctx = new MessageConsumeContext( + msg.MessageId, + eventType, + contentType, + streamName, + 0, + 0, + 0, + Sequence++, + msg.EnqueuedTime.UtcDateTime, + evt, + AsMeta(applicationProperties), + SubscriptionId, + ct + ); + + try { + await Handler(ctx).NoContext(); + await completeMessage(msg).NoContext(); + } catch (Exception ex) { + // Abandoning the message will make it available for reprocessing, or dead letter it? + await abandonMessage(msg).NoContext(); + await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, fullyQualifiedNamespace, entityPath, identifier, ct)).NoContext(); + Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId); } } @@ -115,12 +141,10 @@ IEnumerable> MessageProperties(ServiceBusReceivedMe static Metadata AsMeta(IEnumerable> applicationProperties) => new(applicationProperties.ToDictionary(pair => pair.Key, object? (pair) => pair.Value)); - async Task DefaultErrorHandler(ProcessErrorEventArgs arg) { - // Log the error + Task DefaultErrorHandler(ProcessErrorEventArgs arg) { Log.ErrorLog?.Log(arg.Exception, "Error processing message: {Identifier}", arg.Identifier); - // Optionally, you can handle the error further, e.g., by sending to a dead-letter queue - await Task.CompletedTask; + return Task.CompletedTask; } /// @@ -128,8 +152,56 @@ async Task DefaultErrorHandler(ProcessErrorEventArgs arg) { /// /// /// - protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) { - if (_processor == null) return; - await _processor.StopProcessingAsync(cancellationToken).NoContext(); + protected override ValueTask Unsubscribe(CancellationToken cancellationToken) => _processorStrategy.Stop(cancellationToken); + + interface IServiceBusProcessorStrategy { + ValueTask Start(CancellationToken cancellationToken); + ValueTask Stop(CancellationToken cancellationToken); + } + + sealed class StandardProcessorStrategy( + ServiceBusClient client, + ServiceBusSubscriptionOptions options, + Func handleMessage, + Func handleError + ) + : IServiceBusProcessorStrategy { + ServiceBusProcessor? _processor; + + public ValueTask Start(CancellationToken cancellationToken) { + _processor = options.QueueOrTopic.MakeProcessor(client, options); + _processor.ProcessMessageAsync += handleMessage; + _processor.ProcessErrorAsync += handleError; + + return new(_processor.StartProcessingAsync(cancellationToken)); + } + + public ValueTask Stop(CancellationToken cancellationToken) + => _processor is not null + ? new(_processor.StopProcessingAsync(cancellationToken)) + : ValueTask.CompletedTask; + } + + sealed class SessionProcessorStrategy( + ServiceBusClient client, + ServiceBusSubscriptionOptions options, + Func handleSessionMessage, + Func handleError + ) + : IServiceBusProcessorStrategy { + ServiceBusSessionProcessor? _sessionProcessor; + + public ValueTask Start(CancellationToken cancellationToken) { + _sessionProcessor = options.QueueOrTopic.MakeSessionProcessor(client, options); + _sessionProcessor.ProcessMessageAsync += handleSessionMessage; + _sessionProcessor.ProcessErrorAsync += handleError; + + return new(_sessionProcessor.StartProcessingAsync(cancellationToken)); + } + + public ValueTask Stop(CancellationToken cancellationToken) + => _sessionProcessor is not null + ? new(_sessionProcessor.StopProcessingAsync(cancellationToken)) + : ValueTask.CompletedTask; } } diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscriptionOptions.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscriptionOptions.cs index 6ce02918..2659ca53 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscriptionOptions.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscriptionOptions.cs @@ -20,6 +20,12 @@ public record ServiceBusSubscriptionOptions : SubscriptionOptions { /// public ServiceBusProcessorOptions ProcessorOptions { get; set; } = new(); + /// + /// Gets or sets the options for the session Service Bus processor. + /// If these options are specified, they take priority over + /// + public ServiceBusSessionProcessorOptions? SessionProcessorOptions { get; set; } + /// /// Gets the message attributes for Service Bus messages. /// @@ -42,6 +48,14 @@ public interface IQueueOrTopic { /// The subscription options. /// A configured instance. ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options); + + /// + /// Creates a for the specfiied client and options. + /// + /// The Service Bus client. + /// The subscription options. + /// A configured instance. + ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options); } /// @@ -56,6 +70,15 @@ public record Queue(string Name) : IQueueOrTopic { /// A configured for the queue. public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) => client.CreateProcessor(Name, options.ProcessorOptions); + + /// + /// Creates a for the queue. + /// + /// The Service Bus client. + /// The subscription options. + /// A configured for the queue. + public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) + => client.CreateSessionProcessor(Name, options.SessionProcessorOptions); } /// @@ -70,6 +93,15 @@ public record Topic(string Name) : IQueueOrTopic { /// A configured for the topic. public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) => client.CreateProcessor(Name, options.SubscriptionId, options.ProcessorOptions); + + /// + /// Creates a for the topic and subscription ID from options. + /// + /// The Service Bus client. + /// The subscription options. + /// A configured for the topic. + public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) + => client.CreateSessionProcessor(Name, options.SubscriptionId, options.SessionProcessorOptions); } /// @@ -84,4 +116,13 @@ public record TopicAndSubscription(string Name, string Subscription) : IQueueOrT /// A configured for the topic and subscription. public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) => client.CreateProcessor(Name, Subscription, options.ProcessorOptions); + + /// + /// Creates a for the topic and specified subscription. + /// + /// The Service Bus client. + /// The subscription options. + /// A configured for the topic and subscription. + public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) + => client.CreateSessionProcessor(Name, Subscription, options.SessionProcessorOptions); }