diff --git a/src/Core/src/Eventuous.Subscriptions/CheckpointInitialPosition.cs b/src/Core/src/Eventuous.Subscriptions/CheckpointInitialPosition.cs new file mode 100644 index 00000000..999d8216 --- /dev/null +++ b/src/Core/src/Eventuous.Subscriptions/CheckpointInitialPosition.cs @@ -0,0 +1,6 @@ +namespace Eventuous.Subscriptions; + +public enum CheckpointInitialPosition { + Beginning, + End +} \ No newline at end of file diff --git a/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs b/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs index 6ab5d210..f107caa0 100644 --- a/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs +++ b/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs @@ -38,7 +38,8 @@ static ConsumePipe ConfigurePipe(ConsumePipe pipe, int concurrencyLimit) EventPosition? LastProcessed { get; set; } CheckpointCommitHandler? CheckpointCommitHandler { get; set; } - ICheckpointStore CheckpointStore { get; } = Ensure.NotNull(checkpointStore); + + protected ICheckpointStore CheckpointStore { get; } = Ensure.NotNull(checkpointStore); protected SubscriptionKind Kind { get; } = kind; diff --git a/src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs b/src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs index 01fe5eeb..18a015a3 100644 --- a/src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs +++ b/src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs @@ -19,4 +19,5 @@ public abstract record SubscriptionOptions { public abstract record SubscriptionWithCheckpointOptions : SubscriptionOptions { public int CheckpointCommitBatchSize { get; set; } = 100; public int CheckpointCommitDelayMs { get; set; } = 5000; + public CheckpointInitialPosition InitialPosition { get; set; } = CheckpointInitialPosition.Beginning; } diff --git a/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs b/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs index 81e496d2..24e88eb1 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs @@ -15,6 +15,22 @@ SubscriptionFixtureBase where TSubscriptionOptions : SubscriptionOptions where TCheckpointStore : class, ICheckpointStore { + + protected async Task ShouldStartConsumptionFromEnd(CancellationToken cancellationToken) { + const int count = 10; + + var commands = await GenerateAndHandleCommands(count); + var testEvents = commands.Select(ToEvent).ToList(); + + await fixture.StartSubscription(); + await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), []).Validate(cancellationToken); + + commands = await GenerateAndHandleCommands(count); + testEvents = commands.Select(ToEvent).ToList(); + + await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), [.. testEvents]).Validate(cancellationToken); + await fixture.StopSubscription(); + protected async Task ShouldConsumeProducedEvents(CancellationToken cancellationToken) { const int count = 10; diff --git a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/AllStreamSubscription.cs b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/AllStreamSubscription.cs index 18508924..c72c289f 100644 --- a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/AllStreamSubscription.cs +++ b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/AllStreamSubscription.cs @@ -84,7 +84,13 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken var (_, position) = await GetCheckpoint(cancellationToken).NoContext(); - var fromAll = position == null ? FromAll.Start : FromAll.After(new(position.Value, position.Value)); + FromAll GetPosition() { + if (position == null && Options.InitialPosition == CheckpointInitialPosition.End) return FromAll.End; + if (position == null) return FromAll.Start; + return FromAll.After(new(position.Value, position.Value)); + } + + var fromAll = GetPosition(); Subscription = await Client.SubscribeToAllAsync( fromAll, diff --git a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/StreamSubscription.cs b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/StreamSubscription.cs index e19733ba..9483ebe4 100644 --- a/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/StreamSubscription.cs +++ b/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/StreamSubscription.cs @@ -91,7 +91,13 @@ public StreamSubscription( protected override async ValueTask Subscribe(CancellationToken cancellationToken) { var (_, position) = await GetCheckpoint(cancellationToken).NoContext(); - var fromStream = position == null ? FromStream.Start : FromStream.After(StreamPosition.FromInt64((long)position)); + FromStream GetStreamPosition() { + if (position == null && Options.InitialPosition == CheckpointInitialPosition.End) return FromStream.End; + if (position == null) return FromStream.Start; + return FromStream.After(StreamPosition.FromInt64((long)position)); + } + + var fromStream = GetStreamPosition(); Subscription = await Client.SubscribeToStreamAsync( Options.StreamName, diff --git a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/SubscribeTests.cs b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/SubscribeTests.cs index 77ef9e84..c8c26703 100644 --- a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/SubscribeTests.cs +++ b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/SubscribeTests.cs @@ -7,7 +7,18 @@ namespace Eventuous.Tests.KurrentDB.Subscriptions; -public class SubscribeToAll() + public class SubscribeToAllFromEnd() + : SubscribeToAllBase( + new CatchUpSubscriptionFixture(opt => opt.InitialPosition = CheckpointInitialPosition.End, new("$all"), false) + ) { + [Test] + [Retry(3)] + public async Task Esdb_ShouldStartConsumptionFromEnd(CancellationToken cancellationToken) { + await ShouldStartConsumptionFromEnd(cancellationToken); + } +} + + public class SubscribeToAll() : SubscribeToAllBase( new CatchUpSubscriptionFixture(_ => { }, new("$all"), false) ) { diff --git a/src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscribeTests.cs b/src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscribeTests.cs index d4310214..70c6105b 100644 --- a/src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscribeTests.cs +++ b/src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscribeTests.cs @@ -7,6 +7,17 @@ namespace Eventuous.Tests.Postgres.Subscriptions; +[NotInParallel] +public class SubscribeToAllFromEnd() + : SubscribeToAllBase( + new SubscriptionFixture(opt => opt.InitialPosition = Eventuous.Subscriptions.CheckpointInitialPosition.End, false) + ) { + [Test] + public async Task Postgres_ShouldStartConsumptionFromEnd(CancellationToken cancellationToken) { + await ShouldStartConsumptionFromEnd(cancellationToken); + } +} + [NotInParallel] public class SubscribeToAll() : SubscribeToAllBase( diff --git a/src/Relational/src/Eventuous.Sql.Base/Subscriptions/SqlSubscriptionBase.cs b/src/Relational/src/Eventuous.Sql.Base/Subscriptions/SqlSubscriptionBase.cs index 9787dfd1..c91e35b4 100644 --- a/src/Relational/src/Eventuous.Sql.Base/Subscriptions/SqlSubscriptionBase.cs +++ b/src/Relational/src/Eventuous.Sql.Base/Subscriptions/SqlSubscriptionBase.cs @@ -214,6 +214,18 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken await BeforeSubscribe(cancellationToken).NoContext(); var (_, position) = await GetCheckpoint(cancellationToken).NoContext(); + if (position == null && Options.InitialPosition == CheckpointInitialPosition.End) { + var endOfStream = await GetSubscriptionEndOfStream(cancellationToken).NoContext(); + if (endOfStream == EndOfStream.Invalid) { + throw new InvalidOperationException($"Could not get the end of the stream for subscription {SubscriptionId}"); + } + await CheckpointStore.StoreCheckpoint(new Checkpoint(SubscriptionId, endOfStream.Position), true, cancellationToken).NoContext(); + position = endOfStream.Position; + } + await CheckpointStore.StoreCheckpoint(new Checkpoint(SubscriptionId, endOfStream.Position), true, cancellationToken); + position = endOfStream.Position; + } + _runner = new TaskRunner(token => PollingQuery(position, token)).Start(); }