Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions .github/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ jobs:
name: "Build and test"
# runs-on: ubuntu-latest
runs-on: self-hosted
strategy:
matrix:
dotnet-version: [ '8.0', '9.0' ]
env:
NUGET_PACKAGES: ${{ github.workspace }}/.nuget/packages
TC_CLOUD_TOKEN: ${{ secrets.TC_TOKEN }}
Expand All @@ -32,27 +35,29 @@ jobs:
name: Setup .NET
uses: actions/setup-dotnet@v4
with:
dotnet-version: |
8.0.x
9.0.x
dotnet-version: ${{ matrix.dotnet-version }}
-
name: Restore
run: |
dotnet restore -p:TargetFramework=net${{ matrix.dotnet-version }} -p:Configuration="Debug CI"
-
name: Build
run: |
dotnet build -c "Debug CI"
dotnet build -c "Debug CI" -f net${{ matrix.dotnet-version }} --no-restore
-
name: Prepare Testcontainers Cloud agent
if: env.TC_CLOUD_TOKEN != ''
uses: atomicjar/testcontainers-cloud-setup-action@main
-
name: Run tests
run: |
dotnet test -c "Debug CI" --no-build
dotnet test -c "Debug CI" --no-build -f net${{ matrix.dotnet-version }}
-
name: Upload Test Results
if: always()
uses: actions/upload-artifact@v4
with:
name: Test Results ${{ matrix.dotnet }}
name: Test Results ${{ matrix.dotnet-version }}
path: |
test-results/**/*.xml
test-results/**/*.trx
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (C) Eventuous HQ OÜ.All rights reserved
// Licensed under the Apache License, Version 2.0.

using Eventuous.Persistence;
using Eventuous.Shared;
using static Eventuous.CommandServiceDelegates;

namespace Eventuous;
Expand Down Expand Up @@ -38,46 +40,43 @@ public interface IDefineIdentity<out TCommand, out TAggregate, out TState, TId>
ICommandHandlerBuilder<TCommand, TAggregate, TState, TId> GetIdAsync(Func<TCommand, CancellationToken, ValueTask<TId>> getId);
}

public interface IDefineStore<out TCommand, out TAggregate, out TState, TId>
public interface IDefineStore<out TCommand, out TAggregate, out TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class {
/// <summary>
/// Defines how to resolve the event store from the command. It assigns both reader and writer.
/// If not defined, the reader and writer provided by the functional service will be used.
/// </summary>
/// <param name="resolveStore">Function to resolve the event writer</param>
/// <returns></returns>
IDefineExecution<TCommand, TAggregate, TState, TId> ResolveStore(Func<TCommand, IEventStore> resolveStore);
IDefineExecution<TCommand, TAggregate, TState> ResolveStore(Func<TCommand, IEventStore> resolveStore);
}

public interface IDefineReader<out TCommand, out TAggregate, out TState, TId>
public interface IDefineReader<out TCommand, out TAggregate, out TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class {
/// <summary>
/// Defines how to resolve the event reader from the command.
/// If not defined, the reader provided by the functional service will be used.
/// </summary>
/// <param name="resolveReader">Function to resolve the event reader</param>
/// <returns></returns>
IDefineWriter<TCommand, TAggregate, TState, TId> ResolveReader(Func<TCommand, IEventReader> resolveReader);
IDefineWriter<TCommand, TAggregate, TState> ResolveReader(Func<TCommand, IEventReader> resolveReader);
}

public interface IDefineWriter<out TCommand, out TAggregate, out TState, TId>
public interface IDefineWriter<out TCommand, out TAggregate, out TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class {
/// <summary>
/// Defines how to resolve the event writer from the command.
/// If not defined, the writer provided by the functional service will be used.
/// </summary>
/// <param name="resolveWriter">Function to resolve the event writer</param>
/// <returns></returns>
IDefineExecution<TCommand, TAggregate, TState, TId> ResolveWriter(Func<TCommand, IEventWriter> resolveWriter);
IDefineExecution<TCommand, TAggregate, TState> ResolveWriter(Func<TCommand, IEventWriter> resolveWriter);
}

public interface IDefineEventAmendment<out TCommand, out TAggregate, out TState, TId>
Expand All @@ -90,44 +89,42 @@ public interface IDefineEventAmendment<out TCommand, out TAggregate, out TState,
/// </summary>
/// <param name="amendEvent">A function to amend the event</param>
/// <returns></returns>
IDefineStoreOrExecution<TCommand, TAggregate, TState, TId> AmendEvent(AmendEvent<TCommand> amendEvent);
IDefineStoreOrExecution<TCommand, TAggregate, TState> AmendEvent(AmendEvent<TCommand> amendEvent);
}

public interface IDefineExecution<out TCommand, out TAggregate, out TState, TId>
public interface IDefineExecution<out TCommand, out TAggregate, out TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class {
/// <summary>
/// Defines how the command that acts on the aggregate.
/// </summary>
/// <param name="action">A function that executes an operation on an aggregate</param>
/// <returns></returns>
void Act(Action<TAggregate, TCommand> action);
IDefineAppendAmendment<TCommand> Act(Action<TAggregate, TCommand> action);

/// <summary>
/// Defines how the command that acts on the aggregate.
/// </summary>
/// <param name="action">A function that executes an asynchronous operation on an aggregate</param>
/// <returns></returns>
void ActAsync(Func<TAggregate, TCommand, CancellationToken, Task> action);
IDefineAppendAmendment<TCommand> ActAsync(Func<TAggregate, TCommand, CancellationToken, Task> action);
}

public interface IDefineStoreOrExecution<out TCommand, out TAggregate, out TState, TId>
: IDefineStore<TCommand, TAggregate, TState, TId>,
IDefineReader<TCommand, TAggregate, TState, TId>,
IDefineWriter<TCommand, TAggregate, TState, TId>,
IDefineExecution<TCommand, TAggregate, TState, TId>
public interface IDefineStoreOrExecution<out TCommand, out TAggregate, out TState>
: IDefineStore<TCommand, TAggregate, TState>,
IDefineReader<TCommand, TAggregate, TState>,
IDefineWriter<TCommand, TAggregate, TState>,
IDefineExecution<TCommand, TAggregate, TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class;

public interface ICommandHandlerBuilder<out TCommand, out TAggregate, out TState, TId>
: IDefineStore<TCommand, TAggregate, TState, TId>,
IDefineReader<TCommand, TAggregate, TState, TId>,
IDefineWriter<TCommand, TAggregate, TState, TId>,
IDefineExecution<TCommand, TAggregate, TState, TId>,
: IDefineStore<TCommand, TAggregate, TState>,
IDefineReader<TCommand, TAggregate, TState>,
IDefineWriter<TCommand, TAggregate, TState>,
IDefineExecution<TCommand, TAggregate, TState>,
IDefineEventAmendment<TCommand, TAggregate, TState, TId>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
Expand All @@ -152,18 +149,20 @@ public class CommandHandlerBuilder<TCommand, TAggregate, TState, TId>(
)
: IDefineExpectedState<TCommand, TAggregate, TState, TId>,
IDefineIdentity<TCommand, TAggregate, TState, TId>,
IDefineStoreOrExecution<TCommand, TAggregate, TState, TId>,
IDefineStoreOrExecution<TCommand, TAggregate, TState>,
IDefineAppendAmendment<TCommand>,
ICommandHandlerBuilder<TCommand, TAggregate, TState, TId>
where TCommand : class
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id {
GetIdFromUntypedCommand<TId>? _getId;
HandleUntypedCommand<TAggregate, TState>? _action;
Func<TCommand, IEventReader>? _reader;
Func<TCommand, IEventWriter>? _writer;
AmendEvent<TCommand>? _amendEvent;
ExpectedState _expectedState = ExpectedState.Any;
GetIdFromUntypedCommand<TId>? _getId;
HandleUntypedCommand<TAggregate, TState>? _action;
Func<TCommand, IEventReader>? _reader;
Func<TCommand, IEventWriter>? _writer;
AmendEvent<TCommand>? _amendEvent;
ExpectedState _expectedState = ExpectedState.Any;
RegisteredHandler<TAggregate, TState, TId>? _handler;

IDefineIdentity<TCommand, TAggregate, TState, TId> IDefineExpectedState<TCommand, TAggregate, TState, TId>.InState(ExpectedState expectedState) {
_expectedState = expectedState;
Expand All @@ -183,50 +182,60 @@ ICommandHandlerBuilder<TCommand, TAggregate, TState, TId> IDefineIdentity<TComma
return this;
}

void IDefineExecution<TCommand, TAggregate, TState, TId>.Act(Action<TAggregate, TCommand> action) {
IDefineAppendAmendment<TCommand> IDefineExecution<TCommand, TAggregate, TState>.Act(Action<TAggregate, TCommand> action) {
_action = (aggregate, cmd, _) => {
action(aggregate, (TCommand)cmd);

return ValueTask.FromResult(aggregate);
};
service.AddHandler<TCommand>(Build());
_handler = Build();
service.AddHandler<TCommand>(_handler);

return this;
}

void IDefineExecution<TCommand, TAggregate, TState, TId>.ActAsync(Func<TAggregate, TCommand, CancellationToken, Task> action) {
IDefineAppendAmendment<TCommand> IDefineExecution<TCommand, TAggregate, TState>.ActAsync(Func<TAggregate, TCommand, CancellationToken, Task> action) {
_action = async (aggregate, cmd, token) => {
await action(aggregate, (TCommand)cmd, token).NoContext();

return aggregate;
};
service.AddHandler<TCommand>(Build());
_handler = Build();
service.AddHandler<TCommand>(_handler);

return this;
}

IDefineExecution<TCommand, TAggregate, TState, TId> IDefineStore<TCommand, TAggregate, TState, TId>.ResolveStore(Func<TCommand, IEventStore> resolveStore) {
IDefineExecution<TCommand, TAggregate, TState> IDefineStore<TCommand, TAggregate, TState>.ResolveStore(Func<TCommand, IEventStore> resolveStore) {
Ensure.NotNull(resolveStore, nameof(resolveStore));
_reader = resolveStore;
_writer = resolveStore;

return this;
}

IDefineWriter<TCommand, TAggregate, TState, TId> IDefineReader<TCommand, TAggregate, TState, TId>.ResolveReader(Func<TCommand, IEventReader> resolveReader) {
IDefineWriter<TCommand, TAggregate, TState> IDefineReader<TCommand, TAggregate, TState>.ResolveReader(Func<TCommand, IEventReader> resolveReader) {
_reader = resolveReader;

return this;
}

IDefineExecution<TCommand, TAggregate, TState, TId> IDefineWriter<TCommand, TAggregate, TState, TId>.ResolveWriter(Func<TCommand, IEventWriter> resolveWriter) {
IDefineExecution<TCommand, TAggregate, TState> IDefineWriter<TCommand, TAggregate, TState>.ResolveWriter(Func<TCommand, IEventWriter> resolveWriter) {
_writer = resolveWriter;

return this;
}

IDefineStoreOrExecution<TCommand, TAggregate, TState, TId> IDefineEventAmendment<TCommand, TAggregate, TState, TId>.AmendEvent(AmendEvent<TCommand> amendEvent) {
IDefineStoreOrExecution<TCommand, TAggregate, TState> IDefineEventAmendment<TCommand, TAggregate, TState, TId>.AmendEvent(AmendEvent<TCommand> amendEvent) {
_amendEvent = amendEvent;

return this;
}

void IDefineAppendAmendment<TCommand>.AmendAppend(AmendAppend<TCommand> amendAppend) {
Ensure.NotNull(_handler, "Handler hasn't been built yet").AmendAppend = (append, cmd) => amendAppend(append, (TCommand)cmd);
}

RegisteredHandler<TAggregate, TState, TId> Build() {
return new(
_expectedState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0.

using System.Reflection;
using Eventuous.Persistence;
using static Eventuous.CommandServiceDelegates;
using static Eventuous.FuncServiceDelegates;

Expand All @@ -16,7 +17,9 @@ record RegisteredHandler<TAggregate, TState, TId>(
ResolveReaderFromCommand ResolveReader,
ResolveWriterFromCommand ResolveWriter,
AmendEventFromCommand? AmendEvent
) where TAggregate : Aggregate<TState> where TId : Id where TState : State<TState>, new();
) where TAggregate : Aggregate<TState> where TId : Id where TState : State<TState>, new() {
public AmendAppend? AmendAppend { get; set; }
}

class HandlersMap<TAggregate, TState, TId> where TAggregate : Aggregate<TState> where TId : Id where TState : State<TState>, new() {
readonly TypeMap<RegisteredHandler<TAggregate, TState, TId>> _typeMap = new();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (C) Eventuous HQ OÜ. All rights reserved
// Licensed under the Apache License, Version 2.0.

using Eventuous.Persistence;

namespace Eventuous;

using static Diagnostics.ApplicationEventSource;
Expand Down Expand Up @@ -87,8 +89,10 @@ public async Task<Result<TState>> Handle<TCommand>(TCommand command, Cancellatio
// Zero in the global position would mean nothing, so the receiver needs to check the Changes.Length
if (result.Changes.Count == 0) return Result<TState>.FromSuccess(result.State, Array.Empty<Change>(), 0);

var proposed = new ProposedAppend(stream, new(result.OriginalVersion), result.Changes.Select(x => new ProposedEvent(x, new())).ToArray());
var final = registeredHandler.AmendAppend?.Invoke(proposed, command) ?? proposed;
var writer = registeredHandler.ResolveWriter(command);
var storeResult = await writer.StoreAggregate<TAggregate, TState>(stream, result, Amend, cancellationToken).NoContext();
var storeResult = await writer.Store(final, Amend, cancellationToken).NoContext();
var changes = result.Changes.Select(x => Change.FromEvent(x, _typeMap));
Log.CommandHandled<TCommand>();

Expand Down
Loading
Loading