From 0ae8c20f483cb9210431f7a43e20af7e03ccecf4 Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Tue, 3 Dec 2024 16:04:07 +0100 Subject: [PATCH 1/2] v1/feature/47 Re-implementation of Windows: Update SessionWindow, Update SlidingWindow, Update TumblingWindow, Add Session and Window Key and State. Unit-tests added/changed --- .../RocksDbStateStore.cs | 21 ++ src/Cortex.States/Abstractions/IStateStore.cs | 1 + src/Cortex.States/InMemoryStateStore.cs | 5 + .../Abstractions/IStreamBuilder.cs | 67 ++-- .../Operators/SessionWindowOperator.cs | 312 +++++++++--------- .../Operators/SlidingWindowOperator.cs | 310 +++++++++-------- .../Operators/TumblingWindowOperator.cs | 309 +++++++++-------- src/Cortex.Streams/StreamBuilder.cs | 102 ++++-- src/Cortex.Streams/Windows/SessionKey.cs | 35 ++ src/Cortex.Streams/Windows/SessionState.cs | 16 + src/Cortex.Streams/Windows/WindowKey.cs | 33 ++ src/Cortex.Streams/Windows/WindowState.cs | 15 + .../Tests/SessionWindowOperatorTests.cs | 258 ++++++++++++--- .../Tests/SlidingWindowOperatorTests.cs | 96 ++++-- .../Tests/TumblingWindowOperatorTests.cs | 190 +++++++++-- 15 files changed, 1166 insertions(+), 604 deletions(-) create mode 100644 src/Cortex.Streams/Windows/SessionKey.cs create mode 100644 src/Cortex.Streams/Windows/SessionState.cs create mode 100644 src/Cortex.Streams/Windows/WindowKey.cs create mode 100644 src/Cortex.Streams/Windows/WindowState.cs diff --git a/src/Cortex.States.RocksDb/RocksDbStateStore.cs b/src/Cortex.States.RocksDb/RocksDbStateStore.cs index 2144d18..2b840bf 100644 --- a/src/Cortex.States.RocksDb/RocksDbStateStore.cs +++ b/src/Cortex.States.RocksDb/RocksDbStateStore.cs @@ -121,5 +121,26 @@ public void Dispose() _db?.Dispose(); _lock?.Dispose(); } + + public IEnumerable GetKeys() + { + _lock.EnterReadLock(); + try + { + using (var iterator = _db.NewIterator()) + { + for (iterator.SeekToFirst(); iterator.Valid(); iterator.Next()) + { + var keyBytes = iterator.Key(); + var key = _keyDeserializer(keyBytes); + yield return key; + } + } + } + finally + { + _lock.ExitReadLock(); + } + } } } diff --git a/src/Cortex.States/Abstractions/IStateStore.cs b/src/Cortex.States/Abstractions/IStateStore.cs index 166613b..2195f46 100644 --- a/src/Cortex.States/Abstractions/IStateStore.cs +++ b/src/Cortex.States/Abstractions/IStateStore.cs @@ -15,5 +15,6 @@ public interface IStateStore : IStateStore bool ContainsKey(TKey key); void Remove(TKey key); IEnumerable> GetAll(); + IEnumerable GetKeys(); } } diff --git a/src/Cortex.States/InMemoryStateStore.cs b/src/Cortex.States/InMemoryStateStore.cs index 5a043d7..a7b0fbb 100644 --- a/src/Cortex.States/InMemoryStateStore.cs +++ b/src/Cortex.States/InMemoryStateStore.cs @@ -42,5 +42,10 @@ public IEnumerable> GetAll() { return _store; } + + public IEnumerable GetKeys() + { + return _store.Keys; + } } } diff --git a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs index eb431ce..9170315 100644 --- a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs @@ -1,5 +1,6 @@ using Cortex.States; using Cortex.Streams.Operators; +using Cortex.Streams.Windows; using System; using System.Collections.Generic; @@ -98,64 +99,66 @@ public interface IStreamBuilder /// A function to extract the key from data. /// The duration of the tumbling window. /// A function to process the data in the window. - /// Optional name for the state store. - /// Optional state store instance. + /// Optional name for the state store. + /// Optional name for the results state store. + /// Optional state store instance for window state. + /// Optional state store instance for window results. /// A stream builder with the new data type. IStreamBuilder TumblingWindow( Func keySelector, TimeSpan windowDuration, Func, TWindowOutput> windowFunction, - string stateStoreName = null, + string windowStateStoreName = null, string windowResultsStateStoreName = null, - IStateStore> stateStore = null, - IStateStore<(TKey, DateTime), TWindowOutput> windowResultsStateStore = null); + IStateStore> windowStateStore = null, + IStateStore, TWindowOutput> windowResultsStateStore = null); /// - /// Adds Sliding window operator to the stream + /// Adds a sliding window operator to the stream. /// /// The type of the key to group by. /// The type of the output after windowing. /// A function to extract the key from data. - /// - /// - /// - /// - /// - /// - /// - /// + /// The duration of the sliding window. + /// The interval at which the window slides. + /// A function to process the data in the window. + /// Optional name for the state store. + /// Optional name for the results state store. + /// Optional state store instance for window state. + /// Optional state store instance for window results. + /// A stream builder with the new data type. IStreamBuilder SlidingWindow( Func keySelector, - TimeSpan windowSize, - TimeSpan advanceBy, + TimeSpan windowDuration, + TimeSpan slideInterval, Func, TWindowOutput> windowFunction, string windowStateStoreName = null, string windowResultsStateStoreName = null, - IStateStore> windowStateStore = null, - IStateStore<(TKey, DateTime), TWindowOutput> windowResultsStateStore = null); + IStateStore, List> windowStateStore = null, + IStateStore, TWindowOutput> windowResultsStateStore = null); /// - /// Adds Session window operator to the stream + /// Adds a session window operator to the stream. /// /// The type of the key to group by. - /// The type of the output after windowing. + /// The type of the output after session windowing. /// A function to extract the key from data. - /// - /// - /// - /// - /// - /// - /// - IStreamBuilder SessionWindow( + /// The inactivity gap duration to define session boundaries. + /// A function to process the data in the session. + /// Optional name for the state store. + /// Optional name for the results state store. + /// Optional state store instance for session state. + /// Optional state store instance for session results. + /// A stream builder with the new data type. + IStreamBuilder SessionWindow( Func keySelector, TimeSpan inactivityGap, - Func, TWindowOutput> windowFunction, + Func, TSessionOutput> sessionFunction, string sessionStateStoreName = null, - string windowResultsStateStoreName = null, - IStateStore> sessionStateStore = null, - IStateStore<(TKey, DateTime), TWindowOutput> windowResultsStateStore = null); + string sessionResultsStateStoreName = null, + IStateStore> sessionStateStore = null, + IStateStore, TSessionOutput> sessionResultsStateStore = null); IStreamBuilder SetNext(IOperator customOperator); diff --git a/src/Cortex.Streams/Operators/SessionWindowOperator.cs b/src/Cortex.Streams/Operators/SessionWindowOperator.cs index 4ab628d..f6caec4 100644 --- a/src/Cortex.Streams/Operators/SessionWindowOperator.cs +++ b/src/Cortex.Streams/Operators/SessionWindowOperator.cs @@ -1,5 +1,6 @@ using Cortex.States; using Cortex.States.Operators; +using Cortex.Streams.Windows; using Cortex.Telemetry; using System; using System.Collections.Generic; @@ -8,18 +9,18 @@ namespace Cortex.Streams.Operators { /// - /// An operator that implements session window functionality. + /// An operator that performs session window aggregation. /// /// The type of input data. /// The type of the key to group by. - /// The type of the output after windowing. - public class SessionWindowOperator : IOperator, IStatefulOperator, ITelemetryEnabled, IDisposable + /// The type of the output after session windowing. + public class SessionWindowOperator : IOperator, IStatefulOperator, ITelemetryEnabled { private readonly Func _keySelector; private readonly TimeSpan _inactivityGap; - private readonly Func, TWindowOutput> _windowFunction; - private readonly IStateStore> _sessionStateStore; - private readonly IStateStore<(TKey, DateTime), TWindowOutput> _windowResultsStateStore; + private readonly Func, TSessionOutput> _sessionFunction; + private readonly IStateStore> _sessionStateStore; + private readonly IStateStore, TSessionOutput> _sessionResultsStateStore; private IOperator _nextOperator; // Telemetry fields @@ -30,128 +31,72 @@ public class SessionWindowOperator : IOperator, ISt private Action _incrementProcessedCounter; private Action _recordProcessingTime; - // Timer management - private readonly Timer _timer; - private readonly object _lock = new object(); + // Timer for checking inactive sessions + private readonly Timer _sessionExpirationTimer; + private readonly object _stateLock = new object(); public SessionWindowOperator( Func keySelector, TimeSpan inactivityGap, - Func, TWindowOutput> windowFunction, - IStateStore> sessionStateStore, - IStateStore<(TKey, DateTime), TWindowOutput> windowResultsStateStore = null) + Func, TSessionOutput> sessionFunction, + IStateStore> sessionStateStore, + IStateStore, TSessionOutput> sessionResultsStateStore = null) { - _keySelector = keySelector; + _keySelector = keySelector ?? throw new ArgumentNullException(nameof(keySelector)); _inactivityGap = inactivityGap; - _windowFunction = windowFunction; - _sessionStateStore = sessionStateStore; - _windowResultsStateStore = windowResultsStateStore; + _sessionFunction = sessionFunction ?? throw new ArgumentNullException(nameof(sessionFunction)); + _sessionStateStore = sessionStateStore ?? throw new ArgumentNullException(nameof(sessionStateStore)); + _sessionResultsStateStore = sessionResultsStateStore; - // Initialize the timer to periodically check for sessions to close - _timer = new Timer(_ => CheckForInactiveSessions(), null, _inactivityGap, _inactivityGap); + // Set up a timer to periodically check for inactive sessions + _sessionExpirationTimer = new Timer(SessionExpirationCallback, null, inactivityGap, inactivityGap); } - private void CheckForInactiveSessions() + public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) { - List<(TKey key, SessionWindowState session)> sessionsToClose = new List<(TKey, SessionWindowState)>(); - var now = DateTime.UtcNow; + _telemetryProvider = telemetryProvider; - lock (_lock) + if (_telemetryProvider != null) { - foreach (var item in _sessionStateStore.GetAll()) - { - var session = _sessionStateStore.Get(item.Key); - if (session == null) - continue; + var metricsProvider = _telemetryProvider.GetMetricsProvider(); + _processedCounter = metricsProvider.CreateCounter($"session_window_operator_processed_{typeof(TInput).Name}", "Number of items processed by SessionWindowOperator"); + _processingTimeHistogram = metricsProvider.CreateHistogram($"session_window_operator_processing_time_{typeof(TInput).Name}", "Processing time for SessionWindowOperator"); + _tracer = _telemetryProvider.GetTracingProvider().GetTracer($"SessionWindowOperator_{typeof(TInput).Name}"); - if (now - session.LastEventTime >= _inactivityGap) - { - sessionsToClose.Add((item.Key, session)); - _sessionStateStore.Remove(item.Key); - } - } + // Cache delegates + _incrementProcessedCounter = () => _processedCounter.Increment(); + _recordProcessingTime = value => _processingTimeHistogram.Record(value); } - - // Process closed sessions outside the lock - foreach (var (key, session) in sessionsToClose) + else { - TWindowOutput result; - if (_telemetryProvider != null) - { - var stopwatch = Stopwatch.StartNew(); - using (var span = _tracer.StartSpan("SessionWindowOperator.ProcessSession")) - { - try - { - result = _windowFunction(session.Events); - span.SetAttribute("key", key.ToString()); - span.SetAttribute("sessionStart", session.StartTime.ToString()); - span.SetAttribute("sessionEnd", session.LastEventTime.ToString()); - span.SetAttribute("status", "success"); - } - catch (Exception ex) - { - span.SetAttribute("status", "error"); - span.SetAttribute("exception", ex.ToString()); - throw; - } - finally - { - stopwatch.Stop(); - _recordProcessingTime(stopwatch.Elapsed.TotalMilliseconds); - } - } - } - else - { - result = _windowFunction(session.Events); - } - - // Store the session result in the state store if provided - if (_windowResultsStateStore != null) - { - _windowResultsStateStore.Put((key, session.StartTime), result); - } + _incrementProcessedCounter = null; + _recordProcessingTime = null; + } - // Continue processing - _nextOperator?.Process(result); + // Propagate telemetry to the next operator + if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled) + { + nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); } } public void Process(object input) { - var typedInput = (TInput)input; - var key = _keySelector(typedInput); - var timestamp = DateTime.UtcNow; + if (input == null) + throw new ArgumentNullException(nameof(input)); + + if (!(input is TInput typedInput)) + throw new ArgumentException($"Expected input of type {typeof(TInput).Name}, but received {input.GetType().Name}"); if (_telemetryProvider != null) { var stopwatch = Stopwatch.StartNew(); + using (var span = _tracer.StartSpan("SessionWindowOperator.Process")) { try { - lock (_lock) - { - var session = _sessionStateStore.Get(key); - if (session == null) - { - session = new SessionWindowState - { - StartTime = timestamp, - LastEventTime = timestamp, - Events = new List { typedInput } - }; - _sessionStateStore.Put(key, session); - } - else - { - session.Events.Add(typedInput); - session.LastEventTime = timestamp; - } - } - span.SetAttribute("key", key.ToString()); - span.SetAttribute("timestamp", timestamp.ToString()); + ProcessInput(typedInput); span.SetAttribute("status", "success"); } catch (Exception ex) @@ -170,90 +115,149 @@ public void Process(object input) } else { - lock (_lock) + ProcessInput(typedInput); + } + } + + private void ProcessInput(TInput input) + { + var key = _keySelector(input); + var currentTime = DateTime.UtcNow; + + lock (_stateLock) + { + SessionState sessionState; + + if (!_sessionStateStore.ContainsKey(key)) { - var session = _sessionStateStore.Get(key); - if (session == null) + // Start a new session + sessionState = new SessionState { - session = new SessionWindowState - { - StartTime = timestamp, - LastEventTime = timestamp, - Events = new List { typedInput } - }; - _sessionStateStore.Put(key, session); + SessionStartTime = currentTime, + LastEventTime = currentTime, + Events = new List { input } + }; + _sessionStateStore.Put(key, sessionState); + } + else + { + sessionState = _sessionStateStore.Get(key); + + var timeSinceLastEvent = currentTime - sessionState.LastEventTime; + + if (timeSinceLastEvent <= _inactivityGap) + { + // Continue the current session + sessionState.Events.Add(input); + sessionState.LastEventTime = currentTime; + _sessionStateStore.Put(key, sessionState); } else { - session.Events.Add(typedInput); - session.LastEventTime = timestamp; + // Session has expired, process it + ProcessSession(key, sessionState); + + // Start a new session + sessionState = new SessionState + { + SessionStartTime = currentTime, + LastEventTime = currentTime, + Events = new List { input } + }; + _sessionStateStore.Put(key, sessionState); } } } } - public void SetNext(IOperator nextOperator) + private void ProcessSession(TKey key, SessionState sessionState) { - _nextOperator = nextOperator; + var sessionOutput = _sessionFunction(sessionState.Events); - // Propagate telemetry - if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled && _telemetryProvider != null) + // Optionally store the session result + if (_sessionResultsStateStore != null) { - nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); + var resultKey = new SessionKey + { + Key = key, + SessionStartTime = sessionState.SessionStartTime, + SessionEndTime = sessionState.LastEventTime + }; + _sessionResultsStateStore.Put(resultKey, sessionOutput); } + + // Emit the session output + _nextOperator?.Process(sessionOutput); + + // Remove the session state + _sessionStateStore.Remove(key); } - public IEnumerable GetStateStores() + private void SessionExpirationCallback(object state) { - yield return _sessionStateStore; - if (_windowResultsStateStore != null) + try { - yield return _windowResultsStateStore; - } - } + var currentTime = DateTime.UtcNow; + var keysToProcess = new List(); - public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) - { - _telemetryProvider = telemetryProvider; + lock (_stateLock) + { + var allKeys = _sessionStateStore.GetKeys(); - if (_telemetryProvider != null) - { - var metricsProvider = _telemetryProvider.GetMetricsProvider(); - _processedCounter = metricsProvider.CreateCounter($"session_window_operator_processed_{typeof(TInput).Name}", "Number of items processed by SessionWindowOperator"); - _processingTimeHistogram = metricsProvider.CreateHistogram($"session_window_operator_processing_time_{typeof(TInput).Name}", "Processing time for SessionWindowOperator"); - _tracer = _telemetryProvider.GetTracingProvider().GetTracer($"SessionWindowOperator_{typeof(TInput).Name}"); + foreach (var key in allKeys) + { + var sessionState = _sessionStateStore.Get(key); + if (sessionState != null) + { + var timeSinceLastEvent = currentTime - sessionState.LastEventTime; - // Cache delegates - _incrementProcessedCounter = () => _processedCounter.Increment(); - _recordProcessingTime = value => _processingTimeHistogram.Record(value); + if (timeSinceLastEvent > _inactivityGap) + { + // Session has expired + keysToProcess.Add(key); + } + } + } + } + + // Process expired sessions outside the lock + foreach (var key in keysToProcess) + { + SessionState sessionState; + + lock (_stateLock) + { + sessionState = _sessionStateStore.Get(key); + if (sessionState == null) + continue; // Already processed + } + + ProcessSession(key, sessionState); + } } - else + catch (Exception ex) { - _incrementProcessedCounter = null; - _recordProcessingTime = null; + // Log or handle exceptions as necessary + Console.WriteLine($"Error in SessionExpirationCallback: {ex.Message}"); } + } + + public IEnumerable GetStateStores() + { + yield return _sessionStateStore; + if (_sessionResultsStateStore != null) + yield return _sessionResultsStateStore; + } + + public void SetNext(IOperator nextOperator) + { + _nextOperator = nextOperator; // Propagate telemetry to the next operator - if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled) + if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled && _telemetryProvider != null) { nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); } } - - public void Dispose() - { - _timer?.Dispose(); - } - } - - /// - /// Represents the state of a session window. - /// - /// The type of input data. - public class SessionWindowState - { - public DateTime StartTime { get; set; } - public DateTime LastEventTime { get; set; } - public List Events { get; set; } } } diff --git a/src/Cortex.Streams/Operators/SlidingWindowOperator.cs b/src/Cortex.Streams/Operators/SlidingWindowOperator.cs index 1068656..89d128b 100644 --- a/src/Cortex.Streams/Operators/SlidingWindowOperator.cs +++ b/src/Cortex.Streams/Operators/SlidingWindowOperator.cs @@ -1,5 +1,6 @@ using Cortex.States; using Cortex.States.Operators; +using Cortex.Streams.Windows; using Cortex.Telemetry; using System; using System.Collections.Generic; @@ -10,19 +11,19 @@ namespace Cortex.Streams.Operators { /// - /// An operator that implements sliding window functionality. + /// An operator that performs sliding window aggregation. /// /// The type of input data. /// The type of the key to group by. /// The type of the output after windowing. - public class SlidingWindowOperator : IOperator, IStatefulOperator, ITelemetryEnabled, IDisposable + public class SlidingWindowOperator : IOperator, IStatefulOperator, ITelemetryEnabled { private readonly Func _keySelector; - private readonly TimeSpan _windowSize; - private readonly TimeSpan _advanceBy; + private readonly TimeSpan _windowDuration; + private readonly TimeSpan _slideInterval; private readonly Func, TWindowOutput> _windowFunction; - private readonly IStateStore> _windowStateStore; - private readonly IStateStore<(TKey, DateTime), TWindowOutput> _windowResultsStateStore; + private readonly IStateStore, List> _windowStateStore; + private readonly IStateStore, TWindowOutput> _windowResultsStateStore; private IOperator _nextOperator; // Telemetry fields @@ -33,151 +34,74 @@ public class SlidingWindowOperator : IOperator, ISt private Action _incrementProcessedCounter; private Action _recordProcessingTime; - // Timer management fields - private readonly Timer _timer; - private readonly object _lock = new object(); - private readonly Dictionary> _windowStartTimes = new Dictionary>(); + // Timer for window processing + private readonly Timer _windowProcessingTimer; + private readonly object _stateLock = new object(); public SlidingWindowOperator( Func keySelector, - TimeSpan windowSize, - TimeSpan advanceBy, + TimeSpan windowDuration, + TimeSpan slideInterval, Func, TWindowOutput> windowFunction, - IStateStore> windowStateStore, - IStateStore<(TKey, DateTime), TWindowOutput> windowResultsStateStore = null) + IStateStore, List> windowStateStore, + IStateStore, TWindowOutput> windowResultsStateStore = null) { - _keySelector = keySelector; - _windowSize = windowSize; - _advanceBy = advanceBy; - _windowFunction = windowFunction; - _windowStateStore = windowStateStore; + _keySelector = keySelector ?? throw new ArgumentNullException(nameof(keySelector)); + _windowDuration = windowDuration; + _slideInterval = slideInterval; + _windowFunction = windowFunction ?? throw new ArgumentNullException(nameof(windowFunction)); + _windowStateStore = windowStateStore ?? throw new ArgumentNullException(nameof(windowStateStore)); _windowResultsStateStore = windowResultsStateStore; - // Initialize the timer to trigger window evaluations at the smallest interval between window size and advance interval - var timerInterval = _advanceBy < TimeSpan.FromSeconds(1) ? TimeSpan.FromSeconds(1) : _advanceBy; - _timer = new Timer(_ => EvaluateWindows(), null, timerInterval, timerInterval); + // Set up a timer to periodically process windows + _windowProcessingTimer = new Timer(WindowProcessingCallback, null, _slideInterval, _slideInterval); } - private void EvaluateWindows() + public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) { - List<(TKey key, DateTime windowStart, List windowData)> windowsToProcess = new List<(TKey, DateTime, List)>(); + _telemetryProvider = telemetryProvider; - lock (_lock) + if (_telemetryProvider != null) { - var now = DateTime.UtcNow; - foreach (var window in _windowStateStore.GetAll()) - { - var events = _windowStateStore.Get(window.Key); - if (events == null || events.Count == 0) - continue; - - // Initialize window start times for the key if not present - if (!_windowStartTimes.ContainsKey(window.Key)) - { - _windowStartTimes[window.Key] = new List { events[0].timestamp }; - } - - // Remove expired events - events.RemoveAll(e => now - e.timestamp > _windowSize); - - // Generate new window start times based on advanceBy - var windowStarts = _windowStartTimes[window.Key]; - var latestWindowStart = windowStarts.Last(); - while (now - latestWindowStart >= _advanceBy) - { - latestWindowStart = latestWindowStart.Add(_advanceBy); - windowStarts.Add(latestWindowStart); - } + var metricsProvider = _telemetryProvider.GetMetricsProvider(); + _processedCounter = metricsProvider.CreateCounter($"sliding_window_operator_processed_{typeof(TInput).Name}", "Number of items processed by SlidingWindowOperator"); + _processingTimeHistogram = metricsProvider.CreateHistogram($"sliding_window_operator_processing_time_{typeof(TInput).Name}", "Processing time for SlidingWindowOperator"); + _tracer = _telemetryProvider.GetTracingProvider().GetTracer($"SlidingWindowOperator_{typeof(TInput).Name}"); - // Evaluate windows - foreach (var windowStart in windowStarts.ToList()) - { - if (now - windowStart >= _windowSize) - { - // Get events within the window - var windowData = events - .Where(e => e.timestamp >= windowStart && e.timestamp < windowStart + _windowSize) - .Select(e => e.input) - .ToList(); - - if (windowData.Count > 0) - { - windowsToProcess.Add((window.Key, windowStart, windowData)); - } - - // Remove old window start times - windowStarts.Remove(windowStart); - } - } - } + // Cache delegates + _incrementProcessedCounter = () => _processedCounter.Increment(); + _recordProcessingTime = value => _processingTimeHistogram.Record(value); } - - // Process windows outside lock - foreach (var (key, windowStart, windowData) in windowsToProcess) + else { - TWindowOutput result; - if (_telemetryProvider != null) - { - var stopwatch = Stopwatch.StartNew(); - using (var span = _tracer.StartSpan("SlidingWindowOperator.ProcessWindow")) - { - try - { - result = _windowFunction(windowData); - span.SetAttribute("key", key.ToString()); - span.SetAttribute("windowStart", windowStart.ToString()); - span.SetAttribute("status", "success"); - } - catch (Exception ex) - { - span.SetAttribute("status", "error"); - span.SetAttribute("exception", ex.ToString()); - throw; - } - finally - { - stopwatch.Stop(); - _recordProcessingTime(stopwatch.Elapsed.TotalMilliseconds); - } - } - } - else - { - result = _windowFunction(windowData); - } - - // Store the window result in the state store if provided - if (_windowResultsStateStore != null) - { - _windowResultsStateStore.Put((key, windowStart), result); - } + _incrementProcessedCounter = null; + _recordProcessingTime = null; + } - // Continue processing - _nextOperator?.Process(result); + // Propagate telemetry to the next operator + if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled) + { + nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); } } public void Process(object input) { - var typedInput = (TInput)input; - var key = _keySelector(typedInput); - var timestamp = DateTime.UtcNow; + if (input == null) + throw new ArgumentNullException(nameof(input)); + + if (!(input is TInput typedInput)) + throw new ArgumentException($"Expected input of type {typeof(TInput).Name}, but received {input.GetType().Name}"); if (_telemetryProvider != null) { var stopwatch = Stopwatch.StartNew(); + using (var span = _tracer.StartSpan("SlidingWindowOperator.Process")) { try { - lock (_lock) - { - var events = _windowStateStore.Get(key) ?? new List<(TInput, DateTime)>(); - events.Add((typedInput, timestamp)); - _windowStateStore.Put(key, events); - } - span.SetAttribute("key", key.ToString()); - span.SetAttribute("timestamp", timestamp.ToString()); + ProcessInput(typedInput); span.SetAttribute("status", "success"); } catch (Exception ex) @@ -196,66 +120,140 @@ public void Process(object input) } else { - lock (_lock) - { - var events = _windowStateStore.Get(key) ?? new List<(TInput, DateTime)>(); - events.Add((typedInput, timestamp)); - _windowStateStore.Put(key, events); - } + ProcessInput(typedInput); } } - public void SetNext(IOperator nextOperator) + private void ProcessInput(TInput input) { - _nextOperator = nextOperator; + var key = _keySelector(input); + var currentTime = DateTime.UtcNow; - // Propagate telemetry - if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled && _telemetryProvider != null) + var windowStartTimes = GetWindowStartTimes(currentTime); + + lock (_stateLock) { - nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); + foreach (var windowStartTime in windowStartTimes) + { + var windowKey = new WindowKey + { + Key = key, + WindowStartTime = windowStartTime + }; + + List windowEvents; + if (!_windowStateStore.ContainsKey(windowKey)) + { + windowEvents = new List(); + } + else + { + windowEvents = _windowStateStore.Get(windowKey); + } + + windowEvents.Add(input); + _windowStateStore.Put(windowKey, windowEvents); + } } } - public IEnumerable GetStateStores() + private void WindowProcessingCallback(object state) { - yield return _windowStateStore; - if (_windowResultsStateStore != null) + try { - yield return _windowResultsStateStore; + var currentTime = DateTime.UtcNow; + var expiredWindowKeys = new List>(); + + lock (_stateLock) + { + var allWindowKeys = _windowStateStore.GetKeys(); + + foreach (var windowKey in allWindowKeys) + { + if (currentTime >= windowKey.WindowStartTime + _windowDuration) + { + // Window has expired + expiredWindowKeys.Add(windowKey); + } + } + } + + // Process expired windows outside the lock + foreach (var windowKey in expiredWindowKeys) + { + List windowEvents; + + lock (_stateLock) + { + windowEvents = _windowStateStore.Get(windowKey); + if (windowEvents == null) + continue; // Already processed + } + + ProcessWindow(windowKey, windowEvents); + } + } + catch (Exception ex) + { + // Log or handle exceptions as necessary + Console.WriteLine($"Error in WindowProcessingCallback: {ex.Message}"); } } - public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) + private void ProcessWindow(WindowKey windowKey, List windowEvents) { - _telemetryProvider = telemetryProvider; + var windowOutput = _windowFunction(windowEvents); - if (_telemetryProvider != null) + // Optionally store the window result + if (_windowResultsStateStore != null) { - var metricsProvider = _telemetryProvider.GetMetricsProvider(); - _processedCounter = metricsProvider.CreateCounter($"sliding_window_operator_processed_{typeof(TInput).Name}", "Number of items processed by SlidingWindowOperator"); - _processingTimeHistogram = metricsProvider.CreateHistogram($"sliding_window_operator_processing_time_{typeof(TInput).Name}", "Processing time for SlidingWindowOperator"); - _tracer = _telemetryProvider.GetTracingProvider().GetTracer($"SlidingWindowOperator_{typeof(TInput).Name}"); - - // Cache delegates - _incrementProcessedCounter = () => _processedCounter.Increment(); - _recordProcessingTime = value => _processingTimeHistogram.Record(value); + _windowResultsStateStore.Put(windowKey, windowOutput); } - else + + // Emit the window output + _nextOperator?.Process(windowOutput); + + // Remove the window state + lock (_stateLock) { - _incrementProcessedCounter = null; - _recordProcessingTime = null; + _windowStateStore.Remove(windowKey); } + } - // Propagate telemetry to the next operator - if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled) + private List GetWindowStartTimes(DateTime eventTime) + { + var windowStartTimes = new List(); + var firstWindowStartTime = eventTime - _windowDuration + _slideInterval; + var windowCount = (int)(_windowDuration.TotalMilliseconds / _slideInterval.TotalMilliseconds); + + for (int i = 0; i < windowCount; i++) { - nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); + var windowStartTime = firstWindowStartTime + TimeSpan.FromMilliseconds(i * _slideInterval.TotalMilliseconds); + if (windowStartTime <= eventTime && eventTime < windowStartTime + _windowDuration) + { + windowStartTimes.Add(windowStartTime); + } } + + return windowStartTimes; + } + + public IEnumerable GetStateStores() + { + yield return _windowStateStore; + if (_windowResultsStateStore != null) + yield return _windowResultsStateStore; } - public void Dispose() + public void SetNext(IOperator nextOperator) { - _timer?.Dispose(); + _nextOperator = nextOperator; + + // Propagate telemetry to the next operator + if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled && _telemetryProvider != null) + { + nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); + } } } } diff --git a/src/Cortex.Streams/Operators/TumblingWindowOperator.cs b/src/Cortex.Streams/Operators/TumblingWindowOperator.cs index 53d7895..d79a2d1 100644 --- a/src/Cortex.Streams/Operators/TumblingWindowOperator.cs +++ b/src/Cortex.Streams/Operators/TumblingWindowOperator.cs @@ -1,5 +1,6 @@ using Cortex.States; using Cortex.States.Operators; +using Cortex.Streams.Windows; using Cortex.Telemetry; using System; using System.Collections.Generic; @@ -9,31 +10,18 @@ namespace Cortex.Streams.Operators { /// - /// An operator that implements tumbling window functionality. + /// An operator that performs tumbling window aggregation. /// /// The type of input data. /// The type of the key to group by. /// The type of the output after windowing. - - /// - /// An operator that implements tumbling window functionality. - /// - /// The type of input data. - /// The type of the key to group by. - /// The type of the output after windowing. - /// - /// An operator that implements tumbling window functionality and stores window results. - /// - /// The type of input data. - /// The type of the key to group by. - /// The type of the output after windowing. - public class TumblingWindowOperator : IOperator, IStatefulOperator, ITelemetryEnabled, IDisposable + public class TumblingWindowOperator : IOperator, IStatefulOperator, ITelemetryEnabled { private readonly Func _keySelector; private readonly TimeSpan _windowDuration; private readonly Func, TWindowOutput> _windowFunction; - private readonly IStateStore> _windowStateStore; - private readonly IStateStore<(TKey, DateTime), TWindowOutput> _windowResultsStateStore; + private readonly IStateStore> _windowStateStore; + private readonly IStateStore, TWindowOutput> _windowResultsStateStore; private IOperator _nextOperator; // Telemetry fields @@ -44,121 +32,72 @@ public class TumblingWindowOperator : IOperator, IS private Action _incrementProcessedCounter; private Action _recordProcessingTime; - // Timer management fields - private readonly Dictionary _windowStartTimes = new Dictionary(); - private readonly Timer _timer; - private readonly object _lock = new object(); + // Timer for window expiration + private readonly Timer _windowExpirationTimer; + private readonly object _stateLock = new object(); public TumblingWindowOperator( Func keySelector, TimeSpan windowDuration, Func, TWindowOutput> windowFunction, - IStateStore> windowStateStore, - IStateStore<(TKey, DateTime), TWindowOutput> windowResultsStateStore = null) + IStateStore> windowStateStore, + IStateStore, TWindowOutput> windowResultsStateStore = null) { - _keySelector = keySelector; + _keySelector = keySelector ?? throw new ArgumentNullException(nameof(keySelector)); _windowDuration = windowDuration; - _windowFunction = windowFunction; - _windowStateStore = windowStateStore; + _windowFunction = windowFunction ?? throw new ArgumentNullException(nameof(windowFunction)); + _windowStateStore = windowStateStore ?? throw new ArgumentNullException(nameof(windowStateStore)); _windowResultsStateStore = windowResultsStateStore; - // Initialize the timer - _timer = new Timer(_ => CloseWindows(), null, _windowDuration, _windowDuration); + // Set up a timer to periodically check for window expirations + _windowExpirationTimer = new Timer(WindowExpirationCallback, null, _windowDuration, _windowDuration); } - private void CloseWindows() + public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) { - List<(TKey key, List windowData, DateTime windowStart)> windowsToProcess = new List<(TKey, List, DateTime)>(); - lock (_lock) + _telemetryProvider = telemetryProvider; + + if (_telemetryProvider != null) { - var now = DateTime.UtcNow; - foreach (var kvp in _windowStartTimes) - { - var key = kvp.Key; - var windowStart = kvp.Value; - if (now - windowStart >= _windowDuration) - { - var windowData = _windowStateStore.Get(key); - if (windowData != null && windowData.Count > 0) - { - windowsToProcess.Add((key, windowData, windowStart)); - } + var metricsProvider = _telemetryProvider.GetMetricsProvider(); + _processedCounter = metricsProvider.CreateCounter($"tumbling_window_operator_processed_{typeof(TInput).Name}", "Number of items processed by TumblingWindowOperator"); + _processingTimeHistogram = metricsProvider.CreateHistogram($"tumbling_window_operator_processing_time_{typeof(TInput).Name}", "Processing time for TumblingWindowOperator"); + _tracer = _telemetryProvider.GetTracingProvider().GetTracer($"TumblingWindowOperator_{typeof(TInput).Name}"); - // Reset window - _windowStateStore.Put(key, new List()); - _windowStartTimes[key] = now; - } - } + // Cache delegates + _incrementProcessedCounter = () => _processedCounter.Increment(); + _recordProcessingTime = value => _processingTimeHistogram.Record(value); } - - // Process windows outside lock - foreach (var (key, windowData, windowStart) in windowsToProcess) + else { - TWindowOutput result; - if (_telemetryProvider != null) - { - var stopwatch = Stopwatch.StartNew(); - using (var span = _tracer.StartSpan("TumblingWindowOperator.ProcessWindow")) - { - try - { - result = _windowFunction(windowData); - span.SetAttribute("key", key.ToString()); - span.SetAttribute("status", "success"); - } - catch (Exception ex) - { - span.SetAttribute("status", "error"); - span.SetAttribute("exception", ex.ToString()); - throw; - } - finally - { - stopwatch.Stop(); - _recordProcessingTime(stopwatch.Elapsed.TotalMilliseconds); - } - } - } - else - { - result = _windowFunction(windowData); - } - - // Store the window result in the state store if provided - if (_windowResultsStateStore != null) - { - _windowResultsStateStore.Put((key, windowStart), result); - } + _incrementProcessedCounter = null; + _recordProcessingTime = null; + } - // Continue processing - _nextOperator?.Process(result); + // Propagate telemetry to the next operator + if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled) + { + nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); } } public void Process(object input) { - var typedInput = (TInput)input; - var key = _keySelector(typedInput); + if (input == null) + throw new ArgumentNullException(nameof(input)); + + if (!(input is TInput typedInput)) + throw new ArgumentException($"Expected input of type {typeof(TInput).Name}, but received {input.GetType().Name}"); if (_telemetryProvider != null) { var stopwatch = Stopwatch.StartNew(); + using (var span = _tracer.StartSpan("TumblingWindowOperator.Process")) { try { - lock (_lock) - { - var windowData = _windowStateStore.Get(key); - if (windowData == null) - { - windowData = new List(); - _windowStateStore.Put(key, windowData); - _windowStartTimes[key] = DateTime.UtcNow; - } - windowData.Add(typedInput); - } - span.SetAttribute("key", key.ToString()); + ProcessInput(typedInput); span.SetAttribute("status", "success"); } catch (Exception ex) @@ -177,71 +116,159 @@ public void Process(object input) } else { - lock (_lock) - { - var windowData = _windowStateStore.Get(key); - if (windowData == null) - { - windowData = new List(); - _windowStateStore.Put(key, windowData); - _windowStartTimes[key] = DateTime.UtcNow; - } - windowData.Add(typedInput); - } + ProcessInput(typedInput); } } - public void SetNext(IOperator nextOperator) + private void ProcessInput(TInput input) { - _nextOperator = nextOperator; + var key = _keySelector(input); + var currentTime = DateTime.UtcNow; - // Propagate telemetry - if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled && _telemetryProvider != null) + WindowState windowState; + bool isNewWindow = false; + + lock (_stateLock) { - nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); + if (!_windowStateStore.ContainsKey(key)) + { + // Initialize window state + var windowStartTime = GetWindowStartTime(currentTime); + windowState = new WindowState + { + WindowStartTime = windowStartTime, + Events = new List() + }; + _windowStateStore.Put(key, windowState); + isNewWindow = true; + } + else + { + windowState = _windowStateStore.Get(key); + } + + // Check if the event falls into the current window + if (currentTime >= windowState.WindowStartTime && currentTime < windowState.WindowStartTime + _windowDuration) + { + // Event falls into current window + windowState.Events.Add(input); + _windowStateStore.Put(key, windowState); + } + else + { + // Window has closed, process the window + ProcessWindow(key, windowState); + + // Start a new window + var newWindowStartTime = GetWindowStartTime(currentTime); + windowState = new WindowState + { + WindowStartTime = newWindowStartTime, + Events = new List { input } + }; + _windowStateStore.Put(key, windowState); + isNewWindow = true; + } + } + + if (isNewWindow) + { + // Optionally, we could set up a timer for this specific key to process the window after the window duration + // However, since we have a global timer, this might not be necessary } } - public IEnumerable GetStateStores() + private void ProcessWindow(TKey key, WindowState windowState) { - yield return _windowStateStore; + var windowOutput = _windowFunction(windowState.Events); + + // Optionally store the window result if (_windowResultsStateStore != null) { - yield return _windowResultsStateStore; + var resultKey = new WindowKey + { + Key = key, + WindowStartTime = windowState.WindowStartTime + }; + _windowResultsStateStore.Put(resultKey, windowOutput); } + + // Emit the window output + _nextOperator?.Process(windowOutput); + + // Remove the window state + _windowStateStore.Remove(key); } - public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) + private void WindowExpirationCallback(object state) { - _telemetryProvider = telemetryProvider; - - if (_telemetryProvider != null) + try { - var metricsProvider = _telemetryProvider.GetMetricsProvider(); - _processedCounter = metricsProvider.CreateCounter($"tumbling_window_operator_processed_{typeof(TInput).Name}", "Number of items processed by TumblingWindowOperator"); - _processingTimeHistogram = metricsProvider.CreateHistogram($"tumbling_window_operator_processing_time_{typeof(TInput).Name}", "Processing time for TumblingWindowOperator"); - _tracer = _telemetryProvider.GetTracingProvider().GetTracer($"TumblingWindowOperator_{typeof(TInput).Name}"); + var currentTime = DateTime.UtcNow; + var keysToProcess = new List(); - // Cache delegates - _incrementProcessedCounter = () => _processedCounter.Increment(); - _recordProcessingTime = value => _processingTimeHistogram.Record(value); + lock (_stateLock) + { + var allKeys = _windowStateStore.GetKeys(); + + foreach (var key in allKeys) + { + var windowState = _windowStateStore.Get(key); + if (windowState != null) + { + if (currentTime >= windowState.WindowStartTime + _windowDuration) + { + // Window has expired + keysToProcess.Add(key); + } + } + } + } + + // Process expired windows outside the lock to avoid long lock durations + foreach (var key in keysToProcess) + { + WindowState windowState; + + lock (_stateLock) + { + windowState = _windowStateStore.Get(key); + if (windowState == null) + continue; // Already processed + } + + ProcessWindow(key, windowState); + } } - else + catch (Exception ex) { - _incrementProcessedCounter = null; - _recordProcessingTime = null; + // Log or handle exceptions as necessary + Console.WriteLine($"Error in WindowExpirationCallback: {ex.Message}"); } + } + + private DateTime GetWindowStartTime(DateTime timestamp) + { + var windowStartTicks = (long)(timestamp.Ticks / _windowDuration.Ticks) * _windowDuration.Ticks; + return new DateTime(windowStartTicks, DateTimeKind.Utc); + } + + public IEnumerable GetStateStores() + { + yield return _windowStateStore; + if (_windowResultsStateStore != null) + yield return _windowResultsStateStore; + } + + public void SetNext(IOperator nextOperator) + { + _nextOperator = nextOperator; // Propagate telemetry to the next operator - if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled) + if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled && _telemetryProvider != null) { nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); } } - - public void Dispose() - { - _timer?.Dispose(); - } } } diff --git a/src/Cortex.Streams/StreamBuilder.cs b/src/Cortex.Streams/StreamBuilder.cs index 95c6863..97099a9 100644 --- a/src/Cortex.Streams/StreamBuilder.cs +++ b/src/Cortex.Streams/StreamBuilder.cs @@ -1,6 +1,7 @@ using Cortex.States; using Cortex.Streams.Abstractions; using Cortex.Streams.Operators; +using Cortex.Streams.Windows; using Cortex.Telemetry; using System; using System.Collections.Generic; @@ -328,8 +329,10 @@ public IInitialStreamBuilder WithTelemetry(ITelemetryProvider tel /// A function to extract the key from data. /// The duration of the tumbling window. /// A function to process the data in the window. - /// Optional name for the state store. - /// Optional state store instance. + /// Optional name for the state store. + /// Optional name for the results state store. + /// Optional state store instance for window state. + /// Optional state store instance for window results. /// A stream builder with the new data type. public IStreamBuilder TumblingWindow( Func keySelector, @@ -337,8 +340,8 @@ public IStreamBuilder TumblingWindow( Func, TWindowOutput> windowFunction, string windowStateStoreName = null, string windowResultsStateStoreName = null, - IStateStore> windowStateStore = null, - IStateStore<(TKey, DateTime), TWindowOutput> windowResultsStateStore = null) + IStateStore> windowStateStore = null, + IStateStore, TWindowOutput> windowResultsStateStore = null) { if (windowStateStore == null) { @@ -346,12 +349,12 @@ public IStreamBuilder TumblingWindow( { windowStateStoreName = $"TumblingWindowStateStore_{Guid.NewGuid()}"; } - windowStateStore = new InMemoryStateStore>(windowStateStoreName); + windowStateStore = new InMemoryStateStore>(windowStateStoreName); } if (windowResultsStateStore == null && !string.IsNullOrEmpty(windowResultsStateStoreName)) { - windowResultsStateStore = new InMemoryStateStore<(TKey, DateTime), TWindowOutput>(windowResultsStateStoreName); + windowResultsStateStore = new InMemoryStateStore, TWindowOutput>(windowResultsStateStoreName); } var windowOperator = new TumblingWindowOperator( @@ -368,18 +371,36 @@ public IStreamBuilder TumblingWindow( _lastOperator = windowOperator; } - return new StreamBuilder(_name, _firstOperator, _lastOperator, _sourceAdded); + return new StreamBuilder(_name, _firstOperator, _lastOperator, _sourceAdded) + { + _telemetryProvider = this._telemetryProvider + }; } + + /// + /// Adds a sliding window operator to the stream. + /// + /// The type of the key to group by. + /// The type of the output after windowing. + /// A function to extract the key from data. + /// The duration of the sliding window. + /// The interval at which the window slides. + /// A function to process the data in the window. + /// Optional name for the state store. + /// Optional name for the results state store. + /// Optional state store instance for window state. + /// Optional state store instance for window results. + /// A stream builder with the new data type. public IStreamBuilder SlidingWindow( Func keySelector, - TimeSpan windowSize, - TimeSpan advanceBy, + TimeSpan windowDuration, + TimeSpan slideInterval, Func, TWindowOutput> windowFunction, string windowStateStoreName = null, string windowResultsStateStoreName = null, - IStateStore> windowStateStore = null, - IStateStore<(TKey, DateTime), TWindowOutput> windowResultsStateStore = null) + IStateStore, List> windowStateStore = null, + IStateStore, TWindowOutput> windowResultsStateStore = null) { if (windowStateStore == null) { @@ -387,16 +408,16 @@ public IStreamBuilder SlidingWindow( { windowStateStoreName = $"SlidingWindowStateStore_{Guid.NewGuid()}"; } - windowStateStore = new InMemoryStateStore>(windowStateStoreName); + windowStateStore = new InMemoryStateStore, List>(windowStateStoreName); } if (windowResultsStateStore == null && !string.IsNullOrEmpty(windowResultsStateStoreName)) { - windowResultsStateStore = new InMemoryStateStore<(TKey, DateTime), TWindowOutput>(windowResultsStateStoreName); + windowResultsStateStore = new InMemoryStateStore, TWindowOutput>(windowResultsStateStoreName); } var windowOperator = new SlidingWindowOperator( - keySelector, windowSize, advanceBy, windowFunction, windowStateStore, windowResultsStateStore); + keySelector, windowDuration, slideInterval, windowFunction, windowStateStore, windowResultsStateStore); if (_firstOperator == null) { @@ -409,47 +430,66 @@ public IStreamBuilder SlidingWindow( _lastOperator = windowOperator; } - return new StreamBuilder(_name, _firstOperator, _lastOperator, _sourceAdded); + return new StreamBuilder(_name, _firstOperator, _lastOperator, _sourceAdded) + { + _telemetryProvider = this._telemetryProvider + }; } - public IStreamBuilder SessionWindow( + /// + /// Adds a session window operator to the stream. + /// + /// The type of the key to group by. + /// The type of the output after session windowing. + /// A function to extract the key from data. + /// The inactivity gap duration to define session boundaries. + /// A function to process the data in the session. + /// Optional name for the state store. + /// Optional name for the results state store. + /// Optional state store instance for session state. + /// Optional state store instance for session results. + /// A stream builder with the new data type. + public IStreamBuilder SessionWindow( Func keySelector, TimeSpan inactivityGap, - Func, TWindowOutput> windowFunction, + Func, TSessionOutput> sessionFunction, string sessionStateStoreName = null, - string windowResultsStateStoreName = null, - IStateStore> sessionStateStore = null, - IStateStore<(TKey, DateTime), TWindowOutput> windowResultsStateStore = null) + string sessionResultsStateStoreName = null, + IStateStore> sessionStateStore = null, + IStateStore, TSessionOutput> sessionResultsStateStore = null) { if (sessionStateStore == null) { if (string.IsNullOrEmpty(sessionStateStoreName)) { - sessionStateStoreName = $"SessionStateStore_{Guid.NewGuid()}"; + sessionStateStoreName = $"SessionWindowStateStore_{Guid.NewGuid()}"; } - sessionStateStore = new InMemoryStateStore>(sessionStateStoreName); + sessionStateStore = new InMemoryStateStore>(sessionStateStoreName); } - if (windowResultsStateStore == null && !string.IsNullOrEmpty(windowResultsStateStoreName)) + if (sessionResultsStateStore == null && !string.IsNullOrEmpty(sessionResultsStateStoreName)) { - windowResultsStateStore = new InMemoryStateStore<(TKey, DateTime), TWindowOutput>(windowResultsStateStoreName); + sessionResultsStateStore = new InMemoryStateStore, TSessionOutput>(sessionResultsStateStoreName); } - var sessionWindowOperator = new SessionWindowOperator( - keySelector, inactivityGap, windowFunction, sessionStateStore, windowResultsStateStore); + var sessionOperator = new SessionWindowOperator( + keySelector, inactivityGap, sessionFunction, sessionStateStore, sessionResultsStateStore); if (_firstOperator == null) { - _firstOperator = sessionWindowOperator; - _lastOperator = sessionWindowOperator; + _firstOperator = sessionOperator; + _lastOperator = sessionOperator; } else { - _lastOperator.SetNext(sessionWindowOperator); - _lastOperator = sessionWindowOperator; + _lastOperator.SetNext(sessionOperator); + _lastOperator = sessionOperator; } - return new StreamBuilder(_name, _firstOperator, _lastOperator, _sourceAdded); + return new StreamBuilder(_name, _firstOperator, _lastOperator, _sourceAdded) + { + _telemetryProvider = this._telemetryProvider + }; } public IStreamBuilder SetNext(IOperator customOperator) diff --git a/src/Cortex.Streams/Windows/SessionKey.cs b/src/Cortex.Streams/Windows/SessionKey.cs new file mode 100644 index 0000000..e214fbc --- /dev/null +++ b/src/Cortex.Streams/Windows/SessionKey.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; + +namespace Cortex.Streams.Windows +{ + /// + /// Represents a composite key for session results. + /// + /// The type of the key. + public class SessionKey + { + public TKey Key { get; set; } + public DateTime SessionStartTime { get; set; } + public DateTime SessionEndTime { get; set; } + + public override bool Equals(object obj) + { + if (obj is SessionKey other) + { + return EqualityComparer.Default.Equals(Key, other.Key) + && SessionStartTime.Equals(other.SessionStartTime) + && SessionEndTime.Equals(other.SessionEndTime); + } + return false; + } + + public override int GetHashCode() + { + int hashKey = Key != null ? Key.GetHashCode() : 0; + int hashStartTime = SessionStartTime.GetHashCode(); + int hashEndTime = SessionEndTime.GetHashCode(); + return hashKey ^ hashStartTime ^ hashEndTime; + } + } +} diff --git a/src/Cortex.Streams/Windows/SessionState.cs b/src/Cortex.Streams/Windows/SessionState.cs new file mode 100644 index 0000000..0e8ea2c --- /dev/null +++ b/src/Cortex.Streams/Windows/SessionState.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; + +namespace Cortex.Streams.Windows +{ + /// + /// Represents the state of a session window for a specific key. + /// + /// The type of input data. + public class SessionState + { + public DateTime SessionStartTime { get; set; } + public DateTime LastEventTime { get; set; } + public List Events { get; set; } + } +} diff --git a/src/Cortex.Streams/Windows/WindowKey.cs b/src/Cortex.Streams/Windows/WindowKey.cs new file mode 100644 index 0000000..9c6efd3 --- /dev/null +++ b/src/Cortex.Streams/Windows/WindowKey.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; + +namespace Cortex.Streams.Windows +{ + + /// + /// Represents a composite key for window results. + /// + /// The type of the key. + public class WindowKey + { + public TKey Key { get; set; } + public DateTime WindowStartTime { get; set; } + + public override bool Equals(object obj) + { + if (obj is WindowKey other) + { + return EqualityComparer.Default.Equals(Key, other.Key) + && WindowStartTime.Equals(other.WindowStartTime); + } + return false; + } + + public override int GetHashCode() + { + int hashKey = Key != null ? Key.GetHashCode() : 0; + int hashTime = WindowStartTime.GetHashCode(); + return hashKey ^ hashTime; + } + } +} diff --git a/src/Cortex.Streams/Windows/WindowState.cs b/src/Cortex.Streams/Windows/WindowState.cs new file mode 100644 index 0000000..5cb1d9b --- /dev/null +++ b/src/Cortex.Streams/Windows/WindowState.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; + +namespace Cortex.Streams.Windows +{ + /// + /// Represents the state of a window for a specific key. + /// + /// The type of input data. + public class WindowState + { + public DateTime WindowStartTime { get; set; } + public List Events { get; set; } + } +} diff --git a/src/Cortex.Tests/Streams/Tests/SessionWindowOperatorTests.cs b/src/Cortex.Tests/Streams/Tests/SessionWindowOperatorTests.cs index 29141fe..d883285 100644 --- a/src/Cortex.Tests/Streams/Tests/SessionWindowOperatorTests.cs +++ b/src/Cortex.Tests/Streams/Tests/SessionWindowOperatorTests.cs @@ -1,62 +1,246 @@ using Cortex.States; -using Cortex.Streams.Operators; +using Cortex.Streams.Windows; +using Moq; namespace Cortex.Streams.Tests { public class SessionWindowOperatorTests { + + [Fact] + public void SessionWindowOperator_BasicFunctionality_SessionsAggregatedCorrectly() + { + // Arrange + var inactivityGap = TimeSpan.FromSeconds(5); + + var emittedValues = new List(); + Action sinkAction = output => + { + emittedValues.Add(output); + Console.WriteLine($"Session closed for Key: {output.Key}, Aggregated Value: {output.AggregatedValue}"); + }; + + var sessionStateStore = new InMemoryStateStore>("SessionStateStore"); + + // Build the stream + var stream = StreamBuilder + .CreateNewStream("Test Stream") + .Stream() + .SessionWindow( + keySelector: input => input.Key, + inactivityGap: inactivityGap, + sessionFunction: events => + { + var key = events.First().Key; + var sum = events.Sum(e => e.Value); + var sessionStartTime = events.Min(e => e.EventTime); + var sessionEndTime = events.Max(e => e.EventTime); + return new SessionOutput + { + Key = key, + AggregatedValue = sum, + SessionStartTime = sessionStartTime, + SessionEndTime = sessionEndTime + }; + }, + sessionStateStore: sessionStateStore) + .Sink(sinkAction) + .Build(); + + stream.Start(); + + var now = DateTime.UtcNow; + + // Act + var input1 = new InputData { Key = "A", Value = 1, EventTime = now }; + stream.Emit(input1); + + Thread.Sleep(2000); + + var input2 = new InputData { Key = "A", Value = 2, EventTime = now.AddSeconds(2) }; + stream.Emit(input2); + + Thread.Sleep(6000); // Wait to exceed inactivity gap + + // Wait for session to expire + Thread.Sleep(1000); + + // Assert + Assert.Single(emittedValues); + Assert.Equal("A", emittedValues[0].Key); + Assert.Equal(3, emittedValues[0].AggregatedValue); // 1 + 2 = 3 + Assert.Equal(now, emittedValues[0].SessionStartTime); + Assert.Equal(now.AddSeconds(2), emittedValues[0].SessionEndTime); + + stream.Stop(); + } + [Fact] - public void Process_ShouldStartNewSession() + public void SessionWindowOperator_InactivityGap_SessionClosesAfterInactivity() { // Arrange - var stateStore = new InMemoryStateStore>("SessionTestStore"); - var sessionOperator = new SessionWindowOperator( - x => x.ToString(), - TimeSpan.FromSeconds(10), - inputs => string.Join(",", inputs), - stateStore - ); + var inactivityGap = TimeSpan.FromSeconds(2); + + var emittedValues = new List(); + Action sinkAction = output => + { + emittedValues.Add(output); + Console.WriteLine($"Session closed for Key: {output.Key}, Aggregated Value: {output.AggregatedValue}"); + }; + + var sessionStateStore = new InMemoryStateStore>("SessionStateStore"); + + // Build the stream + var stream = StreamBuilder + .CreateNewStream("Test Stream") + .Stream() + .SessionWindow( + keySelector: input => input.Key, + inactivityGap: inactivityGap, + sessionFunction: events => + { + var key = events.First().Key; + var sum = events.Sum(e => e.Value); + var sessionStartTime = events.Min(e => e.EventTime); + var sessionEndTime = events.Max(e => e.EventTime); + return new SessionOutput + { + Key = key, + AggregatedValue = sum, + SessionStartTime = sessionStartTime, + SessionEndTime = sessionEndTime + }; + }, + sessionStateStore: sessionStateStore) + .Sink(sinkAction) + .Build(); + + stream.Start(); + + var now = DateTime.UtcNow; // Act - sessionOperator.Process(2); - sessionOperator.Process(1); - sessionOperator.Process(2); + stream.Emit(new InputData { Key = "A", Value = 1, EventTime = now }); + + Thread.Sleep(1000); + + stream.Emit(new InputData { Key = "A", Value = 2, EventTime = now.AddSeconds(1) }); + + Thread.Sleep(3000); // Wait to exceed inactivity gap + + // Wait for session to expire + Thread.Sleep(1000); // Assert - var session = stateStore.Get("2"); - Assert.NotNull(session); - Assert.Equal(2, session.Events.Count); + Assert.Single(emittedValues); + Assert.Equal(3, emittedValues[0].AggregatedValue); + + stream.Stop(); } [Fact] - public void CheckForInactiveSessions_ShouldCloseExpiredSessions() + public void SessionWindowOperator_StatePersistence_StateRestoredCorrectly() { // Arrange - var stateStore = new InMemoryStateStore>("SessionTestStore"); - var sessionOperator = new SessionWindowOperator( - x => x.ToString(), - TimeSpan.FromMilliseconds(500), - inputs => string.Join(",", inputs), - stateStore - ); - - sessionOperator.Process(1); - - // Wait for inactivity gap - Thread.Sleep(100); - Thread.Sleep(100); - Thread.Sleep(100); - Thread.Sleep(100); - Thread.Sleep(100); - Thread.Sleep(100); + var inactivityGap = TimeSpan.FromSeconds(5); + + var emittedValues = new List(); + Action sinkAction = output => + { + emittedValues.Add(output); + }; + + var sessionStateStore = new InMemoryStateStore>("SessionStateStore"); + + // First stream instance + var stream1 = StreamBuilder + .CreateNewStream("Test Stream") + .Stream() + .SessionWindow( + keySelector: input => input.Key, + inactivityGap: inactivityGap, + sessionFunction: events => + { + var key = events.First().Key; + var sum = events.Sum(e => e.Value); + var sessionStartTime = events.Min(e => e.EventTime); + var sessionEndTime = events.Max(e => e.EventTime); + return new SessionOutput + { + Key = key, + AggregatedValue = sum, + SessionStartTime = sessionStartTime, + SessionEndTime = sessionEndTime + }; + }, + sessionStateStore: sessionStateStore) + .Sink(sinkAction) + .Build(); + + stream1.Start(); + + var now = DateTime.UtcNow; // Act - // acting should be done automatically by the framework - //sessionOperator.CheckForInactiveSessions(); + stream1.Emit(new InputData { Key = "A", Value = 1, EventTime = now }); + + // Simulate application restart + stream1.Stop(); + + var stream2 = StreamBuilder + .CreateNewStream("Test Stream") + .Stream() + .SessionWindow( + keySelector: input => input.Key, + inactivityGap: inactivityGap, + sessionFunction: events => + { + var key = events.First().Key; + var sum = events.Sum(e => e.Value); + var sessionStartTime = events.Min(e => e.EventTime); + var sessionEndTime = events.Max(e => e.EventTime); + return new SessionOutput + { + Key = key, + AggregatedValue = sum, + SessionStartTime = sessionStartTime, + SessionEndTime = sessionEndTime + }; + }, + sessionStateStore: sessionStateStore) + .Sink(sinkAction) + .Build(); + + stream2.Start(); + + stream2.Emit(new InputData { Key = "A", Value = 2, EventTime = now.AddSeconds(2) }); + + // Wait to exceed inactivity gap + Thread.Sleep(6000); + + // Wait for session to expire + Thread.Sleep(1000); // Assert - var session = stateStore.Get("1"); - Assert.Null(session); + Assert.Single(emittedValues); + Assert.Equal(3, emittedValues[0].AggregatedValue); + + stream2.Stop(); + } + + public class InputData + { + public string Key { get; set; } + public int Value { get; set; } + public DateTime EventTime { get; set; } + } + + public class SessionOutput + { + public string Key { get; set; } + public int AggregatedValue { get; set; } + public DateTime SessionStartTime { get; set; } + public DateTime SessionEndTime { get; set; } } } } diff --git a/src/Cortex.Tests/Streams/Tests/SlidingWindowOperatorTests.cs b/src/Cortex.Tests/Streams/Tests/SlidingWindowOperatorTests.cs index 3286cf6..b1294f8 100644 --- a/src/Cortex.Tests/Streams/Tests/SlidingWindowOperatorTests.cs +++ b/src/Cortex.Tests/Streams/Tests/SlidingWindowOperatorTests.cs @@ -1,42 +1,88 @@ using Cortex.States; using Cortex.Streams.Operators; +using Cortex.Streams.Windows; namespace Cortex.Streams.Tests { public class SlidingWindowOperatorTests { [Fact] - public void SlidingWindowOperator_WindowsDataCorrectly() + public void SlidingWindowOperator_BasicFunctionality_WindowsAggregatedCorrectly() { // Arrange - var windowSize = TimeSpan.FromSeconds(4); - var advanceBy = TimeSpan.FromSeconds(2); - var windowStateStore = new InMemoryStateStore>("TestSlidingWindowStateStore"); - var windowResultsStateStore = new InMemoryStateStore<(string, DateTime), int>("TestSlidingWindowResultsStore"); - var slidingWindowOperator = new SlidingWindowOperator( - keySelector: x => "key", - windowSize: windowSize, - advanceBy: advanceBy, - windowFunction: data => data.Count(), - windowStateStore: windowStateStore, - windowResultsStateStore: windowResultsStateStore - ); - - var outputs = new List(); - var sinkOperator = new SinkOperator(x => outputs.Add(x)); - slidingWindowOperator.SetNext(sinkOperator); + var windowDuration = TimeSpan.FromSeconds(10); + var slideInterval = TimeSpan.FromSeconds(5); + + var emittedValues = new List(); + var sinkCalled = new ManualResetEventSlim(false); + Action sinkAction = output => + { + emittedValues.Add(output); + sinkCalled.Set(); + }; + + var windowStateStore = new InMemoryStateStore, List>("WindowStateStore"); + + // Build the stream + var stream = StreamBuilder + .CreateNewStream("Test Stream") + .Stream() + .SlidingWindow( + keySelector: input => input.Key, + windowDuration: windowDuration, + slideInterval: slideInterval, + windowFunction: events => + { + var key = events.First().Key; + var windowStartTime = events.Min(e => e.EventTime); + var windowEndTime = events.Max(e => e.EventTime); + var sum = events.Sum(e => e.Value); + return new WindowOutput + { + Key = key, + WindowStartTime = windowStartTime, + WindowEndTime = windowEndTime, + AggregatedValue = sum + }; + }, + windowStateStore: windowStateStore) + .Sink(sinkAction) + .Build(); + + stream.Start(); + + var now = DateTime.UtcNow; // Act - slidingWindowOperator.Process(1); - Thread.Sleep(TimeSpan.FromSeconds(1)); - slidingWindowOperator.Process(2); - Thread.Sleep(advanceBy + TimeSpan.FromMilliseconds(500)); // Wait for window to advance - slidingWindowOperator.Process(3); - Thread.Sleep(windowSize + TimeSpan.FromSeconds(1)); // Wait for all windows to close + stream.Emit(new InputData { Key = "A", Value = 1, EventTime = now }); + stream.Emit(new InputData { Key = "A", Value = 2, EventTime = now.AddSeconds(3) }); + stream.Emit(new InputData { Key = "A", Value = 3, EventTime = now.AddSeconds(6) }); + + // Wait for windows to be processed + Thread.Sleep(15000); // Wait enough time for windows to be emitted + + // Manually trigger window processing if necessary + // Not needed if the timer in the operator works correctly // Assert - // Expected outputs depend on the timing and windowing logic - Assert.True(outputs.Count > 0); + Assert.True(emittedValues.Count > 0); + // Verify that the emitted values are correct based on your expectations + + stream.Stop(); + } + public class InputData + { + public string Key { get; set; } + public int Value { get; set; } + public DateTime EventTime { get; set; } + } + + public class WindowOutput + { + public string Key { get; set; } + public DateTime WindowStartTime { get; set; } + public DateTime WindowEndTime { get; set; } + public int AggregatedValue { get; set; } } } } diff --git a/src/Cortex.Tests/Streams/Tests/TumblingWindowOperatorTests.cs b/src/Cortex.Tests/Streams/Tests/TumblingWindowOperatorTests.cs index b6dd9c4..9e666e7 100644 --- a/src/Cortex.Tests/Streams/Tests/TumblingWindowOperatorTests.cs +++ b/src/Cortex.Tests/Streams/Tests/TumblingWindowOperatorTests.cs @@ -1,5 +1,7 @@ using Cortex.States; using Cortex.Streams.Operators; +using Cortex.Streams.Windows; +using Moq; namespace Cortex.Streams.Tests { @@ -8,42 +10,174 @@ public class TumblingWindowOperatorTests [Fact] public void TumblingWindowOperator_WindowsDataCorrectly() { - /// This test might fail, because it is using Thread Sleeps, these do not play well with tests. + // Arrange + var windowDuration = TimeSpan.FromSeconds(5); + + var emittedValues = new List(); + Action sinkAction = output => + { + emittedValues.Add(output); + }; + + var windowStateStore = new InMemoryStateStore>("WindowStateStore"); + var windowResultsStateStore = new InMemoryStateStore, int>("WindowResultsStateStore"); + + // Build the stream + var stream = StreamBuilder + .CreateNewStream("Test Stream") + .Stream() + .TumblingWindow( + keySelector: input => input.Key, + windowDuration: windowDuration, + windowFunction: events => events.Sum(e => e.Value), + windowStateStore: windowStateStore, + windowResultsStateStore: windowResultsStateStore) + .Sink(sinkAction) + .Build(); + + stream.Start(); + + // Act + var input1 = new InputData { Key = "A", Value = 1 }; + stream.Emit(input1); + + var input2 = new InputData { Key = "A", Value = 2 }; + stream.Emit(input2); + + // Wait for the window to close + Thread.Sleep(6000); + + // Assert + Assert.Single(emittedValues); + Assert.Equal(3, emittedValues[0]); // 1 + 2 = 3 + + stream.Stop(); + } + + public class InputData + { + public string Key { get; set; } + public int Value { get; set; } + } + [Fact] + public void TumblingWindowOperator_ThreadSafety_NoExceptionsThrown_StreamBuilder() + { // Arrange var windowDuration = TimeSpan.FromSeconds(2); - var windowStateStore = new InMemoryStateStore>("TestWindowStateStore"); - var windowResultsStateStore = new InMemoryStateStore<(string, DateTime), int>("TestWindowResultsStore"); - var tumblingWindowOperator = new TumblingWindowOperator( - keySelector: x => "key", - windowDuration: windowDuration, - windowFunction: data => data.Count(), - windowStateStore: windowStateStore, - windowResultsStateStore: windowResultsStateStore - ); - - var outputs = new List(); - var sinkOperator = new SinkOperator(x => outputs.Add(x)); - tumblingWindowOperator.SetNext(sinkOperator); + + var emittedValues = new List(); + object emittedValuesLock = new object(); + Action sinkAction = output => + { + lock (emittedValuesLock) + { + emittedValues.Add(output); + } + }; + + var windowStateStore = new InMemoryStateStore>("WindowStateStore"); + + // Build the stream + var stream = StreamBuilder + .CreateNewStream("Test Stream") + .Stream() + .TumblingWindow( + keySelector: input => input.Key, + windowDuration: windowDuration, + windowFunction: events => events.Sum(e => e.Value), + windowStateStore: windowStateStore) + .Sink(sinkAction) + .Build(); + + stream.Start(); // Act - tumblingWindowOperator.Process(1); - tumblingWindowOperator.Process(2); - Thread.Sleep(windowDuration + TimeSpan.FromMilliseconds(100)); // Wait for window to close - Thread.Sleep(TimeSpan.FromMilliseconds(100)); // Wait for window to close - Thread.Sleep(TimeSpan.FromMilliseconds(100)); // Wait for window to close - Thread.Sleep(TimeSpan.FromMilliseconds(100)); // Wait for window to close - Thread.Sleep(TimeSpan.FromMilliseconds(100)); // Wait for window to close - tumblingWindowOperator.Process(3); - Thread.Sleep(windowDuration + TimeSpan.FromMilliseconds(100)); // Wait for window to close - Thread.Sleep(TimeSpan.FromMilliseconds(100)); // Wait for window to close - Thread.Sleep(TimeSpan.FromMilliseconds(100)); // Wait for window to close - Thread.Sleep(TimeSpan.FromMilliseconds(100)); // Wait for window to close - Thread.Sleep(TimeSpan.FromMilliseconds(100)); // Wait for window to close + var tasks = new List(); + for (int i = 0; i < 100; i++) + { + int value = i; + tasks.Add(Task.Run(() => + { + var input = new InputData { Key = "A", Value = value }; + stream.Emit(input); + })); + } + + Task.WaitAll(tasks.ToArray()); + + System.Threading.Thread.Sleep(3000); // Wait for windows to close + + // Assert + Assert.True(emittedValues.Count > 0); + int totalInputSum = Enumerable.Range(0, 100).Sum(); + int totalEmittedSum; + lock (emittedValuesLock) + { + totalEmittedSum = emittedValues.Sum(); + } + Assert.Equal(totalInputSum, totalEmittedSum); + + stream.Stop(); + } + + [Fact] + public void TumblingWindowOperator_StatePersistence_StateRestoredCorrectly_StreamBuilder() + { + // Arrange + var windowDuration = TimeSpan.FromSeconds(5); + + var emittedValues = new List(); + Action sinkAction = output => + { + emittedValues.Add(output); + }; + + var windowStateStore = new InMemoryStateStore>("WindowStateStore"); + + // Build the first stream + var stream1 = StreamBuilder + .CreateNewStream("Test Stream") + .Stream() + .TumblingWindow( + keySelector: input => input.Key, + windowDuration: windowDuration, + windowFunction: events => events.Sum(e => e.Value), + windowStateStore: windowStateStore) + .Sink(sinkAction) + .Build(); + + stream1.Start(); + + // Act + stream1.Emit(new InputData { Key = "A", Value = 1 }); + + // Simulate application restart by creating a new stream with the same state store + stream1.Stop(); + + var stream2 = StreamBuilder + .CreateNewStream("Test Stream") + .Stream() + .TumblingWindow( + keySelector: input => input.Key, + windowDuration: windowDuration, + windowFunction: events => events.Sum(e => e.Value), + windowStateStore: windowStateStore) + .Sink(sinkAction) + .Build(); + + stream2.Start(); + + stream2.Emit(new InputData { Key = "A", Value = 2 }); + + System.Threading.Thread.Sleep(6000); // Wait for window to close // Assert - Assert.Equal(new[] { 2, 1 }, outputs); + Assert.Single(emittedValues); + Assert.Equal(3, emittedValues[0]); // 1 + 2 = 3 + + stream2.Stop(); } } } From 53aabd07abba19ea3cbcf63cbfc37898ec9f57cb Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Tue, 3 Dec 2024 16:14:18 +0100 Subject: [PATCH 2/2] Update versions; ready for release v1.0.1 --- README.md | 35 ++++++++++--------- .../Cortex.States.RocksDb.csproj | 6 ++-- src/Cortex.States/Cortex.States.csproj | 6 ++-- .../Cortex.Streams.AWSSQS.csproj | 6 ++-- .../Cortex.Streams.AzureBlobStorage.csproj | 6 ++-- .../Cortex.Streams.AzureServiceBus.csproj | 6 ++-- .../Cortex.Streams.Files.csproj | 6 ++-- .../Cortex.Streams.Kafka.csproj | 6 ++-- .../Cortex.Streams.Pulsar.csproj | 6 ++-- .../Cortex.Streams.RabbitMQ.csproj | 6 ++-- .../Cortex.Streams.S3.csproj | 6 ++-- src/Cortex.Streams/Cortex.Streams.csproj | 6 ++-- .../Cortex.Telemetry.OpenTelemetry.csproj | 6 ++-- src/Cortex.Telemetry/Cortex.Telemetry.csproj | 6 ++-- 14 files changed, 57 insertions(+), 56 deletions(-) diff --git a/README.md b/README.md index eb01546..2799c46 100644 --- a/README.md +++ b/README.md @@ -2,10 +2,11 @@ **Cortex Data Framework** is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications. -![GitHub forks](https://img.shields.io/github/forks/buildersoftio/cortex) -![GitHub License](https://img.shields.io/github/license/buildersoftio/cortex) -![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams?label=Cortex.Streams) -![GitHub contributors](https://img.shields.io/github/contributors/buildersoftio/cortex) [![Discord Shield](https://discord.com/api/guilds/1310034212371566612/widget.png?style=shield)](https://discord.com/invite/4Gfe6mhJ) +[![GitHub forks](https://img.shields.io/github/forks/buildersoftio/cortex)](https://github.com/buildersoftio/cortex/fork) +[![GitHub License](https://img.shields.io/github/license/buildersoftio/cortex)](https://github.com/buildersoftio/cortex/blob/master/LICENSE) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams?label=Cortex.Streams)](https://www.nuget.org/packages/Cortex.Streams) +[![GitHub contributors](https://img.shields.io/github/contributors/buildersoftio/cortex)](https://github.com/buildersoftio/cortex) +[![Discord Shield](https://discord.com/api/guilds/1310034212371566612/widget.png?style=shield)](https://discord.com/invite/4Gfe6mhJ) ## Key Features - **Modular Architecture**: Comprises distinct, interchangeable modules for streaming, state management, and connectors, allowing developers to choose components that best fit their requirements. @@ -41,44 +42,44 @@ ## Project Structure - **Cortex.Streams:** Core streaming capabilities for building data pipelines. -![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams?label=Cortex.Streams) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams?label=Cortex.Streams)](https://www.nuget.org/packages/Cortex.Streams) - **Cortex.Streams.Kafka:** Integration with Apache Kafka for robust data streaming. -![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.Kafka?label=Cortex.Streams.Kafka) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.Kafka?label=Cortex.Streams.Kafka)](https://www.nuget.org/packages/Cortex.Streams.Kafka) - **Cortex.Streams.Pulsar:** Integration with Apache Pulsar for versatile messaging needs. -![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.Pulsar?label=Cortex.Streams.Pulsar) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.Pulsar?label=Cortex.Streams.Pulsar)](https://www.nuget.org/packages/Cortex.Streams.Pulsar) - **Cortex.Streams.RabbitMQ:** Integration with RabbitMQ for versatile messaging needs. -![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.RabbitMQ?label=Cortex.Streams.RabbitMQ) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.RabbitMQ?label=Cortex.Streams.RabbitMQ)](https://www.nuget.org/packages/Cortex.Streams.RabbitMQ) - **Cortex.Streams.AWSSQS:** Integration with Amazon SQS for messaging needs in the cloud. -![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.AWSSQS?label=Cortex.Streams.AWSSQS) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.AWSSQS?label=Cortex.Streams.AWSSQS)](https://www.nuget.org/packages/Cortex.Streams.AWSSQS) - **Cortex.Streams.AzureServiceBus:** Integration with Azure Messaging Service Bus for messaging needs in the cloud. -![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.AzureServiceBus?label=Cortex.Streams.AzureServiceBus) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.AzureServiceBus?label=Cortex.Streams.AzureServiceBus)](https://www.nuget.org/packages/Cortex.Streams.AzureServiceBus) - **Cortex.Streams.AzureBlobStorage:** Integration with Azure Blob Storage for sinking messages. -![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.AzureBlobStorage?label=Cortex.Streams.AzureBlobStorage) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.AzureBlobStorage?label=Cortex.Streams.AzureBlobStorage)](https://www.nuget.org/packages/Cortex.Streams.AzureBlobStorage) - **Cortex.Streams.S3:** Integration with AWS S3 for sinking messages. -![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.S3?label=Cortex.Streams.S3) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.S3?label=Cortex.Streams.S3)](https://www.nuget.org/packages/Cortex.Streams.S3) - **Cortex.Streams.Files:** Implementation of File Source and Sink operators. -![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.Files?label=Cortex.Streams.Files) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.Files?label=Cortex.Streams.Files)](https://www.nuget.org/packages/Cortex.Streams.Files) - **Cortex.States:** Core state management functionalities. -![NuGet Version](https://img.shields.io/nuget/v/Cortex.States?label=Cortex.States) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.States?label=Cortex.States)](https://www.nuget.org/packages/Cortex.States) - **Cortex.States.RocksDb:** Persistent state storage using RocksDB. -![NuGet Version](https://img.shields.io/nuget/v/Cortex.States.RocksDb?label=Cortex.States.RocksDb) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.States.RocksDb?label=Cortex.States.RocksDb)](https://www.nuget.org/packages/Cortex.States.RocksDb) - **Cortex.Telemetry:** Core library to add support for Tracing and Matrics. -![NuGet Version](https://img.shields.io/nuget/v/Cortex.Telemetry?label=Cortex.Telemetry) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Telemetry?label=Cortex.Telemetry)](https://www.nuget.org/packages/Cortex.Telemetry) - **Cortex.Telemetry.OpenTelemetry:** Adds support for Open Telemetry. -![NuGet Version](https://img.shields.io/nuget/v/Cortex.Telemetry.OpenTelemetry?label=Cortex.Telemetry.OpenTelemetry) +[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Telemetry.OpenTelemetry?label=Cortex.Telemetry.OpenTelemetry)](https://www.nuget.org/packages/Cortex.Telemetry.OpenTelemetry) ## Getting Started diff --git a/src/Cortex.States.RocksDb/Cortex.States.RocksDb.csproj b/src/Cortex.States.RocksDb/Cortex.States.RocksDb.csproj index 8893708..96660c6 100644 --- a/src/Cortex.States.RocksDb/Cortex.States.RocksDb.csproj +++ b/src/Cortex.States.RocksDb/Cortex.States.RocksDb.csproj @@ -3,8 +3,8 @@ net8.0;net7.0 - 1.0.0 - 1.0.0 + 1.0.1 + 1.0.1 Buildersoft Cortex Framework Buildersoft Buildersoft,EnesHoxha @@ -16,7 +16,7 @@ https://github.com/buildersoftio/cortex cortex vortex mediator eda streaming distributed streams states kafka pulsar rocksdb - 1.0.0 + 1.0.1 license.md cortex.png Cortex.States.RocksDb diff --git a/src/Cortex.States/Cortex.States.csproj b/src/Cortex.States/Cortex.States.csproj index 487f533..0e7ec29 100644 --- a/src/Cortex.States/Cortex.States.csproj +++ b/src/Cortex.States/Cortex.States.csproj @@ -3,8 +3,8 @@ net9.0;net8.0;net7.0;netstandard2.1;netstandard2.0 - 1.0.0 - 1.0.0 + 1.0.1 + 1.0.1 Buildersoft Cortex Framework Buildersoft Buildersoft,EnesHoxha @@ -16,7 +16,7 @@ https://github.com/buildersoftio/cortex cortex vortex mediator eda streaming distributed streams states kafka pulsar rocksdb - 1.0.0 + 1.0.1 license.md cortex.png Cortex.States diff --git a/src/Cortex.Streams.AWSSQS/Cortex.Streams.AWSSQS.csproj b/src/Cortex.Streams.AWSSQS/Cortex.Streams.AWSSQS.csproj index 214c362..e2b5f7a 100644 --- a/src/Cortex.Streams.AWSSQS/Cortex.Streams.AWSSQS.csproj +++ b/src/Cortex.Streams.AWSSQS/Cortex.Streams.AWSSQS.csproj @@ -4,8 +4,8 @@ net8.0 enable - 1.0.0 - 1.0.0 + 1.0.1 + 1.0.1 Buildersoft Cortex Framework Buildersoft Buildersoft,EnesHoxha @@ -16,7 +16,7 @@ https://github.com/buildersoftio/cortex cortex vortex mediator eda streaming distributed streams states kafka pulsar rocksdb - 1.0.0 + 1.0.1 license.md cortex.png Cortex.Streams.AWSSQS diff --git a/src/Cortex.Streams.AzureBlobStorage/Cortex.Streams.AzureBlobStorage.csproj b/src/Cortex.Streams.AzureBlobStorage/Cortex.Streams.AzureBlobStorage.csproj index 2dfdeb1..78851a2 100644 --- a/src/Cortex.Streams.AzureBlobStorage/Cortex.Streams.AzureBlobStorage.csproj +++ b/src/Cortex.Streams.AzureBlobStorage/Cortex.Streams.AzureBlobStorage.csproj @@ -4,8 +4,8 @@ net8.0 enable - 1.0.0 - 1.0.0 + 1.0.1 + 1.0.1 Buildersoft Cortex Framework Buildersoft Buildersoft,EnesHoxha @@ -16,7 +16,7 @@ https://github.com/buildersoftio/cortex cortex vortex eda streaming distributed streams states kafka pulsar rocksdb rabbitMq AWS Azure - 1.0.0 + 1.0.1 license.md cortex.png Cortex.Streams.AzureBlobStorage diff --git a/src/Cortex.Streams.AzureServiceBus/Cortex.Streams.AzureServiceBus.csproj b/src/Cortex.Streams.AzureServiceBus/Cortex.Streams.AzureServiceBus.csproj index 18e75a9..0529ad9 100644 --- a/src/Cortex.Streams.AzureServiceBus/Cortex.Streams.AzureServiceBus.csproj +++ b/src/Cortex.Streams.AzureServiceBus/Cortex.Streams.AzureServiceBus.csproj @@ -4,8 +4,8 @@ net8.0 enable - 1.0.0 - 1.0.0 + 1.0.1 + 1.0.1 Buildersoft Cortex Framework Buildersoft Buildersoft,EnesHoxha @@ -16,7 +16,7 @@ https://github.com/buildersoftio/cortex cortex vortex eda streaming distributed streams states kafka pulsar rocksdb rabbitMq AWS Azure - 1.0.0 + 1.0.1 license.md cortex.png Cortex.Streams.AzureServiceBus diff --git a/src/Cortex.Streams.Files/Cortex.Streams.Files.csproj b/src/Cortex.Streams.Files/Cortex.Streams.Files.csproj index c2d81c9..8c6de69 100644 --- a/src/Cortex.Streams.Files/Cortex.Streams.Files.csproj +++ b/src/Cortex.Streams.Files/Cortex.Streams.Files.csproj @@ -4,8 +4,8 @@ net9.0;net8.0 enable - 1.0.0 - 1.0.0 + 1.0.1 + 1.0.1 Buildersoft Cortex Framework Buildersoft Buildersoft,EnesHoxha @@ -16,7 +16,7 @@ https://github.com/buildersoftio/cortex cortex vortex eda streaming distributed streams states kafka pulsar rocksdb rabbitMq files azure - 1.0.0 + 1.0.1 license.md cortex.png Cortex.Streams.Files diff --git a/src/Cortex.Streams.Kafka/Cortex.Streams.Kafka.csproj b/src/Cortex.Streams.Kafka/Cortex.Streams.Kafka.csproj index 7dfc80b..5005fc5 100644 --- a/src/Cortex.Streams.Kafka/Cortex.Streams.Kafka.csproj +++ b/src/Cortex.Streams.Kafka/Cortex.Streams.Kafka.csproj @@ -4,8 +4,8 @@ net8.0 enable - 1.0.0 - 1.0.0 + 1.0.1 + 1.0.1 Buildersoft Cortex Framework Buildersoft Buildersoft,EnesHoxha @@ -16,7 +16,7 @@ https://github.com/buildersoftio/cortex cortex vortex eda streaming distributed streams states kafka pulsar rocksdb - 1.0.0 + 1.0.1 license.md cortex.png Cortex.Streams.Kafka diff --git a/src/Cortex.Streams.Pulsar/Cortex.Streams.Pulsar.csproj b/src/Cortex.Streams.Pulsar/Cortex.Streams.Pulsar.csproj index 21bc68e..49a1de9 100644 --- a/src/Cortex.Streams.Pulsar/Cortex.Streams.Pulsar.csproj +++ b/src/Cortex.Streams.Pulsar/Cortex.Streams.Pulsar.csproj @@ -4,8 +4,8 @@ net8.0;net7.0 enable - 1.0.0 - 1.0.0 + 1.0.1 + 1.0.1 Buildersoft Cortex Framework Buildersoft Buildersoft,EnesHoxha @@ -17,7 +17,7 @@ https://github.com/buildersoftio/cortex cortex vortex mediator eda streaming distributed streams states kafka pulsar rocksdb - 1.0.0 + 1.0.1 license.md cortex.png Cortex.Streams.Pulsar diff --git a/src/Cortex.Streams.RabbitMQ/Cortex.Streams.RabbitMQ.csproj b/src/Cortex.Streams.RabbitMQ/Cortex.Streams.RabbitMQ.csproj index aab88de..1d20b45 100644 --- a/src/Cortex.Streams.RabbitMQ/Cortex.Streams.RabbitMQ.csproj +++ b/src/Cortex.Streams.RabbitMQ/Cortex.Streams.RabbitMQ.csproj @@ -4,8 +4,8 @@ net8.0 enable - 1.0.0 - 1.0.0 + 1.0.1 + 1.0.1 Buildersoft Cortex Framework Buildersoft Buildersoft,EnesHoxha @@ -16,7 +16,7 @@ https://github.com/buildersoftio/cortex cortex vortex mediator eda streaming distributed streams states kafka pulsar rocksdb rabbitMq AWS Azure - 1.0.0 + 1.0.1 license.md cortex.png Cortex.Streams.RabbitMQ diff --git a/src/Cortex.Streams.S3/Cortex.Streams.S3.csproj b/src/Cortex.Streams.S3/Cortex.Streams.S3.csproj index b126afd..e093aa0 100644 --- a/src/Cortex.Streams.S3/Cortex.Streams.S3.csproj +++ b/src/Cortex.Streams.S3/Cortex.Streams.S3.csproj @@ -4,8 +4,8 @@ net9.0;net8.0 enable - 1.0.0 - 1.0.0 + 1.0.1 + 1.0.1 Buildersoft Cortex Framework Buildersoft Buildersoft,EnesHoxha @@ -16,7 +16,7 @@ https://github.com/buildersoftio/cortex cortex vortex eda streaming distributed streams states kafka pulsar rocksdb rabbitMq S3 Azure - 1.0.0 + 1.0.1 license.md cortex.png Cortex.Streams.S3 diff --git a/src/Cortex.Streams/Cortex.Streams.csproj b/src/Cortex.Streams/Cortex.Streams.csproj index be9e83e..d6524e8 100644 --- a/src/Cortex.Streams/Cortex.Streams.csproj +++ b/src/Cortex.Streams/Cortex.Streams.csproj @@ -3,8 +3,8 @@ net9.0;net8.0;net7.0;netstandard2.1;netstandard2.0 - 1.0.0 - 1.0.0 + 1.0.1 + 1.0.1 Buildersoft Cortex Framework Buildersoft Buildersoft,EnesHoxha @@ -16,7 +16,7 @@ https://github.com/buildersoftio/cortex cortex vortex mediator eda streaming distributed streams states kafka pulsar rocksdb - 1.0.0 + 1.0.1 license.md cortex.png Cortex.Streams diff --git a/src/Cortex.Telemetry.OpenTelemetry/Cortex.Telemetry.OpenTelemetry.csproj b/src/Cortex.Telemetry.OpenTelemetry/Cortex.Telemetry.OpenTelemetry.csproj index b04d9e9..dae0173 100644 --- a/src/Cortex.Telemetry.OpenTelemetry/Cortex.Telemetry.OpenTelemetry.csproj +++ b/src/Cortex.Telemetry.OpenTelemetry/Cortex.Telemetry.OpenTelemetry.csproj @@ -4,8 +4,8 @@ net9.0;net8.0 enable - 1.0.0 - 1.0.0 + 1.0.1 + 1.0.1 Buildersoft Cortex Framework Buildersoft Buildersoft,EnesHoxha @@ -16,7 +16,7 @@ https://github.com/buildersoftio/cortex cortex vortex eda streaming distributed streams states kafka pulsar rocksdb rabbitMq S3 Azure - 1.0.0 + 1.0.1 license.md cortex.png Cortex.Telemetry.OpenTelemetry diff --git a/src/Cortex.Telemetry/Cortex.Telemetry.csproj b/src/Cortex.Telemetry/Cortex.Telemetry.csproj index b62819a..d4ccc47 100644 --- a/src/Cortex.Telemetry/Cortex.Telemetry.csproj +++ b/src/Cortex.Telemetry/Cortex.Telemetry.csproj @@ -3,8 +3,8 @@ net9.0;net8.0;net7.0;netstandard2.1;netstandard2.0 - 1.0.0 - 1.0.0 + 1.0.1 + 1.0.1 Buildersoft Cortex Framework Buildersoft Buildersoft,EnesHoxha @@ -15,7 +15,7 @@ https://github.com/buildersoftio/cortex cortex vortex eda streaming distributed streams states kafka pulsar rocksdb rabbitMq S3 Azure - 1.0.0 + 1.0.1 license.md cortex.png Cortex.Telemetry