diff --git a/src/Cortex.Mediator/Assets/license.md b/src/Cortex.Mediator/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.Mediator/Assets/license.md +++ b/src/Cortex.Mediator/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/Cortex.States.RocksDb/Assets/license.md b/src/Cortex.States.RocksDb/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.States.RocksDb/Assets/license.md +++ b/src/Cortex.States.RocksDb/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/Cortex.States/Assets/license.md b/src/Cortex.States/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.States/Assets/license.md +++ b/src/Cortex.States/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/Cortex.Streams.AWSSQS/Assets/license.md b/src/Cortex.Streams.AWSSQS/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.Streams.AWSSQS/Assets/license.md +++ b/src/Cortex.Streams.AWSSQS/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/Cortex.Streams.AzureBlobStorage/Assets/license.md b/src/Cortex.Streams.AzureBlobStorage/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.Streams.AzureBlobStorage/Assets/license.md +++ b/src/Cortex.Streams.AzureBlobStorage/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/Cortex.Streams.AzureServiceBus/Assets/license.md b/src/Cortex.Streams.AzureServiceBus/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.Streams.AzureServiceBus/Assets/license.md +++ b/src/Cortex.Streams.AzureServiceBus/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/Cortex.Streams.Files/Assets/license.md b/src/Cortex.Streams.Files/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.Streams.Files/Assets/license.md +++ b/src/Cortex.Streams.Files/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/Cortex.Streams.Kafka/Assets/license.md b/src/Cortex.Streams.Kafka/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.Streams.Kafka/Assets/license.md +++ b/src/Cortex.Streams.Kafka/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/Cortex.Streams.Pulsar/Assets/license.md b/src/Cortex.Streams.Pulsar/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.Streams.Pulsar/Assets/license.md +++ b/src/Cortex.Streams.Pulsar/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/Cortex.Streams.RabbitMQ/Assets/license.md b/src/Cortex.Streams.RabbitMQ/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.Streams.RabbitMQ/Assets/license.md +++ b/src/Cortex.Streams.RabbitMQ/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/Cortex.Streams.S3/Assets/license.md b/src/Cortex.Streams.S3/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.Streams.S3/Assets/license.md +++ b/src/Cortex.Streams.S3/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs b/src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs index 3ec53fd..df03187 100644 --- a/src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs @@ -1,5 +1,7 @@ -using Cortex.Streams.Operators; +using Cortex.States; +using Cortex.Streams.Operators; using System; +using System.Collections.Generic; namespace Cortex.Streams.Abstractions { @@ -23,7 +25,62 @@ public interface IBranchStreamBuilder /// The type of data after the transformation. /// A function to transform data. /// The branch stream builder with the new data type. - IBranchStreamBuilder Map(Func mapFunction); + IBranchStreamBuilder Map(Func mapFunction); + + + + /// + /// Groups the stream data by a specified key selector. + /// + /// The type of the key to group by. + /// A function to extract the key from data. + /// An optional state store to use for storing group state. + /// A stream builder with grouped data. + IBranchStreamBuilder GroupBySilently( + Func keySelector, + string stateStoreName = null, + IStateStore> stateStore = null); + + /// + /// Groups the stream data by a specified key selector silently. + /// + /// The type of the key to group by. + /// A function to extract the key from data. + /// An optional state store to use for storing group state. + /// A stream builder with grouped data. + IBranchStreamBuilder> GroupBy( + Func keySelector, + string stateStoreName = null, + IStateStore> stateStore = null); + + /// + /// Aggregates the stream data using a specified aggregation function. + /// + /// The type of the aggregate value. + /// A function to aggregate data. + /// An optional state store to use for storing aggregate state. + /// A stream builder with aggregated data. + IBranchStreamBuilder AggregateSilently( + Func keySelector, + Func aggregateFunction, + string stateStoreName = null, + IStateStore stateStore = null); + + /// + /// Aggregates the stream data using a specified aggregation function silently in the background. + /// + /// The type of the aggregate value. + /// A function to aggregate data. + /// An optional state store to use for storing aggregate state. + /// A stream builder with input data. + IBranchStreamBuilder> Aggregate( + Func keySelector, + Func aggregateFunction, + string stateStoreName = null, + IStateStore stateStore = null); + + + /// /// Adds a sink function to the branch to consume data. @@ -36,5 +93,9 @@ public interface IBranchStreamBuilder /// /// A sink operator to consume data. void Sink(ISinkOperator sinkOperator); + + + + } } diff --git a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs index 9170315..85e8511 100644 --- a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs @@ -76,8 +76,22 @@ public interface IStreamBuilder /// A function to extract the key from data. /// An optional state store to use for storing group state. /// A stream builder with grouped data. - //IStreamBuilder> GroupBy(Func keySelector, string stateStoreName = null, IStateStore> stateStore = null); - IStreamBuilder GroupBy(Func keySelector, string stateStoreName = null, IStateStore> stateStore = null); + IStreamBuilder GroupBySilently( + Func keySelector, + string stateStoreName = null, + IStateStore> stateStore = null); + + /// + /// Groups the stream data by a specified key selector silently. + /// + /// The type of the key to group by. + /// A function to extract the key from data. + /// An optional state store to use for storing group state. + /// A stream builder with grouped data. + IStreamBuilder> GroupBy( + Func keySelector, + string stateStoreName = null, + IStateStore> stateStore = null); /// /// Aggregates the stream data using a specified aggregation function. @@ -86,9 +100,24 @@ public interface IStreamBuilder /// A function to aggregate data. /// An optional state store to use for storing aggregate state. /// A stream builder with aggregated data. - IStreamBuilder Aggregate(Func keySelector, Func aggregateFunction, string stateStoreName = null, IStateStore stateStore = null); - //IStreamBuilder> Aggregate(Func keySelector, Func aggregateFunction, string stateStoreName = null, IStateStore stateStore = null); + IStreamBuilder AggregateSilently( + Func keySelector, + Func aggregateFunction, + string stateStoreName = null, + IStateStore stateStore = null); + /// + /// Aggregates the stream data using a specified aggregation function silently in the background. + /// + /// The type of the aggregate value. + /// A function to aggregate data. + /// An optional state store to use for storing aggregate state. + /// A stream builder with input data. + IStreamBuilder> Aggregate( + Func keySelector, + Func aggregateFunction, + string stateStoreName = null, + IStateStore stateStore = null); /// diff --git a/src/Cortex.Streams/Assets/license.md b/src/Cortex.Streams/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.Streams/Assets/license.md +++ b/src/Cortex.Streams/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/Cortex.Streams/BranchStreamBuilder.cs b/src/Cortex.Streams/BranchStreamBuilder.cs index 64641d1..cfeb89c 100644 --- a/src/Cortex.Streams/BranchStreamBuilder.cs +++ b/src/Cortex.Streams/BranchStreamBuilder.cs @@ -1,6 +1,8 @@ -using Cortex.Streams.Abstractions; +using Cortex.States; +using Cortex.Streams.Abstractions; using Cortex.Streams.Operators; using System; +using System.Collections.Generic; namespace Cortex.Streams { @@ -14,6 +16,7 @@ public class BranchStreamBuilder : IBranchStreamBuilder Filter(Func predicate return this; } + /// /// Adds a map operator to the branch to transform data. /// /// The type of data after the transformation. /// A function to transform data. /// The branch stream builder with the new data type. - public IBranchStreamBuilder Map(Func mapFunction) + public IBranchStreamBuilder Map(Func mapFunction) { var mapOperator = new MapOperator(mapFunction); @@ -64,10 +68,10 @@ public IBranchStreamBuilder Map(Func mapFunc _lastOperator = mapOperator; } - return new BranchStreamBuilder(_name) + return new BranchStreamBuilder(_name) { - _firstOperator = this._firstOperator, - _lastOperator = this._lastOperator + _firstOperator = _firstOperator, + _lastOperator = _lastOperator }; } @@ -110,5 +114,139 @@ public void Sink(ISinkOperator sinkOperator) _lastOperator = sinkAdapter; } } + + + + + public IBranchStreamBuilder> GroupBy(Func keySelector, string stateStoreName = null, States.IStateStore> stateStore = null) + { + if (stateStore == null) + { + if (string.IsNullOrEmpty(stateStoreName)) + { + stateStoreName = $"GroupByStateStore_{Guid.NewGuid()}"; + } + stateStore = new InMemoryStateStore>(stateStoreName); + } + + var groupByOperator = new GroupByKeyOperator(keySelector, stateStore); + + if (_firstOperator == null) + { + _firstOperator = groupByOperator; + _lastOperator = groupByOperator; + } + else + { + _lastOperator.SetNext(groupByOperator); + _lastOperator = groupByOperator; + } + + return new BranchStreamBuilder>(_name) + { + _firstOperator = _firstOperator, + _lastOperator = _lastOperator, + _sourceAdded = _sourceAdded + }; + } + + public IBranchStreamBuilder GroupBySilently(Func keySelector, string stateStoreName = null, States.IStateStore> stateStore = null) + { + if (stateStore == null) + { + if (string.IsNullOrEmpty(stateStoreName)) + { + stateStoreName = $"GroupByStateStore_{Guid.NewGuid()}"; + } + stateStore = new InMemoryStateStore>(stateStoreName); + } + + var groupByOperator = new GroupByKeySilentlyOperator(keySelector, stateStore); + + if (_firstOperator == null) + { + _firstOperator = groupByOperator; + _lastOperator = groupByOperator; + } + else + { + _lastOperator.SetNext(groupByOperator); + _lastOperator = groupByOperator; + } + + return new BranchStreamBuilder(_name) + { + _firstOperator = _firstOperator, + _lastOperator = _lastOperator, + _sourceAdded = _sourceAdded + }; + } + + public IBranchStreamBuilder> Aggregate(Func keySelector, Func aggregateFunction, string stateStoreName = null, States.IStateStore stateStore = null) + { + //private readonly Func _keySelector + if (stateStore == null) + { + if (string.IsNullOrEmpty(stateStoreName)) + { + stateStoreName = $"AggregateStateStore_{Guid.NewGuid()}"; + } + stateStore = new InMemoryStateStore(stateStoreName); + } + + var aggregateOperator = new AggregateOperator(keySelector, aggregateFunction, stateStore); + + if (_firstOperator == null) + { + _firstOperator = aggregateOperator; + _lastOperator = aggregateOperator; + } + else + { + _lastOperator.SetNext(aggregateOperator); + _lastOperator = aggregateOperator; + } + + return new BranchStreamBuilder>(_name) + { + _firstOperator = _firstOperator, + _lastOperator = _lastOperator, + _sourceAdded = _sourceAdded + }; + } + + public IBranchStreamBuilder AggregateSilently(Func keySelector, Func aggregateFunction, string stateStoreName = null, States.IStateStore stateStore = null) + { + //private readonly Func _keySelector + if (stateStore == null) + { + if (string.IsNullOrEmpty(stateStoreName)) + { + stateStoreName = $"AggregateStateStore_{Guid.NewGuid()}"; + } + stateStore = new InMemoryStateStore(stateStoreName); + } + + var aggregateOperator = new AggregateSilentlyOperator(keySelector, aggregateFunction, stateStore); + + if (_firstOperator == null) + { + _firstOperator = aggregateOperator; + _lastOperator = aggregateOperator; + } + else + { + _lastOperator.SetNext(aggregateOperator); + _lastOperator = aggregateOperator; + } + + return new BranchStreamBuilder(_name) + { + _firstOperator = _firstOperator, + _lastOperator = _lastOperator, + _sourceAdded = _sourceAdded + }; + } + } } diff --git a/src/Cortex.Streams/Cortex.Streams.csproj b/src/Cortex.Streams/Cortex.Streams.csproj index d6524e8..5e1df3f 100644 --- a/src/Cortex.Streams/Cortex.Streams.csproj +++ b/src/Cortex.Streams/Cortex.Streams.csproj @@ -24,7 +24,7 @@ True True git - Just as the Cortex in our brains handles complex processing efficiently, Cortex Data Framework brings brainpower to your data management! + Just as the Cortex in our brains handles complex processing efficiently, Cortex Data Framework brings brainpower to your data management! https://buildersoft.io/ Cortex Data Framework README.md diff --git a/src/Cortex.Streams/Operators/AggregateOperator.cs b/src/Cortex.Streams/Operators/AggregateOperator.cs index 0b3a718..423e66d 100644 --- a/src/Cortex.Streams/Operators/AggregateOperator.cs +++ b/src/Cortex.Streams/Operators/AggregateOperator.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Diagnostics; + namespace Cortex.Streams.Operators { public class AggregateOperator : IOperator, IStatefulOperator, ITelemetryEnabled @@ -59,17 +60,18 @@ public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) public void Process(object input) { + TAggregate aggregate; + TKey key; + if (_telemetryProvider != null) { var stopwatch = Stopwatch.StartNew(); - using (var span = _tracer.StartSpan("AggregateOperator.Process")) { try { var typedInput = (TInput)input; - var key = _keySelector(typedInput); - TAggregate aggregate; + key = _keySelector(typedInput); lock (_stateStore) { aggregate = _stateStore.Get(key); @@ -96,17 +98,17 @@ public void Process(object input) else { var typedInput = (TInput)input; - var key = _keySelector(typedInput); + key = _keySelector(typedInput); + lock (_stateStore) { - var aggregate = _stateStore.Get(key); + aggregate = _stateStore.Get(key); aggregate = _aggregateFunction(aggregate, typedInput); _stateStore.Put(key, aggregate); } } - // Continue processing - _nextOperator?.Process(input); + _nextOperator?.Process(new KeyValuePair(key, aggregate)); } public void SetNext(IOperator nextOperator) @@ -125,4 +127,5 @@ public IEnumerable GetStateStores() yield return _stateStore; } } + } diff --git a/src/Cortex.Streams/Operators/AggregateSilentlyOperator.cs b/src/Cortex.Streams/Operators/AggregateSilentlyOperator.cs new file mode 100644 index 0000000..8332f1a --- /dev/null +++ b/src/Cortex.Streams/Operators/AggregateSilentlyOperator.cs @@ -0,0 +1,132 @@ +using Cortex.States; +using Cortex.States.Operators; +using Cortex.Telemetry; +using System; +using System.Collections.Generic; +using System.Diagnostics; + +namespace Cortex.Streams.Operators +{ + public class AggregateSilentlyOperator : IOperator, IStatefulOperator, ITelemetryEnabled + { + private readonly Func _keySelector; + private readonly Func _aggregateFunction; + private readonly IStateStore _stateStore; + private IOperator _nextOperator; + + // Telemetry fields + private ITelemetryProvider _telemetryProvider; + private ICounter _processedCounter; + private IHistogram _processingTimeHistogram; + private ITracer _tracer; + private Action _incrementProcessedCounter; + private Action _recordProcessingTime; + + public AggregateSilentlyOperator(Func keySelector, Func aggregateFunction, IStateStore stateStore) + { + _keySelector = keySelector; + _aggregateFunction = aggregateFunction; + _stateStore = stateStore; + } + + public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) + { + _telemetryProvider = telemetryProvider; + + if (_telemetryProvider != null) + { + var metricsProvider = _telemetryProvider.GetMetricsProvider(); + _processedCounter = metricsProvider.CreateCounter($"aggregate_operator_processed_{typeof(TInput).Name}", "Number of items processed by AggregateOperator"); + _processingTimeHistogram = metricsProvider.CreateHistogram($"aggregate_operator_processing_time_{typeof(TInput).Name}", "Processing time for AggregateOperator"); + _tracer = _telemetryProvider.GetTracingProvider().GetTracer($"AggregateOperator_{typeof(TInput).Name}"); + + // Cache delegates + _incrementProcessedCounter = () => _processedCounter.Increment(); + _recordProcessingTime = value => _processingTimeHistogram.Record(value); + } + else + { + _incrementProcessedCounter = null; + _recordProcessingTime = null; + } + + // Propagate telemetry to the next operator + if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled) + { + nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); + } + } + + public void Process(object input) + { + if (_telemetryProvider != null) + { + var stopwatch = Stopwatch.StartNew(); + + using (var span = _tracer.StartSpan("AggregateOperator.Process")) + { + try + { + var typedInput = (TInput)input; + var key = _keySelector(typedInput); + TAggregate aggregate; + lock (_stateStore) + { + aggregate = _stateStore.Get(key); + aggregate = _aggregateFunction(aggregate, typedInput); + _stateStore.Put(key, aggregate); + } + span.SetAttribute("key", key.ToString()); + span.SetAttribute("status", "success"); + } + catch (Exception ex) + { + span.SetAttribute("status", "error"); + span.SetAttribute("exception", ex.ToString()); + throw; + } + finally + { + stopwatch.Stop(); + _recordProcessingTime(stopwatch.Elapsed.TotalMilliseconds); + _incrementProcessedCounter(); + } + } + } + else + { + var typedInput = (TInput)input; + var key = _keySelector(typedInput); + lock (_stateStore) + { + var aggregate = _stateStore.Get(key); + aggregate = _aggregateFunction(aggregate, typedInput); + _stateStore.Put(key, aggregate); + } + } + + // we should not return the value from the state, continue the process further, state is just used to mutate + // for now we are commenting the next Operator. + //_nextOperator?.Process(new KeyValuePair(key, aggregate)); + + // Continue processing + _nextOperator?.Process(input); + } + + public void SetNext(IOperator nextOperator) + { + _nextOperator = nextOperator; + + // Propagate telemetry + if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled && _telemetryProvider != null) + { + nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); + } + } + + public IEnumerable GetStateStores() + { + yield return _stateStore; + } + } +} diff --git a/src/Cortex.Streams/Operators/GroupByKeyOperator.cs b/src/Cortex.Streams/Operators/GroupByKeyOperator.cs index 49a99c2..d0f8e05 100644 --- a/src/Cortex.Streams/Operators/GroupByKeyOperator.cs +++ b/src/Cortex.Streams/Operators/GroupByKeyOperator.cs @@ -57,17 +57,19 @@ public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) public void Process(object input) { + + var typedInput = (TInput)input; + var key = _keySelector(typedInput); + List group; + if (_telemetryProvider != null) { - var stopwatch = Stopwatch.StartNew(); - using (var span = _tracer.StartSpan("GroupByKeyOperator.Process")) { + var stopwatch = Stopwatch.StartNew(); try { - var typedInput = (TInput)input; - var key = _keySelector(typedInput); - List group; + lock (_stateStore) { group = _stateStore.Get(key) ?? new List(); @@ -94,18 +96,15 @@ public void Process(object input) } else { - var typedInput = (TInput)input; - var key = _keySelector(typedInput); lock (_stateStore) { - var group = _stateStore.Get(key) ?? new List(); + group = _stateStore.Get(key) ?? new List(); group.Add(typedInput); _stateStore.Put(key, group); } } - // Continue processing - _nextOperator?.Process(input); + _nextOperator?.Process(new KeyValuePair>(key, group)); } public void SetNext(IOperator nextOperator) diff --git a/src/Cortex.Streams/Operators/GroupByKeySilentlyOperator.cs b/src/Cortex.Streams/Operators/GroupByKeySilentlyOperator.cs new file mode 100644 index 0000000..b982071 --- /dev/null +++ b/src/Cortex.Streams/Operators/GroupByKeySilentlyOperator.cs @@ -0,0 +1,129 @@ +using Cortex.States; +using Cortex.States.Operators; +using Cortex.Telemetry; +using System; +using System.Collections.Generic; +using System.Diagnostics; + +namespace Cortex.Streams.Operators +{ + public class GroupByKeySilentlyOperator : IOperator, IStatefulOperator, ITelemetryEnabled + { + private readonly Func _keySelector; + private readonly IStateStore> _stateStore; + private IOperator _nextOperator; + + // Telemetry fields + private ITelemetryProvider _telemetryProvider; + private ICounter _processedCounter; + private IHistogram _processingTimeHistogram; + private ITracer _tracer; + private Action _incrementProcessedCounter; + private Action _recordProcessingTime; + + public GroupByKeySilentlyOperator(Func keySelector, IStateStore> stateStore) + { + _keySelector = keySelector; + _stateStore = stateStore; + } + + public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) + { + _telemetryProvider = telemetryProvider; + + if (_telemetryProvider != null) + { + var metricsProvider = _telemetryProvider.GetMetricsProvider(); + _processedCounter = metricsProvider.CreateCounter($"groupby_operator_processed_{typeof(TInput).Name}", "Number of items processed by GroupByKeyOperator"); + _processingTimeHistogram = metricsProvider.CreateHistogram($"groupby_operator_processing_time_{typeof(TInput).Name}", "Processing time for GroupByKeyOperator"); + _tracer = _telemetryProvider.GetTracingProvider().GetTracer($"GroupByKeyOperator_{typeof(TInput).Name}"); + + // Cache delegates + _incrementProcessedCounter = () => _processedCounter.Increment(); + _recordProcessingTime = value => _processingTimeHistogram.Record(value); + } + else + { + _incrementProcessedCounter = null; + _recordProcessingTime = null; + } + + // Propagate telemetry + if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled) + { + nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); + } + } + + public void Process(object input) + { + + var typedInput = (TInput)input; + var key = _keySelector(typedInput); + List group; + + if (_telemetryProvider != null) + { + using (var span = _tracer.StartSpan("GroupByKeyOperator.Process")) + { + var stopwatch = Stopwatch.StartNew(); + try + { + + lock (_stateStore) + { + group = _stateStore.Get(key) ?? new List(); + group.Add(typedInput); + _stateStore.Put(key, group); + } + span.SetAttribute("key", key.ToString()); + span.SetAttribute("group_size", group.Count.ToString()); + span.SetAttribute("status", "success"); + } + catch (Exception ex) + { + span.SetAttribute("status", "error"); + span.SetAttribute("exception", ex.ToString()); + throw; + } + finally + { + stopwatch.Stop(); + _recordProcessingTime(stopwatch.Elapsed.TotalMilliseconds); + _incrementProcessedCounter(); + } + } + } + else + { + lock (_stateStore) + { + group = _stateStore.Get(key) ?? new List(); + group.Add(typedInput); + _stateStore.Put(key, group); + } + } + + //_nextOperator?.Process(new KeyValuePair>(key, group)); + + // Continue processing + _nextOperator?.Process(input); + } + + public void SetNext(IOperator nextOperator) + { + _nextOperator = nextOperator; + + // Propagate telemetry + if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled && _telemetryProvider != null) + { + nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); + } + } + + public IEnumerable GetStateStores() + { + yield return _stateStore; + } + } +} diff --git a/src/Cortex.Streams/StreamBuilder.cs b/src/Cortex.Streams/StreamBuilder.cs index 97099a9..b6571bd 100644 --- a/src/Cortex.Streams/StreamBuilder.cs +++ b/src/Cortex.Streams/StreamBuilder.cs @@ -258,7 +258,7 @@ public IStreamBuilder AddBranch(string name, Action GroupBy(Func keySelector, string stateStoreName = null, States.IStateStore> stateStore = null) + public IStreamBuilder GroupBySilently(Func keySelector, string stateStoreName = null, States.IStateStore> stateStore = null) { if (stateStore == null) { @@ -269,7 +269,7 @@ public IStreamBuilder GroupBy(Func keySelec stateStore = new InMemoryStateStore>(stateStoreName); } - var groupByOperator = new GroupByKeyOperator(keySelector, stateStore); + var groupByOperator = new GroupByKeySilentlyOperator(keySelector, stateStore); if (_firstOperator == null) { @@ -282,11 +282,10 @@ public IStreamBuilder GroupBy(Func keySelec _lastOperator = groupByOperator; } - //return new StreamBuilder>(_name, _firstOperator, _lastOperator, _sourceAdded); return new StreamBuilder(_name, _firstOperator, _lastOperator, _sourceAdded); } - public IStreamBuilder Aggregate(Func keySelector, Func aggregateFunction, string stateStoreName = null, States.IStateStore stateStore = null) + public IStreamBuilder AggregateSilently(Func keySelector, Func aggregateFunction, string stateStoreName = null, States.IStateStore stateStore = null) { //private readonly Func _keySelector if (stateStore == null) @@ -298,7 +297,7 @@ public IStreamBuilder Aggregate(Func(stateStoreName); } - var aggregateOperator = new AggregateOperator(keySelector, aggregateFunction, stateStore); + var aggregateOperator = new AggregateSilentlyOperator(keySelector, aggregateFunction, stateStore); if (_firstOperator == null) { @@ -315,6 +314,62 @@ public IStreamBuilder Aggregate(Func(_name, _firstOperator, _lastOperator, _sourceAdded); } + + public IStreamBuilder> GroupBy(Func keySelector, string stateStoreName = null, IStateStore> stateStore = null) + { + if (stateStore == null) + { + if (string.IsNullOrEmpty(stateStoreName)) + { + stateStoreName = $"GroupByStateStore_{Guid.NewGuid()}"; + } + stateStore = new InMemoryStateStore>(stateStoreName); + } + + var groupByOperator = new GroupByKeyOperator(keySelector, stateStore); + + if (_firstOperator == null) + { + _firstOperator = groupByOperator; + _lastOperator = groupByOperator; + } + else + { + _lastOperator.SetNext(groupByOperator); + _lastOperator = groupByOperator; + } + + return new StreamBuilder>(_name, _firstOperator, _lastOperator, _sourceAdded); + } + + public IStreamBuilder> Aggregate(Func keySelector, Func aggregateFunction, string stateStoreName = null, IStateStore stateStore = null) + { + //private readonly Func _keySelector + if (stateStore == null) + { + if (string.IsNullOrEmpty(stateStoreName)) + { + stateStoreName = $"AggregateStateStore_{Guid.NewGuid()}"; + } + stateStore = new InMemoryStateStore(stateStoreName); + } + + var aggregateOperator = new AggregateOperator(keySelector, aggregateFunction, stateStore); + + if (_firstOperator == null) + { + _firstOperator = aggregateOperator; + _lastOperator = aggregateOperator; + } + else + { + _lastOperator.SetNext(aggregateOperator); + _lastOperator = aggregateOperator; + } + + return new StreamBuilder>(_name, _firstOperator, _lastOperator, _sourceAdded); + } + public IInitialStreamBuilder WithTelemetry(ITelemetryProvider telemetryProvider) { _telemetryProvider = telemetryProvider; diff --git a/src/Cortex.Telemetry.OpenTelemetry/Assets/license.md b/src/Cortex.Telemetry.OpenTelemetry/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.Telemetry.OpenTelemetry/Assets/license.md +++ b/src/Cortex.Telemetry.OpenTelemetry/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/Cortex.Telemetry/Assets/license.md b/src/Cortex.Telemetry/Assets/license.md index 6413433..530f621 100644 --- a/src/Cortex.Telemetry/Assets/license.md +++ b/src/Cortex.Telemetry/Assets/license.md @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2022 Buildersoft +Copyright (c) 2024 Buildersoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in