Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,17 @@ internal ServiceBusMessage CreateServiceBusMessage(ProducedMessage message) {
TimeToLive = options?.TimeToLive ?? TimeSpan.MaxValue,
CorrelationId = message.Metadata?.GetCorrelationId(),
To = metadata?.GetValueOrDefault(attributes.To, options?.To)?.ToString(),
ReplyTo = metadata?.GetValueOrDefault(attributes.ReplyTo, options?.ReplyTo)?.ToString()
ReplyTo = metadata?.GetValueOrDefault(attributes.ReplyTo, options?.ReplyTo)?.ToString(),
ReplyToSessionId = options?.ReplyToSessionId,
};

// We set the SessionId only when a value is present because
// it overrides the PartitionKey, even if the SessionId is null.

if (options?.SessionId is not null) {
serviceBusMessage.SessionId = options.SessionId;
}

var reservedAttributes = attributes.ReservedNames();

foreach (var property in GetCustomApplicationProperties(message, messageType, reservedAttributes)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ public class ServiceBusProduceOptions {
/// </summary>
public string? ReplyTo { get; set; }

/// <summary>
/// Session ID to guarantee ordering on session-enabled entities.
/// </summary>
public string? SessionId { get; init; }

/// <summary>
/// The reply-to session ID attribute name for request-reply over sessions.
/// </summary>
public string? ReplyToSessionId { get; init; }

/// <summary>
/// Gets or sets the time interval after which the message expires.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class ServiceBusSubscription : EventSubscription<ServiceBusSubscriptionOp
readonly ServiceBusClient _client;
readonly Func<ProcessErrorEventArgs, Task> _defaultErrorHandler;
ServiceBusProcessor? _processor;
ServiceBusSessionProcessor? _sessionProcessor;

/// <summary>
/// Initializes a new instance of the <see cref="ServiceBusSubscription"/> class.
Expand All @@ -37,12 +38,21 @@ public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOpt
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
protected override ValueTask Subscribe(CancellationToken cancellationToken) {
_processor = Options.QueueOrTopic.MakeProcessor(_client, Options);
if (Options.SessionProcessorOptions is not null) {
_sessionProcessor = Options.QueueOrTopic.MakeSessionProcessor(_client, Options);

_processor.ProcessMessageAsync += HandleMessage;
_processor.ProcessErrorAsync += _defaultErrorHandler;
_sessionProcessor.ProcessMessageAsync += HandleSessionMessage;
_sessionProcessor.ProcessErrorAsync += _defaultErrorHandler;

return new(_processor.StartProcessingAsync(cancellationToken));
return new ValueTask(_sessionProcessor.StartProcessingAsync(cancellationToken));
} else {
_processor = Options.QueueOrTopic.MakeProcessor(_client, Options);

_processor.ProcessMessageAsync += HandleMessage;
_processor.ProcessErrorAsync += _defaultErrorHandler;

return new(_processor.StartProcessingAsync(cancellationToken));
}

async Task HandleMessage(ProcessMessageEventArgs arg) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Refactor duplicated logic into a single method

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create a suggestion ready to commit

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Implementation 🛠️

Implementation: Extract the duplicated message handling logic from HandleMessage and HandleSessionMessage into a shared private method to reduce code duplication and improve maintainability.

Suggested change
async Task HandleMessage(ProcessMessageEventArgs arg) {
async Task HandleMessage(ProcessMessageEventArgs arg) {
await ProcessMessageAsync(
arg.Message,
arg.CancellationToken,
msg => arg.CompleteMessageAsync(msg, arg.CancellationToken),
msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken),
arg.FullyQualifiedNamespace,
arg.EntityPath,
arg.Identifier
).NoContext();
}
async Task HandleSessionMessage(ProcessSessionMessageEventArgs arg) {
await ProcessMessageAsync(
arg.Message,
arg.CancellationToken,
msg => arg.CompleteMessageAsync(msg, arg.CancellationToken),
msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken),
arg.FullyQualifiedNamespace,
arg.EntityPath,
arg.Identifier
).NoContext();
}
async Task ProcessMessageAsync(
ServiceBusReceivedMessage msg,
CancellationToken ct,
Func<ServiceBusReceivedMessage, Task> completeMessage,
Func<ServiceBusReceivedMessage, Task> abandonMessage,
string fullyQualifiedNamespace,
string entityPath,
string identifier) {
if (ct.IsCancellationRequested) return;
var eventType = msg.ApplicationProperties[Options.AttributeNames.MessageType].ToString()
?? throw new InvalidOperationException("Event type is missing in message properties");
var contentType = msg.ContentType;
// Should this be a stream name? or topic or something
var streamName = msg.ApplicationProperties[Options.AttributeNames.StreamName].ToString()
?? throw new InvalidOperationException("Stream name is missing in message properties");
Logger.Current = Log;
var evt = DeserializeData(contentType, eventType, msg.Body, streamName);
var applicationProperties = msg.ApplicationProperties.Concat(MessageProperties(msg));
var ctx = new MessageConsumeContext(
msg.MessageId,
eventType,
contentType,
streamName,
0,
0,
0,
Sequence++,
msg.EnqueuedTime.UtcDateTime,
evt,
AsMeta(applicationProperties),
SubscriptionId,
ct
);
try {
await Handler(ctx).NoContext();
await completeMessage(msg).NoContext();
} catch (Exception ex) {
// Abandoning the message will make it available for reprocessing, or dead letter it?
await abandonMessage(msg).NoContext();
await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, fullyQualifiedNamespace, entityPath, identifier, ct)).NoContext();
Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId);
}
}
📄 References
  • No matching references available

See review comment here

var ct = arg.CancellationToken;
Expand Down Expand Up @@ -91,6 +101,54 @@ async Task HandleMessage(ProcessMessageEventArgs arg) {
Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId);
}
}

async Task HandleSessionMessage(ProcessSessionMessageEventArgs arg) {
var ct = arg.CancellationToken;

if (ct.IsCancellationRequested) return;

var msg = arg.Message;

var eventType = msg.ApplicationProperties[Options.AttributeNames.MessageType].ToString()
?? throw new InvalidOperationException("Event type is missing in message properties");
var contentType = msg.ContentType;

// Should this be a stream name? or topic or something
var streamName = msg.ApplicationProperties[Options.AttributeNames.StreamName].ToString()
?? throw new InvalidOperationException("Stream name is missing in message properties");

Logger.Current = Log;

var evt = DeserializeData(contentType, eventType, msg.Body, streamName);

var applicationProperties = msg.ApplicationProperties.Concat(MessageProperties(msg));

var ctx = new MessageConsumeContext(
msg.MessageId,
eventType,
contentType,
streamName,
0,
0,
0,
Sequence++,
msg.EnqueuedTime.UtcDateTime,
evt,
AsMeta(applicationProperties),
SubscriptionId,
ct
);

try {
await Handler(ctx).NoContext();
await arg.CompleteMessageAsync(msg, ct).NoContext();
} catch (Exception ex) {
// Abandoning the message will make it available for reprocessing, or dead letter it?
await arg.AbandonMessageAsync(msg, null, ct).NoContext();
await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, arg.FullyQualifiedNamespace, arg.EntityPath, arg.Identifier, arg.CancellationToken)).NoContext();
Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId);
}
}
}

IEnumerable<KeyValuePair<string, object>> MessageProperties(ServiceBusReceivedMessage msg) {
Expand Down Expand Up @@ -129,7 +187,11 @@ async Task DefaultErrorHandler(ProcessErrorEventArgs arg) {
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) {
if (_processor == null) return;
await _processor.StopProcessingAsync(cancellationToken).NoContext();
if (_sessionProcessor is not null) {
await _sessionProcessor.StopProcessingAsync(cancellationToken).NoContext();
}
else if (_processor is not null) {
await _processor.StopProcessingAsync(cancellationToken).NoContext();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ public record ServiceBusSubscriptionOptions : SubscriptionOptions {
/// </summary>
public ServiceBusProcessorOptions ProcessorOptions { get; set; } = new();

/// <summary>
/// Gets or sets the options for the session Service Bus processor.
/// If these options are specified, they take priority over <see cref="ProcessorOptions"/>
/// </summary>
public ServiceBusSessionProcessorOptions? SessionProcessorOptions { get; set; }

/// <summary>
/// Gets the message attributes for Service Bus messages.
/// </summary>
Expand All @@ -42,6 +48,14 @@ public interface IQueueOrTopic {
/// <param name="options">The subscription options.</param>
/// <returns>A configured <see cref="ServiceBusProcessor"/> instance.</returns>
ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options);

/// <summary>
/// Creates a <see cref="ServiceBusSessionProcessor"/> for the specfiied client and options.
/// </summary>
/// <param name="client">The Service Bus client.</param>
/// <param name="options">The subscription options.</param>
/// <returns>A configured <see cref="ServiceBusSessionProcessor"/> instance.</returns>
ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options);
}

/// <summary>
Expand All @@ -56,6 +70,15 @@ public record Queue(string Name) : IQueueOrTopic {
/// <returns>A configured <see cref="ServiceBusProcessor"/> for the queue.</returns>
public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options)
=> client.CreateProcessor(Name, options.ProcessorOptions);

/// <summary>
/// Creates a <see cref="ServiceBusSessionProcessor"/> for the queue.
/// </summary>
/// <param name="client">The Service Bus client.</param>
/// <param name="options">The subscription options.</param>
/// <returns>A configured <see cref="ServiceBusSessionProcessor"/> for the queue.</returns>
public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options)
=> client.CreateSessionProcessor(Name, options.SessionProcessorOptions);
}

/// <summary>
Expand All @@ -70,6 +93,15 @@ public record Topic(string Name) : IQueueOrTopic {
/// <returns>A configured <see cref="ServiceBusProcessor"/> for the topic.</returns>
public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options)
=> client.CreateProcessor(Name, options.SubscriptionId, options.ProcessorOptions);

/// <summary>
/// Creates a <see cref="ServiceBusSessionProcessor"/> for the topic and subscription ID from options.
/// </summary>
/// <param name="client">The Service Bus client.</param>
/// <param name="options">The subscription options.</param>
/// <returns>A configured <see cref="ServiceBusSessionProcessor"/> for the topic.</returns>
public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options)
=> client.CreateSessionProcessor(Name, options.SubscriptionId, options.SessionProcessorOptions);
}

/// <summary>
Expand All @@ -84,4 +116,13 @@ public record TopicAndSubscription(string Name, string Subscription) : IQueueOrT
/// <returns>A configured <see cref="ServiceBusProcessor"/> for the topic and subscription.</returns>
public ServiceBusProcessor MakeProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options)
=> client.CreateProcessor(Name, Subscription, options.ProcessorOptions);

/// <summary>
/// Creates a <see cref="ServiceBusSessionProcessor"/> for the topic and specified subscription.
/// </summary>
/// <param name="client">The Service Bus client.</param>
/// <param name="options">The subscription options.</param>
/// <returns>A configured <see cref="ServiceBusSessionProcessor"/> for the topic and subscription.</returns>
public ServiceBusSessionProcessor MakeSessionProcessor(ServiceBusClient client, ServiceBusSubscriptionOptions options)
=> client.CreateSessionProcessor(Name, Subscription, options.SessionProcessorOptions);
}
Loading