diff --git a/Cortex.sln b/Cortex.sln
index 025b0b9..de5bc21 100644
--- a/Cortex.sln
+++ b/Cortex.sln
@@ -1,4 +1,3 @@
-
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.10.34607.79
@@ -57,6 +56,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.Streams.Elasticsearc
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.Types", "src\Cortex.Types\Cortex.Types.csproj", "{64E12D4C-FBB2-4004-8316-C886CBFC614B}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.Vectors", "src\Cortex.Vectors\Cortex.Vectors.csproj", "{268BA5C7-C6FB-4A6B-875A-492659ED4573}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.Mediator.Behaviors.FluentValidation", "src\Cortex.Mediator.Behaviors.FluentValidation\Cortex.Mediator.Behaviors.FluentValidation.csproj", "{44A166BD-01E9-4A4B-9BC5-7DE01B472E73}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -170,6 +173,14 @@ Global
{64E12D4C-FBB2-4004-8316-C886CBFC614B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{64E12D4C-FBB2-4004-8316-C886CBFC614B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{64E12D4C-FBB2-4004-8316-C886CBFC614B}.Release|Any CPU.Build.0 = Release|Any CPU
+ {268BA5C7-C6FB-4A6B-875A-492659ED4573}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {268BA5C7-C6FB-4A6B-875A-492659ED4573}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {268BA5C7-C6FB-4A6B-875A-492659ED4573}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {268BA5C7-C6FB-4A6B-875A-492659ED4573}.Release|Any CPU.Build.0 = Release|Any CPU
+ {44A166BD-01E9-4A4B-9BC5-7DE01B472E73}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {44A166BD-01E9-4A4B-9BC5-7DE01B472E73}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {44A166BD-01E9-4A4B-9BC5-7DE01B472E73}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {44A166BD-01E9-4A4B-9BC5-7DE01B472E73}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/README.md b/README.md
index 76df3ba..b730ba5 100644
--- a/README.md
+++ b/README.md
@@ -26,6 +26,7 @@
## Use Cases
+
- Real-time analytics and monitoring
- Event-driven architectures
- Stateful stream processing (e.g., aggregations, joins)
@@ -122,6 +123,12 @@
- **Cortex.Mediator:** implementation of the Mediator pattern for .NET applications, designed to power clean, modular architectures like **Vertical Slice Architecture** and **CQRS**.
[](https://www.nuget.org/packages/Cortex.Mediator)
+- **Cortex.Mediator.Behaviors.FluentValidation:** implementation of the FluentValidation validation for Commands and Queries
+[](https://www.nuget.org/packages/Cortex.Mediator.Behaviors.FluentValidation)
+
+- **Cortex.Vectors:** is a High‑performance vector types—Dense, Sparse, and Bit—for AI.
+[](https://www.nuget.org/packages/Cortex.Vectors)
+
## Getting Started
diff --git a/src/Cortex.Mediator.Behaviors.FluentValidation/Assets/andyX.png b/src/Cortex.Mediator.Behaviors.FluentValidation/Assets/andyX.png
new file mode 100644
index 0000000..101a1fb
Binary files /dev/null and b/src/Cortex.Mediator.Behaviors.FluentValidation/Assets/andyX.png differ
diff --git a/src/Cortex.Mediator.Behaviors.FluentValidation/Assets/license.md b/src/Cortex.Mediator.Behaviors.FluentValidation/Assets/license.md
new file mode 100644
index 0000000..3c845d4
--- /dev/null
+++ b/src/Cortex.Mediator.Behaviors.FluentValidation/Assets/license.md
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2025 Buildersoft
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/src/Cortex.Mediator.Behaviors.FluentValidation/Cortex.Mediator.Behaviors.FluentValidation.csproj b/src/Cortex.Mediator.Behaviors.FluentValidation/Cortex.Mediator.Behaviors.FluentValidation.csproj
new file mode 100644
index 0000000..f367543
--- /dev/null
+++ b/src/Cortex.Mediator.Behaviors.FluentValidation/Cortex.Mediator.Behaviors.FluentValidation.csproj
@@ -0,0 +1,72 @@
+
+
+
+ net9;net8;net7;netstandard2.1
+
+ 1.0.0
+ 1.0.0
+ Buildersoft Cortex Framework
+ Buildersoft
+ 12
+ Buildersoft,EnesHoxha
+ Copyright © Buildersoft 2025
+
+
+ Buildersoft Cortex Mediator is a library for .NET applications that implements the mediator pattern. It helps to reduce dependencies between objects by allowing in-process messaging without direct communication. Instead, objects communicate through Cortex Mediator, making them less coupled and more maintainable..
+
+
+
+ https://github.com/buildersoftio/cortex
+ cortex vortex mediator eda cqrs streaming
+
+ 1.0.0
+ license.md
+ andyX.png
+ Cortex.Mediator.Behaviors.FluentValidation
+ True
+ True
+ True
+
+ Just as the Cortex in our brains handles complex processing efficiently, Cortex Data Framework brings brainpower to your data management!
+ https://buildersoft.io/
+ Cortex Mediator Behaviors FluentValidation
+ README.md
+
+
+
+
+
+
+
+
+
+ True
+ \
+ Always
+
+
+
+
+ True
+
+
+
+ True
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Cortex.Mediator.Behaviors.FluentValidation/README.md b/src/Cortex.Mediator.Behaviors.FluentValidation/README.md
new file mode 100644
index 0000000..eeb265f
--- /dev/null
+++ b/src/Cortex.Mediator.Behaviors.FluentValidation/README.md
@@ -0,0 +1,181 @@
+# Cortex.Mediator 🧠
+
+**Cortex.Mediator** is a lightweight and extensible implementation of the Mediator pattern for .NET applications, designed to power clean, modular architectures like **Vertical Slice Architecture** and **CQRS**.
+
+
+Built as part of the [Cortex Data Framework](https://github.com/buildersoftio/cortex), this library simplifies command and query handling with built-in support for:
+
+
+- ✅ Commands & Queries
+- ✅ Notifications (Events)
+- ✅ Pipeline Behaviors
+- ✅ FluentValidation
+- ✅ Logging
+
+---
+
+[](https://github.com/buildersoftio/cortex/blob/master/LICENSE)
+[](https://www.nuget.org/packages/Cortex.Mediator)
+[](https://github.com/buildersoftio/cortex)
+[](https://discord.gg/JnMJV33QHu)
+
+
+## 🚀 Getting Started
+
+### Install via NuGet
+
+```bash
+dotnet add package Cortex.Mediator
+```
+
+## 🛠️ Setup
+In `Program.cs` or `Startup.cs`:
+```csharp
+builder.Services.AddCortexMediator(
+ builder.Configuration,
+ new[] { typeof(Program) }, // Assemblies to scan for handlers
+ options => options.AddDefaultBehaviors() // Logging
+);
+```
+
+## 📦 Folder Structure Example (Vertical Slice)
+```bash
+Features/
+ CreateUser/
+ CreateUserCommand.cs
+ CreateUserCommandHandler.cs
+ CreateUserValidator.cs
+ CreateUserEndpoint.cs
+```
+
+## ✏️ Defining a Command
+
+```csharp
+public class CreateUserCommand : ICommand
+{
+ public string UserName { get; set; }
+ public string Email { get; set; }
+}
+```
+
+### Handler
+```csharp
+public class CreateUserCommandHandler : ICommandHandler
+{
+ public async Task Handle(CreateUserCommand command, CancellationToken cancellationToken)
+ {
+ // Logic here
+ }
+}
+```
+
+### Validator (Optional, via FluentValidation) - Coming in the next release v1.8
+```csharp
+public class CreateUserValidator : AbstractValidator
+{
+ public CreateUserValidator()
+ {
+ RuleFor(x => x.UserName).NotEmpty();
+ RuleFor(x => x.Email).NotEmpty().EmailAddress();
+ }
+}
+```
+
+---
+
+## 🔍 Defining a Query
+
+```csharp
+public class GetUserQuery : IQuery
+{
+ public int UserId { get; set; }
+}
+```
+```csharp
+public class GetUserQueryHandler : IQueryHandler
+{
+ public async Task Handle(GetUserQuery query, CancellationToken cancellationToken)
+ {
+ return new GetUserResponse { UserId = query.UserId, UserName = "Andy" };
+ }
+}
+
+```
+
+## 📢 Notifications (Events)
+
+```csharp
+public class UserCreatedNotification : INotification
+{
+ public string UserName { get; set; }
+}
+
+public class SendWelcomeEmailHandler : INotificationHandler
+{
+ public async Task Handle(UserCreatedNotification notification, CancellationToken cancellationToken)
+ {
+ // Send email...
+ }
+}
+```
+```csharp
+await mediator.PublishAsync(new UserCreatedNotification { UserName = "Andy" });
+```
+
+## 🔧 Pipeline Behaviors (Built-in)
+Out of the box, Cortex.Mediator supports:
+
+- `ValidationCommandBehavior` - Coming in the next release v1.8
+- `LoggingCommandBehavior`
+
+You can also register custom behaviors:
+```csharp
+options.AddOpenCommandPipelineBehavior(typeof(MyCustomBehavior<>));
+```
+
+## 💬 Contributing
+We welcome contributions from the community! Whether it's reporting bugs, suggesting features, or submitting pull requests, your involvement helps improve Cortex for everyone.
+
+### 💬 How to Contribute
+1. **Fork the Repository**
+2. **Create a Feature Branch**
+```bash
+git checkout -b feature/YourFeature
+```
+3. **Commit Your Changes**
+```bash
+git commit -m "Add your feature"
+```
+4. **Push to Your Fork**
+```bash
+git push origin feature/YourFeature
+```
+5. **Open a Pull Request**
+
+Describe your changes and submit the pull request for review.
+
+## 📄 License
+This project is licensed under the MIT License.
+
+## 📚 Sponsorship
+Cortex is an open-source project maintained by BuilderSoft. Your support helps us continue developing and improving Cortex. Consider sponsoring us to contribute to the future of resilient streaming platforms.
+
+### How to Sponsor
+* **Financial Contributions**: Support us through [GitHub Sponsors](https://github.com/sponsors/buildersoftio) or other preferred platforms.
+* **Corporate Sponsorship**: If your organization is interested in sponsoring Cortex, please contact us directly.
+
+Contact Us: cortex@buildersoft.io
+
+
+## Contact
+We'd love to hear from you! Whether you have questions, feedback, or need support, feel free to reach out.
+
+- Email: support@buildersoft.io
+- Website: https://buildersoft.io
+- GitHub Issues: [Cortex Data Framework Issues](https://github.com/buildersoftio/cortex/issues)
+- Join our Discord Community: [](https://discord.gg/JnMJV33QHu)
+
+
+Thank you for using Cortex Data Framework! We hope it empowers you to build scalable and efficient data processing pipelines effortlessly.
+
+Built with ❤️ by the Buildersoft team.
diff --git a/src/Cortex.Mediator.Behaviors.FluentValidation/ValidationCommandBehavior.cs b/src/Cortex.Mediator.Behaviors.FluentValidation/ValidationCommandBehavior.cs
new file mode 100644
index 0000000..b9eb2e3
--- /dev/null
+++ b/src/Cortex.Mediator.Behaviors.FluentValidation/ValidationCommandBehavior.cs
@@ -0,0 +1,43 @@
+using Cortex.Mediator.Commands;
+using FluentValidation;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Cortex.Mediator.Behaviors
+{
+ ///
+ /// Pipeline behavior for validation command execution.
+ ///
+ public sealed class ValidationCommandBehavior : ICommandPipelineBehavior
+ where TCommand : ICommand
+ {
+ private readonly IEnumerable> _validators;
+
+
+ public async Task Handle(TCommand command, CommandHandlerDelegate next, CancellationToken cancellationToken)
+ {
+ var context = new ValidationContext(command);
+ var failures = _validators
+ .Select(v => v.Validate(context))
+ .SelectMany(r => r.Errors)
+ .Where(f => f != null)
+ .ToList();
+
+ if (failures.Count() > 0)
+ {
+ var errors = failures
+ .GroupBy(f => f.PropertyName)
+ .ToDictionary(
+ g => g.Key,
+ g => g.Select(f => f.ErrorMessage).ToArray());
+
+ throw new Exceptions.ValidationException(errors);
+ }
+
+ return await next();
+ }
+ }
+}
diff --git a/src/Cortex.Mediator.Behaviors.FluentValidation/ValidationQueryBehavior.cs b/src/Cortex.Mediator.Behaviors.FluentValidation/ValidationQueryBehavior.cs
new file mode 100644
index 0000000..b48c633
--- /dev/null
+++ b/src/Cortex.Mediator.Behaviors.FluentValidation/ValidationQueryBehavior.cs
@@ -0,0 +1,40 @@
+using Cortex.Mediator.Queries;
+using FluentValidation;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Cortex.Mediator.Behaviors.FluentValidation
+{
+ public sealed class ValidationQueryBehavior : IQueryPipelineBehavior
+ where TQuery : IQuery
+ {
+
+ private readonly IEnumerable> _validators;
+
+
+ public async Task Handle(TQuery query, QueryHandlerDelegate next, CancellationToken cancellationToken)
+ {
+ var context = new ValidationContext(query);
+ var failures = _validators
+ .Select(v => v.Validate(context))
+ .SelectMany(r => r.Errors)
+ .Where(f => f != null)
+ .ToList();
+
+ if (failures.Count() > 0)
+ {
+ var errors = failures
+ .GroupBy(f => f.PropertyName)
+ .ToDictionary(
+ g => g.Key,
+ g => g.Select(f => f.ErrorMessage).ToArray());
+
+ throw new Exceptions.ValidationException(errors);
+ }
+
+ return await next();
+ }
+ }
+}
diff --git a/src/Cortex.Mediator/Behaviors/VoidLoggingCommandBehavior.cs b/src/Cortex.Mediator/Behaviors/VoidLoggingCommandBehavior.cs
new file mode 100644
index 0000000..b6134f3
--- /dev/null
+++ b/src/Cortex.Mediator/Behaviors/VoidLoggingCommandBehavior.cs
@@ -0,0 +1,51 @@
+using Cortex.Mediator.Commands;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Cortex.Mediator.Behaviors
+{
+
+ public sealed class LoggingCommandBehavior : ICommandPipelineBehavior where TCommand : ICommand
+ {
+ private readonly ILogger> _logger;
+
+ public LoggingCommandBehavior(ILogger> logger)
+ {
+ _logger = logger;
+ }
+
+ public async Task Handle(
+ TCommand command,
+ CommandHandlerDelegate next,
+ CancellationToken cancellationToken)
+ {
+ var commandName = typeof(TCommand).Name;
+ _logger.LogInformation("Executing command {CommandName}", commandName);
+
+ var stopwatch = Stopwatch.StartNew(); // start timing
+ try
+ {
+ await next();
+
+ stopwatch.Stop();
+ _logger.LogInformation(
+ "Command {CommandName} executed successfully in {ElapsedMilliseconds} ms",
+ commandName,
+ stopwatch.ElapsedMilliseconds);
+ }
+ catch (Exception ex)
+ {
+ stopwatch.Stop();
+ _logger.LogError(
+ ex,
+ "Error executing command {CommandName} after {ElapsedMilliseconds} ms",
+ commandName,
+ stopwatch.ElapsedMilliseconds);
+ throw;
+ }
+ }
+ }
+}
diff --git a/src/Cortex.Mediator/Commands/ICommand.cs b/src/Cortex.Mediator/Commands/ICommand.cs
index fdc2226..47b56c9 100644
--- a/src/Cortex.Mediator/Commands/ICommand.cs
+++ b/src/Cortex.Mediator/Commands/ICommand.cs
@@ -8,4 +8,14 @@
public interface ICommand
{
}
+
+ // feature #141
+
+ ///
+ /// Represents a command in the CQRS pattern.
+ /// Commands are used to change the system state and do not return a value.
+ ///
+ public interface ICommand
+ {
+ }
}
diff --git a/src/Cortex.Mediator/Commands/ICommandHandler.cs b/src/Cortex.Mediator/Commands/ICommandHandler.cs
index 574386c..c2cf6a7 100644
--- a/src/Cortex.Mediator/Commands/ICommandHandler.cs
+++ b/src/Cortex.Mediator/Commands/ICommandHandler.cs
@@ -18,4 +18,23 @@ public interface ICommandHandler
/// The cancellation token.
Task Handle(TCommand command, CancellationToken cancellationToken);
}
+
+
+
+ // feature #141
+
+ ///
+ /// Defines a handler for a command.
+ ///
+ /// The type of command being handled.
+ public interface ICommandHandler
+ where TCommand : ICommand
+ {
+ ///
+ /// Handles the specified command.
+ ///
+ /// The command to handle.
+ /// The cancellation token.
+ Task Handle(TCommand command, CancellationToken cancellationToken);
+ }
}
diff --git a/src/Cortex.Mediator/Commands/ICommandPipelineBehavior.cs b/src/Cortex.Mediator/Commands/ICommandPipelineBehavior.cs
index b1e4a9c..306e22e 100644
--- a/src/Cortex.Mediator/Commands/ICommandPipelineBehavior.cs
+++ b/src/Cortex.Mediator/Commands/ICommandPipelineBehavior.cs
@@ -19,8 +19,33 @@ Task Handle(
CancellationToken cancellationToken);
}
+
+ // For non returning commands
+ // feature #141
+
+ ///
+ /// Defines a pipeline behavior for wrapping command handlers.
+ ///
+ /// The type of command being handled.
+ public interface ICommandPipelineBehavior
+ where TCommand : ICommand
+ {
+ ///
+ /// Handles the command and invokes the next behavior in the pipeline.
+ ///
+ Task Handle(
+ TCommand command,
+ CommandHandlerDelegate next,
+ CancellationToken cancellationToken);
+ }
+
///
/// Represents a delegate that wraps the command handler execution.
///
public delegate Task CommandHandlerDelegate();
+
+ ///
+ /// Represents a delegate that wraps the command handler execution.
+ ///
+ public delegate Task CommandHandlerDelegate();
}
diff --git a/src/Cortex.Mediator/DependencyInjection/MediatorOptions.cs b/src/Cortex.Mediator/DependencyInjection/MediatorOptions.cs
index 415dfea..15ee488 100644
--- a/src/Cortex.Mediator/DependencyInjection/MediatorOptions.cs
+++ b/src/Cortex.Mediator/DependencyInjection/MediatorOptions.cs
@@ -9,6 +9,7 @@ namespace Cortex.Mediator.DependencyInjection
public class MediatorOptions
{
internal List CommandBehaviors { get; } = new();
+ internal List VoidCommandBehaviors { get; } = new();
internal List QueryBehaviors { get; } = new();
public bool OnlyPublicClasses { get; set; } = true;
@@ -23,21 +24,25 @@ public MediatorOptions AddCommandPipelineBehavior()
var behaviorType = typeof(TBehavior);
if (behaviorType.IsGenericTypeDefinition)
- {
throw new ArgumentException("Open generic types must be registered using AddOpenCommandPipelineBehavior");
- }
- var implementsInterface = behaviorType
- .GetInterfaces()
- .Any(i => i.IsGenericType &&
- i.GetGenericTypeDefinition() == typeof(ICommandPipelineBehavior<,>));
+ var implementsReturning =
+ behaviorType.GetInterfaces().Any(i => i.IsGenericType &&
+ i.GetGenericTypeDefinition() == typeof(ICommandPipelineBehavior<,>));
- if (!implementsInterface)
- {
- throw new ArgumentException("Type must implement ICommandPipelineBehavior<,>");
- }
+ var implementsNonReturning =
+ behaviorType.GetInterfaces().Any(i => i.IsGenericType &&
+ i.GetGenericTypeDefinition() == typeof(ICommandPipelineBehavior<>));
+
+ if (!implementsReturning && !implementsNonReturning)
+ throw new ArgumentException("Type must implement ICommandPipelineBehavior<,> or ICommandPipelineBehavior<>");
+
+ if (implementsReturning)
+ CommandBehaviors.Add(behaviorType);
+
+ if (implementsNonReturning)
+ VoidCommandBehaviors.Add(behaviorType);
- CommandBehaviors.Add(behaviorType);
return this;
}
@@ -47,29 +52,25 @@ public MediatorOptions AddCommandPipelineBehavior()
public MediatorOptions AddOpenCommandPipelineBehavior(Type openGenericBehaviorType)
{
if (!openGenericBehaviorType.IsGenericTypeDefinition)
- {
throw new ArgumentException("Type must be an open generic type definition");
- }
- var implementsInterface = openGenericBehaviorType
- .GetInterfaces()
- .Any(i => i.IsGenericType &&
- i.GetGenericTypeDefinition() == typeof(ICommandPipelineBehavior<,>));
+ var implementsReturning =
+ openGenericBehaviorType.GetInterfaces().Any(i => i.IsGenericType &&
+ i.GetGenericTypeDefinition() == typeof(ICommandPipelineBehavior<,>));
- // For open generics, interface might not appear in GetInterfaces() yet; check by definition instead.
- if (!implementsInterface &&
- !(openGenericBehaviorType.IsGenericTypeDefinition &&
- openGenericBehaviorType.GetGenericTypeDefinition() == openGenericBehaviorType))
- {
- // Fall back to checking generic arguments count to give a clear error
- var ok = openGenericBehaviorType.GetGenericArguments().Length == 2;
- if (!ok)
- {
- throw new ArgumentException("Type must implement ICommandPipelineBehavior<,>");
- }
- }
+ var implementsNonReturning =
+ openGenericBehaviorType.GetInterfaces().Any(i => i.IsGenericType &&
+ i.GetGenericTypeDefinition() == typeof(ICommandPipelineBehavior<>));
+
+ if (!implementsReturning && !implementsNonReturning)
+ throw new ArgumentException("Type must implement ICommandPipelineBehavior<,> or ICommandPipelineBehavior<>");
+
+ if (implementsReturning)
+ CommandBehaviors.Add(openGenericBehaviorType);
+
+ if (implementsNonReturning)
+ VoidCommandBehaviors.Add(openGenericBehaviorType);
- CommandBehaviors.Add(openGenericBehaviorType);
return this;
}
diff --git a/src/Cortex.Mediator/DependencyInjection/MediatorOptionsExtensions.cs b/src/Cortex.Mediator/DependencyInjection/MediatorOptionsExtensions.cs
index 76f8d07..0fce80c 100644
--- a/src/Cortex.Mediator/DependencyInjection/MediatorOptionsExtensions.cs
+++ b/src/Cortex.Mediator/DependencyInjection/MediatorOptionsExtensions.cs
@@ -9,7 +9,8 @@ public static MediatorOptions AddDefaultBehaviors(this MediatorOptions options)
return options
// Register the open generic logging behavior for commands that return TResult
.AddOpenCommandPipelineBehavior(typeof(LoggingCommandBehavior<,>))
- .AddOpenQueryPipelineBehavior(typeof(LoggingQueryBehavior<,>));
+ .AddOpenQueryPipelineBehavior(typeof(LoggingQueryBehavior<,>))
+ .AddOpenCommandPipelineBehavior(typeof(LoggingCommandBehavior<>)); // Add void command logging
}
}
}
diff --git a/src/Cortex.Mediator/DependencyInjection/ServiceCollectionExtensions.cs b/src/Cortex.Mediator/DependencyInjection/ServiceCollectionExtensions.cs
index fc54ea6..837fe50 100644
--- a/src/Cortex.Mediator/DependencyInjection/ServiceCollectionExtensions.cs
+++ b/src/Cortex.Mediator/DependencyInjection/ServiceCollectionExtensions.cs
@@ -49,6 +49,14 @@ private static void RegisterHandlers(
.AsImplementedInterfaces()
.WithScopedLifetime());
+ // feature #141 - Register void command handlers
+ services.Scan(scan => scan
+ .FromAssemblies(assemblies)
+ .AddClasses(classes => classes
+ .AssignableTo(typeof(ICommandHandler<>)), options.OnlyPublicClasses)
+ .AsImplementedInterfaces()
+ .WithScopedLifetime());
+
services.Scan(scan => scan
.FromAssemblies(assemblies)
.AddClasses(classes => classes
@@ -72,6 +80,12 @@ private static void RegisterPipelineBehaviors(IServiceCollection services, Media
services.AddTransient(typeof(ICommandPipelineBehavior<,>), behaviorType);
}
+ // feature #141 - Register non-returning command pipeline behaviors
+ foreach (var behaviorType in options.VoidCommandBehaviors)
+ {
+ services.AddTransient(typeof(ICommandPipelineBehavior<>), behaviorType);
+ }
+
// Query behaviors (if needed)
foreach (var behaviorType in options.QueryBehaviors)
{
diff --git a/src/Cortex.Mediator/IMediator.cs b/src/Cortex.Mediator/IMediator.cs
index 7eb104c..eb46dd0 100644
--- a/src/Cortex.Mediator/IMediator.cs
+++ b/src/Cortex.Mediator/IMediator.cs
@@ -16,6 +16,11 @@ Task SendCommandAsync(
CancellationToken cancellationToken = default)
where TCommand : ICommand;
+ Task SendCommandAsync(
+ TCommand command,
+ CancellationToken cancellationToken = default)
+ where TCommand : ICommand;
+
Task SendQueryAsync(
TQuery query,
CancellationToken cancellationToken = default)
diff --git a/src/Cortex.Mediator/Mediator.cs b/src/Cortex.Mediator/Mediator.cs
index cd0c521..d5087f3 100644
--- a/src/Cortex.Mediator/Mediator.cs
+++ b/src/Cortex.Mediator/Mediator.cs
@@ -22,7 +22,7 @@ public Mediator(IServiceProvider serviceProvider)
}
public async Task SendCommandAsync(TCommand command, CancellationToken cancellationToken = default)
- where TCommand : ICommand
+ where TCommand : ICommand
{
var handler = _serviceProvider.GetRequiredService>();
@@ -31,7 +31,19 @@ public async Task SendCommandAsync(TCommand command,
handler = new PipelineBehaviorNextDelegate(behavior, handler);
}
- return await handler.Handle(command, cancellationToken);
+ return await handler.Handle(command, cancellationToken);
+ }
+
+ public async Task SendCommandAsync(TCommand command, CancellationToken cancellationToken = default) where TCommand : ICommand
+ {
+ var handler = _serviceProvider.GetRequiredService>();
+
+ foreach (var behavior in _serviceProvider.GetServices>().Reverse())
+ {
+ handler = new PipelineBehaviorNextDelegate(behavior, handler);
+ }
+
+ await handler.Handle(command, cancellationToken);
}
public async Task SendQueryAsync(TQuery query, CancellationToken cancellationToken = default)
@@ -57,6 +69,7 @@ public async Task PublishAsync(
await Task.WhenAll(tasks);
}
+
private class PipelineBehaviorNextDelegate : ICommandHandler
where TCommand : ICommand
{
@@ -80,6 +93,29 @@ public Task Handle(TCommand command, CancellationToken cancellationToke
}
}
+ private class PipelineBehaviorNextDelegate : ICommandHandler
+ where TCommand : ICommand
+ {
+ private readonly ICommandPipelineBehavior _behavior;
+ private readonly ICommandHandler _next;
+
+ public PipelineBehaviorNextDelegate(
+ ICommandPipelineBehavior behavior,
+ ICommandHandler next)
+ {
+ _behavior = behavior;
+ _next = next;
+ }
+
+ public Task Handle(TCommand command, CancellationToken cancellationToken)
+ {
+ return _behavior.Handle(
+ command,
+ () => _next.Handle(command, cancellationToken),
+ cancellationToken);
+ }
+ }
+
private class QueryPipelineBehaviorNextDelegate
: IQueryHandler
where TQuery : IQuery
diff --git a/src/Cortex.Streams.Kafka/Cortex.Streams.Kafka.csproj b/src/Cortex.Streams.Kafka/Cortex.Streams.Kafka.csproj
index bd11617..d3ddb2f 100644
--- a/src/Cortex.Streams.Kafka/Cortex.Streams.Kafka.csproj
+++ b/src/Cortex.Streams.Kafka/Cortex.Streams.Kafka.csproj
@@ -4,8 +4,8 @@
net8.0
enable
- 1.0.1
- 1.0.1
+ 2.0.0
+ 2.0.0
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
@@ -16,7 +16,7 @@
https://github.com/buildersoftio/cortex
cortex vortex eda streaming distributed streams states kafka pulsar rocksdb
- 1.0.1
+ 2.0.0
license.md
cortex.png
Cortex.Streams.Kafka
@@ -52,9 +52,9 @@
-
-
-
+
+
+
diff --git a/src/Cortex.Streams.Kafka/KafkaKeyValueSinkOperator.cs b/src/Cortex.Streams.Kafka/KafkaKeyValueSinkOperator.cs
new file mode 100644
index 0000000..be127d6
--- /dev/null
+++ b/src/Cortex.Streams.Kafka/KafkaKeyValueSinkOperator.cs
@@ -0,0 +1,65 @@
+using Confluent.Kafka;
+using Cortex.Streams.Kafka.Serializers;
+using Cortex.Streams.Operators;
+using System;
+using System.Collections.Generic;
+
+namespace Cortex.Streams.Kafka
+{
+ ///
+ /// Kafka sink that accepts KeyValuePair so message keys are produced.
+ ///
+ public sealed class KafkaSinkOperator : ISinkOperator>
+ {
+ private readonly string _bootstrapServers;
+ private readonly string _topic;
+ private readonly IProducer _producer;
+
+ public KafkaSinkOperator(
+ string bootstrapServers,
+ string topic,
+ ProducerConfig config = null,
+ ISerializer keySerializer = null,
+ ISerializer valueSerializer = null)
+ {
+ _bootstrapServers = bootstrapServers ?? throw new ArgumentNullException(nameof(bootstrapServers));
+ _topic = topic ?? throw new ArgumentNullException(nameof(topic));
+
+ var producerConfig = config ?? new ProducerConfig
+ {
+ BootstrapServers = _bootstrapServers
+ };
+
+ keySerializer ??= new DefaultJsonSerializer();
+ valueSerializer ??= new DefaultJsonSerializer();
+
+ _producer = new ProducerBuilder(producerConfig)
+ .SetKeySerializer(keySerializer)
+ .SetValueSerializer(valueSerializer)
+ .Build();
+ }
+
+ public void Process(KeyValuePair input)
+ {
+ var msg = new Message { Key = input.Key, Value = input.Value };
+ _producer.Produce(_topic, msg, deliveryReport =>
+ {
+ if (deliveryReport.Error.IsError)
+ {
+ Console.WriteLine($"Delivery Error: {deliveryReport.Error.Reason}");
+ }
+ });
+ }
+
+ public void Start()
+ {
+ // no-op
+ }
+
+ public void Stop()
+ {
+ _producer.Flush(TimeSpan.FromSeconds(10));
+ _producer.Dispose();
+ }
+ }
+}
diff --git a/src/Cortex.Streams.Kafka/KafkaKeyValueSourceOperator.cs b/src/Cortex.Streams.Kafka/KafkaKeyValueSourceOperator.cs
new file mode 100644
index 0000000..ee33f96
--- /dev/null
+++ b/src/Cortex.Streams.Kafka/KafkaKeyValueSourceOperator.cs
@@ -0,0 +1,97 @@
+using Confluent.Kafka;
+using Cortex.Streams.Kafka.Deserializers;
+using Cortex.Streams.Operators;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Cortex.Streams.Kafka
+{
+ ///
+ /// Kafka source that emits KeyValuePair so the pipeline can use message keys.
+ ///
+ public sealed class KafkaSourceOperator : ISourceOperator>
+ {
+ private readonly string _bootstrapServers;
+ private readonly string _topic;
+ private readonly IConsumer _consumer;
+ private CancellationTokenSource _cts;
+ private Task _consumeTask;
+
+
+ public KafkaSourceOperator(string bootstrapServers,
+ string topic,
+ ConsumerConfig config = null,
+ IDeserializer keyDeserializer = null,
+ IDeserializer valueDeserializer = null)
+ {
+ _bootstrapServers = bootstrapServers ?? throw new ArgumentNullException(nameof(bootstrapServers));
+ _topic = topic ?? throw new ArgumentNullException(nameof(topic));
+
+ var consumerConfig = config ?? new ConsumerConfig
+ {
+ BootstrapServers = _bootstrapServers,
+ GroupId = Guid.NewGuid().ToString(),
+ AutoOffsetReset = AutoOffsetReset.Earliest,
+ EnableAutoCommit = true,
+ };
+
+ keyDeserializer ??= new DefaultJsonDeserializer();
+ valueDeserializer ??= new DefaultJsonDeserializer();
+
+ _consumer = new ConsumerBuilder(consumerConfig)
+ .SetKeyDeserializer(keyDeserializer)
+ .SetValueDeserializer(valueDeserializer)
+ .Build();
+ }
+
+
+ public void Start(Action> emit)
+ {
+ if (emit == null) throw new ArgumentNullException(nameof(emit));
+
+ _cts = new CancellationTokenSource();
+ _consumer.Subscribe(_topic);
+
+ _consumeTask = Task.Run(() =>
+ {
+ try
+ {
+ while (!_cts.Token.IsCancellationRequested)
+ {
+ var result = _consumer.Consume(_cts.Token);
+ emit(new KeyValuePair(result.Message.Key, result.Message.Value));
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // shutting down - consume loop canceled
+ }
+ finally
+ {
+ _consumer.Close();
+ }
+ }, _cts.Token);
+ }
+
+ public void Stop()
+ {
+ if (_cts == null)
+ return;
+
+ _cts.Cancel();
+ try
+ {
+ _consumeTask?.Wait();
+ }
+ catch
+ {
+ /* swallow aggregate canceled */
+ }
+
+ _consumer.Dispose();
+ _cts.Dispose();
+ }
+ }
+}
diff --git a/src/Cortex.Streams.Kafka/KafkaSinkOperator.cs b/src/Cortex.Streams.Kafka/KafkaSinkOperator.cs
index abe93af..00608fc 100644
--- a/src/Cortex.Streams.Kafka/KafkaSinkOperator.cs
+++ b/src/Cortex.Streams.Kafka/KafkaSinkOperator.cs
@@ -5,7 +5,7 @@
namespace Cortex.Streams.Kafka
{
- public class KafkaSinkOperator : ISinkOperator
+ public sealed class KafkaSinkOperator : ISinkOperator
{
private readonly string _bootstrapServers;
private readonly string _topic;
diff --git a/src/Cortex.Streams.Kafka/KafkaSourceOperator.cs b/src/Cortex.Streams.Kafka/KafkaSourceOperator.cs
index 4553e4e..7e668f1 100644
--- a/src/Cortex.Streams.Kafka/KafkaSourceOperator.cs
+++ b/src/Cortex.Streams.Kafka/KafkaSourceOperator.cs
@@ -7,7 +7,7 @@
namespace Cortex.Streams.Kafka
{
- public class KafkaSourceOperator : ISourceOperator
+ public sealed class KafkaSourceOperator : ISourceOperator
{
private readonly string _bootstrapServers;
private readonly string _topic;
@@ -15,7 +15,10 @@ public class KafkaSourceOperator : ISourceOperator
private CancellationTokenSource _cts;
private Task _consumeTask;
- public KafkaSourceOperator(string bootstrapServers, string topic, ConsumerConfig config = null, IDeserializer deserializer = null)
+ public KafkaSourceOperator(string bootstrapServers,
+ string topic,
+ ConsumerConfig config = null,
+ IDeserializer deserializer = null)
{
_bootstrapServers = bootstrapServers;
_topic = topic;
@@ -53,7 +56,7 @@ public void Start(Action emit)
}
catch (OperationCanceledException)
{
- // Consume loop canceled
+ // shutting down - consume loop canceled
}
finally
{
@@ -64,8 +67,21 @@ public void Start(Action emit)
public void Stop()
{
+ if (_cts == null)
+ return;
+
_cts.Cancel();
- _consumeTask.Wait();
+ try
+ {
+ _consumeTask?.Wait();
+ }
+ catch
+ {
+ /* swallow aggregate canceled */
+ }
+
+ _consumer.Dispose();
+ _cts.Dispose();
}
}
}
diff --git a/src/Cortex.Streams.Pulsar/Cortex.Streams.Pulsar.csproj b/src/Cortex.Streams.Pulsar/Cortex.Streams.Pulsar.csproj
index 52ddf04..5228381 100644
--- a/src/Cortex.Streams.Pulsar/Cortex.Streams.Pulsar.csproj
+++ b/src/Cortex.Streams.Pulsar/Cortex.Streams.Pulsar.csproj
@@ -32,11 +32,20 @@
README.md
+
+
+
+
+
+
+
+ True
+ \
+ Always
+
+
+
-
- True
- \
-
True
@@ -53,10 +62,10 @@
-
+
-
-
+
+
diff --git a/src/Cortex.Streams.Pulsar/PulsarSinkOperator.cs b/src/Cortex.Streams.Pulsar/PulsarSinkOperator.cs
index a33bf9e..c8edf5a 100644
--- a/src/Cortex.Streams.Pulsar/PulsarSinkOperator.cs
+++ b/src/Cortex.Streams.Pulsar/PulsarSinkOperator.cs
@@ -16,7 +16,12 @@ public class PulsarSinkOperator : ISinkOperator
private readonly IPulsarClient _client;
private IProducer> _producer;
- public PulsarSinkOperator(string serviceUrl, string topic, ISerializer serializer = null)
+ private readonly Func _keySelector; // optional
+
+ public PulsarSinkOperator(string serviceUrl,
+ string topic,
+ Func keySelector = null,
+ ISerializer serializer = null)
{
_serviceUrl = serviceUrl;
_topic = topic;
@@ -24,6 +29,7 @@ public PulsarSinkOperator(string serviceUrl, string topic, ISerializer s
_serializer ??= new DefaultJsonSerializer();
+ _keySelector = keySelector;
_client = PulsarClient.Builder()
.ServiceUrl(new Uri(_serviceUrl))
@@ -44,7 +50,15 @@ public void Start()
public void Process(TInput input)
{
var data = _serializer.Serialize(input);
- _producer.Send(data);
+ if (_keySelector is null)
+ {
+ _producer.Send(data); // current behavior :contentReference[oaicite:17]{index=17}
+ }
+ else
+ {
+ var metadata = new MessageMetadata { Key = _keySelector(input) };
+ _producer.Send(metadata, new ReadOnlySequence(data));
+ }
}
public void Stop()
diff --git a/src/Cortex.Streams.Pulsar/PulsarSourceOperator.cs b/src/Cortex.Streams.Pulsar/PulsarSourceOperator.cs
index 30873b4..3a62955 100644
--- a/src/Cortex.Streams.Pulsar/PulsarSourceOperator.cs
+++ b/src/Cortex.Streams.Pulsar/PulsarSourceOperator.cs
@@ -1,16 +1,17 @@
-using DotPulsar.Abstractions;
+using Cortex.Streams.Operators;
+using Cortex.Streams.Pulsar.Deserializers;
using DotPulsar;
+using DotPulsar.Abstractions;
using DotPulsar.Extensions;
-using Cortex.Streams.Pulsar.Deserializers;
+using System;
using System.Buffers;
+using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
-using System;
-using Cortex.Streams.Operators;
namespace Cortex.Streams.Pulsar
{
- public class PulsarSourceOperator : ISourceOperator
+ public class PulsarSourceOperator : ISourceOperator>
{
private readonly string _serviceUrl;
private readonly ConsumerOptions> _consumerOptions;
@@ -51,24 +52,17 @@ public PulsarSourceOperator(string serviceUrl, ConsumerOptions emit)
+ public void Start(Action> emit)
{
_cts = new CancellationTokenSource();
- if (_consumerOptions == null)
- {
- _consumer = _client.NewConsumer()
+ _consumer = _consumerOptions == null
+ ? _client.NewConsumer()
.Topic(_topic)
.InitialPosition(SubscriptionInitialPosition.Earliest)
.SubscriptionName($"subscription-{Guid.NewGuid()}")
- .Create();
- }
- else
- {
- _consumer = _client
- .CreateConsumer(_consumerOptions);
- }
-
+ .Create()
+ : _client.CreateConsumer(_consumerOptions);
_consumeTask = Task.Run(async () =>
{
@@ -76,15 +70,15 @@ public void Start(Action emit)
{
await foreach (var message in _consumer.Messages(_cts.Token))
{
- var data = message.Data;
- var output = _deserializer.Deserialize(data);
- emit(output);
+ var key = message.Key ?? string.Empty; // Handle null keys gracefully
+ var output = _deserializer.Deserialize(message.Data);
+ emit(new KeyValuePair(key, output));
await _consumer.Acknowledge(message, _cts.Token);
}
}
catch (OperationCanceledException)
{
- // Consume loop canceled
+ // Cancellation requested
}
finally
{
diff --git a/src/Cortex.Streams.Pulsar/README.md b/src/Cortex.Streams.Pulsar/README.md
new file mode 100644
index 0000000..3b611fb
--- /dev/null
+++ b/src/Cortex.Streams.Pulsar/README.md
@@ -0,0 +1,127 @@
+# Cortex.Streams.Pulsar 🧠
+
+**Cortex.Streams.Pulsar** is a streaming connector for [Apache Pulsar](https://pulsar.apache.org/), designed to work seamlessly within the **Cortex Data Framework**. It enables real-time data ingestion and publication from/to Pulsar topics, now with full support for **message keys** alongside values.
+
+---
+
+## 🌟 Features
+
+- 🔄 **Pulsar Source Operator**: Consume messages (key + value) from Pulsar topics.
+- 🚀 **Pulsar Sink Operator**: Publish messages to Pulsar topics with optional keys.
+- 🧩 **Key Support**: Allows key-based partitioning and stream grouping.
+- 📦 **Seamless DSL Integration**: Easily compose with other Cortex stream operations.
+- ⚡ **Built for Scale**: Backed by Pulsar’s distributed, high-throughput architecture.
+
+---
+
+[](https://github.com/buildersoftio/cortex/blob/master/LICENSE)
+[](https://www.nuget.org/packages/Cortex.Streams.Pulsar)
+[](https://github.com/buildersoftio/cortex)
+[](https://discord.gg/JnMJV33QHu)
+
+
+## 🚀 Getting Started
+
+### Install via NuGet
+
+```bash
+dotnet add package Cortex.Streams.Pulsar
+```
+
+## ✅ Pulsar Sink Operator
+In `Program.cs` or `Startup.cs`:
+```csharp
+using Cortex.Streams;
+using Cortex.Streams.Pulsar;
+
+var pulsarSink = new PulsarSinkOperator("pulsar://localhost:6650", "persistent://public/default/input-topic");
+
+var stream = StreamBuilder
+ .CreateNewStream("PulsarIngester")
+ .Stream()
+ .Sink(pulsarSink)
+ .Build();
+
+stream.Start();
+
+stream.Emit("data1");
+stream.Emit("data2");
+stream.Emit("data3");
+```
+
+## ✅ Pulsar Source Operator
+
+```csharp
+using Cortex.Streams;
+using Cortex.Streams.Pulsar;
+
+var pulsarSource = new PulsarSourceOperator("pulsar://localhost:6650", "persistent://public/default/input-topic");
+
+var stream = StreamBuilder
+ .CreateNewStream("PulsarProcessor")
+ .Stream(pulsarSource)
+ .Map(message => message.ToUpper())
+ .Sink(processed => Console.WriteLine($"Processed: {processed}"))
+ .Build();
+
+stream.Start();
+```
+
+## 🔐 Key Use Cases
+
+- Partition-aware processing using message keys
+- Sessionization and user-based aggregations
+- Scalable event ingestion pipelines
+
+🧱 Prerequisites
+- .NET 6.0 SDK or later
+- Apache Pulsar running locally or remotely
+- Add Cortex.Streams base package
+
+
+## 💬 Contributing
+We welcome contributions from the community! Whether it's reporting bugs, suggesting features, or submitting pull requests, your involvement helps improve Cortex for everyone.
+
+### 💬 How to Contribute
+1. **Fork the Repository**
+2. **Create a Feature Branch**
+```bash
+git checkout -b feature/YourFeature
+```
+3. **Commit Your Changes**
+```bash
+git commit -m "Add your feature"
+```
+4. **Push to Your Fork**
+```bash
+git push origin feature/YourFeature
+```
+5. **Open a Pull Request**
+
+Describe your changes and submit the pull request for review.
+
+## 📄 License
+This project is licensed under the MIT License.
+
+## 📚 Sponsorship
+Cortex is an open-source project maintained by BuilderSoft. Your support helps us continue developing and improving Cortex. Consider sponsoring us to contribute to the future of resilient streaming platforms.
+
+### How to Sponsor
+* **Financial Contributions**: Support us through [GitHub Sponsors](https://github.com/sponsors/buildersoftio) or other preferred platforms.
+* **Corporate Sponsorship**: If your organization is interested in sponsoring Cortex, please contact us directly.
+
+Contact Us: cortex@buildersoft.io
+
+
+## Contact
+We'd love to hear from you! Whether you have questions, feedback, or need support, feel free to reach out.
+
+- Email: cortex@buildersoft.io
+- Website: https://buildersoft.io
+- GitHub Issues: [Cortex Data Framework Issues](https://github.com/buildersoftio/cortex/issues)
+- Join our Discord Community: [](https://discord.gg/JnMJV33QHu)
+
+
+Thank you for using Cortex Data Framework! We hope it empowers you to build scalable and efficient data processing pipelines effortlessly.
+
+Built with ❤️ by the Buildersoft team.
diff --git a/src/Cortex.Streams/Abstractions/IStream.cs b/src/Cortex.Streams/Abstractions/IStream.cs
index 2c9e895..d382080 100644
--- a/src/Cortex.Streams/Abstractions/IStream.cs
+++ b/src/Cortex.Streams/Abstractions/IStream.cs
@@ -1,15 +1,41 @@
using Cortex.States;
using Cortex.Streams.Operators;
using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
namespace Cortex.Streams
{
public interface IStream
{
+ ///
+ /// Start the stream processing.
+ ///
void Start();
+
+ ///
+ /// Stops the stream processing.
+ ///
void Stop();
+
+ ///
+ /// Processes the specified input value and emits it to the underlying stream.
+ ///
+ /// The input value to be emitted. The meaning and requirements of this value depend on the implementation.
void Emit(TIn value);
- string GetStatus();
+
+ // feature #102: Support async emit with cancellation token
+
+ ///
+ /// Asynchronously emits the specified value to the underlying stream.
+ ///
+ /// The value to emit. The meaning and requirements of this value depend on the implementation.
+ /// A cancellation token that can be used to cancel the emit operation.
+ /// A task that represents the asynchronous emit operation.
+ Task EmitAsync(TIn value, CancellationToken cancellationToken = default);
+
+ StreamStatuses GetStatus();
+
IReadOnlyDictionary> GetBranches();
TStateStore GetStateStoreByName(string name) where TStateStore : IDataStore;
diff --git a/src/Cortex.Streams/Stream.cs b/src/Cortex.Streams/Stream.cs
index d062a94..acc2425 100644
--- a/src/Cortex.Streams/Stream.cs
+++ b/src/Cortex.Streams/Stream.cs
@@ -5,6 +5,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
namespace Cortex.Streams
{
@@ -87,9 +89,9 @@ public void Stop()
/// Gets the current status of the stream.
///
/// A string indicating whether the stream is running or stopped.
- public string GetStatus()
+ public StreamStatuses GetStatus()
{
- return _isStarted ? "Running" : "Stopped";
+ return _isStarted ? StreamStatuses.RUNNING : StreamStatuses.NOT_RUNNING;
}
///
@@ -113,6 +115,34 @@ public void Emit(TIn value)
}
}
+ // feature #102: Support async emit with cancellation token
+
+ ///
+ /// Asynchronously Emits data into the stream when no source operator is used.
+ ///
+ /// The value to emit. The meaning and requirements of this value depend on the implementation.
+ /// A cancellation token that can be used to cancel the emit operation.
+ /// A task that represents the asynchronous emit operation.
+ public Task EmitAsync(TIn value, CancellationToken cancellationToken = default)
+ {
+ if (!_isStarted)
+ throw new InvalidOperationException("Stream has not been started.");
+
+ if (_operatorChain is SourceOperatorAdapter)
+ throw new InvalidOperationException("Cannot manually emit data to a stream with a source operator.");
+
+ // We can only cancel before we queue the work, since operators are synchronous today.
+ cancellationToken.ThrowIfCancellationRequested();
+
+ // Dispatch pipeline work off the caller thread.
+ return Task.Run(() =>
+ {
+ // If you ever add cooperative cancellation to operators,
+ // plumb 'cancellationToken' through and honor it there.
+ _operatorChain.Process(value);
+ }, cancellationToken);
+ }
+
public IReadOnlyDictionary> GetBranches()
{
var branchDict = new Dictionary>();
diff --git a/src/Cortex.Streams/StreamStatuses.cs b/src/Cortex.Streams/StreamStatuses.cs
new file mode 100644
index 0000000..7b0c77d
--- /dev/null
+++ b/src/Cortex.Streams/StreamStatuses.cs
@@ -0,0 +1,8 @@
+namespace Cortex.Streams
+{
+ public enum StreamStatuses
+ {
+ RUNNING,
+ NOT_RUNNING,
+ }
+}
diff --git a/src/Cortex.Vectors/Abstractions/IVector.cs b/src/Cortex.Vectors/Abstractions/IVector.cs
new file mode 100644
index 0000000..f0a3b44
--- /dev/null
+++ b/src/Cortex.Vectors/Abstractions/IVector.cs
@@ -0,0 +1,26 @@
+using System.Collections.Generic;
+using System.Numerics;
+
+namespace Cortex.Vectors
+{
+ ///
+ /// Common contract for vector collections.
+ /// Provides dimension metadata and core linear‑algebra operations.
+ ///
+ /// Any IEEE‑754 floating‑point numeric type (float, double, Half, decimal).
+ public interface IVector : IReadOnlyList where T : IFloatingPointIeee754
+ {
+ /// Gets the number of components in the vector.
+ int Dimension { get; }
+
+ /// Dot (inner) product with another vector.
+ /// Thrown when vector dimensions differ.
+ T Dot(IVector other);
+
+ /// Euclidean norm (L2).
+ T Norm();
+
+ /// Returns a unit‑length copy of this vector.
+ IVector Normalize();
+ }
+}
diff --git a/src/Cortex.Vectors/Assets/andyX.png b/src/Cortex.Vectors/Assets/andyX.png
new file mode 100644
index 0000000..101a1fb
Binary files /dev/null and b/src/Cortex.Vectors/Assets/andyX.png differ
diff --git a/src/Cortex.Vectors/Assets/license.md b/src/Cortex.Vectors/Assets/license.md
new file mode 100644
index 0000000..3c845d4
--- /dev/null
+++ b/src/Cortex.Vectors/Assets/license.md
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2025 Buildersoft
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/src/Cortex.Vectors/BitVector.cs b/src/Cortex.Vectors/BitVector.cs
new file mode 100644
index 0000000..69c9c99
--- /dev/null
+++ b/src/Cortex.Vectors/BitVector.cs
@@ -0,0 +1,124 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Numerics;
+using System.Runtime.CompilerServices;
+
+namespace Cortex.Vectors
+{
+ ///
+ /// Fixed‑length bit‑packed vector that implements for any IEEE‑754 type .
+ /// Each bit encodes 0 → , 1 → .
+ ///
+ public sealed class BitVector : IVector, IEquatable> where T : IFloatingPointIeee754
+ {
+ private readonly ulong[] _blocks;
+
+ public int Dimension { get; }
+ public int Count => Dimension;
+
+ #region Construction
+ public BitVector(int dimension)
+ {
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(dimension);
+ Dimension = dimension;
+ _blocks = new ulong[(dimension + 63) >> 6];
+ }
+
+ /// Create from indices that should be set to 1.
+ public BitVector(int dimension, IEnumerable oneIndices) : this(dimension)
+ {
+ foreach (var idx in oneIndices) SetBit(idx, true);
+ }
+
+ /// Create from a span of bools.
+ public BitVector(ReadOnlySpan bits) : this(bits.Length)
+ {
+ for (int i = 0; i < bits.Length; i++) if (bits[i]) SetBit(i, true);
+ }
+ #endregion
+
+ #region Bit helpers
+ [MethodImpl(MethodImplOptions.AggressiveInlining)] private static (int blk, int off) Loc(int idx) => (idx >> 6, idx & 63);
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool GetBit(int idx)
+ {
+ if ((uint)idx >= Dimension) throw new IndexOutOfRangeException();
+ var (b, o) = Loc(idx);
+ return (_blocks[b] & (1UL << o)) != 0UL;
+ }
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void SetBit(int idx, bool value)
+ {
+ if ((uint)idx >= Dimension) throw new IndexOutOfRangeException();
+ var (b, o) = Loc(idx);
+ if (value) _blocks[b] |= 1UL << o; else _blocks[b] &= ~(1UL << o);
+ }
+ public int PopCount()
+ {
+ int c = 0; foreach (var v in _blocks) c += BitOperations.PopCount(v); return c;
+ }
+ #endregion
+
+ #region IVector Implementation
+ public T this[int index]
+ {
+ get => GetBit(index) ? T.One : T.Zero;
+ }
+
+ public T Dot(IVector other)
+ {
+ if (other is BitVector bv)
+ {
+ ValidateSameDimension(bv);
+ int count = 0;
+ for (int i = 0; i < _blocks.Length; i++) count += BitOperations.PopCount(_blocks[i] & bv._blocks[i]);
+ return T.CreateTruncating(count);
+ }
+ else
+ {
+ ValidateSameDimension(other);
+ T sum = T.Zero;
+ for (int i = 0; i < Dimension; i++) sum += this[i] * other[i];
+ return sum;
+ }
+ }
+
+ public T Norm() => T.Sqrt(T.CreateTruncating(PopCount()));
+
+ public IVector Normalize()
+ {
+ var n = Norm();
+ if (n == T.Zero) throw new InvalidOperationException("Cannot normalize zero bit‑vector.");
+ var inv = T.One / n;
+ var data = new T[Dimension];
+ for (int i = 0; i < Dimension; i++) if (GetBit(i)) data[i] = inv; // zeros already default
+ return new DenseVector(data);
+ }
+ #endregion
+
+ #region IEnumerable
+ public IEnumerator GetEnumerator()
+ {
+ for (int i = 0; i < Dimension; i++) yield return this[i];
+ }
+ IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
+ #endregion
+
+ #region Equality & HashCode
+ public bool Equals(BitVector? other)
+ {
+ if (other is null || other.Dimension != Dimension) return false;
+ for (int i = 0; i < _blocks.Length; i++) if (_blocks[i] != other._blocks[i]) return false;
+ return true;
+ }
+ public override bool Equals(object? obj) => obj is BitVector bv && Equals(bv);
+ public override int GetHashCode() => HashCode.Combine(Dimension, _blocks.Length > 0 ? _blocks[0] : 0UL);
+ #endregion
+
+ private void ValidateSameDimension(IVector other)
+ {
+ if (other.Dimension != Dimension) throw new ArgumentException("Vector dimensions must match.", nameof(other));
+ }
+ }
+}
diff --git a/src/Cortex.Vectors/Cortex.Vectors.csproj b/src/Cortex.Vectors/Cortex.Vectors.csproj
new file mode 100644
index 0000000..8e1bda8
--- /dev/null
+++ b/src/Cortex.Vectors/Cortex.Vectors.csproj
@@ -0,0 +1,59 @@
+
+
+
+ net9.0;net8.0
+
+ 2.0.0
+ 2.0.0
+ Buildersoft Cortex Framework
+ Buildersoft
+ Buildersoft,EnesHoxha
+ Copyright © Buildersoft 2025
+
+ Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
+
+
+ https://github.com/buildersoftio/cortex
+ cortex;machine‑learning;vector;ai;streaming
+
+ 2.0.0
+ license.md
+ andyX.png
+ Cortex.Vectors
+ True
+ True
+ True
+ git
+ Just as the Cortex in our brains handles complex processing efficiently, Cortex Data Framework brings brainpower to your data management!
+ https://buildersoft.io/
+ README.md
+
+
+
+
+
+
+
+
+
+ True
+ \
+ Always
+
+
+
+
+ True
+
+
+
+ True
+
+
+
+
+
+
+
+
+
diff --git a/src/Cortex.Vectors/DenseVector.cs b/src/Cortex.Vectors/DenseVector.cs
new file mode 100644
index 0000000..15c71e8
--- /dev/null
+++ b/src/Cortex.Vectors/DenseVector.cs
@@ -0,0 +1,151 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Numerics;
+using System.Runtime.CompilerServices;
+
+namespace Cortex.Vectors
+{
+ ///
+ /// Contiguous dense representation of a mathematical vector.
+ /// Suitable for small‑to‑medium dimensions (< 10⁶) that are mostly non‑zero.
+ ///
+ /// Floating‑point element type.
+ public sealed class DenseVector : IVector, IEquatable> where T : IFloatingPointIeee754
+ {
+ private readonly T[] _data;
+
+ #region Construction
+
+ public DenseVector(ReadOnlySpan span)
+ {
+ _data = span.ToArray();
+ }
+
+ public DenseVector(params T[] values)
+ {
+ _data = values.Length == 0
+ ? throw new ArgumentException("Vector must have at least one component.", nameof(values))
+ : (T[])values.Clone();
+ }
+
+ /// Creates a zero‑filled vector of given dimension.
+ public static DenseVector Zeros(int dimension)
+ {
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(dimension);
+ return new DenseVector(new T[dimension]);
+ }
+
+ /// Creates a vector where every component equals .
+ public static DenseVector Filled(int dimension, T value)
+ {
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(dimension);
+ var data = new T[dimension];
+ Array.Fill(data, value);
+ return new DenseVector(data);
+ }
+
+ #endregion
+
+ #region IVector & IReadOnlyList Implementation
+
+ public int Dimension => _data.Length;
+
+ public int Count => _data.Length; // IReadOnlyCollection implementation
+
+ public T this[int index] => _data[index];
+
+ public T Dot(IVector other)
+ {
+ ValidateSameDimension(other);
+ T sum = T.Zero;
+ for (int i = 0; i < _data.Length; i++)
+ sum += _data[i] * other[i];
+ return sum;
+ }
+
+ public T Norm()
+ {
+ T sumSq = T.Zero;
+ foreach (var v in _data)
+ sumSq += v * v;
+ return T.Sqrt(sumSq);
+ }
+
+ public IVector Normalize()
+ {
+ var n = Norm();
+ if (n == T.Zero)
+ throw new InvalidOperationException("Cannot normalize the zero vector.");
+ var scaled = new T[_data.Length];
+ for (int i = 0; i < _data.Length; i++)
+ scaled[i] = _data[i] / n;
+ return new DenseVector(scaled);
+ }
+
+ #endregion
+
+ #region Arithmetic Operators
+
+ public static DenseVector operator +(DenseVector left, DenseVector right)
+ {
+ left.ValidateSameDimension(right);
+ var result = new T[left.Dimension];
+ for (int i = 0; i < result.Length; i++)
+ result[i] = left._data[i] + right._data[i];
+ return new DenseVector(result);
+ }
+
+ public static DenseVector operator -(DenseVector left, DenseVector right)
+ {
+ left.ValidateSameDimension(right);
+ var result = new T[left.Dimension];
+ for (int i = 0; i < result.Length; i++)
+ result[i] = left._data[i] - right._data[i];
+ return new DenseVector(result);
+ }
+
+ public static DenseVector operator *(DenseVector vector, T scalar)
+ {
+ var result = new T[vector.Dimension];
+ for (int i = 0; i < result.Length; i++)
+ result[i] = vector._data[i] * scalar;
+ return new DenseVector(result);
+ }
+
+ public static DenseVector operator *(T scalar, DenseVector vector) => vector * scalar;
+
+ #endregion
+
+ #region Equality & Hash
+
+ public bool Equals(DenseVector? other)
+ {
+ if (other is null || other.Dimension != Dimension) return false;
+ for (int i = 0; i < Dimension; i++)
+ if (_data[i] != other._data[i]) return false;
+ return true;
+ }
+
+ public override bool Equals(object? obj) => obj is DenseVector v && Equals(v);
+
+ public override int GetHashCode() => HashCode.Combine(Dimension, _data[0], _data[^1]);
+
+ #endregion
+
+ #region IEnumerable Implementation
+
+ public IEnumerator GetEnumerator() => ((IEnumerable)_data).GetEnumerator();
+ IEnumerator IEnumerable.GetEnumerator() => _data.GetEnumerator();
+
+ #endregion
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private void ValidateSameDimension(IVector other)
+ {
+ if (other.Dimension != Dimension)
+ throw new ArgumentException($"Vector dimensions must match (this: {Dimension}, other: {other.Dimension}).", nameof(other));
+ }
+ }
+
+}
diff --git a/src/Cortex.Vectors/README.md b/src/Cortex.Vectors/README.md
new file mode 100644
index 0000000..773ac55
--- /dev/null
+++ b/src/Cortex.Vectors/README.md
@@ -0,0 +1,123 @@
+# Cortex.Vectors 🧠
+
+**Cortex.Vectors** is a High‑performance vector types—Dense, Sparse, and Bit—for AI & for .NET.
+
+
+Built as part of the [Cortex Data Framework](https://github.com/buildersoftio/cortex), this library offers High‑performance vector types—Dense, Sparse, and Bit—for AI for:
+
+
+- ✨ Generic‑math powered (IFloatingPointIeee754): works with float, double, decimal, …
+- 🟢 DenseVector – contiguous storage, SIMD‑friendly operations
+- 🔵 SparseVector – dictionary‑backed, memory‑efficient for huge, mostly‑zero spaces
+- 🟡 BitVector – bit‑packed booleans with popcount & logical ops
+- ⚙️ Core ops out‑of‑the‑box: dot product, L2 norm, cosine similarity, scaling, +/‑
+
+---
+
+[](https://github.com/buildersoftio/cortex/blob/master/LICENSE)
+[](https://www.nuget.org/packages/Cortex.Vectors)
+[](https://github.com/buildersoftio/cortex)
+[](https://discord.gg/JnMJV33QHu)
+
+
+## 🚀 Getting Started
+
+### Install via NuGet
+
+```bash
+dotnet add package Cortex.Vectors
+```
+
+## DenseVector
+```csharp
+using Cortex.Vectors;
+
+// (1, 2, 3)
+var a = new DenseVector(1f, 2f, 3f);
+
+// (0.5, 0.5, 0.5)
+var b = DenseVector.Filled(3, 0.5f);
+
+float dot = a.Dot(b); // = 3.0
+var normA = a.Norm(); // ≈ 3.7417
+var unitA = a.Normalize(); // unit length
+float cosine = a.CosineSimilarity(b);
+```
+
+## SparseVector
+```csharp
+using Cortex.Vectors;
+using System.Collections.Generic;
+
+// 1‑million‑dimensional vector with two non‑zeros
+var sv = new SparseVector(
+ dimension: 1_000_000,
+ nonZero: new[]
+ {
+ new KeyValuePair(42, 1.0),
+ new KeyValuePair(123456, 2.5)
+ });
+
+double l2 = sv.Norm(); // √(1² + 2.5²)
+var unit = sv.Normalize();
+```
+
+## BitVector
+
+```csharp
+using Cortex.Vectors;
+
+// length 128, bits 0, 3, and 5 set to 1
+var bv = new BitVector(128, new[] { 0, 3, 5 });
+
+int ones = bv.PopCount(); // 3
+float selfDot = bv.Dot(bv); // 3.0 (generic type ⇒ float)
+var l2 = bv.Norm(); // √3
+```
+
+## 💬 Contributing
+We welcome contributions from the community! Whether it's reporting bugs, suggesting features, or submitting pull requests, your involvement helps improve Cortex for everyone.
+
+### 💬 How to Contribute
+1. **Fork the Repository**
+2. **Create a Feature Branch**
+```bash
+git checkout -b feature/YourFeature
+```
+3. **Commit Your Changes**
+```bash
+git commit -m "Add your feature"
+```
+4. **Push to Your Fork**
+```bash
+git push origin feature/YourFeature
+```
+5. **Open a Pull Request**
+
+Describe your changes and submit the pull request for review.
+
+## 📄 License
+This project is licensed under the MIT License.
+
+## 📚 Sponsorship
+Cortex is an open-source project maintained by BuilderSoft. Your support helps us continue developing and improving Cortex. Consider sponsoring us to contribute to the future of resilient streaming platforms.
+
+### How to Sponsor
+* **Financial Contributions**: Support us through [GitHub Sponsors](https://github.com/sponsors/buildersoftio) or other preferred platforms.
+* **Corporate Sponsorship**: If your organization is interested in sponsoring Cortex, please contact us directly.
+
+Contact Us: cortex@buildersoft.io
+
+
+## Contact
+We'd love to hear from you! Whether you have questions, feedback, or need support, feel free to reach out.
+
+- Email: cortex@buildersoft.io
+- Website: https://buildersoft.io
+- GitHub Issues: [Cortex Data Framework Issues](https://github.com/buildersoftio/cortex/issues)
+- Join our Discord Community: [](https://discord.gg/JnMJV33QHu)
+
+
+Thank you for using Cortex Data Framework! We hope it empowers you to build scalable and efficient data processing pipelines effortlessly.
+
+Built with ❤️ by the Buildersoft team.
diff --git a/src/Cortex.Vectors/SparseVector.cs b/src/Cortex.Vectors/SparseVector.cs
new file mode 100644
index 0000000..bea7c51
--- /dev/null
+++ b/src/Cortex.Vectors/SparseVector.cs
@@ -0,0 +1,145 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using System.Numerics;
+using System.Runtime.CompilerServices;
+
+namespace Cortex.Vectors
+{
+ public sealed class SparseVector : IVector, IEquatable> where T : IFloatingPointIeee754
+ {
+ private readonly int _dimension;
+ private readonly Dictionary _values;
+
+ #region Construction
+ public SparseVector(int dimension) : this(dimension, Enumerable.Empty>()) { }
+
+ public SparseVector(int dimension, IEnumerable> nonZero)
+ {
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(dimension);
+ _dimension = dimension;
+ _values = new Dictionary();
+ foreach (var (i, val) in nonZero)
+ {
+ if (i < 0 || i >= dimension) throw new ArgumentOutOfRangeException(nameof(nonZero), "Index out of range.");
+ if (val != T.Zero) _values[i] = val;
+ }
+ }
+
+ /// Creates a sparse vector where the provided indices hold the same .
+ public static SparseVector FromIndices(int dimension, IEnumerable indices, T value)
+ => new SparseVector(dimension, indices.Select(i => new KeyValuePair(i, value)));
+ #endregion
+
+ #region IReadOnlyList Implementation
+ public int Dimension => _dimension;
+ public int Count => _dimension; // total logical length
+ public int NonZeroCount => _values.Count;
+
+ public T this[int index]
+ {
+ get
+ {
+ if ((uint)index >= _dimension) throw new IndexOutOfRangeException();
+ return _values.TryGetValue(index, out var v) ? v : T.Zero;
+ }
+ }
+ #endregion
+
+ #region Core Vector Operations
+ public T Dot(IVector other)
+ {
+ ValidateSameDimension(other);
+ T sum = T.Zero;
+ foreach (var (i, v) in _values) sum += v * other[i];
+ return sum;
+ }
+
+ public T Norm()
+ {
+ T sumSq = T.Zero;
+ foreach (var v in _values.Values) sumSq += v * v;
+ return T.Sqrt(sumSq);
+ }
+
+ public IVector Normalize()
+ {
+ var n = Norm();
+ if (n == T.Zero) throw new InvalidOperationException("Cannot normalize zero vector.");
+ var scaled = _values.Select(kv => new KeyValuePair(kv.Key, kv.Value / n));
+ return new SparseVector(_dimension, scaled);
+ }
+ #endregion
+
+ #region Operators
+ public static SparseVector operator +(SparseVector a, SparseVector b)
+ {
+ a.ValidateSameDimension(b);
+ var result = new Dictionary(a._values);
+ foreach (var (i, v) in b._values)
+ {
+ if (result.TryGetValue(i, out var existing))
+ {
+ var sum = existing + v;
+ if (sum == T.Zero) result.Remove(i); else result[i] = sum;
+ }
+ else result[i] = v;
+ }
+ return new SparseVector(a._dimension, result);
+ }
+
+ public static SparseVector operator -(SparseVector a, SparseVector b)
+ {
+ a.ValidateSameDimension(b);
+ var result = new Dictionary(a._values);
+ foreach (var (i, v) in b._values)
+ {
+ if (result.TryGetValue(i, out var existing))
+ {
+ var diff = existing - v;
+ if (diff == T.Zero) result.Remove(i); else result[i] = diff;
+ }
+ else if (v != T.Zero) result[i] = -v;
+ }
+ return new SparseVector(a._dimension, result);
+ }
+
+ public static SparseVector operator *(SparseVector vector, T scalar)
+ {
+ if (scalar == T.Zero) return new SparseVector(vector._dimension);
+ var result = vector._values.ToDictionary(k => k.Key, k => k.Value * scalar);
+ return new SparseVector(vector._dimension, result);
+ }
+
+ public static SparseVector operator *(T scalar, SparseVector vector) => vector * scalar;
+ #endregion
+
+ #region Equality & Hashing
+ public bool Equals(SparseVector? other)
+ {
+ if (other is null || other._dimension != _dimension || other._values.Count != _values.Count) return false;
+ foreach (var kv in _values)
+ if (!other._values.TryGetValue(kv.Key, out var v) || v != kv.Value) return false;
+ return true;
+ }
+
+ public override bool Equals(object? obj) => obj is SparseVector sv && Equals(sv);
+ public override int GetHashCode() => HashCode.Combine(_dimension, _values.Count);
+ #endregion
+
+ #region Enumeration
+ public IEnumerator GetEnumerator()
+ {
+ for (int i = 0; i < _dimension; i++) yield return this[i];
+ }
+ IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
+ #endregion
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private void ValidateSameDimension(IVector other)
+ {
+ if (other.Dimension != _dimension) throw new ArgumentException("Vector dimensions must match.", nameof(other));
+ }
+ }
+}
diff --git a/src/Cortex.Vectors/VectorExtensions.cs b/src/Cortex.Vectors/VectorExtensions.cs
new file mode 100644
index 0000000..6ee463e
--- /dev/null
+++ b/src/Cortex.Vectors/VectorExtensions.cs
@@ -0,0 +1,21 @@
+using System.Collections.Generic;
+using System.Linq;
+using System.Numerics;
+
+namespace Cortex.Vectors
+{
+ public static class VectorExtensions
+ {
+ public static T CosineSimilarity(this IVector a, IVector b) where T : IFloatingPointIeee754
+ {
+ var denom = a.Norm() * b.Norm();
+ return denom == T.Zero ? T.Zero : a.Dot(b) / denom;
+ }
+
+ public static SparseVector ToSparse(this DenseVector v) where T : IFloatingPointIeee754
+ => new SparseVector(v.Dimension,
+ Enumerable.Range(0, v.Dimension)
+ .Where(i => v[i] != T.Zero)
+ .Select(i => new KeyValuePair(i, v[i])));
+ }
+}