Skip to content

Commit 06a21a0

Browse files
committed
Updated documentation of subscribing to all in CQRS flow sample.
1 parent a3a1f62 commit 06a21a0

File tree

3 files changed

+19
-7
lines changed

3 files changed

+19
-7
lines changed

CQRS_Flow/.NET/Core/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,11 @@ private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent re
9797

9898
if (streamEvent == null)
9999
{
100-
// that can happen if we're sharing database between modules
101-
// if we're subscribing to all then we might get events that are from other module
102-
// and we might not be able to deserialize them
100+
// That can happen if we're sharing database between modules.
101+
// If we're subscribing to all and not filtering out events from other modules,
102+
// then we might get events that are from other module and we might not be able to deserialize them.
103+
// In that case it's safe to ignore deserialization error.
104+
// You may add more sophisticated logic checking if it should be ignored or not.
103105
logger.LogWarning("Couldn't deserialize event with id: {EventId}", resolvedEvent.Event.EventId);
104106

105107
if (!subscriptionOptions.IgnoreDeserializationErrors)
@@ -119,6 +121,8 @@ private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent re
119121
{
120122
logger.LogError("Error consuming message: {ExceptionMessage}{ExceptionStackTrace}", e.Message,
121123
e.StackTrace);
124+
// if you're fine with dropping some events instead of stopping subscription
125+
// then you can add some logic if error should be ignored
122126
throw;
123127
}
124128
}
@@ -137,13 +141,18 @@ private void HandleDrop(StreamSubscription _, SubscriptionDroppedReason reason,
137141

138142
private void Resubscribe()
139143
{
144+
// You may consider adding a max resubscribe count if you want to fail process
145+
// instead of retrying until database is up
140146
while (true)
141147
{
142148
var resubscribed = false;
143149
try
144150
{
145151
Monitor.Enter(resubscribeLock);
146152

153+
// No synchronization context is needed to disable synchronization context.
154+
// That enables running asynchronous method not causing deadlocks.
155+
// As this is a background process then we don't need to have async context here.
147156
using (NoSynchronizationContextScope.Enter())
148157
{
149158
SubscribeToAll(subscriptionOptions, cancellationToken).Wait(cancellationToken);
@@ -165,7 +174,9 @@ private void Resubscribe()
165174
if (resubscribed)
166175
break;
167176

168-
Thread.Sleep(1000);
177+
// Sleep between reconnections to not flood the database or not kill the CPU with infinite loop
178+
// Randomness added to reduce the chance of multiple subscriptions trying to reconnect at the same time
179+
Thread.Sleep(1000 + new Random((int)DateTime.UtcNow.Ticks).Next(1000));
169180
}
170181
}
171182

CQRS_Flow/.NET/Core/Core/Events/EventBus.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ IServiceProvider serviceProvider
2222

2323
private async Task Publish<TEvent>(TEvent @event, CancellationToken ct)
2424
{
25+
// You can consider adding here a retry policy for event handling
2526
using var scope = serviceProvider.CreateScope();
2627

2728
var eventHandlers =

CQRS_Flow/.NET/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,17 @@ It uses:
4141

4242
## Read Model
4343
- Read models are rebuilt with eventual consistency using subscribe to all EventStoreDB feature,
44-
- Added hosted service [SubscribeToAllBackgroundWorker](./Core/Core.EventStoreDB/Subscriptions/SubscribeToAllBackgroundWorker.cs) to handle subscribing to all. It handles checkpointing and simple retries if the connection was dropped.
44+
- Added class to manage subscriptions: [EventStoreDBSubscriptionToAll](./Core/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs) to handle subscribing to all. It handles checkpointing and simple retries if the connection was dropped. It's run inside the [BackgroundWorker](./Core/Core/BackgroundWorkers/BackgroundWorker.cs) that provides the abstraction for starting and disposing hosted service.
4545
- Added [ISubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/ISubscriptionCheckpointRepository.cs) for handling Subscription checkpointing.
4646
- Added checkpointing to EventStoreDB stream with [EventStoreDBSubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionCheckpointRepository.cs) and dummy in-memory checkpointer [InMemorySubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/InMemorySubscriptionCheckpointRepository.cs),
47-
- Added [ElasticSearchProjection](./Core/Core.ElasticSearch/Projections/ElasticSearchProjection.cs) as a sample how to project with [`left-fold`](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) into external storage. Another (e.g. MongoDB, EntityFramework) can be implemented the same way.
47+
- Added [ElasticSearchProjection](./Core/Core.ElasticSearch/Projections/ElasticSearchProjection.cs) as a sample how to project with [`left-fold`](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) into external storage. It supports idempotency through "external version" ElasticSearch mechanism. Another (e.g. MongoDB, EntityFramework) can be implemented the same way.
4848

4949
## Tests
5050
- Added sample of unit testing in [`Carts.Tests`](./Carts/Carts.Tests):
5151
- [Aggregate unit tests](./Carts/Carts.Tests/Carts/InitializingCart/InitializeCartTests.cs)
5252
- [Command handler unit tests](./Carts/Carts.Tests/Carts/InitializingCart/InitializeCartCommandHandlerTests.cs)
5353
- Added sample of integration testing in [`Carts.Api.Tests`](./Carts/Carts.Api.Tests)
54-
- [API acceptance tests](./Carts/Carts.Api.Tests/Carts/InitializingCart/InitializeCartTests.cs)
54+
- [API acceptance tests](./Carts/Carts.Api.Tests/Carts/)
5555

5656
## Other
5757
- [EventTypeMapper](./Core/Core/Events/EventTypeMapper.cs) class to allow both convention-based mapping (by the .NET type name) and custom to handle event versioning,

0 commit comments

Comments
 (0)