Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,81 @@ await Assert.That(result[0].Metadata.ToDictionary(m => m.Key, m => ((JsonElement
.ContainsKey("Key1")
.And.ContainsKey("Key2");
}

[Test]
[Category("Store")]
public async Task ShouldThrowWhenReadingForwardsFromNegativePosition(CancellationToken cancellationToken) {
object[] events = _fixture.CreateEvents(10).ToArray();
var streamName = Helpers.GetStreamName();
await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream);

await Assert.ThrowsAsync(ReadFunc);

return;

// Try to read from negative position
Task<StreamEvent[]> ReadFunc() => _fixture.EventStore.ReadEvents(streamName, new(-10), 5, true, cancellationToken);
}

[Test]
[Category("Store")]
public async Task ShouldReadBackwardsFromEnd(CancellationToken cancellationToken) {
object[] events = _fixture.CreateEvents(10).ToArray();
var streamName = Helpers.GetStreamName();
await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream);

var result = await _fixture.EventStore.ReadEventsBackwards(streamName, new(9), 3, true, cancellationToken);

await Assert.That(result.Length).IsEqualTo(3);
// Events should be in reverse order: positions 9, 8, 7
await Assert.That(result[0].Payload).IsEquivalentTo(events[9]);
await Assert.That(result[1].Payload).IsEquivalentTo(events[8]);
await Assert.That(result[2].Payload).IsEquivalentTo(events[7]);
}

[Test]
[Category("Store")]
public async Task ShouldReadBackwardsFromMiddle(CancellationToken cancellationToken) {
object[] events = _fixture.CreateEvents(20).ToArray();
var streamName = Helpers.GetStreamName();
await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream);

var result = await _fixture.EventStore.ReadEventsBackwards(streamName, new(10), 5, true, cancellationToken);

await Assert.That(result.Length).IsEqualTo(5);
// Events should be in reverse order: positions 10, 9, 8, 7, 6
await Assert.That(result[0].Payload).IsEquivalentTo(events[10]);
await Assert.That(result[1].Payload).IsEquivalentTo(events[9]);
await Assert.That(result[2].Payload).IsEquivalentTo(events[8]);
await Assert.That(result[3].Payload).IsEquivalentTo(events[7]);
await Assert.That(result[4].Payload).IsEquivalentTo(events[6]);
}

[Test]
[Category("Store")]
public async Task ShouldReturnWhenReadingBackwards(CancellationToken cancellationToken) {
object[] events = _fixture.CreateEvents(10).ToArray();
var streamName = Helpers.GetStreamName();
await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream);

// Try to read from position 20 when stream only has events at positions 0-9
var result = await _fixture.EventStore.ReadEventsBackwards(streamName, new(20), 5, true, cancellationToken);

await Assert.That(result.Length).IsEqualTo(5);
}

[Test]
[Category("Store")]
public async Task ShouldThrowWhenReadingBackwardsFromNegativePosition(CancellationToken cancellationToken) {
object[] events = _fixture.CreateEvents(10).ToArray();
var streamName = Helpers.GetStreamName();
await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream);

await Assert.ThrowsAsync(ReadFunc);

return;

// Try to read from negative position
Task<StreamEvent[]> ReadFunc() => _fixture.EventStore.ReadEventsBackwards(streamName, new(-10), 5, true, cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,17 @@
}

public class TestDependency {
public string Value { get; } = "test-value";
public string Value => "test-value";
}

public class TestHandler(TestDependency dependency, TestHandlerLogger logger) : BaseEventHandler {
class TestHandler(TestDependency dependency, TestHandlerLogger logger) : BaseEventHandler {
public TestDependency Dependency { get; } = dependency;

public override ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext ctx)
=> logger.EventReceived(GetType(), ctx);
}

public class CompositionWrapper(IEventHandler innerHandler, TestHandlerLogger logger) : BaseEventHandler {
class CompositionWrapper(IEventHandler _, TestHandlerLogger logger) : BaseEventHandler {

Check warning on line 118 in src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs

View workflow job for this annotation

GitHub Actions / Build and test (9.0)

Parameter '_' is unread.

Check warning on line 118 in src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs

View workflow job for this annotation

GitHub Actions / Build and test (10.0)

Parameter '_' is unread.

Check warning on line 118 in src/Core/test/Eventuous.Tests.Subscriptions/CompositionHandlerTests.cs

View workflow job for this annotation

GitHub Actions / Build and test (8.0)

Parameter '_' is unread.
public override ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext ctx) {
// Wrap the inner handler call - this simulates what PollyEventHandler does
return logger.EventReceived(GetType(), ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,33 @@ declare
_current_version integer;
_stream_id integer;
begin
select s.stream_id into _current_version, _stream_id
select s.version, s.stream_id into _current_version, _stream_id
from __schema__.streams s
where s.stream_name = _stream_name;

if _stream_id is null then
raise exception 'StreamNotFound';
end if;

if _current_version < _from_position + _count then
-- nothing to read / invalid request
if _count <= 0 then
return;
end if;

-- A negative starting position is invalid
if _from_position < 0 then
raise exception 'InvalidStartingPosition';
end if;

-- If the starting position is greater than the current version, set it to the current version.
if _from_position > _current_version
then
_from_position = _current_version;
end if;

return query select m.message_id, m.message_type, m.stream_position, m.global_position,
m.json_data, m.json_metadata, m.created
from __schema__.messages m
from __schema__.messages m
where m.stream_id = _stream_id and m.stream_position <= _from_position
order by m.stream_position desc
limit _count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ begin
raise exception 'StreamNotFound';
end if;

if _from_position < 0 then -- A negative starting position is invalid
raise exception 'InvalidStartingPosition';
end if;

if _current_version < _from_position then
return;
end if;

return query select m.message_id, m.message_type, m.stream_position, m.global_position,
m.json_data, m.json_metadata, m.created
from __schema__.messages m
from __schema__.messages m
where m.stream_id = _stream_id and m.stream_position >= _from_position
order by m.stream_position
limit _count;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using Eventuous.Diagnostics.Tracing;
using Eventuous.Postgresql;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.TestHost;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Npgsql;
using Assert = TUnit.Assertions.Assert;

Expand All @@ -13,40 +15,44 @@ public class RegistrationTests {

[Test]
public async Task Should_resolve_store_with_manual_registration() {
var ds = new NpgsqlDataSourceBuilder(ConnectionString).Build();
var builder = new WebHostBuilder();
builder.Configure(_ => { });

builder.ConfigureServices(
services => {
services.AddEventStore<PostgresStore>();
services.AddSingleton(ds);
services.AddSingleton(new PostgresStoreOptions());
}
);
var app = builder.Build();
var aggregateStore = app.Services.GetRequiredService<IEventStore>();
var ds = new NpgsqlDataSourceBuilder(ConnectionString).Build();

using var host = new HostBuilder()
.ConfigureWebHost(webHostBuilder => webHostBuilder
.UseTestServer()
.ConfigureServices(services => {
services.AddEventStore<PostgresStore>();
services.AddSingleton(ds);
services.AddSingleton(new PostgresStoreOptions());
}
)
)
.Build();

var aggregateStore = host.Services.GetRequiredService<IEventStore>();
await Assert.That(aggregateStore).IsNotNull();
}

[Test]
public async Task Should_resolve_store_with_extensions() {
var builder = new WebHostBuilder();
var config = new Dictionary<string, string?> {
["postgres:schema"] = "test",
var config = new Dictionary<string, string?> {
["postgres:schema"] = "test",
["postgres:connectionString"] = ConnectionString
};
builder.ConfigureAppConfiguration(cfg => cfg.AddInMemoryCollection(config));
builder.Configure(_ => { });

builder.ConfigureServices(
(ctx, services) => {
services.AddEventStore<PostgresStore>();
services.AddEventuousPostgres(ctx.Configuration.GetSection("postgres"));
}
);
var app = builder.Build();
var reader = app.Services.GetService<IEventStore>();

using var host = new HostBuilder()
.ConfigureWebHost(webHostBuilder => webHostBuilder
.UseTestServer()
.ConfigureAppConfiguration(cfg => cfg.AddInMemoryCollection(config))
.ConfigureServices((ctx, services) => {
services.AddEventStore<PostgresStore>();
services.AddEventuousPostgres(ctx.Configuration.GetSection("postgres"));
}
)
)
.Build();

var reader = host.Services.GetService<IEventStore>();
var npgSqlReader = ((reader as TracedEventStore)!).Inner as PostgresStore;
await Assert.That(npgSqlReader).IsNotNull();
await Assert.That(npgSqlReader!.Schema.StreamMessage).IsEqualTo("test.stream_message");
Expand Down
6 changes: 5 additions & 1 deletion src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ StreamTruncatePosition position
[RequiresDynamicCode(Constants.DynamicSerializationMessage)]
[RequiresUnreferencedCode(Constants.DynamicSerializationMessage)]
public async Task<StreamEvent[]> ReadEvents(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken) {
if (count <= 0 || start == StreamReadPosition.End) return [];

await using var connection = await OpenConnection(cancellationToken).NoContext();
await using var cmd = GetReadCommand(connection, stream, start, count);

Expand All @@ -103,6 +105,8 @@ public async Task<StreamEvent[]> ReadEvents(StreamName stream, StreamReadPositio
[RequiresDynamicCode(Constants.DynamicSerializationMessage)]
[RequiresUnreferencedCode(Constants.DynamicSerializationMessage)]
public async Task<StreamEvent[]> ReadEventsBackwards(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken) {
if (count <= 0) return [];

await using var connection = await OpenConnection(cancellationToken).NoContext();
await using var cmd = GetReadBackwardsCommand(connection, stream, start, count);

Expand All @@ -123,7 +127,7 @@ async Task<StreamEvent[]> ReadInternal(DbCommand cmd, StreamName stream, bool fa
return failIfNotFound ? throw new StreamNotFound(stream) : [];
}

throw;
throw new ReadFromStreamException(stream, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@ BEGIN
END;

-- Validate the starting position for backwards read.
IF @from_position < 0 -- A negative starting position is invalid
OR @from_position > @current_version -- A starting position greater than the current version means we're trying to read from beyond the head of the stream
IF @from_position < 0
BEGIN
RETURN;
END;

-- If the starting position is greater than the current version, set it to the current version.
IF @from_position > @current_version
BEGIN
SET @from_position = @current_version;
END;

SELECT TOP (@count)
MessageId,
MessageType,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using Eventuous.Diagnostics.Tracing;
using Eventuous.SqlServer;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.TestHost;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;

namespace Eventuous.Tests.SqlServer.Registrations;
Expand All @@ -12,41 +14,41 @@ public class RegistrationTests {

[Test]
public void Should_resolve_store_with_manual_registration() {
var builder = new WebHostBuilder();
builder.Configure(_ => { });

builder.ConfigureServices(
services => {
services.AddEventStore<SqlServerStore>();
services.AddSingleton(new SqlServerStoreOptions { ConnectionString = ConnectionString });
}
);
var app = builder.Build();
var store = app.Services.GetRequiredService<IEventStore>();
using var host = new HostBuilder()
.ConfigureWebHost(webHostBuilder => webHostBuilder
.UseTestServer()
.ConfigureServices(services => {
services.AddEventStore<SqlServerStore>();
services.AddSingleton(new SqlServerStoreOptions { ConnectionString = ConnectionString });
}
)
)
.Build();
var store = host.Services.GetRequiredService<IEventStore>();
store.ShouldBeOfType<TracedEventStore>();
var innerStore = ((TracedEventStore)store).Inner;
innerStore.ShouldBeOfType<SqlServerStore>();
}

[Test]
public void Should_resolve_store_with_extensions() {
var builder = new WebHostBuilder();

var config = new Dictionary<string, string?> {
["sqlserver:schema"] = "test",
["sqlserver:connectionString"] = ConnectionString
};
builder.ConfigureAppConfiguration(cfg => cfg.AddInMemoryCollection(config));
builder.Configure(_ => { });

builder.ConfigureServices(
(ctx, services) => {
services.AddEventStore<SqlServerStore>();
services.AddEventuousSqlServer(ctx.Configuration.GetSection("sqlserver"));
}
);
var app = builder.Build();
var store = app.Services.GetService<IEventStore>();
using var host = new HostBuilder()
.ConfigureWebHost(webHostBuilder => webHostBuilder
.UseTestServer()
.ConfigureAppConfiguration(cfg => cfg.AddInMemoryCollection(config))
.ConfigureServices((ctx, services) => {
services.AddEventStore<SqlServerStore>();
services.AddEventuousSqlServer(ctx.Configuration.GetSection("sqlserver"));
}
)
)
.Build();
var store = host.Services.GetService<IEventStore>();
store.ShouldNotBeNull();
var inner = ((store as TracedEventStore)!).Inner as SqlServerStore;
inner.ShouldNotBeNull();
Expand Down
Loading