From 325aec3dec918841f80066e9a471212bef19a5cc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 14 Nov 2025 11:28:28 +0000 Subject: [PATCH 1/5] Initial plan From 5985bf35fdc1861e9dfd83013e04b97daba2df36 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 14 Nov 2025 11:33:10 +0000 Subject: [PATCH 2/5] Fix PostgreSQL read_stream_backwards guard clause Co-authored-by: alexeyzimarev <2821205+alexeyzimarev@users.noreply.github.com> --- .../Scripts/5_ReadStreamBackwards.sql | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql b/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql index c3cdcc4c..b1bd416f 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql +++ b/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql @@ -25,7 +25,15 @@ begin raise exception 'StreamNotFound'; end if; - if _current_version < _from_position + _count then + -- nothing to read / invalid request + if _count <= 0 then + return; + end if; + + -- Validate the starting position for backwards read. + if _from_position < 0 -- A negative starting position is invalid + or _from_position > _current_version -- A starting position greater than the current version means we're trying to read from beyond the head of the stream + then return; end if; From 5619006dcd087e269ce087d120dea7ca9b74f0d7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 14 Nov 2025 11:50:55 +0000 Subject: [PATCH 3/5] Add backwards read tests to StoreReadTests base class Co-authored-by: alexeyzimarev <2821205+alexeyzimarev@users.noreply.github.com> --- .../Store/Read.cs | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs b/src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs index dbc02c69..c41318c3 100644 --- a/src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs +++ b/src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs @@ -83,4 +83,64 @@ await Assert.That(result[0].Metadata.ToDictionary(m => m.Key, m => ((JsonElement .ContainsKey("Key1") .And.ContainsKey("Key2"); } + + [Test] + [Category("Store")] + public async Task ShouldReadBackwardsFromEnd(CancellationToken cancellationToken) { + object[] events = _fixture.CreateEvents(10).ToArray(); + var streamName = Helpers.GetStreamName(); + await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream); + + var result = await _fixture.EventStore.ReadEventsBackwards(streamName, new(9), 3, true, cancellationToken); + + await Assert.That(result.Length).IsEqualTo(3); + // Events should be in reverse order: positions 9, 8, 7 + await Assert.That(result[0].Payload).IsEquivalentTo(events[9]); + await Assert.That(result[1].Payload).IsEquivalentTo(events[8]); + await Assert.That(result[2].Payload).IsEquivalentTo(events[7]); + } + + [Test] + [Category("Store")] + public async Task ShouldReadBackwardsFromMiddle(CancellationToken cancellationToken) { + object[] events = _fixture.CreateEvents(20).ToArray(); + var streamName = Helpers.GetStreamName(); + await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream); + + var result = await _fixture.EventStore.ReadEventsBackwards(streamName, new(10), 5, true, cancellationToken); + + await Assert.That(result.Length).IsEqualTo(5); + // Events should be in reverse order: positions 10, 9, 8, 7, 6 + await Assert.That(result[0].Payload).IsEquivalentTo(events[10]); + await Assert.That(result[1].Payload).IsEquivalentTo(events[9]); + await Assert.That(result[2].Payload).IsEquivalentTo(events[8]); + await Assert.That(result[3].Payload).IsEquivalentTo(events[7]); + await Assert.That(result[4].Payload).IsEquivalentTo(events[6]); + } + + [Test] + [Category("Store")] + public async Task ShouldReturnEmptyWhenReadingBackwardsFromBeyondStreamEnd(CancellationToken cancellationToken) { + object[] events = _fixture.CreateEvents(10).ToArray(); + var streamName = Helpers.GetStreamName(); + await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream); + + // Try to read from position 20 when stream only has events at positions 0-9 + var result = await _fixture.EventStore.ReadEventsBackwards(streamName, new(20), 5, true, cancellationToken); + + await Assert.That(result.Length).IsEqualTo(0); + } + + [Test] + [Category("Store")] + public async Task ShouldReturnEmptyWhenReadingBackwardsFromNegativePosition(CancellationToken cancellationToken) { + object[] events = _fixture.CreateEvents(10).ToArray(); + var streamName = Helpers.GetStreamName(); + await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream); + + // Try to read from negative position + var result = await _fixture.EventStore.ReadEventsBackwards(streamName, new(-1), 5, true, cancellationToken); + + await Assert.That(result.Length).IsEqualTo(0); + } } From 9adc069ca7c99e61a9ce8e0a1bcaade48b54f521 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 14 Nov 2025 13:47:49 +0100 Subject: [PATCH 4/5] Wrap up reading backwards for SQL stores --- .../Store/Read.cs | 29 +++++++-- .../CompositionHandlerTests.cs | 6 +- .../Scripts/5_ReadStreamBackwards.sql | 13 ++-- .../Scripts/6_ReadStreamForwards.sql | 6 +- .../Registrations/RegistrationTests.cs | 60 ++++++++++--------- .../Eventuous.Sql.Base/SqlEventStoreBase.cs | 6 +- .../Registrations/RegistrationTests.cs | 48 ++++++++------- 7 files changed, 102 insertions(+), 66 deletions(-) diff --git a/src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs b/src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs index c41318c3..edcc230c 100644 --- a/src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs +++ b/src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs @@ -84,6 +84,21 @@ await Assert.That(result[0].Metadata.ToDictionary(m => m.Key, m => ((JsonElement .And.ContainsKey("Key2"); } + [Test] + [Category("Store")] + public async Task ShouldThrowWhenReadingForwardsFromNegativePosition(CancellationToken cancellationToken) { + object[] events = _fixture.CreateEvents(10).ToArray(); + var streamName = Helpers.GetStreamName(); + await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream); + + await Assert.ThrowsAsync(ReadFunc); + + return; + + // Try to read from negative position + Task ReadFunc() => _fixture.EventStore.ReadEvents(streamName, new(-10), 5, true, cancellationToken); + } + [Test] [Category("Store")] public async Task ShouldReadBackwardsFromEnd(CancellationToken cancellationToken) { @@ -120,7 +135,7 @@ public async Task ShouldReadBackwardsFromMiddle(CancellationToken cancellationTo [Test] [Category("Store")] - public async Task ShouldReturnEmptyWhenReadingBackwardsFromBeyondStreamEnd(CancellationToken cancellationToken) { + public async Task ShouldReturnWhenReadingBackwards(CancellationToken cancellationToken) { object[] events = _fixture.CreateEvents(10).ToArray(); var streamName = Helpers.GetStreamName(); await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream); @@ -128,19 +143,21 @@ public async Task ShouldReturnEmptyWhenReadingBackwardsFromBeyondStreamEnd(Cance // Try to read from position 20 when stream only has events at positions 0-9 var result = await _fixture.EventStore.ReadEventsBackwards(streamName, new(20), 5, true, cancellationToken); - await Assert.That(result.Length).IsEqualTo(0); + await Assert.That(result.Length).IsEqualTo(5); } [Test] [Category("Store")] - public async Task ShouldReturnEmptyWhenReadingBackwardsFromNegativePosition(CancellationToken cancellationToken) { + public async Task ShouldThrowWhenReadingBackwardsFromNegativePosition(CancellationToken cancellationToken) { object[] events = _fixture.CreateEvents(10).ToArray(); var streamName = Helpers.GetStreamName(); await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream); - // Try to read from negative position - var result = await _fixture.EventStore.ReadEventsBackwards(streamName, new(-1), 5, true, cancellationToken); + await Assert.ThrowsAsync(ReadFunc); + + return; - await Assert.That(result.Length).IsEqualTo(0); + // Try to read from negative position + Task ReadFunc() => _fixture.EventStore.ReadEventsBackwards(streamName, new(-10), 5, true, cancellationToken); } } diff --git a/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs b/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs index 8fba7024..48c3139b 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs @@ -105,17 +105,17 @@ class TestSub(TestOptions options, ConsumePipe consumePipe) } public class TestDependency { - public string Value { get; } = "test-value"; + public string Value => "test-value"; } - public class TestHandler(TestDependency dependency, TestHandlerLogger logger) : BaseEventHandler { + class TestHandler(TestDependency dependency, TestHandlerLogger logger) : BaseEventHandler { public TestDependency Dependency { get; } = dependency; public override ValueTask HandleEvent(IMessageConsumeContext ctx) => logger.EventReceived(GetType(), ctx); } - public class CompositionWrapper(IEventHandler innerHandler, TestHandlerLogger logger) : BaseEventHandler { + class CompositionWrapper(IEventHandler _, TestHandlerLogger logger) : BaseEventHandler { public override ValueTask HandleEvent(IMessageConsumeContext ctx) { // Wrap the inner handler call - this simulates what PollyEventHandler does return logger.EventReceived(GetType(), ctx); diff --git a/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql b/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql index b1bd416f..8447128d 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql +++ b/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql @@ -17,7 +17,7 @@ declare _current_version integer; _stream_id integer; begin - select s.stream_id into _current_version, _stream_id + select s.version, s.stream_id into _current_version, _stream_id from __schema__.streams s where s.stream_name = _stream_name; @@ -30,16 +30,19 @@ begin return; end if; + if _from_position < 0 then -- A negative starting position is invalid + raise exception 'InvalidStartingPosition'; + end if; + -- Validate the starting position for backwards read. - if _from_position < 0 -- A negative starting position is invalid - or _from_position > _current_version -- A starting position greater than the current version means we're trying to read from beyond the head of the stream + if _from_position > _current_version -- A starting position greater than the current version means we're trying to read from beyond the head of the stream then - return; + _from_position = _current_version; end if; return query select m.message_id, m.message_type, m.stream_position, m.global_position, m.json_data, m.json_metadata, m.created - from __schema__.messages m + from __schema__.messages m where m.stream_id = _stream_id and m.stream_position <= _from_position order by m.stream_position desc limit _count; diff --git a/src/Postgres/src/Eventuous.Postgresql/Scripts/6_ReadStreamForwards.sql b/src/Postgres/src/Eventuous.Postgresql/Scripts/6_ReadStreamForwards.sql index dd31c2d9..1b8d5b17 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Scripts/6_ReadStreamForwards.sql +++ b/src/Postgres/src/Eventuous.Postgresql/Scripts/6_ReadStreamForwards.sql @@ -25,13 +25,17 @@ begin raise exception 'StreamNotFound'; end if; + if _from_position < 0 then -- A negative starting position is invalid + raise exception 'InvalidStartingPosition'; + end if; + if _current_version < _from_position then return; end if; return query select m.message_id, m.message_type, m.stream_position, m.global_position, m.json_data, m.json_metadata, m.created - from __schema__.messages m + from __schema__.messages m where m.stream_id = _stream_id and m.stream_position >= _from_position order by m.stream_position limit _count; diff --git a/src/Postgres/test/Eventuous.Tests.Postgres/Registrations/RegistrationTests.cs b/src/Postgres/test/Eventuous.Tests.Postgres/Registrations/RegistrationTests.cs index 7955298c..7b8c6692 100644 --- a/src/Postgres/test/Eventuous.Tests.Postgres/Registrations/RegistrationTests.cs +++ b/src/Postgres/test/Eventuous.Tests.Postgres/Registrations/RegistrationTests.cs @@ -1,8 +1,10 @@ using Eventuous.Diagnostics.Tracing; using Eventuous.Postgresql; using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.TestHost; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Npgsql; using Assert = TUnit.Assertions.Assert; @@ -13,40 +15,44 @@ public class RegistrationTests { [Test] public async Task Should_resolve_store_with_manual_registration() { - var ds = new NpgsqlDataSourceBuilder(ConnectionString).Build(); - var builder = new WebHostBuilder(); - builder.Configure(_ => { }); - - builder.ConfigureServices( - services => { - services.AddEventStore(); - services.AddSingleton(ds); - services.AddSingleton(new PostgresStoreOptions()); - } - ); - var app = builder.Build(); - var aggregateStore = app.Services.GetRequiredService(); + var ds = new NpgsqlDataSourceBuilder(ConnectionString).Build(); + + using var host = new HostBuilder() + .ConfigureWebHost(webHostBuilder => webHostBuilder + .UseTestServer() + .ConfigureServices(services => { + services.AddEventStore(); + services.AddSingleton(ds); + services.AddSingleton(new PostgresStoreOptions()); + } + ) + ) + .Build(); + + var aggregateStore = host.Services.GetRequiredService(); await Assert.That(aggregateStore).IsNotNull(); } [Test] public async Task Should_resolve_store_with_extensions() { - var builder = new WebHostBuilder(); - var config = new Dictionary { - ["postgres:schema"] = "test", + var config = new Dictionary { + ["postgres:schema"] = "test", ["postgres:connectionString"] = ConnectionString }; - builder.ConfigureAppConfiguration(cfg => cfg.AddInMemoryCollection(config)); - builder.Configure(_ => { }); - - builder.ConfigureServices( - (ctx, services) => { - services.AddEventStore(); - services.AddEventuousPostgres(ctx.Configuration.GetSection("postgres")); - } - ); - var app = builder.Build(); - var reader = app.Services.GetService(); + + using var host = new HostBuilder() + .ConfigureWebHost(webHostBuilder => webHostBuilder + .UseTestServer() + .ConfigureAppConfiguration(cfg => cfg.AddInMemoryCollection(config)) + .ConfigureServices((ctx, services) => { + services.AddEventStore(); + services.AddEventuousPostgres(ctx.Configuration.GetSection("postgres")); + } + ) + ) + .Build(); + + var reader = host.Services.GetService(); var npgSqlReader = ((reader as TracedEventStore)!).Inner as PostgresStore; await Assert.That(npgSqlReader).IsNotNull(); await Assert.That(npgSqlReader!.Schema.StreamMessage).IsEqualTo("test.stream_message"); diff --git a/src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs b/src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs index 30a342db..d4e40771 100644 --- a/src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs +++ b/src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs @@ -93,6 +93,8 @@ StreamTruncatePosition position [RequiresDynamicCode(Constants.DynamicSerializationMessage)] [RequiresUnreferencedCode(Constants.DynamicSerializationMessage)] public async Task ReadEvents(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken) { + if (count <= 0 || start == StreamReadPosition.End) return []; + await using var connection = await OpenConnection(cancellationToken).NoContext(); await using var cmd = GetReadCommand(connection, stream, start, count); @@ -103,6 +105,8 @@ public async Task ReadEvents(StreamName stream, StreamReadPositio [RequiresDynamicCode(Constants.DynamicSerializationMessage)] [RequiresUnreferencedCode(Constants.DynamicSerializationMessage)] public async Task ReadEventsBackwards(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken) { + if (count <= 0) return []; + await using var connection = await OpenConnection(cancellationToken).NoContext(); await using var cmd = GetReadBackwardsCommand(connection, stream, start, count); @@ -123,7 +127,7 @@ async Task ReadInternal(DbCommand cmd, StreamName stream, bool fa return failIfNotFound ? throw new StreamNotFound(stream) : []; } - throw; + throw new ReadFromStreamException(stream, e); } } diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Registrations/RegistrationTests.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Registrations/RegistrationTests.cs index 098d836d..ffa90849 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Registrations/RegistrationTests.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Registrations/RegistrationTests.cs @@ -1,8 +1,10 @@ using Eventuous.Diagnostics.Tracing; using Eventuous.SqlServer; using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.TestHost; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Shouldly; namespace Eventuous.Tests.SqlServer.Registrations; @@ -12,17 +14,17 @@ public class RegistrationTests { [Test] public void Should_resolve_store_with_manual_registration() { - var builder = new WebHostBuilder(); - builder.Configure(_ => { }); - - builder.ConfigureServices( - services => { - services.AddEventStore(); - services.AddSingleton(new SqlServerStoreOptions { ConnectionString = ConnectionString }); - } - ); - var app = builder.Build(); - var store = app.Services.GetRequiredService(); + using var host = new HostBuilder() + .ConfigureWebHost(webHostBuilder => webHostBuilder + .UseTestServer() + .ConfigureServices(services => { + services.AddEventStore(); + services.AddSingleton(new SqlServerStoreOptions { ConnectionString = ConnectionString }); + } + ) + ) + .Build(); + var store = host.Services.GetRequiredService(); store.ShouldBeOfType(); var innerStore = ((TracedEventStore)store).Inner; innerStore.ShouldBeOfType(); @@ -30,23 +32,23 @@ public void Should_resolve_store_with_manual_registration() { [Test] public void Should_resolve_store_with_extensions() { - var builder = new WebHostBuilder(); - var config = new Dictionary { ["sqlserver:schema"] = "test", ["sqlserver:connectionString"] = ConnectionString }; - builder.ConfigureAppConfiguration(cfg => cfg.AddInMemoryCollection(config)); - builder.Configure(_ => { }); - builder.ConfigureServices( - (ctx, services) => { - services.AddEventStore(); - services.AddEventuousSqlServer(ctx.Configuration.GetSection("sqlserver")); - } - ); - var app = builder.Build(); - var store = app.Services.GetService(); + using var host = new HostBuilder() + .ConfigureWebHost(webHostBuilder => webHostBuilder + .UseTestServer() + .ConfigureAppConfiguration(cfg => cfg.AddInMemoryCollection(config)) + .ConfigureServices((ctx, services) => { + services.AddEventStore(); + services.AddEventuousSqlServer(ctx.Configuration.GetSection("sqlserver")); + } + ) + ) + .Build(); + var store = host.Services.GetService(); store.ShouldNotBeNull(); var inner = ((store as TracedEventStore)!).Inner as SqlServerStore; inner.ShouldNotBeNull(); From 8d62403d27b90a49fda8f19c869811e5297307e5 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 14 Nov 2025 14:04:54 +0100 Subject: [PATCH 5/5] Fix discrepancies in reads backwards --- .../Scripts/5_ReadStreamBackwards.sql | 7 ++++--- .../Scripts/5_ReadStreamBackwards.sql | 9 +++++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql b/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql index 8447128d..d8717e95 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql +++ b/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql @@ -30,12 +30,13 @@ begin return; end if; - if _from_position < 0 then -- A negative starting position is invalid + -- A negative starting position is invalid + if _from_position < 0 then raise exception 'InvalidStartingPosition'; end if; - -- Validate the starting position for backwards read. - if _from_position > _current_version -- A starting position greater than the current version means we're trying to read from beyond the head of the stream + -- If the starting position is greater than the current version, set it to the current version. + if _from_position > _current_version then _from_position = _current_version; end if; diff --git a/src/SqlServer/src/Eventuous.SqlServer/Scripts/5_ReadStreamBackwards.sql b/src/SqlServer/src/Eventuous.SqlServer/Scripts/5_ReadStreamBackwards.sql index 6f6caa82..fa91c170 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/Scripts/5_ReadStreamBackwards.sql +++ b/src/SqlServer/src/Eventuous.SqlServer/Scripts/5_ReadStreamBackwards.sql @@ -29,12 +29,17 @@ BEGIN END; -- Validate the starting position for backwards read. - IF @from_position < 0 -- A negative starting position is invalid - OR @from_position > @current_version -- A starting position greater than the current version means we're trying to read from beyond the head of the stream + IF @from_position < 0 BEGIN RETURN; END; + -- If the starting position is greater than the current version, set it to the current version. + IF @from_position > @current_version + BEGIN + SET @from_position = @current_version; + END; + SELECT TOP (@count) MessageId, MessageType,