@mathantunes
Contributor

@alexeyzimarev
Contributor

/review

@qodo-code-review

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🎫 Ticket compliance analysis 🔶

456 - Partially compliant

Compliant requirements:

  • Provide a way to specify the desired initial position (beginning or end) via subscription configuration/options.
  • Maintain backward compatibility: default behavior remains starting from the beginning if not specified.
  • Update affected implementations across supported checkpoint stores (Postgres, Mongo, Redis, SQL Server, ElasticSearch) and internal wrappers.

Non-compliant requirements:

  • Allow initializing a new subscription from "now" (the end of the stream) instead of the beginning when no checkpoint exists.
  • Ensure checkpoint stores honor the initial position when creating/loading a checkpoint that doesn't exist yet.

Requires further human verification:

  • Verify Postgres initialization logic actually sets the starting position to the latest message when initial position is End (integration test over real data).
  • Verify behavior for stores that currently ignore the parameter (Mongo, Redis, SQL Server, Elastic) matches expectations or is intentionally deferred.
  • Documentation update confirming configuration usage and cross-store support.
⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Possible Issue

The new SQL that inserts a checkpoint when the initial position is End uses MAX(global_position), which sets the checkpoint to the position of the last recorded message. Confirm that the off-by-one semantics are correct for a subscriber starting from "now": it should neither reprocess the last existing event nor skip the first new one.

public string ReadStreamForwards  => $"select * from {schema}.read_stream_forwards(@_stream_name, @_from_position, @_count)";
public string ReadStreamBackwards => $"select * from {schema}.read_stream_backwards(@_stream_name, @_from_position, @_count)";
public string ReadStreamSub       => $"select * from {schema}.read_stream_sub(@_stream_id, @_stream_name, @_from_position, @_count)";
public string ReadAllForwards     => $"select * from {schema}.read_all_forwards(@_from_position, @_count)";
public string CheckStream         => $"select * from {schema}.check_stream(@_stream_name, @_expected_version)";
public string StreamExists        => $"select exists (select 1 from {schema}.streams where stream_name = (@name))";
public string TruncateStream      => $"select * from {schema}.truncate_stream(@_stream_name, @_expected_version, @_position)";
public string GetCheckpointSql    => $"select position from {schema}.checkpoints where id=(@checkpointId)";
public string AddCheckpointSql    => $"insert into {schema}.checkpoints (id, position) values (@checkpointId, case when @initialPosition = 'End' then (SELECT MAX(global_position) FROM {schema}.messages) else null end)";
public string UpdateCheckpointSql => $"update {schema}.checkpoints set position=(@position) where id=(@checkpointId)";
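
The off-by-one question can be made concrete with a minimal in-memory sketch. It assumes that a checkpoint holds the last processed global position and that a subscriber resumes strictly after it; the thread does not confirm either assumption, and `OffByOneSketch`, `MaxPosition`, and `ReadAfter` are hypothetical names, not library code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model, not the library's implementation.
// Assumption: a checkpoint stores the last *processed* global position,
// and a subscriber resumes strictly after that position.
public static class OffByOneSketch {
    // Mirrors SQL MAX(global_position): yields null when the log is empty.
    public static long? MaxPosition(IReadOnlyList<long> log) =>
        log.Count == 0 ? (long?)null : log.Max();

    // Positions a subscriber would receive for a given stored checkpoint.
    // A null checkpoint means "nothing processed yet", so everything is returned.
    public static List<long> ReadAfter(IReadOnlyList<long> log, long? checkpoint) =>
        log.Where(p => checkpoint is null || p > checkpoint.Value).ToList();
}
```

Under these assumptions, a checkpoint initialized to the maximum position skips every existing event and delivers the first new one. If the read were inclusive instead, the last existing event would be reprocessed. On an empty log the maximum is null, so End would behave like Beginning.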

API Design

The enum CheckpointInitialPosition is public; consider whether naming and placement are appropriate and whether additional options (e.g., explicit numeric position) are needed. Ensure binary compatibility and serialization behavior are acceptable.

public enum CheckpointInitialPosition {
    Beginning,
    End
}

public abstract record SubscriptionWithCheckpointOptions : SubscriptionOptions {
    public int CheckpointCommitBatchSize { get; set; } = 100;
    public int CheckpointCommitDelayMs   { get; set; } = 5000;
    public CheckpointInitialPosition CheckpointInitialPosition { get; set; } = CheckpointInitialPosition.Beginning;
}
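
For the "explicit numeric position" option, one possible shape is a small record hierarchy instead of an enum. This is a sketch only: the record types added later in this PR are not shown in the thread, so `InitialPosition`, its nested records, and `Describe` are hypothetical names.

```csharp
using System;

// Hypothetical sketch; these are not the PR's actual types.
public abstract record InitialPosition {
    // Start from the first event in the log.
    public sealed record Beginning : InitialPosition;

    // Start after the last event currently in the log.
    public sealed record End : InitialPosition;

    // Start from an explicit, caller-supplied position.
    public sealed record From(ulong Position) : InitialPosition;
}

public static class InitialPositionExtensions {
    // Shows how a consumer could branch on the three cases.
    public static string Describe(this InitialPosition position) => position switch {
        InitialPosition.Beginning => "start of the log",
        InitialPosition.End       => "end of the log",
        InitialPosition.From f    => $"position {f.Position}",
        _ => throw new ArgumentOutOfRangeException(nameof(position))
    };
}
```

Unlike an enum, the record form can carry the numeric position in the `From` case, at the cost of not being usable as a compile-time constant default in an optional parameter.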

Incomplete Adoption

Most checkpoint stores accept the new parameter but ignore it. Validate that at least one store fully honors it (Postgres partially does) and align others or clearly document the limitation to avoid inconsistent behavior across providers.

[PublicAPI]
public interface ICheckpointStore {
    ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken, CheckpointInitialPosition initialPosition = CheckpointInitialPosition.Beginning);

    ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, bool force, CancellationToken cancellationToken);

@alexeyzimarev alexeyzimarev requested a review from Copilot October 31, 2025 12:23
@github-actions

github-actions bot commented Oct 31, 2025

Test Results

 34 files   34 suites   18m 5s ⏱️
234 tests 234 ✅ 0 💤 0 ❌
474 runs  474 ✅ 0 💤 0 ❌

Results for commit b264394.

♻️ This comment has been updated with latest results.

Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@mathantunes mathantunes force-pushed the feat/checkpoint-start branch 2 times, most recently from 1c1100b to 311ee55 Compare November 3, 2025 21:26
Create CheckpointInitialPosition on SubscriptionWithCheckpointOptions

Using 3 record types, this commit allows a subscription to define its
starting point.

It can be Beginning, End or From a specific position.
@mathantunes mathantunes force-pushed the feat/checkpoint-start branch from 311ee55 to 2ae6a27 Compare November 3, 2025 21:31
@mathantunes
Contributor Author

mathantunes commented Nov 3, 2025

I'm having second thoughts about this approach because, as I understand it, the checkpoint store may not be the same database as the event store.

For the Postgres position check, I can modify AddCheckpointSql to query the messages table.
But when implementing the same feature for Redis, for example, there is no way to access the messages table to resolve the latest position.

And if the library allows mixing MongoDB for the event store with Postgres for checkpoints, the proposed approach will not work.

Instead of introducing an End case, I suppose we could have only a ulong CheckpointInitialPosition, but that would require the developer to estimate the end of the stream beforehand, which is not ideal.

I'm closing this PR for now until further discussion.

@mathantunes mathantunes closed this Nov 3, 2025
@alexeyzimarev
Contributor

I think all event stores support subscribing from the end. The start position option was the right approach, but delegating the job of finding the end of the log to the checkpoint store will indeed not work. The subscription should do that itself when it gets an empty checkpoint from the checkpoint store.
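
A minimal sketch of that split of responsibilities, under stated assumptions: the checkpoint store only reports what it has stored (null when nothing exists yet), and the subscription asks the event store for the end of the log. `StartFrom`, `StartPositionResolver`, and the `readLogEnd` delegate are hypothetical stand-ins, not the library's types.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for illustration only.
public enum StartFrom { Beginning, End }

public static class StartPositionResolver {
    // stored:     position loaded from the checkpoint store, or null if none exists yet.
    // readLogEnd: asks the event store (not the checkpoint store) for its last position;
    //             assumed to return null when the log is empty.
    public static async ValueTask<ulong?> Resolve(
        ulong? stored,
        StartFrom startFrom,
        Func<CancellationToken, ValueTask<ulong?>> readLogEnd,
        CancellationToken cancellationToken = default
    ) {
        // An existing checkpoint always wins over the configured initial position.
        if (stored is not null) return stored;

        // No checkpoint and Beginning: null means "nothing processed yet".
        if (startFrom == StartFrom.Beginning) return null;

        // No checkpoint and End: the subscription itself resolves the end of the log.
        return await readLogEnd(cancellationToken);
    }
}
```

With this shape, a checkpoint store backed by Redis or Postgres never needs access to the messages table, so mixing an event store from one provider with a checkpoint store from another would still work.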

@mathantunes mathantunes mentioned this pull request Nov 11, 2025
2 tasks