-
-
Notifications
You must be signed in to change notification settings - Fork 94
SessionId support for Eventuous.Azure.ServiceBus #470
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SessionId support for Eventuous.Azure.ServiceBus #470
Conversation
alexey-troshkin-xpress
commented
Nov 20, 2025
•
edited by qodo-free-for-open-source-projects
bot
Loading
edited by qodo-free-for-open-source-projects
bot
…cer, along with support for ServiceBusSessionProcessor for the consumer
PR Compliance Guide 🔍(Compliance updated until commit adb53f4)Below is a summary of compliance checks for this PR:
Compliance status legend🟢 - Fully Compliant🟡 - Partial Compliant 🔴 - Not Compliant ⚪ - Requires Further Human Verification 🏷️ - Compliance label Previous compliance checksCompliance check up to commit f43b587
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
PR Code Suggestions ✨Latest suggestions up to adb53f4
Previous suggestions✅ Suggestions up to commit f43b587
|
|||||||||||||||||||||
| return new(_processor.StartProcessingAsync(cancellationToken)); | ||
| } | ||
|
|
||
| async Task HandleMessage(ProcessMessageEventArgs arg) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: Refactor duplicated logic into a single method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create a suggestion ready to commit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Implementation 🛠️
Implementation: Extract the duplicated message handling logic from HandleMessage and HandleSessionMessage into a shared private method to reduce code duplication and improve maintainability.
| async Task HandleMessage(ProcessMessageEventArgs arg) { | |
| async Task HandleMessage(ProcessMessageEventArgs arg) { | |
| await ProcessMessageAsync( | |
| arg.Message, | |
| arg.CancellationToken, | |
| msg => arg.CompleteMessageAsync(msg, arg.CancellationToken), | |
| msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken), | |
| arg.FullyQualifiedNamespace, | |
| arg.EntityPath, | |
| arg.Identifier | |
| ).NoContext(); | |
| } | |
| async Task HandleSessionMessage(ProcessSessionMessageEventArgs arg) { | |
| await ProcessMessageAsync( | |
| arg.Message, | |
| arg.CancellationToken, | |
| msg => arg.CompleteMessageAsync(msg, arg.CancellationToken), | |
| msg => arg.AbandonMessageAsync(msg, null, arg.CancellationToken), | |
| arg.FullyQualifiedNamespace, | |
| arg.EntityPath, | |
| arg.Identifier | |
| ).NoContext(); | |
| } | |
| async Task ProcessMessageAsync( | |
| ServiceBusReceivedMessage msg, | |
| CancellationToken ct, | |
| Func<ServiceBusReceivedMessage, Task> completeMessage, | |
| Func<ServiceBusReceivedMessage, Task> abandonMessage, | |
| string fullyQualifiedNamespace, | |
| string entityPath, | |
| string identifier) { | |
| if (ct.IsCancellationRequested) return; | |
| var eventType = msg.ApplicationProperties[Options.AttributeNames.MessageType].ToString() | |
| ?? throw new InvalidOperationException("Event type is missing in message properties"); | |
| var contentType = msg.ContentType; | |
| // Should this be a stream name? or topic or something | |
| var streamName = msg.ApplicationProperties[Options.AttributeNames.StreamName].ToString() | |
| ?? throw new InvalidOperationException("Stream name is missing in message properties"); | |
| Logger.Current = Log; | |
| var evt = DeserializeData(contentType, eventType, msg.Body, streamName); | |
| var applicationProperties = msg.ApplicationProperties.Concat(MessageProperties(msg)); | |
| var ctx = new MessageConsumeContext( | |
| msg.MessageId, | |
| eventType, | |
| contentType, | |
| streamName, | |
| 0, | |
| 0, | |
| 0, | |
| Sequence++, | |
| msg.EnqueuedTime.UtcDateTime, | |
| evt, | |
| AsMeta(applicationProperties), | |
| SubscriptionId, | |
| ct | |
| ); | |
| try { | |
| await Handler(ctx).NoContext(); | |
| await completeMessage(msg).NoContext(); | |
| } catch (Exception ex) { | |
| // Abandoning the message will make it available for reprocessing, or dead letter it? | |
| await abandonMessage(msg).NoContext(); | |
| await _defaultErrorHandler(new(ex, ServiceBusErrorSource.Abandon, fullyQualifiedNamespace, entityPath, identifier, ct)).NoContext(); | |
| Log.ErrorLog?.Log(ex, "Error processing message: {MessageId}", msg.MessageId); | |
| } | |
| } |
📄 References
- No matching references available
See review comment here
Test Results 51 files + 34 51 suites +34 32m 1s ⏱️ + 19m 59s For more details on these failures, see this check. Results for commit adb53f4. ± Comparison against base commit 20e513c. This pull request removes 5 and adds 15 tests. Note that renamed tests count towards both. |
|
Replaced by #474 |
User description
Added the ability to specify a SessionId when publishing events for parallel processing with guaranteed order within the SessionId on the consumer side
Auto-created Ticket
#471
PR Type
Enhancement
Description
Add SessionId and ReplyToSessionId support for message producers
Implement ServiceBusSessionProcessor for ordered message consumption
Enable session-based message processing with guaranteed ordering
Support both standard and session-based subscription modes
Diagram Walkthrough
File Walkthrough
ServiceBusMessageBuilder.cs
Add SessionId and ReplyToSessionId to messagessrc/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusMessageBuilder.cs
ReplyToSessionIdproperty assignment to ServiceBusMessageSessionIdassignment with null-check to avoidoverriding PartitionKey
ServiceBusProduceOptions.cs
Add session properties to produce optionssrc/Azure/src/Eventuous.Azure.ServiceBus/Producers/ServiceBusProduceOptions.cs
SessionIdproperty for specifying session identifier on messagesReplyToSessionIdproperty for request-reply session patternsServiceBusSubscription.cs
Implement session processor support in subscriptionssrc/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs
_sessionProcessorfield to support session-based messageprocessing
SessionProcessorOptionsis configuredHandleSessionMessagemethod to process session-basedmessages with identical logic to standard handler
handlers
ServiceBusSubscriptionOptions.cs
Add session processor configuration optionssrc/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscriptionOptions.cs
SessionProcessorOptionsproperty to enable session-basedprocessing
MakeSessionProcessormethod toIQueueOrTopicinterfaceMakeSessionProcessorin Queue, Topic, andTopicAndSubscription classes
when specified