diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs index 171d8ba5..0264d9be 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs @@ -35,9 +35,17 @@ internal ServiceBusMessage CreateServiceBusMessage(ProducedMessage message) { TimeToLive = options?.TimeToLive ?? TimeSpan.MaxValue, CorrelationId = message.Metadata?.GetCorrelationId(), To = metadata?.GetValueOrDefault(attributes.To, options?.To)?.ToString(), - ReplyTo = metadata?.GetValueOrDefault(attributes.ReplyTo, options?.ReplyTo)?.ToString() + ReplyTo = metadata?.GetValueOrDefault(attributes.ReplyTo, options?.ReplyTo)?.ToString(), + ReplyToSessionId = options?.ReplyToSessionId, }; + // We set the SessionId only when a value is present because + // it overrides the PartitionKey, even if the SessionId is null. + + if (options?.SessionId is not null) { + serviceBusMessage.SessionId = options.SessionId; + } + var reservedAttributes = attributes.ReservedNames(); foreach (var property in GetCustomApplicationProperties(message, messageType, reservedAttributes)) { diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusProduceOptions.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusProduceOptions.cs index f82d8be1..119ea98d 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusProduceOptions.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusProduceOptions.cs @@ -22,6 +22,16 @@ public class ServiceBusProduceOptions { /// public string? ReplyTo { get; set; } + /// + /// Session ID to guarantee ordering on session-enabled entities. + /// + public string? SessionId { get; init; } + + /// + /// The reply-to session ID attribute name for request-reply over sessions. + /// + public string? ReplyToSessionId { get; init; } + /// /// Gets or sets the time interval after which the message expires. /// diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs index 3cc8c1c2..441f47e0 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs @@ -15,6 +15,7 @@ public class ServiceBusSubscription : EventSubscription _defaultErrorHandler; ServiceBusProcessor? _processor; + ServiceBusSessionProcessor? _sessionProcessor; /// /// Initializes a new instance of the class. @@ -37,12 +38,21 @@ public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOpt /// /// protected override ValueTask Subscribe(CancellationToken cancellationToken) { - _processor = Options.QueueOrTopic.MakeProcessor(_client, Options); + if (Options.SessionProcessorOptions is not null) { + _sessionProcessor = Options.QueueOrTopic.MakeSessionProcessor(_client, Options); - _processor.ProcessMessageAsync += HandleMessage; - _processor.ProcessErrorAsync += _defaultErrorHandler; + _sessionProcessor.ProcessMessageAsync += HandleSessionMessage; + _sessionProcessor.ProcessErrorAsync += _defaultErrorHandler; - return new(_processor.StartProcessingAsync(cancellationToken)); + return new ValueTask(_sessionProcessor.StartProcessingAsync(cancellationToken)); + } else { + _processor = Options.QueueOrTopic.MakeProcessor(_client, Options); + + _processor.ProcessMessageAsync += HandleMessage; + _processor.ProcessErrorAsync += _defaultErrorHandler; + + return new(_processor.StartProcessingAsync(cancellationToken)); + } async Task HandleMessage(ProcessMessageEventArgs arg) { var ct = arg.CancellationToken; @@ -91,6 +101,54 @@ async Task HandleMessage(ProcessMessageEventArgs arg) { Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId); } } + + async Task HandleSessionMessage(ProcessSessionMessageEventArgs arg) { + var ct = arg.CancellationToken; + + if (ct.IsCancellationRequested) return; + + var msg = arg.Message; + + var eventType = msg.ApplicationProperties[Options.AttributeNames.MessageType].ToString() + ?? throw new InvalidOperationException("Event type is missing in message properties"); + var contentType = msg.ContentType; + + // Should this be a stream name? or topic or something + var streamName = msg.ApplicationProperties[Options.AttributeNames.StreamName].ToString() + ?? throw new InvalidOperationException("Stream name is missing in message properties"); + + Logger.Current = Log; + + var evt = DeserializeData(contentType, eventType, msg.Body, streamName); + + var applicationProperties = msg.ApplicationProperties.Concat(MessageProperties(msg)); + + var ctx = new MessageConsumeContext( + msg.MessageId, + eventType, + contentType, + streamName, + 0, + 0, + 0, + Sequence++, + msg.EnqueuedTime.UtcDateTime, + evt, + AsMeta(applicationProperties), + SubscriptionId, + ct + ); + + try { + await Handler(ctx).NoContext(); + await arg.CompleteMessageAsync(msg, ct).NoContext(); + } catch (Exception ex) { + // Abandoning the message will make it available for reprocessing, or dead letter it? + await arg.AbandonMessageAsync(msg, null, ct).NoContext(); + await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, arg.FullyQualifiedNamespace, arg.EntityPath, arg.Identifier, arg.CancellationToken)).NoContext(); + Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId); + } + } } IEnumerable> MessageProperties(ServiceBusReceivedMessage msg) { @@ -129,7 +187,11 @@ async Task DefaultErrorHandler(ProcessErrorEventArgs arg) { /// /// protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) { - if (_processor == null) return; - await _processor.StopProcessingAsync(cancellationToken).NoContext(); + if (_sessionProcessor is not null) { + await _sessionProcessor.StopProcessingAsync(cancellationToken).NoContext(); + } + else if (_processor is not null) { + await _processor.StopProcessingAsync(cancellationToken).NoContext(); + } } } diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscriptionOptions.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscriptionOptions.cs index 6ce02918..2659ca53 100644 --- a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscriptionOptions.cs +++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscriptionOptions.cs @@ -20,6 +20,12 @@ public record ServiceBusSubscriptionOptions : SubscriptionOptions { /// public ServiceBusProcessorOptions ProcessorOptions { get; set; } = new(); + /// + /// Gets or sets the options for the session Service Bus processor. + /// If these options are specified, they take priority over + /// + public ServiceBusSessionProcessorOptions? SessionProcessorOptions { get; set; } + /// /// Gets the message attributes for Service Bus messages. /// @@ -42,6 +48,14 @@ public interface IQueueOrTopic { /// The subscription options. /// A configured instance. ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options); + + /// + /// Creates a for the specfiied client and options. + /// + /// The Service Bus client. + /// The subscription options. + /// A configured instance. + ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options); } /// @@ -56,6 +70,15 @@ public record Queue(string Name) : IQueueOrTopic { /// A configured for the queue. public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) => client.CreateProcessor(Name, options.ProcessorOptions); + + /// + /// Creates a for the queue. + /// + /// The Service Bus client. + /// The subscription options. + /// A configured for the queue. + public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) + => client.CreateSessionProcessor(Name, options.SessionProcessorOptions); } /// @@ -70,6 +93,15 @@ public record Topic(string Name) : IQueueOrTopic { /// A configured for the topic. public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) => client.CreateProcessor(Name, options.SubscriptionId, options.ProcessorOptions); + + /// + /// Creates a for the topic and subscription ID from options. + /// + /// The Service Bus client. + /// The subscription options. + /// A configured for the topic. + public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) + => client.CreateSessionProcessor(Name, options.SubscriptionId, options.SessionProcessorOptions); } /// @@ -84,4 +116,13 @@ public record TopicAndSubscription(string Name, string Subscription) : IQueueOrT /// A configured for the topic and subscription. public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) => client.CreateProcessor(Name, Subscription, options.ProcessorOptions); + + /// + /// Creates a for the topic and specified subscription. + /// + /// The Service Bus client. + /// The subscription options. + /// A configured for the topic and subscription. + public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options) + => client.CreateSessionProcessor(Name, Subscription, options.SessionProcessorOptions); }