diff --git a/src/Core/gen/Eventuous.Shared.Generators/TypeMappingsGenerator.cs b/src/Core/gen/Eventuous.Shared.Generators/TypeMappingsGenerator.cs index a428e14c..d68b13e8 100644 --- a/src/Core/gen/Eventuous.Shared.Generators/TypeMappingsGenerator.cs +++ b/src/Core/gen/Eventuous.Shared.Generators/TypeMappingsGenerator.cs @@ -163,12 +163,12 @@ static void Generate(SourceProductionContext context, string assemblyName, Immut sb.AppendLine(); sb.AppendLine($"internal static class {className} {{"); sb.AppendLine(" [ModuleInitializer]"); - sb.AppendLine(" internal static void Initialize() => Register(TypeMap.Instance);"); + sb.AppendLine(" internal static void Initialize() => RegisterDiscoveredTypes(TypeMap.Instance);"); sb.AppendLine(); sb.AppendLine(" /// "); sb.AppendLine(" /// Registers all [EventType] types discovered at compile time into the provided mapper."); sb.AppendLine(" /// "); - sb.AppendLine(" public static void Register(TypeMapper mapper) {"); + sb.AppendLine(" public static void RegisterDiscoveredTypes(this TypeMapper mapper) {"); foreach (var m in distinct) { // Prefer passing the explicit name when we have it, so runtime reflection is avoided diff --git a/src/Core/src/Eventuous.Domain/Aggregate.cs b/src/Core/src/Eventuous.Domain/Aggregate.cs index a1f44206..bb53ce91 100644 --- a/src/Core/src/Eventuous.Domain/Aggregate.cs +++ b/src/Core/src/Eventuous.Domain/Aggregate.cs @@ -30,13 +30,13 @@ namespace Eventuous; /// It is used for optimistic concurrency to check if there were no changes made to the /// aggregate state between load and save for the current operation. /// - public int OriginalVersion => Original.Length - 1; + public long OriginalVersion { get; private set; } = -1; /// /// The current version is set to the original version when the aggregate is loaded from the store. /// It should increase for each state transition performed within the scope of the current operation. /// - public int CurrentVersion => OriginalVersion + Changes.Count; + public long CurrentVersion => OriginalVersion + Changes.Count; readonly List _changes = []; @@ -78,9 +78,10 @@ protected void EnsureExists(Func? getException = null) { return (previous, State); } - public void Load(IEnumerable events) { - Original = events.Where(x => x != null).ToArray()!; - State = Original.Aggregate(State, Fold); + public void Load(long version, IEnumerable events) { + Original = events.Where(x => x != null).ToArray()!; + OriginalVersion = version; + State = Original.Aggregate(State, Fold); return; diff --git a/src/Core/src/Eventuous.Persistence/AggregateStore/AggregatePersistenceExtensions.cs b/src/Core/src/Eventuous.Persistence/AggregateStore/AggregatePersistenceExtensions.cs index c8e3143f..1907e364 100644 --- a/src/Core/src/Eventuous.Persistence/AggregateStore/AggregatePersistenceExtensions.cs +++ b/src/Core/src/Eventuous.Persistence/AggregateStore/AggregatePersistenceExtensions.cs @@ -128,7 +128,8 @@ public Task StoreAggregate( try { var events = await eventReader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken).NoContext(); - aggregate.Load(events.Select(x => x.Payload)); + if (events.Length == 0) return aggregate; + aggregate.Load(events[^1].Revision, events.Select(x => x.Payload)); } catch (StreamNotFound) when (!failIfNotFound) { return aggregate; } catch (Exception e) { diff --git a/src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs b/src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs index 5b5906f4..ccce6ab7 100644 --- a/src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs +++ b/src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs @@ -15,8 +15,8 @@ public class TieredEventReader(IEventReader hotReader, IEventReader archiveReade public async Task ReadEvents(StreamName streamName, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken) { var hotEvents = await LoadStreamEvents(hotReader, start, count, true).NoContext(); - var archivedEvents = hotEvents.Length == 0 || hotEvents[0].Position > start.Value - ? await LoadStreamEvents(archiveReader, start, (int)hotEvents[0].Position, !failIfNotFound).NoContext() + var archivedEvents = hotEvents.Length == 0 || hotEvents[0].Revision > start.Value + ? await LoadStreamEvents(archiveReader, start, (int)hotEvents[0].Revision, !failIfNotFound).NoContext() : Enumerable.Empty(); return archivedEvents.Select(x => x with { FromArchive = true }).Concat(hotEvents).Distinct(Comparer).ToArray(); @@ -35,8 +35,8 @@ async Task LoadStreamEvents(IEventReader reader, StreamReadPositi public async Task ReadEventsBackwards(StreamName streamName, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken) { var hotEvents = await LoadStreamEvents(hotReader, start, count, true).NoContext(); - var archivedEvents = hotEvents.Length == 0 || hotEvents[0].Position > start.Value - count - ? await LoadStreamEvents(archiveReader, new(hotEvents[0].Position - 1), count - hotEvents.Length, failIfNotFound).NoContext() + var archivedEvents = hotEvents.Length == 0 || hotEvents[0].Revision > start.Value - count + ? await LoadStreamEvents(archiveReader, new(hotEvents[0].Revision - 1), count - hotEvents.Length, failIfNotFound).NoContext() : Enumerable.Empty(); return hotEvents.Concat(archivedEvents.Select(x => x with { FromArchive = true })).Distinct(Comparer).ToArray(); @@ -53,8 +53,8 @@ async Task LoadStreamEvents(IEventReader reader, StreamReadPositi static readonly StreamEventPositionComparer Comparer = new(); class StreamEventPositionComparer : IEqualityComparer { - public bool Equals(StreamEvent x, StreamEvent y) => x.Position == y.Position; + public bool Equals(StreamEvent x, StreamEvent y) => x.Revision == y.Revision; - public int GetHashCode(StreamEvent obj) => obj.Position.GetHashCode(); + public int GetHashCode(StreamEvent obj) => obj.Revision.GetHashCode(); } } diff --git a/src/Core/src/Eventuous.Persistence/StateStore/StateStoreFunctions.cs b/src/Core/src/Eventuous.Persistence/StateStore/StateStoreFunctions.cs index 06fe26fc..75d4667f 100644 --- a/src/Core/src/Eventuous.Persistence/StateStore/StateStoreFunctions.cs +++ b/src/Core/src/Eventuous.Persistence/StateStore/StateStoreFunctions.cs @@ -27,7 +27,7 @@ public async Task> LoadState( try { var streamEvents = await reader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken).NoContext(); var events = streamEvents.Select(x => x.Payload!).ToArray(); - var expectedVersion = events.Length == 0 ? ExpectedStreamVersion.NoStream : new(streamEvents.Last().Position); + var expectedVersion = events.Length == 0 ? ExpectedStreamVersion.NoStream : new(streamEvents.Last().Revision); return (new(streamName, expectedVersion, events)); } catch (StreamNotFound) when (!failIfNotFound) { diff --git a/src/Core/src/Eventuous.Persistence/StreamEvent.cs b/src/Core/src/Eventuous.Persistence/StreamEvent.cs index 8a2273a7..d353b2ac 100644 --- a/src/Core/src/Eventuous.Persistence/StreamEvent.cs +++ b/src/Core/src/Eventuous.Persistence/StreamEvent.cs @@ -3,10 +3,10 @@ using System.Runtime.InteropServices; -namespace Eventuous; +namespace Eventuous; [StructLayout(LayoutKind.Auto)] public record struct NewStreamEvent(Guid Id, object? Payload, Metadata Metadata); [StructLayout(LayoutKind.Auto)] -public record struct StreamEvent(Guid Id, object? Payload, Metadata Metadata, string ContentType, long Position, bool FromArchive = false); \ No newline at end of file +public record struct StreamEvent(Guid Id, object? Payload, Metadata Metadata, string ContentType, long Revision, bool FromArchive = false); \ No newline at end of file diff --git a/src/Core/test/Eventuous.Tests.Application/ServiceTestBase.Amendments.cs b/src/Core/test/Eventuous.Tests.Application/ServiceTestBase.Amendments.cs index 060be14f..f52a0a33 100644 --- a/src/Core/test/Eventuous.Tests.Application/ServiceTestBase.Amendments.cs +++ b/src/Core/test/Eventuous.Tests.Application/ServiceTestBase.Amendments.cs @@ -9,7 +9,7 @@ public async Task Should_amend_event_from_command(CancellationToken cancellation var service = CreateService(amendEvent: AmendEvent); var cmd = CreateCommand(); - await service.Handle(cmd, cancellationToken); + var result = await service.Handle(cmd, cancellationToken); var stream = await Store.ReadStream(StreamName.For(cmd.BookingId), StreamReadPosition.Start, cancellationToken: cancellationToken); await Assert.That(stream[0].Metadata["userId"]).IsEqualTo(cmd.ImportedBy); diff --git a/src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs b/src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs index 771fc4a2..09d75309 100644 --- a/src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs +++ b/src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs @@ -62,7 +62,7 @@ public async Task ReadEventsBackwards(StreamName stream, StreamRe cancellationToken ); - return response.Length == 0 && failIfNotFound ? throw new StreamNotFound(stream) : response.OrderByDescending(x => x.Position).ToArray(); + return response.Length == 0 && failIfNotFound ? throw new StreamNotFound(stream) : response.OrderByDescending(x => x.Revision).ToArray(); } public async Task StreamExists(StreamName stream, CancellationToken cancellationToken) { diff --git a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/AppServiceTests.cs b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/AppServiceTests.cs index b3e5ceff..93d7c3de 100644 --- a/src/KurrentDB/test/Eventuous.Tests.KurrentDB/AppServiceTests.cs +++ b/src/KurrentDB/test/Eventuous.Tests.KurrentDB/AppServiceTests.cs @@ -12,7 +12,7 @@ public class AppServiceTests { public AppServiceTests(StoreFixture fixture) { _fixture = fixture; - _fixture.TypeMapper.AddType(); + _fixture.TypeMapper.RegisterDiscoveredTypes(); } BookingService Service { get; set; } = null!; @@ -39,6 +39,23 @@ public async Task ProcessAnyForNew(CancellationToken cancellationToken) { result.ShouldBeEquivalentTo(expected); } + [Test] + public async Task ProcessNewThenDeleteAndDoItAgain(CancellationToken cancellationToken) { + // This will create a new stream + var cmd = DomainFixture.CreateImportBooking(); + await Service.Handle(cmd, cancellationToken); + + var streamName = StreamName.For(cmd.BookingId); + await _fixture.EventStore.DeleteStream(streamName, ExpectedStreamVersion.Any, cancellationToken); + + var handlingResult = await Service.Handle(cmd, cancellationToken); + handlingResult.Success.ShouldBeTrue(); + + var cancelCmd = new Commands.CancelBooking(new(cmd.BookingId)); + var secondResult = await Service.Handle(cancelCmd, cancellationToken); + secondResult.Success.ShouldBeTrue(); + } + [After(Test)] public void Dispose() => _listener.Dispose(); } diff --git a/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToStream.cs b/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToStream.cs index 8f44d270..d862b40c 100644 --- a/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToStream.cs +++ b/src/Redis/test/Eventuous.Tests.Redis/Subscriptions/SubscribeToStream.cs @@ -98,6 +98,6 @@ async Task> GenerateAndProduceEvents(int count) { async Task GetStreamPosition(int count) { var readEvents = await _fixture.IntegrationFixture.EventReader.ReadEvents(_fixture.Stream, StreamReadPosition.Start, count, true, default); - return readEvents.Last().Position; + return readEvents.Last().Revision; } } diff --git a/src/Testing/src/Eventuous.Testing/AggregateSpec.cs b/src/Testing/src/Eventuous.Testing/AggregateSpec.cs index a5a992fe..49f42822 100644 --- a/src/Testing/src/Eventuous.Testing/AggregateSpec.cs +++ b/src/Testing/src/Eventuous.Testing/AggregateSpec.cs @@ -42,7 +42,8 @@ public abstract class AggregateSpec(AggregateFactoryRegistry [MemberNotNull(nameof(Instance))] protected TAggregate Then() { Instance = CreateInstance(); - Instance.Load(GivenEvents()); + var events = GivenEvents(); + Instance.Load(events.Length - 1, events); When(Instance); return Instance; diff --git a/src/Testing/src/Eventuous.Testing/CommandServiceFixture.cs b/src/Testing/src/Eventuous.Testing/CommandServiceFixture.cs index 682012fb..64405564 100644 --- a/src/Testing/src/Eventuous.Testing/CommandServiceFixture.cs +++ b/src/Testing/src/Eventuous.Testing/CommandServiceFixture.cs @@ -256,7 +256,7 @@ public FixtureResult FullStreamEventsAre(params object[] events) { /// [StackTraceHidden] public FixtureResult NewStreamEventsAre(params object[] events) { - var stream = _streamEvents.Where(x => x.Position >= _version).Select(x => x.Payload); + var stream = _streamEvents.Where(x => x.Revision >= _version).Select(x => x.Payload); stream.ShouldBe(events); return this; diff --git a/src/Testing/src/Eventuous.Testing/InMemoryEventStore.cs b/src/Testing/src/Eventuous.Testing/InMemoryEventStore.cs index e0f8305f..0b6fa718 100644 --- a/src/Testing/src/Eventuous.Testing/InMemoryEventStore.cs +++ b/src/Testing/src/Eventuous.Testing/InMemoryEventStore.cs @@ -92,7 +92,7 @@ public IEnumerable GetEvents(StreamReadPosition from, int count) { if (count > 0) selected = selected.Take(count); - return selected.Select(x => x.Event with { Position = x.Position }); + return selected.Select(x => x.Event with { Revision = x.Position }); } public IEnumerable GetEventsBackwards(StreamReadPosition from, int count) {