Skip to content

Commit ac4894b

Browse files
committed
fix: bug on cleaner
1 parent a7c0cc3 commit ac4894b

6 files changed

Lines changed: 76 additions & 24 deletions

File tree

CacheManagerClear.Rabbit/CachePublisher.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,14 @@ public CachePublisher(IConnection connection, string exchange)
3535
/// </summary>
3636
public async Task StopAsync()
3737
{
38+
if (_disposed)
39+
{
40+
return;
41+
}
42+
3843
await _connection.DisposeAsync().ConfigureAwait(false);
3944
await _connection.CloseAsync().ConfigureAwait(false);
45+
_disposed = true;
4046
}
4147

4248
/// <summary>

CacheManagerClear.Rabbit/CacheSubscriber.cs

Lines changed: 54 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class CacheSubscriber : ICacheSubscriber
2323
/// <param name="cacheManager">Cache manager instance</param>
2424
/// <param name="exchange">RabbitMQ exchange name</param>
2525
/// <param name="queue">RabbitMQ queue name</param>
26-
public CacheSubscriber(IConnection connection, IEasyCacheManager cacheManager, string exchange, string queue)
26+
public CacheSubscriber(IConnection connection, string exchange, string queue, IEasyCacheManager cacheManager)
2727
{
2828
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
2929
_cacheManager = cacheManager ?? throw new ArgumentNullException(nameof(cacheManager));
@@ -36,41 +36,45 @@ public CacheSubscriber(IConnection connection, IEasyCacheManager cacheManager, s
3636
/// </summary>
3737
public async Task SubscribeAsync(CancellationToken cancellationToken)
3838
{
39-
using var channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
40-
41-
await channel.ExchangeDeclareAsync(exchange: _exchange, type: ExchangeType.Fanout, durable: true, cancellationToken: cancellationToken).ConfigureAwait(false);
42-
_ = await channel.QueueDeclareAsync(queue: _queue, durable: true, exclusive: false, autoDelete: false, cancellationToken: cancellationToken).ConfigureAwait(false);
43-
await channel.QueueBindAsync(queue: _queue, exchange: _exchange, routingKey: string.Empty, cancellationToken: cancellationToken).ConfigureAwait(false);
39+
try
40+
{
41+
using var channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
4442

45-
var consumer = new AsyncEventingBasicConsumer(channel);
43+
await channel.ExchangeDeclareAsync(exchange: _exchange, type: ExchangeType.Fanout, durable: true, cancellationToken: cancellationToken).ConfigureAwait(false);
44+
_ = await channel.QueueDeclareAsync(queue: _queue, durable: true, exclusive: false, autoDelete: false, cancellationToken: cancellationToken).ConfigureAwait(false);
45+
await channel.QueueBindAsync(queue: _queue, exchange: _exchange, routingKey: string.Empty, cancellationToken: cancellationToken).ConfigureAwait(false);
4646

47-
consumer.ReceivedAsync += async (_, ea) =>
48-
{
49-
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
47+
var consumer = new AsyncEventingBasicConsumer(channel);
5048

51-
if (message == "*")
52-
{
53-
await _cacheManager.ClearAllCacheAsync().ConfigureAwait(false);
54-
}
55-
else
56-
{
57-
await _cacheManager.ClearCacheAsync(message).ConfigureAwait(false);
58-
}
49+
consumer.ReceivedAsync += async (_, ea) => await ProcessAsync(ea, channel, cancellationToken).ConfigureAwait(false);
5950

60-
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken).ConfigureAwait(false);
61-
};
51+
// Start consuming messages
52+
_ = await channel.BasicConsumeAsync(queue: _queue, autoAck: false, consumer: consumer, cancellationToken: cancellationToken).ConfigureAwait(false);
53+
}
54+
catch (TaskCanceledException)
55+
{
56+
}
57+
catch (Exception e)
58+
{
59+
Console.WriteLine(e);
6260

63-
// Start consuming messages
64-
_ = await channel.BasicConsumeAsync(queue: _queue, autoAck: false, consumer: consumer, cancellationToken: cancellationToken).ConfigureAwait(false);
61+
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
62+
}
6563
}
6664

6765
/// <summary>
6866
/// Stops the Kafka subscription process.
6967
/// </summary>
7068
public async Task StopAsync()
7169
{
70+
if (_disposed)
71+
{
72+
return;
73+
}
74+
7275
await _connection.DisposeAsync().ConfigureAwait(false);
7376
await _connection.CloseAsync().ConfigureAwait(false);
77+
_disposed = true;
7478
}
7579

7680
/// <summary>
@@ -88,4 +92,32 @@ public async ValueTask DisposeAsync()
8892

8993
_disposed = true;
9094
}
95+
96+
private async Task ProcessAsync(BasicDeliverEventArgs ea, IChannel channel, CancellationToken cancellationToken)
97+
{
98+
try
99+
{
100+
var key = Encoding.UTF8.GetString(ea.Body.ToArray());
101+
102+
if (key.Equals(StaticData.ClearAllKey, StringComparison.Ordinal))
103+
{
104+
await _cacheManager.ClearAllCacheAsync().ConfigureAwait(false);
105+
}
106+
else
107+
{
108+
await _cacheManager.ClearCacheAsync(key).ConfigureAwait(false);
109+
}
110+
111+
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken).ConfigureAwait(false);
112+
}
113+
catch (TaskCanceledException)
114+
{
115+
}
116+
catch (Exception e)
117+
{
118+
Console.WriteLine(e);
119+
120+
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
121+
}
122+
}
91123
}

CacheManagerClear.Rabbit/RabbitCacheClearBuilder.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public static CacheManagerClearBuilder UseRabbitSubscriber(this CacheManagerClea
8989
}
9090
#endif
9191

92-
var subscriber = new CacheSubscriber(connection, cacheManager, exchange, queue);
92+
var subscriber = new CacheSubscriber(connection, exchange, queue, cacheManager);
9393

9494
_ = builder.BuildSubscriber(subscriber);
9595

@@ -141,7 +141,7 @@ public static CacheManagerClearBuilder UseRabbitPublisherAndSubscriber(this Cach
141141
#endif
142142

143143
var publisher = new CachePublisher(connection, exchange);
144-
var subscriber = new CacheSubscriber(connection, cacheManager, exchange, queue);
144+
var subscriber = new CacheSubscriber(connection, exchange, queue, cacheManager);
145145

146146
_ = builder.BuildPublisherAndSubscriber(publisher, subscriber);
147147

CacheManagerClear.Redis/CachePublisher.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,14 @@ public async Task PublishClearAllCacheAsync(CancellationToken? cancellationToken
5555
/// </summary>
5656
public async Task StopAsync()
5757
{
58+
if (_disposed)
59+
{
60+
return;
61+
}
62+
5863
await _redis.DisposeAsync().ConfigureAwait(false);
5964
await _redis.CloseAsync().ConfigureAwait(false);
65+
_disposed = true;
6066
}
6167

6268
/// <summary>

CacheManagerClear.Redis/CacheSubscriber.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,14 @@ await subscriber.SubscribeAsync(channel, async (_, message) =>
6060
/// </summary>
6161
public async Task StopAsync()
6262
{
63+
if (_disposed)
64+
{
65+
return;
66+
}
67+
6368
await _redis.DisposeAsync().ConfigureAwait(false);
6469
await _redis.CloseAsync().ConfigureAwait(false);
70+
_disposed = true;
6571
}
6672

6773
/// <summary>

CacheManagerUnitTest/StaticData.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ internal static class StaticData
66
internal const string Value = "test_value";
77
public const string ClearAllKey = "*";
88
public const string Topic = "cache_clear_topic";
9+
public const string Exchange = "cache_clear_exchange";
10+
public const string Queue = "cache_clear_queue";
911
}

0 commit comments

Comments
 (0)