Skip to content

Commit 37b54ac

Browse files
otapieroCopilot
andauthored
Add session-enabled Azure Service Bus listener and DI registration (#10)
* Add Azure Service Bus session queue listener Introduce `AzureServiceBusSessionQueueListener<T>` class for handling messages from Azure Service Bus session queues. This class implements `IQueueListener<T>` and `IAsyncDisposable`, providing message processing, session lock renewal, and error handling. Also, add `AddSessionQueue<T, THandler>` extension method for easy registration in the service collection. * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Bump package version to 1.0.3 Updated the version number in `DotQueue.csproj` from 1.0.2 to 1.0.3. --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent f061c70 commit 37b54ac

3 files changed

Lines changed: 183 additions & 1 deletion

File tree

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
using Azure.Messaging.ServiceBus;
2+
using Microsoft.Extensions.Logging;
3+
using System.Text.Json;
4+
5+
namespace DotQueue;
6+
7+
public class AzureServiceBusSessionQueueListener<T> : IQueueListener<T>, IAsyncDisposable
8+
{
9+
private readonly ServiceBusSessionProcessor _processor;
10+
private readonly IRetryPolicyProvider _retryPolicyProvider;
11+
private readonly QueueSettings _settings;
12+
private readonly ILogger<AzureServiceBusSessionQueueListener<T>> _logger;
13+
14+
public AzureServiceBusSessionQueueListener(
15+
ServiceBusClient client,
16+
string queueName,
17+
QueueSettings settings,
18+
IRetryPolicyProvider retryPolicyProvider,
19+
ILogger<AzureServiceBusSessionQueueListener<T>> logger)
20+
{
21+
_settings = settings;
22+
_retryPolicyProvider = retryPolicyProvider;
23+
_logger = logger;
24+
25+
_processor = client.CreateSessionProcessor(queueName, new ServiceBusSessionProcessorOptions
26+
{
27+
MaxConcurrentSessions = settings.MaxConcurrentCalls,
28+
PrefetchCount = settings.PrefetchCount,
29+
AutoCompleteMessages = false
30+
});
31+
}
32+
33+
public async Task StartAsync(Func<T, IReadOnlyDictionary<string, string>?, Func<Task>, CancellationToken, Task> handler, CancellationToken cancellationToken)
34+
{
35+
var retryPolicy = _retryPolicyProvider.Create(_settings, _logger);
36+
37+
_processor.ProcessMessageAsync += async args =>
38+
{
39+
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
40+
41+
void ArmCancelAfterFromLockedUntil()
42+
{
43+
var now = DateTimeOffset.UtcNow;
44+
var timeout = args.SessionLockedUntil - now - TimeSpan.FromSeconds(2);
45+
if (timeout <= TimeSpan.Zero)
46+
{
47+
linkedCts.Cancel();
48+
}
49+
else
50+
{
51+
linkedCts.CancelAfter(timeout);
52+
}
53+
}
54+
55+
ArmCancelAfterFromLockedUntil();
56+
57+
var renewLock = async () =>
58+
{
59+
await args.RenewSessionLockAsync(linkedCts.Token);
60+
_logger.LogDebug("Lock renewed for session {SessionId}", args.SessionId);
61+
_logger.LogDebug("Session lock till: {Time}", args.SessionLockedUntil);
62+
ArmCancelAfterFromLockedUntil();
63+
};
64+
65+
try
66+
{
67+
var json = args.Message.Body.ToString();
68+
_logger.LogDebug("Raw message body: {Json}", json);
69+
var jsonOptions = new JsonSerializerOptions
70+
{
71+
PropertyNameCaseInsensitive = true,
72+
};
73+
var msg = JsonSerializer.Deserialize<T>(json, jsonOptions);
74+
if (msg == null)
75+
{
76+
_logger.LogWarning("Failed to deserialize message.");
77+
await args.DeadLetterMessageAsync(args.Message, cancellationToken: cancellationToken);
78+
return;
79+
}
80+
81+
var metadata = args.Message.ApplicationProperties
82+
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value?.ToString() ?? string.Empty);
83+
84+
await retryPolicy.ExecuteAsync(async () =>
85+
{
86+
await handler(msg, metadata, renewLock, linkedCts.Token);
87+
88+
if (_settings.ProcessingDelayMs > 0)
89+
{
90+
await Task.Delay(_settings.ProcessingDelayMs, linkedCts.Token);
91+
}
92+
});
93+
94+
await args.CompleteMessageAsync(args.Message, cancellationToken);
95+
}
96+
catch (RetryableException rex)
97+
{
98+
_logger.LogWarning(rex, "Retryable error. Abandoning message.");
99+
await args.AbandonMessageAsync(args.Message, cancellationToken: cancellationToken);
100+
}
101+
catch (NonRetryableException nex)
102+
{
103+
_logger.LogWarning(nex, "Non-retryable error. Dead-lettering message.");
104+
await args.DeadLetterMessageAsync(args.Message, cancellationToken: cancellationToken);
105+
}
106+
catch (OperationCanceledException) when (linkedCts.IsCancellationRequested)
107+
{
108+
_logger.LogWarning("Handler exceeded lock duration. Abandoning message.");
109+
await args.AbandonMessageAsync(args.Message, cancellationToken: cancellationToken);
110+
}
111+
catch (JsonException jex)
112+
{
113+
_logger.LogWarning(jex, "Error while deserializing message.");
114+
await args.DeadLetterMessageAsync(args.Message, cancellationToken: cancellationToken);
115+
}
116+
catch (Exception ex)
117+
{
118+
_logger.LogError(ex, "Unhandled error. Treating as retryable.");
119+
await args.AbandonMessageAsync(args.Message, cancellationToken: cancellationToken);
120+
}
121+
};
122+
123+
_processor.ProcessErrorAsync += args =>
124+
{
125+
if (args.Exception is ServiceBusException sbex)
126+
{
127+
if (sbex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
128+
{
129+
_logger.LogWarning("Entity not found while processing.");
130+
}
131+
else if (sbex.Reason == ServiceBusFailureReason.ServiceCommunicationProblem)
132+
{
133+
_logger.LogWarning("Service communication problem - emulator not ready.");
134+
}
135+
else
136+
{
137+
_logger.LogWarning("Service Bus error: {Reason}", sbex.Reason);
138+
}
139+
}
140+
else
141+
{
142+
_logger.LogError(args.Exception, "Message handler error");
143+
}
144+
145+
return Task.CompletedTask;
146+
};
147+
148+
await _processor.StartProcessingAsync(cancellationToken);
149+
}
150+
151+
public async ValueTask DisposeAsync()
152+
{
153+
await _processor.DisposeAsync();
154+
GC.SuppressFinalize(this);
155+
}
156+
}

DotQueue.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<Nullable>enable</Nullable>
77
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
88
<PackageId>DotQueue</PackageId>
9-
<Version>1.0.2</Version>
9+
<Version>1.0.3</Version>
1010
<Authors>Alexander Kulyabin</Authors>
1111
<Company>Zionet</Company>
1212
<Description>Generic queue listener</Description>

ServiceCollectionExtensions.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,30 @@ public static IServiceCollection AddQueue<T, THandler>(
3131

3232
return services;
3333
}
34+
35+
public static IServiceCollection AddSessionQueue<T, THandler>(
36+
this IServiceCollection services,
37+
string queueName,
38+
Action<QueueSettings>? configure = null)
39+
where THandler : class, IQueueHandler<T>
40+
{
41+
services.AddScoped<IQueueHandler<T>, THandler>();
42+
43+
var settings = new QueueSettings();
44+
configure?.Invoke(settings);
45+
services.AddSingleton(settings);
46+
services.AddSingleton<IRetryPolicyProvider, RetryPolicyProvider>();
47+
48+
services.AddSingleton<IQueueListener<T>>(sp =>
49+
new AzureServiceBusSessionQueueListener<T>(
50+
sp.GetRequiredService<ServiceBusClient>(),
51+
queueName,
52+
settings,
53+
sp.GetRequiredService<IRetryPolicyProvider>(),
54+
sp.GetRequiredService<ILogger<AzureServiceBusSessionQueueListener<T>>>()));
55+
56+
services.AddHostedService<QueueProcessor<T>>();
57+
58+
return services;
59+
}
3460
}

0 commit comments

Comments
 (0)