Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/Core/src/Eventuous.Subscriptions/CheckpointInitialPosition.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (C) Eventuous HQ OÜ. All rights reserved
// Licensed under the Apache License, Version 2.0.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Eventuous.Subscriptions {
public record CheckpointInitialPosition() {
public static CheckpointInitialPosition From(long position) => new From(position);
public static CheckpointInitialPosition Beginning => new Beginning();
public static CheckpointInitialPosition End => new End();
};
public record From(long Position) : CheckpointInitialPosition;
public record Beginning() : CheckpointInitialPosition;
public record End() : CheckpointInitialPosition;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Eventuous.Subscriptions.Checkpoints;

[PublicAPI]
public interface ICheckpointStore {
ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken);
ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CheckpointInitialPosition initialPosition, CancellationToken cancellationToken);

ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, bool force, CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class MeasuredCheckpointStore(ICheckpointStore checkpointStore) : ICheckp
public const string SubscriptionIdTag = "subscriptionId";
public const string CheckpointBaggage = "checkpoint";

public async ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) {
public async ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken) {
using var activity = EventuousDiagnostics.ActivitySource.CreateActivity(
ReadOperationName,
ActivityKind.Internal,
Expand All @@ -23,7 +23,7 @@ public async ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, Cancel
)
?.Start();

var checkpoint = await checkpointStore.GetLastCheckpoint(checkpointId, cancellationToken).NoContext();
var checkpoint = await checkpointStore.GetLastCheckpoint(checkpointId, CheckpointInitialPosition.Beginning, cancellationToken).NoContext();

activity?.AddBaggage(CheckpointBaggage, checkpoint.Position?.ToString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Eventuous.Subscriptions.Checkpoints;
public class NoOpCheckpointStore(ulong? start = null) : ICheckpointStore {
Checkpoint _start = new("", start);

public ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) {
public ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken) {
var checkpoint = _start with { Id = checkpointId };
Logger.Current.CheckpointLoaded(this, checkpoint);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected async Task<Checkpoint> GetCheckpoint(CancellationToken cancellationTok

Logger.Current = Log;

var checkpoint = await CheckpointStore.GetLastCheckpoint(Options.SubscriptionId, cancellationToken).NoContext();
var checkpoint = await CheckpointStore.GetLastCheckpoint(Options.SubscriptionId, Options.CheckpointInitialPosition, cancellationToken).NoContext();

LastProcessed = new EventPosition(checkpoint.Position, DateTime.Now);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ public abstract record SubscriptionOptions {
public abstract record SubscriptionWithCheckpointOptions : SubscriptionOptions {
public int CheckpointCommitBatchSize { get; set; } = 100;
public int CheckpointCommitDelayMs { get; set; } = 5000;
public CheckpointInitialPosition CheckpointInitialPosition { get; set; } = new Beginning();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ SubscriptionFixtureBase<TContainer, TSubscription, TSubscriptionOptions, TCheckp
where TSubscriptionOptions : SubscriptionOptions
where TCheckpointStore : class, ICheckpointStore {
protected async Task ShouldConsumeProducedEvents(CancellationToken cancellationToken) {
const int count = 10;

const int count = 10;
var commands = await GenerateAndHandleCommands(count);
var testEvents = commands.Select(ToEvent).ToList();

Expand All @@ -38,8 +37,7 @@ protected async Task ShouldConsumeProducedEventsWhenRestarting(CancellationToken
return;

async Task TestConsumptionOfProducedEvents() {
const int count = 10;

const int count = 10;
var commands = await GenerateAndHandleCommands(count);
var testEvents = commands.Select(ToEvent).ToList();
await fixture.StartSubscription();
Expand All @@ -54,11 +52,11 @@ protected async Task ShouldUseExistingCheckpoint(CancellationToken cancellationT

await GenerateAndHandleCommands(count);

await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, cancellationToken);
await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, CheckpointInitialPosition.Beginning, cancellationToken);
var last = await fixture.GetLastPosition();
await fixture.CheckpointStore.StoreCheckpoint(new(fixture.SubscriptionId, last), true, cancellationToken);

var l = await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, cancellationToken);
var l = await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, CheckpointInitialPosition.Beginning, cancellationToken);
WriteLine("Last checkpoint: {0}", l.Position!);

await fixture.StartSubscription();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
namespace Eventuous.Tests.Subscriptions.Base;

public abstract class SubscribeToStreamBase<TContainer, TSub, TSubOptions, TCheckpointStore>(
StreamName streamName,
StreamName streamName,
SubscriptionFixtureBase<TContainer, TSub, TSubOptions, TCheckpointStore, TestEventHandler> fixture
) : SubscriptionTestBase(fixture)
where TContainer : DockerContainer
Expand All @@ -25,11 +25,10 @@ protected async Task ShouldConsumeProducedEvents(CancellationToken cancellationT
var testEvents = await GenerateAndProduceEvents(count);

await fixture.StartSubscription();
await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), [..testEvents]).Validate(cancellationToken);
await fixture.StopSubscription();
await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), [..testEvents]).Validate(cancellationToken); await fixture.StopSubscription();
await Assert.That(fixture.Handler.Count).IsEqualTo(10);

var checkpoint = await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, cancellationToken);
var checkpoint = await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, CheckpointInitialPosition.Beginning, cancellationToken);
await Assert.That(checkpoint.Position).IsEqualTo(expected);
}

Expand All @@ -43,7 +42,7 @@ protected async Task ShouldConsumeProducedEventsWhenRestarting(CancellationToken
WriteLine("Phase two");
await TestConsumptionOfProducedEvents();

var checkpoint = await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, cancellationToken);
var checkpoint = await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, CheckpointInitialPosition.Beginning, cancellationToken);
await Assert.That(checkpoint.Position).IsEqualTo(19UL);

return;
Expand All @@ -56,8 +55,7 @@ async Task TestConsumptionOfProducedEvents() {

WriteLine("Starting subscription");
await fixture.StartSubscription();
await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), [..testEvents]).Validate(cancellationToken);
WriteLine("Stopping subscription");
await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), [..testEvents]).Validate(cancellationToken); WriteLine("Stopping subscription");
await fixture.StopSubscription();
await Assert.That(fixture.Handler.Count).IsEqualTo(10);
}
Expand All @@ -68,7 +66,7 @@ public async Task ShouldUseExistingCheckpoint(CancellationToken cancellationToke

await GenerateAndProduceEvents(count);

await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, cancellationToken);
await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, CheckpointInitialPosition.Beginning, cancellationToken);
Logger.ConfigureIfNull(fixture.SubscriptionId, fixture.LoggerFactory);
await fixture.CheckpointStore.StoreCheckpoint(new(fixture.SubscriptionId, 9), true, cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ namespace Eventuous.Tests.EventStore.Fixtures;
public class TestCheckpointStore : ICheckpointStore {
readonly Dictionary<string, Checkpoint> _checkpoints = new();

public ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) {
public ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken) {
var checkpoint = _checkpoints.TryGetValue(checkpointId, out var cp) ? cp : new(checkpointId, null);
Logger.Current.CheckpointLoaded(this, checkpoint);
return new(checkpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ public async Task Setup(CancellationToken cancellationToken) {
services.AddSubscription<StreamSubscription, StreamSubscriptionOptions>(
"test-custom",
b => b
.Configure(cfg => cfg.StreamName = _streamName)
.Configure(cfg => {
cfg.StreamName = _streamName;
cfg.CheckpointInitialPosition = CheckpointInitialPosition.Beginning;
})
.UseCheckpointStore<TestCheckpointStore>(_ => _checkpointStore)
.UseSerializer<TestSerializer>(_ => _serializer)
.UseTypeMapper(typeMapper)
Expand Down Expand Up @@ -93,7 +96,7 @@ public ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context
class TestCheckpointStore : ICheckpointStore {
public bool ReceivedGetCheckpoint { get; private set; }

public ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) {
public ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken) {
ReceivedGetCheckpoint = true;

return ValueTask.FromResult(new Checkpoint(checkpointId, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public async Task SubscribeAndProduceManyWithIgnored(CancellationToken cancellat
await _handler.AssertCollection(5.Seconds(), expected).Validate(cancellationToken);
await DisposeAsync();

var last = await _checkpointStore.GetLastCheckpoint(_subscriptionId, cancellationToken);
var last = await _checkpointStore.GetLastCheckpoint(_subscriptionId, CheckpointInitialPosition.Beginning, cancellationToken);
last.Position.ShouldBe((ulong)(testEvents.Count - 1));

return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0.

using System.Collections.Concurrent;
using Eventuous.Subscriptions;
using Eventuous.Subscriptions.Checkpoints;
using Eventuous.Subscriptions.Logging;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -34,7 +35,7 @@ static void CreateCheckpointIndex(IElasticClient client, string indexName) {

readonly ConcurrentDictionary<string, int> _counters = new();

public async ValueTask<EventuousCheckpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) {
public async ValueTask<EventuousCheckpoint> GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken) {
var response = await _client.GetAsync(
DocumentPath<Checkpoint>.Id(checkpointId),
x => x.Realtime(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public MongoCheckpointStore(IMongoDatabase database, IOptions<MongoCheckpointSto

IMongoCollection<Checkpoint> Checkpoints { get; } = Ensure.NotNull(database).GetCollection<Checkpoint>(options.CollectionName);

public async ValueTask<EventuousCheckpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken = default) {
public async ValueTask<EventuousCheckpoint> GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken = default) {
var storedCheckpoint = await Checkpoints.AsQueryable()
.Where(x => x.Id == checkpointId)
.SingleOrDefaultAsync(cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ protected override void ConfigureServices(IServiceCollection services, string su

public async Task WaitForPosition(ulong position) {
var checkpointStore = Host.Services.GetRequiredService<ICheckpointStore>();
var count = 100;
var count = 100;

while (count-- > 0) {
var checkpoint = await checkpointStore.GetLastCheckpoint(nameof(ProjectWithBuilder), default);
var checkpoint = await checkpointStore.GetLastCheckpoint(nameof(ProjectWithBuilder), CheckpointInitialPosition.Beginning, default);

if (checkpoint.Position.HasValue && checkpoint.Position.Value >= position) break;

Expand Down
2 changes: 1 addition & 1 deletion src/Postgres/src/Eventuous.Postgresql/Schema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class Schema(string schema = Schema.DefaultSchema) {
public string StreamExists => $"select exists (select 1 from {schema}.streams where stream_name = (@name))";
public string TruncateStream => $"select * from {schema}.truncate_stream(@_stream_name, @_expected_version, @_position)";
public string GetCheckpointSql => $"select position from {schema}.checkpoints where id=(@checkpointId)";
public string AddCheckpointSql => $"insert into {schema}.checkpoints (id) values (@checkpointId)";
public string AddCheckpointSql => $"insert into {schema}.checkpoints (id, position) values (@checkpointId, case when @initialPosition = 'end' then (select max(global_position) from {schema}.messages) when @initialPosition is not null then cast(@initialPosition as bigint) else null end) returning position";
public string UpdateCheckpointSql => $"update {schema}.checkpoints set position=(@position) where id=(@checkpointId)";
public string TryInsertTombstone => $"select {schema}.try_insert_tombstone(@_gap_position, @_stream_name, @_type, @_id)";

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (C) Eventuous HQ OÜ. All rights reserved
// Licensed under the Apache License, Version 2.0.

using Eventuous.Subscriptions;
using Eventuous.Subscriptions.Checkpoints;
using Eventuous.Subscriptions.Logging;
using Microsoft.Extensions.Logging;
Expand All @@ -9,6 +10,7 @@
namespace Eventuous.Postgresql.Subscriptions;

using Extensions;
using System.Data;

public class PostgresCheckpointStoreOptions {
public PostgresCheckpointStoreOptions() : this(Postgresql.Schema.DefaultSchema) { }
Expand Down Expand Up @@ -48,23 +50,27 @@ public PostgresCheckpointStore(NpgsqlDataSource dataSource, IOptions<PostgresChe
: this(dataSource, options?.Value.Schema ?? Schema.DefaultSchema, loggerFactory) { }

/// <inheritdoc />
public async ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) {
public async ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CheckpointInitialPosition initialPosition, CancellationToken cancellationToken) {
Logger.ConfigureIfNull(checkpointId, _loggerFactory);

var (checkpoint, loaded) = await GetCheckpoint().NoContext();

if (loaded) return checkpoint;

await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).NoContext();
await using var add = GetCheckpointCommand(connection, _addCheckpointSql, checkpointId);
await add.ExecuteNonQueryAsync(cancellationToken).NoContext();
await using var add = GetCheckpointCommand(connection, _addCheckpointSql, checkpointId, initialPosition);
await using var reader = await add.ExecuteReaderAsync(cancellationToken).NoContext();
await reader.ReadAsync(cancellationToken).NoContext();
var position = (ulong?)reader.GetInt64(0);
checkpoint = new Checkpoint(checkpointId, position);

Logger.Current.CheckpointLoaded(this, checkpoint);

return checkpoint;

async Task<(Checkpoint Checkpoint, bool Loaded)> GetCheckpoint() {
await using var c = await _dataSource.OpenConnectionAsync(cancellationToken).NoContext();
await using var cmd = GetCheckpointCommand(c, _getCheckpointSql, checkpointId);
await using var c = await _dataSource.OpenConnectionAsync(cancellationToken).NoContext();
await using var cmd = GetCheckpointCommand(c, _getCheckpointSql, checkpointId, null);
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).NoContext();

if (!await reader.ReadAsync(cancellationToken).NoContext()) return (Checkpoint.Empty(checkpointId), false);
Expand All @@ -83,7 +89,7 @@ public async ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, bool f

await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).NoContext();

await using var cmd = GetCheckpointCommand(connection, _storeCheckpointSql, checkpoint.Id)
await using var cmd = GetCheckpointCommand(connection, _storeCheckpointSql, checkpoint.Id, null)
.Add("position", NpgsqlDbType.Bigint, (long)checkpoint.Position);

await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext();
Expand All @@ -92,6 +98,13 @@ public async ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, bool f
return checkpoint;
}

static NpgsqlCommand GetCheckpointCommand(NpgsqlConnection connection, string sql, string checkpointId)
=> connection.GetCommand(sql).Add("checkpointId", NpgsqlDbType.Varchar, checkpointId);
static NpgsqlCommand GetCheckpointCommand(NpgsqlConnection connection, string sql, string checkpointId, CheckpointInitialPosition? initialPosition)
=> connection.GetCommand(sql)
.Add("checkpointId", NpgsqlDbType.Varchar, checkpointId)
.Add("initialPosition", NpgsqlDbType.Varchar, initialPosition switch {
End => "end",
From position => position.ToString(),
Beginning => "beginning",
_ => "beginning"
});
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
// Copyright (C) Eventuous HQ OÜ. All rights reserved
// Licensed under the Apache License, Version 2.0.

using Eventuous.Subscriptions;
using Eventuous.Subscriptions.Checkpoints;
using Eventuous.Subscriptions.Logging;
using Microsoft.Extensions.Logging;

namespace Eventuous.Redis.Subscriptions;

public class RedisCheckpointStore(GetRedisDatabase getDatabase, ILoggerFactory? loggerFactory) : ICheckpointStore {
public async ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) {
public async ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CheckpointInitialPosition _, CancellationToken cancellationToken) {
Logger.ConfigureIfNull(checkpointId, loggerFactory);
var position = await getDatabase().StringGetAsync(checkpointId).NoContext();
var position = await getDatabase().StringGetAsync(checkpointId).NoContext();
var checkpoint = position.IsNull ? Checkpoint.Empty(checkpointId) : new Checkpoint(checkpointId, Convert.ToUInt64(position));
Logger.Current.CheckpointLoaded(this, checkpoint);
return checkpoint;
Expand Down
Loading