From 67d4e25f0257643deaa639feede472dbc85be78a Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Tue, 10 Dec 2024 14:26:24 +0100 Subject: [PATCH] v1/bugs/51: GroupBy (Emitter) is not outputing List Update type of GroupBy and implement the changes in StreamBuilder and BranchStreamBuilder --- .../Abstractions/IBranchStreamBuilder.cs | 2 +- .../Abstractions/IStreamBuilder.cs | 2 +- src/Cortex.Streams/BranchStreamBuilder.cs | 6 ++---- .../Operators/AggregateOperator.cs | 18 +++++++++--------- src/Cortex.Streams/StreamBuilder.cs | 5 ++--- 5 files changed, 15 insertions(+), 18 deletions(-) diff --git a/src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs b/src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs index df03187..be172f3 100644 --- a/src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs @@ -48,7 +48,7 @@ IBranchStreamBuilder GroupBySilently( /// A function to extract the key from data. /// An optional state store to use for storing group state. /// A stream builder with grouped data. - IBranchStreamBuilder> GroupBy( + IBranchStreamBuilder>> GroupBy( Func keySelector, string stateStoreName = null, IStateStore> stateStore = null); diff --git a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs index 85e8511..cf433e8 100644 --- a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs @@ -88,7 +88,7 @@ IStreamBuilder GroupBySilently( /// A function to extract the key from data. /// An optional state store to use for storing group state. /// A stream builder with grouped data. - IStreamBuilder> GroupBy( + IStreamBuilder>> GroupBy( Func keySelector, string stateStoreName = null, IStateStore> stateStore = null); diff --git a/src/Cortex.Streams/BranchStreamBuilder.cs b/src/Cortex.Streams/BranchStreamBuilder.cs index cfeb89c..939bd2f 100644 --- a/src/Cortex.Streams/BranchStreamBuilder.cs +++ b/src/Cortex.Streams/BranchStreamBuilder.cs @@ -116,9 +116,7 @@ public void Sink(ISinkOperator sinkOperator) } - - - public IBranchStreamBuilder> GroupBy(Func keySelector, string stateStoreName = null, States.IStateStore> stateStore = null) + public IBranchStreamBuilder>> GroupBy(Func keySelector, string stateStoreName = null, States.IStateStore> stateStore = null) { if (stateStore == null) { @@ -142,7 +140,7 @@ public IBranchStreamBuilder> GroupBy(Fun _lastOperator = groupByOperator; } - return new BranchStreamBuilder>(_name) + return new BranchStreamBuilder>>(_name) { _firstOperator = _firstOperator, _lastOperator = _lastOperator, diff --git a/src/Cortex.Streams/Operators/AggregateOperator.cs b/src/Cortex.Streams/Operators/AggregateOperator.cs index 423e66d..6c79621 100644 --- a/src/Cortex.Streams/Operators/AggregateOperator.cs +++ b/src/Cortex.Streams/Operators/AggregateOperator.cs @@ -8,10 +8,10 @@ namespace Cortex.Streams.Operators { - public class AggregateOperator : IOperator, IStatefulOperator, ITelemetryEnabled + public class AggregateOperator : IOperator, IStatefulOperator, ITelemetryEnabled { - private readonly Func _keySelector; - private readonly Func _aggregateFunction; + private readonly Func _keySelector; + private readonly Func _aggregateFunction; private readonly IStateStore _stateStore; private IOperator _nextOperator; @@ -23,7 +23,7 @@ public class AggregateOperator : IOperator, IStatefulO private Action _incrementProcessedCounter; private Action _recordProcessingTime; - public AggregateOperator(Func keySelector, Func aggregateFunction, IStateStore stateStore) + public AggregateOperator(Func keySelector, Func aggregateFunction, IStateStore stateStore) { _keySelector = keySelector; _aggregateFunction = aggregateFunction; @@ -37,9 +37,9 @@ public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) if (_telemetryProvider != null) { var metricsProvider = _telemetryProvider.GetMetricsProvider(); - _processedCounter = metricsProvider.CreateCounter($"aggregate_operator_processed_{typeof(TInput).Name}", "Number of items processed by AggregateOperator"); - _processingTimeHistogram = metricsProvider.CreateHistogram($"aggregate_operator_processing_time_{typeof(TInput).Name}", "Processing time for AggregateOperator"); - _tracer = _telemetryProvider.GetTracingProvider().GetTracer($"AggregateOperator_{typeof(TInput).Name}"); + _processedCounter = metricsProvider.CreateCounter($"aggregate_operator_processed_{typeof(TCurrent).Name}", "Number of items processed by AggregateOperator"); + _processingTimeHistogram = metricsProvider.CreateHistogram($"aggregate_operator_processing_time_{typeof(TCurrent).Name}", "Processing time for AggregateOperator"); + _tracer = _telemetryProvider.GetTracingProvider().GetTracer($"AggregateOperator_{typeof(TCurrent).Name}"); // Cache delegates _incrementProcessedCounter = () => _processedCounter.Increment(); @@ -70,7 +70,7 @@ public void Process(object input) { try { - var typedInput = (TInput)input; + var typedInput = (TCurrent)input; key = _keySelector(typedInput); lock (_stateStore) { @@ -97,7 +97,7 @@ public void Process(object input) } else { - var typedInput = (TInput)input; + var typedInput = (TCurrent)input; key = _keySelector(typedInput); lock (_stateStore) diff --git a/src/Cortex.Streams/StreamBuilder.cs b/src/Cortex.Streams/StreamBuilder.cs index b6571bd..3aa4181 100644 --- a/src/Cortex.Streams/StreamBuilder.cs +++ b/src/Cortex.Streams/StreamBuilder.cs @@ -315,7 +315,7 @@ public IStreamBuilder AggregateSilently(Func> GroupBy(Func keySelector, string stateStoreName = null, IStateStore> stateStore = null) + public IStreamBuilder>> GroupBy(Func keySelector, string stateStoreName = null, IStateStore> stateStore = null) { if (stateStore == null) { @@ -339,12 +339,11 @@ public IStreamBuilder> GroupBy(Func>(_name, _firstOperator, _lastOperator, _sourceAdded); + return new StreamBuilder>>(_name, _firstOperator, _lastOperator, _sourceAdded); } public IStreamBuilder> Aggregate(Func keySelector, Func aggregateFunction, string stateStoreName = null, IStateStore stateStore = null) { - //private readonly Func _keySelector if (stateStore == null) { if (string.IsNullOrEmpty(stateStoreName))