Skip to content

TjTheDj2011/aiobreakers

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

aiobreakers

Async-first circuit breakers and retry policies for Python. Composable resilience patterns with decorators, event hooks, and real-time metrics.

Zero runtime dependencies — stdlib only.

Features

  • Circuit Breaker — CLOSED → OPEN → HALF_OPEN → CLOSED state machine
  • Retry Policy — 4 backoff strategies (exponential, full jitter, equal jitter, decorrelated)
  • Async + Sync — first-class support for both async def and regular functions
  • Decorators@breaker and @retry stack cleanly
  • Context Managersasync with breaker: and with breaker: protect code blocks
  • Event Hookson_open, on_close, on_retry for alerting and logging
  • Real-time Metrics — success rate, failure rate, trip count, avg latency
  • Exception Filtering — only trip/retry on specific exception types
  • Composable — stack retry + breaker for production-grade resilience
  • Thread-safeRLock-protected state for concurrent access

Installation

pip install aiobreakers

Or install from source:

git clone https://github.com/TjTheDj2011/aiobreakers.git
cd aiobreakers
pip install -e ".[dev]"

Quick Start

Circuit Breaker

from aiobreakers import CircuitBreaker, BreakerConfig, CircuitOpenError

breaker = CircuitBreaker("payment-api", BreakerConfig(
    failure_threshold=3,       # Trip after 3 consecutive failures
    recovery_timeout=30.0,     # Wait 30s before probing
    success_threshold=2,       # 2 successes in half-open to close
))

# As a decorator
@breaker
async def charge(amount: float) -> dict:
    return await payment_api.post("/charge", json={"amount": amount})

# As a context manager
async with breaker:
    result = await payment_api.get("/status")

# Programmatic
result = await breaker.call_async(payment_api.get, "/health")

Retry Policy

from aiobreakers import RetryPolicy, RetryConfig, BackoffStrategy

retry = RetryPolicy("api-retry", RetryConfig(
    max_retries=3,
    base_delay=1.0,
    max_delay=30.0,
    strategy=BackoffStrategy.FULL_JITTER,
    retryable_exceptions=(ConnectionError, TimeoutError),
))

@retry
async def fetch_data(url: str) -> dict:
    return await httpx.get(url).json()

Composed: Retry + Circuit Breaker

retry = RetryPolicy("api-retry", RetryConfig(
    max_retries=2,
    base_delay=0.5,
    retryable_exceptions=(ConnectionError, TimeoutError),
))

breaker = CircuitBreaker("api", BreakerConfig(failure_threshold=5))

@retry      # Outer: retries on transient errors
@breaker    # Inner: tracks failures, trips when threshold hit
async def resilient_call(endpoint: str) -> dict:
    return await api.get(endpoint)

When the breaker trips, it raises CircuitOpenError — which is not in retryable_exceptions, so the retry policy stops immediately instead of wasting attempts on a known-down service.

Circuit Breaker State Machine

CLOSED ──(failures >= threshold)──→ OPEN
   ↑                                  │
   │                          (recovery_timeout)
   │                                  ↓
   └──(successes >= threshold)── HALF_OPEN
              │
          (any failure)
              ↓
            OPEN
State Behavior
CLOSED All calls pass through. Failures are counted.
OPEN All calls immediately rejected with CircuitOpenError.
HALF_OPEN Limited probe calls allowed. Success → CLOSED, failure → OPEN.

Backoff Strategies

Strategy Formula Best For
EXPONENTIAL min(base * 2^attempt, max_delay) Predictable backoff
FULL_JITTER random(0, min(base * 2^attempt, max_delay)) Preventing thundering herd (recommended)
EQUAL_JITTER half + random(0, half) where half = ceiling/2 Balance between predictable and random
DECORRELATED random(base, prev_delay * 3) Correlated retry streams

Event Hooks

# Circuit breaker hooks
breaker = CircuitBreaker("api")
breaker.on_open(lambda b: alert(f"{b.name} tripped!"))
breaker.on_close(lambda b: log(f"{b.name} recovered"))
breaker.on_half_open(lambda b: log(f"{b.name} probing"))
breaker.on_success(lambda b: metrics.record("success"))
breaker.on_failure(lambda b: metrics.record("failure"))

# Retry hooks
retry = RetryPolicy("api-retry")
retry.on_retry(lambda attempt, err, delay:
    log(f"Retry {attempt}: {err}, waiting {delay:.1f}s"))

Hooks support both sync and async callbacks. Async callbacks are automatically scheduled as tasks on the running event loop.

Metrics

# Circuit breaker metrics
breaker.get_metrics()
# {
#     "name": "api",
#     "state": "closed",
#     "total_calls": 150,
#     "total_successes": 142,
#     "total_failures": 8,
#     "total_rejected": 3,
#     "total_trips": 1,
#     "success_rate": 0.947,
#     "failure_rate": 0.053,
#     "avg_call_duration_ms": 45.2,
#     "time_until_recovery": 0.0,
# }

# Retry metrics
retry.get_metrics()
# {
#     "name": "api-retry",
#     "total_retries": 12,
#     "total_exhausted": 1,
#     "config": { "max_retries": 3, ... },
# }

Exception Filtering

Circuit Breaker

# Only trip on these specific errors
breaker = CircuitBreaker("api", BreakerConfig(
    included_exceptions=(ConnectionError, TimeoutError),
))

# Trip on everything EXCEPT these
breaker = CircuitBreaker("api", BreakerConfig(
    excluded_exceptions=(ValueError, KeyError),
))

Retry Policy

# Only retry on these errors
retry = RetryPolicy("api", RetryConfig(
    retryable_exceptions=(ConnectionError, TimeoutError),
))

# Never retry on these (takes precedence)
retry = RetryPolicy("api", RetryConfig(
    non_retryable_exceptions=(AuthenticationError,),
))

Manual Control

breaker.trip()    # Force-open the circuit
breaker.reset()   # Force-close and reset all counters

Sync Support

Every API works without asyncio:

@breaker
def sync_function():
    return requests.get("/api")

with breaker:
    result = db.query("SELECT 1")

result = retry.call_sync(requests.get, "/api")

API Reference

CircuitBreaker(name, config=None, fallback=None)

Parameter Type Default Description
name str required Identifier for logging and metrics
config BreakerConfig defaults Configuration
fallback Callable None Called when circuit is open

BreakerConfig

Field Type Default Description
failure_threshold int 5 Consecutive failures to trip
success_threshold int 3 Successes in half-open to close
recovery_timeout float 60.0 Seconds before probing
included_exceptions tuple () Only count these as failures
excluded_exceptions tuple () Never count these as failures
half_open_max_calls int 1 Max concurrent half-open probes

RetryPolicy(name, config=None)

Parameter Type Default Description
name str "retry" Identifier for logging and metrics
config RetryConfig defaults Configuration

RetryConfig

Field Type Default Description
max_retries int 3 Maximum retry attempts
base_delay float 1.0 Base delay in seconds
max_delay float 60.0 Maximum delay cap
strategy BackoffStrategy FULL_JITTER Backoff calculation strategy
retryable_exceptions tuple () Only retry on these (empty = all)
non_retryable_exceptions tuple () Never retry on these

Requirements

  • Python 3.10+
  • No runtime dependencies

License

MIT

About

Async-first circuit breakers and retry policies for Python. Zero dependencies.

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages