diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
new file mode 100644
index 0000000..5531f68
--- /dev/null
+++ b/CODE_OF_CONDUCT.md
@@ -0,0 +1,15 @@
+# Cortex Code of Conduct
+
+**All creatures are welcome**: We aim to create a safe space for all community members, regardless of their age, race, gender, sexual orientation, physical appearance or disability, choice of text editor, or any other qualities by which living beings can be discriminated against.
+
+**Be excellent to each other**: We do not tolerate verbal or physical harassment, violence or intimidation.
+
+We do not tolerate life forms who refuse to share this openness and respect towards others: Creatures that are not excellent to others are not welcome.
+
+We continuously strive to make our community a better place for everyone – in the best tradition of hackers we "build, test, improve, reiterate". In this ongoing adventure, we rely on the support, courage, and creativity of all members of the Cortex community.
+
+If you are made uncomfortable in your role as a Cortex community member, please let us know: You can reach us at cortex@buildersoft.io. All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident.
+
+#### Attribution
+
+This Code of Conduct is adapted from the [CCC event CoC](https://www.ccc.de/en/updates/2016/a-reminder-to-be-excellent-to-each-other).
\ No newline at end of file
diff --git a/Cortex.sln b/Cortex.sln
index a87ef46..5449f85 100644
--- a/Cortex.sln
+++ b/Cortex.sln
@@ -45,6 +45,14 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.States.ClickHouse",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.Streams.Http", "src\Cortex.Streams.Http\Cortex.Streams.Http.csproj", "{20BD7107-8199-4CA8-815B-4D156B522B82}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.States.SQLite", "src\Cortex.States.SQLite\Cortex.States.SQLite.csproj", "{19167D25-6383-46B4-9449-B9E364F809FF}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.Streams.MSSqlServer", "src\Cortex.Streams.MSSqlServer\Cortex.Streams.MSSqlServer.csproj", "{81A01446-A8AA-4F9D-BB9B-B66E21B2C348}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.Streams.PostgreSQL", "src\Cortex.Streams.PostgreSQL\Cortex.Streams.PostgreSQL.csproj", "{0E60F75D-C44B-428A-9252-A11C365E2C56}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.Streams.MongoDb", "src\Cortex.Streams.MongoDb\Cortex.Streams.MongoDb.csproj", "{FC86D3AB-778D-45D7-AF36-1F89FC16DE55}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -134,6 +142,22 @@ Global
{20BD7107-8199-4CA8-815B-4D156B522B82}.Debug|Any CPU.Build.0 = Debug|Any CPU
{20BD7107-8199-4CA8-815B-4D156B522B82}.Release|Any CPU.ActiveCfg = Release|Any CPU
{20BD7107-8199-4CA8-815B-4D156B522B82}.Release|Any CPU.Build.0 = Release|Any CPU
+ {19167D25-6383-46B4-9449-B9E364F809FF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {19167D25-6383-46B4-9449-B9E364F809FF}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {19167D25-6383-46B4-9449-B9E364F809FF}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {19167D25-6383-46B4-9449-B9E364F809FF}.Release|Any CPU.Build.0 = Release|Any CPU
+ {81A01446-A8AA-4F9D-BB9B-B66E21B2C348}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {81A01446-A8AA-4F9D-BB9B-B66E21B2C348}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {81A01446-A8AA-4F9D-BB9B-B66E21B2C348}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {81A01446-A8AA-4F9D-BB9B-B66E21B2C348}.Release|Any CPU.Build.0 = Release|Any CPU
+ {0E60F75D-C44B-428A-9252-A11C365E2C56}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {0E60F75D-C44B-428A-9252-A11C365E2C56}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {0E60F75D-C44B-428A-9252-A11C365E2C56}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {0E60F75D-C44B-428A-9252-A11C365E2C56}.Release|Any CPU.Build.0 = Release|Any CPU
+ {FC86D3AB-778D-45D7-AF36-1F89FC16DE55}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {FC86D3AB-778D-45D7-AF36-1F89FC16DE55}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {FC86D3AB-778D-45D7-AF36-1F89FC16DE55}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {FC86D3AB-778D-45D7-AF36-1F89FC16DE55}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/README.md b/README.md
index 4cd9b0e..66e606b 100644
--- a/README.md
+++ b/README.md
@@ -72,6 +72,15 @@
- **Cortex.Streams.Http:** Implementation of Http Source and Sink operators.
[](https://www.nuget.org/packages/Cortex.Streams.Http)
+- **Cortex.Streams.CDC.MSSqlServer:** CDC adapter for Microsoft Sql Server.
+[](https://www.nuget.org/packages/Cortex.Streams.CDC.MSSqlServer)
+
+- **Cortex.Streams.CDC.PostgreSQL:** CDC adapter for PostgreSQL.
+[](https://www.nuget.org/packages/Cortex.Streams.CDC.PostgreSQL)
+
+- **Cortex.Streams.CDC.MongoDb:** CDC adapter for MongoDb.
+[](https://www.nuget.org/packages/Cortex.Streams.CDC.MongoDb)
+
- **Cortex.States:** Core state management functionalities.
[](https://www.nuget.org/packages/Cortex.States)
@@ -93,6 +102,9 @@
- **Cortex.States.ClickHouse:** Persistent state storage using Clickhouse.
[](https://www.nuget.org/packages/Cortex.States.ClickHouse)
+- **Cortex.States.SQLite:** Persistent state storage using SQLite.
+[](https://www.nuget.org/packages/Cortex.States.SQLite)
+
- **Cortex.Telemetry:** Core library to add support for Tracing and Matrics.
[](https://www.nuget.org/packages/Cortex.Telemetry)
diff --git a/src/Cortex.Mediator/Assets/license.md b/src/Cortex.Mediator/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.Mediator/Assets/license.md
+++ b/src/Cortex.Mediator/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.Mediator/Cortex.Mediator.csproj b/src/Cortex.Mediator/Cortex.Mediator.csproj
index 0dc2c70..aacd7de 100644
--- a/src/Cortex.Mediator/Cortex.Mediator.csproj
+++ b/src/Cortex.Mediator/Cortex.Mediator.csproj
@@ -8,7 +8,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Buildersoft Cortex Mediator is a library for .NET applications that implements the mediator pattern. It helps to reduce dependencies between objects by allowing in-process messaging without direct communication. Instead, objects communicate through Cortex Mediator, making them less coupled and more maintainable..
diff --git a/src/Cortex.States.Cassandra/Assets/license.md b/src/Cortex.States.Cassandra/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.States.Cassandra/Assets/license.md
+++ b/src/Cortex.States.Cassandra/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.States.Cassandra/CassandraStateStore.cs b/src/Cortex.States.Cassandra/CassandraStateStore.cs
index 2c358eb..d750b9e 100644
--- a/src/Cortex.States.Cassandra/CassandraStateStore.cs
+++ b/src/Cortex.States.Cassandra/CassandraStateStore.cs
@@ -9,7 +9,7 @@
namespace Cortex.States.Cassandra
{
- public class CassandraStateStore : IStateStore
+ public class CassandraStateStore : IDataStore
{
private readonly ISession _session;
private readonly string _keyspace;
diff --git a/src/Cortex.States.Cassandra/Cortex.States.Cassandra.csproj b/src/Cortex.States.Cassandra/Cortex.States.Cassandra.csproj
index 30b781c..ee91411 100644
--- a/src/Cortex.States.Cassandra/Cortex.States.Cassandra.csproj
+++ b/src/Cortex.States.Cassandra/Cortex.States.Cassandra.csproj
@@ -8,7 +8,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
diff --git a/src/Cortex.States.ClickHouse/Assets/license.md b/src/Cortex.States.ClickHouse/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.States.ClickHouse/Assets/license.md
+++ b/src/Cortex.States.ClickHouse/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.States.ClickHouse/ClickHouseStateStore.cs b/src/Cortex.States.ClickHouse/ClickHouseStateStore.cs
index 9236d32..c6844c6 100644
--- a/src/Cortex.States.ClickHouse/ClickHouseStateStore.cs
+++ b/src/Cortex.States.ClickHouse/ClickHouseStateStore.cs
@@ -8,7 +8,7 @@
namespace Cortex.States.ClickHouse
{
- public class ClickHouseStateStore : IStateStore, IDisposable
+ public class ClickHouseStateStore : IDataStore, IDisposable
where TValue : new()
{
private readonly string _connectionString;
diff --git a/src/Cortex.States.ClickHouse/Cortex.States.ClickHouse.csproj b/src/Cortex.States.ClickHouse/Cortex.States.ClickHouse.csproj
index e40cbcb..92a3681 100644
--- a/src/Cortex.States.ClickHouse/Cortex.States.ClickHouse.csproj
+++ b/src/Cortex.States.ClickHouse/Cortex.States.ClickHouse.csproj
@@ -8,7 +8,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
diff --git a/src/Cortex.States.MSSqlServer/Assets/license.md b/src/Cortex.States.MSSqlServer/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.States.MSSqlServer/Assets/license.md
+++ b/src/Cortex.States.MSSqlServer/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.States.MSSqlServer/Cortex.States.MSSqlServer.csproj b/src/Cortex.States.MSSqlServer/Cortex.States.MSSqlServer.csproj
index 86bf0bc..7ca2e78 100644
--- a/src/Cortex.States.MSSqlServer/Cortex.States.MSSqlServer.csproj
+++ b/src/Cortex.States.MSSqlServer/Cortex.States.MSSqlServer.csproj
@@ -8,7 +8,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
diff --git a/src/Cortex.States.MSSqlServer/SqlServerKeyValueStateStore.cs b/src/Cortex.States.MSSqlServer/SqlServerKeyValueStateStore.cs
index 615fefc..0c930ee 100644
--- a/src/Cortex.States.MSSqlServer/SqlServerKeyValueStateStore.cs
+++ b/src/Cortex.States.MSSqlServer/SqlServerKeyValueStateStore.cs
@@ -7,7 +7,7 @@
namespace Cortex.States.MSSqlServer
{
- public class SqlServerKeyValueStateStore : IStateStore, IDisposable
+ public class SqlServerKeyValueStateStore : IDataStore, IDisposable
{
private readonly string _connectionString;
private readonly string _schemaName;
diff --git a/src/Cortex.States.MSSqlServer/SqlServerStateStore.cs b/src/Cortex.States.MSSqlServer/SqlServerStateStore.cs
index 807cd01..4a71176 100644
--- a/src/Cortex.States.MSSqlServer/SqlServerStateStore.cs
+++ b/src/Cortex.States.MSSqlServer/SqlServerStateStore.cs
@@ -8,7 +8,7 @@
namespace Cortex.States.MSSqlServer
{
- public class SqlServerStateStore : IStateStore, IDisposable
+ public class SqlServerStateStore : IDataStore, IDisposable
where TValue : new()
{
private readonly string _connectionString;
diff --git a/src/Cortex.States.MongoDb/Assets/license.md b/src/Cortex.States.MongoDb/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.States.MongoDb/Assets/license.md
+++ b/src/Cortex.States.MongoDb/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.States.MongoDb/Cortex.States.MongoDb.csproj b/src/Cortex.States.MongoDb/Cortex.States.MongoDb.csproj
index d65c4e7..2135316 100644
--- a/src/Cortex.States.MongoDb/Cortex.States.MongoDb.csproj
+++ b/src/Cortex.States.MongoDb/Cortex.States.MongoDb.csproj
@@ -8,7 +8,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
diff --git a/src/Cortex.States.MongoDb/MongoDbStateStore.cs b/src/Cortex.States.MongoDb/MongoDbStateStore.cs
index 3cf999c..ae9332b 100644
--- a/src/Cortex.States.MongoDb/MongoDbStateStore.cs
+++ b/src/Cortex.States.MongoDb/MongoDbStateStore.cs
@@ -11,7 +11,7 @@ namespace Cortex.States.MongoDb
///
/// The type of the key. Must be serializable by MongoDB.
/// The type of the value. Must be serializable by MongoDB.
- public class MongoDbStateStore : IStateStore
+ public class MongoDbStateStore : IDataStore
{
private readonly IMongoCollection> _collection;
public string Name { get; }
diff --git a/src/Cortex.States.PostgreSQL/Assets/license.md b/src/Cortex.States.PostgreSQL/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.States.PostgreSQL/Assets/license.md
+++ b/src/Cortex.States.PostgreSQL/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.States.PostgreSQL/Cortex.States.PostgreSQL.csproj b/src/Cortex.States.PostgreSQL/Cortex.States.PostgreSQL.csproj
index fd0a4eb..84285e2 100644
--- a/src/Cortex.States.PostgreSQL/Cortex.States.PostgreSQL.csproj
+++ b/src/Cortex.States.PostgreSQL/Cortex.States.PostgreSQL.csproj
@@ -8,7 +8,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
diff --git a/src/Cortex.States.PostgreSQL/PostgresKeyValueStateStore.cs b/src/Cortex.States.PostgreSQL/PostgresKeyValueStateStore.cs
index 82578d9..df007e3 100644
--- a/src/Cortex.States.PostgreSQL/PostgresKeyValueStateStore.cs
+++ b/src/Cortex.States.PostgreSQL/PostgresKeyValueStateStore.cs
@@ -7,7 +7,7 @@
namespace Cortex.States.PostgreSQL
{
- public class PostgresKeyValueStateStore : IStateStore, IDisposable
+ public class PostgresKeyValueStateStore : IDataStore, IDisposable
{
private readonly string _connectionString;
private readonly string _schemaName;
diff --git a/src/Cortex.States.PostgreSQL/PostgresStateStore.cs b/src/Cortex.States.PostgreSQL/PostgresStateStore.cs
index 03acb60..344d5f2 100644
--- a/src/Cortex.States.PostgreSQL/PostgresStateStore.cs
+++ b/src/Cortex.States.PostgreSQL/PostgresStateStore.cs
@@ -7,7 +7,7 @@
namespace Cortex.States.PostgreSQL
{
- public class PostgresStateStore : IStateStore, IDisposable where TValue : new()
+ public class PostgresStateStore : IDataStore, IDisposable where TValue : new()
{
private readonly string _connectionString;
private readonly string _schemaName;
diff --git a/src/Cortex.States.RocksDb/Assets/license.md b/src/Cortex.States.RocksDb/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.States.RocksDb/Assets/license.md
+++ b/src/Cortex.States.RocksDb/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.States.RocksDb/Cortex.States.RocksDb.csproj b/src/Cortex.States.RocksDb/Cortex.States.RocksDb.csproj
index 96660c6..44c31b0 100644
--- a/src/Cortex.States.RocksDb/Cortex.States.RocksDb.csproj
+++ b/src/Cortex.States.RocksDb/Cortex.States.RocksDb.csproj
@@ -8,7 +8,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
diff --git a/src/Cortex.States.RocksDb/RocksDbStateStore.cs b/src/Cortex.States.RocksDb/RocksDbStateStore.cs
index 2b840bf..487ed63 100644
--- a/src/Cortex.States.RocksDb/RocksDbStateStore.cs
+++ b/src/Cortex.States.RocksDb/RocksDbStateStore.cs
@@ -6,7 +6,7 @@
namespace Cortex.States.RocksDb
{
- public class RocksDbStateStore : IStateStore
+ public class RocksDbStateStore : IDataStore
{
private readonly RocksDbSharp.RocksDb _db;
private readonly ColumnFamilyHandle _handle;
diff --git a/src/Cortex.States.SQLite/Assets/cortex.png b/src/Cortex.States.SQLite/Assets/cortex.png
new file mode 100644
index 0000000..a4f9727
Binary files /dev/null and b/src/Cortex.States.SQLite/Assets/cortex.png differ
diff --git a/src/Cortex.States.SQLite/Assets/license.md b/src/Cortex.States.SQLite/Assets/license.md
new file mode 100644
index 0000000..3c845d4
--- /dev/null
+++ b/src/Cortex.States.SQLite/Assets/license.md
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2025 Buildersoft
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/src/Cortex.States.SQLite/Cortex.States.SQLite.csproj b/src/Cortex.States.SQLite/Cortex.States.SQLite.csproj
new file mode 100644
index 0000000..7581fd5
--- /dev/null
+++ b/src/Cortex.States.SQLite/Cortex.States.SQLite.csproj
@@ -0,0 +1,55 @@
+
+
+
+ net9.0;net8.0;net7.0
+
+ 1.0.1
+ 1.0.1
+ Buildersoft Cortex Framework
+ Buildersoft
+ Buildersoft,EnesHoxha
+ Copyright © Buildersoft 2025
+
+ Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
+
+
+ https://github.com/buildersoftio/cortex
+ cortex vortex mediator eda streaming distributed streams states sqlite
+
+ 1.0.1
+ license.md
+ cortex.png
+ Cortex.States.SQLite
+ True
+ True
+ True
+
+ Just as the Cortex in our brains handles complex processing efficiently, Cortex Data Framework brings brainpower to your data management!
+ https://buildersoft.io/
+ README.md
+
+
+
+
+ True
+ \
+
+
+ True
+
+
+
+ True
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Cortex.States.SQLite/SqliteKeyValueStateStore.cs b/src/Cortex.States.SQLite/SqliteKeyValueStateStore.cs
new file mode 100644
index 0000000..fe9db30
--- /dev/null
+++ b/src/Cortex.States.SQLite/SqliteKeyValueStateStore.cs
@@ -0,0 +1,238 @@
+using Microsoft.Data.Sqlite;
+using System;
+using System.Collections.Generic;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Cortex.States.Sqlite
+{
+ public class SqliteKeyValueStateStore : IDataStore, IDisposable
+ {
+ private readonly string _connectionString;
+ private readonly string _tableName;
+ private readonly Func _keySerializer;
+ private readonly Func _valueSerializer;
+ private readonly Func _keyDeserializer;
+ private readonly Func _valueDeserializer;
+
+ private static readonly SemaphoreSlim _initializationLock = new SemaphoreSlim(1, 1);
+ private volatile bool _isInitialized;
+
+ public string Name { get; }
+
+ ///
+ /// Initializes a new instance of the SqliteKeyValueStateStore.
+ ///
+ /// A friendly name for the store.
+ ///
+ /// The connection string to the SQLite database.
+ /// For example, "Data Source=MyDatabase.db". Note: a plain "Data Source=:memory:" database will NOT persist across the per-operation connections this store opens; for in-memory use, a shared-cache database (e.g. "Data Source=StoreName;Mode=Memory;Cache=Shared") with at least one connection held open is required.
+ ///
+ /// The name of the table to use for storing state entries.
+ /// Optional key serializer. If not provided, JSON serialization is used.
+ /// Optional value serializer. If not provided, JSON serialization is used.
+ /// Optional key deserializer. If not provided, JSON deserialization is used.
+ /// Optional value deserializer. If not provided, JSON deserialization is used.
+ public SqliteKeyValueStateStore(
+ string name,
+ string connectionString,
+ string tableName,
+ Func keySerializer = null,
+ Func valueSerializer = null,
+ Func keyDeserializer = null,
+ Func valueDeserializer = null)
+ {
+ if (string.IsNullOrWhiteSpace(name))
+ throw new ArgumentNullException(nameof(name));
+ if (string.IsNullOrWhiteSpace(connectionString))
+ throw new ArgumentNullException(nameof(connectionString));
+ if (string.IsNullOrWhiteSpace(tableName))
+ throw new ArgumentNullException(nameof(tableName));
+
+ Name = name;
+ _connectionString = connectionString;
+ _tableName = tableName;
+
+ // Assign custom or default (JSON-based) serializers/deserializers
+ _keySerializer = keySerializer ?? (key => JsonSerializer.Serialize(key));
+ _valueSerializer = valueSerializer ?? (value => JsonSerializer.Serialize(value));
+ _keyDeserializer = keyDeserializer ?? (str => JsonSerializer.Deserialize(str)!);
+ _valueDeserializer = valueDeserializer ?? (str => JsonSerializer.Deserialize(str)!);
+
+ // Initialize the table
+ InitializeAsync().GetAwaiter().GetResult();
+ }
+
+ private async Task InitializeAsync()
+ {
+ if (_isInitialized) return;
+
+ await _initializationLock.WaitAsync().ConfigureAwait(false);
+ try
+ {
+ if (_isInitialized) return;
+
+ using (var connection = new SqliteConnection(_connectionString))
+ {
+ await connection.OpenAsync().ConfigureAwait(false);
+
+ // Create the table if it does not exist
+ var createTableSql = $@"
+ CREATE TABLE IF NOT EXISTS [{_tableName}] (
+ [key] TEXT NOT NULL PRIMARY KEY,
+ [value] TEXT
+ );";
+
+ using (var cmd = new SqliteCommand(createTableSql, connection))
+ {
+ await cmd.ExecuteNonQueryAsync().ConfigureAwait(false);
+ }
+ }
+
+ _isInitialized = true;
+ }
+ finally
+ {
+ _initializationLock.Release();
+ }
+ }
+
+ private void EnsureInitialized()
+ {
+ if (!_isInitialized)
+ {
+ throw new InvalidOperationException("SqliteKeyValueStateStore is not properly initialized.");
+ }
+ }
+
+ public TValue Get(TKey key)
+ {
+ EnsureInitialized();
+
+ var serializedKey = _keySerializer(key);
+
+ var sql = $@"SELECT [value] FROM [{_tableName}] WHERE [key] = @key;";
+ using (var connection = new SqliteConnection(_connectionString))
+ using (var cmd = new SqliteCommand(sql, connection))
+ {
+ cmd.Parameters.AddWithValue("@key", (object)serializedKey ?? DBNull.Value);
+
+ connection.Open();
+ var result = cmd.ExecuteScalar() as string;
+ if (result == null)
+ return default;
+ return _valueDeserializer(result);
+ }
+ }
+
+ public void Put(TKey key, TValue value)
+ {
+ EnsureInitialized();
+
+ var serializedKey = _keySerializer(key);
+ var serializedValue = _valueSerializer(value);
+
+ // In SQLite, a common "upsert" pattern is using INSERT OR REPLACE (or INSERT ON CONFLICT).
+ var sql = $@"
+ INSERT OR REPLACE INTO [{_tableName}] ([key], [value])
+ VALUES (@key, @value);";
+
+ using (var connection = new SqliteConnection(_connectionString))
+ using (var cmd = new SqliteCommand(sql, connection))
+ {
+ cmd.Parameters.AddWithValue("@key", (object)serializedKey ?? DBNull.Value);
+ cmd.Parameters.AddWithValue("@value", (object)serializedValue ?? DBNull.Value);
+
+ connection.Open();
+ cmd.ExecuteNonQuery();
+ }
+ }
+
+ public bool ContainsKey(TKey key)
+ {
+ EnsureInitialized();
+
+ var serializedKey = _keySerializer(key);
+
+ var sql = $@"SELECT COUNT(*) FROM [{_tableName}] WHERE [key] = @key;";
+ using (var connection = new SqliteConnection(_connectionString))
+ using (var cmd = new SqliteCommand(sql, connection))
+ {
+ cmd.Parameters.AddWithValue("@key", (object)serializedKey ?? DBNull.Value);
+
+ connection.Open();
+ var count = (long)cmd.ExecuteScalar()!; // In SQLite, COUNT returns long
+ return count > 0;
+ }
+ }
+
+ public void Remove(TKey key)
+ {
+ EnsureInitialized();
+
+ var serializedKey = _keySerializer(key);
+
+ var sql = $@"DELETE FROM [{_tableName}] WHERE [key] = @key;";
+ using (var connection = new SqliteConnection(_connectionString))
+ using (var cmd = new SqliteCommand(sql, connection))
+ {
+ cmd.Parameters.AddWithValue("@key", (object)serializedKey ?? DBNull.Value);
+
+ connection.Open();
+ cmd.ExecuteNonQuery();
+ }
+ }
+
+ public IEnumerable> GetAll()
+ {
+ EnsureInitialized();
+
+ var sql = $@"SELECT [key], [value] FROM [{_tableName}];";
+ using (var connection = new SqliteConnection(_connectionString))
+ using (var cmd = new SqliteCommand(sql, connection))
+ {
+ connection.Open();
+ using (var reader = cmd.ExecuteReader())
+ {
+ while (reader.Read())
+ {
+ var serializedKey = reader.GetString(0);
+ var serializedValue = reader.IsDBNull(1) ? null : reader.GetString(1);
+
+ var key = _keyDeserializer(serializedKey);
+ var value = serializedValue == null ? default : _valueDeserializer(serializedValue);
+
+ yield return new KeyValuePair(key, value!);
+ }
+ }
+ }
+ }
+
+ public IEnumerable GetKeys()
+ {
+ EnsureInitialized();
+
+ var sql = $@"SELECT [key] FROM [{_tableName}];";
+ using (var connection = new SqliteConnection(_connectionString))
+ using (var cmd = new SqliteCommand(sql, connection))
+ {
+ connection.Open();
+ using (var reader = cmd.ExecuteReader())
+ {
+ while (reader.Read())
+ {
+ var serializedKey = reader.GetString(0);
+ yield return _keyDeserializer(serializedKey);
+ }
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ // Nothing special to dispose in this implementation.
+ _initializationLock.Dispose();
+ }
+ }
+}
diff --git a/src/Cortex.States/Abstractions/IStateStore.cs b/src/Cortex.States/Abstractions/IStateStore.cs
index 2195f46..7b17990 100644
--- a/src/Cortex.States/Abstractions/IStateStore.cs
+++ b/src/Cortex.States/Abstractions/IStateStore.cs
@@ -2,13 +2,12 @@
namespace Cortex.States
{
- public interface IStateStore
+ public interface IDataStore
{
string Name { get; }
}
-
- public interface IStateStore : IStateStore
+ public interface IDataStore : IDataStore
{
TValue Get(TKey key);
void Put(TKey key, TValue value);
diff --git a/src/Cortex.States/Abstractions/ITable.cs b/src/Cortex.States/Abstractions/ITable.cs
new file mode 100644
index 0000000..9b129dc
--- /dev/null
+++ b/src/Cortex.States/Abstractions/ITable.cs
@@ -0,0 +1,12 @@
+using System.Collections.Generic;
+
+namespace Cortex.States.Abstractions
+{
+ public interface ITable
+ {
+ TValue Get(TKey key);
+ bool ContainsKey(TKey key);
+ IEnumerable> GetAll();
+ IEnumerable GetKeys();
+ }
+}
diff --git a/src/Cortex.States/Assets/license.md b/src/Cortex.States/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.States/Assets/license.md
+++ b/src/Cortex.States/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.States/Cortex.States.csproj b/src/Cortex.States/Cortex.States.csproj
index 0e7ec29..e09b8c0 100644
--- a/src/Cortex.States/Cortex.States.csproj
+++ b/src/Cortex.States/Cortex.States.csproj
@@ -8,7 +8,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
diff --git a/src/Cortex.States/InMemoryStateStore.cs b/src/Cortex.States/InMemoryStateStore.cs
index a7b0fbb..e898aed 100644
--- a/src/Cortex.States/InMemoryStateStore.cs
+++ b/src/Cortex.States/InMemoryStateStore.cs
@@ -4,7 +4,7 @@
namespace Cortex.States
{
- public class InMemoryStateStore<TKey, TValue> : IStateStore<TKey, TValue>
+ public class InMemoryStateStore<TKey, TValue> : IDataStore<TKey, TValue>
{
private readonly ConcurrentDictionary<TKey, TValue> _store = new ConcurrentDictionary<TKey, TValue>();
diff --git a/src/Cortex.States/Operators/IStatefulOperator.cs b/src/Cortex.States/Operators/IStatefulOperator.cs
index 810c164..47cd0c8 100644
--- a/src/Cortex.States/Operators/IStatefulOperator.cs
+++ b/src/Cortex.States/Operators/IStatefulOperator.cs
@@ -4,6 +4,6 @@ namespace Cortex.States.Operators
{
public interface IStatefulOperator
{
- IEnumerable<IStateStore> GetStateStores();
+ IEnumerable<IDataStore> GetStateStores();
}
}
diff --git a/src/Cortex.States/Table.cs b/src/Cortex.States/Table.cs
new file mode 100644
index 0000000..9b5c9de
--- /dev/null
+++ b/src/Cortex.States/Table.cs
@@ -0,0 +1,20 @@
+using Cortex.States.Abstractions;
+using System.Collections.Generic;
+
+namespace Cortex.States
+{
+ public class Table<TKey, TValue> : ITable<TKey, TValue>
+ {
+ private readonly IDataStore<TKey, TValue> _dataStore;
+
+ public Table(IDataStore<TKey, TValue> dataStore)
+ {
+ _dataStore = dataStore;
+ }
+
+ public TValue Get(TKey key) => _dataStore.Get(key);
+ public bool ContainsKey(TKey key) => _dataStore.ContainsKey(key);
+ public IEnumerable<KeyValuePair<TKey, TValue>> GetAll() => _dataStore.GetAll();
+ public IEnumerable<TKey> GetKeys() => _dataStore.GetKeys();
+ }
+}
diff --git a/src/Cortex.Streams.AWSSQS/Assets/license.md b/src/Cortex.Streams.AWSSQS/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.Streams.AWSSQS/Assets/license.md
+++ b/src/Cortex.Streams.AWSSQS/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.Streams.AWSSQS/Cortex.Streams.AWSSQS.csproj b/src/Cortex.Streams.AWSSQS/Cortex.Streams.AWSSQS.csproj
index e2b5f7a..36b4f05 100644
--- a/src/Cortex.Streams.AWSSQS/Cortex.Streams.AWSSQS.csproj
+++ b/src/Cortex.Streams.AWSSQS/Cortex.Streams.AWSSQS.csproj
@@ -9,7 +9,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
diff --git a/src/Cortex.Streams.AzureBlobStorage/Assets/license.md b/src/Cortex.Streams.AzureBlobStorage/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.Streams.AzureBlobStorage/Assets/license.md
+++ b/src/Cortex.Streams.AzureBlobStorage/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.Streams.AzureBlobStorage/Cortex.Streams.AzureBlobStorage.csproj b/src/Cortex.Streams.AzureBlobStorage/Cortex.Streams.AzureBlobStorage.csproj
index 78851a2..2825d59 100644
--- a/src/Cortex.Streams.AzureBlobStorage/Cortex.Streams.AzureBlobStorage.csproj
+++ b/src/Cortex.Streams.AzureBlobStorage/Cortex.Streams.AzureBlobStorage.csproj
@@ -9,7 +9,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
diff --git a/src/Cortex.Streams.AzureServiceBus/Assets/license.md b/src/Cortex.Streams.AzureServiceBus/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.Streams.AzureServiceBus/Assets/license.md
+++ b/src/Cortex.Streams.AzureServiceBus/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.Streams.AzureServiceBus/Cortex.Streams.AzureServiceBus.csproj b/src/Cortex.Streams.AzureServiceBus/Cortex.Streams.AzureServiceBus.csproj
index 0529ad9..54f975e 100644
--- a/src/Cortex.Streams.AzureServiceBus/Cortex.Streams.AzureServiceBus.csproj
+++ b/src/Cortex.Streams.AzureServiceBus/Cortex.Streams.AzureServiceBus.csproj
@@ -9,7 +9,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
diff --git a/src/Cortex.Streams.Files/Assets/license.md b/src/Cortex.Streams.Files/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.Streams.Files/Assets/license.md
+++ b/src/Cortex.Streams.Files/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.Streams.Files/Cortex.Streams.Files.csproj b/src/Cortex.Streams.Files/Cortex.Streams.Files.csproj
index 8c6de69..cbe0811 100644
--- a/src/Cortex.Streams.Files/Cortex.Streams.Files.csproj
+++ b/src/Cortex.Streams.Files/Cortex.Streams.Files.csproj
@@ -9,7 +9,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
diff --git a/src/Cortex.Streams.Http/Assets/license.md b/src/Cortex.Streams.Http/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.Streams.Http/Assets/license.md
+++ b/src/Cortex.Streams.Http/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.Streams.Http/Cortex.Streams.Http.csproj b/src/Cortex.Streams.Http/Cortex.Streams.Http.csproj
index 4916512..6213806 100644
--- a/src/Cortex.Streams.Http/Cortex.Streams.Http.csproj
+++ b/src/Cortex.Streams.Http/Cortex.Streams.Http.csproj
@@ -8,7 +8,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
diff --git a/src/Cortex.Streams.Kafka/Assets/license.md b/src/Cortex.Streams.Kafka/Assets/license.md
index 530f621..3c845d4 100644
--- a/src/Cortex.Streams.Kafka/Assets/license.md
+++ b/src/Cortex.Streams.Kafka/Assets/license.md
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2024 Buildersoft
+Copyright (c) 2025 Buildersoft
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/Cortex.Streams.Kafka/Cortex.Streams.Kafka.csproj b/src/Cortex.Streams.Kafka/Cortex.Streams.Kafka.csproj
index 5005fc5..67b0f6a 100644
--- a/src/Cortex.Streams.Kafka/Cortex.Streams.Kafka.csproj
+++ b/src/Cortex.Streams.Kafka/Cortex.Streams.Kafka.csproj
@@ -9,7 +9,7 @@
Buildersoft Cortex Framework
Buildersoft
Buildersoft,EnesHoxha
- Copyright © Buildersoft 2024
+ Copyright © Buildersoft 2025
Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
diff --git a/src/Cortex.Streams.MSSqlServer/Assets/cortex.png b/src/Cortex.Streams.MSSqlServer/Assets/cortex.png
new file mode 100644
index 0000000..a4f9727
Binary files /dev/null and b/src/Cortex.Streams.MSSqlServer/Assets/cortex.png differ
diff --git a/src/Cortex.Streams.MSSqlServer/Assets/license.md b/src/Cortex.Streams.MSSqlServer/Assets/license.md
new file mode 100644
index 0000000..3c845d4
--- /dev/null
+++ b/src/Cortex.Streams.MSSqlServer/Assets/license.md
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2025 Buildersoft
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/src/Cortex.Streams.MSSqlServer/Cortex.Streams.MSSqlServer.csproj b/src/Cortex.Streams.MSSqlServer/Cortex.Streams.MSSqlServer.csproj
new file mode 100644
index 0000000..7a6d362
--- /dev/null
+++ b/src/Cortex.Streams.MSSqlServer/Cortex.Streams.MSSqlServer.csproj
@@ -0,0 +1,56 @@
+
+
+
+ net9.0;net8.0;net7.0
+
+ 1.0.1
+ 1.0.1
+ Buildersoft Cortex Framework
+ Buildersoft
+ Buildersoft,EnesHoxha
+ Copyright © Buildersoft 2025
+
+ Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
+
+
+ https://github.com/buildersoftio/cortex
+ cortex cdc mediator eda streaming distributed streams states mssql
+
+ 1.0.1
+ license.md
+ cortex.png
+ Cortex.Streams.CDC.MSSqlServer
+ True
+ True
+ True
+
+ Just as the Cortex in our brains handles complex processing efficiently, Cortex Data Framework brings brainpower to your data management!
+ https://buildersoft.io/
+ README.md
+
+
+
+
+ True
+ \
+
+
+ True
+
+
+
+ True
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Cortex.Streams.MSSqlServer/SqlServerCDCSourceOperator.cs b/src/Cortex.Streams.MSSqlServer/SqlServerCDCSourceOperator.cs
new file mode 100644
index 0000000..6c58872
--- /dev/null
+++ b/src/Cortex.Streams.MSSqlServer/SqlServerCDCSourceOperator.cs
@@ -0,0 +1,536 @@
+using Cortex.States;
+using Cortex.Streams.Operators;
+using Microsoft.Data.SqlClient;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Data;
+using System.Linq;
+using System.Security.Cryptography;
+using System.Text;
+using System.Threading;
+
+namespace Cortex.Streams.MSSqlServer
+{
+ /// <summary>
+ /// MSSQL CDC (Change Data Capture) Source Operator that optionally performs an initial load of the table,
+ /// then continues reading incremental changes via CDC.
+ /// Duplicates are skipped by storing and comparing a hash of the last record emitted.
+ /// </summary>
+ public class SqlServerCDCSourceOperator : ISourceOperator<SqlServerRecord>
+ {
+ private readonly string _connectionString;
+ private readonly string _schemaName;
+ private readonly string _tableName;
+ private readonly bool _autoEnableCdc;
+ private readonly bool _doInitialLoad;
+ private readonly int _pollIntervalMs;
+ private Thread _pollingThread;
+ private bool _stopRequested;
+
+ // Checkpoint store
+ private readonly IDataStore<string, byte[]> _checkpointStore;
+
+ // Key for the last LSN processed
+ private readonly string _checkpointKey;
+
+ // Key to track if the initial load is done
+ private readonly string _initialLoadCheckpointKey;
+
+ // Key to store the last emitted record's hash
+ private readonly string _lastRecordHashKey;
+
+ // Optional logger (may be null)
+ private readonly ILogger _logger;
+
+ /// <summary>
+ /// Creates a new instance of <see cref="SqlServerCDCSourceOperator"/> which can optionally
+ /// perform an initial load of the table, then read incremental changes via CDC.
+ /// Duplicates are skipped by storing a hash of the last record emitted.
+ /// </summary>
+ /// <param name="connectionString">The connection string to the SQL Server instance.</param>
+ /// <param name="schemaName">The schema name containing the table.</param>
+ /// <param name="tableName">The table name from which to read changes.</param>
+ /// <param name="sqlServerSettings">Optional settings to configure CDC and polling intervals.</param>
+ /// <param name="checkpointStore">Optional data store for saving checkpoints (LSN, last-hash, etc.).</param>
+ /// <param name="logger">
+ /// An optional <see cref="ILogger"/>. If null, messages will be written to the console.
+ /// </param>
+ public SqlServerCDCSourceOperator(
+ string connectionString,
+ string schemaName,
+ string tableName,
+ SqlServerSettings sqlServerSettings = null,
+ IDataStore<string, byte[]> checkpointStore = null,
+ ILogger logger = null) // <-- Optional ILogger
+ {
+ _connectionString = connectionString;
+ _schemaName = schemaName;
+ _tableName = tableName;
+
+ sqlServerSettings ??= new SqlServerSettings();
+ _autoEnableCdc = sqlServerSettings.ConfigureCDCInServer;
+ _doInitialLoad = sqlServerSettings.DoInitialLoad;
+
+ // Use the full interval in milliseconds. TimeSpan.Milliseconds is only the millisecond
+ // component (0 for whole-second intervals), which would cause zero-delay busy polling.
+ _pollIntervalMs = (int)sqlServerSettings.PullInterval.TotalMilliseconds;
+
+ _checkpointStore = checkpointStore
+ ?? new InMemoryStateStore<string, byte[]>($"{_schemaName}.{_tableName}.STORE");
+
+ // A unique key that identifies this table’s LSN checkpoint
+ _checkpointKey = $"{_schemaName}.{_tableName}.CDC.LSN";
+
+ // A unique key that identifies if the initial load is done
+ _initialLoadCheckpointKey = $"{_schemaName}.{_tableName}.INITIAL_LOAD_DONE";
+
+ // A unique key to store the last emitted record's hash
+ _lastRecordHashKey = $"{_schemaName}.{_tableName}.CDC.LAST_HASH";
+
+ // Store the logger (can be null)
+ _logger = logger;
+ }
+
+ /// <summary>
+ /// Starts the CDC operator by optionally enabling CDC on the table (if requested),
+ /// performing a one-time initial load (if requested), and starting the background thread
+ /// that polls for new changes.
+ /// </summary>
+ /// <param name="emit">Action to call for each new record.</param>
+ public void Start(Action<SqlServerRecord> emit)
+ {
+ // Optionally enable CDC if requested
+ if (_autoEnableCdc && !IsCdcEnabledForTable())
+ {
+ LogInformation("Enabling CDC for table...");
+ EnableCdcForTable();
+
+ // Sleep for 10s to allow MS SQL Server to enable CDC on the table
+ Thread.Sleep(10000);
+ }
+
+ // 1. If doInitialLoad = true and we haven't done it yet, run initial load
+ if (_doInitialLoad && _checkpointStore.Get(_initialLoadCheckpointKey) == null)
+ {
+ LogInformation("Starting one-time initial load...");
+ RunInitialLoad(emit);
+
+ // Mark initial load as completed
+ _checkpointStore.Put(_initialLoadCheckpointKey, new byte[] { 0x01 });
+ LogInformation("Initial load completed.");
+ }
+ else
+ {
+ LogInformation("Skipping initial load (already done or disabled).");
+ }
+
+ // 2. Initialize the LSN checkpoint if we don’t already have one
+ byte[] lastCommittedLsn = _checkpointStore.Get(_checkpointKey);
+ if (lastCommittedLsn == null)
+ {
+ // By default, start from "current" so we only see future changes
+ lastCommittedLsn = GetMaxLsn();
+ if (lastCommittedLsn != null)
+ {
+ _checkpointStore.Put(_checkpointKey, lastCommittedLsn);
+ LogInformation("Initialized LSN checkpoint to the current max LSN.");
+ }
+ }
+
+ // 3. Start CDC polling
+ _stopRequested = false;
+ _pollingThread = new Thread(() => PollCdcChanges(emit))
+ {
+ IsBackground = true
+ };
+ _pollingThread.Start();
+ LogInformation("CDC polling thread started.");
+ }
+
+ ///
+ /// Stops the CDC operator by signaling the background thread to stop and waiting for it to finish.
+ ///
+ public void Stop()
+ {
+ LogInformation("Stop requested. Waiting for polling thread to complete...");
+ _stopRequested = true;
+ _pollingThread?.Join();
+ LogInformation("Polling thread stopped.");
+ }
+
+ /// <summary>
+ /// Polls the CDC function table periodically for new changes, using the last LSN from
+ /// the checkpoint store, and emits each new record found.
+ /// </summary>
+ /// <param name="emit">Action to call for each new record.</param>
+ private void PollCdcChanges(Action<SqlServerRecord> emit)
+ {
+ string captureInstanceName = $"{_schemaName}_{_tableName}";
+
+ while (!_stopRequested)
+ {
+ try
+ {
+ byte[] lastCommittedLsn = _checkpointStore.Get(_checkpointKey);
+ byte[] maxLsn = GetMaxLsn();
+ if (maxLsn == null)
+ {
+ Thread.Sleep(_pollIntervalMs);
+ continue;
+ }
+
+ // If maxLSN <= lastCommitted, there's nothing new
+ if (CompareLsn(maxLsn, lastCommittedLsn) <= 0)
+ {
+ Thread.Sleep(_pollIntervalMs);
+ continue;
+ }
+
+ // Get changes
+ var changes = GetChangesSinceLastLsn(captureInstanceName, lastCommittedLsn, maxLsn);
+
+ // Retrieve the last record's hash we stored
+ var lastHashBytes = _checkpointStore.Get(_lastRecordHashKey);
+ string lastHash = lastHashBytes == null ? null : Encoding.UTF8.GetString(lastHashBytes);
+
+ // Process changes in LSN order to ensure proper checkpointing
+ foreach (var change in changes)
+ {
+ if (_stopRequested)
+ break;
+
+ // Compute a hash of this record
+ string currentHash = ComputeHash(change);
+
+ // If it's the same as the last emitted, skip
+ if (currentHash == lastHash)
+ continue;
+
+ // Otherwise, emit
+ emit(change);
+
+ // Update last-hash checkpoint
+ lastHash = currentHash;
+ _checkpointStore.Put(_lastRecordHashKey, Encoding.UTF8.GetBytes(lastHash));
+ }
+
+ // Update the LSN checkpoint if new changes arrived
+ if (changes.Count > 0)
+ {
+ _checkpointStore.Put(_checkpointKey, maxLsn);
+ LogInformation($"Updated LSN checkpoint to: {BitConverter.ToString(maxLsn)}");
+ }
+
+ Thread.Sleep(_pollIntervalMs);
+ }
+ catch (Exception ex)
+ {
+ LogError("Error in CDC polling.", ex);
+ // Backoff a bit
+ Thread.Sleep(5000);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Reads all data from the base table once and emits it, primarily used for the one-time initial load.
+ /// </summary>
+ /// <param name="emit">Action to call for each record.</param>
+ private void RunInitialLoad(Action<SqlServerRecord> emit)
+ {
+ using (var conn = new SqlConnection(_connectionString))
+ using (var cmd = conn.CreateCommand())
+ {
+ conn.Open();
+ cmd.CommandText = $"SELECT * FROM [{_schemaName}].[{_tableName}] (NOLOCK)";
+
+ using (var reader = cmd.ExecuteReader())
+ {
+ while (reader.Read())
+ {
+ var record = new SqlServerRecord
+ {
+ Operation = "InitialLoad",
+ Data = new Dictionary<string, object>(),
+ ChangeTime = DateTime.UtcNow,
+ };
+
+ // Populate the data dictionary with all columns
+ for (int i = 0; i < reader.FieldCount; i++)
+ {
+ string colName = reader.GetName(i);
+ if (colName.StartsWith("__$"))
+ continue;
+
+ object value = reader.GetValue(i);
+ record.Data[colName] = value == DBNull.Value ? null : value;
+ }
+
+ emit(record);
+ }
+ }
+ }
+ }
+
+ ///
+ /// Checks whether CDC is enabled for the current database and table.
+ ///
+ /// True if CDC is enabled for this table, otherwise false.
+ private bool IsCdcEnabledForTable()
+ {
+ using (var conn = new SqlConnection(_connectionString))
+ using (var cmd = conn.CreateCommand())
+ {
+ conn.Open();
+
+ // Check if database-level CDC is enabled
+ cmd.CommandText = @"
+ SELECT is_cdc_enabled
+ FROM sys.databases
+ WHERE name = DB_NAME();";
+
+ bool isDbCdcEnabled = Convert.ToBoolean(cmd.ExecuteScalar());
+ if (!isDbCdcEnabled)
+ return false;
+
+ // Check if table-level CDC is enabled
+ cmd.CommandText = @"
+ SELECT COUNT(*)
+ FROM sys.tables t
+ JOIN sys.schemas s ON t.schema_id = s.schema_id
+ WHERE s.name = @schemaName
+ AND t.name = @tableName
+ AND t.is_tracked_by_cdc = 1;";
+
+ cmd.Parameters.AddWithValue("@schemaName", _schemaName);
+ cmd.Parameters.AddWithValue("@tableName", _tableName);
+
+ int count = (int)cmd.ExecuteScalar();
+ return (count > 0);
+ }
+ }
+
+ ///
+ /// Enables CDC at the database level (if needed) and on the specific table.
+ ///
+ private void EnableCdcForTable()
+ {
+ using (var conn = new SqlConnection(_connectionString))
+ using (var cmd = conn.CreateCommand())
+ {
+ conn.Open();
+
+ // 1. Enable CDC at the database level if not already
+ cmd.CommandText = @"
+ IF NOT EXISTS (
+ SELECT 1 FROM sys.databases
+ WHERE name = DB_NAME() AND is_cdc_enabled = 1
+ )
+ BEGIN
+ EXEC sys.sp_cdc_enable_db;
+ END
+ ";
+ cmd.ExecuteNonQuery();
+
+ // 2. Enable CDC on the table
+ cmd.CommandText = $@"
+ EXEC sys.sp_cdc_enable_table
+ @source_schema = '{_schemaName}',
+ @source_name = '{_tableName}',
+ @capture_instance = '{_schemaName}_{_tableName}',
+ @role_name = NULL,
+ @supports_net_changes = 0;
+ ";
+ cmd.ExecuteNonQuery();
+
+ LogInformation($"CDC enabled for table [{_schemaName}].[{_tableName}].");
+ }
+ }
+
+ ///
+ /// Retrieves the current maximum LSN from the server.
+ ///
+ /// The varbinary(10) max LSN, or null if none.
+ private byte[] GetMaxLsn()
+ {
+ using (var conn = new SqlConnection(_connectionString))
+ using (var cmd = conn.CreateCommand())
+ {
+ conn.Open();
+ cmd.CommandText = "SELECT sys.fn_cdc_get_max_lsn();";
+ object result = cmd.ExecuteScalar();
+ if (result == DBNull.Value) return null;
+ return (byte[])result;
+ }
+ }
+
+ ///
+ /// Retrieves the minimum LSN for the given capture instance.
+ ///
+ /// The capture instance name (schema_table).
+ /// The varbinary(10) min LSN, or null if none.
+ private byte[] GetMinLsn(string captureInstanceName)
+ {
+ using (var conn = new SqlConnection(_connectionString))
+ using (var cmd = conn.CreateCommand())
+ {
+ conn.Open();
+ cmd.CommandText = $"SELECT sys.fn_cdc_get_min_lsn('{captureInstanceName}');";
+ object result = cmd.ExecuteScalar();
+ if (result == DBNull.Value) return null;
+ return (byte[])result;
+ }
+ }
+
+ ///
+ /// Queries the CDC function table for changes between the specified LSN range.
+ /// Filters out "old" updates (operation 3).
+ ///
+ /// The capture instance name (schema_table).
+ /// Starting LSN (inclusive).
+ /// Ending LSN (inclusive).
+ /// A list of new or updated records.
+ private List<SqlServerRecord> GetChangesSinceLastLsn(
+ string captureInstanceName,
+ byte[] fromLsn,
+ byte[] toLsn)
+ {
+ var changes = new List<SqlServerRecord>();
+ string functionName = $"cdc.fn_cdc_get_all_changes_{captureInstanceName}";
+
+ using (var conn = new SqlConnection(_connectionString))
+ using (var cmd = conn.CreateCommand())
+ {
+ conn.Open();
+ // Use 'all update old' to get both old & new update rows
+ cmd.CommandText = $@"
+ SELECT *
+ FROM {functionName}(@from_lsn, @to_lsn, 'all update old')
+ ";
+
+ cmd.Parameters.Add(new SqlParameter("@from_lsn", SqlDbType.VarBinary, 10) { Value = fromLsn });
+ cmd.Parameters.Add(new SqlParameter("@to_lsn", SqlDbType.VarBinary, 10) { Value = toLsn });
+
+ using (var reader = cmd.ExecuteReader())
+ {
+ while (reader.Read())
+ {
+ var record = new SqlServerRecord
+ {
+ Operation = GetOperationName(Convert.ToInt32(reader["__$operation"])),
+ Data = new Dictionary<string, object>(),
+ ChangeTime = DateTime.UtcNow,
+ };
+
+ for (int i = 0; i < reader.FieldCount; i++)
+ {
+ string colName = reader.GetName(i);
+ if (colName.StartsWith("__$")) continue;
+
+ object value = reader.GetValue(i);
+ record.Data[colName] = (value == DBNull.Value) ? null : value;
+ }
+
+ changes.Add(record);
+ }
+ }
+ }
+
+ // Filter out rows that are the "old" side of an update (operation code 3 = "Update (old)")
+ changes = changes
+ .Where(c => c.Operation != "Update (old)")
+ .ToList();
+
+ return changes;
+ }
+
+ ///
+ /// Returns a human-readable operation name from a CDC operation code.
+ ///
+ /// The integer CDC operation code.
+ /// A descriptive string for the operation.
+ private string GetOperationName(int operationCode)
+ {
+ switch (operationCode)
+ {
+ case 1: return "Delete (old)";
+ case 2: return "Insert";
+ case 3: return "Update (old)";
+ case 4: return "Update (new)";
+ case 5: return "Delete (new)";
+ default: return $"Unknown ({operationCode})";
+ }
+ }
+
+ ///
+ /// Compare two varbinary(10) LSNs:
+ /// -1 if lsnA < lsnB,
+ /// 0 if lsnA == lsnB,
+ /// 1 if lsnA > lsnB.
+ ///
+ private int CompareLsn(byte[] lsnA, byte[] lsnB)
+ {
+ if (lsnA == null && lsnB == null) return 0;
+ if (lsnA == null) return -1;
+ if (lsnB == null) return 1;
+
+ for (int i = 0; i < 10; i++)
+ {
+ if (lsnA[i] < lsnB[i]) return -1;
+ if (lsnA[i] > lsnB[i]) return 1;
+ }
+ return 0;
+ }
+
+ ///
+ /// Computes an MD5 hash from the SqlServerRecord's Data dictionary
+ /// to detect duplicates. The data is sorted by key for deterministic ordering.
+ ///
+ /// The to hash.
+ /// A Base64-encoded MD5 hash string.
+ private string ComputeHash(SqlServerRecord record)
+ {
+ var sb = new StringBuilder();
+ foreach (var kv in record.Data.OrderBy(x => x.Key))
+ {
+ // "Key=Value;"
+ sb.Append(kv.Key).Append('=').Append(kv.Value ?? "null").Append(';');
+ }
+
+ using (var md5 = MD5.Create())
+ {
+ var bytes = Encoding.UTF8.GetBytes(sb.ToString());
+ var hashBytes = md5.ComputeHash(bytes);
+ return Convert.ToBase64String(hashBytes);
+ }
+ }
+
+ #region Logging Helpers
+
+ private void LogInformation(string message)
+ {
+ if (_logger != null)
+ {
+ _logger.LogInformation(message);
+ }
+ else
+ {
+ Console.WriteLine(message);
+ }
+ }
+
+ private void LogError(string message, Exception ex)
+ {
+ if (_logger != null)
+ {
+ _logger.LogError(ex, message);
+ }
+ else
+ {
+ Console.WriteLine($"ERROR: {message}\n{ex}");
+ }
+ }
+
+ #endregion
+ }
+}
diff --git a/src/Cortex.Streams.MSSqlServer/SqlServerRecord.cs b/src/Cortex.Streams.MSSqlServer/SqlServerRecord.cs
new file mode 100644
index 0000000..4ab7f29
--- /dev/null
+++ b/src/Cortex.Streams.MSSqlServer/SqlServerRecord.cs
@@ -0,0 +1,29 @@
+using System;
+using System.Collections.Generic;
+
+namespace Cortex.Streams.MSSqlServer
+{
+ ///
+ /// Represents a generic CDC record, carrying enough information
+ /// to understand what changed and how.
+ ///
+ public class SqlServerRecord
+ {
+ ///
+ /// The operation name or type, e.g. 'INSERT', 'UPDATE', 'DELETE'.
+ ///
+ public string Operation { get; set; } // Insert, Update, Delete, etc.
+
+
+ ///
+ /// The primary key or other columns in the changed record,
+ /// serialized for demonstration.
+ ///
+ public Dictionary<string, object> Data { get; set; } // Column names -> values
+
+ ///
+ /// Timestamp or time of the record, if needed.
+ ///
+ public DateTime ChangeTime { get; set; }
+ }
+}
diff --git a/src/Cortex.Streams.MSSqlServer/SqlServerSettings.cs b/src/Cortex.Streams.MSSqlServer/SqlServerSettings.cs
new file mode 100644
index 0000000..6746728
--- /dev/null
+++ b/src/Cortex.Streams.MSSqlServer/SqlServerSettings.cs
@@ -0,0 +1,18 @@
+using System;
+
+namespace Cortex.Streams.MSSqlServer
+{
+ public class SqlServerSettings
+ {
+ public bool DoInitialLoad { get; set; }
+ public TimeSpan PullInterval { get; set; }
+ public bool ConfigureCDCInServer { get; set; }
+
+ public SqlServerSettings()
+ {
+ DoInitialLoad = true;
+ PullInterval = TimeSpan.FromSeconds(3);
+ ConfigureCDCInServer = false;
+ }
+ }
+}
diff --git a/src/Cortex.Streams.MSSqlServer/SqlServerSourceOperatorExperiment.cs b/src/Cortex.Streams.MSSqlServer/SqlServerSourceOperatorExperiment.cs
new file mode 100644
index 0000000..bd42be2
--- /dev/null
+++ b/src/Cortex.Streams.MSSqlServer/SqlServerSourceOperatorExperiment.cs
@@ -0,0 +1,483 @@
+using Cortex.States;
+using Cortex.Streams.Operators;
+using Microsoft.Data.SqlClient;
+using System;
+using System.Collections.Generic;
+using System.Data;
+using System.Linq;
+using System.Security.Cryptography;
+using System.Text;
+using System.Threading;
+
+namespace Cortex.Streams.MSSqlServer
+{
+    /// <summary>
+    /// MSSQL CDC source operator that optionally performs an initial load of the table,
+    /// then continues reading incremental changes via CDC.
+    /// Duplicates are skipped by storing a hash of the last record emitted.
+    /// </summary>
+    internal class SqlServerSourceOperatorExperiment : ISourceOperator<SqlServerRecord>
+    {
+        private readonly string _connectionString;
+        private readonly string _schemaName;
+        private readonly string _tableName;
+        private readonly bool _autoEnableCdc;
+        private readonly bool _doInitialLoad;
+
+        private readonly int _pollIntervalMs;
+        private Thread _pollingThread;
+        private bool _stopRequested;
+
+        // Checkpoint store (in-memory by default; pass a durable store to survive restarts)
+        private readonly IDataStore<string, byte[]> _checkpointStore;
+
+        // Key for the last LSN processed
+        private readonly string _checkpointKey;
+
+        // Key to track if the initial load is done
+        private readonly string _initialLoadCheckpointKey;
+
+        // Key to store the last emitted record's hash
+        private readonly string _lastRecordHashKey;
+
+        /// <summary>
+        /// Creates the operator for a single [schema].[table].
+        /// </summary>
+        /// <param name="connectionString">SQL Server connection string.</param>
+        /// <param name="schemaName">Schema of the tracked table; treated as trusted configuration.</param>
+        /// <param name="tableName">Name of the tracked table; treated as trusted configuration.</param>
+        /// <param name="sqlServerSettings">Optional settings; defaults are used when null.</param>
+        /// <param name="checkpointStore">Optional checkpoint store; in-memory when null.</param>
+        public SqlServerSourceOperatorExperiment(
+            string connectionString,
+            string schemaName,
+            string tableName,
+            SqlServerSettings sqlServerSettings = null,
+            IDataStore<string, byte[]> checkpointStore = null)
+        {
+            _connectionString = connectionString;
+            _schemaName = schemaName;
+            _tableName = tableName;
+
+            sqlServerSettings ??= new SqlServerSettings();
+            _autoEnableCdc = sqlServerSettings.ConfigureCDCInServer;
+            _doInitialLoad = sqlServerSettings.DoInitialLoad;
+
+            // FIX: use TotalMilliseconds. The previous code read PullInterval.Milliseconds,
+            // which is only the sub-second component (0-999); e.g. TimeSpan.FromSeconds(3)
+            // yielded 0 and made the polling loop spin without any delay.
+            _pollIntervalMs = (int)sqlServerSettings.PullInterval.TotalMilliseconds;
+
+            _checkpointStore = checkpointStore
+                ?? new InMemoryStateStore<string, byte[]>($"{_schemaName}.{_tableName}.STORE");
+
+            // A unique key that identifies this table's LSN checkpoint
+            _checkpointKey = $"{_schemaName}.{_tableName}.CDC.LSN";
+
+            // A unique key that identifies if the initial load is done
+            _initialLoadCheckpointKey = $"{_schemaName}.{_tableName}.INITIAL_LOAD_DONE";
+
+            // A unique key to store the last emitted record's hash
+            _lastRecordHashKey = $"{_schemaName}.{_tableName}.CDC.LAST_HASH";
+        }
+
+        /// <summary>
+        /// Starts the operator: optionally enables CDC, runs the one-time initial load,
+        /// initializes the LSN checkpoint, and begins background polling.
+        /// </summary>
+        /// <param name="emit">Callback invoked once per produced record.</param>
+        public void Start(Action<SqlServerRecord> emit)
+        {
+            // Optionally enable CDC if requested
+            if (_autoEnableCdc && !IsCdcEnabledForTable())
+            {
+                EnableCdcForTable();
+
+                // Wait for the capture instance to appear, so fn_cdc_get_all_changes
+                // is not called before SQL Server has created it ("insufficient arguments").
+                string captureInstanceName = $"{_schemaName}_{_tableName}";
+                WaitForCaptureInstance(captureInstanceName, 5000);
+                Thread.Sleep(5000);
+            }
+
+            // 1. If doInitialLoad = true and we haven't done it yet, run initial load
+            if (_doInitialLoad && _checkpointStore.Get(_initialLoadCheckpointKey) == null)
+            {
+                Console.WriteLine("Starting one-time initial load...");
+                RunInitialLoad(emit);
+
+                // Mark initial load as completed
+                _checkpointStore.Put(_initialLoadCheckpointKey, new byte[] { 0x01 });
+                Console.WriteLine("Initial load completed.");
+            }
+            else
+            {
+                Console.WriteLine("Skipping initial load (already done or disabled).");
+            }
+
+            // 2. Initialize the LSN checkpoint if we don't already have one
+            byte[] lastCommittedLsn = _checkpointStore.Get(_checkpointKey);
+            if (lastCommittedLsn == null)
+            {
+                // By default, start from "current" so we only see future changes
+                lastCommittedLsn = GetMaxLsn();
+                _checkpointStore.Put(_checkpointKey, lastCommittedLsn);
+            }
+
+            // 3. Start CDC polling
+            _stopRequested = false;
+            _pollingThread = new Thread(() => PollCdcChanges(emit))
+            {
+                IsBackground = true
+            };
+            _pollingThread.Start();
+        }
+
+        /// <summary>
+        /// Requests the polling loop to stop and waits for the thread to finish.
+        /// </summary>
+        public void Stop()
+        {
+            _stopRequested = true;
+            _pollingThread?.Join();
+        }
+
+        /// <summary>
+        /// Polls the CDC table periodically for new changes,
+        /// using the last LSN from the checkpoint store.
+        /// </summary>
+        private void PollCdcChanges(Action<SqlServerRecord> emit)
+        {
+            string captureInstanceName = $"{_schemaName}_{_tableName}";
+
+            while (!_stopRequested)
+            {
+                try
+                {
+                    byte[] lastCommittedLsn = _checkpointStore.Get(_checkpointKey);
+                    byte[] maxLsn = GetMaxLsn();
+                    if (maxLsn == null)
+                    {
+                        Thread.Sleep(_pollIntervalMs);
+                        continue;
+                    }
+
+                    // If maxLSN <= lastCommitted, there's nothing new
+                    if (CompareLsn(maxLsn, lastCommittedLsn) <= 0)
+                    {
+                        Thread.Sleep(_pollIntervalMs);
+                        continue;
+                    }
+
+                    // Get changes
+                    var changes = GetChangesSinceLastLsn(captureInstanceName, lastCommittedLsn, maxLsn);
+
+                    // Retrieve the last record's hash we stored
+                    var lastHashBytes = _checkpointStore.Get(_lastRecordHashKey);
+                    string lastHash = lastHashBytes == null ? null : Encoding.UTF8.GetString(lastHashBytes);
+
+                    // Process changes in LSN order to ensure proper checkpointing
+                    foreach (var change in changes)
+                    {
+                        if (_stopRequested)
+                            break;
+
+                        // Compute a hash of this record; skip if identical to the last emitted one
+                        string currentHash = ComputeHash(change);
+                        if (currentHash == lastHash)
+                        {
+                            continue;
+                        }
+
+                        // Otherwise, emit
+                        emit(change);
+
+                        // Update last-hash checkpoint
+                        lastHash = currentHash;
+                        _checkpointStore.Put(_lastRecordHashKey, Encoding.UTF8.GetBytes(lastHash));
+                    }
+
+                    // Update the LSN checkpoint if new changes arrived
+                    // (Count property instead of the LINQ Count() extension)
+                    if (changes.Count > 0)
+                    {
+                        _checkpointStore.Put(_checkpointKey, maxLsn);
+                    }
+
+                    Thread.Sleep(_pollIntervalMs);
+                }
+                catch (Exception ex)
+                {
+                    Console.WriteLine($"Error in CDC polling: {ex}");
+                    Thread.Sleep(5000);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Reads all data from the base table and emits each row once
+        /// with Operation = "InitialLoad".
+        /// </summary>
+        private void RunInitialLoad(Action<SqlServerRecord> emit)
+        {
+            using (var conn = new SqlConnection(_connectionString))
+            using (var cmd = conn.CreateCommand())
+            {
+                conn.Open();
+                // Identifiers cannot be parameterized; schema/table come from trusted
+                // configuration and "]" is escaped so the bracket quoting stays intact.
+                string schema = _schemaName.Replace("]", "]]");
+                string table = _tableName.Replace("]", "]]");
+                cmd.CommandText = $"SELECT * FROM [{schema}].[{table}] (NOLOCK)";
+
+                using (var reader = cmd.ExecuteReader())
+                {
+                    while (reader.Read())
+                    {
+                        var record = new SqlServerRecord
+                        {
+                            Operation = "InitialLoad",
+                            Data = new Dictionary<string, object>(),
+                            ChangeTime = DateTime.UtcNow,
+                        };
+
+                        // Populate the data dictionary with all columns,
+                        // skipping CDC metadata columns (__$...)
+                        for (int i = 0; i < reader.FieldCount; i++)
+                        {
+                            string colName = reader.GetName(i);
+                            if (colName.StartsWith("__$", StringComparison.Ordinal))
+                                continue;
+
+                            object value = reader.GetValue(i);
+                            record.Data[colName] = value == DBNull.Value ? null : value;
+                        }
+
+                        emit(record);
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Returns true when CDC is enabled both for the current database
+        /// and for the tracked table.
+        /// </summary>
+        private bool IsCdcEnabledForTable()
+        {
+            using (var conn = new SqlConnection(_connectionString))
+            using (var cmd = conn.CreateCommand())
+            {
+                conn.Open();
+
+                // Check if database-level CDC is enabled
+                cmd.CommandText = @"
+                    SELECT is_cdc_enabled
+                    FROM sys.databases
+                    WHERE name = DB_NAME();";
+
+                bool isDbCdcEnabled = Convert.ToBoolean(cmd.ExecuteScalar());
+                if (!isDbCdcEnabled)
+                    return false;
+
+                // Check if table-level CDC is enabled
+                cmd.CommandText = @"
+                    SELECT COUNT(*)
+                    FROM sys.tables t
+                    JOIN sys.schemas s ON t.schema_id = s.schema_id
+                    WHERE s.name = @schemaName
+                      AND t.name = @tableName
+                      AND t.is_tracked_by_cdc = 1;";
+
+                cmd.Parameters.AddWithValue("@schemaName", _schemaName);
+                cmd.Parameters.AddWithValue("@tableName", _tableName);
+
+                int count = (int)cmd.ExecuteScalar();
+                return (count > 0);
+            }
+        }
+
+        /// <summary>
+        /// Enables CDC at the database level (if needed) and then on the tracked table.
+        /// The capture instance is named "{schema}_{table}".
+        /// </summary>
+        private void EnableCdcForTable()
+        {
+            using (var conn = new SqlConnection(_connectionString))
+            using (var cmd = conn.CreateCommand())
+            {
+                conn.Open();
+
+                // 1. Enable CDC at the database level if not already
+                cmd.CommandText = @"
+                    IF NOT EXISTS (
+                        SELECT 1 FROM sys.databases
+                        WHERE name = DB_NAME() AND is_cdc_enabled = 1
+                    )
+                    BEGIN
+                        EXEC sys.sp_cdc_enable_db;
+                    END
+                ";
+                cmd.ExecuteNonQuery();
+
+                // 2. Enable CDC on the table.
+                // FIX: pass the identifiers as SqlParameters instead of interpolating
+                // them into the T-SQL text, which was vulnerable to SQL injection.
+                cmd.CommandText = @"
+                    EXEC sys.sp_cdc_enable_table
+                        @source_schema = @schemaName,
+                        @source_name = @tableName,
+                        @capture_instance = @captureInstance,
+                        @role_name = NULL,
+                        @supports_net_changes = 0;
+                ";
+                cmd.Parameters.AddWithValue("@schemaName", _schemaName);
+                cmd.Parameters.AddWithValue("@tableName", _tableName);
+                cmd.Parameters.AddWithValue("@captureInstance", $"{_schemaName}_{_tableName}");
+                cmd.ExecuteNonQuery();
+
+                Console.WriteLine($"CDC enabled for table [{_schemaName}].[{_tableName}].");
+            }
+        }
+
+        /// <summary>
+        /// Poll cdc.change_tables to confirm the capture instance is created.
+        /// This prevents "insufficient arguments" errors when fn_cdc_get_all_changes
+        /// is called too soon after enabling CDC.
+        /// </summary>
+        private void WaitForCaptureInstance(string captureInstanceName, int timeoutMs)
+        {
+            int elapsed = 0;
+            while (elapsed < timeoutMs)
+            {
+                Thread.Sleep(500);
+
+                if (CaptureInstanceExists(captureInstanceName))
+                    return; // Found it
+
+                elapsed += 500;
+            }
+            Console.WriteLine($"Warning: capture instance '{captureInstanceName}' not found within {timeoutMs} ms.");
+        }
+
+        /// <summary>
+        /// True when cdc.change_tables lists the given capture instance.
+        /// </summary>
+        private bool CaptureInstanceExists(string captureInstanceName)
+        {
+            using var conn = new SqlConnection(_connectionString);
+            conn.Open();
+            using var cmd = conn.CreateCommand();
+            cmd.CommandText = @"
+                SELECT COUNT(*)
+                FROM cdc.change_tables
+                WHERE capture_instance = @capInst;
+            ";
+            cmd.Parameters.AddWithValue("@capInst", captureInstanceName);
+
+            int count = (int)cmd.ExecuteScalar();
+            return count > 0;
+        }
+
+        /// <summary>
+        /// Returns the current maximum LSN of the database, or null when CDC
+        /// has not produced one yet.
+        /// </summary>
+        private byte[] GetMaxLsn()
+        {
+            using (var conn = new SqlConnection(_connectionString))
+            using (var cmd = conn.CreateCommand())
+            {
+                conn.Open();
+                cmd.CommandText = "SELECT sys.fn_cdc_get_max_lsn();";
+                object result = cmd.ExecuteScalar();
+                if (result == DBNull.Value) return null;
+                return (byte[])result;
+            }
+        }
+
+        /// <summary>
+        /// Returns the minimum LSN available for the capture instance, or null when unknown.
+        /// </summary>
+        private byte[] GetMinLsn(string captureInstanceName)
+        {
+            using (var conn = new SqlConnection(_connectionString))
+            using (var cmd = conn.CreateCommand())
+            {
+                conn.Open();
+                // FIX: parameterized instead of interpolating the capture instance name.
+                cmd.CommandText = "SELECT sys.fn_cdc_get_min_lsn(@capInst);";
+                cmd.Parameters.AddWithValue("@capInst", captureInstanceName);
+                object result = cmd.ExecuteScalar();
+                if (result == DBNull.Value) return null;
+                return (byte[])result;
+            }
+        }
+
+        /// <summary>
+        /// Reads all CDC changes between the two LSNs for the capture instance,
+        /// excluding the "old" image rows of updates.
+        /// </summary>
+        private List<SqlServerRecord> GetChangesSinceLastLsn(
+            string captureInstanceName,
+            byte[] fromLsn,
+            byte[] toLsn)
+        {
+            var changes = new List<SqlServerRecord>();
+            // Function names cannot be parameterized; captureInstanceName is derived
+            // from the trusted schema/table configuration.
+            string functionName = $"cdc.fn_cdc_get_all_changes_{captureInstanceName}";
+
+            using (var conn = new SqlConnection(_connectionString))
+            using (var cmd = conn.CreateCommand())
+            {
+                conn.Open();
+                // Use 'all update old' to get both old & new update rows
+                cmd.CommandText = $@"
+                    SELECT *
+                    FROM {functionName}(@from_lsn, @to_lsn, 'all update old')
+                ";
+
+                cmd.Parameters.Add(new SqlParameter("@from_lsn", SqlDbType.VarBinary, 10) { Value = fromLsn });
+                cmd.Parameters.Add(new SqlParameter("@to_lsn", SqlDbType.VarBinary, 10) { Value = toLsn });
+
+                using (var reader = cmd.ExecuteReader())
+                {
+                    while (reader.Read())
+                    {
+                        var record = new SqlServerRecord
+                        {
+                            Operation = GetOperationName(Convert.ToInt32(reader["__$operation"])),
+                            Data = new Dictionary<string, object>(),
+                            ChangeTime = DateTime.UtcNow,
+                        };
+
+                        for (int i = 0; i < reader.FieldCount; i++)
+                        {
+                            string colName = reader.GetName(i);
+                            if (colName.StartsWith("__$", StringComparison.Ordinal)) continue;
+
+                            object value = reader.GetValue(i);
+                            record.Data[colName] = (value == DBNull.Value) ? null : value;
+                        }
+
+                        changes.Add(record);
+                    }
+                }
+            }
+
+            // Filter out rows that are the "old" side of an update
+            // (operation code 3 = "Update (old)")
+            changes = changes
+                .Where(c => c.Operation != "Update (old)")
+                .ToList();
+
+            return changes;
+        }
+
+        /// <summary>
+        /// Maps a CDC __$operation code to a readable name.
+        /// </summary>
+        private string GetOperationName(int operationCode)
+        {
+            switch (operationCode)
+            {
+                case 1: return "Delete (old)";
+                case 2: return "Insert";
+                case 3: return "Update (old)";
+                case 4: return "Update (new)";
+                case 5: return "Delete (new)";
+                default: return $"Unknown ({operationCode})";
+            }
+        }
+
+        /// <summary>
+        /// Compare two varbinary(10) LSNs: -1 when lsnA is smaller, 0 when equal,
+        /// 1 when lsnA is larger. A null LSN sorts before any non-null LSN.
+        /// </summary>
+        private int CompareLsn(byte[] lsnA, byte[] lsnB)
+        {
+            if (lsnA == null && lsnB == null) return 0;
+            if (lsnA == null) return -1;
+            if (lsnB == null) return 1;
+
+            for (int i = 0; i < 10; i++)
+            {
+                if (lsnA[i] < lsnB[i]) return -1;
+                if (lsnA[i] > lsnB[i]) return 1;
+            }
+            return 0;
+        }
+
+        /// <summary>
+        /// Computes an MD5 hash (non-cryptographic use: duplicate detection only)
+        /// from the record's Data dictionary, with keys sorted for determinism.
+        /// </summary>
+        private string ComputeHash(SqlServerRecord record)
+        {
+            // Build a stable "Key=Value;" string, sorted by key so the order
+            // is deterministic across reads
+            var sb = new StringBuilder();
+            foreach (var kv in record.Data.OrderBy(x => x.Key))
+            {
+                sb.Append(kv.Key).Append('=').Append(kv.Value ?? "null").Append(';');
+            }
+
+            // Get MD5 hash of that string
+            using (var md5 = MD5.Create())
+            {
+                var bytes = Encoding.UTF8.GetBytes(sb.ToString());
+                var hashBytes = md5.ComputeHash(bytes);
+                return Convert.ToBase64String(hashBytes);
+            }
+        }
+    }
+}
diff --git a/src/Cortex.Streams.MongoDb/Assets/cortex.png b/src/Cortex.Streams.MongoDb/Assets/cortex.png
new file mode 100644
index 0000000..a4f9727
Binary files /dev/null and b/src/Cortex.Streams.MongoDb/Assets/cortex.png differ
diff --git a/src/Cortex.Streams.MongoDb/Assets/license.md b/src/Cortex.Streams.MongoDb/Assets/license.md
new file mode 100644
index 0000000..3c845d4
--- /dev/null
+++ b/src/Cortex.Streams.MongoDb/Assets/license.md
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2025 Buildersoft
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/src/Cortex.Streams.MongoDb/Cortex.Streams.MongoDb.csproj b/src/Cortex.Streams.MongoDb/Cortex.Streams.MongoDb.csproj
new file mode 100644
index 0000000..a315882
--- /dev/null
+++ b/src/Cortex.Streams.MongoDb/Cortex.Streams.MongoDb.csproj
@@ -0,0 +1,55 @@
+
+
+
+ net9.0;net8.0;net7.0
+
+ 1.0.1
+ 1.0.1
+ Buildersoft Cortex Framework
+ Buildersoft
+ Buildersoft,EnesHoxha
+ Copyright © Buildersoft 2025
+
+ Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
+
+
+ https://github.com/buildersoftio/cortex
+ cortex cdc eda streaming distributed streams states mongodb
+
+ 1.0.1
+ license.md
+ cortex.png
+ Cortex.Streams.CDC.MongoDb
+ True
+ True
+ True
+
+ Just as the Cortex in our brains handles complex processing efficiently, Cortex Data Framework brings brainpower to your data management!
+ https://buildersoft.io/
+ README.md
+
+
+
+
+ True
+ \
+
+
+ True
+
+
+
+ True
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Cortex.Streams.MongoDb/MongoDbCDCSettings.cs b/src/Cortex.Streams.MongoDb/MongoDbCDCSettings.cs
new file mode 100644
index 0000000..f7e383d
--- /dev/null
+++ b/src/Cortex.Streams.MongoDb/MongoDbCDCSettings.cs
@@ -0,0 +1,36 @@
+using System;
+
+namespace Cortex.Streams.MongoDb
+{
+    /// <summary>
+    /// Configuration for the MongoDB CDC source operator.
+    /// </summary>
+    public class MongoDbCDCSettings
+    {
+        /// <summary>
+        /// Whether to perform an initial scan/load of the entire collection
+        /// before starting the Change Stream.
+        /// </summary>
+        public bool DoInitialLoad { get; set; }
+
+        /// <summary>
+        /// For MongoDB, we use a Change Stream (which blocks). However, we may still
+        /// use this interval as a minimal delay between reconnection attempts or
+        /// to check if the operator was stopped.
+        /// </summary>
+        public TimeSpan Delay { get; set; }
+
+        /// <summary>
+        /// Maximum back-off in seconds if repeated errors occur while listening
+        /// to the MongoDB Change Stream.
+        /// </summary>
+        public int MaxBackOffSeconds { get; set; }
+
+        /// <summary>
+        /// Defaults: initial load enabled, 3-second delay, 60-second maximum back-off.
+        /// </summary>
+        public MongoDbCDCSettings()
+        {
+            DoInitialLoad = true;
+
+            // Typically not used for "polling" as we do streaming, but can be used
+            // for a small wait loop to check cancellation or errors.
+            Delay = TimeSpan.FromSeconds(3);
+            MaxBackOffSeconds = 60;
+        }
+    }
+}
diff --git a/src/Cortex.Streams.MongoDb/MongoDbCDCSourceOperator.cs b/src/Cortex.Streams.MongoDb/MongoDbCDCSourceOperator.cs
new file mode 100644
index 0000000..52f11f0
--- /dev/null
+++ b/src/Cortex.Streams.MongoDb/MongoDbCDCSourceOperator.cs
@@ -0,0 +1,417 @@
+using Cortex.States;
+using Cortex.Streams.Operators;
+using Microsoft.Extensions.Logging;
+using MongoDB.Bson;
+using MongoDB.Driver;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Security.Cryptography;
+using System.Text;
+using System.Threading;
+
+namespace Cortex.Streams.MongoDb
+{
+ ///
+ /// MongoDB CDC Source Operator that:
+ /// 1. Optionally performs an initial collection load if requested.
+ /// 2. Uses MongoDB Change Streams to capture inserts, updates, deletes, and replaces.
+ /// 3. Stores the last resume token to allow restart from the correct position.
+ /// 4. Skips duplicate records using a hash checkpoint.
+ /// 5. Uses robust error handling and exponential back-off on failures.
+ ///
+ public class MongoDbCDCSourceOperator : ISourceOperator, IDisposable
+ {
+ private readonly IMongoDatabase _database;
+ private readonly string _collectionName;
+
+ private readonly bool _doInitialLoad;
+ private readonly int _delayMs;
+ private readonly int _maxBackOffSeconds;
+
+ private readonly IDataStore _checkpointStore;
+
+ // Keys for checkpoint store
+ private readonly string _checkpointKey;
+ private readonly string _initialLoadCheckpointKey;
+ private readonly string _lastRecordHashKey;
+
+ private IMongoCollection _collection;
+
+ // Thread & cancellation
+ private Thread _pollingThread;
+ private bool _stopRequested;
+ private bool _disposed;
+
+ // Optional logger
+ private readonly ILogger _logger;
+
+
+        /// <summary>
+        /// Creates the operator for a single collection of the given database.
+        /// </summary>
+        /// <param name="database">MongoDB database that owns the collection.</param>
+        /// <param name="collectionName">Name of the collection to watch.</param>
+        /// <param name="mongoSettings">Optional settings; defaults are used when null.</param>
+        /// <param name="checkpointStore">Optional checkpoint store; in-memory when null.</param>
+        /// <param name="logger">Optional logger.</param>
+        public MongoDbCDCSourceOperator(IMongoDatabase database,
+            string collectionName,
+            MongoDbCDCSettings mongoSettings = null,
+            IDataStore<string, byte[]> checkpointStore = null,
+            ILogger logger = null)
+        {
+            if (database == null)
+                throw new ArgumentNullException(nameof(database));
+            if (string.IsNullOrWhiteSpace(collectionName))
+                throw new ArgumentException("Collection name cannot be null or empty.", nameof(collectionName));
+
+            mongoSettings ??= new MongoDbCDCSettings();
+
+            // If no checkpoint store is provided, default to an in-memory store.
+            _checkpointStore = checkpointStore
+                ?? new InMemoryStateStore<string, byte[]>($"{database.DatabaseNamespace.DatabaseName}.{collectionName}.STORE");
+
+            _database = database;
+            _collectionName = collectionName;
+
+            _doInitialLoad = mongoSettings.DoInitialLoad;
+            _delayMs = (int)mongoSettings.Delay.TotalMilliseconds;
+            _maxBackOffSeconds = mongoSettings.MaxBackOffSeconds;
+
+            // Define checkpoint keys
+            var dbName = database.DatabaseNamespace.DatabaseName;
+            _checkpointKey = $"{dbName}.{collectionName}.CDC.ResumeToken";
+            _initialLoadCheckpointKey = $"{dbName}.{collectionName}.INITIAL_LOAD_DONE";
+            _lastRecordHashKey = $"{dbName}.{collectionName}.CDC.LAST_HASH";
+
+            _logger = logger;
+        }
+
+        /// <summary>
+        /// Start the operator. Uses the provided IMongoDatabase to set up the collection,
+        /// performs optional initial load, and launches the background thread
+        /// that reads from the change stream.
+        /// </summary>
+        /// <param name="emit">Callback invoked once per produced record.</param>
+        public void Start(Action<MongoDbRecord> emit)
+        {
+            if (emit == null) throw new ArgumentNullException(nameof(emit));
+
+            var dbName = _database.DatabaseNamespace.DatabaseName;
+            LogInformation($"Starting MongoDB CDC operator for {dbName}.{_collectionName}...");
+
+            // 1. Select the collection (raw BsonDocument view, so any schema is supported)
+            _collection = _database.GetCollection<BsonDocument>(_collectionName);
+
+            // 2. Perform initial load if needed
+            if (_doInitialLoad && _checkpointStore.Get(_initialLoadCheckpointKey) == null)
+            {
+                LogInformation($"Performing initial load for {dbName}.{_collectionName}...");
+                RunInitialLoad(emit);
+                _checkpointStore.Put(_initialLoadCheckpointKey, new byte[] { 0x01 });
+                LogInformation($"Initial load completed for {dbName}.{_collectionName}.");
+            }
+            else
+            {
+                LogInformation($"Skipping initial load for {dbName}.{_collectionName} (already done or disabled).");
+            }
+
+            // 3. Check if we have an existing resume token
+            var lastResumeTokenBytes = _checkpointStore.Get(_checkpointKey);
+            if (lastResumeTokenBytes == null)
+            {
+                LogInformation($"No existing resume token found for {dbName}.{_collectionName}. Will start fresh.");
+            }
+            else
+            {
+                LogInformation($"Found existing resume token for {dbName}.{_collectionName}.");
+            }
+
+            // 4. Spin up the background polling (streaming) thread
+            _stopRequested = false;
+            _pollingThread = new Thread(() => PollCdcChanges(emit))
+            {
+                IsBackground = true,
+                Name = $"MongoCdcPolling_{dbName}_{_collectionName}"
+            };
+            _pollingThread.Start();
+        }
+
+        /// <summary>
+        /// Stops the operator gracefully: signals the streaming loop to stop
+        /// and blocks until the background thread has exited.
+        /// </summary>
+        public void Stop()
+        {
+            var dbName = _database.DatabaseNamespace.DatabaseName;
+            LogInformation($"Stop requested for MongoDB CDC operator {dbName}.{_collectionName}.");
+            _stopRequested = true;
+            _pollingThread?.Join();
+            LogInformation($"MongoDB CDC operator stopped for {dbName}.{_collectionName}.");
+        }
+
+        /// <summary>
+        /// The main CDC loop: opens a MongoDB Change Stream (resuming from the stored
+        /// token when available), emits each change, and checkpoints the resume token
+        /// plus a last-record hash for de-duplication. Uses exponential back-off on errors.
+        /// </summary>
+        private void PollCdcChanges(Action<MongoDbRecord> emit)
+        {
+            var dbName = _database.DatabaseNamespace.DatabaseName;
+            int backOffSeconds = 1;
+
+            while (!_stopRequested)
+            {
+                try
+                {
+                    // 1. Retrieve the last resume token from checkpoint store
+                    BsonDocument resumeToken = null;
+                    var lastResumeTokenBytes = _checkpointStore.Get(_checkpointKey);
+                    if (lastResumeTokenBytes != null)
+                    {
+                        var json = Encoding.UTF8.GetString(lastResumeTokenBytes);
+                        resumeToken = BsonDocument.Parse(json);
+                    }
+
+                    // 2. Create a change stream with optional resume token
+                    var options = new ChangeStreamOptions
+                    {
+                        FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
+                        ResumeAfter = resumeToken
+                    };
+
+                    var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>();
+
+                    using var cursor = _collection.Watch(pipeline, options);
+                    LogInformation($"MongoDB Change Stream opened for {dbName}.{_collectionName}.");
+
+                    // 3. Retrieve last-record-hash to skip duplicates
+                    var lastHashBytes = _checkpointStore.Get(_lastRecordHashKey);
+                    string lastHash = lastHashBytes == null ? null : Encoding.UTF8.GetString(lastHashBytes);
+
+                    // 4. Read changes (this enumeration blocks while the stream is open)
+                    foreach (var change in cursor.ToEnumerable())
+                    {
+                        if (_stopRequested) break;
+
+                        // Update resume token checkpoint
+                        var currentResumeToken = change.ResumeToken?.AsBsonDocument;
+                        if (currentResumeToken != null)
+                        {
+                            var currentResumeTokenString = currentResumeToken.ToJson();
+                            _checkpointStore.Put(_checkpointKey, Encoding.UTF8.GetBytes(currentResumeTokenString));
+                        }
+
+                        // Convert the change stream document to a MongoDbRecord
+                        var record = ConvertChangeToRecord(change);
+
+                        // Deduplicate against the last emitted record
+                        var currentHash = ComputeHash(record);
+                        if (currentHash == lastHash)
+                        {
+                            LogInformation($"Skipping duplicate record for {dbName}.{_collectionName}.");
+                            continue;
+                        }
+
+                        // Emit the record
+                        emit(record);
+
+                        // Update lastHash checkpoint
+                        lastHash = currentHash;
+                        _checkpointStore.Put(_lastRecordHashKey, Encoding.UTF8.GetBytes(lastHash));
+                    }
+
+                    Thread.Sleep(_delayMs);
+                    // Reset back-off if we successfully processed
+                    backOffSeconds = 1;
+                }
+                catch (Exception ex)
+                {
+                    LogError($"Error in MongoDB CDC polling loop for {dbName}.{_collectionName}.", ex);
+
+                    // Exponential back-off
+                    Thread.Sleep(TimeSpan.FromSeconds(backOffSeconds));
+                    backOffSeconds = Math.Min(backOffSeconds * 2, _maxBackOffSeconds);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Reads the entire collection once and emits each document as Operation = 'InitialLoad'.
+        /// For very large collections, consider chunking/paging to avoid memory issues.
+        /// </summary>
+        private void RunInitialLoad(Action<MongoDbRecord> emit)
+        {
+            var filter = Builders<BsonDocument>.Filter.Empty;
+            var cursor = _collection.Find(filter).ToCursor();
+
+            while (cursor.MoveNext())
+            {
+                foreach (var doc in cursor.Current)
+                {
+                    if (_stopRequested) break;
+
+                    var record = new MongoDbRecord
+                    {
+                        Operation = "InitialLoad",
+                        Data = BsonDocumentToDictionary(doc),
+                        ChangeTime = DateTime.UtcNow
+                    };
+
+                    emit(record);
+                }
+                if (_stopRequested) break;
+            }
+        }
+
+        /// <summary>
+        /// Converts a change stream document to a <see cref="MongoDbRecord"/>.
+        /// Deletes carry only the document key (typically the _id); other operations
+        /// carry the full document when available.
+        /// </summary>
+        private MongoDbRecord ConvertChangeToRecord(ChangeStreamDocument<BsonDocument> change)
+        {
+            var record = new MongoDbRecord
+            {
+                Data = new Dictionary<string, object>(),
+                ChangeTime = DateTime.UtcNow
+            };
+
+            switch (change.OperationType)
+            {
+                case ChangeStreamOperationType.Insert:
+                    record.Operation = "INSERT";
+                    record.Data = BsonDocumentToDictionary(change.FullDocument);
+                    break;
+
+                case ChangeStreamOperationType.Update:
+                    record.Operation = "UPDATE";
+                    record.Data = BsonDocumentToDictionary(change.FullDocument);
+                    break;
+
+                case ChangeStreamOperationType.Replace:
+                    record.Operation = "REPLACE";
+                    record.Data = BsonDocumentToDictionary(change.FullDocument);
+                    break;
+
+                case ChangeStreamOperationType.Delete:
+                    record.Operation = "DELETE";
+                    // DocumentKey typically has the _id
+                    if (change.DocumentKey != null)
+                    {
+                        var docKeyDict = BsonDocumentToDictionary(change.DocumentKey);
+                        foreach (var kv in docKeyDict)
+                        {
+                            record.Data[kv.Key] = kv.Value;
+                        }
+                    }
+                    break;
+
+                default:
+                    record.Operation = change.OperationType.ToString().ToUpperInvariant();
+                    break;
+            }
+
+            return record;
+        }
+
+        /// <summary>
+        /// Helper to convert a BsonDocument to a name/value dictionary.
+        /// Returns an empty dictionary for a null input.
+        /// </summary>
+        private Dictionary<string, object> BsonDocumentToDictionary(BsonDocument doc)
+        {
+            if (doc == null) return new Dictionary<string, object>();
+
+            var dict = new Dictionary<string, object>();
+            foreach (var element in doc.Elements)
+            {
+                dict[element.Name] = BsonValueToNative(element.Value);
+            }
+            return dict;
+        }
+
+ ///
+ /// Recursively converts a BsonValue to a .NET object (e.g. string, int, nested dictionaries, etc.).
+ ///
+ private object BsonValueToNative(BsonValue value)
+ {
+ if (value == null || value.IsBsonNull) return null;
+
+ switch (value.BsonType)
+ {
+ case BsonType.Document:
+ return BsonDocumentToDictionary(value.AsBsonDocument);
+
+ case BsonType.Array:
+ var list = new List