diff --git a/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/CosmosDb.cs b/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/CosmosDb.cs new file mode 100644 index 00000000..2452dd8e --- /dev/null +++ b/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/CosmosDb.cs @@ -0,0 +1,24 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Azure.Documents.Client; +using Microwave.Domain; + +namespace Microwave.Eventstores.Persistence.CosmosDb +{ + public class CosmosDb : ICosmosDb + { + + private readonly IMicrowaveConfiguration _configuration; + + public CosmosDb(IMicrowaveConfiguration configuration) + { + _configuration = configuration; + } + + public DocumentClient GetCosmosDbClient() + { + return new DocumentClient(new Uri(_configuration.DatabaseConfiguration.ConnectionString), + _configuration.DatabaseConfiguration.PrimaryKey); + } + } +} \ No newline at end of file diff --git a/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbClient.cs b/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbClient.cs new file mode 100644 index 00000000..28d9fad8 --- /dev/null +++ b/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbClient.cs @@ -0,0 +1,99 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Reflection; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.Linq; +using Microwave.Domain.EventSourcing; +using Microwave.Domain.Identities; +using Microwave.Domain.Results; +using Microwave.EventStores; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microwave.Eventstores.Persistence.CosmosDb +{ + public class CosmosDbClient : ICosmosDbClient + { + private readonly DocumentClient _client; + private IEnumerable _domainEventTypes; + private const string DatabaseId = "Eventstore"; + private const string CollectionId = "DomainEvents"; + + public CosmosDbClient(ICosmosDb cosmosDb, IEnumerable assemblies) + { + _client = cosmosDb.GetCosmosDbClient(); + var type = typeof(IDomainEvent); + _domainEventTypes = assemblies + .SelectMany(s => s.GetTypes()) + .Where(p => type.IsAssignableFrom(p)); + + } + + public async Task InitializeCosmosDbAsync() + { + var database = await _client.CreateDatabaseIfNotExistsAsync(new Database { Id = DatabaseId }); + var collection = await _client.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(DatabaseId), + new DocumentCollection { Id = CollectionId }); + if (database == null || collection == null) + { + throw new ArgumentException("Could not create Database or Collection with given CosmosDb Configuration Parameters!"); + } + } + + + public async Task CreateDomainEventAsync(IDomainEvent domainEvent) + { + var uri = UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId); + await _client.CreateDocumentAsync(uri, domainEvent); + + } + + + public async Task> GetDomainEventsAsync(Identity identity) + { + var query = _client.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId), + new FeedOptions { MaxItemCount = -1 }) + .Where(e => e.DomainEvent.EntityId == identity) + .AsDocumentQuery(); + + var wrappedEvents = new List(); + while (query.HasMoreResults) + { + wrappedEvents.AddRange(await query.ExecuteNextAsync()); + } + + var result = wrappedEvents.Select(e => JsonConvert.DeserializeObject(e.GetValue("DomainEvent").ToString(), _domainEventTypes.Single(x => x.Name == e.GetValue("DomainEventType").ToString()))).ToList(); + return new List(); + } + + + public async Task>> GetDomainEventsAsync(DateTimeOffset tickSince) + { + FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 }; + var uri = UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId); + var query = _client.CreateDocumentQuery(uri, queryOptions) + .Where(e => e.Created > tickSince); + return Result>.Ok(query.ToList()); + } + + public async Task CreateItemAsync(DomainEventWrapper domainEvent) + { + return await _client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId), domainEvent); + } + + public async Task>> LoadEventsByTypeAsync(string eventType, DateTimeOffset tickSince) + { + FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 }; + var uri = UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId); + var query = _client.CreateDocumentQuery(uri, queryOptions) + .Where(e => e.DomainEventType == eventType); + return Result>.Ok(query.ToList()); + } + } + +} \ No newline at end of file diff --git a/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbEventRepository.cs b/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbEventRepository.cs new file mode 100644 index 00000000..d31e1c1b --- /dev/null +++ b/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbEventRepository.cs @@ -0,0 +1,82 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.Linq; +using Microwave.Domain.EventSourcing; +using Microwave.Domain.Identities; +using Microwave.Domain.Results; +using Microwave.EventStores; +using Microwave.EventStores.Ports; + +namespace Microwave.Eventstores.Persistence.CosmosDb +{ + public class CosmosDbEventRepository : IEventRepository + { + private readonly ICosmosDbClient _cosmosDbClient; + + + public CosmosDbEventRepository(ICosmosDbClient cosmosDbClient) + { + _cosmosDbClient = cosmosDbClient; + } + + public async Task>> LoadEventsByEntity(Identity entityId, long @from = 0) + { + throw new NotImplementedException(); + //var uri = CreateUriForCosmosDb(entityId); + //var domainEvents = (await _client.ReadDocumentAsync>(uri)).Document; + //return new EventStoreResult>(domainEvents, domainEvents.Max(e => e.Version)); + } + + public async Task AppendAsync(IEnumerable domainEvents, long currentEntityVersion) + { + foreach (var domainEvent in domainEvents) + { + + await _cosmosDbClient.CreateDomainEventAsync(domainEvent); + } + + return Result.Ok(); + } + + public async Task>> LoadEvents(DateTimeOffset tickSince = default(DateTimeOffset)) + { + var result = await _cosmosDbClient.GetDomainEventsAsync(tickSince); + if (result.Value.Any()) + { + return Result>.Ok(result.Value); + } + else + { + return Result>.NotFound(null); + } + } + + public async Task>> LoadEventsByTypeAsync(string eventType, DateTimeOffset tickSince = default(DateTimeOffset)) + { + var result = _cosmosDbClient.LoadEventsByTypeAsync(eventType, tickSince); + return Result>.Ok(result.Result.Value); + } + + public async Task> GetLastEventOccuredOn(string domainEventType) + { + throw new NotImplementedException(); + //FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 }; + //var uri = UriFactory.CreateDocumentCollectionUri(DatabaseName, CollectionId); + //var query = _client.CreateDocumentQuery(uri, queryOptions).ToList(); + //var latestEventTime = query.Max(e => e.Created); + + //return Result.Ok(latestEventTime); + } + + private Uri CreateUriForCosmosDb(Identity identity) + { + //return UriFactory.CreateDocumentUri(DatabaseName, CollectionId, identity.Id); + return null; + } + } +} diff --git a/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/ICosmosDb.cs b/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/ICosmosDb.cs new file mode 100644 index 00000000..764f5442 --- /dev/null +++ b/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/ICosmosDb.cs @@ -0,0 +1,10 @@ +using System.Threading.Tasks; +using Microsoft.Azure.Documents.Client; + +namespace Microwave.Eventstores.Persistence.CosmosDb +{ + public interface ICosmosDb + { + DocumentClient GetCosmosDbClient(); + } +} \ No newline at end of file diff --git a/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/ICosmosDbClient.cs b/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/ICosmosDbClient.cs new file mode 100644 index 00000000..3dc3755f --- /dev/null +++ b/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/ICosmosDbClient.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microwave.Domain.EventSourcing; +using Microwave.Domain.Identities; +using Microwave.Domain.Results; +using Microwave.EventStores; + +namespace Microwave.Eventstores.Persistence.CosmosDb +{ + public interface ICosmosDbClient + { + Task CreateDomainEventAsync(IDomainEvent domainEvent); + Task>> GetDomainEventsAsync(DateTimeOffset tickSince); + Task>> LoadEventsByTypeAsync(string eventType, DateTimeOffset tickSince); + Task CreateItemAsync(DomainEventWrapper domainEvent); + Task> GetDomainEventsAsync(Identity identity); + } +} \ No newline at end of file diff --git a/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/Microwave.Eventstores.Persistence.CosmosDb.csproj b/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/Microwave.Eventstores.Persistence.CosmosDb.csproj new file mode 100644 index 00000000..2cf43180 --- /dev/null +++ b/CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/Microwave.Eventstores.Persistence.CosmosDb.csproj @@ -0,0 +1,16 @@ + + + + netcoreapp2.1 + + + + + + + + + + + + diff --git a/Microwave.Discovery/ServiceBaseAddressCollection.cs b/Microwave.Discovery/ServiceBaseAddressCollection.cs index e2aa65bb..e4027627 100644 --- a/Microwave.Discovery/ServiceBaseAddressCollection.cs +++ b/Microwave.Discovery/ServiceBaseAddressCollection.cs @@ -1,7 +1,7 @@ using System; using System.Collections.Generic; -namespace Microwave +namespace Microwave.Discovery { public class ServiceBaseAddressCollection : List { diff --git a/Microwave.Eventstores.Persistence.CosmosDb/CosmosDb.cs b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDb.cs new file mode 100644 index 00000000..5e9d1d21 --- /dev/null +++ b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDb.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.ObjectModel; +using System.Security; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; + +namespace Microwave.Persistence.CosmosDb +{ + public class CosmosDb : ICosmosDb + { + public SecureString PrimaryKey { get; set; } + + public Uri CosmosDbLocation { get; set; } + + public DocumentClient GetCosmosDbClient() + { + var client = new DocumentClient(CosmosDbLocation, PrimaryKey); + return client; + } + + public string DatabaseId => "Eventstore"; + public string EventsCollectionId => "DomainEvents"; + public string SnapshotsCollectionId => "Snapshots"; + public string ServiceMapCollectionId => "ServiceMap"; + public string StatusCollectionId => "Status"; + public string VersionCollectionId => "Versions"; + + public async Task InitializeCosmosDb() + { + var client = new DocumentClient(CosmosDbLocation, PrimaryKey); + + var domainEventsCollection = new DocumentCollection + { + Id = EventsCollectionId + }; + domainEventsCollection.UniqueKeyPolicy = new UniqueKeyPolicy + { + UniqueKeys = + new Collection + { + new UniqueKey {Paths = new Collection {"/Version", "/DomainEvent/EntityId/Id"}} + } + }; + + var snapShotCollection = new DocumentCollection + { + Id = SnapshotsCollectionId + }; + snapShotCollection.UniqueKeyPolicy = new UniqueKeyPolicy + { + UniqueKeys = + new Collection + { + new UniqueKey {Paths = new Collection {"/Version", "/Id/Id"}} + } + }; + var versionCollection = new DocumentCollection + { + Id = VersionCollectionId + }; + //versionCollection.UniqueKeyPolicy = new UniqueKeyPolicy + //{ + // UniqueKeys = + // new Collection + // { + // new UniqueKey {Paths = new Collection {"/Version", "/DomainEventType"}} + // } + //}; + + try + { + await client.CreateDatabaseIfNotExistsAsync(new Database {Id = DatabaseId}); + await client.CreateDocumentCollectionIfNotExistsAsync( + UriFactory.CreateDatabaseUri(DatabaseId), + domainEventsCollection); + await client.CreateDocumentCollectionIfNotExistsAsync( + UriFactory.CreateDatabaseUri(DatabaseId), + new DocumentCollection{Id = StatusCollectionId}); + await client.CreateDocumentCollectionIfNotExistsAsync( + UriFactory.CreateDatabaseUri(DatabaseId), + snapShotCollection); + await client.CreateDocumentCollectionIfNotExistsAsync( + UriFactory.CreateDatabaseUri(DatabaseId), + versionCollection); + } + catch (DocumentClientException e) + { + throw new ArgumentException( + $"Could not create Database or Collection with given CosmosDb Configuration Parameters! Exception : {e}" ); + + } + } + } +} \ No newline at end of file diff --git a/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbEventRepository.cs b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbEventRepository.cs new file mode 100644 index 00000000..d0e10abd --- /dev/null +++ b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbEventRepository.cs @@ -0,0 +1,184 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.Linq; +using Microwave.Domain.EventSourcing; +using Microwave.Domain.Identities; +using Microwave.Domain.Results; +using Microwave.EventStores; +using Microwave.EventStores.Ports; +using Microwave.Persistence.MongoDb.Eventstores; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microwave.Persistence.CosmosDb +{ + public class CosmosDbEventRepository : IEventRepository + { + private readonly ICosmosDb _cosmosDb; + private readonly IAssemblyProvider _assemblyProvider; + private readonly IVersionCache _versionCache; + private DocumentClient _client; + private IEnumerable _domainEventTypes; + + + public CosmosDbEventRepository(ICosmosDb cosmosDb, IAssemblyProvider assemblyProvider, IVersionCache versionCache) + { + _cosmosDb = cosmosDb; + _assemblyProvider = assemblyProvider; + _versionCache = versionCache; + _client = cosmosDb.GetCosmosDbClient(); + var type = typeof(IDomainEvent); + _domainEventTypes = assemblyProvider.GetAssemblies() + .SelectMany(s => s.GetTypes()) + .Where(p => type.IsAssignableFrom(p)); + + } + + public async Task>> LoadEventsByEntity(Identity entityId, long @from = 0) + { + var query = _client.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.EventsCollectionId), + new FeedOptions { MaxItemCount = -1 }) + .Where(e => e.DomainEvent.EntityId == entityId && e.Version >= from) + .AsDocumentQuery(); + + var wrappedEvents = new List(); + while (query.HasMoreResults) + { + wrappedEvents.AddRange(await query.ExecuteNextAsync()); + } + + var result = new List(); + foreach (var wrappedEvent in wrappedEvents) + { + result.Add(new DomainEventWrapper + { + Created = (DateTimeOffset)wrappedEvent.GetValue(nameof(DomainEventWrapper.Created)), + DomainEvent = (IDomainEvent)JsonConvert.DeserializeObject(wrappedEvent.GetValue(nameof(DomainEventWrapper.DomainEvent)).ToString(), _domainEventTypes.Single(x => x.Name == wrappedEvent.GetValue(nameof(DomainEventWrapper.DomainEvent)).ToString())), + Version = (long)wrappedEvent.GetValue(nameof(DomainEventWrapper.Version)) + }); + } + + return Result>.Ok(result); + } + + public async Task AppendAsync(IEnumerable domainEvents, long currentEntityVersion) + { + foreach (var domainEvent in domainEvents) + { + + var wrappedEvent = new DomainEventWrapper + { + DomainEvent = domainEvent, + Created = DateTimeOffset.Now, + Version = currentEntityVersion + }; + try + { + await _client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.EventsCollectionId), wrappedEvent); + } + catch (DocumentClientException) + { + var actualVersion = await _versionCache.Get(domainEvent.EntityId); + return Result.ConcurrencyResult(currentEntityVersion, actualVersion); + } + + } + return Result.Ok(); + } + + public async Task>> LoadEvents(DateTimeOffset tickSince = default(DateTimeOffset)) + { + var query = _client.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.EventsCollectionId), + new FeedOptions { MaxItemCount = -1 }) + .Where(e => e.Created >= tickSince) + .AsDocumentQuery(); + + var wrappedEvents = new List(); + while (query.HasMoreResults) + { + wrappedEvents.AddRange(await query.ExecuteNextAsync()); + } + var result = new List(); + foreach (var wrappedEvent in wrappedEvents) + { + result.Add(new DomainEventWrapper + { + Created = (DateTimeOffset)wrappedEvent.GetValue("Created"), + DomainEvent = (IDomainEvent)JsonConvert.DeserializeObject(wrappedEvent.GetValue("DomainEvent").ToString(), _domainEventTypes.Single(x => x.Name == wrappedEvent.GetValue("DomainEventType").ToString())), + Version = (long)wrappedEvent.GetValue("Version") + }); + } + if (result.Any()) + { + return Result>.Ok(result); + } + else + { + return Result>.NotFound(null); + } + + } + + public async Task>> LoadEventsByTypeAsync(string eventType, DateTimeOffset tickSince = default(DateTimeOffset)) + { + var query = _client.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.EventsCollectionId), + new FeedOptions { MaxItemCount = -1 }) + .Where(e => e.Created >= tickSince && e.DomainEventType == eventType) + .AsDocumentQuery(); + + var wrappedEvents = new List(); + while (query.HasMoreResults) + { + wrappedEvents.AddRange(await query.ExecuteNextAsync()); + } + + var result = new List(); + foreach (var wrappedEvent in wrappedEvents) + { + result.Add(new DomainEventWrapper + { + Created = (DateTimeOffset)wrappedEvent.GetValue("Created"), + DomainEvent = (IDomainEvent)JsonConvert.DeserializeObject(wrappedEvent.GetValue("DomainEvent").ToString(), _domainEventTypes.Single(x => x.Name == wrappedEvent.GetValue("DomainEventType").ToString())), + Version = (long)wrappedEvent.GetValue("Version") + }); + } + + return Result>.Ok(result); + } + + public async Task> GetLastEventOccuredOn(string domainEventType) + { + var query = _client.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.EventsCollectionId), + new FeedOptions { MaxItemCount = -1 }) + .Where(e => e.DomainEventType == domainEventType) + .AsDocumentQuery(); + + var wrappedEvents = new List(); + while (query.HasMoreResults) + { + wrappedEvents.AddRange(await query.ExecuteNextAsync()); + } + + var result = new List(); + foreach (var wrappedEvent in wrappedEvents) + { + result.Add(new DomainEventWrapper + { + Created = (DateTimeOffset)wrappedEvent.GetValue("Created"), + DomainEvent = (IDomainEvent)JsonConvert.DeserializeObject(wrappedEvent.GetValue("DomainEvent").ToString(), _domainEventTypes.Single(x => x.Name == wrappedEvent.GetValue("DomainEventType").ToString())), + Version = (long)wrappedEvent.GetValue("Version") + }); + } + + return Result.Ok(result.Max(s => s.Created)); + } + } +} diff --git a/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbPersistenceLayer.cs b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbPersistenceLayer.cs new file mode 100644 index 00000000..c7a77089 --- /dev/null +++ b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbPersistenceLayer.cs @@ -0,0 +1,37 @@ +using System.Collections.Generic; +using System.Reflection; +using Microsoft.Extensions.DependencyInjection; +using Microwave.Discovery; +using Microwave.EventStores; +using Microwave.EventStores.Ports; +using Microwave.Queries; + +namespace Microwave.Persistence.CosmosDb +{ + public class CosmosDbPersistenceLayer + { + public MicrowaveCosmosDb MicrowaveCosmosDb { get; set; } = new MicrowaveCosmosDb(); + + public IServiceCollection AddPersistenceLayer(IServiceCollection services, IEnumerable assemblies) + { + services.AddTransient(); + var cosmosDb = new CosmosDb(); + cosmosDb.InitializeCosmosDb().Wait(); + + services.AddTransient(); + services.AddTransient(); + services.AddSingleton(MicrowaveCosmosDb); + + services.AddTransient(); + services.AddTransient(); + + return services; + } + } + + public class MicrowaveCosmosDb + { + public string DatabaseUrl { get; set; } + public string PrimaryKey { get; set; } + } +} \ No newline at end of file diff --git a/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbReadModelRepository.cs b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbReadModelRepository.cs new file mode 100644 index 00000000..3fa7a0ad --- /dev/null +++ b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbReadModelRepository.cs @@ -0,0 +1,91 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Microwave.Domain.Identities; +using Microwave.Domain.Results; +using Microwave.Persistence.MongoDb.Querries; +using Microwave.Queries; + +namespace Microwave.Persistence.CosmosDb +{ + public class CosmosDbReadModelRepository : IReadModelRepository + { + private readonly ICosmosDb _cosmosDb; + private DocumentClient _client; + private string GetReadModelCollectionName() => $"ReadModelDbos_{typeof(T).Name}"; + private string GetQuerryCollectionName() => $"QueryDbos_{typeof(T).Name}"; + + public CosmosDbReadModelRepository(ICosmosDb cosmosDb) + { + _cosmosDb = cosmosDb; + _client = _cosmosDb.GetCosmosDbClient(); + } + + public async Task> LoadAsync() where T : Query + { + var name = typeof(T).Name; + var result = Result.NotFound(StringIdentity.Create(name)); + var queryResult = _client.CreateDocumentQuery>( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, GetQuerryCollectionName()), + new FeedOptions { MaxItemCount = -1 }) + .Where(e => e.Type == name) + .AsEnumerable().FirstOrDefault(); + if (queryResult != null) + { + result = Result.Ok(queryResult.Payload); + } + + return result; + } + + public async Task> LoadAsync(Identity id) where T : ReadModelBase + { + var name = typeof(T).Name; + var queryResult = _client.CreateDocumentQuery>( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, GetReadModelCollectionName()), + new FeedOptions { MaxItemCount = -1 }) + .Where(e => e.Id == id.Id) + .AsEnumerable().FirstOrDefault(); + if (queryResult == null) + { + return Result.NotFound(id); + } + + return Result.Ok(queryResult.Payload); + } + + public async Task SaveQueryAsync(T query) where T : Query + { + await _client.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, GetQuerryCollectionName()), query); + return Result.Ok(); + } + + public async Task SaveReadModelAsync(T readModel) where T : ReadModelBase, new() + { + try + { + await _client.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, GetReadModelCollectionName()), readModel); + } + catch (DocumentClientException) + { + var actualVersion = (await LoadAsync(readModel.Identity)).Value.Version; + return Result.ConcurrencyResult(readModel.Version, actualVersion); + } + return Result.Ok(); + } + + + public async Task>> LoadAllAsync() where T : ReadModelBase + { + var name = typeof(T).Name; + var queryResult = _client.CreateDocumentQuery>( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, GetReadModelCollectionName()), + new FeedOptions { MaxItemCount = -1 }) + .AsEnumerable(); + return Result>.Ok(queryResult.Select(rm => rm.Payload).ToList()); + } + } +} \ No newline at end of file diff --git a/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbSnapshotRepository.cs b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbSnapshotRepository.cs new file mode 100644 index 00000000..83e11d3a --- /dev/null +++ b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbSnapshotRepository.cs @@ -0,0 +1,66 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.Linq; +using Microwave.Domain.Identities; +using Microwave.EventStores; +using Microwave.EventStores.Ports; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microwave.Persistence.CosmosDb +{ + public class CosmosDbSnapshotRepository : ISnapShotRepository + { + private readonly ICosmosDb _cosmosDb; + private DocumentClient _client; + + public CosmosDbSnapshotRepository(ICosmosDb cosmosDb) + { + _cosmosDb = cosmosDb; + _client = _cosmosDb.GetCosmosDbClient(); + } + + public async Task> LoadSnapShot(Identity entityId) where T : new() + { + var query = _client.CreateDocumentQuery>( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.SnapshotsCollectionId), + new FeedOptions { MaxItemCount = -1 }) + .Where(e => e.Id == entityId) + .AsDocumentQuery(); + + var wrappedEvents = new List(); + while (query.HasMoreResults) + { + wrappedEvents.AddRange(await query.ExecuteNextAsync()); + } + var allSnapshots = new List>(); + foreach (var wrappedEvent in wrappedEvents) + { + var entity = JsonConvert.DeserializeObject(wrappedEvent.GetValue("Entity").ToString()); + var version = (long)wrappedEvent.GetValue(nameof(DomainEventWrapper.Version)); + Guid.TryParse(wrappedEvent.GetValue("id").ToString(), out var guid); + Identity identity = null; + if (guid != Guid.Empty) + { + identity = Identity.Create(guid); + } + else + { + identity = Identity.Create(wrappedEvent.GetValue("id").ToString()); + } + allSnapshots.Add(new SnapShotWrapper(entity, identity, version)); + } + var result = allSnapshots.Single(x => x.Version == allSnapshots.Max(s => s.Version)); + return new SnapShotResult(result.Entity, result.Version); + } + + public async Task SaveSnapShot(SnapShotWrapper snapShot) + { + await _client.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.SnapshotsCollectionId), snapShot); + } + } +} diff --git a/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbStatusRepository.cs b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbStatusRepository.cs new file mode 100644 index 00000000..d409ab33 --- /dev/null +++ b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbStatusRepository.cs @@ -0,0 +1,83 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.Linq; +using Microwave.Discovery; +using Microwave.Discovery.EventLocations; +using Microwave.Discovery.ServiceMaps; +using Microwave.Domain.EventSourcing; +using Microwave.EventStores; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microwave.Persistence.CosmosDb +{ + public class CosmosDbStatusRepository : IStatusRepository + { + private readonly ICosmosDb _cosmosDb; + private DocumentClient _client; + + public CosmosDbStatusRepository(ICosmosDb cosmosDb) + { + _cosmosDb = cosmosDb; + _client = cosmosDb.GetCosmosDbClient(); + } + + public async Task SaveEventLocation(EventLocation eventLocation) + { + var previousEventLocation = _client.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.StatusCollectionId), + new FeedOptions {MaxItemCount = -1}).AsEnumerable().SingleOrDefault(); + + if (previousEventLocation == null) + { + await _client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.StatusCollectionId), eventLocation); + } + else + { + await _client.ReplaceDocumentAsync( + UriFactory.CreateDocumentUri(_cosmosDb.DatabaseId, _cosmosDb.StatusCollectionId, + previousEventLocation.ResourceId), eventLocation); + } + } + + public async Task GetEventLocation() + { + var eventLocation = _client.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.StatusCollectionId), + new FeedOptions { MaxItemCount = -1 }).AsEnumerable().FirstOrDefault(); + + return eventLocation; + } + + public async Task GetServiceMap() + { + var serviceMap = _client.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.StatusCollectionId), + new FeedOptions { MaxItemCount = -1 }).AsEnumerable().FirstOrDefault(); + + return serviceMap; + } + + public async Task SaveServiceMap(ServiceMap map) + { + var previousEventLocation = _client.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.StatusCollectionId), + new FeedOptions { MaxItemCount = -1 }).AsEnumerable().SingleOrDefault(); + + if (previousEventLocation == null) + { + await _client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.StatusCollectionId), map); + } + else + { + await _client.ReplaceDocumentAsync( + UriFactory.CreateDocumentUri(_cosmosDb.DatabaseId, _cosmosDb.StatusCollectionId, + previousEventLocation.ResourceId), map); + } + } + } +} \ No newline at end of file diff --git a/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbVersionCache.cs b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbVersionCache.cs new file mode 100644 index 00000000..b5e96370 --- /dev/null +++ b/Microwave.Eventstores.Persistence.CosmosDb/CosmosDbVersionCache.cs @@ -0,0 +1,61 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.Linq; +using Microwave.Domain.Identities; +using Microwave.EventStores; +using Microwave.Persistence.MongoDb.Eventstores; + +namespace Microwave.Persistence.CosmosDb +{ + public class CosmosDbVersionCache : IVersionCache + { + private readonly ICosmosDb _cosmosDb; + private DocumentClient _client; + private readonly ConcurrentDictionary _cache = new ConcurrentDictionary(); + + public CosmosDbVersionCache(ICosmosDb cosmosDb) + { + _cosmosDb = cosmosDb; + _client = cosmosDb.GetCosmosDbClient(); + } + + public async Task Get(Identity entityId) + { + if (!_cache.TryGetValue(entityId, out var version)) + { + var actualVersion = await GetVersionFromDb(entityId); + _cache[entityId] = actualVersion; + return actualVersion; + } + + return version; + } + + private async Task GetVersionFromDb(Identity entityId) + { + var events = _client.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.EventsCollectionId), + new FeedOptions {MaxItemCount = -1}) + .Where(e => e.DomainEvent.EntityId.Id == entityId.Id); + var actualVersion = events.Max(e => e.Version); + return actualVersion; + } + + public async Task GetForce(Identity entityId) + { + var actualVersion = await GetVersionFromDb(entityId); + _cache[entityId] = actualVersion; + return actualVersion; + } + + public void Update(Identity entityId, long actualVersion) + { + _cache[entityId] = actualVersion; + } + } +} diff --git a/Microwave.Eventstores.Persistence.CosmosDb/IAssemblyProvider.cs b/Microwave.Eventstores.Persistence.CosmosDb/IAssemblyProvider.cs new file mode 100644 index 00000000..afc9e3bc --- /dev/null +++ b/Microwave.Eventstores.Persistence.CosmosDb/IAssemblyProvider.cs @@ -0,0 +1,10 @@ +using System.Collections.Generic; +using System.Reflection; + +namespace Microwave.Persistence.CosmosDb +{ + public interface IAssemblyProvider + { + IEnumerable GetAssemblies(); + } +} \ No newline at end of file diff --git a/Microwave.Eventstores.Persistence.CosmosDb/ICosmosDb.cs b/Microwave.Eventstores.Persistence.CosmosDb/ICosmosDb.cs new file mode 100644 index 00000000..22838af2 --- /dev/null +++ b/Microwave.Eventstores.Persistence.CosmosDb/ICosmosDb.cs @@ -0,0 +1,17 @@ +using Microsoft.Azure.Documents.Client; + +namespace Microwave.Persistence.CosmosDb +{ + public interface ICosmosDb + { + string DatabaseId { get; } + + string EventsCollectionId { get; } + string SnapshotsCollectionId { get; } + string ServiceMapCollectionId { get; } + + string StatusCollectionId { get; } + string VersionCollectionId { get; } + DocumentClient GetCosmosDbClient(); + } +} \ No newline at end of file diff --git a/Microwave.Eventstores.Persistence.CosmosDb/Microwave.Eventstores.Persistence.CosmosDb.csproj b/Microwave.Eventstores.Persistence.CosmosDb/Microwave.Eventstores.Persistence.CosmosDb.csproj new file mode 100644 index 00000000..2cf43180 --- /dev/null +++ b/Microwave.Eventstores.Persistence.CosmosDb/Microwave.Eventstores.Persistence.CosmosDb.csproj @@ -0,0 +1,16 @@ + + + + netcoreapp2.1 + + + + + + + + + + + + diff --git a/Microwave.Eventstores.Persistence.CosmosDb/Microwave.Persistence.CosmosDb.csproj b/Microwave.Eventstores.Persistence.CosmosDb/Microwave.Persistence.CosmosDb.csproj new file mode 100644 index 00000000..8f3ee4da --- /dev/null +++ b/Microwave.Eventstores.Persistence.CosmosDb/Microwave.Persistence.CosmosDb.csproj @@ -0,0 +1,18 @@ + + + + netstandard2.0 + full + + + + + + + + + + + + + diff --git a/Microwave.Eventstores.Persistence.CosmosDb/VersionRepositoryMongoDb.cs b/Microwave.Eventstores.Persistence.CosmosDb/VersionRepositoryMongoDb.cs new file mode 100644 index 00000000..addec260 --- /dev/null +++ b/Microwave.Eventstores.Persistence.CosmosDb/VersionRepositoryMongoDb.cs @@ -0,0 +1,52 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Threading.Tasks; +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.Linq; +using Microwave.EventStores; +using Microwave.Persistence.MongoDb; +using Microwave.Persistence.MongoDb.Querries; +using Microwave.Queries.Ports; +using MongoDB.Driver; +using Newtonsoft.Json.Linq; + +namespace Microwave.Persistence.CosmosDb +{ + public class CosmosDbVersionRepository : IVersionRepository + { + private readonly ICosmosDb _cosmosDb; + private readonly string _lastProcessedVersions = "LastProcessedVersions"; + private DocumentClient _client; + + public CosmosDbVersionRepository(ICosmosDb cosmosDb) + { + _cosmosDb = cosmosDb; + _client = cosmosDb.GetCosmosDbClient(); + } + + public async Task GetVersionAsync(string domainEventType) + { + var query = _client.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(_cosmosDb.DatabaseId, _cosmosDb.VersionCollectionId), + new FeedOptions { MaxItemCount = -1 }) + .Where(e => e.EventType == domainEventType) + .AsDocumentQuery(); + var versions = new List(); + while (query.HasMoreResults) + { + versions.AddRange(await query.ExecuteNextAsync()); + } + + var lastProcessedVersion = versions.Where(version => version.EventType == domainEventType).FirstOrDefault(); + if (lastProcessedVersion == null) return DateTimeOffset.MinValue; + return lastProcessedVersion.LastVersion; + } + + public async Task SaveVersion(LastProcessedVersion version) + { + await _client.UpsertDocumentAsync(_cosmosDb.VersionCollectionId, version); + } + } +} \ No newline at end of file diff --git a/Microwave.Persistence.CosmosDb/CosmosDb.cs b/Microwave.Persistence.CosmosDb/CosmosDb.cs new file mode 100644 index 00000000..2452dd8e --- /dev/null +++ b/Microwave.Persistence.CosmosDb/CosmosDb.cs @@ -0,0 +1,24 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Azure.Documents.Client; +using Microwave.Domain; + +namespace Microwave.Eventstores.Persistence.CosmosDb +{ + public class CosmosDb : ICosmosDb + { + + private readonly IMicrowaveConfiguration _configuration; + + public CosmosDb(IMicrowaveConfiguration configuration) + { + _configuration = configuration; + } + + public DocumentClient GetCosmosDbClient() + { + return new DocumentClient(new Uri(_configuration.DatabaseConfiguration.ConnectionString), + _configuration.DatabaseConfiguration.PrimaryKey); + } + } +} \ No newline at end of file diff --git a/Microwave.Persistence.CosmosDb/CosmosDbClient.cs b/Microwave.Persistence.CosmosDb/CosmosDbClient.cs new file mode 100644 index 00000000..28d9fad8 --- /dev/null +++ b/Microwave.Persistence.CosmosDb/CosmosDbClient.cs @@ -0,0 +1,99 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Reflection; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.Linq; +using Microwave.Domain.EventSourcing; +using Microwave.Domain.Identities; +using Microwave.Domain.Results; +using Microwave.EventStores; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microwave.Eventstores.Persistence.CosmosDb +{ + public class CosmosDbClient : ICosmosDbClient + { + private readonly DocumentClient _client; + private IEnumerable _domainEventTypes; + private const string DatabaseId = "Eventstore"; + private const string CollectionId = "DomainEvents"; + + public CosmosDbClient(ICosmosDb cosmosDb, IEnumerable assemblies) + { + _client = cosmosDb.GetCosmosDbClient(); + var type = typeof(IDomainEvent); + _domainEventTypes = assemblies + .SelectMany(s => s.GetTypes()) + .Where(p => type.IsAssignableFrom(p)); + + } + + public async Task InitializeCosmosDbAsync() + { + var database = await _client.CreateDatabaseIfNotExistsAsync(new Database { Id = DatabaseId }); + var collection = await _client.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(DatabaseId), + new DocumentCollection { Id = CollectionId }); + if (database == null || collection == null) + { + throw new ArgumentException("Could not create Database or Collection with given CosmosDb Configuration Parameters!"); + } + } + + + public async Task CreateDomainEventAsync(IDomainEvent domainEvent) + { + var uri = UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId); + await _client.CreateDocumentAsync(uri, domainEvent); + + } + + + public async Task> GetDomainEventsAsync(Identity identity) + { + var query = _client.CreateDocumentQuery( + UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId), + new FeedOptions { MaxItemCount = -1 }) + .Where(e => e.DomainEvent.EntityId == identity) + .AsDocumentQuery(); + + var wrappedEvents = new List(); + while (query.HasMoreResults) + { + wrappedEvents.AddRange(await query.ExecuteNextAsync()); + } + + var result = wrappedEvents.Select(e => JsonConvert.DeserializeObject(e.GetValue("DomainEvent").ToString(), _domainEventTypes.Single(x => x.Name == e.GetValue("DomainEventType").ToString()))).ToList(); + return new List(); + } + + + public async Task>> GetDomainEventsAsync(DateTimeOffset tickSince) + { + FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 }; + var uri = UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId); + var query = _client.CreateDocumentQuery(uri, queryOptions) + .Where(e => e.Created > tickSince); + return Result>.Ok(query.ToList()); + } + + public async Task CreateItemAsync(DomainEventWrapper domainEvent) + { + return await _client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId), domainEvent); + } + + public async Task>> LoadEventsByTypeAsync(string eventType, DateTimeOffset tickSince) + { + FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 }; + var uri = UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId); + var query = _client.CreateDocumentQuery(uri, queryOptions) + .Where(e => e.DomainEventType == eventType); + return Result>.Ok(query.ToList()); + } + } + +} \ No newline at end of file diff --git a/Microwave.Persistence.CosmosDb/CosmosDbEventRepository.cs b/Microwave.Persistence.CosmosDb/CosmosDbEventRepository.cs new file mode 100644 index 00000000..d31e1c1b --- /dev/null +++ b/Microwave.Persistence.CosmosDb/CosmosDbEventRepository.cs @@ -0,0 +1,82 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.Linq; +using Microwave.Domain.EventSourcing; +using Microwave.Domain.Identities; +using Microwave.Domain.Results; +using Microwave.EventStores; +using Microwave.EventStores.Ports; + +namespace Microwave.Eventstores.Persistence.CosmosDb +{ + public class CosmosDbEventRepository : IEventRepository + { + private readonly ICosmosDbClient _cosmosDbClient; + + + public CosmosDbEventRepository(ICosmosDbClient cosmosDbClient) + { + _cosmosDbClient = cosmosDbClient; + } + + public async Task>> LoadEventsByEntity(Identity entityId, long @from = 0) + { + throw new NotImplementedException(); + //var uri = CreateUriForCosmosDb(entityId); + //var domainEvents = (await _client.ReadDocumentAsync>(uri)).Document; + //return new EventStoreResult>(domainEvents, domainEvents.Max(e => e.Version)); + } + + public async Task AppendAsync(IEnumerable domainEvents, long currentEntityVersion) + { + foreach (var domainEvent in domainEvents) + { + + await _cosmosDbClient.CreateDomainEventAsync(domainEvent); + } + + return Result.Ok(); + } + + public async Task>> LoadEvents(DateTimeOffset tickSince = default(DateTimeOffset)) + { + var result = await _cosmosDbClient.GetDomainEventsAsync(tickSince); + if (result.Value.Any()) + { + return Result>.Ok(result.Value); + } + else + { + return Result>.NotFound(null); + } + } + + public async Task>> LoadEventsByTypeAsync(string eventType, DateTimeOffset tickSince = default(DateTimeOffset)) + { + var result = _cosmosDbClient.LoadEventsByTypeAsync(eventType, tickSince); + return Result>.Ok(result.Result.Value); + } + + public async Task> GetLastEventOccuredOn(string domainEventType) + { + throw new NotImplementedException(); + //FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 }; + //var uri = UriFactory.CreateDocumentCollectionUri(DatabaseName, CollectionId); + //var query = _client.CreateDocumentQuery(uri, queryOptions).ToList(); + //var latestEventTime = query.Max(e => e.Created); + + //return Result.Ok(latestEventTime); + } + + private Uri CreateUriForCosmosDb(Identity identity) + { + //return UriFactory.CreateDocumentUri(DatabaseName, CollectionId, identity.Id); + return null; + } + } +} diff --git a/Microwave.Persistence.CosmosDb/ICosmosDb.cs b/Microwave.Persistence.CosmosDb/ICosmosDb.cs new file mode 100644 index 00000000..764f5442 --- /dev/null +++ b/Microwave.Persistence.CosmosDb/ICosmosDb.cs @@ -0,0 +1,10 @@ +using System.Threading.Tasks; +using Microsoft.Azure.Documents.Client; + +namespace Microwave.Eventstores.Persistence.CosmosDb +{ + public interface ICosmosDb + { + DocumentClient GetCosmosDbClient(); + } +} \ No newline at end of file diff --git a/Microwave.Persistence.CosmosDb/ICosmosDbClient.cs b/Microwave.Persistence.CosmosDb/ICosmosDbClient.cs new file mode 100644 index 00000000..3dc3755f --- /dev/null +++ b/Microwave.Persistence.CosmosDb/ICosmosDbClient.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microwave.Domain.EventSourcing; +using Microwave.Domain.Identities; +using Microwave.Domain.Results; +using Microwave.EventStores; + +namespace Microwave.Eventstores.Persistence.CosmosDb +{ + public interface ICosmosDbClient + { + Task CreateDomainEventAsync(IDomainEvent domainEvent); + Task>> GetDomainEventsAsync(DateTimeOffset tickSince); + Task>> LoadEventsByTypeAsync(string eventType, DateTimeOffset tickSince); + Task CreateItemAsync(DomainEventWrapper domainEvent); + Task> GetDomainEventsAsync(Identity identity); + } +} \ No newline at end of file diff --git a/Microwave.Persistence.CosmosDb/Microwave.Persistence.CosmosDb.csproj b/Microwave.Persistence.CosmosDb/Microwave.Persistence.CosmosDb.csproj new file mode 100644 index 00000000..3eeae904 --- /dev/null +++ b/Microwave.Persistence.CosmosDb/Microwave.Persistence.CosmosDb.csproj @@ -0,0 +1,17 @@ + + + + netcoreapp2.1 + Microwave.Persistence.CosmosDb + Microwave.Persistence.CosmosDb + + + + + + + + + + + diff --git a/Microwave.sln b/Microwave.sln index a5e3a0fe..a6c73e4a 100644 --- a/Microwave.sln +++ b/Microwave.sln @@ -62,6 +62,8 @@ ProjectSection(SolutionItems) = preProject .gitignore = .gitignore EndProjectSection EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microwave.Persistence.CosmosDb", "Microwave.Eventstores.Persistence.CosmosDb\Microwave.Persistence.CosmosDb.csproj", "{0A4D3089-A5D1-4D3C-9F11-BD1D0E4DFC0C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -160,6 +162,10 @@ Global {A1C79B11-FAAB-4D3F-822F-08C93C16D4BE}.Debug|Any CPU.Build.0 = Debug|Any CPU {A1C79B11-FAAB-4D3F-822F-08C93C16D4BE}.Release|Any CPU.ActiveCfg = Release|Any CPU {A1C79B11-FAAB-4D3F-822F-08C93C16D4BE}.Release|Any CPU.Build.0 = Release|Any CPU + {0A4D3089-A5D1-4D3C-9F11-BD1D0E4DFC0C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0A4D3089-A5D1-4D3C-9F11-BD1D0E4DFC0C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0A4D3089-A5D1-4D3C-9F11-BD1D0E4DFC0C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0A4D3089-A5D1-4D3C-9F11-BD1D0E4DFC0C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Microwave/MicrowaveConfiguration.cs b/Microwave/MicrowaveConfiguration.cs index 88f292d0..de7da336 100644 --- a/Microwave/MicrowaveConfiguration.cs +++ b/Microwave/MicrowaveConfiguration.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using Microwave.Discovery; using Microwave.EventStores.SnapShots; using Microwave.Queries.Polling; using Microwave.WebApi; diff --git a/TestProjects/Microwave.Persistence.CosmosDb.UnitTests/CosmosDbEventRepositoryTests.cs b/TestProjects/Microwave.Persistence.CosmosDb.UnitTests/CosmosDbEventRepositoryTests.cs new file mode 100644 index 00000000..80e3b50f --- /dev/null +++ b/TestProjects/Microwave.Persistence.CosmosDb.UnitTests/CosmosDbEventRepositoryTests.cs @@ -0,0 +1,165 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Security; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microwave.Domain.EventSourcing; +using Microwave.Domain.Exceptions; +using Microwave.Domain.Identities; +using Microwave.Domain.Results; +using Microwave.EventStores; +using Microwave.EventStores.Ports; +using Microwave.Persistence.CosmosDb; + +namespace Microwave.Persistence.CosmosDb.UnitTests +{ + [TestClass] + public class CosmosDbEventRepositoryTests : IntegrationTests + { + [TestMethod] + public async Task DomainEventIsAppendedCorrectly() + { + var cosmosDbClient = new CosmosDbClient( + Database, + new List + { + Assembly.GetAssembly(typeof(UserCreatedEvent)) + }); + + await Database.InitializeCosmosDb(); + + var eventRepository = new CosmosDbEventRepository(cosmosDbClient, Database, new List { Assembly.GetAssembly(typeof(UserCreatedEvent)) }); + var domainEvent = new UserCreatedEvent(GuidIdentity.Create(Guid.NewGuid()), "Hans Wurst"); + await cosmosDbClient.CreateItemAsync(new DomainEventWrapper + { + Created = DateTimeOffset.Now, + Version = 0, + DomainEvent = domainEvent + }); + + } + + [TestMethod] + public async Task DomainEventReturnsConcurrencyResult() + { + var cosmosDbClient = new CosmosDbClient( + Database, + new List + { + Assembly.GetAssembly(typeof(UserCreatedEvent)) + }); + + await Database.InitializeCosmosDb(); + var entityGuid = Guid.NewGuid(); + var eventRepository = new CosmosDbEventRepository(cosmosDbClient, Database, new List { Assembly.GetAssembly(typeof(UserCreatedEvent)) }); + var domainEvent = new UserCreatedEvent(GuidIdentity.Create(entityGuid), "Hans Wurst"); + await cosmosDbClient.CreateItemAsync(new DomainEventWrapper + { + Created = DateTimeOffset.Now, + Version = 0, + DomainEvent = domainEvent + }); + var result = await cosmosDbClient.CreateItemAsync(new DomainEventWrapper + { + Created = DateTimeOffset.Now, + Version = 0, + DomainEvent = domainEvent + }); + Assert.ThrowsException(() => result.Check()); + + } + + [TestMethod] + public async Task DomainEventsAreGettedCorrectly() + { + var cosmosDbClient = new CosmosDbClient( + Database, + new List + { + Assembly.GetAssembly(typeof(UserCreatedEvent)) + }); + + await Database.InitializeCosmosDb(); + var entityGuid = Guid.NewGuid(); + var domainEvent = new UserCreatedEvent(GuidIdentity.Create(entityGuid), "Hans Wurst"); + await cosmosDbClient.CreateItemAsync(new DomainEventWrapper + { + Created = DateTimeOffset.Now, + Version = 0, + DomainEvent = domainEvent + }); + var eventRepository = new CosmosDbEventRepository(cosmosDbClient, Database, new List{ Assembly.GetAssembly(typeof(UserCreatedEvent)) } ); + var result = await cosmosDbClient.GetDomainEventsAsync(Identity.Create(entityGuid), 0); + + Assert.AreEqual(result.Count(), 1); + } + + [TestMethod] + public async Task SnapShotIsCreatedSuccesfully() + { + var cosmosDbClient = new CosmosDbClient( + Database, + new List + { + Assembly.GetAssembly(typeof(UserCreatedEvent)) + }); + + await Database.InitializeCosmosDb(); + + await cosmosDbClient.SaveSnapshotAsync(new SnapShotWrapper(new TestUser + { + UserName = "TestUser", + Age = 28 + }, + new GuidIdentity(Guid.NewGuid().ToString()), 1)); + } + + [TestMethod] + public async Task SnapSHotAreGettedCorrectly() + { + var cosmosDbClient = new CosmosDbClient( + Database, + new List + { + Assembly.GetAssembly(typeof(UserCreatedEvent)) + }); + + await Database.InitializeCosmosDb(); + + var eventRepository = new CosmosDbEventRepository(cosmosDbClient, Database, new List { Assembly.GetAssembly(typeof(UserCreatedEvent)) }); + + var entityGuid = Guid.NewGuid(); + await cosmosDbClient.SaveSnapshotAsync(new SnapShotWrapper(new TestUser + { + UserName = "TestUser", + Age = 28 + }, + new GuidIdentity(entityGuid.ToString()), 1)); + + //var result = await eventRepository.LoadSnapshotAsync(Identity.Create(entityGuid)); + + //Assert.AreEqual(result.Entity.Age, 28); + //Assert.AreEqual(result.Entity.UserName, "TestUser"); + } + } + + public class TestUser + { + public string UserName { get; set; } + public int Age { get; set; } + } + + public class UserCreatedEvent : IDomainEvent + { + public Identity EntityId { get; } + public string Username { get; } + + public UserCreatedEvent(GuidIdentity entityId, string name) + { + EntityId = entityId; + Username = name; + } + } +} diff --git a/TestProjects/Microwave.Persistence.CosmosDb.UnitTests/IntegrationTests.cs b/TestProjects/Microwave.Persistence.CosmosDb.UnitTests/IntegrationTests.cs new file mode 100644 index 00000000..6e51ff19 --- /dev/null +++ b/TestProjects/Microwave.Persistence.CosmosDb.UnitTests/IntegrationTests.cs @@ -0,0 +1,37 @@ +using System; +using System.Security; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace Microwave.Persistence.CosmosDb.UnitTests +{ + public class IntegrationTests + { + protected CosmosDb Database; + + public SecureString PrimaryKey + { + get + { + var secure = new SecureString(); + foreach (char c in "mCPtXM99gxlUalpz6bkFiWib2QD2OvIB9oEYj8tlpCPz1I4jSkOzlhJGnxAAEH4uiqWiYZ7enElzAM0lopKlJA==") + { + secure.AppendChar(c); + } + + return secure; + } + } + + public Uri CosmosDbLocation => new Uri("https://spoppinga.documents.azure.com:443/"); + + [TestInitialize] + public void SetupCosmosDb() + { + Database = new CosmosDb + { + PrimaryKey = PrimaryKey, + CosmosDbLocation = CosmosDbLocation + }; + } + } +} \ No newline at end of file diff --git a/TestProjects/Microwave.Persistence.CosmosDb.UnitTests/Microwave.Persistence.CosmosDb.UnitTests.csproj b/TestProjects/Microwave.Persistence.CosmosDb.UnitTests/Microwave.Persistence.CosmosDb.UnitTests.csproj new file mode 100644 index 00000000..79ed01fb --- /dev/null +++ b/TestProjects/Microwave.Persistence.CosmosDb.UnitTests/Microwave.Persistence.CosmosDb.UnitTests.csproj @@ -0,0 +1,27 @@ + + + + netcoreapp2.2 + false + full + + + + + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers + + + + diff --git a/TestProjects/Microwave.Persistence.MongoDb.UnitTestsSetup/CosmosDbTestSetup.cs b/TestProjects/Microwave.Persistence.MongoDb.UnitTestsSetup/CosmosDbTestSetup.cs new file mode 100644 index 00000000..c4294919 --- /dev/null +++ b/TestProjects/Microwave.Persistence.MongoDb.UnitTestsSetup/CosmosDbTestSetup.cs @@ -0,0 +1,55 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Security; +using Microwave.Discovery; +using Microwave.Discovery.EventLocations; +using Microwave.EventStores; +using Microwave.EventStores.Ports; +using Microwave.Persistence.CosmosDb; +using Microwave.Persistence.MongoDb.Eventstores; +using Microwave.Persistence.MongoDb.Querries; +using Microwave.Persistence.UnitTestSetupPorts; +using Microwave.Queries; +using Microwave.Queries.Ports; + +namespace Microwave.Persistence.MongoDb.UnitTestsSetup +{ + public class CosmosDbTestSetup : IPersistenceLayerProvider + { + public CosmosDbTestSetup() + { + CosmosDb = new CosmosDb.CosmosDb(); + CosmosDb.PrimaryKey = PrimaryKey; + CosmosDb.CosmosDbLocation = CosmosDbLocation; + CosmosDb.InitializeCosmosDb(); + CosmosDbClient = new CosmosDbClient(CosmosDb, new List{ Assembly.GetCallingAssembly() }); + + } + + public CosmosDbClient CosmosDbClient { get; } + public CosmosDb.CosmosDb CosmosDb { get; } + public IVersionRepository VersionRepository { get; } + public IStatusRepository StatusRepository => new CosmosDbStatusRepository(CosmosDb); + public IReadModelRepository ReadModelRepository => new CosmosDbReadModelRepository(CosmosDb); + public ISnapShotRepository SnapShotRepository => new CosmosDbSnapshotRepository(CosmosDb); + public IEventRepository EventRepository => new CosmosDbEventRepository(CosmosDbClient, CosmosDb, new List { Assembly.GetCallingAssembly() }); + + public SecureString PrimaryKey + { + get + { + var secure = new SecureString(); + foreach (char c in "mCPtXM99gxlUalpz6bkFiWib2QD2OvIB9oEYj8tlpCPz1I4jSkOzlhJGnxAAEH4uiqWiYZ7enElzAM0lopKlJA==") + { + secure.AppendChar(c); + } + + return secure; + } + } + + public Uri CosmosDbLocation => new Uri("https://spoppinga.documents.azure.com:443/"); + } + +} \ No newline at end of file diff --git a/TestProjects/Microwave.Persistence.UnitTests/PersistenceTypeTestAttribute.cs b/TestProjects/Microwave.Persistence.UnitTests/PersistenceTypeTestAttribute.cs index c88eb9da..e026a120 100644 --- a/TestProjects/Microwave.Persistence.UnitTests/PersistenceTypeTestAttribute.cs +++ b/TestProjects/Microwave.Persistence.UnitTests/PersistenceTypeTestAttribute.cs @@ -3,6 +3,7 @@ using System.Globalization; using System.Reflection; using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microwave.Persistence.UnitTestsSetup.CosmosDb; using Microwave.Persistence.UnitTestsSetup.InMemory; using Microwave.Persistence.UnitTestsSetup.MongoDb; @@ -14,6 +15,7 @@ public IEnumerable GetData(MethodInfo methodInfo) { yield return new object[] { new MongoDbTestSetup() }; yield return new object[] { new InMemroyTestSetup() }; + yield return new object[] { new CosmosDbTestSetup(), }; } public string GetDisplayName(MethodInfo methodInfo, object[] data) diff --git a/TestProjects/Microwave.Persistence.UnitTestsSetup/CosmosDb/AssemblyProvider.cs b/TestProjects/Microwave.Persistence.UnitTestsSetup/CosmosDb/AssemblyProvider.cs new file mode 100644 index 00000000..d7e2c34f --- /dev/null +++ b/TestProjects/Microwave.Persistence.UnitTestsSetup/CosmosDb/AssemblyProvider.cs @@ -0,0 +1,18 @@ +using System.Collections.Generic; +using System.Reflection; +using Microwave.Persistence.CosmosDb; +using Microwave.Persistence.UnitTestsSetup.MongoDb; + +namespace Microwave.Persistence.UnitTestsSetup.CosmosDb +{ + public class AssemblyProvider : IAssemblyProvider + { + public IEnumerable GetAssemblies() + { + return new List + { + Assembly.GetAssembly(typeof(TestEvent1)) + }; + } + } +} \ No newline at end of file diff --git a/TestProjects/Microwave.Persistence.UnitTestsSetup/CosmosDb/CosmosDbTestSetup.cs b/TestProjects/Microwave.Persistence.UnitTestsSetup/CosmosDb/CosmosDbTestSetup.cs new file mode 100644 index 00000000..92adfd48 --- /dev/null +++ b/TestProjects/Microwave.Persistence.UnitTestsSetup/CosmosDb/CosmosDbTestSetup.cs @@ -0,0 +1,45 @@ +using System; +using System.Security; +using Microwave.Discovery; +using Microwave.EventStores.Ports; +using Microwave.Persistence.CosmosDb; +using Microwave.Queries; +using Microwave.Queries.Ports; + +namespace Microwave.Persistence.UnitTestsSetup.CosmosDb +{ + public class CosmosDbTestSetup : PersistenceLayerProvider + { + public CosmosDbTestSetup() + { + CosmosDb = new Persistence.CosmosDb.CosmosDb(); + CosmosDb.PrimaryKey = PrimaryKey; + CosmosDb.CosmosDbLocation = CosmosDbLocation; + CosmosDb.InitializeCosmosDb().Wait(); + } + + public SecureString PrimaryKey + { + get + { + var secure = new SecureString(); + foreach (char c in "mCPtXM99gxlUalpz6bkFiWib2QD2OvIB9oEYj8tlpCPz1I4jSkOzlhJGnxAAEH4uiqWiYZ7enElzAM0lopKlJA==") + { + secure.AppendChar(c); + } + + return secure; + } + } + + public Uri CosmosDbLocation => new Uri("https://spoppinga.documents.azure.com:443/"); + + public Persistence.CosmosDb.CosmosDb CosmosDb { get; } + public override IVersionRepository VersionRepository => new CosmosDbVersionRepository(CosmosDb); + public override IStatusRepository StatusRepository => new CosmosDbStatusRepository(CosmosDb); + public override IReadModelRepository ReadModelRepository => new CosmosDbReadModelRepository(CosmosDb); + public override ISnapShotRepository SnapShotRepository => new CosmosDbSnapshotRepository(CosmosDb); + public override IEventRepository EventRepository => new CosmosDbEventRepository(CosmosDb, new AssemblyProvider(), new CosmosDbVersionCache(CosmosDb)); + } + +} \ No newline at end of file diff --git a/TestProjects/Microwave.Persistence.UnitTestsSetup/Microwave.Persistence.UnitTestsSetup.csproj b/TestProjects/Microwave.Persistence.UnitTestsSetup/Microwave.Persistence.UnitTestsSetup.csproj index cbb6305e..4bb3714b 100644 --- a/TestProjects/Microwave.Persistence.UnitTestsSetup/Microwave.Persistence.UnitTestsSetup.csproj +++ b/TestProjects/Microwave.Persistence.UnitTestsSetup/Microwave.Persistence.UnitTestsSetup.csproj @@ -15,6 +15,7 @@ + diff --git a/TestServices/ServerConfig/ServiceConfiguration.cs b/TestServices/ServerConfig/ServiceConfiguration.cs index a1968025..51111705 100644 --- a/TestServices/ServerConfig/ServiceConfiguration.cs +++ b/TestServices/ServerConfig/ServiceConfiguration.cs @@ -1,5 +1,6 @@ using System; using Microwave; +using Microwave.Discovery; namespace ServerConfig { diff --git a/TestServices/WriteService2/ApiKeyRequirement.cs b/TestServices/WriteService2/ApiKeyRequirement.cs index 24698d66..0b9008ae 100644 --- a/TestServices/WriteService2/ApiKeyRequirement.cs +++ b/TestServices/WriteService2/ApiKeyRequirement.cs @@ -3,7 +3,7 @@ using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc.Filters; -namespace ReadService1 +namespace WriteService2 { public class ApiKeyRequirement : AuthorizationHandler, IAuthorizationRequirement { diff --git a/TestServices/WriteService2/Startup.cs b/TestServices/WriteService2/Startup.cs index d0b30c60..d346f793 100644 --- a/TestServices/WriteService2/Startup.cs +++ b/TestServices/WriteService2/Startup.cs @@ -10,7 +10,6 @@ using Microwave.Persistence.InMemory; using Microwave.Persistence.MongoDb; using Microwave.UI; -using ReadService1; using ServerConfig; namespace WriteService2