diff --git a/src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs b/src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs index dbc02c69..edcc230c 100644 --- a/src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs +++ b/src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs @@ -83,4 +83,81 @@ await Assert.That(result[0].Metadata.ToDictionary(m => m.Key, m => ((JsonElement .ContainsKey("Key1") .And.ContainsKey("Key2"); } + + [Test] + [Category("Store")] + public async Task ShouldThrowWhenReadingForwardsFromNegativePosition(CancellationToken cancellationToken) { + object[] events = _fixture.CreateEvents(10).ToArray(); + var streamName = Helpers.GetStreamName(); + await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream); + + await Assert.ThrowsAsync(ReadFunc); + + return; + + // Try to read from negative position + Task ReadFunc() => _fixture.EventStore.ReadEvents(streamName, new(-10), 5, true, cancellationToken); + } + + [Test] + [Category("Store")] + public async Task ShouldReadBackwardsFromEnd(CancellationToken cancellationToken) { + object[] events = _fixture.CreateEvents(10).ToArray(); + var streamName = Helpers.GetStreamName(); + await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream); + + var result = await _fixture.EventStore.ReadEventsBackwards(streamName, new(9), 3, true, cancellationToken); + + await Assert.That(result.Length).IsEqualTo(3); + // Events should be in reverse order: positions 9, 8, 7 + await Assert.That(result[0].Payload).IsEquivalentTo(events[9]); + await Assert.That(result[1].Payload).IsEquivalentTo(events[8]); + await Assert.That(result[2].Payload).IsEquivalentTo(events[7]); + } + + [Test] + [Category("Store")] + public async Task ShouldReadBackwardsFromMiddle(CancellationToken cancellationToken) { + object[] events = _fixture.CreateEvents(20).ToArray(); + var streamName = Helpers.GetStreamName(); + await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream); + + var result = await _fixture.EventStore.ReadEventsBackwards(streamName, new(10), 5, true, cancellationToken); + + await Assert.That(result.Length).IsEqualTo(5); + // Events should be in reverse order: positions 10, 9, 8, 7, 6 + await Assert.That(result[0].Payload).IsEquivalentTo(events[10]); + await Assert.That(result[1].Payload).IsEquivalentTo(events[9]); + await Assert.That(result[2].Payload).IsEquivalentTo(events[8]); + await Assert.That(result[3].Payload).IsEquivalentTo(events[7]); + await Assert.That(result[4].Payload).IsEquivalentTo(events[6]); + } + + [Test] + [Category("Store")] + public async Task ShouldReturnWhenReadingBackwards(CancellationToken cancellationToken) { + object[] events = _fixture.CreateEvents(10).ToArray(); + var streamName = Helpers.GetStreamName(); + await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream); + + // Try to read from position 20 when stream only has events at positions 0-9 + var result = await _fixture.EventStore.ReadEventsBackwards(streamName, new(20), 5, true, cancellationToken); + + await Assert.That(result.Length).IsEqualTo(5); + } + + [Test] + [Category("Store")] + public async Task ShouldThrowWhenReadingBackwardsFromNegativePosition(CancellationToken cancellationToken) { + object[] events = _fixture.CreateEvents(10).ToArray(); + var streamName = Helpers.GetStreamName(); + await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream); + + await Assert.ThrowsAsync(ReadFunc); + + return; + + // Try to read from negative position + Task ReadFunc() => _fixture.EventStore.ReadEventsBackwards(streamName, new(-10), 5, true, cancellationToken); + } } diff --git a/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs b/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs index 8fba7024..48c3139b 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs @@ -105,17 +105,17 @@ class TestSub(TestOptions options, ConsumePipe consumePipe) } public class TestDependency { - public string Value { get; } = "test-value"; + public string Value => "test-value"; } - public class TestHandler(TestDependency dependency, TestHandlerLogger logger) : BaseEventHandler { + class TestHandler(TestDependency dependency, TestHandlerLogger logger) : BaseEventHandler { public TestDependency Dependency { get; } = dependency; public override ValueTask HandleEvent(IMessageConsumeContext ctx) => logger.EventReceived(GetType(), ctx); } - public class CompositionWrapper(IEventHandler innerHandler, TestHandlerLogger logger) : BaseEventHandler { + class CompositionWrapper(IEventHandler _, TestHandlerLogger logger) : BaseEventHandler { public override ValueTask HandleEvent(IMessageConsumeContext ctx) { // Wrap the inner handler call - this simulates what PollyEventHandler does return logger.EventReceived(GetType(), ctx); diff --git a/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql b/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql index c3cdcc4c..d8717e95 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql +++ b/src/Postgres/src/Eventuous.Postgresql/Scripts/5_ReadStreamBackwards.sql @@ -17,7 +17,7 @@ declare _current_version integer; _stream_id integer; begin - select s.stream_id into _current_version, _stream_id + select s.version, s.stream_id into _current_version, _stream_id from __schema__.streams s where s.stream_name = _stream_name; @@ -25,13 +25,25 @@ begin raise exception 'StreamNotFound'; end if; - if _current_version < _from_position + _count then + -- nothing to read / invalid request + if _count <= 0 then return; end if; + -- A negative starting position is invalid + if _from_position < 0 then + raise exception 'InvalidStartingPosition'; + end if; + + -- If the starting position is greater than the current version, set it to the current version. + if _from_position > _current_version + then + _from_position = _current_version; + end if; + return query select m.message_id, m.message_type, m.stream_position, m.global_position, m.json_data, m.json_metadata, m.created - from __schema__.messages m + from __schema__.messages m where m.stream_id = _stream_id and m.stream_position <= _from_position order by m.stream_position desc limit _count; diff --git a/src/Postgres/src/Eventuous.Postgresql/Scripts/6_ReadStreamForwards.sql b/src/Postgres/src/Eventuous.Postgresql/Scripts/6_ReadStreamForwards.sql index dd31c2d9..1b8d5b17 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Scripts/6_ReadStreamForwards.sql +++ b/src/Postgres/src/Eventuous.Postgresql/Scripts/6_ReadStreamForwards.sql @@ -25,13 +25,17 @@ begin raise exception 'StreamNotFound'; end if; + if _from_position < 0 then -- A negative starting position is invalid + raise exception 'InvalidStartingPosition'; + end if; + if _current_version < _from_position then return; end if; return query select m.message_id, m.message_type, m.stream_position, m.global_position, m.json_data, m.json_metadata, m.created - from __schema__.messages m + from __schema__.messages m where m.stream_id = _stream_id and m.stream_position >= _from_position order by m.stream_position limit _count; diff --git a/src/Postgres/test/Eventuous.Tests.Postgres/Registrations/RegistrationTests.cs b/src/Postgres/test/Eventuous.Tests.Postgres/Registrations/RegistrationTests.cs index 7955298c..7b8c6692 100644 --- a/src/Postgres/test/Eventuous.Tests.Postgres/Registrations/RegistrationTests.cs +++ b/src/Postgres/test/Eventuous.Tests.Postgres/Registrations/RegistrationTests.cs @@ -1,8 +1,10 @@ using Eventuous.Diagnostics.Tracing; using Eventuous.Postgresql; using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.TestHost; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Npgsql; using Assert = TUnit.Assertions.Assert; @@ -13,40 +15,44 @@ public class RegistrationTests { [Test] public async Task Should_resolve_store_with_manual_registration() { - var ds = new NpgsqlDataSourceBuilder(ConnectionString).Build(); - var builder = new WebHostBuilder(); - builder.Configure(_ => { }); - - builder.ConfigureServices( - services => { - services.AddEventStore(); - services.AddSingleton(ds); - services.AddSingleton(new PostgresStoreOptions()); - } - ); - var app = builder.Build(); - var aggregateStore = app.Services.GetRequiredService(); + var ds = new NpgsqlDataSourceBuilder(ConnectionString).Build(); + + using var host = new HostBuilder() + .ConfigureWebHost(webHostBuilder => webHostBuilder + .UseTestServer() + .ConfigureServices(services => { + services.AddEventStore(); + services.AddSingleton(ds); + services.AddSingleton(new PostgresStoreOptions()); + } + ) + ) + .Build(); + + var aggregateStore = host.Services.GetRequiredService(); await Assert.That(aggregateStore).IsNotNull(); } [Test] public async Task Should_resolve_store_with_extensions() { - var builder = new WebHostBuilder(); - var config = new Dictionary { - ["postgres:schema"] = "test", + var config = new Dictionary { + ["postgres:schema"] = "test", ["postgres:connectionString"] = ConnectionString }; - builder.ConfigureAppConfiguration(cfg => cfg.AddInMemoryCollection(config)); - builder.Configure(_ => { }); - - builder.ConfigureServices( - (ctx, services) => { - services.AddEventStore(); - services.AddEventuousPostgres(ctx.Configuration.GetSection("postgres")); - } - ); - var app = builder.Build(); - var reader = app.Services.GetService(); + + using var host = new HostBuilder() + .ConfigureWebHost(webHostBuilder => webHostBuilder + .UseTestServer() + .ConfigureAppConfiguration(cfg => cfg.AddInMemoryCollection(config)) + .ConfigureServices((ctx, services) => { + services.AddEventStore(); + services.AddEventuousPostgres(ctx.Configuration.GetSection("postgres")); + } + ) + ) + .Build(); + + var reader = host.Services.GetService(); var npgSqlReader = ((reader as TracedEventStore)!).Inner as PostgresStore; await Assert.That(npgSqlReader).IsNotNull(); await Assert.That(npgSqlReader!.Schema.StreamMessage).IsEqualTo("test.stream_message"); diff --git a/src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs b/src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs index 30a342db..d4e40771 100644 --- a/src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs +++ b/src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs @@ -93,6 +93,8 @@ StreamTruncatePosition position [RequiresDynamicCode(Constants.DynamicSerializationMessage)] [RequiresUnreferencedCode(Constants.DynamicSerializationMessage)] public async Task ReadEvents(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken) { + if (count <= 0 || start == StreamReadPosition.End) return []; + await using var connection = await OpenConnection(cancellationToken).NoContext(); await using var cmd = GetReadCommand(connection, stream, start, count); @@ -103,6 +105,8 @@ public async Task ReadEvents(StreamName stream, StreamReadPositio [RequiresDynamicCode(Constants.DynamicSerializationMessage)] [RequiresUnreferencedCode(Constants.DynamicSerializationMessage)] public async Task ReadEventsBackwards(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken) { + if (count <= 0) return []; + await using var connection = await OpenConnection(cancellationToken).NoContext(); await using var cmd = GetReadBackwardsCommand(connection, stream, start, count); @@ -123,7 +127,7 @@ async Task ReadInternal(DbCommand cmd, StreamName stream, bool fa return failIfNotFound ? throw new StreamNotFound(stream) : []; } - throw; + throw new ReadFromStreamException(stream, e); } } diff --git a/src/SqlServer/src/Eventuous.SqlServer/Scripts/5_ReadStreamBackwards.sql b/src/SqlServer/src/Eventuous.SqlServer/Scripts/5_ReadStreamBackwards.sql index 6f6caa82..fa91c170 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/Scripts/5_ReadStreamBackwards.sql +++ b/src/SqlServer/src/Eventuous.SqlServer/Scripts/5_ReadStreamBackwards.sql @@ -29,12 +29,17 @@ BEGIN END; -- Validate the starting position for backwards read. - IF @from_position < 0 -- A negative starting position is invalid - OR @from_position > @current_version -- A starting position greater than the current version means we're trying to read from beyond the head of the stream + IF @from_position < 0 BEGIN RETURN; END; + -- If the starting position is greater than the current version, set it to the current version. + IF @from_position > @current_version + BEGIN + SET @from_position = @current_version; + END; + SELECT TOP (@count) MessageId, MessageType, diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Registrations/RegistrationTests.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Registrations/RegistrationTests.cs index 098d836d..ffa90849 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Registrations/RegistrationTests.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Registrations/RegistrationTests.cs @@ -1,8 +1,10 @@ using Eventuous.Diagnostics.Tracing; using Eventuous.SqlServer; using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.TestHost; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Shouldly; namespace Eventuous.Tests.SqlServer.Registrations; @@ -12,17 +14,17 @@ public class RegistrationTests { [Test] public void Should_resolve_store_with_manual_registration() { - var builder = new WebHostBuilder(); - builder.Configure(_ => { }); - - builder.ConfigureServices( - services => { - services.AddEventStore(); - services.AddSingleton(new SqlServerStoreOptions { ConnectionString = ConnectionString }); - } - ); - var app = builder.Build(); - var store = app.Services.GetRequiredService(); + using var host = new HostBuilder() + .ConfigureWebHost(webHostBuilder => webHostBuilder + .UseTestServer() + .ConfigureServices(services => { + services.AddEventStore(); + services.AddSingleton(new SqlServerStoreOptions { ConnectionString = ConnectionString }); + } + ) + ) + .Build(); + var store = host.Services.GetRequiredService(); store.ShouldBeOfType(); var innerStore = ((TracedEventStore)store).Inner; innerStore.ShouldBeOfType(); @@ -30,23 +32,23 @@ public void Should_resolve_store_with_manual_registration() { [Test] public void Should_resolve_store_with_extensions() { - var builder = new WebHostBuilder(); - var config = new Dictionary { ["sqlserver:schema"] = "test", ["sqlserver:connectionString"] = ConnectionString }; - builder.ConfigureAppConfiguration(cfg => cfg.AddInMemoryCollection(config)); - builder.Configure(_ => { }); - builder.ConfigureServices( - (ctx, services) => { - services.AddEventStore(); - services.AddEventuousSqlServer(ctx.Configuration.GetSection("sqlserver")); - } - ); - var app = builder.Build(); - var store = app.Services.GetService(); + using var host = new HostBuilder() + .ConfigureWebHost(webHostBuilder => webHostBuilder + .UseTestServer() + .ConfigureAppConfiguration(cfg => cfg.AddInMemoryCollection(config)) + .ConfigureServices((ctx, services) => { + services.AddEventStore(); + services.AddEventuousSqlServer(ctx.Configuration.GetSection("sqlserver")); + } + ) + ) + .Build(); + var store = host.Services.GetService(); store.ShouldNotBeNull(); var inner = ((store as TracedEventStore)!).Inner as SqlServerStore; inner.ShouldNotBeNull();