From e253527c50068f739de7dc29891d9ec2c49d0e79 Mon Sep 17 00:00:00 2001 From: mathantunes Date: Mon, 10 Nov 2025 23:52:01 +0100 Subject: [PATCH 1/5] Checkpoint initial position enum --- .../Eventuous.Subscriptions/CheckpointInitialPosition.cs | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 src/Core/src/Eventuous.Subscriptions/CheckpointInitialPosition.cs diff --git a/src/Core/src/Eventuous.Subscriptions/CheckpointInitialPosition.cs b/src/Core/src/Eventuous.Subscriptions/CheckpointInitialPosition.cs new file mode 100644 index 00000000..999d8216 --- /dev/null +++ b/src/Core/src/Eventuous.Subscriptions/CheckpointInitialPosition.cs @@ -0,0 +1,6 @@ +namespace Eventuous.Subscriptions; + +public enum CheckpointInitialPosition { + Beginning, + End +} \ No newline at end of file From cc228edbaca0673ac6d6f4f18b4f84471f32aaa2 Mon Sep 17 00:00:00 2001 From: mathantunes Date: Mon, 10 Nov 2025 23:52:46 +0100 Subject: [PATCH 2/5] Add CheckpointInitialPosition to SubscriptionWithCheckpointOptions --- src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs b/src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs index 01fe5eeb..18a015a3 100644 --- a/src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs +++ b/src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs @@ -19,4 +19,5 @@ public abstract record SubscriptionOptions { public abstract record SubscriptionWithCheckpointOptions : SubscriptionOptions { public int CheckpointCommitBatchSize { get; set; } = 100; public int CheckpointCommitDelayMs { get; set; } = 5000; + public CheckpointInitialPosition InitialPosition { get; set; } = CheckpointInitialPosition.Beginning; } From a42fc1e469eb1f8abe96509c684dbff70914bd47 Mon Sep 17 00:00:00 2001 From: mathantunes Date: Tue, 11 Nov 2025 00:24:29 +0100 Subject: [PATCH 3/5] Set checkpoint to end on subscription --- .../EventSubscriptionWithCheckpoint.cs | 3 ++- .../Subscriptions/AllStreamSubscription.cs | 8 +++++++- .../Subscriptions/StreamSubscription.cs | 8 +++++++- .../Subscriptions/SqlSubscriptionBase.cs | 9 +++++++++ 4 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs b/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs index 6ab5d210..f107caa0 100644 --- a/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs +++ b/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs @@ -38,7 +38,8 @@ static ConsumePipe ConfigurePipe(ConsumePipe pipe, int concurrencyLimit) EventPosition? LastProcessed { get; set; } CheckpointCommitHandler? CheckpointCommitHandler { get; set; } - ICheckpointStore CheckpointStore { get; } = Ensure.NotNull(checkpointStore); + + protected ICheckpointStore CheckpointStore { get; } = Ensure.NotNull(checkpointStore); protected SubscriptionKind Kind { get; } = kind; diff --git a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/AllStreamSubscription.cs b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/AllStreamSubscription.cs index 18508924..c72c289f 100644 --- a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/AllStreamSubscription.cs +++ b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/AllStreamSubscription.cs @@ -84,7 +84,13 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken var (_, position) = await GetCheckpoint(cancellationToken).NoContext(); - var fromAll = position == null ? FromAll.Start : FromAll.After(new(position.Value, position.Value)); + FromAll GetPosition() { + if (position == null && Options.InitialPosition == CheckpointInitialPosition.End) return FromAll.End; + if (position == null) return FromAll.Start; + return FromAll.After(new(position.Value, position.Value)); + } + + var fromAll = GetPosition(); Subscription = await Client.SubscribeToAllAsync( fromAll, diff --git a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/StreamSubscription.cs b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/StreamSubscription.cs index e19733ba..9483ebe4 100644 --- a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/StreamSubscription.cs +++ b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/StreamSubscription.cs @@ -91,7 +91,13 @@ public StreamSubscription( protected override async ValueTask Subscribe(CancellationToken cancellationToken) { var (_, position) = await GetCheckpoint(cancellationToken).NoContext(); - var fromStream = position == null ? FromStream.Start : FromStream.After(StreamPosition.FromInt64((long)position)); + FromStream GetStreamPosition() { + if (position == null && Options.InitialPosition == CheckpointInitialPosition.End) return FromStream.End; + if (position == null) return FromStream.Start; + return FromStream.After(StreamPosition.FromInt64((long)position)); + } + + var fromStream = GetStreamPosition(); Subscription = await Client.SubscribeToStreamAsync( Options.StreamName, diff --git a/src/Relational/src/Eventuous.Sql.Base/Subscriptions/SqlSubscriptionBase.cs b/src/Relational/src/Eventuous.Sql.Base/Subscriptions/SqlSubscriptionBase.cs index 9787dfd1..21d360d9 100644 --- a/src/Relational/src/Eventuous.Sql.Base/Subscriptions/SqlSubscriptionBase.cs +++ b/src/Relational/src/Eventuous.Sql.Base/Subscriptions/SqlSubscriptionBase.cs @@ -214,6 +214,15 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken await BeforeSubscribe(cancellationToken).NoContext(); var (_, position) = await GetCheckpoint(cancellationToken).NoContext(); + if (position == null && Options.InitialPosition == CheckpointInitialPosition.End) { + var endOfStream = await GetSubscriptionEndOfStream(cancellationToken); + if (endOfStream == EndOfStream.Invalid) { + throw new InvalidOperationException($"Could not get the end of the stream for {SubscriptionId}"); + } + await CheckpointStore.StoreCheckpoint(new Checkpoint(SubscriptionId, endOfStream.Position), true, cancellationToken); + position = endOfStream.Position; + } + _runner = new TaskRunner(token => PollingQuery(position, token)).Start(); } From 7e389db867c60d530732147904e85b9df88c4588 Mon Sep 17 00:00:00 2001 From: mathantunes Date: Mon, 17 Nov 2025 22:25:27 +0100 Subject: [PATCH 4/5] Add checkpoint initial position tests --- .../SubscribeToAll.cs | 17 +++++++++++++++++ .../Subscriptions/SubscribeTests.cs | 13 ++++++++++++- .../Subscriptions/SubscribeTests.cs | 11 +++++++++++ 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs b/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs index 81e496d2..78ee6978 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs @@ -15,6 +15,23 @@ SubscriptionFixtureBase where TSubscriptionOptions : SubscriptionOptions where TCheckpointStore : class, ICheckpointStore { + + protected async Task ShouldStartConsumptionFromEnd(CancellationToken cancellationToken) { + const int count = 10; + + var commands = await GenerateAndHandleCommands(count); + var testEvents = commands.Select(ToEvent).ToList(); + + await fixture.StartSubscription(); + await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), []).Validate(cancellationToken); + + commands = await GenerateAndHandleCommands(count); + testEvents = commands.Select(ToEvent).ToList(); + + await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), [.. testEvents]).Validate(cancellationToken); + await fixture.StopSubscription(); + } + protected async Task ShouldConsumeProducedEvents(CancellationToken cancellationToken) { const int count = 10; diff --git a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/SubscribeTests.cs b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/SubscribeTests.cs index 77ef9e84..c8c26703 100644 --- a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/SubscribeTests.cs +++ b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/SubscribeTests.cs @@ -7,7 +7,18 @@ namespace Eventuous.Tests.KurrentDB.Subscriptions; -public class SubscribeToAll() + public class SubscribeToAllFromEnd() + : SubscribeToAllBase( + new CatchUpSubscriptionFixture(opt => opt.InitialPosition = CheckpointInitialPosition.End, new("$all"), false) + ) { + [Test] + [Retry(3)] + public async Task Esdb_ShouldStartConsumptionFromEnd(CancellationToken cancellationToken) { + await ShouldStartConsumptionFromEnd(cancellationToken); + } +} + + public class SubscribeToAll() : SubscribeToAllBase( new CatchUpSubscriptionFixture(_ => { }, new("$all"), false) ) { diff --git a/src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscribeTests.cs b/src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscribeTests.cs index d4310214..70c6105b 100644 --- a/src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscribeTests.cs +++ b/src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscribeTests.cs @@ -7,6 +7,17 @@ namespace Eventuous.Tests.Postgres.Subscriptions; +[NotInParallel] +public class SubscribeToAllFromEnd() + : SubscribeToAllBase( + new SubscriptionFixture(opt => opt.InitialPosition = Eventuous.Subscriptions.CheckpointInitialPosition.End, false) + ) { + [Test] + public async Task Postgres_ShouldStartConsumptionFromEnd(CancellationToken cancellationToken) { + await ShouldStartConsumptionFromEnd(cancellationToken); + } +} + [NotInParallel] public class SubscribeToAll() : SubscribeToAllBase( From 86adfaf7ad20e2c224f298ca0973d8e776e63c69 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 24 Nov 2025 12:53:21 +0100 Subject: [PATCH 5/5] Apply suggestions from code review Co-authored-by: qodo-merge-for-open-source[bot] <189517486+qodo-merge-for-open-source[bot]@users.noreply.github.com> --- .../Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs | 1 - .../Subscriptions/SqlSubscriptionBase.cs | 7 +++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs b/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs index 78ee6978..24e88eb1 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs @@ -30,7 +30,6 @@ protected async Task ShouldStartConsumptionFromEnd(CancellationToken cancellatio await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), [.. testEvents]).Validate(cancellationToken); await fixture.StopSubscription(); - } protected async Task ShouldConsumeProducedEvents(CancellationToken cancellationToken) { const int count = 10; diff --git a/src/Relational/src/Eventuous.Sql.Base/Subscriptions/SqlSubscriptionBase.cs b/src/Relational/src/Eventuous.Sql.Base/Subscriptions/SqlSubscriptionBase.cs index 21d360d9..c91e35b4 100644 --- a/src/Relational/src/Eventuous.Sql.Base/Subscriptions/SqlSubscriptionBase.cs +++ b/src/Relational/src/Eventuous.Sql.Base/Subscriptions/SqlSubscriptionBase.cs @@ -215,9 +215,12 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken var (_, position) = await GetCheckpoint(cancellationToken).NoContext(); if (position == null && Options.InitialPosition == CheckpointInitialPosition.End) { - var endOfStream = await GetSubscriptionEndOfStream(cancellationToken); + var endOfStream = await GetSubscriptionEndOfStream(cancellationToken).NoContext(); if (endOfStream == EndOfStream.Invalid) { - throw new InvalidOperationException($"Could not get the end of the stream for {SubscriptionId}"); + throw new InvalidOperationException($"Could not get the end of the stream for subscription {SubscriptionId}"); + } + await CheckpointStore.StoreCheckpoint(new Checkpoint(SubscriptionId, endOfStream.Position), true, cancellationToken).NoContext(); + position = endOfStream.Position; } await CheckpointStore.StoreCheckpoint(new Checkpoint(SubscriptionId, endOfStream.Position), true, cancellationToken); position = endOfStream.Position;