Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ IBranchStreamBuilder<TIn, TCurrent> GroupBySilently<TKey>(
/// <param name="keySelector">A function to extract the key from data.</param>
/// <param name="stateStore">An optional state store to use for storing group state.</param>
/// <returns>A stream builder with grouped data.</returns>
IBranchStreamBuilder<TIn, KeyValuePair<TKey, TCurrent>> GroupBy<TKey>(
IBranchStreamBuilder<TIn, KeyValuePair<TKey, List<TCurrent>>> GroupBy<TKey>(
Func<TCurrent, TKey> keySelector,
string stateStoreName = null,
IStateStore<TKey, List<TCurrent>> stateStore = null);
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams/Abstractions/IStreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ IStreamBuilder<TIn, TCurrent> GroupBySilently<TKey>(
/// <param name="keySelector">A function to extract the key from data.</param>
/// <param name="stateStore">An optional state store to use for storing group state.</param>
/// <returns>A stream builder with grouped data.</returns>
IStreamBuilder<TIn, KeyValuePair<TKey, TCurrent>> GroupBy<TKey>(
IStreamBuilder<TIn, KeyValuePair<TKey, List<TCurrent>>> GroupBy<TKey>(
Func<TCurrent, TKey> keySelector,
string stateStoreName = null,
IStateStore<TKey, List<TCurrent>> stateStore = null);
Expand Down
6 changes: 2 additions & 4 deletions src/Cortex.Streams/BranchStreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ public void Sink(ISinkOperator<TCurrent> sinkOperator)
}




public IBranchStreamBuilder<TIn, KeyValuePair<TKey, TCurrent>> GroupBy<TKey>(Func<TCurrent, TKey> keySelector, string stateStoreName = null, States.IStateStore<TKey, List<TCurrent>> stateStore = null)
public IBranchStreamBuilder<TIn, KeyValuePair<TKey, List<TCurrent>>> GroupBy<TKey>(Func<TCurrent, TKey> keySelector, string stateStoreName = null, States.IStateStore<TKey, List<TCurrent>> stateStore = null)
{
if (stateStore == null)
{
Expand All @@ -142,7 +140,7 @@ public IBranchStreamBuilder<TIn, KeyValuePair<TKey, TCurrent>> GroupBy<TKey>(Fun
_lastOperator = groupByOperator;
}

return new BranchStreamBuilder<TIn, KeyValuePair<TKey, TCurrent>>(_name)
return new BranchStreamBuilder<TIn, KeyValuePair<TKey, List<TCurrent>>>(_name)
{
_firstOperator = _firstOperator,
_lastOperator = _lastOperator,
Expand Down
18 changes: 9 additions & 9 deletions src/Cortex.Streams/Operators/AggregateOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

namespace Cortex.Streams.Operators
{
public class AggregateOperator<TKey, TInput, TAggregate> : IOperator, IStatefulOperator, ITelemetryEnabled
public class AggregateOperator<TKey, TCurrent, TAggregate> : IOperator, IStatefulOperator, ITelemetryEnabled
{
private readonly Func<TInput, TKey> _keySelector;
private readonly Func<TAggregate, TInput, TAggregate> _aggregateFunction;
private readonly Func<TCurrent, TKey> _keySelector;
private readonly Func<TAggregate, TCurrent, TAggregate> _aggregateFunction;
private readonly IStateStore<TKey, TAggregate> _stateStore;
private IOperator _nextOperator;

Expand All @@ -23,7 +23,7 @@ public class AggregateOperator<TKey, TInput, TAggregate> : IOperator, IStatefulO
private Action _incrementProcessedCounter;
private Action<double> _recordProcessingTime;

public AggregateOperator(Func<TInput, TKey> keySelector, Func<TAggregate, TInput, TAggregate> aggregateFunction, IStateStore<TKey, TAggregate> stateStore)
public AggregateOperator(Func<TCurrent, TKey> keySelector, Func<TAggregate, TCurrent, TAggregate> aggregateFunction, IStateStore<TKey, TAggregate> stateStore)
{
_keySelector = keySelector;
_aggregateFunction = aggregateFunction;
Expand All @@ -37,9 +37,9 @@ public void SetTelemetryProvider(ITelemetryProvider telemetryProvider)
if (_telemetryProvider != null)
{
var metricsProvider = _telemetryProvider.GetMetricsProvider();
_processedCounter = metricsProvider.CreateCounter($"aggregate_operator_processed_{typeof(TInput).Name}", "Number of items processed by AggregateOperator");
_processingTimeHistogram = metricsProvider.CreateHistogram($"aggregate_operator_processing_time_{typeof(TInput).Name}", "Processing time for AggregateOperator");
_tracer = _telemetryProvider.GetTracingProvider().GetTracer($"AggregateOperator_{typeof(TInput).Name}");
_processedCounter = metricsProvider.CreateCounter($"aggregate_operator_processed_{typeof(TCurrent).Name}", "Number of items processed by AggregateOperator");
_processingTimeHistogram = metricsProvider.CreateHistogram($"aggregate_operator_processing_time_{typeof(TCurrent).Name}", "Processing time for AggregateOperator");
_tracer = _telemetryProvider.GetTracingProvider().GetTracer($"AggregateOperator_{typeof(TCurrent).Name}");

// Cache delegates
_incrementProcessedCounter = () => _processedCounter.Increment();
Expand Down Expand Up @@ -70,7 +70,7 @@ public void Process(object input)
{
try
{
var typedInput = (TInput)input;
var typedInput = (TCurrent)input;
key = _keySelector(typedInput);
lock (_stateStore)
{
Expand All @@ -97,7 +97,7 @@ public void Process(object input)
}
else
{
var typedInput = (TInput)input;
var typedInput = (TCurrent)input;
key = _keySelector(typedInput);

lock (_stateStore)
Expand Down
5 changes: 2 additions & 3 deletions src/Cortex.Streams/StreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public IStreamBuilder<TIn, TCurrent> AggregateSilently<TKey, TAggregate>(Func<TC
}


public IStreamBuilder<TIn, KeyValuePair<TKey, TCurrent>> GroupBy<TKey>(Func<TCurrent, TKey> keySelector, string stateStoreName = null, IStateStore<TKey, List<TCurrent>> stateStore = null)
public IStreamBuilder<TIn, KeyValuePair<TKey, List<TCurrent>>> GroupBy<TKey>(Func<TCurrent, TKey> keySelector, string stateStoreName = null, IStateStore<TKey, List<TCurrent>> stateStore = null)
{
if (stateStore == null)
{
Expand All @@ -339,12 +339,11 @@ public IStreamBuilder<TIn, KeyValuePair<TKey, TCurrent>> GroupBy<TKey>(Func<TCur
_lastOperator = groupByOperator;
}

return new StreamBuilder<TIn, KeyValuePair<TKey, TCurrent>>(_name, _firstOperator, _lastOperator, _sourceAdded);
return new StreamBuilder<TIn, KeyValuePair<TKey, List<TCurrent>>>(_name, _firstOperator, _lastOperator, _sourceAdded);
}

public IStreamBuilder<TIn, KeyValuePair<TKey, TAggregate>> Aggregate<TKey, TAggregate>(Func<TCurrent, TKey> keySelector, Func<TAggregate, TCurrent, TAggregate> aggregateFunction, string stateStoreName = null, IStateStore<TKey, TAggregate> stateStore = null)
{
//private readonly Func<TInput, TKey> _keySelector
if (stateStore == null)
{
if (string.IsNullOrEmpty(stateStoreName))
Expand Down
Loading