Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Eventuous.Subscriptions;

public enum CheckpointInitialPosition {
Beginning,
End
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ static ConsumePipe ConfigurePipe(ConsumePipe pipe, int concurrencyLimit)

EventPosition? LastProcessed { get; set; }
CheckpointCommitHandler? CheckpointCommitHandler { get; set; }
ICheckpointStore CheckpointStore { get; } = Ensure.NotNull(checkpointStore);

protected ICheckpointStore CheckpointStore { get; } = Ensure.NotNull(checkpointStore);

protected SubscriptionKind Kind { get; } = kind;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ public abstract record SubscriptionOptions {
public abstract record SubscriptionWithCheckpointOptions : SubscriptionOptions {
public int CheckpointCommitBatchSize { get; set; } = 100;
public int CheckpointCommitDelayMs { get; set; } = 5000;
public CheckpointInitialPosition InitialPosition { get; set; } = CheckpointInitialPosition.Beginning;
}
16 changes: 16 additions & 0 deletions src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToAll.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ SubscriptionFixtureBase<TContainer, TSubscription, TSubscriptionOptions, TCheckp
where TSubscription : EventSubscription<TSubscriptionOptions>
where TSubscriptionOptions : SubscriptionOptions
where TCheckpointStore : class, ICheckpointStore {

protected async Task ShouldStartConsumptionFromEnd(CancellationToken cancellationToken) {
const int count = 10;

var commands = await GenerateAndHandleCommands(count);
var testEvents = commands.Select(ToEvent).ToList();

await fixture.StartSubscription();
await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), []).Validate(cancellationToken);

commands = await GenerateAndHandleCommands(count);
testEvents = commands.Select(ToEvent).ToList();

await fixture.Handler.AssertCollection(TimeSpan.FromSeconds(2), [.. testEvents]).Validate(cancellationToken);
await fixture.StopSubscription();

protected async Task ShouldConsumeProducedEvents(CancellationToken cancellationToken) {
const int count = 10;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken

var (_, position) = await GetCheckpoint(cancellationToken).NoContext();

var fromAll = position == null ? FromAll.Start : FromAll.After(new(position.Value, position.Value));
FromAll GetPosition() {
if (position == null && Options.InitialPosition == CheckpointInitialPosition.End) return FromAll.End;
if (position == null) return FromAll.Start;
return FromAll.After(new(position.Value, position.Value));
}

var fromAll = GetPosition();

Subscription = await Client.SubscribeToAllAsync(
fromAll,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,13 @@ public StreamSubscription(
protected override async ValueTask Subscribe(CancellationToken cancellationToken) {
var (_, position) = await GetCheckpoint(cancellationToken).NoContext();

var fromStream = position == null ? FromStream.Start : FromStream.After(StreamPosition.FromInt64((long)position));
FromStream GetStreamPosition() {
if (position == null && Options.InitialPosition == CheckpointInitialPosition.End) return FromStream.End;
if (position == null) return FromStream.Start;
return FromStream.After(StreamPosition.FromInt64((long)position));
}

var fromStream = GetStreamPosition();

Subscription = await Client.SubscribeToStreamAsync(
Options.StreamName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,18 @@

namespace Eventuous.Tests.KurrentDB.Subscriptions;

public class SubscribeToAll()
public class SubscribeToAllFromEnd()
: SubscribeToAllBase<EventStoreDbContainer, AllStreamSubscription, AllStreamSubscriptionOptions, TestCheckpointStore>(
new CatchUpSubscriptionFixture<AllStreamSubscription, AllStreamSubscriptionOptions, TestEventHandler>(opt => opt.InitialPosition = CheckpointInitialPosition.End, new("$all"), false)
) {
[Test]
[Retry(3)]
public async Task Esdb_ShouldStartConsumptionFromEnd(CancellationToken cancellationToken) {
await ShouldStartConsumptionFromEnd(cancellationToken);
}
}

public class SubscribeToAll()
: SubscribeToAllBase<EventStoreDbContainer, AllStreamSubscription, AllStreamSubscriptionOptions, TestCheckpointStore>(
new CatchUpSubscriptionFixture<AllStreamSubscription, AllStreamSubscriptionOptions, TestEventHandler>(_ => { }, new("$all"), false)
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@

namespace Eventuous.Tests.Postgres.Subscriptions;

[NotInParallel]
public class SubscribeToAllFromEnd()
: SubscribeToAllBase<PostgreSqlContainer, PostgresAllStreamSubscription, PostgresAllStreamSubscriptionOptions, PostgresCheckpointStore>(
new SubscriptionFixture<PostgresStore, PostgresAllStreamSubscription, PostgresAllStreamSubscriptionOptions, TestEventHandler>(opt => opt.InitialPosition = Eventuous.Subscriptions.CheckpointInitialPosition.End, false)
) {
[Test]
public async Task Postgres_ShouldStartConsumptionFromEnd(CancellationToken cancellationToken) {
await ShouldStartConsumptionFromEnd(cancellationToken);
}
}

[NotInParallel]
public class SubscribeToAll()
: SubscribeToAllBase<PostgreSqlContainer, PostgresAllStreamSubscription, PostgresAllStreamSubscriptionOptions, PostgresCheckpointStore>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,18 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken
await BeforeSubscribe(cancellationToken).NoContext();
var (_, position) = await GetCheckpoint(cancellationToken).NoContext();

if (position == null && Options.InitialPosition == CheckpointInitialPosition.End) {
var endOfStream = await GetSubscriptionEndOfStream(cancellationToken).NoContext();
if (endOfStream == EndOfStream.Invalid) {
throw new InvalidOperationException($"Could not get the end of the stream for subscription {SubscriptionId}");
}
await CheckpointStore.StoreCheckpoint(new Checkpoint(SubscriptionId, endOfStream.Position), true, cancellationToken).NoContext();
position = endOfStream.Position;
}
await CheckpointStore.StoreCheckpoint(new Checkpoint(SubscriptionId, endOfStream.Position), true, cancellationToken);
position = endOfStream.Position;
}

_runner = new TaskRunner(token => PollingQuery(position, token)).Start();
}

Expand Down