Skip to content

[Feature]: Add support for CircuitBreaker settings in Streams using Polly #136

@eneshoxha

Description

@eneshoxha

Is your feature request related to a problem? Please describe.
No

Describe the solution you'd like.
We would like to add support for CircuitBreaker using Polly within the Cortex.Streams by creating a new Operator. This implementation should be done with the new library Cortex.Streams.Polly.

The proposed solution:

// CircuitBreakerOperator.cs
using Cortex.Streams.Operators;
using Polly;
using System;

namespace Cortex.Streams.Operators
{
    public class CircuitBreakerOperator<T> : IOperator, ITelemetryEnabled
    {
        private readonly IOperator _nextOperator;
        private readonly AsyncCircuitBreakerPolicy _circuitBreaker;

        public CircuitBreakerOperator(IOperator nextOperator)
        {
            _nextOperator = nextOperator;
            _circuitBreaker = Policy
                .Handle<Exception>()
                .CircuitBreakerAsync(
                    exceptionsAllowedBeforeBreaking: 3,
                    durationOfBreak: TimeSpan.FromMinutes(1));
        }

        public async void Process(object input)
        {
            await _circuitBreaker.ExecuteAsync(() => 
            {
                _nextOperator.Process(input);
                return Task.CompletedTask;
            });
        }

        public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) 
            => (_nextOperator as ITelemetryEnabled)?.SetTelemetryProvider(telemetryProvider);
    }
}

Metadata

Metadata

Assignees

Labels

enhancementNew feature or requestfeatureThis label is in use for minor version increments

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions