Skip to content

Commit fdec902

Browse files
authored
Remove messages history for removed hubs
* Small optimizations in Consumer service and factory * Add removing message history from file when remove EventHub configuration
1 parent 86b6a2d commit fdec902

6 files changed

Lines changed: 30 additions & 46 deletions

File tree

src/Application/Services/EventHubConsumerService.cs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@ public class EventHubConsumerService : IMessageConsumerService
1313
private readonly IMessageConsumerProvider messageConsumerProvider;
1414
private readonly ITextProcessingPipeline textProcessingPipeline;
1515

16-
private readonly Channel<EventHubMessage> channel = Channel.CreateUnbounded<EventHubMessage>();
17-
private readonly Lock startLock = new();
16+
private readonly Channel<EventHubMessage> channel = Channel
17+
.CreateBounded<EventHubMessage>(new BoundedChannelOptions(100)
18+
{
19+
FullMode = BoundedChannelFullMode.Wait
20+
});
1821
private bool isProcessing;
1922

2023

@@ -47,23 +50,15 @@ public async IAsyncEnumerable<EventHubMessage> StartReceiveMessageAsync(
4750

4851
public async Task StopReceiveMessageAsync()
4952
{
50-
lock (startLock)
51-
{
52-
isProcessing = false;
53-
}
53+
Interlocked.Exchange(ref isProcessing, false);
5454
await messageConsumerProvider.StopReceiveMessageAsync();
5555
}
5656

5757

5858
private async Task ReadMessageAsync(CancellationToken cancellationToken)
5959
{
60-
lock (startLock)
61-
{
62-
if (isProcessing)
63-
return;
64-
65-
isProcessing = true;
66-
}
60+
if (Interlocked.CompareExchange(ref isProcessing, true, false))
61+
return;
6762

6863
var taskResult = messageConsumerProvider.StartReceiveMessageAsync(async message =>
6964
{

src/Application/Services/FileBasedMessageHistory.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,15 @@ public async Task RemoveMessageAsync(Guid input, string message)
7474
await messagesStorageProvider.SaveDataAsync(fullHistory);
7575
}
7676
}
77+
78+
public async Task RemoveAllAsync(Guid input)
79+
{
80+
MessagesHistory? fullHistory = await messagesStorageProvider.GetDataAsync();
81+
82+
if (fullHistory is null)
83+
return;
84+
85+
if (fullHistory.Messages.Remove(input))
86+
await messagesStorageProvider.SaveDataAsync(fullHistory);
87+
}
7788
}

src/Domain/Interfaces/Services/IMessageHistory.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ public interface IMessageHistory<in TInput, TResult>
55
Task<TResult> GetHistoryAsync(TInput input);
66
Task AddMessageAsync(TInput input, string message);
77
Task RemoveMessageAsync(TInput input, string message);
8+
Task RemoveAllAsync(Guid input);
89
}

src/Infrastructure/Factories/MessageConsumerFactory.cs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ IServiceProvider serviceProvider
3030

3131
public IMessageConsumerService CreateConsumer(Guid configId)
3232
{
33-
logger.LogInformation("Creating producer for configId: {ConfigId}", configId);
33+
logger.LogInformation("Creating consumer for configId: {ConfigId}", configId);
3434
var eventHubConfig = config.CurrentValue.EventHubsConfigs.First(x => x.Id == configId);
3535

36-
IMessageConsumerProvider ehConsumerProvider = eventHubConfig.UseCheckpoints
36+
IMessageConsumerProvider ehConsumerProvider = eventHubConfig.UseCheckpoints
3737
? CreateConsumerWithStorage(eventHubConfig)
3838
: CreateConsumerWithoutStorage(eventHubConfig);
3939

@@ -46,18 +46,12 @@ public IMessageConsumerService CreateConsumer(Guid configId)
4646

4747
private EventHubConsumerProviderWithStorage CreateConsumerWithStorage(EventHubConfig eventHubConfig)
4848
{
49-
return ActivatorUtilities.CreateInstance<EventHubConsumerProviderWithStorage>(
50-
serviceProvider,
51-
eventHubConfig
52-
);
49+
return ActivatorUtilities.CreateInstance<EventHubConsumerProviderWithStorage>(serviceProvider, eventHubConfig);
5350
}
5451

5552
private EventHubConsumerProviderWithoutStorage CreateConsumerWithoutStorage(EventHubConfig eventHubConfig)
5653
{
57-
return ActivatorUtilities.CreateInstance<EventHubConsumerProviderWithoutStorage>(
58-
serviceProvider,
59-
eventHubConfig
60-
);
54+
return ActivatorUtilities.CreateInstance<EventHubConsumerProviderWithoutStorage>(serviceProvider, eventHubConfig);
6155
}
6256

6357

@@ -76,7 +70,7 @@ private IMessageFormatter[] GetActiveMessageFormatters(EventHubConfig eventHubCo
7670
.MessageFormatters
7771
.Where(x => x.Value)
7872
.Select(s => s.Key)
79-
.ToArray();
73+
.ToHashSet();
8074

8175
var messageFormattersList = serviceProvider
8276
.GetServices<IMessageFormatter>()

src/WebUI/Components/Pages/Configuration.razor

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
@using Domain.Interfaces.Services
55

66
@inject IFileStorageProvider<AppConfiguration> ConfigurationProvider
7+
@inject IMessageHistory<Guid, List<string>> MessagesHistoryService
78

89
@rendermode InteractiveServer
910

@@ -181,6 +182,10 @@ else
181182

182183
private void RemoveConfig(int index)
183184
{
185+
var configId = appConfiguration?.EventHubsConfigs[index].Id;
186+
if (configId is not null)
187+
MessagesHistoryService.RemoveAllAsync(configId.Value);
188+
184189
appConfiguration?.EventHubsConfigs.RemoveAt(index);
185190
}
186191

src/WebUI/Components/Pages/EventHub.razor

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
@using Microsoft.Extensions.Options
1010
@rendermode InteractiveServer
1111
@inject IJSRuntime JsRuntime
12-
@implements IDisposable
1312
@implements IAsyncDisposable
1413

1514
<PageTitle>EventHub - @title</PageTitle>
@@ -349,27 +348,6 @@
349348
_ = toast?.ShowToast($"{infoMessage}: {ex.Message}", ToastType.Error, TimeSpan.FromSeconds(10));
350349
}
351350

352-
353-
public void Dispose()
354-
{
355-
// Cancel any pending operations first
356-
receiverCts.Cancel();
357-
delayedSenderCts.Cancel();
358-
359-
// Dispose of services
360-
messageProducer?.DisposeAsync().GetAwaiter().GetResult();
361-
messageConsumer?.DisposeAsync().GetAwaiter().GetResult();
362-
363-
// Dispose of cancellation tokens
364-
receiverCts.Dispose();
365-
delayedSenderCts.Dispose();
366-
367-
// Clear collections
368-
receivedMessages.Clear();
369-
messagesHistoryList.Clear();
370-
371-
GC.SuppressFinalize(this);
372-
}
373351

374352
public async ValueTask DisposeAsync()
375353
{

0 commit comments

Comments
 (0)