diff --git a/src/Cortex.Mediator/Behaviors/LoggingCommandBehavior.cs b/src/Cortex.Mediator/Behaviors/LoggingCommandBehavior.cs
index 99141a9..2948667 100644
--- a/src/Cortex.Mediator/Behaviors/LoggingCommandBehavior.cs
+++ b/src/Cortex.Mediator/Behaviors/LoggingCommandBehavior.cs
@@ -1,6 +1,7 @@
using Cortex.Mediator.Commands;
using Microsoft.Extensions.Logging;
using System;
+using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
@@ -9,32 +10,45 @@ namespace Cortex.Mediator.Behaviors
///
/// Pipeline behavior for logging command/query execution.
///
- public class LoggingCommandBehavior : ICommandPipelineBehavior
- where TCommand : ICommand
+ public sealed class LoggingCommandBehavior : ICommandPipelineBehavior
+ where TCommand : ICommand
{
- private readonly ILogger> _logger;
+ private readonly ILogger> _logger;
- public LoggingCommandBehavior(ILogger> logger)
+ public LoggingCommandBehavior(ILogger> logger)
{
_logger = logger;
}
- public async Task Handle(
+ public async Task Handle(
TCommand command,
- CommandHandlerDelegate next,
+ CommandHandlerDelegate next,
CancellationToken cancellationToken)
{
var commandName = typeof(TCommand).Name;
_logger.LogInformation("Executing command {CommandName}", commandName);
+ var stopwatch = Stopwatch.StartNew(); // start timing
try
{
- await next();
- _logger.LogInformation("Command {CommandName} executed successfully", commandName);
+ var result = await next();
+
+ stopwatch.Stop();
+ _logger.LogInformation(
+ "Command {CommandName} executed successfully in {ElapsedMilliseconds} ms",
+ commandName,
+ stopwatch.ElapsedMilliseconds);
+
+ return result;
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error executing command {CommandName}", commandName);
+ stopwatch.Stop();
+ _logger.LogError(
+ ex,
+ "Error executing command {CommandName} after {ElapsedMilliseconds} ms",
+ commandName,
+ stopwatch.ElapsedMilliseconds);
throw;
}
}
diff --git a/src/Cortex.Mediator/Behaviors/LoggingQueryBehavior.cs b/src/Cortex.Mediator/Behaviors/LoggingQueryBehavior.cs
new file mode 100644
index 0000000..7c2b275
--- /dev/null
+++ b/src/Cortex.Mediator/Behaviors/LoggingQueryBehavior.cs
@@ -0,0 +1,56 @@
+using Cortex.Mediator.Queries;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Cortex.Mediator.Behaviors
+{
+ ///
+ /// Pipeline behavior for logging command/query execution.
+ ///
+ public sealed class LoggingQueryBehavior : IQueryPipelineBehavior
+ where TQuery : IQuery
+ {
+ private readonly ILogger> _logger;
+
+ public LoggingQueryBehavior(ILogger> logger)
+ {
+ _logger = logger;
+ }
+
+ public async Task Handle(
+ TQuery command,
+ QueryHandlerDelegate next,
+ CancellationToken cancellationToken)
+ {
+ var queryName = typeof(TQuery).Name;
+ _logger.LogInformation("Executing query {QueryName}", queryName);
+
+ var stopwatch = Stopwatch.StartNew(); // start timing
+ try
+ {
+ var result = await next();
+
+ stopwatch.Stop();
+ _logger.LogInformation(
+ "Query {QueryName} executed successfully in {ElapsedMilliseconds} ms",
+ queryName,
+ stopwatch.ElapsedMilliseconds);
+
+ return result;
+ }
+ catch (Exception ex)
+ {
+ stopwatch.Stop();
+ _logger.LogError(
+ ex,
+ "Error executing query {QueryName} after {ElapsedMilliseconds} ms",
+ queryName,
+ stopwatch.ElapsedMilliseconds);
+ throw;
+ }
+ }
+ }
+}
diff --git a/src/Cortex.Mediator/Behaviors/TransactionCommandBehavior.cs b/src/Cortex.Mediator/Behaviors/TransactionCommandBehavior.cs
deleted file mode 100644
index d732b52..0000000
--- a/src/Cortex.Mediator/Behaviors/TransactionCommandBehavior.cs
+++ /dev/null
@@ -1,39 +0,0 @@
-using Cortex.Mediator.Commands;
-using Cortex.Mediator.Infrastructure;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Cortex.Mediator.Behaviors
-{
- ///
- /// Pipeline behavior for wrapping command execution in a transaction.
- ///
- public class TransactionCommandBehavior : ICommandPipelineBehavior
- where TCommand : ICommand
- {
- private readonly IUnitOfWork _unitOfWork;
-
- public TransactionCommandBehavior(IUnitOfWork unitOfWork)
- {
- _unitOfWork = unitOfWork;
- }
-
- public async Task Handle(
- TCommand command,
- CommandHandlerDelegate next,
- CancellationToken cancellationToken)
- {
- await using var transaction = await _unitOfWork.BeginTransactionAsync();
- try
- {
- await next();
- await transaction.CommitAsync();
- }
- catch
- {
- await transaction.RollbackAsync();
- throw;
- }
- }
- }
-}
diff --git a/src/Cortex.Mediator/Behaviors/ValidationCommandBehavior.cs b/src/Cortex.Mediator/Behaviors/ValidationCommandBehavior.cs
deleted file mode 100644
index b301cc4..0000000
--- a/src/Cortex.Mediator/Behaviors/ValidationCommandBehavior.cs
+++ /dev/null
@@ -1,50 +0,0 @@
-using Cortex.Mediator.Commands;
-using FluentValidation;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Cortex.Mediator.Behaviors
-{
- ///
- /// Pipeline behavior for validating commands and queries before execution.
- ///
- public class ValidationCommandBehavior : ICommandPipelineBehavior
- where TCommand : ICommand
- {
- private readonly IEnumerable> _validators;
-
- public ValidationCommandBehavior(IEnumerable> validators)
- {
- _validators = validators;
- }
-
- public async Task Handle(
- TCommand command,
- CommandHandlerDelegate next,
- CancellationToken cancellationToken)
- {
- var context = new ValidationContext(command);
- var failures = _validators
- .Select(v => v.Validate(context))
- .SelectMany(r => r.Errors)
- .Where(f => f != null)
- .ToList();
-
- if (failures.Count() > 0)
- {
- var errors = failures
- .GroupBy(f => f.PropertyName)
- .ToDictionary(
- g => g.Key,
- g => g.Select(f => f.ErrorMessage).ToArray());
-
- throw new Exceptions.ValidationException(errors);
- }
-
- await next();
- }
- }
-}
diff --git a/src/Cortex.Mediator/Commands/ICommand.cs b/src/Cortex.Mediator/Commands/ICommand.cs
index 643631c..fdc2226 100644
--- a/src/Cortex.Mediator/Commands/ICommand.cs
+++ b/src/Cortex.Mediator/Commands/ICommand.cs
@@ -2,9 +2,10 @@
{
///
/// Represents a command in the CQRS pattern.
- /// Commands are used to change the system state and do not return a value.
+ /// Commands are used to change the system state and do return a value.
+ /// Please note that this is not a common practice in CQRS, as commands typically do not return values.
///
- public interface ICommand
+ public interface ICommand
{
}
}
diff --git a/src/Cortex.Mediator/Commands/ICommandHandler.cs b/src/Cortex.Mediator/Commands/ICommandHandler.cs
index 8b908b5..574386c 100644
--- a/src/Cortex.Mediator/Commands/ICommandHandler.cs
+++ b/src/Cortex.Mediator/Commands/ICommandHandler.cs
@@ -7,14 +7,15 @@ namespace Cortex.Mediator.Commands
/// Defines a handler for a command.
///
/// The type of command being handled.
- public interface ICommandHandler
- where TCommand : ICommand
+ /// The type of command that is being returned.
+ public interface ICommandHandler
+ where TCommand : ICommand
{
///
/// Handles the specified command.
///
/// The command to handle.
/// The cancellation token.
- Task Handle(TCommand command, CancellationToken cancellationToken);
+ Task Handle(TCommand command, CancellationToken cancellationToken);
}
}
diff --git a/src/Cortex.Mediator/Commands/ICommandPipelineBehavior.cs b/src/Cortex.Mediator/Commands/ICommandPipelineBehavior.cs
index 3fd3d08..b1e4a9c 100644
--- a/src/Cortex.Mediator/Commands/ICommandPipelineBehavior.cs
+++ b/src/Cortex.Mediator/Commands/ICommandPipelineBehavior.cs
@@ -7,20 +7,20 @@ namespace Cortex.Mediator.Commands
/// Defines a pipeline behavior for wrapping command handlers.
///
/// The type of command being handled.
- public interface ICommandPipelineBehavior
- where TCommand : ICommand
+ public interface ICommandPipelineBehavior
+ where TCommand : ICommand
{
///
/// Handles the command and invokes the next behavior in the pipeline.
///
- Task Handle(
+ Task Handle(
TCommand command,
- CommandHandlerDelegate next,
+ CommandHandlerDelegate next,
CancellationToken cancellationToken);
}
///
/// Represents a delegate that wraps the command handler execution.
///
- public delegate Task CommandHandlerDelegate();
+ public delegate Task CommandHandlerDelegate();
}
diff --git a/src/Cortex.Mediator/Common/Unit.cs b/src/Cortex.Mediator/Common/Unit.cs
new file mode 100644
index 0000000..3829888
--- /dev/null
+++ b/src/Cortex.Mediator/Common/Unit.cs
@@ -0,0 +1,8 @@
+namespace Cortex.Mediator
+{
+ public readonly struct Unit
+ {
+ public static readonly Unit Value = new();
+ public override string ToString() => "()";
+ }
+}
diff --git a/src/Cortex.Mediator/Cortex.Mediator.csproj b/src/Cortex.Mediator/Cortex.Mediator.csproj
index fd02b3e..a3fae15 100644
--- a/src/Cortex.Mediator/Cortex.Mediator.csproj
+++ b/src/Cortex.Mediator/Cortex.Mediator.csproj
@@ -61,8 +61,6 @@
-
-
diff --git a/src/Cortex.Mediator/DependencyInjection/MediatorOptions.cs b/src/Cortex.Mediator/DependencyInjection/MediatorOptions.cs
index c724388..415dfea 100644
--- a/src/Cortex.Mediator/DependencyInjection/MediatorOptions.cs
+++ b/src/Cortex.Mediator/DependencyInjection/MediatorOptions.cs
@@ -1,4 +1,4 @@
-using Cortex.Mediator.Commands;
+using Cortex.Mediator.Commands;
using Cortex.Mediator.Queries;
using System;
using System.Collections.Generic;
@@ -11,18 +11,39 @@ public class MediatorOptions
internal List CommandBehaviors { get; } = new();
internal List QueryBehaviors { get; } = new();
+ public bool OnlyPublicClasses { get; set; } = true;
+
+
+ ///
+ /// Register a *closed* command pipeline behavior.
+ ///
public MediatorOptions AddCommandPipelineBehavior()
- where TBehavior : ICommandPipelineBehavior // Add constraint
+ where TBehavior : class // Add constraint
{
var behaviorType = typeof(TBehavior);
+
if (behaviorType.IsGenericTypeDefinition)
{
throw new ArgumentException("Open generic types must be registered using AddOpenCommandPipelineBehavior");
}
+
+ var implementsInterface = behaviorType
+ .GetInterfaces()
+ .Any(i => i.IsGenericType &&
+ i.GetGenericTypeDefinition() == typeof(ICommandPipelineBehavior<,>));
+
+ if (!implementsInterface)
+ {
+ throw new ArgumentException("Type must implement ICommandPipelineBehavior<,>");
+ }
+
CommandBehaviors.Add(behaviorType);
return this;
}
+ ///
+ /// Register an *open generic* command pipeline behavior, e.g. typeof(LoggingCommandBehavior<,>).
+ ///
public MediatorOptions AddOpenCommandPipelineBehavior(Type openGenericBehaviorType)
{
if (!openGenericBehaviorType.IsGenericTypeDefinition)
@@ -30,6 +51,24 @@ public MediatorOptions AddOpenCommandPipelineBehavior(Type openGenericBehaviorTy
throw new ArgumentException("Type must be an open generic type definition");
}
+ var implementsInterface = openGenericBehaviorType
+ .GetInterfaces()
+ .Any(i => i.IsGenericType &&
+ i.GetGenericTypeDefinition() == typeof(ICommandPipelineBehavior<,>));
+
+ // For open generics, interface might not appear in GetInterfaces() yet; check by definition instead.
+ if (!implementsInterface &&
+ !(openGenericBehaviorType.IsGenericTypeDefinition &&
+ openGenericBehaviorType.GetGenericTypeDefinition() == openGenericBehaviorType))
+ {
+ // Fall back to checking generic arguments count to give a clear error
+ var ok = openGenericBehaviorType.GetGenericArguments().Length == 2;
+ if (!ok)
+ {
+ throw new ArgumentException("Type must implement ICommandPipelineBehavior<,>");
+ }
+ }
+
CommandBehaviors.Add(openGenericBehaviorType);
return this;
}
diff --git a/src/Cortex.Mediator/DependencyInjection/MediatorOptionsExtensions.cs b/src/Cortex.Mediator/DependencyInjection/MediatorOptionsExtensions.cs
index 6a237ef..76f8d07 100644
--- a/src/Cortex.Mediator/DependencyInjection/MediatorOptionsExtensions.cs
+++ b/src/Cortex.Mediator/DependencyInjection/MediatorOptionsExtensions.cs
@@ -1,5 +1,4 @@
using Cortex.Mediator.Behaviors;
-using Cortex.Mediator.Commands;
namespace Cortex.Mediator.DependencyInjection
{
@@ -8,9 +7,9 @@ public static class MediatorOptionsExtensions
public static MediatorOptions AddDefaultBehaviors(this MediatorOptions options)
{
return options
- .AddCommandPipelineBehavior>()
- .AddCommandPipelineBehavior>()
- .AddCommandPipelineBehavior>();
+ // Register the open generic logging behavior for commands that return TResult
+ .AddOpenCommandPipelineBehavior(typeof(LoggingCommandBehavior<,>))
+ .AddOpenQueryPipelineBehavior(typeof(LoggingQueryBehavior<,>));
}
}
}
diff --git a/src/Cortex.Mediator/DependencyInjection/ServiceCollectionExtensions.cs b/src/Cortex.Mediator/DependencyInjection/ServiceCollectionExtensions.cs
index 5acf911..fc54ea6 100644
--- a/src/Cortex.Mediator/DependencyInjection/ServiceCollectionExtensions.cs
+++ b/src/Cortex.Mediator/DependencyInjection/ServiceCollectionExtensions.cs
@@ -1,8 +1,7 @@
-using Cortex.Mediator.Commands;
+using Cortex.Mediator.Commands;
using Cortex.Mediator.Infrastructure;
using Cortex.Mediator.Notifications;
using Cortex.Mediator.Queries;
-using FluentValidation;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System;
@@ -24,10 +23,13 @@ public static IServiceCollection AddCortexMediator(
configure?.Invoke(options);
services.AddScoped();
- services.AddValidatorsFromAssemblies(handlerAssemblyMarkerTypes.Select(t => t.Assembly));
+
+ // Validation has been removed for issue #118
+ //services.AddValidatorsFromAssemblies(handlerAssemblyMarkerTypes.Select(t => t.Assembly));
+
services.AddUnitOfWork();
- RegisterHandlers(services, handlerAssemblyMarkerTypes);
+ RegisterHandlers(services, handlerAssemblyMarkerTypes, options);
RegisterPipelineBehaviors(services, options);
return services;
@@ -35,28 +37,29 @@ public static IServiceCollection AddCortexMediator(
private static void RegisterHandlers(
IServiceCollection services,
- IEnumerable assemblyMarkerTypes)
+ IEnumerable assemblyMarkerTypes,
+ MediatorOptions options)
{
var assemblies = assemblyMarkerTypes.Select(t => t.Assembly).ToArray();
services.Scan(scan => scan
.FromAssemblies(assemblies)
.AddClasses(classes => classes
- .AssignableTo(typeof(ICommandHandler<>)))
+ .AssignableTo(typeof(ICommandHandler<,>)), options.OnlyPublicClasses)
.AsImplementedInterfaces()
.WithScopedLifetime());
services.Scan(scan => scan
.FromAssemblies(assemblies)
.AddClasses(classes => classes
- .AssignableTo(typeof(IQueryHandler<,>)))
+ .AssignableTo(typeof(IQueryHandler<,>)), options.OnlyPublicClasses)
.AsImplementedInterfaces()
.WithScopedLifetime());
services.Scan(scan => scan
.FromAssemblies(assemblies)
.AddClasses(classes => classes
- .AssignableTo(typeof(INotificationHandler<>)))
+ .AssignableTo(typeof(INotificationHandler<>)), options.OnlyPublicClasses)
.AsImplementedInterfaces()
.WithScopedLifetime());
}
@@ -66,7 +69,7 @@ private static void RegisterPipelineBehaviors(IServiceCollection services, Media
// Command behaviors
foreach (var behaviorType in options.CommandBehaviors)
{
- services.AddTransient(typeof(ICommandPipelineBehavior<>), behaviorType);
+ services.AddTransient(typeof(ICommandPipelineBehavior<,>), behaviorType);
}
// Query behaviors (if needed)
@@ -76,7 +79,6 @@ private static void RegisterPipelineBehaviors(IServiceCollection services, Media
}
}
-
private static void AddUnitOfWork(this IServiceCollection services)
{
services.AddScoped(provider =>
diff --git a/src/Cortex.Mediator/IMediator.cs b/src/Cortex.Mediator/IMediator.cs
index 899679e..7eb104c 100644
--- a/src/Cortex.Mediator/IMediator.cs
+++ b/src/Cortex.Mediator/IMediator.cs
@@ -11,12 +11,12 @@ namespace Cortex.Mediator
///
public interface IMediator
{
- Task SendAsync(
+ Task SendCommandAsync(
TCommand command,
CancellationToken cancellationToken = default)
- where TCommand : ICommand;
+ where TCommand : ICommand;
- Task SendAsync(
+ Task SendQueryAsync(
TQuery query,
CancellationToken cancellationToken = default)
where TQuery : IQuery;
diff --git a/src/Cortex.Mediator/Mediator.cs b/src/Cortex.Mediator/Mediator.cs
index 63f070a..cd0c521 100644
--- a/src/Cortex.Mediator/Mediator.cs
+++ b/src/Cortex.Mediator/Mediator.cs
@@ -21,20 +21,20 @@ public Mediator(IServiceProvider serviceProvider)
_serviceProvider = serviceProvider;
}
- public async Task SendAsync(TCommand command, CancellationToken cancellationToken = default)
- where TCommand : ICommand
+ public async Task SendCommandAsync(TCommand command, CancellationToken cancellationToken = default)
+ where TCommand : ICommand
{
- var handler = _serviceProvider.GetRequiredService>();
+ var handler = _serviceProvider.GetRequiredService>();
- foreach (var behavior in _serviceProvider.GetServices>().Reverse())
+ foreach (var behavior in _serviceProvider.GetServices>().Reverse())
{
- handler = new PipelineBehaviorNextDelegate(behavior, handler);
+ handler = new PipelineBehaviorNextDelegate(behavior, handler);
}
- await handler.Handle(command, cancellationToken);
+ return await handler.Handle(command, cancellationToken);
}
- public async Task SendAsync(TQuery query, CancellationToken cancellationToken = default)
+ public async Task SendQueryAsync(TQuery query, CancellationToken cancellationToken = default)
where TQuery : IQuery
{
var handler = _serviceProvider.GetRequiredService>();
@@ -57,21 +57,21 @@ public async Task PublishAsync(
await Task.WhenAll(tasks);
}
- private class PipelineBehaviorNextDelegate : ICommandHandler
- where TCommand : ICommand
+ private class PipelineBehaviorNextDelegate : ICommandHandler
+ where TCommand : ICommand
{
- private readonly ICommandPipelineBehavior _behavior;
- private readonly ICommandHandler _next;
+ private readonly ICommandPipelineBehavior _behavior;
+ private readonly ICommandHandler _next;
public PipelineBehaviorNextDelegate(
- ICommandPipelineBehavior behavior,
- ICommandHandler next)
+ ICommandPipelineBehavior behavior,
+ ICommandHandler next)
{
_behavior = behavior;
_next = next;
}
- public Task Handle(TCommand command, CancellationToken cancellationToken)
+ public Task Handle(TCommand command, CancellationToken cancellationToken)
{
return _behavior.Handle(
command,
diff --git a/src/Cortex.Mediator/README.md b/src/Cortex.Mediator/README.md
index d6ddb74..ede9704 100644
--- a/src/Cortex.Mediator/README.md
+++ b/src/Cortex.Mediator/README.md
@@ -9,9 +9,8 @@ Built as part of the [Cortex Data Framework](https://github.com/buildersoftio/co
- ✅ Commands & Queries
- ✅ Notifications (Events)
- ✅ Pipeline Behaviors
-- ✅ FluentValidation
+- ✅ FluentValidation - Coming in the next release v1.8
- ✅ Logging
-- ✅ Transaction Handling
---
@@ -35,7 +34,7 @@ In `Program.cs` or `Startup.cs`:
builder.Services.AddCortexMediator(
builder.Configuration,
new[] { typeof(Program) }, // Assemblies to scan for handlers
- options => options.AddDefaultBehaviors() // Logging, Validation, Transaction
+ options => options.AddDefaultBehaviors() // Logging
);
```
@@ -52,7 +51,7 @@ Features/
## ✏️ Defining a Command
```csharp
-public class CreateUserCommand : ICommand
+public class CreateUserCommand : ICommand
{
public string UserName { get; set; }
public string Email { get; set; }
@@ -61,16 +60,16 @@ public class CreateUserCommand : ICommand
### Handler
```csharp
-public class CreateUserCommandHandler : ICommandHandler
+public class CreateUserCommandHandler : ICommandHandler
{
- public async Task Handle(CreateUserCommand command, CancellationToken cancellationToken)
+ public async Task Handle(CreateUserCommand command, CancellationToken cancellationToken)
{
// Logic here
}
}
```
-### Validator (Optional, via FluentValidation)
+### Validator (Optional, via FluentValidation) - Coming in the next release v1.8
```csharp
public class CreateUserValidator : AbstractValidator
{
@@ -126,9 +125,8 @@ await mediator.PublishAsync(new UserCreatedNotification { UserName = "Andy" });
## 🔧 Pipeline Behaviors (Built-in)
Out of the box, Cortex.Mediator supports:
-- `ValidationCommandBehavior`
+- `ValidationCommandBehavior` - Coming in the next release v1.8
- `LoggingCommandBehavior`
-- `TransactionCommandBehavior`
You can also register custom behaviors:
```csharp
diff --git a/src/Cortex.Streams.Pulsar/PulsarSinkOperator.cs b/src/Cortex.Streams.Pulsar/PulsarSinkOperator.cs
index f3fa43e..a33bf9e 100644
--- a/src/Cortex.Streams.Pulsar/PulsarSinkOperator.cs
+++ b/src/Cortex.Streams.Pulsar/PulsarSinkOperator.cs
@@ -28,6 +28,10 @@ public PulsarSinkOperator(string serviceUrl, string topic, ISerializer s
_client = PulsarClient.Builder()
.ServiceUrl(new Uri(_serviceUrl))
.Build();
+
+ // BUG #103 Start PulsarSink Operator when Sink is initialized
+ // Pulsar Producer doesnot start when the production happens, we have to start the Producer when it is initialized.
+ Start();
}
public void Start()
diff --git a/src/Cortex.Tests/Cortex.Tests.csproj b/src/Cortex.Tests/Cortex.Tests.csproj
index 406cf1b..6208286 100644
--- a/src/Cortex.Tests/Cortex.Tests.csproj
+++ b/src/Cortex.Tests/Cortex.Tests.csproj
@@ -9,6 +9,12 @@
true
+
+
+
+
+
+
all
@@ -32,8 +38,4 @@
-
-
-
-