From 2ae6a27e0fc35fd6d5c05e909951068458bdede5 Mon Sep 17 00:00:00 2001 From: mathantunes Date: Tue, 28 Oct 2025 22:39:07 +0100 Subject: [PATCH] Create CheckpointInitialPosition on SubscriptionWithCheckpointOptions Using 3 record types, this commit allows a subscription to define its starting point. It can be Beginning, End or From a specific position.Create CheckpointInitialPosition on SubscriptionWithCheckpointOptions Using 3 record types, this commit allows a subscription to define its starting point. It can be Beginning, End or From a specific position. --- .../CheckpointInitialPosition.cs | 19 ++++++++++++ .../Checkpoints/ICheckpointStore.cs | 2 +- .../Checkpoints/MeasuredCheckpointStore.cs | 4 +-- .../Checkpoints/NoOpCheckpointStore.cs | 2 +- .../EventSubscriptionWithCheckpoint.cs | 2 +- .../SubscriptionOptions.cs | 1 + .../SubscribeToAll.cs | 10 +++---- .../SubscribeToStream.cs | 14 ++++----- .../Fixtures/TestCheckpointStore.cs | 2 +- .../Subscriptions/CustomDependenciesTests.cs | 7 +++-- .../SubscriptionIgnoredMessagesTests.cs | 2 +- .../Projections/ElasticCheckpointStore.cs | 3 +- .../MongoCheckpointStore.cs | 2 +- .../ProjectionTestBase.cs | 4 +-- .../src/Eventuous.Postgresql/Schema.cs | 2 +- .../Subscriptions/PostgresCheckpointStore.cs | 29 ++++++++++++++----- .../Subscriptions/RedisCheckpointStore.cs | 5 ++-- .../Subscriptions/SubscribeToAll.cs | 3 +- .../Subscriptions/SubscribeToStream.cs | 3 +- .../Subscriptions/SqlServerCheckpointStore.cs | 3 +- 20 files changed, 78 insertions(+), 41 deletions(-) create mode 100644 src/Core/src/Eventuous.Subscriptions/CheckpointInitialPosition.cs diff --git a/src/Core/src/Eventuous.Subscriptions/CheckpointInitialPosition.cs b/src/Core/src/Eventuous.Subscriptions/CheckpointInitialPosition.cs new file mode 100644 index 00000000..e9ef55af --- /dev/null +++ b/src/Core/src/Eventuous.Subscriptions/CheckpointInitialPosition.cs @@ -0,0 +1,19 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Eventuous.Subscriptions { + public record CheckpointInitialPosition() { + public static CheckpointInitialPosition From(long position) => new From(position); + public static CheckpointInitialPosition Beginning => new Beginning(); + public static CheckpointInitialPosition End => new End(); + }; + public record From(long Position) : CheckpointInitialPosition; + public record Beginning() : CheckpointInitialPosition; + public record End() : CheckpointInitialPosition; +} diff --git a/src/Core/src/Eventuous.Subscriptions/Checkpoints/ICheckpointStore.cs b/src/Core/src/Eventuous.Subscriptions/Checkpoints/ICheckpointStore.cs index 92ec4afc..099164e6 100644 --- a/src/Core/src/Eventuous.Subscriptions/Checkpoints/ICheckpointStore.cs +++ b/src/Core/src/Eventuous.Subscriptions/Checkpoints/ICheckpointStore.cs @@ -5,7 +5,7 @@ namespace Eventuous.Subscriptions.Checkpoints; [PublicAPI] public interface ICheckpointStore { - ValueTask GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken); + ValueTask GetLastCheckpoint(string checkpointId, CheckpointInitialPosition initialPosition, CancellationToken cancellationToken); ValueTask StoreCheckpoint(Checkpoint checkpoint, bool force, CancellationToken cancellationToken); } \ No newline at end of file diff --git a/src/Core/src/Eventuous.Subscriptions/Checkpoints/MeasuredCheckpointStore.cs b/src/Core/src/Eventuous.Subscriptions/Checkpoints/MeasuredCheckpointStore.cs index dd5aee1b..d842b13c 100644 --- a/src/Core/src/Eventuous.Subscriptions/Checkpoints/MeasuredCheckpointStore.cs +++ b/src/Core/src/Eventuous.Subscriptions/Checkpoints/MeasuredCheckpointStore.cs @@ -13,7 +13,7 @@ public class MeasuredCheckpointStore(ICheckpointStore checkpointStore) : ICheckp public const string SubscriptionIdTag = "subscriptionId"; public const string CheckpointBaggage = "checkpoint"; - public async ValueTask GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) { + public async ValueTask GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken) { using var activity = EventuousDiagnostics.ActivitySource.CreateActivity( ReadOperationName, ActivityKind.Internal, @@ -23,7 +23,7 @@ public async ValueTask GetLastCheckpoint(string checkpointId, Cancel ) ?.Start(); - var checkpoint = await checkpointStore.GetLastCheckpoint(checkpointId, cancellationToken).NoContext(); + var checkpoint = await checkpointStore.GetLastCheckpoint(checkpointId, CheckpointInitialPosition.Beginning, cancellationToken).NoContext(); activity?.AddBaggage(CheckpointBaggage, checkpoint.Position?.ToString()); diff --git a/src/Core/src/Eventuous.Subscriptions/Checkpoints/NoOpCheckpointStore.cs b/src/Core/src/Eventuous.Subscriptions/Checkpoints/NoOpCheckpointStore.cs index 8a4bce26..b68f0969 100644 --- a/src/Core/src/Eventuous.Subscriptions/Checkpoints/NoOpCheckpointStore.cs +++ b/src/Core/src/Eventuous.Subscriptions/Checkpoints/NoOpCheckpointStore.cs @@ -12,7 +12,7 @@ namespace Eventuous.Subscriptions.Checkpoints; public class NoOpCheckpointStore(ulong? start = null) : ICheckpointStore { Checkpoint _start = new("", start); - public ValueTask GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) { + public ValueTask GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken) { var checkpoint = _start with { Id = checkpointId }; Logger.Current.CheckpointLoaded(this, checkpoint); diff --git a/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs b/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs index 4181135b..bc8f0d82 100644 --- a/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs +++ b/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs @@ -101,7 +101,7 @@ protected async Task GetCheckpoint(CancellationToken cancellationTok Logger.Current = Log; - var checkpoint = await CheckpointStore.GetLastCheckpoint(Options.SubscriptionId, cancellationToken).NoContext(); + var checkpoint = await CheckpointStore.GetLastCheckpoint(Options.SubscriptionId, Options.CheckpointInitialPosition, cancellationToken).NoContext(); LastProcessed = new EventPosition(checkpoint.Position, DateTime.Now); diff --git a/src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs b/src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs index 01fe5eeb..4162dc40 100644 --- a/src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs +++ b/src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs @@ -19,4 +19,5 @@ public abstract record SubscriptionOptions { public abstract record SubscriptionWithCheckpointOptions : SubscriptionOptions { public int CheckpointCommitBatchSize { get; set; } = 100; public int CheckpointCommitDelayMs { get; set; } = 5000; + public CheckpointInitialPosition CheckpointInitialPosition { get; set; } = new Beginning(); } diff --git a/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs b/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs index 81e496d2..9261b780 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs @@ -16,8 +16,7 @@ SubscriptionFixtureBase( - StreamName streamName, + StreamName streamName, SubscriptionFixtureBase fixture ) : SubscriptionTestBase(fixture) where TContainer : DockerContainer @@ -25,11 +25,10 @@ protected async Task ShouldConsumeProducedEvents(CancellationToken cancellationT var testEvents = await GenerateAndProduceEvents(count); await fixture.StartSubscription(); - await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), [..testEvents]).Validate(cancellationToken); - await fixture.StopSubscription(); + await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), [..testEvents]).Validate(cancellationToken); await fixture.StopSubscription(); await Assert.That(fixture.Handler.Count).IsEqualTo(10); - var checkpoint = await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, cancellationToken); + var checkpoint = await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, CheckpointInitialPosition.Beginning, cancellationToken); await Assert.That(checkpoint.Position).IsEqualTo(expected); } @@ -43,7 +42,7 @@ protected async Task ShouldConsumeProducedEventsWhenRestarting(CancellationToken WriteLine("Phase two"); await TestConsumptionOfProducedEvents(); - var checkpoint = await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, cancellationToken); + var checkpoint = await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, CheckpointInitialPosition.Beginning, cancellationToken); await Assert.That(checkpoint.Position).IsEqualTo(19UL); return; @@ -56,8 +55,7 @@ async Task TestConsumptionOfProducedEvents() { WriteLine("Starting subscription"); await fixture.StartSubscription(); - await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), [..testEvents]).Validate(cancellationToken); - WriteLine("Stopping subscription"); + await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), [..testEvents]).Validate(cancellationToken); WriteLine("Stopping subscription"); await fixture.StopSubscription(); await Assert.That(fixture.Handler.Count).IsEqualTo(10); } @@ -68,7 +66,7 @@ public async Task ShouldUseExistingCheckpoint(CancellationToken cancellationToke await GenerateAndProduceEvents(count); - await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, cancellationToken); + await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, CheckpointInitialPosition.Beginning, cancellationToken); Logger.ConfigureIfNull(fixture.SubscriptionId, fixture.LoggerFactory); await fixture.CheckpointStore.StoreCheckpoint(new(fixture.SubscriptionId, 9), true, cancellationToken); diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/TestCheckpointStore.cs b/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/TestCheckpointStore.cs index abc9c2e4..c91e43d0 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/TestCheckpointStore.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/TestCheckpointStore.cs @@ -3,7 +3,7 @@ namespace Eventuous.Tests.EventStore.Fixtures; public class TestCheckpointStore : ICheckpointStore { readonly Dictionary _checkpoints = new(); - public ValueTask GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) { + public ValueTask GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken) { var checkpoint = _checkpoints.TryGetValue(checkpointId, out var cp) ? cp : new(checkpointId, null); Logger.Current.CheckpointLoaded(this, checkpoint); return new(checkpoint); diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/Subscriptions/CustomDependenciesTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/Subscriptions/CustomDependenciesTests.cs index b1c54a79..1d8991d9 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/Subscriptions/CustomDependenciesTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/Subscriptions/CustomDependenciesTests.cs @@ -44,7 +44,10 @@ public async Task Setup(CancellationToken cancellationToken) { services.AddSubscription( "test-custom", b => b - .Configure(cfg => cfg.StreamName = _streamName) + .Configure(cfg => { + cfg.StreamName = _streamName; + cfg.CheckpointInitialPosition = CheckpointInitialPosition.Beginning; + }) .UseCheckpointStore(_ => _checkpointStore) .UseSerializer(_ => _serializer) .UseTypeMapper(typeMapper) @@ -93,7 +96,7 @@ public ValueTask HandleEvent(IMessageConsumeContext context class TestCheckpointStore : ICheckpointStore { public bool ReceivedGetCheckpoint { get; private set; } - public ValueTask GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) { + public ValueTask GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken) { ReceivedGetCheckpoint = true; return ValueTask.FromResult(new Checkpoint(checkpointId, null)); diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/Subscriptions/SubscriptionIgnoredMessagesTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/Subscriptions/SubscriptionIgnoredMessagesTests.cs index 948d2ac0..b990a48f 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/Subscriptions/SubscriptionIgnoredMessagesTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/Subscriptions/SubscriptionIgnoredMessagesTests.cs @@ -42,7 +42,7 @@ public async Task SubscribeAndProduceManyWithIgnored(CancellationToken cancellat await _handler.AssertCollection(5.Seconds(), expected).Validate(cancellationToken); await DisposeAsync(); - var last = await _checkpointStore.GetLastCheckpoint(_subscriptionId, cancellationToken); + var last = await _checkpointStore.GetLastCheckpoint(_subscriptionId, CheckpointInitialPosition.Beginning, cancellationToken); last.Position.ShouldBe((ulong)(testEvents.Count - 1)); return; diff --git a/src/Experimental/src/Eventuous.ElasticSearch/Projections/ElasticCheckpointStore.cs b/src/Experimental/src/Eventuous.ElasticSearch/Projections/ElasticCheckpointStore.cs index 7b9a9982..61e4439d 100644 --- a/src/Experimental/src/Eventuous.ElasticSearch/Projections/ElasticCheckpointStore.cs +++ b/src/Experimental/src/Eventuous.ElasticSearch/Projections/ElasticCheckpointStore.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. using System.Collections.Concurrent; +using Eventuous.Subscriptions; using Eventuous.Subscriptions.Checkpoints; using Eventuous.Subscriptions.Logging; using Microsoft.Extensions.Options; @@ -34,7 +35,7 @@ static void CreateCheckpointIndex(IElasticClient client, string indexName) { readonly ConcurrentDictionary _counters = new(); - public async ValueTask GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) { + public async ValueTask GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken) { var response = await _client.GetAsync( DocumentPath.Id(checkpointId), x => x.Realtime(), diff --git a/src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs b/src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs index 0f93a7f0..9fc4a215 100644 --- a/src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs +++ b/src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs @@ -29,7 +29,7 @@ public MongoCheckpointStore(IMongoDatabase database, IOptions Checkpoints { get; } = Ensure.NotNull(database).GetCollection(options.CollectionName); - public async ValueTask GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken = default) { + public async ValueTask GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken = default) { var storedCheckpoint = await Checkpoints.AsQueryable() .Where(x => x.Id == checkpointId) .SingleOrDefaultAsync(cancellationToken) diff --git a/src/Mongo/test/Eventuous.Tests.Projections.MongoDB/ProjectionTestBase.cs b/src/Mongo/test/Eventuous.Tests.Projections.MongoDB/ProjectionTestBase.cs index bbafbdf6..b698f7e8 100644 --- a/src/Mongo/test/Eventuous.Tests.Projections.MongoDB/ProjectionTestBase.cs +++ b/src/Mongo/test/Eventuous.Tests.Projections.MongoDB/ProjectionTestBase.cs @@ -51,10 +51,10 @@ protected override void ConfigureServices(IServiceCollection services, string su public async Task WaitForPosition(ulong position) { var checkpointStore = Host.Services.GetRequiredService(); - var count = 100; + var count = 100; while (count-- > 0) { - var checkpoint = await checkpointStore.GetLastCheckpoint(nameof(ProjectWithBuilder), default); + var checkpoint = await checkpointStore.GetLastCheckpoint(nameof(ProjectWithBuilder), CheckpointInitialPosition.Beginning, default); if (checkpoint.Position.HasValue && checkpoint.Position.Value >= position) break; diff --git a/src/Postgres/src/Eventuous.Postgresql/Schema.cs b/src/Postgres/src/Eventuous.Postgresql/Schema.cs index bed75ac0..d852cbda 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Schema.cs +++ b/src/Postgres/src/Eventuous.Postgresql/Schema.cs @@ -27,7 +27,7 @@ public class Schema(string schema = Schema.DefaultSchema) { public string StreamExists => $"select exists (select 1 from {schema}.streams where stream_name = (@name))"; public string TruncateStream => $"select * from {schema}.truncate_stream(@_stream_name, @_expected_version, @_position)"; public string GetCheckpointSql => $"select position from {schema}.checkpoints where id=(@checkpointId)"; - public string AddCheckpointSql => $"insert into {schema}.checkpoints (id) values (@checkpointId)"; + public string AddCheckpointSql => $"insert into {schema}.checkpoints (id, position) values (@checkpointId, case when @initialPosition = 'end' then (select max(global_position) from {schema}.messages) when @initialPosition is not null then cast(@initialPosition as bigint) else null end) returning position"; public string UpdateCheckpointSql => $"update {schema}.checkpoints set position=(@position) where id=(@checkpointId)"; public string TryInsertTombstone => $"select {schema}.try_insert_tombstone(@_gap_position, @_stream_name, @_type, @_id)"; diff --git a/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresCheckpointStore.cs b/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresCheckpointStore.cs index d74dcbe5..9c5aa843 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresCheckpointStore.cs +++ b/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresCheckpointStore.cs @@ -1,6 +1,7 @@ // Copyright (C) Eventuous HQ OÜ. All rights reserved // Licensed under the Apache License, Version 2.0. +using Eventuous.Subscriptions; using Eventuous.Subscriptions.Checkpoints; using Eventuous.Subscriptions.Logging; using Microsoft.Extensions.Logging; @@ -9,6 +10,7 @@ namespace Eventuous.Postgresql.Subscriptions; using Extensions; +using System.Data; public class PostgresCheckpointStoreOptions { public PostgresCheckpointStoreOptions() : this(Postgresql.Schema.DefaultSchema) { } @@ -48,7 +50,7 @@ public PostgresCheckpointStore(NpgsqlDataSource dataSource, IOptions - public async ValueTask GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) { + public async ValueTask GetLastCheckpoint(string checkpointId, CheckpointInitialPosition initialPosition, CancellationToken cancellationToken) { Logger.ConfigureIfNull(checkpointId, _loggerFactory); var (checkpoint, loaded) = await GetCheckpoint().NoContext(); @@ -56,15 +58,19 @@ public async ValueTask GetLastCheckpoint(string checkpointId, Cancel if (loaded) return checkpoint; await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).NoContext(); - await using var add = GetCheckpointCommand(connection, _addCheckpointSql, checkpointId); - await add.ExecuteNonQueryAsync(cancellationToken).NoContext(); + await using var add = GetCheckpointCommand(connection, _addCheckpointSql, checkpointId, initialPosition); + await using var reader = await add.ExecuteReaderAsync(cancellationToken).NoContext(); + await reader.ReadAsync(cancellationToken).NoContext(); + var position = (ulong?)reader.GetInt64(0); + checkpoint = new Checkpoint(checkpointId, position); + Logger.Current.CheckpointLoaded(this, checkpoint); return checkpoint; async Task<(Checkpoint Checkpoint, bool Loaded)> GetCheckpoint() { - await using var c = await _dataSource.OpenConnectionAsync(cancellationToken).NoContext(); - await using var cmd = GetCheckpointCommand(c, _getCheckpointSql, checkpointId); + await using var c = await _dataSource.OpenConnectionAsync(cancellationToken).NoContext(); + await using var cmd = GetCheckpointCommand(c, _getCheckpointSql, checkpointId, null); await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).NoContext(); if (!await reader.ReadAsync(cancellationToken).NoContext()) return (Checkpoint.Empty(checkpointId), false); @@ -83,7 +89,7 @@ public async ValueTask StoreCheckpoint(Checkpoint checkpoint, bool f await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).NoContext(); - await using var cmd = GetCheckpointCommand(connection, _storeCheckpointSql, checkpoint.Id) + await using var cmd = GetCheckpointCommand(connection, _storeCheckpointSql, checkpoint.Id, null) .Add("position", NpgsqlDbType.Bigint, (long)checkpoint.Position); await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext(); @@ -92,6 +98,13 @@ public async ValueTask StoreCheckpoint(Checkpoint checkpoint, bool f return checkpoint; } - static NpgsqlCommand GetCheckpointCommand(NpgsqlConnection connection, string sql, string checkpointId) - => connection.GetCommand(sql).Add("checkpointId", NpgsqlDbType.Varchar, checkpointId); + static NpgsqlCommand GetCheckpointCommand(NpgsqlConnection connection, string sql, string checkpointId, CheckpointInitialPosition? initialPosition) + => connection.GetCommand(sql) + .Add("checkpointId", NpgsqlDbType.Varchar, checkpointId) + .Add("initialPosition", NpgsqlDbType.Varchar, initialPosition switch { + End => "end", + From position => position.ToString(), + Beginning => "beginning", + _ => "beginning" + }); } diff --git a/src/Redis/src/Eventuous.Redis/Subscriptions/RedisCheckpointStore.cs b/src/Redis/src/Eventuous.Redis/Subscriptions/RedisCheckpointStore.cs index b6681100..6ac3ec25 100644 --- a/src/Redis/src/Eventuous.Redis/Subscriptions/RedisCheckpointStore.cs +++ b/src/Redis/src/Eventuous.Redis/Subscriptions/RedisCheckpointStore.cs @@ -1,6 +1,7 @@ // Copyright (C) Eventuous HQ OÜ. All rights reserved // Licensed under the Apache License, Version 2.0. +using Eventuous.Subscriptions; using Eventuous.Subscriptions.Checkpoints; using Eventuous.Subscriptions.Logging; using Microsoft.Extensions.Logging; @@ -8,9 +9,9 @@ namespace Eventuous.Redis.Subscriptions; public class RedisCheckpointStore(GetRedisDatabase getDatabase, ILoggerFactory? loggerFactory) : ICheckpointStore { - public async ValueTask GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) { + public async ValueTask GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken) { Logger.ConfigureIfNull(checkpointId, loggerFactory); - var position = await getDatabase().StringGetAsync(checkpointId).NoContext(); + var position = await getDatabase().StringGetAsync(checkpointId).NoContext(); var checkpoint = position.IsNull ? Checkpoint.Empty(checkpointId) : new Checkpoint(checkpointId, Convert.ToUInt64(position)); Logger.Current.CheckpointLoaded(this, checkpoint); return checkpoint; diff --git a/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToAll.cs b/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToAll.cs index 604354d4..1a2736f9 100644 --- a/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToAll.cs +++ b/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToAll.cs @@ -1,3 +1,4 @@ +using Eventuous.Subscriptions; using Eventuous.Subscriptions.Logging; using Eventuous.TestHelpers.TUnit; using Eventuous.Tests.Redis.Fixtures; @@ -69,7 +70,7 @@ public async Task ShouldUseExistingCheckpoint(CancellationToken cancellationToke var (_, result) = await GenerateAndProduceEvents(count); - await _fixture.CheckpointStore.GetLastCheckpoint(_fixture.SubscriptionId, cancellationToken); + await _fixture.CheckpointStore.GetLastCheckpoint(_fixture.SubscriptionId, CheckpointInitialPosition.Beginning, cancellationToken); Logger.ConfigureIfNull(_fixture.SubscriptionId, _fixture.LoggerFactory); await _fixture.CheckpointStore.StoreCheckpoint(new(_fixture.SubscriptionId, result.GlobalPosition), true, cancellationToken); diff --git a/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToStream.cs b/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToStream.cs index 8f44d270..a0157223 100644 --- a/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToStream.cs +++ b/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToStream.cs @@ -1,3 +1,4 @@ +using Eventuous.Subscriptions; using Eventuous.Subscriptions.Logging; using Eventuous.TestHelpers.TUnit; using Eventuous.Tests.Redis.Fixtures; @@ -68,7 +69,7 @@ public async Task ShouldUseExistingCheckpoint(CancellationToken cancellationToke await GenerateAndProduceEvents(count); - await _fixture.CheckpointStore.GetLastCheckpoint(_fixture.SubscriptionId, cancellationToken); + await _fixture.CheckpointStore.GetLastCheckpoint(_fixture.SubscriptionId, CheckpointInitialPosition.Beginning, cancellationToken); var streamPosition = await GetStreamPosition(count); Logger.ConfigureIfNull(_fixture.SubscriptionId, _fixture.LoggerFactory); await _fixture.CheckpointStore.StoreCheckpoint(new(_fixture.SubscriptionId, (ulong)streamPosition), true, cancellationToken); diff --git a/src/SqlServer/src/Eventuous.SqlServer/Subscriptions/SqlServerCheckpointStore.cs b/src/SqlServer/src/Eventuous.SqlServer/Subscriptions/SqlServerCheckpointStore.cs index ac663f8d..231d29ae 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/Subscriptions/SqlServerCheckpointStore.cs +++ b/src/SqlServer/src/Eventuous.SqlServer/Subscriptions/SqlServerCheckpointStore.cs @@ -7,6 +7,7 @@ namespace Eventuous.SqlServer.Subscriptions; +using Eventuous.Subscriptions; using Extensions; /// @@ -33,7 +34,7 @@ public SqlServerCheckpointStore(SqlServerCheckpointStoreOptions options, ILogger : this(options.ConnectionString!, options is { Schema: not null } ? options.Schema : Schema.DefaultSchema, loggerFactory) { } /// - public async ValueTask GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) { + public async ValueTask GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken) { Logger.ConfigureIfNull(checkpointId, _loggerFactory); await using var connection = await ConnectionFactory.GetConnection(_connectionString, cancellationToken).NoContext();