diff --git a/Source/EventFlow.MsSql.Tests/IntegrationTests/CrashResilienceTests.cs b/Source/EventFlow.MsSql.Tests/IntegrationTests/CrashResilienceTests.cs new file mode 100644 index 000000000..843b380f9 --- /dev/null +++ b/Source/EventFlow.MsSql.Tests/IntegrationTests/CrashResilienceTests.cs @@ -0,0 +1,210 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.Configuration; +using EventFlow.Core; +using EventFlow.Extensions; +using EventFlow.MsSql.Extensions; +using EventFlow.MsSql.ReliablePublish; +using EventFlow.PublishRecovery; +using EventFlow.Subscribers; +using EventFlow.TestHelpers; +using EventFlow.TestHelpers.Aggregates; +using EventFlow.TestHelpers.MsSql; +using FluentAssertions; +using NUnit.Framework; + +namespace EventFlow.MsSql.Tests.IntegrationTests +{ + [Category(Categories.Integration)] + public sealed class CrashResilienceTests : IntegrationTest + { + private IMsSqlDatabase _testDatabase; + private TestPublisher _publisher; + private PublishVerificator _publishVerificator; + private RecoveryHandlerForTest _recoveryHandler; + + protected override IRootResolver CreateRootResolver(IEventFlowOptions eventFlowOptions) + { + _testDatabase = MsSqlHelpz.CreateDatabase("eventflow"); + + _publisher = null; + + var resolver = eventFlowOptions + .ConfigureMsSql(MsSqlConfiguration.New.SetConnectionString(_testDatabase.ConnectionString.Value)) + .UseMssqlReliablePublishing() + .RegisterServices(sr => sr.Register(Lifetime.Singleton)) + .RegisterServices(sr => sr.Decorate( + (r, dea) => + _publisher ?? (_publisher = new TestPublisher(dea)))) + .RegisterServices(sr => sr.Register()) + .CreateResolver(); + + var databaseMigrator = resolver.Resolve(); + EventFlowPublishLogMsSql.MigrateDatabase(databaseMigrator); + databaseMigrator.MigrateDatabaseUsingEmbeddedScripts(GetType().Assembly); + + _publisher = (TestPublisher)resolver.Resolve(); + _publishVerificator = (PublishVerificator)resolver.Resolve(); + _recoveryHandler = (RecoveryHandlerForTest)resolver.Resolve(); + + return resolver; + } + + [TearDown] + public void TearDown() + { + _testDatabase.DisposeSafe("Failed to delete database"); + } + + [Test] + public async Task ShouldRecoverAfterFailure() + { + // Arrange + var id = ThingyId.New; + _publisher.SimulatePublishFailure = true; + await PublishPingCommandsAsync(id, 1).ConfigureAwait(false); + + // Act + _publisher.SimulatePublishFailure = false; + await Verify().ConfigureAwait(false); + + // Assert + _recoveryHandler.RecoveredEvents.Should() + .BeEquivalentTo(_publisher.NotPublishedEvents); + } + + [Test] + public async Task ShouldRetryVerification() + { + // Arrange + var id = ThingyId.New; + _publisher.SimulatePublishFailure = true; + await PublishPingCommandsAsync(id, 1).ConfigureAwait(false); + + // Act + var result = await _publishVerificator.VerifyOnceAsync(CancellationToken.None).ConfigureAwait(false); + + // Assert + result.Should().Be(PublishVerificationResult.RecoveredNeedVerify); + _publisher.PublishedEvents.Should().BeEmpty(); + } + + [Test] + public async Task ShouldNotRecoverAfterFailureWithoutVerificator() + { + // Arrange + var id = ThingyId.New; + _publisher.SimulatePublishFailure = true; + + // Act + await PublishPingCommandsAsync(id, 1).ConfigureAwait(false); + + // Assert + _recoveryHandler.RecoveredEvents.Should().BeEmpty(); + } + + private async Task Verify() + { + PublishVerificationResult result; + do + { + result = await _publishVerificator.VerifyOnceAsync(CancellationToken.None).ConfigureAwait(false); + } while (result != PublishVerificationResult.CompletedNoMoreDataToVerify); + } + + private class RecoveryHandlerForTest : IRecoveryHandlerProcessor + { + private readonly List _recoveredEvents = new List(); + private readonly IReliableMarkProcessor _markProcessor; + + public RecoveryHandlerForTest(IReliableMarkProcessor markProcessor) + { + _markProcessor = markProcessor; + } + + public IReadOnlyList RecoveredEvents => _recoveredEvents; + + public Task RecoverAfterUnexpectedShutdownAsync(IReadOnlyList eventsForRecovery, CancellationToken cancellationToken) + { + _recoveredEvents.AddRange(eventsForRecovery); + + return _markProcessor.MarkEventsPublishedAsync(eventsForRecovery); + } + } + + private class TestPublisher : IDomainEventPublisher + { + private readonly IDomainEventPublisher _inner; + private readonly List _publishedEvents = new List(); + private readonly List _notPublishedEvents = new List(); + + public TestPublisher(IDomainEventPublisher inner) + { + _inner = inner; + } + + public bool SimulatePublishFailure { get; set; } + + public IReadOnlyList PublishedEvents => _publishedEvents; + + public IReadOnlyList NotPublishedEvents => _notPublishedEvents; + + public async Task PublishAsync(IReadOnlyCollection domainEvents, + CancellationToken cancellationToken) + { + if (SimulatePublishFailure) + { + _notPublishedEvents.AddRange(domainEvents); + return; + } + + await _inner.PublishAsync(domainEvents, cancellationToken); + + _publishedEvents.AddRange(domainEvents); + } + + [Obsolete("Use PublishAsync (without generics and aggregate identity)")] + public Task PublishAsync(TIdentity id, + IReadOnlyCollection domainEvents, + CancellationToken cancellationToken) where TAggregate : IAggregateRoot + where TIdentity : IIdentity + { + return _inner.PublishAsync(id, domainEvents, cancellationToken); + } + } + + private sealed class AlwaysRecoverDetector : IRecoveryDetector + { + public bool IsNeedRecovery(IDomainEvent domainEvent) + { + return true; + } + } + } +} \ No newline at end of file diff --git a/Source/EventFlow.MsSql/EventFlow.MsSql.csproj b/Source/EventFlow.MsSql/EventFlow.MsSql.csproj index fa963bb6a..530c75355 100644 --- a/Source/EventFlow.MsSql/EventFlow.MsSql.csproj +++ b/Source/EventFlow.MsSql/EventFlow.MsSql.csproj @@ -33,6 +33,9 @@ + + + \ No newline at end of file diff --git a/Source/EventFlow.MsSql/Extensions/EventFlowOptionsMsSqlReliablePublishingExtensions.cs b/Source/EventFlow.MsSql/Extensions/EventFlowOptionsMsSqlReliablePublishingExtensions.cs new file mode 100644 index 000000000..b9222fafe --- /dev/null +++ b/Source/EventFlow.MsSql/Extensions/EventFlowOptionsMsSqlReliablePublishingExtensions.cs @@ -0,0 +1,37 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using EventFlow.Extensions; +using EventFlow.MsSql.ReliablePublish; + +namespace EventFlow.MsSql.Extensions +{ + public static class EventFlowOptionsMsSqlReliablePublishingExtensions + { + public static IEventFlowOptions UseMssqlReliablePublishing(this IEventFlowOptions eventFlowOptions) + { + return eventFlowOptions + .UseReliablePublishing(); + } + } +} \ No newline at end of file diff --git a/Source/EventFlow.MsSql/ReliablePublish/EventFlowPublishLogMsSql.cs b/Source/EventFlow.MsSql/ReliablePublish/EventFlowPublishLogMsSql.cs new file mode 100644 index 000000000..46dc99870 --- /dev/null +++ b/Source/EventFlow.MsSql/ReliablePublish/EventFlowPublishLogMsSql.cs @@ -0,0 +1,45 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using System.Reflection; +using EventFlow.Sql.Extensions; +using EventFlow.Sql.Migrations; + +namespace EventFlow.MsSql.ReliablePublish +{ + public static class EventFlowPublishLogMsSql + { + public static Assembly Assembly { get; } = typeof(EventFlowPublishLogMsSql).GetTypeInfo().Assembly; + + public static IEnumerable GetSqlScripts() + { + return Assembly.GetEmbeddedSqlScripts("EventFlow.MsSql.ReliablePublish.Scripts"); + } + + public static void MigrateDatabase(IMsSqlDatabaseMigrator msSqlDatabaseMigrator) + { + msSqlDatabaseMigrator.MigrateDatabaseUsingScripts(GetSqlScripts()); + } + } +} \ No newline at end of file diff --git a/Source/EventFlow.MsSql/ReliablePublish/MsSqlReliablePublishPersistence.cs b/Source/EventFlow.MsSql/ReliablePublish/MsSqlReliablePublishPersistence.cs new file mode 100644 index 000000000..ef6a36f27 --- /dev/null +++ b/Source/EventFlow.MsSql/ReliablePublish/MsSqlReliablePublishPersistence.cs @@ -0,0 +1,132 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.Core; +using EventFlow.EventStores; +using EventFlow.PublishRecovery; +using EventFlow.Subscribers; + +namespace EventFlow.MsSql.ReliablePublish +{ + public sealed class MsSqlReliablePublishPersistence : IReliablePublishPersistence + { + private readonly IMsSqlConnection _msSqlConnection; + + public MsSqlReliablePublishPersistence(IMsSqlConnection msSqlConnection) + { + _msSqlConnection = msSqlConnection; + } + + public async Task MarkPublishedAsync(IIdentity aggregateIdentity, IReadOnlyCollection domainEvents) + { + var item = new PublishLogItem + { + AggregateId = aggregateIdentity.Value, + MinAggregateSequenceNumber = domainEvents.Min(x => x.AggregateSequenceNumber), + MaxAggregateSequenceNumber = domainEvents.Max(x => x.AggregateSequenceNumber), + }; + + await _msSqlConnection.ExecuteAsync( + Label.Named("publishlog-commit"), + CancellationToken.None, // Unable to Cancel + @"INSERT INTO [dbo].[EventFlowPublishLog] + (AggregateId, MinAggregateSequenceNumber, MaxAggregateSequenceNumber) + VALUES + (@AggregateId, @MinAggregateSequenceNumber, @MaxAggregateSequenceNumber)", + item) + .ConfigureAwait(false); + } + + public async Task GetUnverifiedItemsAsync(int maxCount, CancellationToken cancellationToken) + { + var logItems = await _msSqlConnection.QueryAsync( + Label.Named("publishlog-select"), + cancellationToken, + "SELECT TOP(@Top) [Id], [AggregateId],[MinAggregateSequenceNumber], [MaxAggregateSequenceNumber] FROM [dbo].[EventFlowPublishLog] ORDER BY [Id]", + new { Top = maxCount }) + .ConfigureAwait(false); + + var positions = await _msSqlConnection.QueryAsync( + Label.Named("publishlog-global-position-select"), + cancellationToken, + "SELECT TOP 1 [LastVerifiedPosition] FROM [dbo].[EventFlowPublishVerifyState]") + .ConfigureAwait(false); + + return new VerificationState( + new GlobalPosition(positions.First()), + logItems); + } + + public async Task MarkVerifiedAsync( + IReadOnlyCollection verifiedItems, + GlobalPosition newVerifiedPosition, + CancellationToken cancellationToken) + { + await _msSqlConnection.ExecuteAsync( + Label.Named("publishlog-global-position-update"), + cancellationToken, + "UPDATE [dbo].[EventFlowPublishVerifyState] SET LastVerifiedPosition = @LastVerifiedPosition", + new + { + LastVerifiedPosition = newVerifiedPosition.Value + }); + + foreach (var publishVerificationItem in verifiedItems) + { + var logItem = (PublishLogItem)publishVerificationItem; + + await _msSqlConnection.ExecuteAsync( + Label.Named("publishlog-confirm"), + cancellationToken, + "DELETE FROM [dbo].[EventFlowPublishLog] WHERE Id = @Id", + new { logItem.Id}); + } + } + + private sealed class PublishLogItem : IPublishVerificationItem + { + public long Id { get; set; } + public string AggregateId { get; set; } + public int MinAggregateSequenceNumber { get; set; } + public int MaxAggregateSequenceNumber { get; set; } + + public bool IsPublished(ICommittedDomainEvent committedDomainEvent) + { + return AggregateId == committedDomainEvent.AggregateId && + MinAggregateSequenceNumber <= committedDomainEvent.AggregateSequenceNumber && + committedDomainEvent.AggregateSequenceNumber <= MaxAggregateSequenceNumber; + } + + public bool IsFinalEvent(ICommittedDomainEvent committedDomainEvent) + { + return committedDomainEvent.AggregateId == AggregateId && + MaxAggregateSequenceNumber == committedDomainEvent.AggregateSequenceNumber; + } + } + } +} \ No newline at end of file diff --git a/Source/EventFlow.MsSql/ReliablePublish/Scripts/0001 - Create PublishLog.sql b/Source/EventFlow.MsSql/ReliablePublish/Scripts/0001 - Create PublishLog.sql new file mode 100644 index 000000000..9b7c12788 --- /dev/null +++ b/Source/EventFlow.MsSql/ReliablePublish/Scripts/0001 - Create PublishLog.sql @@ -0,0 +1,13 @@ +IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE table_schema = N'dbo' AND table_name = N'EventFlowPublishLog') +BEGIN + CREATE TABLE [dbo].[EventFlowPublishLog]( + [Id] [bigint] IDENTITY(1,1) NOT NULL, + [AggregateId] [nvarchar](128) NOT NULL, + [MinAggregateSequenceNumber] [int] NOT NULL, + [MaxAggregateSequenceNumber] [int] NOT NULL, + CONSTRAINT [PK_EventFlowPublishLog] PRIMARY KEY CLUSTERED + ( + [Id] ASC + ) + ) +END diff --git a/Source/EventFlow.MsSql/ReliablePublish/Scripts/0002 - Create PublishVerifyState.sql b/Source/EventFlow.MsSql/ReliablePublish/Scripts/0002 - Create PublishVerifyState.sql new file mode 100644 index 000000000..c9a0e5eb3 --- /dev/null +++ b/Source/EventFlow.MsSql/ReliablePublish/Scripts/0002 - Create PublishVerifyState.sql @@ -0,0 +1,11 @@ +IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE table_schema = N'dbo' AND table_name = N'EventFlowPublishVerifyState') +BEGIN + CREATE TABLE [dbo].[EventFlowPublishVerifyState]( + [Id] [bigint] IDENTITY(1,1) NOT NULL, + [LastVerifiedPosition] [nvarchar](255) NOT NULL, + CONSTRAINT [PK_EventFlowPublishVerifyState] PRIMARY KEY CLUSTERED + ( + [Id] ASC + ) + ) +END diff --git a/Source/EventFlow.MsSql/ReliablePublish/Scripts/0003 - Setup initial PublishVerifyState.sql b/Source/EventFlow.MsSql/ReliablePublish/Scripts/0003 - Setup initial PublishVerifyState.sql new file mode 100644 index 000000000..4c25dccd0 --- /dev/null +++ b/Source/EventFlow.MsSql/ReliablePublish/Scripts/0003 - Setup initial PublishVerifyState.sql @@ -0,0 +1,4 @@ +IF NOT EXISTS (SELECT * FROM [dbo].[EventFlowPublishVerifyState]) +BEGIN + INSERT INTO [dbo].[EventFlowPublishVerifyState] (LastVerifiedPosition) VALUES ('') +END diff --git a/Source/EventFlow.Tests/IntegrationTests/ReadStores/ReadModelRecoveryTests.cs b/Source/EventFlow.Tests/IntegrationTests/ReadStores/ReadModelRecoveryTests.cs new file mode 100644 index 000000000..c492f7953 --- /dev/null +++ b/Source/EventFlow.Tests/IntegrationTests/ReadStores/ReadModelRecoveryTests.cs @@ -0,0 +1,104 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2019 Rasmus Mikkelsen +// Copyright (c) 2015-2019 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.Configuration; +using EventFlow.Extensions; +using EventFlow.PublishRecovery; +using EventFlow.ReadStores; +using EventFlow.TestHelpers; +using EventFlow.TestHelpers.Aggregates; +using EventFlow.TestHelpers.Aggregates.Events; +using FluentAssertions; +using NUnit.Framework; + +namespace EventFlow.Tests.IntegrationTests.ReadStores +{ + public sealed class ReadModelRecoveryTests : IntegrationTest + { + protected override IRootResolver CreateRootResolver(IEventFlowOptions eventFlowOptions) + { + return eventFlowOptions + .UseInMemoryReadStoreFor() + .UseReadModelRecoveryHandler(Lifetime.Singleton) + .CreateResolver(); + } + + [Test] + public async Task ShouldRecoveryForExceptionInReadModel() + { + var recoveryHandler = (TestRecoveryHandler)Resolver.Resolve>(); + recoveryHandler.ShouldRecover = true; + + await PublishPingCommandAsync(ThingyId.New); + + recoveryHandler.LastRecoveredEvents.Should() + .ContainSingle(x => x.GetAggregateEvent() is ThingyPingEvent); + } + + [Test] + public async Task ShouldThrowOriginalErrorWhenNoRecovery() + { + var recoveryHandler = (TestRecoveryHandler)Resolver.Resolve>(); + recoveryHandler.ShouldRecover = false; + + Func publishPing = () => PublishPingCommandAsync(ThingyId.New); + + (await publishPing.Should().ThrowAsync().ConfigureAwait(false)) + .WithMessage("Read model exception. Should be recovered."); + } + + private sealed class FailingReadModel : IReadModel, + IAmReadModelFor + { + public void Apply(IReadModelContext context, IDomainEvent domainEvent) + { + throw new Exception("Read model exception. Should be recovered."); + } + } + + private sealed class TestRecoveryHandler : IReadModelRecoveryHandler + { + public IReadOnlyCollection LastRecoveredEvents { get; private set; } + + public bool ShouldRecover { get; set; } + + public Task RecoverFromShutdownAsync(IReadOnlyCollection eventsForRecovery, CancellationToken cancellationToken) + { + return Task.FromResult(0); + } + + public Task RecoverFromErrorAsync(IReadOnlyCollection eventsForRecovery, Exception exception, + CancellationToken cancellationToken) + { + LastRecoveredEvents = eventsForRecovery; + + return Task.FromResult(ShouldRecover); + } + } + } +} \ No newline at end of file diff --git a/Source/EventFlow/EventFlowOptions.cs b/Source/EventFlow/EventFlowOptions.cs index 5ecfb33df..f919f9c47 100644 --- a/Source/EventFlow/EventFlowOptions.cs +++ b/Source/EventFlow/EventFlowOptions.cs @@ -40,6 +40,7 @@ using EventFlow.Jobs; using EventFlow.Logs; using EventFlow.Provided; +using EventFlow.PublishRecovery; using EventFlow.Queries; using EventFlow.ReadStores; using EventFlow.Sagas; diff --git a/Source/EventFlow/Extensions/EventFlowOptionsReliablePublishingExtensions.cs b/Source/EventFlow/Extensions/EventFlowOptionsReliablePublishingExtensions.cs new file mode 100644 index 000000000..00959fcaa --- /dev/null +++ b/Source/EventFlow/Extensions/EventFlowOptionsReliablePublishingExtensions.cs @@ -0,0 +1,75 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using EventFlow.Configuration; +using EventFlow.PublishRecovery; +using EventFlow.ReadStores; +using EventFlow.Subscribers; + +namespace EventFlow.Extensions +{ + public static class EventFlowOptionsReliablePublishingExtensions + { + public static IEventFlowOptions UseReliablePublishing( + this IEventFlowOptions eventFlowOptions, + Lifetime lifetime = Lifetime.AlwaysUnique) + where TReliablePublishPersistence : class, IReliablePublishPersistence + { + return eventFlowOptions + .RegisterServices(f => f.Register()) + .RegisterServices(f => f.Register()) + .RegisterServices(f => f.Register()) + .RegisterServices(r => r.Register()) + .RegisterServices(f => f.Register(lifetime)) + .RegisterServices(f => f.Decorate( + (context, inner) => new ReliableDomainEventPublisher(inner, context.Resolver.Resolve()))); + } + + public static IEventFlowOptions UseReadModelRecoveryHandler( + this IEventFlowOptions eventFlowOptions, + Lifetime lifetime = Lifetime.AlwaysUnique) + where TRecoveryHandler : class, IReadModelRecoveryHandler + where TReadModel : class, IReadModel + { + return eventFlowOptions + .RegisterServices(f => + { + f.Register, TRecoveryHandler>(lifetime); + + f.Register(ctx => (IReadModelRecoveryHandler)ctx.Resolver.Resolve>()); + + f.Decorate((ctx, inner) => + { + if (inner.ReadModelType == typeof(TReadModel)) + { + return new ReadStoreManagerWithErrorRecovery( + (IReadStoreManager)inner, + ctx.Resolver.Resolve>()); + } + + return inner; + }); + }); + } + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/IPublishVerificationItem.cs b/Source/EventFlow/PublishRecovery/IPublishVerificationItem.cs new file mode 100644 index 000000000..44630a285 --- /dev/null +++ b/Source/EventFlow/PublishRecovery/IPublishVerificationItem.cs @@ -0,0 +1,36 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using EventFlow.EventStores; + +namespace EventFlow.PublishRecovery +{ + public interface IPublishVerificationItem + { + string AggregateId { get; } + + bool IsPublished(ICommittedDomainEvent committedDomainEvent); + + bool IsFinalEvent(ICommittedDomainEvent committedDomainEvent); + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/IPublishVerificator.cs b/Source/EventFlow/PublishRecovery/IPublishVerificator.cs new file mode 100644 index 000000000..3a5a0e519 --- /dev/null +++ b/Source/EventFlow/PublishRecovery/IPublishVerificator.cs @@ -0,0 +1,33 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Threading; +using System.Threading.Tasks; + +namespace EventFlow.PublishRecovery +{ + public interface IPublishVerificator + { + Task VerifyOnceAsync(CancellationToken cancellationToken); + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/IReadModelRecoveryHandler.cs b/Source/EventFlow/PublishRecovery/IReadModelRecoveryHandler.cs new file mode 100644 index 000000000..edfac1548 --- /dev/null +++ b/Source/EventFlow/PublishRecovery/IReadModelRecoveryHandler.cs @@ -0,0 +1,49 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.ReadStores; + +namespace EventFlow.PublishRecovery +{ + public interface IReadModelRecoveryHandler + { + Task RecoverFromShutdownAsync( + IReadOnlyCollection eventsForRecovery, + CancellationToken cancellationToken); + + Task RecoverFromErrorAsync( + IReadOnlyCollection eventsForRecovery, + Exception exception, + CancellationToken cancellationToken); + } + + public interface IReadModelRecoveryHandler : IReadModelRecoveryHandler + where TReadModel : class, IReadModel + { + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/IRecoveryDetector.cs b/Source/EventFlow/PublishRecovery/IRecoveryDetector.cs new file mode 100644 index 000000000..9f8aeddd3 --- /dev/null +++ b/Source/EventFlow/PublishRecovery/IRecoveryDetector.cs @@ -0,0 +1,32 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using EventFlow.Aggregates; + +namespace EventFlow.PublishRecovery +{ + public interface IRecoveryDetector + { + bool IsNeedRecovery(IDomainEvent domainEvent); + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/IRecoveryHandlerProcessor.cs b/Source/EventFlow/PublishRecovery/IRecoveryHandlerProcessor.cs new file mode 100644 index 000000000..83bb72bc5 --- /dev/null +++ b/Source/EventFlow/PublishRecovery/IRecoveryHandlerProcessor.cs @@ -0,0 +1,37 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; + +namespace EventFlow.PublishRecovery +{ + public interface IRecoveryHandlerProcessor + { + Task RecoverAfterUnexpectedShutdownAsync( + IReadOnlyList eventsForRecovery, + CancellationToken cancellationToken); + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/IReliableMarkProcessor.cs b/Source/EventFlow/PublishRecovery/IReliableMarkProcessor.cs new file mode 100644 index 000000000..100948157 --- /dev/null +++ b/Source/EventFlow/PublishRecovery/IReliableMarkProcessor.cs @@ -0,0 +1,34 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using System.Threading.Tasks; +using EventFlow.Aggregates; + +namespace EventFlow.PublishRecovery +{ + public interface IReliableMarkProcessor + { + Task MarkEventsPublishedAsync(IReadOnlyCollection domainEvents); + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/IReliablePublishPersistence.cs b/Source/EventFlow/PublishRecovery/IReliablePublishPersistence.cs new file mode 100644 index 000000000..4df097673 --- /dev/null +++ b/Source/EventFlow/PublishRecovery/IReliablePublishPersistence.cs @@ -0,0 +1,41 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.Core; +using EventFlow.EventStores; + +namespace EventFlow.PublishRecovery +{ + public interface IReliablePublishPersistence + { + Task MarkPublishedAsync(IIdentity aggregateIdentity, IReadOnlyCollection domainEvents); + + Task GetUnverifiedItemsAsync(int maxCount, CancellationToken cancellationToken); + + Task MarkVerifiedAsync(IReadOnlyCollection verifiedItems, GlobalPosition newVerifiedPosition, CancellationToken cancellationToken); + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/PublishVerificationResult.cs b/Source/EventFlow/PublishRecovery/PublishVerificationResult.cs new file mode 100644 index 000000000..b1ca9fb11 --- /dev/null +++ b/Source/EventFlow/PublishRecovery/PublishVerificationResult.cs @@ -0,0 +1,32 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +namespace EventFlow.PublishRecovery +{ + public enum PublishVerificationResult + { + CompletedNoMoreDataToVerify, + RecoveredNeedVerify, + HasMoreDataNeedVerify, + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/PublishVerificator.cs b/Source/EventFlow/PublishRecovery/PublishVerificator.cs new file mode 100644 index 000000000..d7655f824 --- /dev/null +++ b/Source/EventFlow/PublishRecovery/PublishVerificator.cs @@ -0,0 +1,146 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.EventStores; + +namespace EventFlow.PublishRecovery +{ + public sealed class PublishVerificator : IPublishVerificator + { + private const int PageSize = 200; + + private readonly IEventPersistence _eventPersistence; + private readonly IRecoveryHandlerProcessor _recoveryHandlerProcessor; + private readonly IEventJsonSerializer _eventSerializer; + private readonly IRecoveryDetector _recoveryDetector; + private readonly IReliablePublishPersistence _reliablePublishPersistence; + + public PublishVerificator(IEventPersistence eventPersistence, IRecoveryHandlerProcessor recoveryHandlerProcessor, IEventJsonSerializer eventSerializer, IRecoveryDetector recoveryDetector, IReliablePublishPersistence reliablePublishPersistence) + { + _eventPersistence = eventPersistence; + _recoveryHandlerProcessor = recoveryHandlerProcessor; + _eventSerializer = eventSerializer; + _recoveryDetector = recoveryDetector; + _reliablePublishPersistence = reliablePublishPersistence; + } + + public async Task VerifyOnceAsync(CancellationToken cancellationToken) + { + var state = await _reliablePublishPersistence.GetUnverifiedItemsAsync(PageSize, cancellationToken) + .ConfigureAwait(false); + + var logItemLookup = state.Items.ToLookup(x => x.AggregateId); + + var page = await _eventPersistence.LoadAllCommittedEvents(state.LastVerifiedPosition, PageSize, cancellationToken) + .ConfigureAwait(false); + + var verifyResult = VerifyDomainEvents(page, logItemLookup); + + // Some of not published events can be in flight, so no need recovery them + // but we have to check them again on next iteration + var eventsForRecovery = GetEventsForRecovery(verifyResult.UnpublishedEvents); + + if (eventsForRecovery.Any()) + { + // Do it inside transaction to recover in single thread + // success recovery should put LogItem + await _recoveryHandlerProcessor.RecoverAfterUnexpectedShutdownAsync(eventsForRecovery, cancellationToken) + .ConfigureAwait(false); + + return PublishVerificationResult.RecoveredNeedVerify; + } + + // Remove logs and move position forward only when it is successfully recovered. + if (verifyResult.UnpublishedEvents.Count == 0) + { + await _reliablePublishPersistence + .MarkVerifiedAsync(verifyResult.PublishedLogItems, page.NextGlobalPosition, cancellationToken) + .ConfigureAwait(false); + + return page.CommittedDomainEvents.Count < PageSize + ? PublishVerificationResult.CompletedNoMoreDataToVerify + : PublishVerificationResult.HasMoreDataNeedVerify; + } + + return PublishVerificationResult.CompletedNoMoreDataToVerify; + } + + private IReadOnlyList GetEventsForRecovery(IReadOnlyList unpublishedEvents) + { + return unpublishedEvents + .Select(evnt => _eventSerializer.Deserialize(evnt)) + .Where(evnt => _recoveryDetector.IsNeedRecovery(evnt)) + .ToList(); + } + + private VerifyResult VerifyDomainEvents(AllCommittedEventsPage page, ILookup logItemLookup) + { + var unpublishedEvents = new List(); + var publishedLogItems = new List(); + + foreach (var committedDomainEvent in page.CommittedDomainEvents) + { + var logItem = TryGetPublishedLogItem(committedDomainEvent, logItemLookup); + + if (logItem == null) + { + unpublishedEvents.Add(committedDomainEvent); + } + // Remove logItem only on the last event related with this log item + else if (logItem.IsFinalEvent(committedDomainEvent)) + { + publishedLogItems.Add(logItem); + } + } + + return new VerifyResult(unpublishedEvents, publishedLogItems); + } + + private IPublishVerificationItem TryGetPublishedLogItem(ICommittedDomainEvent committedDomainEvent, ILookup logItemLookup) + { + var logItems = logItemLookup[committedDomainEvent.AggregateId]; + + return logItems.FirstOrDefault(logItem => logItem.IsPublished(committedDomainEvent)); + } + + private sealed class VerifyResult + { + public VerifyResult( + IReadOnlyList unpublishedEvents, + IReadOnlyList publishedLogItems) + { + UnpublishedEvents = unpublishedEvents; + PublishedLogItems = publishedLogItems; + } + + public IReadOnlyList UnpublishedEvents { get; } + + public IReadOnlyList PublishedLogItems { get; } + } + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/ReadStoreManagerWithErrorRecovery.cs b/Source/EventFlow/PublishRecovery/ReadStoreManagerWithErrorRecovery.cs new file mode 100644 index 000000000..6387a0cc9 --- /dev/null +++ b/Source/EventFlow/PublishRecovery/ReadStoreManagerWithErrorRecovery.cs @@ -0,0 +1,64 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.ReadStores; + +namespace EventFlow.PublishRecovery +{ + public sealed class ReadStoreManagerWithErrorRecovery : IReadStoreManager + where TReadModel : class, IReadModel + { + private readonly IReadStoreManager _original; + private readonly IReadModelRecoveryHandler _recoveryHandler; + + public ReadStoreManagerWithErrorRecovery(IReadStoreManager original, IReadModelRecoveryHandler recoveryHandler) + { + _original = original; + _recoveryHandler = recoveryHandler; + } + + public Type ReadModelType => _original.ReadModelType; + + public async Task UpdateReadStoresAsync(IReadOnlyCollection domainEvents, CancellationToken cancellationToken) + { + try + { + await _original.UpdateReadStoresAsync(domainEvents, cancellationToken); + } + catch (Exception ex) + { + var handled = await _recoveryHandler.RecoverFromErrorAsync(domainEvents, ex, cancellationToken); + + if (!handled) + { + throw; + } + } + } + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/RecoveryHandlerProcessor.cs b/Source/EventFlow/PublishRecovery/RecoveryHandlerProcessor.cs new file mode 100644 index 000000000..38a5e5e5d --- /dev/null +++ b/Source/EventFlow/PublishRecovery/RecoveryHandlerProcessor.cs @@ -0,0 +1,62 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; + +namespace EventFlow.PublishRecovery +{ + public sealed class RecoveryHandlerProcessor : IRecoveryHandlerProcessor + { + private readonly IReliableMarkProcessor _markProcessor; + private readonly IReadOnlyList _readModelRecoveryHandlers; + + public RecoveryHandlerProcessor( + IReliableMarkProcessor markProcessor, + IEnumerable readModelRecoveryHandlers) + { + _markProcessor = markProcessor; + _readModelRecoveryHandlers = readModelRecoveryHandlers.ToList(); + } + + public async Task RecoverAfterUnexpectedShutdownAsync(IReadOnlyList eventsForRecovery, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + var recoveryTasks = _readModelRecoveryHandlers.Select( + handler => handler.RecoverFromShutdownAsync(eventsForRecovery, cancellationToken)); + + await Task.WhenAll(recoveryTasks) + .ConfigureAwait(false); + + // TODO: Recover Subscribers, Sagas + + cancellationToken.ThrowIfCancellationRequested(); + + await _markProcessor.MarkEventsPublishedAsync(eventsForRecovery).ConfigureAwait(false); + } + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/ReliableDomainEventPublisher.cs b/Source/EventFlow/PublishRecovery/ReliableDomainEventPublisher.cs new file mode 100644 index 000000000..22fe554cf --- /dev/null +++ b/Source/EventFlow/PublishRecovery/ReliableDomainEventPublisher.cs @@ -0,0 +1,61 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.Core; +using EventFlow.Subscribers; + +namespace EventFlow.PublishRecovery +{ + public sealed class ReliableDomainEventPublisher : IDomainEventPublisher + { + private readonly IDomainEventPublisher _nonReliableDomainEventPublisher; + private readonly IReliableMarkProcessor _reliableMarkProcessor; + + public ReliableDomainEventPublisher(IDomainEventPublisher nonReliableDomainEventPublisher, IReliableMarkProcessor reliableMarkProcessor) + { + _nonReliableDomainEventPublisher = nonReliableDomainEventPublisher; + _reliableMarkProcessor = reliableMarkProcessor; + } + + public async Task PublishAsync(IReadOnlyCollection domainEvents, CancellationToken cancellationToken) + { + await _nonReliableDomainEventPublisher.PublishAsync(domainEvents, cancellationToken).ConfigureAwait(false); + + await _reliableMarkProcessor.MarkEventsPublishedAsync(domainEvents); + } + + [Obsolete("Use PublishAsync (without generics and aggregate identity)")] + public async Task PublishAsync(TIdentity id, IReadOnlyCollection domainEvents, + CancellationToken cancellationToken) where TAggregate : IAggregateRoot where TIdentity : IIdentity + { + await _nonReliableDomainEventPublisher.PublishAsync(id, domainEvents, cancellationToken); + + await _reliableMarkProcessor.MarkEventsPublishedAsync(domainEvents); + } + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/ReliableMarkProcessor.cs b/Source/EventFlow/PublishRecovery/ReliableMarkProcessor.cs new file mode 100644 index 000000000..978605546 --- /dev/null +++ b/Source/EventFlow/PublishRecovery/ReliableMarkProcessor.cs @@ -0,0 +1,56 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using EventFlow.Aggregates; + +namespace EventFlow.PublishRecovery +{ + public sealed class ReliableMarkProcessor : IReliableMarkProcessor + { + private readonly IReliablePublishPersistence _reliablePublishPersistence; + + public ReliableMarkProcessor(IReliablePublishPersistence reliablePublishPersistence) + { + _reliablePublishPersistence = reliablePublishPersistence; + } + + public async Task MarkEventsPublishedAsync(IReadOnlyCollection domainEvents) + { + var count = domainEvents.Count; + if (count == 0) + { + return; + } + + foreach (var domaiEnventsPerAggregate in domainEvents.GroupBy(x => x.GetIdentity())) + { + var aggregateIdentity = domaiEnventsPerAggregate.Key; + + await _reliablePublishPersistence.MarkPublishedAsync(aggregateIdentity, domainEvents).ConfigureAwait(false); + } + } + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/TimeBasedRecoveryDetector.cs b/Source/EventFlow/PublishRecovery/TimeBasedRecoveryDetector.cs new file mode 100644 index 000000000..ce36a9d11 --- /dev/null +++ b/Source/EventFlow/PublishRecovery/TimeBasedRecoveryDetector.cs @@ -0,0 +1,38 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using EventFlow.Aggregates; + +namespace EventFlow.PublishRecovery +{ + public sealed class TimeBasedRecoveryDetector : IRecoveryDetector + { + public TimeSpan DelayToNeedRecover { get; set; } = TimeSpan.FromMinutes(5); + + public bool IsNeedRecovery(IDomainEvent domainEvent) + { + return DateTimeOffset.UtcNow - domainEvent.Timestamp > DelayToNeedRecover; + } + } +} \ No newline at end of file diff --git a/Source/EventFlow/PublishRecovery/VerificationState.cs b/Source/EventFlow/PublishRecovery/VerificationState.cs new file mode 100644 index 000000000..08d61579a --- /dev/null +++ b/Source/EventFlow/PublishRecovery/VerificationState.cs @@ -0,0 +1,41 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2018 Rasmus Mikkelsen +// Copyright (c) 2015-2018 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using EventFlow.EventStores; + +namespace EventFlow.PublishRecovery +{ + public class VerificationState + { + public VerificationState(GlobalPosition lastVerifiedPosition, IReadOnlyCollection items) + { + LastVerifiedPosition = lastVerifiedPosition; + Items = items; + } + + public GlobalPosition LastVerifiedPosition { get; } + + public IReadOnlyCollection Items { get; } + } +} \ No newline at end of file