Skip to content

Commit 2f6e643

Browse files
authored
Refactor message producers
* Naming change * Refactor logic of sending messages, to simplify and apply formatting for each sending message * Update readme.md
1 parent 0ba6d61 commit 2f6e643

11 files changed

Lines changed: 187 additions & 228 deletions

File tree

readme.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
* Override GUID, DateTime values in a message before sending
1515
* Ability to compress and encode a message before sending
1616
* Send a **single message**
17-
* Send a **batch of identical messages**
18-
* Send a **batch of identical messages** with a **time delay** between each message
17+
* Send a **batch of messages**
18+
* Send a **batch of messages** with a **time delay** between each message
1919

2020
### Receiving Messages from Event Hubs
2121

src/Application/Services/EventHubConsumerService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public async IAsyncEnumerable<EventHubMessage> StartReceiveMessageAsync(
3939

4040
await foreach (var message in channel.Reader.ReadAllAsync(cancellationToken))
4141
{
42-
logger.LogInformation("Received message {MsgData}", message);
42+
logger.LogInformation("Received message {MsgData}", message.Message);
4343
message.Message = textProcessingPipeline.Process(message.Message);
4444

4545
yield return message;
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
using Domain.Interfaces.Providers;
2+
using Domain.Interfaces.Services;
3+
using Domain.Models;
4+
5+
namespace Application.Services.MessageProducers;
6+
7+
public abstract class BaseMessageProducer<T> : IMessageProducerService
8+
{
9+
private readonly IMessageProducerProvider messageProducerProvider;
10+
private readonly MessageOptions? messageOptions;
11+
private bool disposed;
12+
13+
14+
protected BaseMessageProducer(
15+
IMessageProducerProvider messageProducerProvider,
16+
MessageOptions? messageOptions = null
17+
)
18+
{
19+
this.messageProducerProvider = messageProducerProvider;
20+
this.messageOptions = messageOptions;
21+
}
22+
23+
24+
public async Task SendMessagesAsync(
25+
string? messageText, uint numberOfMessages = 1, TimeSpan? delayToSend = null, CancellationToken cancellationToken = default
26+
)
27+
{
28+
if (string.IsNullOrWhiteSpace(messageText))
29+
return;
30+
31+
var messageModifier = CreateMessageModifier();
32+
33+
if (numberOfMessages <= 1)
34+
{
35+
await messageProducerProvider
36+
.SendMessageAsync(messageText, messageModifier, cancellationToken)
37+
.ConfigureAwait(false);
38+
}
39+
else if (delayToSend is null || delayToSend.Value <= TimeSpan.Zero)
40+
{
41+
await messageProducerProvider
42+
.SendMessagesAsync(messageText, messageModifier, numberOfMessages, cancellationToken)
43+
.ConfigureAwait(false);
44+
}
45+
else
46+
{
47+
await messageProducerProvider.SendMessagesWithDelayAsync(
48+
messageText, messageModifier, numberOfMessages, delayToSend.Value, cancellationToken
49+
).ConfigureAwait(false);
50+
}
51+
}
52+
53+
54+
private Func<string, BinaryData> CreateMessageModifier()
55+
{
56+
return messageInput =>
57+
{
58+
var formatterMessage = ApplyFormattingOptions(messageInput);
59+
var messageToSend = ApplyEncodingOptions(formatterMessage);
60+
return EncodeToBinaryData(messageToSend);
61+
};
62+
}
63+
64+
protected virtual string ApplyFormattingOptions(string messageText)
65+
{
66+
return messageOptions?.TextProcessingPipeline?.Process(messageText) ?? messageText;
67+
}
68+
69+
protected abstract T ApplyEncodingOptions(string message);
70+
protected abstract BinaryData EncodeToBinaryData(T message);
71+
72+
73+
public async ValueTask DisposeAsync()
74+
{
75+
if (disposed)
76+
return;
77+
78+
await messageProducerProvider.DisposeAsync().ConfigureAwait(false);
79+
GC.SuppressFinalize(this);
80+
disposed = true;
81+
}
82+
83+
~BaseMessageProducer() => DisposeAsync().AsTask().GetAwaiter().GetResult();
84+
}
Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,18 @@
11
using Application.Utils;
22
using Domain.Interfaces.Providers;
33
using Domain.Models;
4-
using Microsoft.Extensions.Logging;
54

65
namespace Application.Services.MessageProducers;
76

8-
public class BytesMessageProducer : MessageProducerBase<byte[]>
7+
public class BytesMessageProducer : BaseMessageProducer<byte[]>
98
{
10-
private readonly ILogger<BytesMessageProducer> logger;
119
private readonly MessageOptions? messageOptions;
1210

1311
public BytesMessageProducer(
14-
ILogger<BytesMessageProducer> logger,
1512
IMessageProducerProvider messageProducerProvider,
1613
MessageOptions? messageOptions = null
1714
) : base(messageProducerProvider, messageOptions)
1815
{
19-
this.logger = logger;
2016
this.messageOptions = messageOptions;
2117
}
2218

@@ -28,31 +24,8 @@ protected override byte[] ApplyEncodingOptions(string message)
2824
return message.Compress();
2925
}
3026

31-
protected override async Task SendSingleMessageAsync(byte[] message, CancellationToken cancellationToken)
27+
protected override BinaryData EncodeToBinaryData(byte[] message)
3228
{
33-
await MessageProducerProvider.SendMessageAsync(message, cancellationToken);
34-
}
35-
36-
protected override async Task SendBatchMessagesAsync(
37-
byte[] message, int numberOfMessages, CancellationToken cancellationToken
38-
)
39-
{
40-
var messages = MultipleMessages(message, numberOfMessages);
41-
await MessageProducerProvider.SendMessagesAsync(messages, cancellationToken);
42-
}
43-
44-
protected override async Task SendMessagesWithDelayAsync(
45-
byte[] message, int numberOfMessages, TimeSpan timeDelay, CancellationToken cancellationToken
46-
)
47-
{
48-
try
49-
{
50-
var messages = MultipleMessages(message, numberOfMessages);
51-
await MessageProducerProvider.SendMessagesAsync(messages, timeDelay, cancellationToken);
52-
}
53-
catch (OperationCanceledException)
54-
{
55-
logger.LogWarning("Task SendMessagesWithDelayAsync was canceled");
56-
}
29+
return BinaryData.FromBytes(message);
5730
}
5831
}

src/Application/Services/MessageProducers/MessageProducerBase.cs

Lines changed: 0 additions & 65 deletions
This file was deleted.
Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,18 @@
11
using Application.Utils;
22
using Domain.Interfaces.Providers;
33
using Domain.Models;
4-
using Microsoft.Extensions.Logging;
54

65
namespace Application.Services.MessageProducers;
76

8-
public class StringMessageProducer : MessageProducerBase<string>
7+
public class StringMessageProducer : BaseMessageProducer<string>
98
{
10-
private readonly ILogger<StringMessageProducer> logger;
119
private readonly MessageOptions? messageOptions;
1210

1311
public StringMessageProducer(
14-
ILogger<StringMessageProducer> logger,
1512
IMessageProducerProvider messageProducerProvider,
1613
MessageOptions? messageOptions = null
1714
) : base(messageProducerProvider, messageOptions)
1815
{
19-
this.logger = logger;
2016
this.messageOptions = messageOptions;
2117
}
2218

@@ -28,31 +24,8 @@ protected override string ApplyEncodingOptions(string message)
2824
return message.Compress().EncodeBase64();
2925
}
3026

31-
protected override async Task SendSingleMessageAsync(string message, CancellationToken cancellationToken)
27+
protected override BinaryData EncodeToBinaryData(string message)
3228
{
33-
await MessageProducerProvider.SendMessageAsync(message, cancellationToken);
34-
}
35-
36-
protected override async Task SendBatchMessagesAsync(
37-
string message, int numberOfMessages, CancellationToken cancellationToken
38-
)
39-
{
40-
var messages = MultipleMessages(message, numberOfMessages);
41-
await MessageProducerProvider.SendMessagesAsync(messages, cancellationToken);
42-
}
43-
44-
protected override async Task SendMessagesWithDelayAsync(
45-
string message, int numberOfMessages, TimeSpan timeDelay, CancellationToken cancellationToken
46-
)
47-
{
48-
try
49-
{
50-
var messages = MultipleMessages(message, numberOfMessages);
51-
await MessageProducerProvider.SendMessagesAsync(messages, timeDelay, cancellationToken);
52-
}
53-
catch (OperationCanceledException)
54-
{
55-
logger.LogWarning("Task SendMessagesWithDelayAsync was canceled");
56-
}
29+
return BinaryData.FromString(message);
5730
}
5831
}

src/Domain/Domain.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,8 @@
66
<Nullable>enable</Nullable>
77
</PropertyGroup>
88

9+
<ItemGroup>
10+
<PackageReference Include="System.Memory.Data" Version="6.0.1" />
11+
</ItemGroup>
12+
913
</Project>

src/Domain/Interfaces/Providers/IMessageProducerProvider.cs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,22 @@ namespace Domain.Interfaces.Providers;
22

33
public interface IMessageProducerProvider : IAsyncDisposable
44
{
5-
Task SendMessageAsync(string message, CancellationToken cancellationToken);
6-
Task SendMessageAsync(byte[] message, CancellationToken cancellationToken);
7-
8-
Task SendMessagesAsync(IReadOnlyList<string> messages, CancellationToken cancellationToken);
9-
Task SendMessagesAsync(IReadOnlyList<byte[]> messages, CancellationToken cancellationToken);
10-
11-
Task SendMessagesAsync(IReadOnlyList<string> messages, TimeSpan timeDelay, CancellationToken cancellationToken);
12-
Task SendMessagesAsync(IReadOnlyList<byte[]> messages, TimeSpan timeDelay, CancellationToken cancellationToken);
5+
Task SendMessageAsync(
6+
string message, Func<string, BinaryData>? messageModifier = null, CancellationToken cancellationToken = default
7+
);
8+
9+
Task SendMessagesAsync(
10+
string message,
11+
Func<string, BinaryData>? messageModifier = null,
12+
uint numberOfMessages = 1,
13+
CancellationToken cancellationToken = default
14+
);
15+
16+
Task SendMessagesWithDelayAsync(
17+
string message,
18+
Func<string, BinaryData>? messageModifier = null,
19+
uint numberOfMessages = 1,
20+
TimeSpan sendDelay = default,
21+
CancellationToken cancellationToken = default
22+
);
1323
}

src/Domain/Interfaces/Services/IMessageProducerService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@ namespace Domain.Interfaces.Services;
33
public interface IMessageProducerService : IAsyncDisposable
44
{
55
Task SendMessagesAsync(
6-
string? messageText, int numberOfMessages = 1, TimeSpan? delayToSend = null, CancellationToken cancellationToken = default
6+
string? messageText, uint numberOfMessages = 1, TimeSpan? delayToSend = null, CancellationToken cancellationToken = default
77
);
88
}

0 commit comments

Comments
 (0)