Skip to content

Commit f061c70

Browse files
authored
Adding metadata to IQueueHandler (#8)
* Adding metadata to IQueueHandler * update version ---------
1 parent 5272baa commit f061c70

6 files changed

Lines changed: 16 additions & 12 deletions

File tree

.DS_Store

6 KB
Binary file not shown.

AzureServiceBusQueueListener.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public AzureServiceBusQueueListener(
3030
});
3131
}
3232

33-
public async Task StartAsync(Func<T, Func<Task>, CancellationToken, Task> handler, CancellationToken cancellationToken)
33+
public async Task StartAsync(Func<T, IReadOnlyDictionary<string, string>?, Func<Task>, CancellationToken, Task> handler, CancellationToken cancellationToken)
3434
{
3535
var retryPolicy = _retryPolicyProvider.Create(_settings, _logger);
3636

@@ -80,9 +80,13 @@ void ArmCancelAfterFromLockedUntil()
8080
return;
8181
}
8282

83+
// Extract application properties as metadata
84+
var metadata = args.Message.ApplicationProperties
85+
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value?.ToString() ?? string.Empty);
86+
8387
await retryPolicy.ExecuteAsync(async () =>
8488
{
85-
await handler(msg, renewLock, linkedCts.Token);
89+
await handler(msg, metadata, renewLock, linkedCts.Token);
8690

8791
if (_settings.ProcessingDelayMs > 0)
8892
{

DotQueue.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<Nullable>enable</Nullable>
77
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
88
<PackageId>DotQueue</PackageId>
9-
<Version>1.0.1</Version>
9+
<Version>1.0.2</Version>
1010
<Authors>Alexander Kulyabin</Authors>
1111
<Company>Zionet</Company>
1212
<Description>Generic queue listener</Description>
@@ -18,7 +18,7 @@
1818
<ItemGroup>
1919
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.20.1" />
2020
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.8" />
21-
<PackageReference Include="Microsoft.Extensions.Logging" Version="10.0.0-preview.6.25358.103" />
21+
<PackageReference Include="Microsoft.Extensions.Logging" Version="9.0.8" />
2222
<PackageReference Include="Polly" Version="8.6.2" />
2323
</ItemGroup>
2424

IQueueHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22

33
public interface IQueueHandler<T>
44
{
5-
Task HandleAsync(T message, Func<Task> renewLock, CancellationToken cancellationToken);
5+
Task HandleAsync(T message, IReadOnlyDictionary<string, string>? metadataCallback, Func<Task> renewLock, CancellationToken cancellationToken);
66
}

IQueueListener.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22

33
public interface IQueueListener<T>
44
{
5-
Task StartAsync(Func<T, Func<Task>, CancellationToken, Task> handler, CancellationToken cancellationToken);
5+
Task StartAsync(Func<T, IReadOnlyDictionary<string, string>?, Func<Task>, CancellationToken, Task> handler, CancellationToken cancellationToken);
66
}

QueueProcessor.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ public QueueProcessor(IQueueListener<T> listener, IServiceScopeFactory scopeFact
1515
}
1616

1717
protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
18-
_listener.StartAsync(async (msg, renewLock, token) =>
19-
{
20-
using var scope = _scopeFactory.CreateScope();
21-
var handler = scope.ServiceProvider.GetRequiredService<IQueueHandler<T>>();
22-
await handler.HandleAsync(msg, renewLock, token);
23-
}, stoppingToken);
18+
_listener.StartAsync(async (msg, metadata, renewLock, token) =>
19+
{
20+
using var scope = _scopeFactory.CreateScope();
21+
var handler = scope.ServiceProvider.GetRequiredService<IQueueHandler<T>>();
22+
await handler.HandleAsync(msg, metadata, renewLock, token);
23+
}, stoppingToken);
2424
}

0 commit comments

Comments
 (0)