Skip to content

Commit 7192bec

Browse files
authored
Daniel Strauss/550/dotQueue repeated logic (#12)
* fix and add test * Bump version to 1.0.5 in DotQueue.csproj Updated the package version from 1.0.4 to 1.0.5.
1 parent c009050 commit 7192bec

File tree

3 files changed

+154
-1
lines changed

3 files changed

+154
-1
lines changed
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
using System.Collections.Generic;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using DotQueue;
5+
using FluentAssertions;
6+
using Microsoft.Extensions.Logging;
7+
using Moq;
8+
using Xunit;
9+
10+
namespace DotQueue.Tests;
11+
12+
public class RoutedQueueHandlerTests
13+
{
14+
private enum Act { A, B }
15+
16+
private sealed class TestMessage
17+
{
18+
public Act Action { get; init; }
19+
}
20+
21+
private sealed class TestHandler : RoutedQueueHandler<TestMessage, Act>
22+
{
23+
public int ACalls { get; private set; }
24+
public int BCalls { get; private set; }
25+
public int RenewCalls { get; private set; }
26+
public IReadOnlyDictionary<string, string>? LastMetadata { get; private set; }
27+
28+
public TestHandler(ILogger logger) : base(logger) { }
29+
30+
protected override Act GetAction(TestMessage message) => message.Action;
31+
32+
protected override void Configure(RouteBuilder r) => r
33+
.On(Act.A, HandleA)
34+
.On(Act.B, HandleB);
35+
36+
private Task HandleA(
37+
TestMessage m,
38+
IReadOnlyDictionary<string, string>? meta,
39+
Func<Task> renew,
40+
CancellationToken ct)
41+
{
42+
LastMetadata = meta;
43+
ACalls++;
44+
return Task.CompletedTask;
45+
}
46+
47+
private async Task HandleB(
48+
TestMessage m,
49+
IReadOnlyDictionary<string, string>? meta,
50+
Func<Task> renew,
51+
CancellationToken ct)
52+
{
53+
LastMetadata = meta;
54+
await renew();
55+
RenewCalls++;
56+
BCalls++;
57+
}
58+
}
59+
60+
[Fact]
61+
public async Task Routes_To_Registered_Handlers_And_Passes_Metadata_And_RenewLock()
62+
{
63+
var logger = Mock.Of<ILogger>();
64+
var handler = new TestHandler(logger);
65+
66+
var meta = new Dictionary<string, string> { ["k"] = "v" };
67+
int renewCount = 0;
68+
Task Renew() { renewCount++; return Task.CompletedTask; }
69+
70+
await handler.HandleAsync(new TestMessage { Action = Act.A }, meta, Renew, CancellationToken.None);
71+
await handler.HandleAsync(new TestMessage { Action = Act.B }, meta, Renew, CancellationToken.None);
72+
73+
handler.ACalls.Should().Be(1);
74+
handler.BCalls.Should().Be(1);
75+
handler.RenewCalls.Should().Be(1);
76+
handler.LastMetadata.Should().NotBeNull();
77+
handler.LastMetadata!["k"].Should().Be("v");
78+
renewCount.Should().Be(1);
79+
}
80+
81+
[Fact]
82+
public async Task Unknown_Action_Is_NonRetryable_From_DotQueue_Base()
83+
{
84+
var logger = Mock.Of<ILogger>();
85+
var handler = new TestHandler(logger);
86+
87+
var act = () => handler.HandleAsync(
88+
new TestMessage { Action = (Act)1234 }, // not registered
89+
null,
90+
() => Task.CompletedTask,
91+
CancellationToken.None);
92+
93+
await act.Should().ThrowAsync<NonRetryableException>()
94+
.WithMessage("*No handler registered for action*");
95+
}
96+
}

DotQueue/DotQueue.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<Nullable>enable</Nullable>
77
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
88
<PackageId>DotQueue</PackageId>
9-
<Version>1.0.4</Version>
9+
<Version>1.0.5</Version>
1010
<Authors>Alexander Kulyabin</Authors>
1111
<Company>Zionet</Company>
1212
<Description>Generic queue listener</Description>

DotQueue/RoutedQueueHandler.cs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
using Microsoft.Extensions.Logging;
2+
3+
namespace DotQueue;
4+
5+
public abstract class RoutedQueueHandler<TMessage, TAction> : IQueueHandler<TMessage>
6+
where TAction : notnull
7+
{
8+
private readonly ILogger _logger;
9+
private readonly Dictionary<TAction, HandlerDelegate> _routes = new();
10+
11+
protected RoutedQueueHandler(ILogger logger)
12+
{
13+
_logger = logger;
14+
Configure(new RouteBuilder(this));
15+
}
16+
17+
protected abstract TAction GetAction(TMessage message);
18+
19+
protected virtual void Configure(RouteBuilder routes) { }
20+
21+
protected delegate Task HandlerDelegate(
22+
TMessage message,
23+
IReadOnlyDictionary<string, string>? metadata,
24+
Func<Task> renewLock,
25+
CancellationToken ct);
26+
27+
protected void Register(TAction action, HandlerDelegate handler) => _routes[action] = handler;
28+
29+
protected sealed class RouteBuilder
30+
{
31+
private readonly RoutedQueueHandler<TMessage, TAction> _owner;
32+
internal RouteBuilder(RoutedQueueHandler<TMessage, TAction> owner) => _owner = owner;
33+
34+
public RouteBuilder On(TAction action, HandlerDelegate handler)
35+
{
36+
_owner.Register(action, handler);
37+
return this;
38+
}
39+
}
40+
41+
public Task HandleAsync(
42+
TMessage message,
43+
IReadOnlyDictionary<string, string>? metadata,
44+
Func<Task> renewLock,
45+
CancellationToken ct)
46+
{
47+
var action = GetAction(message);
48+
49+
if (_routes.TryGetValue(action, out var handler))
50+
{
51+
return handler(message, metadata, renewLock, ct);
52+
}
53+
54+
_logger.LogWarning("No handler registered for action {Action}", action);
55+
throw new NonRetryableException($"No handler registered for action {action}");
56+
}
57+
}

0 commit comments

Comments
 (0)