Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/Mongo/src/Eventuous.Projections.MongoDB/MongoProjector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,23 @@ namespace Eventuous.Projections.MongoDB;
using Tools;

[Obsolete("Use MongoProjector instead")]
public abstract class MongoProjection<T>(IMongoDatabase database, ITypeMapper? typeMap = null) : MongoProjector<T>(database, typeMap)
public abstract class MongoProjection<T>(IMongoDatabase database, ITypeMapper? typeMap = null) : MongoProjector<T>(database, null, typeMap)
where T : ProjectedDocument;

public record MongoProjectionOptions<T> where T : Document {
public string CollectionName { get; set; } = MongoCollectionName.For<T>();
}

/// <summary>
/// Base class for MongoDB projectors. Specify your event handlers in the constructor using <code>On</code> methods family.
/// </summary>
/// <typeparam name="T"></typeparam>
[UsedImplicitly]
public abstract class MongoProjector<T>(IMongoDatabase database, ITypeMapper? typeMap = null) : BaseEventHandler where T : ProjectedDocument {
public abstract class MongoProjector<T>(IMongoDatabase database, MongoProjectionOptions<T>? options = null, ITypeMapper? typeMap = null)
: BaseEventHandler where T : ProjectedDocument {
[PublicAPI]
protected IMongoCollection<T> Collection { get; } = Ensure.NotNull<IMongoDatabase>(database).GetDocumentCollection<T>();
protected IMongoCollection<T> Collection { get; } =
options != null ? Ensure.NotNull(database).GetCollection<T>(options?.CollectionName) : Ensure.NotNull<IMongoDatabase>(database).GetDocumentCollection<T>();

readonly Dictionary<Type, ProjectUntypedEvent> _handlers = new();
readonly ITypeMapper _map = typeMap ?? TypeMap.Instance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
namespace Eventuous.Tests.Projections.MongoDB;

[ClassDataSource<IntegrationFixture>]
public class ProjectWithBuilder(IntegrationFixture fixture) : ProjectionTestBase<ProjectWithBuilder.SutProjection>(nameof(ProjectWithBuilder), fixture) {
public class ProjectWithBuilder(IntegrationFixture fixture) {
[Test]
public async Task ShouldProjectImported() {
var evt = DomainFixture.CreateImportBooking();
var id = new BookingId(CreateId());
var stream = StreamNameFactory.For<Booking, BookingState, BookingId>(id);
[MethodDataSource(typeof(CollectionSource), nameof(CollectionSource.TestOptions))]
public async Task ShouldProjectImported(MongoProjectionOptions<BookingDocument>? options) {
var projectionFixture = new ProjectionTestBase<SutProjection>(nameof(ProjectWithBuilder), fixture);
var evt = DomainFixture.CreateImportBooking();
var id = new BookingId(projectionFixture.CreateId());
var stream = StreamNameFactory.For<Booking, BookingState, BookingId>(id);

await projectionFixture.InitializeAsync();

var first = await Act(stream, evt);
var first = await Act(projectionFixture, stream, evt);

var expected = new BookingDocument(id.ToString()) {
RoomId = evt.RoomId,
Expand All @@ -29,9 +33,11 @@ public async Task ShouldProjectImported() {

first.Doc.Should().BeEquivalentTo(expected);

var payment = new BookingPaymentRegistered(Fixture.Auto.Create<string>(), evt.Price);
var payment = new BookingPaymentRegistered(projectionFixture.Fixture.Auto.Create<string>(), evt.Price);

var second = await Act(stream, payment);
var second = await Act(projectionFixture, stream, payment);

await projectionFixture.DisposeAsync();

expected = expected with {
PaidAmount = payment.AmountPaid,
Expand All @@ -42,18 +48,16 @@ public async Task ShouldProjectImported() {
second.Doc.Should().BeEquivalentTo(expected);
}

async Task<(AppendEventsResult Append, BookingDocument? Doc)> Act<T>(StreamName stream, T evt)
where T : class {
var append = await Fixture.AppendEvent(stream, evt);
await WaitForPosition(append.GlobalPosition);
var actual = await Fixture.Mongo.LoadDocument<BookingDocument>(stream.GetId());
static async Task<(AppendEventsResult Append, BookingDocument? Doc)> Act<T>(ProjectionTestBase<SutProjection> f, StreamName stream, T evt) where T : class {
var append = await f.Fixture.AppendEvent(stream, evt);
await f.WaitForPosition(append.GlobalPosition);
var actual = await f.Fixture.Mongo.LoadDocument<BookingDocument>(stream.GetId());

return (append, actual);
}

public class SutProjection : MongoProjector<BookingDocument> {
public SutProjection(IMongoDatabase database)
: base(database) {
public SutProjection(IMongoDatabase database) : base(database) {
On<BookingImported>(
b => b
.InsertOne
Expand Down Expand Up @@ -88,3 +92,10 @@ public SutProjection(IMongoDatabase database)
}
}
}

public static class CollectionSource {
public static IEnumerable<MongoProjectionOptions<BookingDocument>?> TestOptions() {
yield return null;
yield return new() { CollectionName = "test" };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Eventuous.Tests.Projections.MongoDB;
public class ProjectWithBulkBuilder(IntegrationFixture fixture) : ProjectionTestBase<ProjectWithBulkBuilder.SutBulkProjection>(nameof(ProjectWithBulkBuilder), fixture) {
[Test]
public async Task ShouldProjectImported() {
await InitializeAsync();
var evt = DomainFixture.CreateImportBooking();
var id = new BookingId(CreateId());
var stream = StreamNameFactory.For<Booking, BookingState, BookingId>(id);
Expand All @@ -32,6 +33,7 @@ public async Task ShouldProjectImported() {
var payment = new BookingPaymentRegistered(Fixture.Auto.Create<string>(), evt.Price);

var second = await Act(stream, payment);
await DisposeAsync();

expected = expected with {
PaidAmount = payment.AmountPaid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public sealed class ProjectingWithTypedHandlers(IntegrationFixture fixture)
: ProjectionTestBase<ProjectingWithTypedHandlers.SutProjection>(nameof(ProjectingWithTypedHandlers), fixture) {
[Test]
public async Task ShouldProjectImported(CancellationToken cancellationToken) {
await InitializeAsync();
var evt = DomainFixture.CreateImportBooking();
var id = new BookingId(CreateId());
var stream = StreamNameFactory.For<Booking, BookingState, BookingId>(id);
Expand All @@ -32,6 +33,8 @@ public async Task ShouldProjectImported(CancellationToken cancellationToken) {

var actual = await Fixture.Mongo.LoadDocument<BookingDocument>(id.ToString(), cancellationToken: cancellationToken);
actual.Should().Be(expected);

await DisposeAsync();
}

public class SutProjection : MongoProjector<BookingDocument> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,19 @@ protected ProjectionTestBase(string id) {

protected abstract void ConfigureServices(IServiceCollection services, string id);

[Before(Test)]
public async Task InitializeAsync() {
_builder.ConfigureServices(collection => ConfigureServices(collection, _id));
Host = _builder.Build();
Host.Services.AddEventuousLogs();
await Host.StartAsync();
}

[After(Test)]
public async Task DisposeAsync() => await Host.StopAsync();
}

public abstract class ProjectionTestBase<TProjection>(string id, IntegrationFixture fixture) : ProjectionTestBase(id)
public class ProjectionTestBase<TProjection>(string id, IntegrationFixture fixture) : ProjectionTestBase(id)
where TProjection : class, IEventHandler {
protected readonly IntegrationFixture Fixture = fixture;
public readonly IntegrationFixture Fixture = fixture;

protected override void ConfigureServices(IServiceCollection services, string id)
=> services
Expand All @@ -47,9 +45,9 @@ protected override void ConfigureServices(IServiceCollection services, string id
builder => builder.AddEventHandler<TProjection>()
);

protected string CreateId() => new(Guid.NewGuid().ToString("N"));
public string CreateId() => new(Guid.NewGuid().ToString("N"));

protected async Task WaitForPosition(ulong position) {
public async Task WaitForPosition(ulong position) {
var checkpointStore = Host.Services.GetRequiredService<ICheckpointStore>();
var count = 100;

Expand Down
Loading