Skip to content

Commit 332280e

Browse files
authored
Alexandr/rabbitmq support (#14)
* Add integration tests and Service Bus emulator setup Introduces a new IntegrationTests project with a basic smoke test for DotQueue, adds configuration for Azure Service Bus emulator and Azure SQL Edge via docker-compose, and updates the solution file to include the new test project. This enables local integration testing against a Service Bus emulator. * Add RabbitMQ support and refactor Azure components Introduces RabbitMQ queue listener and service registration extensions, along with integration tests for RabbitMQ. Refactors Azure Service Bus components into a dedicated 'Azure' namespace and folder. Updates project dependencies and docker-compose to include RabbitMQ for local testing. * Update RabbitMQ credentials and header handling Changed RabbitMQ default user and password from 'guest' to 'admin' in docker-compose and integration tests for improved security and consistency. Refactored header extraction in RabbitMqQueueListener to inline logic and improved type handling. Adjusted queue readiness check in integration test to ensure consumer is attached before publishing. * Refactor AzureTest and remove redundant Test.cs Cleaned up AzureTest.cs formatting and logic for improved readability and reliability. Deleted Test.cs as it duplicated the smoke test functionality now present in AzureTest.cs. * Update RabbitMqServiceCollectionExtensions.cs * Update DotQueue.csproj
1 parent 1fd9fab commit 332280e

File tree

10 files changed

+279
-16
lines changed

10 files changed

+279
-16
lines changed

DotQueue.IntegrationTests/IntegrationTests/Test.cs renamed to DotQueue.IntegrationTests/IntegrationTests/AzureTest.cs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,21 @@
1-
using System;
2-
using System.Collections.Generic;
3-
using System.Text.Json;
4-
using System.Threading;
5-
using System.Threading.Tasks;
6-
using Azure.Messaging.ServiceBus;
1+
using Azure.Messaging.ServiceBus;
72
using DotQueue;
3+
using DotQueue.Azure;
84
using Microsoft.Extensions.DependencyInjection;
95
using Microsoft.Extensions.Hosting;
106
using Microsoft.Extensions.Logging;
11-
using Xunit;
7+
using System.Text.Json;
128

9+
namespace IntegrationTests;
1310
public class DotQueue_Smoke
1411
{
1512
private const string Conn =
1613
"Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;";
1714

1815
[Fact(Timeout = 60000)]
19-
public async Task Message_is_received_and_completed()
16+
public async Task Message_is_received()
2017
{
21-
const string queueName = "demo-messages";
18+
const string queueName = "demo-messages";
2219
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
2320

2421
var gotIt = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -81,7 +78,7 @@ public async Task HandleAsync(SimpleMsg message, IReadOnlyDictionary<string, str
8178
{
8279
_log.LogInformation("Got: {Text}", message.Text);
8380
await complete();
84-
_tcs.TrySetResult(message.Text);
81+
_tcs.TrySetResult(message.Text);
8582
}
8683
}
8784
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text.Json;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using DotQueue;
7+
using DotQueue.Rabbit;
8+
using Microsoft.Extensions.DependencyInjection;
9+
using Microsoft.Extensions.Hosting;
10+
using Microsoft.Extensions.Logging;
11+
using RabbitMQ.Client;
12+
using Xunit;
13+
14+
namespace IntegrationTests;
15+
16+
public class DotQueue_Rabbit_Smoke
17+
{
18+
private const string Amqp =
19+
"amqp://admin:admin@localhost:5673/";
20+
21+
[Fact(Timeout = 60000)]
22+
public async Task Message_is_received()
23+
{
24+
const string queueName = "demo-messages";
25+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
26+
27+
var gotIt = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
28+
29+
using var host = Host.CreateDefaultBuilder()
30+
.ConfigureServices(s =>
31+
{
32+
s.AddLogging(b => b.AddSimpleConsole(o =>
33+
{
34+
o.TimestampFormat = "HH:mm:ss ";
35+
o.SingleLine = true;
36+
}));
37+
38+
s.AddSingleton(gotIt);
39+
40+
s.AddRabbitMQQueue<SimpleMsg, SimpleHandler>(
41+
amqpConnectionString: Amqp,
42+
queueName: queueName,
43+
configure: o =>
44+
{
45+
o.MaxConcurrentCalls = 1;
46+
o.PrefetchCount = 0;
47+
o.MaxRetryAttempts = 0;
48+
});
49+
})
50+
.Build();
51+
52+
await host.StartAsync(cts.Token);
53+
54+
var factory = new ConnectionFactory { Uri = new Uri(Amqp) };
55+
await using var conn = await factory.CreateConnectionAsync(cts.Token);
56+
await using var ch = await conn.CreateChannelAsync(cancellationToken: cts.Token);
57+
58+
for (var i = 0; i < 100; i++)
59+
{
60+
await using var probe = await conn.CreateChannelAsync(cancellationToken: cts.Token);
61+
try
62+
{
63+
var q = await probe.QueueDeclarePassiveAsync(queueName, cts.Token);
64+
if (q.ConsumerCount > 0) break;
65+
}
66+
catch { }
67+
await Task.Delay(100, cts.Token);
68+
}
69+
70+
var payload = new SimpleMsg("hello");
71+
var body = JsonSerializer.SerializeToUtf8Bytes(payload);
72+
await ch.BasicPublishAsync("", queueName, false,
73+
new BasicProperties { ContentType = "application/json" }, body, cts.Token);
74+
75+
var receivedText = await gotIt.Task.WaitAsync(cts.Token);
76+
Assert.Equal("hello", receivedText);
77+
78+
await host.StopAsync(TimeSpan.FromSeconds(5));
79+
}
80+
81+
private sealed record SimpleMsg(string Text);
82+
83+
private sealed class SimpleHandler : IQueueHandler<SimpleMsg>
84+
{
85+
private readonly TaskCompletionSource<string> _tcs;
86+
private readonly ILogger<SimpleHandler> _log;
87+
88+
public SimpleHandler(TaskCompletionSource<string> tcs, ILogger<SimpleHandler> log)
89+
{
90+
_tcs = tcs;
91+
_log = log;
92+
}
93+
94+
public async Task HandleAsync(SimpleMsg message, IReadOnlyDictionary<string, string>? _, Func<Task> complete, CancellationToken ct)
95+
{
96+
_log.LogInformation("Got: {Text}", message.Text);
97+
await complete();
98+
_tcs.TrySetResult(message.Text);
99+
}
100+
}
101+
}

DotQueue.Tests/ServiceCollectionExtensionsTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using Azure.Messaging.ServiceBus;
2+
using DotQueue.Azure;
23
using FluentAssertions;
34
using Microsoft.Extensions.DependencyInjection;
45
using Xunit;

DotQueue/AzureServiceBusQueueListener.cs renamed to DotQueue/Azure/AzureServiceBusQueueListener.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using Microsoft.Extensions.Logging;
33
using System.Text.Json;
44

5-
namespace DotQueue;
5+
namespace DotQueue.Azure;
66

77
public class AzureServiceBusQueueListener<T> : IQueueListener<T>, IAsyncDisposable
88
{

DotQueue/AzureServiceBusSessionQueueListener.cs renamed to DotQueue/Azure/AzureServiceBusSessionQueueListener.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using Microsoft.Extensions.Logging;
33
using System.Text.Json;
44

5-
namespace DotQueue;
5+
namespace DotQueue.Azure;
66

77
public class AzureServiceBusSessionQueueListener<T> : IQueueListener<T>, IAsyncDisposable
88
{

DotQueue/ServiceCollectionExtensions.cs renamed to DotQueue/Azure/ServiceCollectionExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using Microsoft.Extensions.DependencyInjection;
33
using Microsoft.Extensions.Logging;
44

5-
namespace DotQueue;
5+
namespace DotQueue.Azure;
66

77
public static class ServiceCollectionExtensions
88
{

DotQueue/DotQueue.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<Nullable>enable</Nullable>
77
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
88
<PackageId>DotQueue</PackageId>
9-
<Version>1.0.6</Version>
9+
<Version>1.1.0</Version>
1010
<Authors>Alexander Kulyabin</Authors>
1111
<Company>Zionet</Company>
1212
<Description>Generic queue listener</Description>
@@ -20,6 +20,7 @@
2020
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.8" />
2121
<PackageReference Include="Microsoft.Extensions.Logging" Version="9.0.8" />
2222
<PackageReference Include="Polly" Version="8.6.2" />
23+
<PackageReference Include="RabbitMQ.Client" Version="7.1.2" />
2324
</ItemGroup>
2425

2526
</Project>
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
using DotQueue;
2+
using Microsoft.Extensions.Logging;
3+
using RabbitMQ.Client;
4+
using RabbitMQ.Client.Events;
5+
using System.Text;
6+
using System.Text.Json;
7+
8+
public sealed class RabbitMqQueueListener<T> : IQueueListener<T>, IAsyncDisposable
9+
{
10+
private readonly IConnectionFactory _factory;
11+
private IConnection? _conn;
12+
private IChannel? _ch;
13+
private readonly string _queue;
14+
private readonly QueueSettings _settings;
15+
private readonly IRetryPolicyProvider _retry;
16+
private readonly ILogger<RabbitMqQueueListener<T>> _log;
17+
private readonly JsonSerializerOptions _json = new() { PropertyNameCaseInsensitive = true };
18+
19+
public RabbitMqQueueListener(
20+
IConnectionFactory factory,
21+
string queueName,
22+
QueueSettings settings,
23+
IRetryPolicyProvider retryPolicyProvider,
24+
ILogger<RabbitMqQueueListener<T>> log)
25+
{
26+
_factory = factory;
27+
_queue = queueName;
28+
_settings = settings;
29+
_retry = retryPolicyProvider;
30+
_log = log;
31+
}
32+
33+
public async Task StartAsync(
34+
Func<T, IReadOnlyDictionary<string, string>?, Func<Task>, CancellationToken, Task> handler,
35+
CancellationToken ct)
36+
{
37+
_conn = await _factory.CreateConnectionAsync(ct).ConfigureAwait(false);
38+
_ch = await _conn.CreateChannelAsync(cancellationToken: ct).ConfigureAwait(false);
39+
40+
if (_settings.PrefetchCount > 0)
41+
await _ch.BasicQosAsync(0, (ushort)_settings.PrefetchCount, false, ct).ConfigureAwait(false);
42+
43+
var consumer = new AsyncEventingBasicConsumer(_ch);
44+
var gate = new SemaphoreSlim(Math.Max(1, _settings.MaxConcurrentCalls));
45+
46+
consumer.ReceivedAsync += async (_, ea) =>
47+
{
48+
await gate.WaitAsync(ct).ConfigureAwait(false);
49+
try
50+
{
51+
var bodyBytes = ea.Body.ToArray();
52+
53+
var msg = JsonSerializer.Deserialize<T>(bodyBytes, _json)
54+
?? throw new NonRetryableException("Deserialize failed");
55+
56+
var meta = ea.BasicProperties?.Headers?
57+
.ToDictionary(
58+
kvp => kvp.Key,
59+
kvp => kvp.Value switch
60+
{
61+
null => string.Empty,
62+
byte[] b => Encoding.UTF8.GetString(b),
63+
ReadOnlyMemory<byte> rom => Encoding.UTF8.GetString(rom.Span),
64+
_ => kvp.Value.ToString() ?? string.Empty
65+
});
66+
67+
var policy = _retry.Create(_settings, _log);
68+
await policy.ExecuteAsync(async () =>
69+
{
70+
await handler(msg, meta, () => Task.CompletedTask, ct).ConfigureAwait(false);
71+
await _ch!.BasicAckAsync(ea.DeliveryTag, multiple: false, ct).ConfigureAwait(false);
72+
}).ConfigureAwait(false);
73+
}
74+
catch (RetryableException ex)
75+
{
76+
_log.LogWarning(ex, "Retryable, requeue");
77+
await _ch!.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: true, ct).ConfigureAwait(false);
78+
}
79+
catch (Exception ex)
80+
{
81+
_log.LogError(ex, "Dead-letter");
82+
await _ch!.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: false, ct).ConfigureAwait(false);
83+
}
84+
finally
85+
{
86+
gate.Release();
87+
}
88+
};
89+
90+
await _ch.BasicConsumeAsync(
91+
queue: _queue,
92+
autoAck: false,
93+
consumer: consumer,
94+
cancellationToken: ct
95+
).ConfigureAwait(false);
96+
}
97+
public async ValueTask DisposeAsync()
98+
{
99+
try { if (_ch is not null) await _ch.DisposeAsync().ConfigureAwait(false); } catch { }
100+
try { if (_conn is not null) await _conn.DisposeAsync().ConfigureAwait(false); } catch { }
101+
}
102+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
using Microsoft.Extensions.Logging;
3+
using RabbitMQ.Client;
4+
5+
namespace DotQueue.Rabbit;
6+
7+
public static class RabbitMqServiceCollectionExtensions
8+
{
9+
public static IServiceCollection AddRabbitMQQueue<T, THandler>(
10+
this IServiceCollection services,
11+
string amqpConnectionString,
12+
string queueName,
13+
Action<QueueSettings>? configure = null)
14+
where THandler : class, IQueueHandler<T>
15+
{
16+
services.AddScoped<IQueueHandler<T>, THandler>();
17+
18+
var settings = new QueueSettings();
19+
configure?.Invoke(settings);
20+
services.AddSingleton(settings);
21+
services.AddSingleton<IRetryPolicyProvider, RetryPolicyProvider>();
22+
23+
services.AddSingleton<IConnectionFactory>(_ => new ConnectionFactory
24+
{
25+
Uri = new Uri(amqpConnectionString),
26+
});
27+
services.AddSingleton<IQueueListener<T>>(sp =>
28+
new RabbitMqQueueListener<T>(
29+
sp.GetRequiredService<IConnectionFactory>(),
30+
queueName,
31+
sp.GetRequiredService<QueueSettings>(),
32+
sp.GetRequiredService<IRetryPolicyProvider>(),
33+
sp.GetRequiredService<ILogger<RabbitMqQueueListener<T>>>()
34+
));
35+
36+
services.AddHostedService<QueueProcessor<T>>();
37+
return services;
38+
}
39+
}

docker-compose.yml

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,33 @@ services:
4747
interval: 5s
4848
timeout: 5s
4949
retries: 15
50+
# ------------------------------ RabbitMQ ------------------------------
51+
rabbitmq:
52+
image: rabbitmq:3.13-management
53+
container_name: rabbitmq-test
54+
ports:
55+
- "5673:5672"
56+
- "15672:15672"
57+
environment:
58+
RABBITMQ_DEFAULT_USER: admin
59+
RABBITMQ_DEFAULT_PASS: admin
60+
healthcheck:
61+
test: ["CMD", "rabbitmq-diagnostics", "check_running"]
62+
interval: 5s
63+
timeout: 5s
64+
retries: 20
5065

66+
rabbit-init:
67+
image: curlimages/curl:8.9.1
68+
depends_on:
69+
rabbitmq:
70+
condition: service_healthy
71+
command: >
72+
sh -c 'until curl -su admin:admin http://rabbitmq:15672/api/overview >/dev/null 2>&1; do sleep 1; done;
73+
curl -su admin:admin -H "content-type: application/json" -X PUT "http://rabbitmq:15672/api/queues/%2f/demo-messages"
74+
-d "{\"durable\":false,\"auto_delete\":false,\"arguments\":{}}";'
5175
5276
# ------------------------ Shared Networks ---------------------------
5377
networks:
54-
microservice-net:
55-
driver: bridge
5678
sb-emulator:
5779
driver: bridge

0 commit comments

Comments
 (0)