Skip to content
Merged
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<MicrosoftExtensionsVer>9.0.10</MicrosoftExtensionsVer>
</PropertyGroup>
<PropertyGroup Label="Testcontainers version">
<TestcontainersVersion>4.6.0</TestcontainersVersion>
<TestcontainersVersion>4.8.1</TestcontainersVersion>
</PropertyGroup>
<PropertyGroup>
<NpgsqlVersion>9.0.3</NpgsqlVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,17 @@ internal ServiceBusMessage CreateServiceBusMessage(ProducedMessage message) {
TimeToLive = options?.TimeToLive ?? TimeSpan.MaxValue,
CorrelationId = message.Metadata?.GetCorrelationId(),
To = metadata?.GetValueOrDefault(attributes.To, options?.To)?.ToString(),
ReplyTo = metadata?.GetValueOrDefault(attributes.ReplyTo, options?.ReplyTo)?.ToString()
ReplyTo = metadata?.GetValueOrDefault(attributes.ReplyTo, options?.ReplyTo)?.ToString(),
ReplyToSessionId = options?.ReplyToSessionId,
};

// We set the SessionId only when a value is present because
// it overrides the PartitionKey, even if the SessionId is null.

if (options?.SessionId is not null) {
serviceBusMessage.SessionId = options.SessionId;
}

var reservedAttributes = attributes.ReservedNames();

foreach (var property in GetCustomApplicationProperties(message, messageType, reservedAttributes)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ public class ServiceBusProduceOptions {
/// </summary>
public string? ReplyTo { get; set; }

/// <summary>
/// Session ID to guarantee ordering on session-enabled entities.
/// </summary>
public string? SessionId { get; init; }

/// <summary>
/// The reply-to session ID attribute name for request-reply over sessions.
/// </summary>
public string? ReplyToSessionId { get; init; }

/// <summary>
/// Gets or sets the time interval after which the message expires.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ namespace Eventuous.Azure.ServiceBus.Subscriptions;
/// Represents a Service Bus subscription that processes messages from a queue or topic.
/// </summary>
public class ServiceBusSubscription : EventSubscription<ServiceBusSubscriptionOptions> {
readonly ServiceBusClient _client;
readonly Func<ProcessErrorEventArgs, Task> _defaultErrorHandler;
ServiceBusProcessor? _processor;
readonly IServiceBusProcessorStrategy _processorStrategy;

/// <summary>
/// Initializes a new instance of the <see cref="ServiceBusSubscription"/> class.
Expand All @@ -26,8 +25,11 @@ public class ServiceBusSubscription : EventSubscription<ServiceBusSubscriptionOp
/// <param name="eventSerializer">Event serializer (optional)</param>
public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOptions options, ConsumePipe consumePipe, ILoggerFactory? loggerFactory, IEventSerializer? eventSerializer) :
base(options, consumePipe, loggerFactory, eventSerializer) {
_client = client;
_defaultErrorHandler = Options.ErrorHandler ?? DefaultErrorHandler;

_processorStrategy = Options.SessionProcessorOptions is not null
? new SessionProcessorStrategy(client, Options, HandleSessionMessage, _defaultErrorHandler)
: new StandardProcessorStrategy(client, Options, HandleMessage, _defaultErrorHandler);
}

/// <summary>
Expand All @@ -36,60 +38,84 @@ public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOpt
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
protected override ValueTask Subscribe(CancellationToken cancellationToken) {
_processor = Options.QueueOrTopic.MakeProcessor(_client, Options);

_processor.ProcessMessageAsync += HandleMessage;
_processor.ProcessErrorAsync += _defaultErrorHandler;

return new(_processor.StartProcessingAsync(cancellationToken));

async Task HandleMessage(ProcessMessageEventArgs arg) {
var ct = arg.CancellationToken;

if (ct.IsCancellationRequested) return;

var msg = arg.Message;

var eventType = msg.ApplicationProperties[Options.AttributeNames.MessageType].ToString()
?? throw new InvalidOperationException("Event type is missing in message properties");
var contentType = msg.ContentType;

// Should this be a stream name? or topic or something
var streamName = msg.ApplicationProperties[Options.AttributeNames.StreamName].ToString()
?? throw new InvalidOperationException("Stream name is missing in message properties");

Logger.Current = Log;

var evt = DeserializeData(contentType, eventType, msg.Body, streamName);

var applicationProperties = msg.ApplicationProperties.Concat(MessageProperties(msg));

var ctx = new MessageConsumeContext(
msg.MessageId,
eventType,
contentType,
streamName,
0,
0,
0,
Sequence++,
msg.EnqueuedTime.UtcDateTime,
evt,
AsMeta(applicationProperties),
SubscriptionId,
ct
);

try {
await Handler(ctx).NoContext();
await arg.CompleteMessageAsync(msg, ct).NoContext();
} catch (Exception ex) {
// Abandoning the message will make it available for reprocessing, or dead letter it?
await arg.AbandonMessageAsync(msg, null, ct).NoContext();
await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, arg.FullyQualifiedNamespace, arg.EntityPath, arg.Identifier, arg.CancellationToken)).NoContext();
Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId);
}
protected override ValueTask Subscribe(CancellationToken cancellationToken)
=> _processorStrategy.Start(cancellationToken);

Task HandleMessage(ProcessMessageEventArgs arg)
=> ProcessMessageAsync(
arg.Message,
msg => arg.CompleteMessageAsync(msg, arg.CancellationToken),
msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken),
arg.FullyQualifiedNamespace,
arg.EntityPath,
arg.Identifier,
arg.CancellationToken
);

Task HandleSessionMessage(ProcessSessionMessageEventArgs arg)
=> ProcessMessageAsync(
arg.Message,
msg => arg.CompleteMessageAsync(msg, arg.CancellationToken),
msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken),
arg.FullyQualifiedNamespace,
arg.EntityPath,
arg.Identifier,
arg.CancellationToken
);

async Task ProcessMessageAsync(
ServiceBusReceivedMessage msg,
Func<ServiceBusReceivedMessage, Task> completeMessage,
Func<ServiceBusReceivedMessage, Task> abandonMessage,
string fullyQualifiedNamespace,
string entityPath,
string identifier,
CancellationToken ct
) {
if (ct.IsCancellationRequested) return;

var eventType = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.MessageType, out var messageType) && messageType is not null
? messageType.ToString()
: msg.Subject) ?? throw new InvalidOperationException("Message type is missing in message properties");
var contentType = msg.ContentType;

// Should this be a stream name? or topic or something
var streamName = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.StreamName, out var stream) && stream is not null
? stream.ToString()
: Options.QueueOrTopic switch {
Queue queue => queue.Name,
Topic topic => topic.Name,
_ => null
}) ?? throw new InvalidOperationException("Stream name is missing in message properties");

Logger.Current = Log;
var evt = DeserializeData(contentType, eventType, msg.Body, streamName);
var applicationProperties = msg.ApplicationProperties.Concat(MessageProperties(msg));

var ctx = new MessageConsumeContext(
msg.MessageId,
eventType,
contentType,
streamName,
0,
0,
0,
Sequence++,
msg.EnqueuedTime.UtcDateTime,
evt,
AsMeta(applicationProperties),
SubscriptionId,
ct
);

try {
await Handler(ctx).NoContext();
await completeMessage(msg).NoContext();
} catch (Exception ex) {
// Abandoning the message will make it available for reprocessing, or dead letter it?
await abandonMessage(msg).NoContext();
await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, fullyQualifiedNamespace, entityPath, identifier, ct)).NoContext();
Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId);
}
}

Expand All @@ -115,21 +141,67 @@ IEnumerable<KeyValuePair<string, object>> MessageProperties(ServiceBusReceivedMe
static Metadata AsMeta(IEnumerable<KeyValuePair<string, object>> applicationProperties) =>
new(applicationProperties.ToDictionary(pair => pair.Key, object? (pair) => pair.Value));

async Task DefaultErrorHandler(ProcessErrorEventArgs arg) {
// Log the error
Task DefaultErrorHandler(ProcessErrorEventArgs arg) {
Log.ErrorLog?.Log(arg.Exception, "Error processing message: {Identifier}", arg.Identifier);

// Optionally, you can handle the error further, e.g., by sending to a dead-letter queue
await Task.CompletedTask;
return Task.CompletedTask;
}

/// <summary>
/// Unsubscribes from the Service Bus queue or topic and stops processing messages.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) {
if (_processor == null) return;
await _processor.StopProcessingAsync(cancellationToken).NoContext();
protected override ValueTask Unsubscribe(CancellationToken cancellationToken) => _processorStrategy.Stop(cancellationToken);

interface IServiceBusProcessorStrategy {
ValueTask Start(CancellationToken cancellationToken);
ValueTask Stop(CancellationToken cancellationToken);
}

sealed class StandardProcessorStrategy(
ServiceBusClient client,
ServiceBusSubscriptionOptions options,
Func<ProcessMessageEventArgs, Task> handleMessage,
Func<ProcessErrorEventArgs, Task> handleError
)
: IServiceBusProcessorStrategy {
ServiceBusProcessor? _processor;

public ValueTask Start(CancellationToken cancellationToken) {
_processor = options.QueueOrTopic.MakeProcessor(client, options);
_processor.ProcessMessageAsync += handleMessage;
_processor.ProcessErrorAsync += handleError;

return new(_processor.StartProcessingAsync(cancellationToken));
}

public ValueTask Stop(CancellationToken cancellationToken)
=> _processor is not null
? new(_processor.StopProcessingAsync(cancellationToken))
: ValueTask.CompletedTask;
}

sealed class SessionProcessorStrategy(
ServiceBusClient client,
ServiceBusSubscriptionOptions options,
Func<ProcessSessionMessageEventArgs, Task> handleSessionMessage,
Func<ProcessErrorEventArgs, Task> handleError
)
: IServiceBusProcessorStrategy {
ServiceBusSessionProcessor? _sessionProcessor;

public ValueTask Start(CancellationToken cancellationToken) {
_sessionProcessor = options.QueueOrTopic.MakeSessionProcessor(client, options);
_sessionProcessor.ProcessMessageAsync += handleSessionMessage;
_sessionProcessor.ProcessErrorAsync += handleError;

return new(_sessionProcessor.StartProcessingAsync(cancellationToken));
}

public ValueTask Stop(CancellationToken cancellationToken)
=> _sessionProcessor is not null
? new(_sessionProcessor.StopProcessingAsync(cancellationToken))
: ValueTask.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ public record ServiceBusSubscriptionOptions : SubscriptionOptions {
/// </summary>
public ServiceBusProcessorOptions ProcessorOptions { get; set; } = new();

/// <summary>
/// Gets or sets the options for the session Service Bus processor.
/// If these options are specified, they take priority over <see cref="ProcessorOptions"/>
/// </summary>
public ServiceBusSessionProcessorOptions? SessionProcessorOptions { get; set; }

/// <summary>
/// Gets the message attributes for Service Bus messages.
/// </summary>
Expand All @@ -42,6 +48,14 @@ public interface IQueueOrTopic {
/// <param name="options">The subscription options.</param>
/// <returns>A configured <see cref="ServiceBusProcessor"/> instance.</returns>
ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options);

/// <summary>
/// Creates a <see cref="ServiceBusSessionProcessor"/> for the specfiied client and options.
/// </summary>
/// <param name="client">The Service Bus client.</param>
/// <param name="options">The subscription options.</param>
/// <returns>A configured <see cref="ServiceBusSessionProcessor"/> instance.</returns>
ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options);
}

/// <summary>
Expand All @@ -56,6 +70,15 @@ public record Queue(string Name) : IQueueOrTopic {
/// <returns>A configured <see cref="ServiceBusProcessor"/> for the queue.</returns>
public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options)
=> client.CreateProcessor(Name, options.ProcessorOptions);

/// <summary>
/// Creates a <see cref="ServiceBusSessionProcessor"/> for the queue.
/// </summary>
/// <param name="client">The Service Bus client.</param>
/// <param name="options">The subscription options.</param>
/// <returns>A configured <see cref="ServiceBusSessionProcessor"/> for the queue.</returns>
public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options)
=> client.CreateSessionProcessor(Name, options.SessionProcessorOptions);
}

/// <summary>
Expand All @@ -70,6 +93,15 @@ public record Topic(string Name) : IQueueOrTopic {
/// <returns>A configured <see cref="ServiceBusProcessor"/> for the topic.</returns>
public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options)
=> client.CreateProcessor(Name, options.SubscriptionId, options.ProcessorOptions);

/// <summary>
/// Creates a <see cref="ServiceBusSessionProcessor"/> for the topic and subscription ID from options.
/// </summary>
/// <param name="client">The Service Bus client.</param>
/// <param name="options">The subscription options.</param>
/// <returns>A configured <see cref="ServiceBusSessionProcessor"/> for the topic.</returns>
public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options)
=> client.CreateSessionProcessor(Name, options.SubscriptionId, options.SessionProcessorOptions);
}

/// <summary>
Expand All @@ -84,4 +116,13 @@ public record TopicAndSubscription(string Name, string Subscription) : IQueueOrT
/// <returns>A configured <see cref="ServiceBusProcessor"/> for the topic and subscription.</returns>
public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options)
=> client.CreateProcessor(Name, Subscription, options.ProcessorOptions);

/// <summary>
/// Creates a <see cref="ServiceBusSessionProcessor"/> for the topic and specified subscription.
/// </summary>
/// <param name="client">The Service Bus client.</param>
/// <param name="options">The subscription options.</param>
/// <returns>A configured <see cref="ServiceBusSessionProcessor"/> for the topic and subscription.</returns>
public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options)
=> client.CreateSessionProcessor(Name, Subscription, options.SessionProcessorOptions);
}
Loading