diff --git a/src/Cortex.Mediator/Abstractions/ICommand.cs b/src/Cortex.Mediator/Abstractions/ICommand.cs deleted file mode 100644 index 11621a9..0000000 --- a/src/Cortex.Mediator/Abstractions/ICommand.cs +++ /dev/null @@ -1,4 +0,0 @@ -namespace Cortex.Mediator -{ - public interface ICommand { } -} diff --git a/src/Cortex.Mediator/Abstractions/IHandler.cs b/src/Cortex.Mediator/Abstractions/IHandler.cs deleted file mode 100644 index da94e09..0000000 --- a/src/Cortex.Mediator/Abstractions/IHandler.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System.Threading.Tasks; - -namespace Cortex.Mediator -{ - public interface IHandler where TCommand : ICommand - { - public Task Handle(TCommand command); - } -} diff --git a/src/Cortex.Mediator/Abstractions/IMediator.cs b/src/Cortex.Mediator/Abstractions/IMediator.cs deleted file mode 100644 index db52780..0000000 --- a/src/Cortex.Mediator/Abstractions/IMediator.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System.Threading.Tasks; - -namespace Cortex.Mediator -{ - public interface IMediator - { - Task SendAsync(TCommand command) where TCommand : ICommand; - } -} diff --git a/src/Cortex.Mediator/Behaviors/LoggingCommandBehavior.cs b/src/Cortex.Mediator/Behaviors/LoggingCommandBehavior.cs new file mode 100644 index 0000000..99141a9 --- /dev/null +++ b/src/Cortex.Mediator/Behaviors/LoggingCommandBehavior.cs @@ -0,0 +1,42 @@ +using Cortex.Mediator.Commands; +using Microsoft.Extensions.Logging; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Cortex.Mediator.Behaviors +{ + /// + /// Pipeline behavior for logging command/query execution. + /// + public class LoggingCommandBehavior : ICommandPipelineBehavior + where TCommand : ICommand + { + private readonly ILogger> _logger; + + public LoggingCommandBehavior(ILogger> logger) + { + _logger = logger; + } + + public async Task Handle( + TCommand command, + CommandHandlerDelegate next, + CancellationToken cancellationToken) + { + var commandName = typeof(TCommand).Name; + _logger.LogInformation("Executing command {CommandName}", commandName); + + try + { + await next(); + _logger.LogInformation("Command {CommandName} executed successfully", commandName); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error executing command {CommandName}", commandName); + throw; + } + } + } +} diff --git a/src/Cortex.Mediator/Behaviors/TransactionCommandBehavior.cs b/src/Cortex.Mediator/Behaviors/TransactionCommandBehavior.cs new file mode 100644 index 0000000..d732b52 --- /dev/null +++ b/src/Cortex.Mediator/Behaviors/TransactionCommandBehavior.cs @@ -0,0 +1,39 @@ +using Cortex.Mediator.Commands; +using Cortex.Mediator.Infrastructure; +using System.Threading; +using System.Threading.Tasks; + +namespace Cortex.Mediator.Behaviors +{ + /// + /// Pipeline behavior for wrapping command execution in a transaction. + /// + public class TransactionCommandBehavior : ICommandPipelineBehavior + where TCommand : ICommand + { + private readonly IUnitOfWork _unitOfWork; + + public TransactionCommandBehavior(IUnitOfWork unitOfWork) + { + _unitOfWork = unitOfWork; + } + + public async Task Handle( + TCommand command, + CommandHandlerDelegate next, + CancellationToken cancellationToken) + { + await using var transaction = await _unitOfWork.BeginTransactionAsync(); + try + { + await next(); + await transaction.CommitAsync(); + } + catch + { + await transaction.RollbackAsync(); + throw; + } + } + } +} diff --git a/src/Cortex.Mediator/Behaviors/ValidationCommandBehavior.cs b/src/Cortex.Mediator/Behaviors/ValidationCommandBehavior.cs new file mode 100644 index 0000000..b301cc4 --- /dev/null +++ b/src/Cortex.Mediator/Behaviors/ValidationCommandBehavior.cs @@ -0,0 +1,50 @@ +using Cortex.Mediator.Commands; +using FluentValidation; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Cortex.Mediator.Behaviors +{ + /// + /// Pipeline behavior for validating commands and queries before execution. + /// + public class ValidationCommandBehavior : ICommandPipelineBehavior + where TCommand : ICommand + { + private readonly IEnumerable> _validators; + + public ValidationCommandBehavior(IEnumerable> validators) + { + _validators = validators; + } + + public async Task Handle( + TCommand command, + CommandHandlerDelegate next, + CancellationToken cancellationToken) + { + var context = new ValidationContext(command); + var failures = _validators + .Select(v => v.Validate(context)) + .SelectMany(r => r.Errors) + .Where(f => f != null) + .ToList(); + + if (failures.Count() > 0) + { + var errors = failures + .GroupBy(f => f.PropertyName) + .ToDictionary( + g => g.Key, + g => g.Select(f => f.ErrorMessage).ToArray()); + + throw new Exceptions.ValidationException(errors); + } + + await next(); + } + } +} diff --git a/src/Cortex.Mediator/Commands/ICommand.cs b/src/Cortex.Mediator/Commands/ICommand.cs new file mode 100644 index 0000000..643631c --- /dev/null +++ b/src/Cortex.Mediator/Commands/ICommand.cs @@ -0,0 +1,10 @@ +namespace Cortex.Mediator.Commands +{ + /// + /// Represents a command in the CQRS pattern. + /// Commands are used to change the system state and do not return a value. + /// + public interface ICommand + { + } +} diff --git a/src/Cortex.Mediator/Commands/ICommandHandler.cs b/src/Cortex.Mediator/Commands/ICommandHandler.cs new file mode 100644 index 0000000..8b908b5 --- /dev/null +++ b/src/Cortex.Mediator/Commands/ICommandHandler.cs @@ -0,0 +1,20 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace Cortex.Mediator.Commands +{ + /// + /// Defines a handler for a command. + /// + /// The type of command being handled. + public interface ICommandHandler + where TCommand : ICommand + { + /// + /// Handles the specified command. + /// + /// The command to handle. + /// The cancellation token. + Task Handle(TCommand command, CancellationToken cancellationToken); + } +} diff --git a/src/Cortex.Mediator/Commands/ICommandPipelineBehavior.cs b/src/Cortex.Mediator/Commands/ICommandPipelineBehavior.cs new file mode 100644 index 0000000..3fd3d08 --- /dev/null +++ b/src/Cortex.Mediator/Commands/ICommandPipelineBehavior.cs @@ -0,0 +1,26 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace Cortex.Mediator.Commands +{ + /// + /// Defines a pipeline behavior for wrapping command handlers. + /// + /// The type of command being handled. + public interface ICommandPipelineBehavior + where TCommand : ICommand + { + /// + /// Handles the command and invokes the next behavior in the pipeline. + /// + Task Handle( + TCommand command, + CommandHandlerDelegate next, + CancellationToken cancellationToken); + } + + /// + /// Represents a delegate that wraps the command handler execution. + /// + public delegate Task CommandHandlerDelegate(); +} diff --git a/src/Cortex.Mediator/Cortex.Mediator.csproj b/src/Cortex.Mediator/Cortex.Mediator.csproj index aacd7de..fdb6c14 100644 --- a/src/Cortex.Mediator/Cortex.Mediator.csproj +++ b/src/Cortex.Mediator/Cortex.Mediator.csproj @@ -7,6 +7,7 @@ 1.0.0 Buildersoft Cortex Framework Buildersoft + 12 Buildersoft,EnesHoxha Copyright © Buildersoft 2025 @@ -16,7 +17,7 @@ https://github.com/buildersoftio/cortex - cortex vortex mediator eda streaming distributed + cortex vortex mediator eda cqrs streaming 1.0.0 license.md @@ -26,7 +27,7 @@ True True - Added Mediator logic for Merlin.AI + Just as the Cortex in our brains handles complex processing efficiently, Cortex Data Framework brings brainpower to your data management! https://buildersoft.io/ @@ -46,6 +47,11 @@ + + + + + diff --git a/src/Cortex.Mediator/DependencyInjection/MediatorOptions.cs b/src/Cortex.Mediator/DependencyInjection/MediatorOptions.cs new file mode 100644 index 0000000..c724388 --- /dev/null +++ b/src/Cortex.Mediator/DependencyInjection/MediatorOptions.cs @@ -0,0 +1,57 @@ +using Cortex.Mediator.Commands; +using Cortex.Mediator.Queries; +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Cortex.Mediator.DependencyInjection +{ + public class MediatorOptions + { + internal List CommandBehaviors { get; } = new(); + internal List QueryBehaviors { get; } = new(); + + public MediatorOptions AddCommandPipelineBehavior() + where TBehavior : ICommandPipelineBehavior // Add constraint + { + var behaviorType = typeof(TBehavior); + if (behaviorType.IsGenericTypeDefinition) + { + throw new ArgumentException("Open generic types must be registered using AddOpenCommandPipelineBehavior"); + } + CommandBehaviors.Add(behaviorType); + return this; + } + + public MediatorOptions AddOpenCommandPipelineBehavior(Type openGenericBehaviorType) + { + if (!openGenericBehaviorType.IsGenericTypeDefinition) + { + throw new ArgumentException("Type must be an open generic type definition"); + } + + CommandBehaviors.Add(openGenericBehaviorType); + return this; + } + + public MediatorOptions AddOpenQueryPipelineBehavior(Type openGenericBehaviorType) + { + if (!openGenericBehaviorType.IsGenericTypeDefinition) + { + throw new ArgumentException("Type must be an open generic type definition"); + } + + var queryBehaviorInterface = openGenericBehaviorType.GetInterfaces() + .FirstOrDefault(i => i.IsGenericType && + i.GetGenericTypeDefinition() == typeof(IQueryPipelineBehavior<,>)); + + if (queryBehaviorInterface == null) + { + throw new ArgumentException("Type must implement IQueryPipelineBehavior<,>"); + } + + QueryBehaviors.Add(openGenericBehaviorType); + return this; + } + } +} diff --git a/src/Cortex.Mediator/DependencyInjection/MediatorOptionsExtensions.cs b/src/Cortex.Mediator/DependencyInjection/MediatorOptionsExtensions.cs new file mode 100644 index 0000000..6a237ef --- /dev/null +++ b/src/Cortex.Mediator/DependencyInjection/MediatorOptionsExtensions.cs @@ -0,0 +1,16 @@ +using Cortex.Mediator.Behaviors; +using Cortex.Mediator.Commands; + +namespace Cortex.Mediator.DependencyInjection +{ + public static class MediatorOptionsExtensions + { + public static MediatorOptions AddDefaultBehaviors(this MediatorOptions options) + { + return options + .AddCommandPipelineBehavior>() + .AddCommandPipelineBehavior>() + .AddCommandPipelineBehavior>(); + } + } +} diff --git a/src/Cortex.Mediator/DependencyInjection/ServiceCollectionExtensions.cs b/src/Cortex.Mediator/DependencyInjection/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..5acf911 --- /dev/null +++ b/src/Cortex.Mediator/DependencyInjection/ServiceCollectionExtensions.cs @@ -0,0 +1,86 @@ +using Cortex.Mediator.Commands; +using Cortex.Mediator.Infrastructure; +using Cortex.Mediator.Notifications; +using Cortex.Mediator.Queries; +using FluentValidation; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; + +namespace Cortex.Mediator.DependencyInjection +{ + public static class ServiceCollectionExtensions + { + public static IServiceCollection AddCortexMediator( + this IServiceCollection services, + IConfiguration configuration, + Type[] handlerAssemblyMarkerTypes, + Action? configure = null) + { + var options = new MediatorOptions(); + configure?.Invoke(options); + + services.AddScoped(); + services.AddValidatorsFromAssemblies(handlerAssemblyMarkerTypes.Select(t => t.Assembly)); + services.AddUnitOfWork(); + + RegisterHandlers(services, handlerAssemblyMarkerTypes); + RegisterPipelineBehaviors(services, options); + + return services; + } + + private static void RegisterHandlers( + IServiceCollection services, + IEnumerable assemblyMarkerTypes) + { + var assemblies = assemblyMarkerTypes.Select(t => t.Assembly).ToArray(); + + services.Scan(scan => scan + .FromAssemblies(assemblies) + .AddClasses(classes => classes + .AssignableTo(typeof(ICommandHandler<>))) + .AsImplementedInterfaces() + .WithScopedLifetime()); + + services.Scan(scan => scan + .FromAssemblies(assemblies) + .AddClasses(classes => classes + .AssignableTo(typeof(IQueryHandler<,>))) + .AsImplementedInterfaces() + .WithScopedLifetime()); + + services.Scan(scan => scan + .FromAssemblies(assemblies) + .AddClasses(classes => classes + .AssignableTo(typeof(INotificationHandler<>))) + .AsImplementedInterfaces() + .WithScopedLifetime()); + } + + private static void RegisterPipelineBehaviors(IServiceCollection services, MediatorOptions options) + { + // Command behaviors + foreach (var behaviorType in options.CommandBehaviors) + { + services.AddTransient(typeof(ICommandPipelineBehavior<>), behaviorType); + } + + // Query behaviors (if needed) + foreach (var behaviorType in options.QueryBehaviors) + { + services.AddTransient(typeof(IQueryPipelineBehavior<,>), behaviorType); + } + } + + + private static void AddUnitOfWork(this IServiceCollection services) + { + services.AddScoped(provider => + new UnitOfWork(provider.GetRequiredService())); + } + } +} diff --git a/src/Cortex.Mediator/Exceptions/ValidationException.cs b/src/Cortex.Mediator/Exceptions/ValidationException.cs new file mode 100644 index 0000000..c53f88d --- /dev/null +++ b/src/Cortex.Mediator/Exceptions/ValidationException.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; + +namespace Cortex.Mediator.Exceptions +{ + /// + /// Represents errors that occur during command/query validation. + /// + public class ValidationException : Exception + { + public ValidationException(IReadOnlyDictionary errors) + : base("Validation errors occurred") + { + Errors = errors; + } + + public IReadOnlyDictionary Errors { get; } + } +} diff --git a/src/Cortex.Mediator/IMediator.cs b/src/Cortex.Mediator/IMediator.cs new file mode 100644 index 0000000..899679e --- /dev/null +++ b/src/Cortex.Mediator/IMediator.cs @@ -0,0 +1,29 @@ +using Cortex.Mediator.Commands; +using Cortex.Mediator.Notifications; +using Cortex.Mediator.Queries; +using System.Threading; +using System.Threading.Tasks; + +namespace Cortex.Mediator +{ + /// + /// Mediator interface for sending commands, queries, and notifications. + /// + public interface IMediator + { + Task SendAsync( + TCommand command, + CancellationToken cancellationToken = default) + where TCommand : ICommand; + + Task SendAsync( + TQuery query, + CancellationToken cancellationToken = default) + where TQuery : IQuery; + + Task PublishAsync( + TNotification notification, + CancellationToken cancellationToken = default) + where TNotification : INotification; + } +} diff --git a/src/Cortex.Mediator/Infrastructure/IUnitOfWork.cs b/src/Cortex.Mediator/Infrastructure/IUnitOfWork.cs new file mode 100644 index 0000000..60c6acd --- /dev/null +++ b/src/Cortex.Mediator/Infrastructure/IUnitOfWork.cs @@ -0,0 +1,32 @@ +using System; +using System.Threading.Tasks; + +namespace Cortex.Mediator.Infrastructure +{ + /// + /// Represents a unit of work for transaction management. + /// + public interface IUnitOfWork + { + /// + /// Begins a new transaction. + /// + Task BeginTransactionAsync(); + } + + /// + /// Represents a transaction within a unit of work. + /// + public interface IUnitOfWorkTransaction : IAsyncDisposable + { + /// + /// Commits the transaction. + /// + Task CommitAsync(); + + /// + /// Rolls back the transaction. + /// + Task RollbackAsync(); + } +} diff --git a/src/Cortex.Mediator/Infrastructure/UnitOfWork.cs b/src/Cortex.Mediator/Infrastructure/UnitOfWork.cs new file mode 100644 index 0000000..301bf78 --- /dev/null +++ b/src/Cortex.Mediator/Infrastructure/UnitOfWork.cs @@ -0,0 +1,61 @@ +using System.Data; +using System.Threading.Tasks; + +namespace Cortex.Mediator.Infrastructure +{ + /// + /// Default implementation of IUnitOfWork using System.Data. + /// + public class UnitOfWork : IUnitOfWork + { + private readonly IDbConnection _connection; + + public UnitOfWork(IDbConnection connection) + { + _connection = connection; + } + + public async Task BeginTransactionAsync() + { + if (_connection.State != ConnectionState.Open) + { + _connection.Open(); + } + + var transaction = _connection.BeginTransaction(); + return new UnitOfWorkTransaction(transaction); + } + + private class UnitOfWorkTransaction : IUnitOfWorkTransaction + { + private readonly IDbTransaction _transaction; + private bool _disposed; + + public UnitOfWorkTransaction(IDbTransaction transaction) + { + _transaction = transaction; + } + + public Task CommitAsync() + { + _transaction.Commit(); + return Task.CompletedTask; + } + + public Task RollbackAsync() + { + _transaction.Rollback(); + return Task.CompletedTask; + } + + public async ValueTask DisposeAsync() + { + if (_disposed) return; + + _transaction.Dispose(); + _disposed = true; + await Task.CompletedTask; + } + } + } +} diff --git a/src/Cortex.Mediator/Mediator.cs b/src/Cortex.Mediator/Mediator.cs index dd7c232..63f070a 100644 --- a/src/Cortex.Mediator/Mediator.cs +++ b/src/Cortex.Mediator/Mediator.cs @@ -1,27 +1,107 @@ -using System; +using Cortex.Mediator.Commands; +using Cortex.Mediator.Notifications; +using Cortex.Mediator.Queries; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace Cortex.Mediator { + /// + /// Default implementation of the IMediator interface. + /// public class Mediator : IMediator { private readonly IServiceProvider _serviceProvider; + public Mediator(IServiceProvider serviceProvider) { _serviceProvider = serviceProvider; } - public Task SendAsync(TCommand command) where TCommand : ICommand + public async Task SendAsync(TCommand command, CancellationToken cancellationToken = default) + where TCommand : ICommand + { + var handler = _serviceProvider.GetRequiredService>(); + + foreach (var behavior in _serviceProvider.GetServices>().Reverse()) + { + handler = new PipelineBehaviorNextDelegate(behavior, handler); + } + + await handler.Handle(command, cancellationToken); + } + + public async Task SendAsync(TQuery query, CancellationToken cancellationToken = default) + where TQuery : IQuery { - var handlerType = typeof(IHandler); + var handler = _serviceProvider.GetRequiredService>(); - var handler = _serviceProvider.GetService(handlerType) as IHandler; - if (handler == null) + foreach (var behavior in _serviceProvider.GetServices>().Reverse()) { - throw new InvalidOperationException($"No handler registered for {typeof(TCommand).Name}"); + handler = new QueryPipelineBehaviorNextDelegate(behavior, handler); } - return handler.Handle(command); + return await handler.Handle(query, cancellationToken); + } + + public async Task PublishAsync( + TNotification notification, + CancellationToken cancellationToken = default) + where TNotification : INotification + { + var handlers = _serviceProvider.GetServices>(); + var tasks = handlers.Select(h => h.Handle(notification, cancellationToken)); + await Task.WhenAll(tasks); + } + + private class PipelineBehaviorNextDelegate : ICommandHandler + where TCommand : ICommand + { + private readonly ICommandPipelineBehavior _behavior; + private readonly ICommandHandler _next; + + public PipelineBehaviorNextDelegate( + ICommandPipelineBehavior behavior, + ICommandHandler next) + { + _behavior = behavior; + _next = next; + } + + public Task Handle(TCommand command, CancellationToken cancellationToken) + { + return _behavior.Handle( + command, + () => _next.Handle(command, cancellationToken), + cancellationToken); + } + } + + private class QueryPipelineBehaviorNextDelegate + : IQueryHandler + where TQuery : IQuery + { + private readonly IQueryPipelineBehavior _behavior; + private readonly IQueryHandler _next; + + public QueryPipelineBehaviorNextDelegate( + IQueryPipelineBehavior behavior, + IQueryHandler next) + { + _behavior = behavior; + _next = next; + } + + public Task Handle(TQuery query, CancellationToken cancellationToken) + { + return _behavior.Handle( + query, + () => _next.Handle(query, cancellationToken), + cancellationToken); + } } } } diff --git a/src/Cortex.Mediator/Notifications/INotification.cs b/src/Cortex.Mediator/Notifications/INotification.cs new file mode 100644 index 0000000..fb394b3 --- /dev/null +++ b/src/Cortex.Mediator/Notifications/INotification.cs @@ -0,0 +1,10 @@ +namespace Cortex.Mediator.Notifications +{ + /// + /// Represents a notification in the CQRS pattern. + /// Notifications are used to broadcast events to multiple handlers. + /// + public interface INotification + { + } +} diff --git a/src/Cortex.Mediator/Notifications/INotificationHandler.cs b/src/Cortex.Mediator/Notifications/INotificationHandler.cs new file mode 100644 index 0000000..418d9c6 --- /dev/null +++ b/src/Cortex.Mediator/Notifications/INotificationHandler.cs @@ -0,0 +1,19 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace Cortex.Mediator.Notifications +{ + /// + /// Defines a handler for a notification. + /// + /// The type of notification being handled. + public interface INotificationHandler + where TNotification : INotification + { + /// + /// Handles the specified notification. + /// + Task Handle(TNotification notification, CancellationToken cancellationToken); + } + +} diff --git a/src/Cortex.Mediator/Queries/IQuery.cs b/src/Cortex.Mediator/Queries/IQuery.cs new file mode 100644 index 0000000..a444829 --- /dev/null +++ b/src/Cortex.Mediator/Queries/IQuery.cs @@ -0,0 +1,11 @@ +namespace Cortex.Mediator.Queries +{ + /// + /// Represents a query in the CQRS pattern. + /// Queries are used to read data and return a result. + /// + /// The type of result returned by the query. + public interface IQuery + { + } +} diff --git a/src/Cortex.Mediator/Queries/IQueryHandler.cs b/src/Cortex.Mediator/Queries/IQueryHandler.cs new file mode 100644 index 0000000..c252edc --- /dev/null +++ b/src/Cortex.Mediator/Queries/IQueryHandler.cs @@ -0,0 +1,20 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace Cortex.Mediator.Queries +{ + /// + /// Defines a handler for a query. + /// + /// The type of query being handled. + /// The type of result returned. + public interface IQueryHandler + where TQuery : IQuery + { + /// + /// Handles the specified query. + /// + Task Handle(TQuery query, CancellationToken cancellationToken); + } + +} diff --git a/src/Cortex.Mediator/Queries/IQueryPipelineBehavior.cs b/src/Cortex.Mediator/Queries/IQueryPipelineBehavior.cs new file mode 100644 index 0000000..c105a84 --- /dev/null +++ b/src/Cortex.Mediator/Queries/IQueryPipelineBehavior.cs @@ -0,0 +1,28 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace Cortex.Mediator.Queries +{ + /// + /// Defines a pipeline behavior for wrapping query handlers. + /// + /// The type of query being handled. + /// The type of result returned. + public interface IQueryPipelineBehavior + where TQuery : IQuery + { + /// + /// Handles the query and invokes the next behavior in the pipeline. + /// + Task Handle( + TQuery query, + QueryHandlerDelegate next, + CancellationToken cancellationToken); + } + + /// + /// Represents a delegate that wraps the query handler execution. + /// + public delegate Task QueryHandlerDelegate(); + +} diff --git a/src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs b/src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs index 3e2008e..b527465 100644 --- a/src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs @@ -87,6 +87,33 @@ IBranchStreamBuilder> Aggregate stateStore = null); + /// + /// Joins the current stream with a state-backed table (right side) based on a shared key. + /// For each item in the left (current) stream, a key is extracted and matched against entries + /// in . If a match is found, the two items are combined + /// by to form a result of type . + /// + /// The type of the elements stored in the right state store. + /// The type of the key used for matching left stream elements to right elements. + /// The type of the result produced by joining a left element with a right element. + /// + /// The state store mapping keys of type to values of type . + /// + /// + /// A function that extracts the key from the left (current) stream element of type TCurrent. + /// + /// + /// A function that combines the left element (of type TCurrent) and the matching right element + /// (of type ) to produce a result of type . + /// + /// + /// An representing the pipeline after the join operation. + /// + IBranchStreamBuilder Join( + IDataStore rightStateStore, + Func keySelector, + Func joinFunction); + /// @@ -100,9 +127,5 @@ IBranchStreamBuilder> Aggregate /// A sink operator to consume data. void Sink(ISinkOperator sinkOperator); - - - - } } diff --git a/src/Cortex.Streams/Abstractions/IInitialStreamBuilder.cs b/src/Cortex.Streams/Abstractions/IInitialStreamBuilder.cs index 620ba07..c4ad4dd 100644 --- a/src/Cortex.Streams/Abstractions/IInitialStreamBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IInitialStreamBuilder.cs @@ -28,20 +28,5 @@ public interface IInitialStreamBuilder /// /// IInitialStreamBuilder WithTelemetry(ITelemetryProvider telemetryProvider); - - /// - /// Adds a map operator to the branch to transform data. - /// - /// The type of data after the transformation. - /// A function to transform data. - /// The branch stream builder with the new data type. - //IStreamBuilder Map(Func mapFunction); - - /// - /// Adds a filter operator to the branch. - /// - /// A function to filter data. - /// The branch stream builder for method chaining. - //IStreamBuilder Filter(Func predicate); } } diff --git a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs index e237326..5b7b364 100644 --- a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs @@ -198,6 +198,33 @@ IStreamBuilder SessionWindow( IDataStore> sessionStateStore = null, IDataStore, TSessionOutput> sessionResultsStateStore = null); + /// + /// Joins the current stream with a state-backed table (right side) based on a shared key. + /// For each item in the left (current) stream, a key is extracted and matched against entries + /// in . If a match is found, the two items are combined + /// by to form a result of type . + /// + /// The type of the elements stored in the right state store. + /// The type of the key used for matching left stream elements to right elements. + /// The type of the result produced by joining a left element with a right element. + /// + /// The state store mapping keys of type to values of type . + /// + /// + /// A function that extracts the key from the left (current) stream element of type TCurrent. + /// + /// + /// A function that combines the left element (of type TCurrent) and the matching right element + /// (of type ) to produce a result of type . + /// + /// + /// An representing the pipeline after the join operation. + /// + IStreamBuilder Join( + IDataStore rightStateStore, + Func keySelector, + Func joinFunction); + IStreamBuilder SetNext(IOperator customOperator); } diff --git a/src/Cortex.Streams/BranchStreamBuilder.cs b/src/Cortex.Streams/BranchStreamBuilder.cs index 5780214..8993f88 100644 --- a/src/Cortex.Streams/BranchStreamBuilder.cs +++ b/src/Cortex.Streams/BranchStreamBuilder.cs @@ -268,5 +268,56 @@ public IBranchStreamBuilder FlatMap(Func + /// Joins the current stream with a state-backed table (right side) based on a shared key. + /// For each item in the left (current) stream, a key is extracted and matched against entries + /// in . If a match is found, the two items are combined + /// by to form a result of type . + /// + /// The type of the elements stored in the right state store. + /// The type of the key used for matching left stream elements to right elements. + /// The type of the result produced by joining a left element with a right element. + /// + /// The state store mapping keys of type to values of type . + /// + /// + /// A function that extracts the key from the left (current) stream element of type TCurrent. + /// + /// + /// A function that combines the left element (of type TCurrent) and the matching right element + /// (of type ) to produce a result of type . + /// + /// + /// An representing the pipeline after the join operation. + /// + public IBranchStreamBuilder Join( + IDataStore rightStateStore, + Func keySelector, + Func joinFunction) + { + var joinOperator = new StreamTableJoinOperator( + keySelector, + joinFunction, + rightStateStore); + + if (_firstOperator == null) + { + _firstOperator = joinOperator; + _lastOperator = joinOperator; + } + else + { + _lastOperator.SetNext(joinOperator); + _lastOperator = joinOperator; + } + + return new BranchStreamBuilder(_name) + { + _firstOperator = _firstOperator, + _lastOperator = _lastOperator, + _sourceAdded = _sourceAdded, + }; + } } } diff --git a/src/Cortex.Streams/Operators/Joins/StreamTableJoinOperator.cs b/src/Cortex.Streams/Operators/Joins/StreamTableJoinOperator.cs new file mode 100644 index 0000000..4cbc496 --- /dev/null +++ b/src/Cortex.Streams/Operators/Joins/StreamTableJoinOperator.cs @@ -0,0 +1,175 @@ +using Cortex.States; +using Cortex.States.Operators; +using Cortex.Telemetry; +using System; +using System.Collections.Generic; +using System.Diagnostics; + +namespace Cortex.Streams.Operators +{ + /// + /// Joins incoming stream elements (left side) with a state-backed table (right side) based on a shared key. + /// The join operation is performed on each incoming element using the provided . + /// + /// Type of the left stream elements. + /// Type of the right table elements stored in the . + /// Type of the key used for joining left elements with right elements. + /// Type of the result produced by the join operation. + public class StreamTableJoinOperator : IOperator, IStatefulOperator, ITelemetryEnabled + { + private readonly Func _keySelector; + private readonly Func _joinFunction; + private readonly IDataStore _rightStateStore; + private IOperator _nextOperator; + + // Telemetry fields + private ITelemetryProvider _telemetryProvider; + private ICounter _processedCounter; + private IHistogram _processingTimeHistogram; + private ITracer _tracer; + private Action _incrementProcessedCounter; + private Action _recordProcessingTime; + + + /// + /// Creates a new instance of . + /// + /// A function that extracts a join key from a left stream element. + /// A function that combines a left stream element with a right element to produce a . + /// The state store that maps to right elements of type . + /// Thrown if any of the arguments are null. + public StreamTableJoinOperator( + Func keySelector, + Func joinFunction, + IDataStore rightStateStore) + { + _keySelector = keySelector ?? throw new ArgumentNullException(nameof(keySelector)); + _joinFunction = joinFunction ?? throw new ArgumentNullException(nameof(joinFunction)); + _rightStateStore = rightStateStore ?? throw new ArgumentNullException(nameof(rightStateStore)); + } + + + /// + /// Sets the telemetry provider which collects and reports metrics and tracing information. + /// + /// An implementation of . + public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) + { + _telemetryProvider = telemetryProvider; + + if (_telemetryProvider != null) + { + var metricsProvider = _telemetryProvider.GetMetricsProvider(); + _processedCounter = metricsProvider.CreateCounter($"stream_table_join_processed_{typeof(TLeft).Name}", "Number of items processed by StreamTableJoinOperator"); + _processingTimeHistogram = metricsProvider.CreateHistogram($"stream_table_join_processing_time_{typeof(TLeft).Name}", "Processing time for StreamTableJoinOperator"); + _tracer = _telemetryProvider.GetTracingProvider().GetTracer($"StreamTableJoinOperator_{typeof(TLeft).Name}"); + + _incrementProcessedCounter = () => _processedCounter.Increment(); + _recordProcessingTime = value => _processingTimeHistogram.Record(value); + } + else + { + _incrementProcessedCounter = null; + _recordProcessingTime = null; + } + + if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled) + { + nextTelemetryEnabled.SetTelemetryProvider(telemetryProvider); + } + } + + /// + /// Processes an incoming item from the left stream. + /// If the item key exists in the right-hand state store, the join function is invoked, + /// and the result is pushed to the next operator. + /// + /// An input item of type to be joined. + public void Process(object input) + { + if (input is TLeft left) + { + if (_telemetryProvider != null) + { + var stopwatch = Stopwatch.StartNew(); + using (var span = _tracer.StartSpan("StreamTableJoinOperator.Process")) + { + try + { + ProcessLeft(left); + span.SetAttribute("status", "success"); + } + catch (Exception ex) + { + span.SetAttribute("status", "error"); + span.SetAttribute("exception", ex.ToString()); + throw; + } + finally + { + stopwatch.Stop(); + _recordProcessingTime(stopwatch.Elapsed.TotalMilliseconds); + _incrementProcessedCounter(); + } + } + } + else + { + ProcessLeft(left); + } + } + } + + /// + /// Performs the actual lookup on the right-side + /// and, if found, applies the join function to produce a result for the next operator. + /// + /// The left input element to be joined. + private void ProcessLeft(TLeft left) + { + var key = _keySelector(left); + TRight right = default; + bool hasValue = false; + + lock (_rightStateStore) + { + if (_rightStateStore.ContainsKey(key)) + { + right = _rightStateStore.Get(key); + hasValue = true; + } + } + + if (hasValue) + { + var result = _joinFunction(left, right); + _nextOperator?.Process(result); + } + } + + /// + /// Sets the next operator in the processing chain. + /// The result of this operator's join operation is passed on to the next operator via . + /// + /// The next operator to receive joined results. + public void SetNext(IOperator nextOperator) + { + _nextOperator = nextOperator; + + if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled && _telemetryProvider != null) + { + nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); + } + } + + /// + /// Retrieves all state stores that this operator uses internally. + /// In this case, the operator only returns the right-side . + /// + /// An enumerable of the operator's state stores. + public IEnumerable GetStateStores() + { + yield return _rightStateStore; + } + } +} \ No newline at end of file diff --git a/src/Cortex.Streams/StreamBuilder.cs b/src/Cortex.Streams/StreamBuilder.cs index f257efe..55a4ded 100644 --- a/src/Cortex.Streams/StreamBuilder.cs +++ b/src/Cortex.Streams/StreamBuilder.cs @@ -579,5 +579,54 @@ public IStreamBuilder FlatMap(Func(_name, _firstOperator, _lastOperator, _sourceAdded); } + + /// + /// Joins the current stream with a state-backed table (right side) based on a shared key. + /// For each item in the left (current) stream, a key is extracted and matched against entries + /// in . If a match is found, the two items are combined + /// by to form a result of type . + /// + /// The type of the elements stored in the right state store. + /// The type of the key used for matching left stream elements to right elements. + /// The type of the result produced by joining a left element with a right element. + /// + /// The state store mapping keys of type to values of type . + /// + /// + /// A function that extracts the key from the left (current) stream element of type TCurrent. + /// + /// + /// A function that combines the left element (of type TCurrent) and the matching right element + /// (of type ) to produce a result of type . + /// + /// + /// An representing the pipeline after the join operation. + /// + public IStreamBuilder Join( + IDataStore rightStateStore, + Func keySelector, + Func joinFunction) + { + var joinOperator = new StreamTableJoinOperator( + keySelector, + joinFunction, + rightStateStore); + + if (_firstOperator == null) + { + _firstOperator = joinOperator; + _lastOperator = joinOperator; + } + else + { + _lastOperator.SetNext(joinOperator); + _lastOperator = joinOperator; + } + + return new StreamBuilder(_name, _firstOperator, _lastOperator, _sourceAdded) + { + _telemetryProvider = this._telemetryProvider + }; + } } }