Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 258 additions & 0 deletions circuitbreaker/circuitbreaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
package circuitbreaker

import (
"fmt"
"sync"
"time"

"github.com/simiancreative/simiango/logger"
)

type State int

const (
StateClosed State = iota
StateOpen
StateHalfOpen
)

func (s State) String() string {
switch s {
case StateClosed:
return "CLOSED"
case StateOpen:
return "OPEN"
case StateHalfOpen:
return "HALF_OPEN"
default:
return "UNKNOWN"

Check warning on line 28 in circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuitbreaker.go#L27-L28

Added lines #L27 - L28 were not covered by tests
}
}

type Config struct {
FailureThreshold int
OpenTimeout time.Duration
HalfOpenMaxCalls int
OnStateChange func(from, to State)
}

type CircuitBreaker struct {
config Config
state State
failures int
attempts int
successes int
mutex sync.RWMutex
timer *time.Timer
}

func New(config Config) (*CircuitBreaker, error) {
if err := validateConfig(config); err != nil {
return nil, err
}

Check warning on line 52 in circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuitbreaker.go#L51-L52

Added lines #L51 - L52 were not covered by tests

logger.Debug("creating new circuit breaker", logger.Fields{
"failure_threshold": config.FailureThreshold,
"open_timeout": config.OpenTimeout.String(),
"half_open_max_calls": config.HalfOpenMaxCalls,
})

return &CircuitBreaker{
config: config,
state: StateClosed,
}, nil
}

func (cb *CircuitBreaker) Allow() bool {
cb.mutex.RLock()
defer cb.mutex.RUnlock()

allowed := false
switch cb.state {
case StateOpen:
allowed = false
case StateHalfOpen:
allowed = cb.attempts < cb.config.HalfOpenMaxCalls

Check warning on line 75 in circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuitbreaker.go#L72-L75

Added lines #L72 - L75 were not covered by tests
default:
allowed = true
}

logger.Debug("circuit breaker allow check", logger.Fields{
"state": cb.state.String(),
"allowed": allowed,
"attempts": cb.attempts,
"max_calls": cb.config.HalfOpenMaxCalls,
})

return allowed
}

func (cb *CircuitBreaker) GetState() State {
cb.mutex.RLock()
defer cb.mutex.RUnlock()
return cb.state
}

// RecordStart marks the beginning of an attempt
func (cb *CircuitBreaker) RecordStart() bool {
cb.mutex.Lock()
defer cb.mutex.Unlock()

switch cb.state {
case StateOpen:
logger.Debug("attempt rejected - circuit open", logger.Fields{
"state": cb.state.String(),
})
return false
case StateHalfOpen:
if cb.attempts >= cb.config.HalfOpenMaxCalls {
logger.Debug("attempt rejected - max half-open calls reached", logger.Fields{
"attempts": cb.attempts,
"max_calls": cb.config.HalfOpenMaxCalls,
})
return false
}
}

cb.attempts++
logger.Debug("attempt started", logger.Fields{
"state": cb.state.String(),
"attempts": cb.attempts,
})
return true
}

// RecordResult records the result of an attempt
func (cb *CircuitBreaker) RecordResult(success bool) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

logger.Debug("recording attempt result", logger.Fields{
"success": success,
"state": cb.state.String(),
"attempts": cb.attempts,
"successes": cb.successes,
"failures": cb.failures,
})

if !success {
cb.recordFailure()
return
}

switch cb.state {
case StateHalfOpen:
cb.successes++
logger.Debug("recorded success in half-open state", logger.Fields{
"attempts": cb.attempts,
"successes": cb.successes,
"max_calls": cb.config.HalfOpenMaxCalls,
})
if cb.successes >= cb.config.HalfOpenMaxCalls {
cb.transitionTo(StateClosed)
}
case StateClosed:
cb.failures = 0
logger.Debug("recorded success in closed state", logger.Fields{
"failures": cb.failures,
})

Check warning on line 158 in circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuitbreaker.go#L154-L158

Added lines #L154 - L158 were not covered by tests
}
}

func (cb *CircuitBreaker) Reset() {
cb.mutex.Lock()
defer cb.mutex.Unlock()

logger.Debug("resetting circuit breaker", logger.Fields{
"from_state": cb.state.String(),
})

if cb.timer != nil {
cb.timer.Stop()
}

Check warning on line 172 in circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuitbreaker.go#L162-L172

Added lines #L162 - L172 were not covered by tests

cb.transitionTo(StateClosed)
cb.failures = 0
cb.attempts = 0
cb.successes = 0

Check warning on line 177 in circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuitbreaker.go#L174-L177

Added lines #L174 - L177 were not covered by tests
}

func (cb *CircuitBreaker) recordFailure() {
cb.failures++

logger.Debug("recorded failure", logger.Fields{
"state": cb.state.String(),
"failures": cb.failures,
"threshold": cb.config.FailureThreshold,
})

shouldOpen := cb.state == StateHalfOpen ||
(cb.state == StateClosed && cb.failures >= cb.config.FailureThreshold)

if !shouldOpen {
return
}

cb.openCircuit()
}

func (cb *CircuitBreaker) openCircuit() {
logger.Debug("opening circuit", logger.Fields{
"from_state": cb.state.String(),
"open_timeout": cb.config.OpenTimeout.String(),
})

if cb.timer != nil {
cb.timer.Stop()
}

Check warning on line 207 in circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuitbreaker.go#L206-L207

Added lines #L206 - L207 were not covered by tests

cb.transitionTo(StateOpen)

cb.timer = time.AfterFunc(cb.config.OpenTimeout, func() {
cb.mutex.Lock()
defer cb.mutex.Unlock()

logger.Debug("open timeout elapsed", logger.Fields{
"current_state": cb.state.String(),
})

if cb.state == StateOpen {
cb.transitionTo(StateHalfOpen)
}
})
}

func (cb *CircuitBreaker) transitionTo(newState State) {
if cb.state == newState {
return
}

Check warning on line 228 in circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuitbreaker.go#L227-L228

Added lines #L227 - L228 were not covered by tests

oldState := cb.state
cb.state = newState
cb.attempts = 0
cb.successes = 0

logger.Debug("state transition", logger.Fields{
"from_state": oldState.String(),
"to_state": newState.String(),
"attempts": cb.attempts,
"successes": cb.successes,
})

if cb.config.OnStateChange != nil {
go cb.config.OnStateChange(oldState, newState)
}

Check warning on line 244 in circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuitbreaker.go#L243-L244

Added lines #L243 - L244 were not covered by tests
}

func validateConfig(config Config) error {
if config.FailureThreshold <= 0 {
return fmt.Errorf("failure threshold must be greater than 0")
}

Check warning on line 250 in circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuitbreaker.go#L249-L250

Added lines #L249 - L250 were not covered by tests
if config.OpenTimeout <= 0 {
return fmt.Errorf("open timeout must be greater than 0")
}

Check warning on line 253 in circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuitbreaker.go#L252-L253

Added lines #L252 - L253 were not covered by tests
if config.HalfOpenMaxCalls <= 0 {
return fmt.Errorf("half-open max calls must be greater than 0")
}

Check warning on line 256 in circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuitbreaker.go#L255-L256

Added lines #L255 - L256 were not covered by tests
return nil
}
Loading