diff --git a/.spelling b/.spelling
index e9ceb2a8..0c96de03 100644
--- a/.spelling
+++ b/.spelling
@@ -137,6 +137,7 @@ interop
Interop
interoperability
interoperate
+jitter
IOCP
IP
Kubernetes
@@ -181,6 +182,7 @@ non-mockable
ns
nuget
NUMA
+observability
ohno
ok
Ok
@@ -212,6 +214,7 @@ repo
repos
Reqwest
Reusability
+RPC
runtime
runtimes
rustc
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9862cfa6..9a9ad24d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@ Please see each crate's change log below:
- [`ohno`](./crates/ohno/CHANGELOG.md)
- [`ohno_macros`](./crates/ohno_macros/CHANGELOG.md)
- [`recoverable`](./crates/recoverable/CHANGELOG.md)
+- [`seatbelt`](./crates/seatbelt/CHANGELOG.md)
- [`thread_aware`](./crates/thread_aware/CHANGELOG.md)
- [`thread_aware_macros`](./crates/thread_aware_macros/CHANGELOG.md)
- [`thread_aware_macros_impl`](./crates/thread_aware_macros_impl/CHANGELOG.md)
diff --git a/Cargo.lock b/Cargo.lock
index ad68e1cc..60a7650a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -598,6 +598,16 @@ dependencies = [
"stable_deref_trait",
]
+[[package]]
+name = "http"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a"
+dependencies = [
+ "bytes",
+ "itoa",
+]
+
[[package]]
name = "iana-time-zone"
version = "0.1.64"
@@ -944,6 +954,47 @@ version = "11.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e"
+[[package]]
+name = "opentelemetry"
+version = "0.31.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0"
+dependencies = [
+ "futures-core",
+ "futures-sink",
+ "js-sys",
+ "pin-project-lite",
+ "thiserror 2.0.17",
+]
+
+[[package]]
+name = "opentelemetry-stdout"
+version = "0.31.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc8887887e169414f637b18751487cce4e095be787d23fad13c454e2fb1b3811"
+dependencies = [
+ "chrono",
+ "opentelemetry",
+ "opentelemetry_sdk",
+]
+
+[[package]]
+name = "opentelemetry_sdk"
+version = "0.31.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd"
+dependencies = [
+ "futures-channel",
+ "futures-executor",
+ "futures-util",
+ "opentelemetry",
+ "percent-encoding",
+ "rand 0.9.2",
+ "thiserror 2.0.17",
+ "tokio",
+ "tokio-stream",
+]
+
[[package]]
name = "os_pipe"
version = "1.2.3"
@@ -992,6 +1043,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35fb2e5f958ec131621fdd531e9fc186ed768cbe395337403ae56c17a74c68ec"
+[[package]]
+name = "percent-encoding"
+version = "2.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
+
[[package]]
name = "phf"
version = "0.11.3"
@@ -1298,6 +1355,29 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
+[[package]]
+name = "seatbelt"
+version = "0.2.0"
+dependencies = [
+ "alloc_tracker",
+ "criterion",
+ "fastrand",
+ "futures",
+ "http",
+ "layered",
+ "mutants",
+ "ohno",
+ "opentelemetry",
+ "opentelemetry-stdout",
+ "opentelemetry_sdk",
+ "recoverable",
+ "static_assertions",
+ "tick",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+]
+
[[package]]
name = "semver"
version = "1.0.27"
@@ -1682,6 +1762,17 @@ dependencies = [
"syn",
]
+[[package]]
+name = "tokio-stream"
+version = "0.1.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
+dependencies = [
+ "futures-core",
+ "pin-project-lite",
+ "tokio",
+]
+
[[package]]
name = "tokio-util"
version = "0.7.17"
diff --git a/Cargo.toml b/Cargo.toml
index fb58951b..975995d4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -36,6 +36,7 @@ layered = { path = "crates/layered", default-features = false, version = "0.3.0"
ohno = { path = "crates/ohno", default-features = false, version = "0.2.1" }
ohno_macros = { path = "crates/ohno_macros", default-features = false, version = "0.2.0" }
recoverable = { path = "crates/recoverable", default-features = false, version = "0.1.0" }
+seatbelt = { path = "crates/seatbelt", default-features = false, version = "0.2.0" }
testing_aids = { path = "crates/testing_aids", default-features = false }
thread_aware = { path = "crates/thread_aware", default-features = false, version = "0.6.1" }
thread_aware_macros = { path = "crates/thread_aware_macros", default-features = false, version = "0.6.1" }
@@ -51,9 +52,11 @@ criterion = { version = "0.7.0", default-features = false }
derive_more = { version = "2.0.1", default-features = false }
duct = { version = "1.1.1", default-features = false }
dynosaur = { version = "0.3.0", default-features = false }
+fastrand = { version = "2.3.0", default-features = false, features = ["std"] }
futures = { version = "0.3.31", default-features = false }
futures-core = { version = "0.3.31", default-features = false }
futures-util = { version = "0.3.31", default-features = false }
+http = { version = "1.2.0", default-features = false, features = ["std"] }
infinity_pool = { version = "0.8.1", default-features = false }
insta = { version = "1.44.1", default-features = false }
jiff = { version = "0.2.16", default-features = false }
@@ -65,6 +68,9 @@ new_zealand = { version = "1.0.1", default-features = false }
nm = { version = "0.1.21", default-features = false }
num-traits = { version = "0.2.19", default-features = false }
once_cell = { version = "1.21.3", default-features = false }
+opentelemetry = { version = "0.31.0", default-features = false }
+opentelemetry-stdout = { version = "0.31.0", default-features = false }
+opentelemetry_sdk = { version = "0.31.0", default-features = false }
pin-project-lite = { version = "0.2.13", default-features = false }
pretty_assertions = { version = "1.4.1", default-features = false }
prettyplease = { version = "0.2.37", default-features = false }
diff --git a/README.md b/README.md
index d7f6ea59..31896942 100644
--- a/README.md
+++ b/README.md
@@ -32,6 +32,7 @@ These are the primary crates built out of this repo:
- [`layered`](./crates/layered/README.md) - A foundational service abstraction for building composable, middleware-driven systems.
- [`ohno`](./crates/ohno/README.md) - High-quality Rust error handling.
- [`recoverable`](./crates/recoverable/README.md) - Recovery information and classification for resilience patterns.
+- [`seatbelt`](./crates/seatbelt/README.md) - Resilience and recovery mechanisms for fallible operations.
- [`thread_aware`](./crates/thread_aware/README.md) - Facilities to support thread-isolated state.
- [`tick`](./crates/tick/README.md) - Provides primitives to interact with and manipulate machine time.
diff --git a/crates/seatbelt/CHANGELOG.md b/crates/seatbelt/CHANGELOG.md
new file mode 100644
index 00000000..cc03dcc8
--- /dev/null
+++ b/crates/seatbelt/CHANGELOG.md
@@ -0,0 +1,14 @@
+# Changelog
+
+## [0.2.0] - 2026-01-20
+
+Initial release.
+
+- ✨ Features
+
+ - Timeout middleware for canceling long-running operations
+ - Retry middleware with constant, linear, and exponential backoff strategies
+ - Circuit breaker middleware with health-based failure detection and gradual recovery
+ - OpenTelemetry metrics integration (`metrics` feature)
+ - Structured logging via tracing (`logs` feature)
+ - Shared `Context` for clock and telemetry configuration
diff --git a/crates/seatbelt/Cargo.toml b/crates/seatbelt/Cargo.toml
new file mode 100644
index 00000000..60add28c
--- /dev/null
+++ b/crates/seatbelt/Cargo.toml
@@ -0,0 +1,118 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+[package]
+name = "seatbelt"
+description = "Resilience and recovery mechanisms for fallible operations."
+version = "0.2.0"
+readme = "README.md"
+keywords = ["oxidizer", "resilience", "layered", "recovery", "retry", "circuit-breaker"]
+categories = ["data-structures"]
+
+edition.workspace = true
+rust-version.workspace = true
+authors.workspace = true
+license.workspace = true
+homepage.workspace = true
+repository.workspace = true
+
+[package.metadata.cargo_check_external_types]
+allowed_external_types = [
+ "layered::layer::stack::Stack",
+ "layered::service::Service",
+ "opentelemetry::metrics::meter::MeterProvider",
+ "recoverable::Recovery",
+ "recoverable::RecoveryInfo",
+ "recoverable::RecoveryKind",
+ "tick::clock::Clock",
+ "tower_layer::Layer",
+]
+
+[package.metadata.docs.rs]
+all-features = true
+
+[features]
+default = []
+timeout = []
+retry = ["dep:fastrand"]
+circuit-breaker = ["dep:fastrand"]
+metrics = ["dep:opentelemetry", "opentelemetry/metrics"]
+logs = ["dep:tracing"]
+
+[dependencies]
+fastrand = { workspace = true, optional = true }
+futures = { workspace = true }
+layered = { workspace = true }
+opentelemetry = { workspace = true, optional = true }
+recoverable = { workspace = true }
+tick = { workspace = true }
+tracing = { workspace = true, optional = true }
+
+[dev-dependencies]
+alloc_tracker.workspace = true
+criterion.workspace = true
+fastrand.workspace = true
+futures = { workspace = true, features = ["executor"] }
+http.workspace = true
+layered = { workspace = true }
+mutants.workspace = true
+ohno = { workspace = true, features = ["app-err"] }
+opentelemetry = { workspace = true, default-features = false, features = ["metrics"] }
+opentelemetry-stdout = { workspace = true, default-features = false, features = ["metrics", "logs"] }
+opentelemetry_sdk = { workspace = true, default-features = false, features = ["metrics", "testing", "experimental_metrics_custom_reader"] }
+static_assertions.workspace = true
+tick = { workspace = true, features = ["test-util", "tokio"] }
+tokio = { workspace = true, features = ["rt", "macros"] }
+tracing.workspace = true
+tracing-subscriber = { workspace = true, features = ["fmt", "std"] }
+
+[[example]]
+name = "timeout"
+required-features = ["timeout"]
+
+[[example]]
+name = "timeout_advanced"
+required-features = ["timeout"]
+
+[[example]]
+name = "retry"
+required-features = ["retry"]
+
+[[example]]
+name = "retry_advanced"
+required-features = ["retry"]
+
+[[example]]
+name = "retry_outage"
+required-features = ["retry"]
+
+[[example]]
+name = "resilience_pipeline"
+required-features = ["retry", "timeout"]
+
+[[example]]
+name = "circuit_breaker"
+required-features = ["circuit-breaker", "metrics"]
+
+[[bench]]
+name = "observability"
+harness = false
+required-features = ["retry", "logs", "metrics"]
+
+[[bench]]
+name = "timeout"
+harness = false
+required-features = ["timeout"]
+
+[[bench]]
+name = "retry"
+harness = false
+required-features = ["retry"]
+
+[[bench]]
+name = "circuit_breaker"
+harness = false
+required-features = ["circuit-breaker"]
+
+[lints]
+workspace = true
diff --git a/crates/seatbelt/README.md b/crates/seatbelt/README.md
new file mode 100644
index 00000000..3e97fe98
--- /dev/null
+++ b/crates/seatbelt/README.md
@@ -0,0 +1,124 @@
+
+

+
+# Seatbelt
+
+[](https://crates.io/crates/seatbelt)
+[](https://docs.rs/seatbelt)
+[](https://crates.io/crates/seatbelt)
+[](https://github.com/microsoft/oxidizer/actions/workflows/main.yml)
+[](https://codecov.io/gh/microsoft/oxidizer)
+[](../../LICENSE)
+

+
+
+
+Resilience and recovery mechanisms for fallible operations.
+
+## Quick Start
+
+Add resilience to fallible operations, such as RPC calls over the network, with just a few lines of code.
+**Retry** handles transient failures and **Timeout** prevents operations from hanging indefinitely:
+
+```rust
+use seatbelt::retry::Retry;
+use seatbelt::timeout::Timeout;
+use seatbelt::{RecoveryInfo, ResilienceContext};
+
+let context = ResilienceContext::new(&clock);
+let service = (
+ // Retry middleware: Automatically retries failed operations
+ Retry::layer("retry", &context)
+ .clone_input()
+ .recovery_with(|output: &String, _| match output.as_str() {
+ "temporary_error" => RecoveryInfo::retry(),
+ "operation timed out" => RecoveryInfo::retry(),
+ _ => RecoveryInfo::never(),
+ }),
+ // Timeout middleware: Cancels operations that take too long
+ Timeout::layer("timeout", &context)
+ .timeout_output(|_| "operation timed out".to_string())
+ .timeout(Duration::from_secs(30)),
+ // Your core business logic
+ Execute::new(my_string_operation),
+)
+ .into_service();
+
+let result = service.execute("input data".to_string()).await;
+```
+
+## Why?
+
+Communicating over a network is inherently fraught with problems. The network can go down at any time,
+sometimes for a millisecond or two. The endpoint you’re connecting to may crash or be rebooted,
+network configuration may change from under you, etc. To deliver a robust experience to users, and to
+achieve `5` or more `9s` of availability, it is imperative to implement robust resilience patterns to
+mask these transient failures.
+
+This crate provides production-ready resilience middleware with excellent telemetry for building
+robust distributed systems that can automatically handle timeouts, retries, and other failure
+scenarios.
+
+* **Production-ready** - Battle-tested middleware with sensible defaults and comprehensive
+ configuration options.
+* **Excellent telemetry** - Built-in support for metrics and structured logging to monitor
+ resilience behavior in production.
+* **Runtime agnostic** - Works seamlessly across any async runtime. Use the same resilience
+ patterns across different projects and migrate between runtimes without changes.
+
+## Overview
+
+This crate uses the [`layered`][__link0] crate for composing middleware. The middleware layers
+can be stacked together using tuples and built into a service using the [`Stack`][__link1] trait.
+
+Resilience middleware also requires [`Clock`][__link2] from the [`tick`][__link3] crate for timing
+operations like delays, timeouts, and backoff calculations. The clock is passed through
+[`ResilienceContext`][__link4] when creating middleware layers.
+
+### Core Types
+
+* [`ResilienceContext`][__link5] - Holds shared state for resilience middleware, including the clock.
+* [`RecoveryInfo`][__link6] - Classifies errors as recoverable (transient) or non-recoverable (permanent).
+* [`Recovery`][__link7] - A trait for types that can determine their recoverability.
+
+### Built-in Middleware
+
+This crate provides built-in resilience middleware that you can use out of the box. See the documentation
+for each module for details on how to use them.
+
+* [`timeout`][__link8] - Middleware that cancels long-running operations.
+* [`retry`][__link9] - Middleware that automatically retries failed operations.
+* [`circuit_breaker`][__link10] - Middleware that prevents cascading failures.
+
+## Features
+
+This crate provides several optional features that can be enabled in your `Cargo.toml`:
+
+* **`timeout`** - Enables the [`timeout`][__link11] middleware for canceling long-running operations.
+* **`retry`** - Enables the [`retry`][__link12] middleware for automatically retrying failed operations with
+ configurable backoff strategies, jitter, and recovery classification.
+* **`circuit-breaker`** - Enables the [`circuit_breaker`][__link13] middleware for preventing cascading failures.
+* **`metrics`** - Exposes the OpenTelemetry metrics API for collecting and reporting metrics.
+* **`logs`** - Enables structured logging for resilience middleware using the `tracing` crate.
+
+
+
+
+This crate was developed as part of The Oxidizer Project. Browse this crate's source code.
+
+
+ [__cargo_doc2readme_dependencies_info]: ggGkYW0CYXSEGy4k8ldDFPOhG2VNeXtD5nnKG6EPY6OfW5wBG8g18NOFNdxpYXKEG5bw60hngnQRG76QZSWWI79pG7-oEvcoPz3VGzVGNfifez53YWSEgmdsYXllcmVkZTAuMy4wgmtyZWNvdmVyYWJsZWUwLjEuMIJoc2VhdGJlbHRlMC4yLjCCZHRpY2tlMC4xLjI
+ [__link0]: https://crates.io/crates/layered/0.3.0
+ [__link1]: https://docs.rs/layered/0.3.0/layered/?search=Stack
+ [__link10]: https://docs.rs/seatbelt/0.2.0/seatbelt/circuit_breaker/index.html
+ [__link11]: https://docs.rs/seatbelt/0.2.0/seatbelt/timeout/index.html
+ [__link12]: https://docs.rs/seatbelt/0.2.0/seatbelt/retry/index.html
+ [__link13]: https://docs.rs/seatbelt/0.2.0/seatbelt/circuit_breaker/index.html
+ [__link2]: https://docs.rs/tick/0.1.2/tick/?search=Clock
+ [__link3]: https://crates.io/crates/tick/0.1.2
+ [__link4]: https://docs.rs/seatbelt/0.2.0/seatbelt/?search=ResilienceContext
+ [__link5]: https://docs.rs/seatbelt/0.2.0/seatbelt/?search=ResilienceContext
+ [__link6]: https://docs.rs/recoverable/0.1.0/recoverable/?search=RecoveryInfo
+ [__link7]: https://docs.rs/recoverable/0.1.0/recoverable/?search=Recovery
+ [__link8]: https://docs.rs/seatbelt/0.2.0/seatbelt/timeout/index.html
+ [__link9]: https://docs.rs/seatbelt/0.2.0/seatbelt/retry/index.html
diff --git a/crates/seatbelt/benches/circuit_breaker.rs b/crates/seatbelt/benches/circuit_breaker.rs
new file mode 100644
index 00000000..6cbe96a4
--- /dev/null
+++ b/crates/seatbelt/benches/circuit_breaker.rs
@@ -0,0 +1,60 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+#![expect(missing_docs, reason = "benchmark code")]
+use alloc_tracker::{Allocator, Session};
+use criterion::{Criterion, criterion_group, criterion_main};
+use futures::executor::block_on;
+use layered::{Execute, Service, Stack};
+use seatbelt::circuit_breaker::Circuit;
+use seatbelt::{RecoveryInfo, ResilienceContext};
+use tick::Clock;
+
+#[global_allocator]
+static ALLOCATOR: Allocator = Allocator::system();
+
+fn entry(c: &mut Criterion) {
+ let mut group = c.benchmark_group("circuit_breaker");
+ let session = Session::new();
+
+ // No circuit breaker
+ let service = Execute::new(|_input: Input| async move { Output });
+ let operation = session.operation("no-circuit-breaker");
+ group.bench_function("no-circuit-breaker", |b| {
+ b.iter(|| {
+ let _span = operation.measure_thread();
+ _ = block_on(service.execute(Input));
+ });
+ });
+
+ // With circuit breaker (closed state)
+ let context = ResilienceContext::new(Clock::new_frozen());
+
+ let service = (
+ Circuit::layer("bench", &context)
+ .recovery_with(|_, _| RecoveryInfo::never())
+ .rejected_input_error(|_input, _args| Output)
+ .min_throughput(1000), // High threshold to keep circuit closed
+ Execute::new(|_input: Input| async move { Ok(Output) }),
+ )
+ .into_service();
+
+ let operation = session.operation("with-circuit-breaker");
+ group.bench_function("with-circuit-breaker", |b| {
+ b.iter(|| {
+ let _span = operation.measure_thread();
+ _ = block_on(service.execute(Input));
+ });
+ });
+
+ group.finish();
+ session.print_to_stdout();
+}
+
+criterion_group!(benches, entry);
+criterion_main!(benches);
+
+#[derive(Debug, Clone)]
+struct Input;
+
+#[derive(Debug, Clone)]
+struct Output;
diff --git a/crates/seatbelt/benches/observability.rs b/crates/seatbelt/benches/observability.rs
new file mode 100644
index 00000000..a618d54f
--- /dev/null
+++ b/crates/seatbelt/benches/observability.rs
@@ -0,0 +1,117 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+#![expect(missing_docs, reason = "benchmark code")]
+use std::time::Duration;
+
+use alloc_tracker::{Allocator, Session};
+use criterion::{Criterion, criterion_group, criterion_main};
+use futures::executor::block_on;
+use layered::{Execute, Service, Stack};
+use opentelemetry_sdk::error::OTelSdkResult;
+use opentelemetry_sdk::metrics::data::ResourceMetrics;
+use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
+use opentelemetry_sdk::metrics::{SdkMeterProvider, Temporality};
+use seatbelt::retry::Retry;
+use seatbelt::{RecoveryInfo, ResilienceContext};
+use tick::Clock;
+
+#[global_allocator]
+static ALLOCATOR: Allocator = Allocator::system();
+
+fn entry(c: &mut Criterion) {
+ let mut group = c.benchmark_group("observability");
+ let session = Session::new();
+
+ // No telemetry
+ let context = ResilienceContext::new(Clock::new_frozen());
+ let service = (
+ Retry::layer("bench", &context)
+ .clone_input()
+ .base_delay(Duration::ZERO)
+ .recovery_with(|_, _| RecoveryInfo::retry()),
+ Execute::new(|v: Input| async move { Output::from(v) }),
+ )
+ .into_service();
+ let operation = session.operation("retry-no-telemetry");
+ group.bench_function("retry-no-telemetry", |b| {
+ b.iter(|| {
+ let _span = operation.measure_thread();
+ _ = block_on(service.execute(Input));
+ });
+ });
+
+ // Metrics
+ let meter_provider = SdkMeterProvider::builder().with_periodic_exporter(EmptyExporter).build();
+ let context = ResilienceContext::new(Clock::new_frozen()).enable_metrics(&meter_provider);
+ let service = (
+ Retry::layer("bench", &context)
+ .clone_input()
+ .base_delay(Duration::ZERO)
+ .recovery_with(|_, _| RecoveryInfo::retry()),
+ Execute::new(|v: Input| async move { Output::from(v) }),
+ )
+ .into_service();
+ let operation = session.operation("retry-metrics");
+ group.bench_function("retry-metrics", |b| {
+ b.iter(|| {
+ let _span = operation.measure_thread();
+ _ = block_on(service.execute(Input));
+ });
+ });
+
+ // Logs
+ let context = ResilienceContext::new(Clock::new_frozen()).enable_logs();
+ let service = (
+ Retry::layer("bench", &context)
+ .clone_input()
+ .base_delay(Duration::ZERO)
+ .recovery_with(|_, _| RecoveryInfo::retry()),
+ Execute::new(|v: Input| async move { Output::from(v) }),
+ )
+ .into_service();
+ let operation = session.operation("retry-logs");
+ group.bench_function("retry-logs", |b| {
+ b.iter(|| {
+ let _span = operation.measure_thread();
+ _ = block_on(service.execute(Input));
+ });
+ });
+
+ group.finish();
+ session.print_to_stdout();
+}
+
+criterion_group!(benches, entry);
+criterion_main!(benches);
+
+#[derive(Debug, Clone)]
+struct Input;
+
+#[derive(Debug, Clone)]
+struct Output;
+
+impl From for Output {
+ fn from(_input: Input) -> Self {
+ Self
+ }
+}
+
+struct EmptyExporter;
+
+impl PushMetricExporter for EmptyExporter {
+ async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
+ Ok(())
+ }
+
+ fn force_flush(&self) -> OTelSdkResult {
+ Ok(())
+ }
+
+ fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
+ Ok(())
+ }
+
+ fn temporality(&self) -> Temporality {
+ Temporality::Cumulative
+ }
+}
diff --git a/crates/seatbelt/benches/retry.rs b/crates/seatbelt/benches/retry.rs
new file mode 100644
index 00000000..e7a12bdb
--- /dev/null
+++ b/crates/seatbelt/benches/retry.rs
@@ -0,0 +1,89 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+#![expect(missing_docs, reason = "benchmark code")]
+
+use std::time::Duration;
+
+use alloc_tracker::{Allocator, Session};
+use criterion::{Criterion, criterion_group, criterion_main};
+use futures::executor::block_on;
+use layered::{Execute, Service, Stack};
+use seatbelt::retry::Retry;
+use seatbelt::{RecoveryInfo, ResilienceContext};
+use tick::Clock;
+
+#[global_allocator]
+static ALLOCATOR: Allocator = Allocator::system();
+
+fn entry(c: &mut Criterion) {
+ let mut group = c.benchmark_group("retry");
+ let session = Session::new();
+
+ // No retries
+ let service = Execute::new(|v: Input| async move { Output::from(v) });
+ let operation = session.operation("no-retry");
+ group.bench_function("no-retry", |b| {
+ b.iter(|| {
+ let _span = operation.measure_thread();
+ _ = block_on(service.execute(Input));
+ });
+ });
+
+ // With retry
+ let context = ResilienceContext::new(Clock::new_frozen());
+
+ let service = (
+ Retry::layer("bench", &context)
+ .clone_input()
+ .recovery_with(|_, _| RecoveryInfo::never()),
+ Execute::new(|v: Input| async move { Output::from(v) }),
+ )
+ .into_service();
+
+ let operation = session.operation("with-retry");
+ group.bench_function("with-retry", |b| {
+ b.iter(|| {
+ let _span = operation.measure_thread();
+ _ = block_on(service.execute(Input));
+ });
+ });
+
+ // With retry and recovery
+ let context = ResilienceContext::new(Clock::new_frozen());
+
+ let service = (
+ Retry::layer("bench", &context)
+ .clone_input()
+ .max_retry_attempts(1)
+ .base_delay(Duration::ZERO)
+ .recovery_with(|_, _| RecoveryInfo::retry()),
+ Execute::new(|v: Input| async move { Output::from(v) }),
+ )
+ .into_service();
+
+ let operation = session.operation("with-retry-and-recovery");
+ group.bench_function("with-retry-and-recovery", |b| {
+ b.iter(|| {
+ let _span = operation.measure_thread();
+ _ = block_on(service.execute(Input));
+ });
+ });
+
+ group.finish();
+ session.print_to_stdout();
+}
+
+criterion_group!(benches, entry);
+criterion_main!(benches);
+
+#[derive(Debug, Clone)]
+struct Input;
+
+#[derive(Debug, Clone)]
+struct Output;
+
+impl From for Output {
+ fn from(_input: Input) -> Self {
+ Self
+ }
+}
diff --git a/crates/seatbelt/benches/timeout.rs b/crates/seatbelt/benches/timeout.rs
new file mode 100644
index 00000000..25ae351c
--- /dev/null
+++ b/crates/seatbelt/benches/timeout.rs
@@ -0,0 +1,66 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+#![expect(missing_docs, reason = "benchmark code")]
+
+use std::time::Duration;
+
+use alloc_tracker::{Allocator, Session};
+use criterion::{Criterion, criterion_group, criterion_main};
+use futures::executor::block_on;
+use layered::{Execute, Service, Stack};
+use seatbelt::ResilienceContext;
+use seatbelt::timeout::Timeout;
+use tick::Clock;
+
+#[global_allocator]
+static ALLOCATOR: Allocator = Allocator::system();
+
+fn entry(c: &mut Criterion) {
+ let mut group = c.benchmark_group("timeout");
+ let session = Session::new();
+
+ // No timeout
+ let service = Execute::new(|v: Input| async move { Output::from(v) });
+ let operation = session.operation("no-timeout");
+ group.bench_function("no-timeout", |b| {
+ b.iter(|| {
+ let _span = operation.measure_thread();
+ _ = block_on(service.execute(Input));
+ });
+ });
+
+ // With timeout
+ let context = ResilienceContext::new(Clock::new_frozen());
+
+ let service = (
+ Timeout::layer("bench", &context)
+ .timeout_output(|_args| Output)
+ .timeout(Duration::from_secs(10)),
+ Execute::new(|v: Input| async move { Output::from(v) }),
+ )
+ .into_service();
+
+ let operation = session.operation("with-timeout");
+ group.bench_function("with-timeout", |b| {
+ b.iter(|| {
+ let _span = operation.measure_thread();
+ _ = block_on(service.execute(Input));
+ });
+ });
+
+ group.finish();
+ session.print_to_stdout();
+}
+
+criterion_group!(benches, entry);
+criterion_main!(benches);
+
+struct Input;
+
+struct Output;
+
+impl From for Output {
+ fn from(_input: Input) -> Self {
+ Self
+ }
+}
diff --git a/crates/seatbelt/examples/circuit_breaker.rs b/crates/seatbelt/examples/circuit_breaker.rs
new file mode 100644
index 00000000..a8f65007
--- /dev/null
+++ b/crates/seatbelt/examples/circuit_breaker.rs
@@ -0,0 +1,100 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+//! Circuit breaker example that simulates a major service outage and tripping of the
+//! circuit breaker by:
+//!
+//! 1. Monitoring failure rates in real-time
+//! 2. Opening the circuit when failure thresholds are exceeded
+//! 3. Allowing probe requests to test service recovery
+//! 4. Automatically closing the circuit when the service recovers
+
+use std::time::Duration;
+
+use layered::{Execute, Service, Stack};
+use ohno::AppError;
+use opentelemetry_sdk::metrics::SdkMeterProvider;
+use opentelemetry_stdout::MetricExporter;
+use seatbelt::circuit_breaker::Circuit;
+use seatbelt::{RecoveryInfo, ResilienceContext};
+use tick::Clock;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+
+#[tokio::main]
+async fn main() -> Result<(), AppError> {
+ let meter_provider = configure_telemetry();
+
+ let clock = Clock::new_tokio();
+ let context = ResilienceContext::new(&clock).enable_metrics(&meter_provider);
+
+ // Define stack with circuit breaker layer
+ let stack = (
+ Circuit::layer("my_circuit_breaker", &context)
+ // Required: classify the recoverability of outputs
+ .recovery_with(|output, _args| match output {
+ Ok(_) => RecoveryInfo::never(),
+ Err(_) => RecoveryInfo::retry(),
+ })
+ // Required: provide output when circuit is open
+ .rejected_input_error(|input, _args| format!("rejecting execution of '{input}' because circuit is open"))
+ // Decrease the following values to see the circuit breaker trip faster
+ // and speed-up the example
+ .sampling_duration(Duration::from_secs(2))
+ .min_throughput(5)
+ .break_duration(Duration::from_secs(2))
+ .on_probing(|_, _| println!("probing input let in to see if the service has recovered"))
+ .on_opened(|_, _| println!("circuit opened due to exceeding failure threshold"))
+ .on_closed(|_, args| {
+ println!(
+ "circuit closed because probing succeeded, opened for: {}s",
+ args.open_duration().as_secs()
+ );
+ }),
+ Execute::new(execute_operation),
+ );
+
+ // Create the service from the stack
+ let service = stack.into_service();
+
+ // Execute multiple attempts, the circuit breaker will eventually open because the
+ // failure rate exceeds the threshold. You can play with this value an increase it to 300
+ // to see how the circuit breaker eventually closes when the service recovers.
+ for attempt in 0..30 {
+ clock.delay(Duration::from_millis(50)).await;
+
+ match service.execute(attempt).await {
+ Ok(output) => println!("{attempt}: {output}"),
+ Err(e) => println!("{attempt}: {e}"),
+ }
+ }
+
+ // Flush metrics to stdout before exiting
+ meter_provider.force_flush()?;
+
+ Ok(())
+}
+
+// Simulate major service outage, 50% chance of failing
+async fn execute_operation(input: u32) -> Result {
+ // After input 100, the service recovers and always succeeds
+ if input > 100 {
+ return Ok(format!("output-{input}"));
+ }
+
+ if fastrand::i16(0..10) > 5 {
+ Err(format!("transient error for '{input}'"))
+ } else {
+ // Produce some output
+ Ok(format!("output-{input}"))
+ }
+}
+
+fn configure_telemetry() -> SdkMeterProvider {
+ // Set up tracing subscriber for logs to console
+ tracing_subscriber::registry().with(tracing_subscriber::fmt::layer()).init();
+
+ SdkMeterProvider::builder()
+ .with_periodic_exporter(MetricExporter::default())
+ .build()
+}
diff --git a/crates/seatbelt/examples/resilience_pipeline.rs b/crates/seatbelt/examples/resilience_pipeline.rs
new file mode 100644
index 00000000..6cfb311c
--- /dev/null
+++ b/crates/seatbelt/examples/resilience_pipeline.rs
@@ -0,0 +1,71 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+//! This example demonstrates how to combine multiple resilience middlewares
+//! using the `seatbelt` crate to create a robust execution pipeline with basic
+//! resilience capabilities.
+
+use std::time::Duration;
+
+use layered::{Execute, Service, Stack};
+use ohno::AppError;
+use opentelemetry_sdk::metrics::SdkMeterProvider;
+use opentelemetry_stdout::MetricExporter;
+use seatbelt::retry::Retry;
+use seatbelt::timeout::Timeout;
+use seatbelt::{RecoveryInfo, ResilienceContext};
+use tick::Clock;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+
+#[tokio::main]
+async fn main() -> Result<(), AppError> {
+ let meter_provider = configure_telemetry();
+
+ let clock = Clock::new_tokio();
+
+ // Shared options for resilience middleware
+ let context = ResilienceContext::new(&clock).enable_metrics(&meter_provider).name("my_pipeline");
+
+ // Define stack with retry and timeout middlewares
+ let stack = (
+ Retry::layer("my_retry", &context)
+ // automatically clones the input for retries
+ .clone_input()
+ // classify the output
+ .recovery_with(|output: &String, _args| match output.as_str() {
+ "error" | "timeout" => RecoveryInfo::retry(),
+ _ => RecoveryInfo::never(),
+ }),
+ Timeout::layer("my_timeout", &context)
+ .timeout(Duration::from_secs(1))
+ .timeout_output(|_args| "timeout".to_string()),
+ Execute::new(execute_operation),
+ );
+
+ // Build the service
+ let service = stack.into_service();
+
+ // Execute the service with an input
+ let output = service.execute("value".to_string()).await;
+
+ println!("execution finished, output: {output}");
+
+ // Flush metrics to stdout before exiting
+ meter_provider.force_flush()?;
+
+ Ok(())
+}
+
+async fn execute_operation(input: String) -> String {
+ if fastrand::i16(0..10) > 4 { "error".to_string() } else { input }
+}
+
+fn configure_telemetry() -> SdkMeterProvider {
+ // Set up tracing subscriber for logs to console
+ tracing_subscriber::registry().with(tracing_subscriber::fmt::layer()).init();
+
+ SdkMeterProvider::builder()
+ .with_periodic_exporter(MetricExporter::default())
+ .build()
+}
diff --git a/crates/seatbelt/examples/retry.rs b/crates/seatbelt/examples/retry.rs
new file mode 100644
index 00000000..fba73eb9
--- /dev/null
+++ b/crates/seatbelt/examples/retry.rs
@@ -0,0 +1,48 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+//! Basic retry middleware example with automatic input cloning and simple recovery logic.
+
+use std::io::Error;
+
+use layered::{Execute, Service, Stack};
+use ohno::AppError;
+use seatbelt::retry::Retry;
+use seatbelt::{RecoveryInfo, ResilienceContext};
+use tick::Clock;
+
+#[tokio::main]
+async fn main() -> Result<(), AppError> {
+ let clock = Clock::new_tokio();
+ let context = ResilienceContext::new(&clock);
+
+ // Define stack with retry layer
+ let stack = (
+ Retry::layer("my_retry", &context)
+ .clone_input() // Automatically clone input for retries
+ .recovery_with(|output, _args| match output {
+ Ok(_) => RecoveryInfo::never(),
+ Err(_) => RecoveryInfo::retry(),
+ }),
+ Execute::new(execute_operation),
+ );
+
+ // Create the service from the stack
+ let service = stack.into_service();
+
+ match service.execute("value".to_string()).await {
+ Ok(output) => println!("execution succeeded, result: {output}"),
+ Err(e) => println!("execution failed, error: {e}"),
+ }
+
+ Ok(())
+}
+
+// 20% chance of failing with a transient error
+async fn execute_operation(input: String) -> Result {
+ if fastrand::i16(0..10) > 8 {
+ Err(Error::other("transient execution error"))
+ } else {
+ Ok(input)
+ }
+}
diff --git a/crates/seatbelt/examples/retry_advanced.rs b/crates/seatbelt/examples/retry_advanced.rs
new file mode 100644
index 00000000..1fd6cad1
--- /dev/null
+++ b/crates/seatbelt/examples/retry_advanced.rs
@@ -0,0 +1,102 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+//! Advanced retry middleware demonstrating custom input cloning and attempt info forwarding.
+//!
+//! Shows how to inject attempt metadata into requests via `.clone_input()`, access it
+//! in the service function, and forward it through to the final output.
+
+use std::io::Error;
+use std::time::Duration;
+
+use http::{Request, Response};
+use layered::{Execute, Service, Stack};
+use ohno::AppError;
+use opentelemetry_sdk::metrics::SdkMeterProvider;
+use opentelemetry_stdout::MetricExporter;
+use seatbelt::retry::{Attempt, Retry};
+use seatbelt::{RecoveryInfo, ResilienceContext};
+use tick::Clock;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+
+#[tokio::main]
+async fn main() -> Result<(), AppError> {
+ let meter_provider = configure_telemetry();
+
+ let clock = Clock::new_tokio();
+ let context = ResilienceContext::new(&clock)
+ .name("retry_advanced")
+ .enable_metrics(&meter_provider);
+
+ // Define stack with retry layer
+ let stack = (
+ Retry::layer("my_retry", &context)
+ // Custom input cloning - inject attempt info into request extensions
+ .clone_input_with(|input: &mut Request, args| {
+ let mut cloned = input.clone();
+ cloned.extensions_mut().insert(args.attempt());
+ Some(cloned)
+ })
+ .max_retry_attempts(10)
+ .use_jitter(true)
+ .base_delay(Duration::from_millis(100))
+ .recovery_with(|output, _args| match output {
+ Ok(_) => RecoveryInfo::never(),
+ Err(_) => RecoveryInfo::retry(),
+ })
+ // Register a callback called just before the next retry
+ .on_retry(|_output, args| {
+ println!(
+ "retrying, attempt {}, delay: {}s",
+ args.attempt().index(),
+ args.retry_delay().as_secs_f32(),
+ );
+ }),
+ Execute::new(send_request),
+ );
+
+ // Create the service from the stack
+ let service = stack.into_service();
+
+ let request = Request::builder().uri("https://example.com").body("value".to_string())?;
+
+ match service.execute(request).await {
+ Ok(output) => {
+ // Extract attempt info that was forwarded through the pipeline
+ let attempts = output.extensions().get::().map_or(0, |a| a.index());
+ println!("execution succeeded, result: {}, attempts: {}", output.body(), attempts);
+ }
+ Err(e) => println!("execution failed, error: {e}"),
+ }
+
+ // Flush metrics to stdout before exiting
+ meter_provider.force_flush()?;
+
+ Ok(())
+}
+
+// Only 20% chance of success, retries will be attempted with a high probability
+async fn send_request(input: Request) -> Result, Error> {
+ if fastrand::i16(0..10) > 2 {
+ Err(Error::other("transient execution error"))
+ } else {
+ // Extract attempt info that was injected during custom cloning
+ let attempt = input.extensions().get::().copied().unwrap_or_default();
+
+ // Forward attempt info to output via response extensions
+ Response::builder()
+ .extension(attempt)
+ .body("success".to_string())
+ .map_err(Error::other)
+ }
+}
+
+fn configure_telemetry() -> SdkMeterProvider {
+ // Set up tracing subscriber for logs to console
+ tracing_subscriber::registry().with(tracing_subscriber::fmt::layer()).init();
+
+ SdkMeterProvider::builder()
+ .with_periodic_exporter(MetricExporter::default())
+ .build()
+}
diff --git a/crates/seatbelt/examples/retry_outage.rs b/crates/seatbelt/examples/retry_outage.rs
new file mode 100644
index 00000000..4e2b7aab
--- /dev/null
+++ b/crates/seatbelt/examples/retry_outage.rs
@@ -0,0 +1,149 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+#![expect(clippy::unwrap_used, reason = "sample code")]
+
+//! Demonstrates advanced retry patterns with input restoration from errors.
+//!
+//! This example showcases how to handle outage scenarios where:
+//! - The original input cannot be cloned (expensive request bodies)
+//! - Input must be restored from error information using `restore_input_on_error()`
+//! - Failed requests are automatically retried with a fallback endpoint
+//! - Outage detection and recovery are handled seamlessly
+
+use std::time::Duration;
+
+use http::{Request, Response};
+use layered::{Execute, Service, Stack};
+use ohno::AppError;
+use opentelemetry_sdk::metrics::SdkMeterProvider;
+use opentelemetry_stdout::MetricExporter;
+use seatbelt::retry::Retry;
+use seatbelt::{Recovery, RecoveryInfo, ResilienceContext};
+use tick::Clock;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+
+const ENDPOINT_WITH_OUTAGE: &str = "https://example.com";
+const ENDPOINT_ALIVE: &str = "https://fallback.example.com";
+
+#[tokio::main]
+async fn main() -> Result<(), AppError> {
+ let meter_provider = configure_telemetry();
+
+ let clock = Clock::new_tokio();
+ let context = ResilienceContext::new(&clock).enable_metrics(&meter_provider);
+
+ // Configure retry layer for outage handling with input restoration
+ let stack = (
+ Retry::layer("outage_retry", &context)
+ // Disable input cloning - we'll restore from error instead
+ .clone_input_with(|_, _| None)
+ // Configure recovery based on an error type
+ .recovery_with(|output: &Result<_, HttpError>, _| match output {
+ Ok(_) => RecoveryInfo::never(), // Don't retry successful responses
+ Err(error) => error.recovery(), // Use error's recovery strategy
+ })
+ // Enable unavailable detection and handling
+ .handle_unavailable(true)
+ // Restore input from error when retrying (key feature!)
+ .restore_input_from_error(|error: &mut HttpError, _| {
+ // Extract the original request and modify it for fallback endpoint
+ error.try_restore_request()
+ }),
+ Execute::new(send_request),
+ );
+
+ // Create the service from the stack
+ let service = stack.into_service();
+
+ // Create a request that will initially fail but can be restored
+ let request = Request::builder()
+ .uri(ENDPOINT_WITH_OUTAGE)
+ .body("important request data".to_string())?;
+
+ println!("Sending request to: {}", request.uri());
+
+ // The service will:
+ // 1. Try the original endpoint (fails with outage)
+ // 2. Restore input from the error with fallback endpoint
+ // 3. Retry with the modified request (succeeds)
+ let response = service.execute(request).await?;
+
+ println!("Final response: {}", response.body());
+
+ // Flush metrics to stdout before exiting
+ meter_provider.force_flush()?;
+
+ Ok(())
+}
+
+/// Simulates a service that has outages on the primary endpoint but works on fallback.
+///
+/// This demonstrates the input restoration pattern where the original request is preserved
+/// in the error so it can be modified and retried against a different endpoint.
+async fn send_request(input: Request) -> Result, HttpError> {
+ if input.uri() == ENDPOINT_WITH_OUTAGE {
+ println!("Request to {} failed - simulating outage", input.uri());
+ // Store the original request in the error for later restoration
+ Err(HttpError::outage(input))
+ } else {
+ println!("Request to {} succeeded", input.uri());
+ Ok(Response::new(format!("Success! Data from {}", input.uri())))
+ }
+}
+
+/// Custom error type that preserves the original request for restoration.
+///
+/// This pattern allows failed requests to be modified and retried against different
+/// endpoints without requiring the original input to be cloneable.
+#[ohno::error]
+struct HttpError {
+ /// The original request that failed, preserved for input restoration
+ rejected_request: Option>>,
+ /// Recovery strategy (retry vs. never) for this error type
+ recovery: RecoveryInfo,
+}
+
+impl HttpError {
+ /// Creates an outage error that preserves the original request for retry.
+ fn outage(rejected_request: Request) -> Self {
+ Self::caused_by(
+ Some(Box::new(rejected_request)),
+ RecoveryInfo::unavailable().delay(Duration::from_millis(100)),
+ "simulated outage",
+ )
+ }
+
+ /// Restores the original request with a modified endpoint for retry.
+ ///
+ /// This is called by `restore_input_on_error()` to extract and modify the
+ /// original request. It changes the URI to the fallback endpoint and returns
+ /// the modified request for the next retry attempt.
+ fn try_restore_request(&mut self) -> Option> {
+ self.rejected_request
+ .take() // Extract the stored request
+ .map(|boxed_request| *boxed_request) // Unbox it
+ .map(|mut request| {
+ // Modify the request to use the fallback endpoint
+ *request.uri_mut() = ENDPOINT_ALIVE.parse().unwrap();
+ println!("Restored request with fallback endpoint: {}", request.uri());
+ request
+ })
+ }
+}
+
+impl Recovery for HttpError {
+ fn recovery(&self) -> RecoveryInfo {
+ self.recovery.clone()
+ }
+}
+
+fn configure_telemetry() -> SdkMeterProvider {
+ // Set up tracing subscriber for logs to console
+ tracing_subscriber::registry().with(tracing_subscriber::fmt::layer()).init();
+
+ SdkMeterProvider::builder()
+ .with_periodic_exporter(MetricExporter::default())
+ .build()
+}
diff --git a/crates/seatbelt/examples/timeout.rs b/crates/seatbelt/examples/timeout.rs
new file mode 100644
index 00000000..03c8294a
--- /dev/null
+++ b/crates/seatbelt/examples/timeout.rs
@@ -0,0 +1,75 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+#![expect(clippy::unwrap_used, reason = "sample code")]
+
+//! Simple timeout resilience middleware example.
+//!
+//! This example demonstrates the basic usage of the timeout middleware to cancel
+//! long-running operations.
+
+use std::time::Duration;
+
+use layered::{Execute, Service, Stack};
+use ohno::{AppError, app_err};
+use opentelemetry_sdk::metrics::SdkMeterProvider;
+use opentelemetry_stdout::MetricExporter;
+use seatbelt::ResilienceContext;
+use seatbelt::timeout::Timeout;
+use tick::Clock;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+
+const TIMEOUT_DURATION: Duration = Duration::from_millis(100);
+const PROCESSING_DELAY: Duration = Duration::from_millis(500);
+
+#[tokio::main]
+async fn main() -> Result<(), AppError> {
+ let meter_provider = configure_telemetry();
+
+ let clock = Clock::new_tokio();
+
+ // Create common options
+ let context = ResilienceContext::new(&clock).enable_metrics(&meter_provider);
+
+ // Define stack with timeout layer
+ let stack = (
+ Timeout::layer("my_timeout", &context)
+ // Required: specify the timeout duration
+ .timeout(TIMEOUT_DURATION)
+ // Required: create error output for timeouts
+ .timeout_error(|args| app_err!("timeout occurred, timeout: {}ms", args.timeout().as_millis())),
+ Execute::new({
+ let clock = clock.clone();
+ move |_input| {
+ let clock = clock.clone();
+ async move {
+ clock.delay(PROCESSING_DELAY).await; // Simulate some processing delay so the timeout can trigger
+ Ok(())
+ }
+ }
+ }),
+ );
+
+ // Create the service from the stack
+ let service = stack.into_service();
+
+ for i in 0..10 {
+ // Execute the service, results in a timeout error
+ let timeout_error = service.execute(i.to_string()).await.unwrap_err();
+ println!("{i} attempt, error: {timeout_error}");
+ }
+
+ // Flush metrics to stdout before exiting
+ meter_provider.force_flush()?;
+
+ Ok(())
+}
+
+fn configure_telemetry() -> SdkMeterProvider {
+ // Set up tracing subscriber for logs to console
+ tracing_subscriber::registry().with(tracing_subscriber::fmt::layer()).init();
+
+ SdkMeterProvider::builder()
+ .with_periodic_exporter(MetricExporter::default())
+ .build()
+}
diff --git a/crates/seatbelt/examples/timeout_advanced.rs b/crates/seatbelt/examples/timeout_advanced.rs
new file mode 100644
index 00000000..29a42114
--- /dev/null
+++ b/crates/seatbelt/examples/timeout_advanced.rs
@@ -0,0 +1,88 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+//! Advanced timeout resilience middleware example.
+//!
+//! This example demonstrates advanced usage of the timeout middleware, including working with
+//! Result-based outputs, timeout callbacks, and dynamic timeout durations based on input.
+
+use std::time::Duration;
+
+use layered::{Execute, Service, Stack};
+use ohno::{AppError, app_err};
+use opentelemetry_sdk::metrics::SdkMeterProvider;
+use opentelemetry_stdout::MetricExporter;
+use seatbelt::ResilienceContext;
+use seatbelt::timeout::Timeout;
+use tick::Clock;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+
+const TIMEOUT_DURATION: Duration = Duration::from_millis(20);
+const PROCESSING_DELAY: Duration = Duration::from_secs(1);
+
+#[tokio::main]
+async fn main() -> Result<(), AppError> {
+ // Configure telemetry to see the timeout metrics and logs
+ let meter_provider = configure_telemetry();
+
+ let clock = Clock::new_tokio();
+
+ // Create service options
+ let context: ResilienceContext> =
+ ResilienceContext::new(&clock).name("my_pipeline").enable_metrics(&meter_provider);
+
+ // Define stack with timeout layer
+ let stack = (
+ Timeout::layer("my_timeout", &context)
+ // Required: specify the timeout duration
+ .timeout(TIMEOUT_DURATION)
+ // Required: create error output for timeouts
+ .timeout_error(|args| app_err!("timeout occurred, timeout: {}ms", args.timeout().as_millis()))
+ // Optional: callback to be invoked when a timeout occurs
+ .on_timeout(|_out, args| {
+ println!("timeout occurred, timeout: {}ms", args.timeout().as_millis());
+ })
+ // Optional: override the default timeout duration by inspecting the input
+ .timeout_override(|input, _args| match input.as_str() {
+ "2" => Some(Duration::from_millis(300)),
+ _ => None,
+ }),
+ Execute::new({
+ let clock = clock.clone();
+ move |_input| {
+ let clock = clock.clone();
+ async move {
+ // Simulate some processing delay so the timeout can trigger
+ clock.delay(PROCESSING_DELAY).await;
+ Ok(())
+ }
+ }
+ }),
+ );
+
+ // Create the service from the stack
+ let service = stack.into_service();
+
+ for i in 0..10 {
+ // Execute the service, results in a timeout error
+ match service.execute(i.to_string()).await {
+ Ok(()) => println!("execute, input: {i}, result: success"),
+ Err(e) => println!("execute, input: {i}, error: {e}"),
+ }
+ }
+
+ // Flush metrics to stdout before exiting
+ meter_provider.force_flush()?;
+
+ Ok(())
+}
+
+fn configure_telemetry() -> SdkMeterProvider {
+ // Set up tracing subscriber for logs to console
+ tracing_subscriber::registry().with(tracing_subscriber::fmt::layer()).init();
+
+ SdkMeterProvider::builder()
+ .with_periodic_exporter(MetricExporter::default())
+ .build()
+}
diff --git a/crates/seatbelt/favicon.ico b/crates/seatbelt/favicon.ico
new file mode 100644
index 00000000..e7752941
--- /dev/null
+++ b/crates/seatbelt/favicon.ico
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:3de762b64903267b6d416cfb2d420b956fa4dfdfcb47ee8b232845da495be8e5
+size 28527
diff --git a/crates/seatbelt/logo.png b/crates/seatbelt/logo.png
new file mode 100644
index 00000000..1794a428
--- /dev/null
+++ b/crates/seatbelt/logo.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:7992716781c4ace4099acf8d4104ac6ebc0e1cdeb437f43a0a884ead21bfd29e
+size 84743
diff --git a/crates/seatbelt/src/circuit_breaker/args.rs b/crates/seatbelt/src/circuit_breaker/args.rs
new file mode 100644
index 00000000..f16ced2e
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/args.rs
@@ -0,0 +1,161 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use std::time::Duration;
+
+use tick::Clock;
+
+use crate::circuit_breaker::PartitionKey;
+
+/// Arguments for the [`recovery_with`][super::CircuitLayer::recovery_with] callback function.
+///
+/// Provides context for recovery classification in the circuit breaker.
+#[derive(Debug)]
+#[non_exhaustive]
+pub struct RecoveryArgs<'a> {
+ pub(crate) partition_key: &'a PartitionKey,
+ pub(crate) clock: &'a Clock,
+}
+
+impl RecoveryArgs<'_> {
+ /// Returns the partition key associated with the recovery evaluation.
+ #[must_use]
+ pub fn partition_key(&self) -> &PartitionKey {
+ self.partition_key
+ }
+
+ /// Returns a reference to the clock use by the circuit breaker.
+ #[must_use]
+ pub fn clock(&self) -> &Clock {
+ self.clock
+ }
+}
+
+/// Arguments for the [`rejected_input`][super::CircuitLayer::rejected_input] callback function.
+///
+/// Provides context for generating outputs when the inputs are rejected by the circuit breaker.
+#[derive(Debug)]
+pub struct RejectedInputArgs<'a> {
+ pub(crate) partition_key: &'a PartitionKey,
+}
+
+impl RejectedInputArgs<'_> {
+ /// Returns the partition key associated with the rejected input.
+ #[must_use]
+ pub fn partition_key(&self) -> &PartitionKey {
+ self.partition_key
+ }
+}
+
+/// Arguments for the [`on_probing`][super::CircuitLayer::on_probing] callback function.
+///
+/// Provides context when the circuit breaker enters the probing state to test if the service has recovered.
+#[derive(Debug)]
+#[non_exhaustive]
+pub struct OnProbingArgs<'a> {
+ pub(crate) partition_key: &'a PartitionKey,
+}
+
+impl OnProbingArgs<'_> {
+ /// Returns the partition key associated with the probing execution.
+ #[must_use]
+ pub fn partition_key(&self) -> &PartitionKey {
+ self.partition_key
+ }
+}
+
+/// Arguments for the [`on_closed`][super::CircuitLayer::on_closed] callback function.
+///
+/// Provides context when the circuit breaker transitions to the closed state, allowing normal operation.
+#[derive(Debug)]
+#[non_exhaustive]
+pub struct OnClosedArgs<'a> {
+ pub(crate) partition_key: &'a PartitionKey,
+ pub(crate) open_duration: std::time::Duration,
+}
+
+impl OnClosedArgs<'_> {
+ /// Returns the partition key associated with this event.
+ #[must_use]
+ pub fn partition_key(&self) -> &PartitionKey {
+ self.partition_key
+ }
+
+ /// Returns the duration the circuit was open before closing.
+ #[must_use]
+ pub fn open_duration(&self) -> Duration {
+ self.open_duration
+ }
+}
+
+/// Arguments for the [`on_opened`][super::CircuitLayer::on_opened] callback function.
+///
+/// Provides context when the circuit breaker transitions to the open state, blocking requests due to failures.
+#[derive(Debug)]
+#[non_exhaustive]
+pub struct OnOpenedArgs<'a> {
+ pub(crate) partition_key: &'a PartitionKey,
+}
+
+impl OnOpenedArgs<'_> {
+ /// Returns the partition key associated with this event.
+ #[must_use]
+ pub fn partition_key(&self) -> &PartitionKey {
+ self.partition_key
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn recovery_args_accessors() {
+ let key = PartitionKey::from("test");
+ let clock = Clock::new_frozen();
+ let args = RecoveryArgs {
+ partition_key: &key,
+ clock: &clock,
+ };
+ assert_eq!(args.partition_key(), &key);
+ let _ = args.clock();
+ assert!(format!("{args:?}").contains("RecoveryArgs"));
+ }
+
+ #[test]
+ fn rejected_input_args_accessors() {
+ let key = PartitionKey::from("rejected");
+ let args = RejectedInputArgs { partition_key: &key };
+ assert_eq!(args.partition_key(), &key);
+ assert!(format!("{args:?}").contains("RejectedInputArgs"));
+ }
+
+ #[test]
+ fn on_probing_args_accessors() {
+ let key = PartitionKey::from("probing");
+ let args = OnProbingArgs { partition_key: &key };
+ assert_eq!(args.partition_key(), &key);
+ assert!(format!("{args:?}").contains("OnProbingArgs"));
+ }
+
+ #[test]
+ fn on_closed_args_accessors() {
+ let key = PartitionKey::from("closed");
+ let duration = Duration::from_secs(5);
+ let args = OnClosedArgs {
+ partition_key: &key,
+ open_duration: duration,
+ };
+ assert_eq!(args.partition_key(), &key);
+ assert_eq!(args.open_duration(), duration);
+ assert!(format!("{args:?}").contains("OnClosedArgs"));
+ }
+
+ #[test]
+ fn on_opened_args_accessors() {
+ let key = PartitionKey::from("opened");
+ let args = OnOpenedArgs { partition_key: &key };
+ assert_eq!(args.partition_key(), &key);
+ assert!(format!("{args:?}").contains("OnOpenedArgs"));
+ }
+}
diff --git a/crates/seatbelt/src/circuit_breaker/callbacks.rs b/crates/seatbelt/src/circuit_breaker/callbacks.rs
new file mode 100644
index 00000000..ebb579f7
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/callbacks.rs
@@ -0,0 +1,12 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use super::{OnClosedArgs, OnOpenedArgs, OnProbingArgs, PartitionKey, RecoveryArgs, RejectedInputArgs};
+use crate::RecoveryInfo;
+
+crate::utils::define_fn_wrapper!(PartionKeyProvider(Fn(&In) -> PartitionKey));
+crate::utils::define_fn_wrapper!(ShouldRecover(Fn(&Out, RecoveryArgs) -> RecoveryInfo));
+crate::utils::define_fn_wrapper!(RejectedInput(Fn(In, RejectedInputArgs) -> Out));
+crate::utils::define_fn_wrapper!(OnProbing(Fn(&mut In, OnProbingArgs)));
+crate::utils::define_fn_wrapper!(OnOpened(Fn(&Out, OnOpenedArgs)));
+crate::utils::define_fn_wrapper!(OnClosed(Fn(&Out, OnClosedArgs)));
diff --git a/crates/seatbelt/src/circuit_breaker/constants.rs b/crates/seatbelt/src/circuit_breaker/constants.rs
new file mode 100644
index 00000000..1030ed01
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/constants.rs
@@ -0,0 +1,37 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use std::time::Duration;
+
+/// Minimum allowed duration for the circuit breaker's sampling window.
+pub(crate) const MIN_SAMPLING_DURATION: Duration = Duration::from_secs(1);
+
+/// Default minimum throughput (number of requests) in the sampling window before
+/// the circuit breaker can evaluate the failure rate and potentially trip the circuit.
+///
+/// The defaults taken from `Polly V8`:
+///
+pub(crate) const DEFAULT_MIN_THROUGHPUT: u32 = 100;
+
+/// Default duration of the circuit breaker's sampling window.
+///
+/// The defaults taken from `Polly V8`:
+///
+pub(crate) const DEFAULT_SAMPLING_DURATION: Duration = Duration::from_secs(30);
+
+/// Default failure threshold (percentage of failed requests) in the sampling window
+/// that will trip the circuit breaker.
+///
+/// The defaults taken from `Polly V8`:
+///
+pub(crate) const DEFAULT_FAILURE_THRESHOLD: f32 = 0.1;
+
+/// Default duration that the circuit breaker remains open (broken) before
+/// transitioning to half-open to test if the service has recovered.
+///
+/// The defaults taken from `Polly V8`:
+///
+pub(crate) const DEFAULT_BREAK_DURATION: Duration = Duration::from_secs(5);
+
+pub(crate) const ERR_POISONED_LOCK: &str =
+ "poisoned lock - cannot continue execution because security and privacy guarantees can no longer be upheld";
diff --git a/crates/seatbelt/src/circuit_breaker/engine/engine_core.rs b/crates/seatbelt/src/circuit_breaker/engine/engine_core.rs
new file mode 100644
index 00000000..4edc58c6
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/engine/engine_core.rs
@@ -0,0 +1,690 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use std::sync::Mutex;
+use std::time::{Duration, Instant};
+
+use tick::Clock;
+
+use super::{EngineOptions, EnterCircuitResult, ExitCircuitResult};
+use crate::circuit_breaker::constants::ERR_POISONED_LOCK;
+use crate::circuit_breaker::engine::probing::{AllowProbeResult, Probes, ProbingResult};
+use crate::circuit_breaker::{CircuitEngine, ExecutionMode, ExecutionResult, HealthMetrics, HealthStatus};
+
+/// Engine that manages the state of the circuit breaker.
+#[derive(Debug)]
+pub(crate) struct EngineCore {
+ state: Mutex,
+ options: EngineOptions,
+ clock: Clock,
+}
+
+impl EngineCore {
+ pub fn new(options: EngineOptions, clock: Clock) -> Self {
+ Self {
+ state: Mutex::new(State::Closed {
+ health: options.health_metrics_builder.build(),
+ }),
+ options,
+ clock,
+ }
+ }
+}
+
+impl CircuitEngine for EngineCore {
+ fn enter(&self) -> EnterCircuitResult {
+ let now = self.clock.instant();
+
+ // NOTE: Remember to execute all expensive operations (like time checks) outside the lock.
+ self.state.lock().expect(ERR_POISONED_LOCK).enter(now, &self.options)
+ }
+
+ fn exit(&self, result: ExecutionResult, _mode: ExecutionMode) -> ExitCircuitResult {
+ let now = self.clock.instant();
+
+ // NOTE: Remember to execute all expensive operations (like time checks) outside the lock.
+ self.state.lock().expect(ERR_POISONED_LOCK).exit(result, now, &self.options)
+ }
+}
+
+#[derive(Debug)]
+enum State {
+ Closed { health: HealthMetrics },
+ Open { open_until: Instant, stats: Stats },
+ HalfOpen { probes: Probes, stats: Stats },
+}
+
+impl State {
+ fn enter(&mut self, now: Instant, settings: &EngineOptions) -> EnterCircuitResult {
+ match self {
+ Self::Closed { .. } => EnterCircuitResult::Accepted {
+ mode: ExecutionMode::Normal,
+ },
+ Self::Open { open_until, stats } => {
+ if now >= *open_until {
+ let mut probes = Probes::new(&settings.probes);
+ let allow = probes.allow_probe(now);
+ stats.record_allow_result(allow);
+
+ *self = Self::HalfOpen {
+ probes,
+ stats: stats.clone(),
+ };
+ EnterCircuitResult::from(allow)
+ } else {
+ stats.rejected = stats.rejected.saturating_add(1);
+ EnterCircuitResult::Rejected
+ }
+ }
+ Self::HalfOpen { probes, stats: info } => {
+ let allow = probes.allow_probe(now);
+ info.record_allow_result(allow);
+ EnterCircuitResult::from(allow)
+ }
+ }
+ }
+
+ fn exit(&mut self, result: ExecutionResult, now: Instant, settings: &EngineOptions) -> ExitCircuitResult {
+ match self {
+ Self::Closed { health } => {
+ // first, record the result and evaluate the health metrics
+ health.record(result, now);
+ let health = health.health_info();
+
+ // decide the next state based on health status
+ match health.status() {
+ // Health is good, remain in a closed state
+ HealthStatus::Healthy => ExitCircuitResult::Unchanged,
+ // Health is poor, transition to Open state
+ HealthStatus::Unhealthy => {
+ *self = Self::Open {
+ open_until: now + settings.break_duration,
+ stats: Stats::new(now),
+ };
+ ExitCircuitResult::Opened(health)
+ }
+ }
+ }
+ Self::Open { stats, .. } => {
+ // Record lost results for statistics purposes
+ stats.probes_lost = stats.probes_lost.saturating_add(1);
+
+ // In open state, we don't process results. This can happen when multiple threads are involved and
+ // the state of circuit breaker changes between enter and exit calls since these are separate
+ // method calls that could be interleaved with other threads. Ignore the result.
+ ExitCircuitResult::Unchanged
+ }
+ Self::HalfOpen { probes, stats } => {
+ // record the result of the probe
+ stats.record_probe_execution_result(result);
+
+ match probes.record(result, now) {
+ ProbingResult::Success => {
+ let stats = stats.clone();
+
+ *self = Self::Closed {
+ health: settings.health_metrics_builder.build(),
+ };
+
+ ExitCircuitResult::Closed(stats)
+ }
+ ProbingResult::Failure => {
+ stats.re_opened = stats.re_opened.saturating_add(1);
+
+ *self = Self::Open {
+ open_until: now + settings.break_duration,
+ stats: stats.clone(),
+ };
+
+ ExitCircuitResult::Reopened
+ }
+ ProbingResult::Pending => ExitCircuitResult::Unchanged,
+ }
+ }
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct Stats {
+ pub opened_at: Instant,
+ pub re_opened: usize,
+ pub probes_total: usize,
+ pub probes_lost: usize,
+ pub probes_successes: usize,
+ pub probes_failures: usize,
+ pub rejected: usize,
+}
+
+impl Stats {
+ pub fn new(opened_at: Instant) -> Self {
+ Self {
+ opened_at,
+ probes_total: 0,
+ probes_lost: 0,
+ probes_successes: 0,
+ probes_failures: 0,
+ rejected: 0,
+ re_opened: 0,
+ }
+ }
+
+ pub fn opened_duration(&self, now: Instant) -> Duration {
+ now.saturating_duration_since(self.opened_at)
+ }
+
+ fn record_allow_result(&mut self, allow: AllowProbeResult) {
+ if allow == AllowProbeResult::Accepted {
+ self.probes_total = self.probes_total.saturating_add(1);
+ } else {
+ self.rejected = self.rejected.saturating_add(1);
+ }
+ }
+
+ fn record_probe_execution_result(&mut self, result: ExecutionResult) {
+ match result {
+ ExecutionResult::Success => {
+ self.probes_successes = self.probes_successes.saturating_add(1);
+ }
+ ExecutionResult::Failure => {
+ self.probes_failures = self.probes_failures.saturating_add(1);
+ }
+ }
+ }
+}
+
+#[cfg_attr(coverage_nightly, coverage(off))]
+#[cfg(test)]
+mod tests {
+ use std::ops::Deref;
+
+ use tick::ClockControl;
+
+ use super::*;
+ use crate::circuit_breaker::HealthMetricsBuilder;
+ use crate::circuit_breaker::engine::probing::ProbesOptions;
+
+ fn create_test_settings() -> EngineOptions {
+ EngineOptions {
+ break_duration: Duration::from_secs(5),
+ health_metrics_builder: HealthMetricsBuilder::new(
+ Duration::from_secs(30),
+ 0.1, // 10% failure threshold
+ 10, // minimum 10 requests
+ ),
+ probes: ProbesOptions::quick(Duration::from_secs(2)),
+ }
+ }
+
+ fn create_test_engine() -> EngineCore {
+ let settings = create_test_settings();
+ let clock = Clock::new_frozen();
+ EngineCore::new(settings, clock)
+ }
+
+ fn open_engine(engine: &EngineCore) {
+ const MAX_ATTEMPTS: usize = 1000;
+
+ for _attempt in 0..MAX_ATTEMPTS {
+ engine.enter();
+ let result = engine.exit(ExecutionResult::Failure, ExecutionMode::Normal);
+ if matches!(result, ExitCircuitResult::Opened(_)) {
+ return;
+ }
+ }
+
+ panic!("failed to open the circuit after {MAX_ATTEMPTS} attempts");
+ }
+
+ #[test]
+ fn new_with_valid_settings_creates_closed_state() {
+ let engine = create_test_engine();
+
+ // Verify engine was created (we can't directly inspect the state due to encapsulation)
+ // but we can verify it starts in closed state by checking enter() behavior
+ let result = engine.enter();
+ assert!(matches!(
+ result,
+ EnterCircuitResult::Accepted {
+ mode: ExecutionMode::Normal
+ }
+ ));
+ }
+
+ #[test]
+ fn enter_when_closed_accepts_request() {
+ let engine = create_test_engine();
+
+ let result = engine.enter();
+
+ assert!(matches!(
+ result,
+ EnterCircuitResult::Accepted {
+ mode: ExecutionMode::Normal
+ }
+ ));
+ }
+
+ #[test]
+ fn enter_when_open_before_timeout_rejects_request() {
+ let engine = create_test_engine();
+ open_engine(&engine);
+
+ // Verify circuit is now open
+ let result = engine.enter();
+ assert!(matches!(result, EnterCircuitResult::Rejected));
+ }
+
+ #[test]
+ fn enter_when_open_after_timeout_transitions_to_half_open() {
+ let settings = create_test_settings();
+ let control = ClockControl::new();
+ let clock = control.to_clock();
+ let engine = EngineCore::new(settings, clock);
+
+ // Force circuit to open
+ open_engine(&engine);
+
+ // Advance time beyond break duration
+ control.advance(Duration::from_secs(6));
+
+ let result = engine.enter();
+ assert!(matches!(
+ result,
+ EnterCircuitResult::Accepted {
+ mode: ExecutionMode::Probe
+ }
+ ));
+ }
+
+ #[test]
+ fn enter_when_half_open_within_break_duration_rejects_request() {
+ let settings = create_test_settings();
+ let control = ClockControl::new();
+ let clock = control.to_clock();
+ let engine = EngineCore::new(settings, clock);
+
+ // Force to open then half-open
+ open_engine(&engine);
+ control.advance(Duration::from_secs(6));
+ engine.enter(); // Transitions to half-open
+
+ // Try entering again immediately (within break duration)
+ let result = engine.enter();
+ assert!(matches!(result, EnterCircuitResult::Rejected));
+ }
+
+ #[test]
+ fn enter_when_half_open_after_break_duration_resets_half_open_timer() {
+ let settings = create_test_settings();
+ let control = ClockControl::new();
+ let clock = control.to_clock();
+ let engine = EngineCore::new(settings, clock);
+
+ // Force to open then half-open
+ open_engine(&engine);
+ control.advance(Duration::from_secs(6));
+ engine.enter(); // Transitions to half-open
+
+ // Advance time beyond break duration while in half-open
+ control.advance(Duration::from_secs(6));
+
+ let result = engine.enter();
+ assert!(matches!(
+ result,
+ EnterCircuitResult::Accepted {
+ mode: ExecutionMode::Probe
+ }
+ ));
+ }
+
+ #[test]
+ fn exit_when_closed_with_success_remains_unchanged() {
+ let engine = create_test_engine();
+ engine.enter();
+
+ let result = engine.exit(ExecutionResult::Success, ExecutionMode::Normal);
+
+ assert!(matches!(result, ExitCircuitResult::Unchanged));
+ }
+
+ #[test]
+ fn exit_when_closed_with_enough_failures_opens_circuit() {
+ let settings = EngineOptions {
+ break_duration: Duration::from_secs(5),
+ health_metrics_builder: HealthMetricsBuilder::new(
+ Duration::from_secs(30),
+ 0.1, // 10% failure threshold
+ 20, // minimum 20 requests (higher than default 10 for this test)
+ ),
+ probes: ProbesOptions::quick(Duration::from_secs(2)),
+ };
+ let clock = Clock::new_frozen();
+ let engine = EngineCore::new(settings, clock);
+
+ // Record 19 successes and 3 failures = 22 total requests with ~13.6% failure rate
+ for _ in 0..19 {
+ engine.enter();
+ engine.exit(ExecutionResult::Success, ExecutionMode::Normal);
+ }
+ for _ in 0..2 {
+ engine.enter();
+ engine.exit(ExecutionResult::Failure, ExecutionMode::Normal);
+ }
+
+ // One more failure to trigger opening: 3 failures out of 22 total = ~13.6% > 10%
+ engine.enter();
+ let result = engine.exit(ExecutionResult::Failure, ExecutionMode::Normal);
+
+ assert!(matches!(result, ExitCircuitResult::Opened(_)));
+ }
+
+ #[test]
+ fn exit_when_closed_with_insufficient_failures_remains_unchanged() {
+ let engine = create_test_engine();
+
+ // Record some failures but not enough to exceed a threshold (need at least 10 requests)
+ for _ in 0..5 {
+ engine.enter();
+ engine.exit(ExecutionResult::Failure, ExecutionMode::Normal);
+ }
+
+ engine.enter();
+ let result = engine.exit(ExecutionResult::Failure, ExecutionMode::Normal);
+
+ assert!(matches!(result, ExitCircuitResult::Unchanged));
+ }
+
+ #[test]
+ fn exit_when_open_ignores_result() {
+ let engine = create_test_engine();
+ open_engine(&engine);
+
+ // Try to record success in open state
+ let result = engine.exit(ExecutionResult::Success, ExecutionMode::Normal);
+ assert!(matches!(result, ExitCircuitResult::Unchanged));
+
+ if let State::Open { stats, .. } = engine.state.lock().unwrap().deref() {
+ assert_eq!(stats.probes_lost, 1);
+ } else {
+ panic!("expected engine to be in Open state");
+ }
+ }
+
+ #[test]
+ fn exit_when_half_open_with_success_closes_circuit() {
+ let settings = create_test_settings();
+ let control = ClockControl::new();
+ let clock = control.to_clock();
+ let engine = EngineCore::new(settings, clock);
+
+ // Force to open then half-open
+ open_engine(&engine);
+ control.advance(Duration::from_secs(6));
+ engine.enter(); // Transitions to half-open
+
+ let result = engine.exit(ExecutionResult::Success, ExecutionMode::Normal);
+
+ assert!(matches!(result, ExitCircuitResult::Closed(stats) if stats.probes_successes == 1 && stats.probes_total == 1));
+ }
+
+ #[test]
+ fn exit_when_half_open_with_failure_reopens_circuit() {
+ let settings = create_test_settings();
+ let control = ClockControl::new();
+ let clock = control.to_clock();
+ let engine = EngineCore::new(settings, clock);
+
+ // Force to open then half-open
+ open_engine(&engine);
+ control.advance(Duration::from_secs(6));
+ engine.enter(); // Transitions to half-open
+
+ let result = engine.exit(ExecutionResult::Failure, ExecutionMode::Normal);
+
+ assert!(matches!(result, ExitCircuitResult::Reopened));
+ }
+
+ #[test]
+ fn circuit_breaker_full_cycle() {
+ let settings = create_test_settings();
+ let control = ClockControl::new();
+ let clock = control.to_clock();
+ let engine = EngineCore::new(settings, clock);
+
+ // Start in closed state
+ let result = engine.enter();
+ assert!(matches!(
+ result,
+ EnterCircuitResult::Accepted {
+ mode: ExecutionMode::Normal
+ }
+ ));
+
+ // Force to open state
+ open_engine(&engine);
+
+ // Verify open state rejects requests
+ let result = engine.enter();
+ assert!(matches!(result, EnterCircuitResult::Rejected));
+
+ // Advance time to allow transition to half-open
+ control.advance(Duration::from_secs(6));
+ let result = engine.enter();
+ assert!(matches!(
+ result,
+ EnterCircuitResult::Accepted {
+ mode: ExecutionMode::Probe
+ }
+ ));
+
+ // Successful probe closes the circuit
+ let result = engine.exit(ExecutionResult::Success, ExecutionMode::Normal);
+
+ if let ExitCircuitResult::Closed(stats) = &result {
+ assert_eq!(stats.probes_successes, 1);
+ assert_eq!(stats.probes_total, 1);
+ assert_eq!(stats.rejected, 1);
+ assert_eq!(stats.probes_failures, 0);
+ assert_eq!(stats.probes_lost, 0);
+ assert_eq!(stats.re_opened, 0);
+ } else {
+ panic!("expected circuit to close after successful probe");
+ }
+
+ // Verify back to normal operation
+ let result = engine.enter();
+ assert!(matches!(
+ result,
+ EnterCircuitResult::Accepted {
+ mode: ExecutionMode::Normal
+ }
+ ));
+ }
+
+ #[test]
+ fn circuit_breaker_half_open_failure_cycle() {
+ let settings = create_test_settings();
+ let control = ClockControl::new();
+ let clock = control.to_clock();
+ let engine = EngineCore::new(settings, clock);
+
+ // Force to open state
+ open_engine(&engine);
+
+ // Transition to half-open
+ control.advance(Duration::from_secs(6));
+ engine.enter();
+
+ // Failed probe reopens circuit
+ let result = engine.exit(ExecutionResult::Failure, ExecutionMode::Normal);
+ assert!(matches!(result, ExitCircuitResult::Reopened));
+
+ // Verify circuit is open again
+ let result = engine.enter();
+ assert!(matches!(result, EnterCircuitResult::Rejected));
+
+ // Transition to half-open
+ control.advance(Duration::from_secs(6));
+ engine.enter();
+
+ let result = engine.exit(ExecutionResult::Success, ExecutionMode::Normal);
+
+ if let ExitCircuitResult::Closed(stats) = &result {
+ assert_eq!(stats.probes_successes, 1);
+ assert_eq!(stats.probes_total, 2);
+ assert_eq!(stats.rejected, 1);
+ assert_eq!(stats.probes_failures, 1);
+ assert_eq!(stats.probes_lost, 0);
+ assert_eq!(stats.re_opened, 1);
+ } else {
+ panic!("expected circuit to close after successful probe");
+ }
+ }
+
+ #[test]
+ fn concurrent_enter_exit_operations() {
+ let engine = create_test_engine();
+
+ // Simulate operations where enter and exit are called separately
+ // (though each method call is atomic due to the internal mutex)
+ engine.enter();
+ let result1 = engine.exit(ExecutionResult::Success, ExecutionMode::Normal);
+
+ engine.enter();
+ let result2 = engine.exit(ExecutionResult::Failure, ExecutionMode::Normal);
+
+ // Both should complete without panicking
+ assert!(matches!(result1, ExitCircuitResult::Unchanged));
+ assert!(matches!(result2, ExitCircuitResult::Unchanged));
+ }
+
+ #[test]
+ fn engine_with_custom_break_duration() {
+ let settings = EngineOptions {
+ break_duration: Duration::from_millis(100),
+ health_metrics_builder: HealthMetricsBuilder::new(Duration::from_secs(30), 0.1, 50),
+ probes: ProbesOptions::quick(Duration::from_secs(2)),
+ };
+ let control = ClockControl::new();
+ let clock = control.to_clock();
+ let engine = EngineCore::new(settings, clock);
+
+ // Force to open state
+ open_engine(&engine);
+
+ // Verify still rejected just before timeout
+ control.advance(Duration::from_millis(99));
+ let result = engine.enter();
+ assert!(matches!(result, EnterCircuitResult::Rejected));
+
+ // Verify accepted just after timeout
+ control.advance(Duration::from_millis(2));
+ let result = engine.enter();
+ assert!(matches!(
+ result,
+ EnterCircuitResult::Accepted {
+ mode: ExecutionMode::Probe
+ }
+ ));
+ }
+
+ #[test]
+ fn engine_with_custom_failure_threshold() {
+ let settings = EngineOptions {
+ break_duration: Duration::from_secs(5),
+ health_metrics_builder: HealthMetricsBuilder::new(
+ Duration::from_secs(30),
+ 0.5, // 50% failure threshold
+ 10, // minimum 10 requests
+ ),
+ probes: ProbesOptions::quick(Duration::from_secs(2)),
+ };
+ let control = ClockControl::new();
+ let clock = control.to_clock();
+ let engine = EngineCore::new(settings, clock);
+
+ // Record 6 failures and 4 successes (60% failure rate, 10 total requests)
+ for _ in 0..6 {
+ engine.enter();
+ engine.exit(ExecutionResult::Failure, ExecutionMode::Normal);
+ }
+ for _ in 0..3 {
+ engine.enter();
+ engine.exit(ExecutionResult::Success, ExecutionMode::Normal);
+ }
+
+ // Add one more failure to make it 7 failures out of 10 (70% > 50% threshold)
+ engine.enter();
+ let result = engine.exit(ExecutionResult::Failure, ExecutionMode::Normal);
+
+ assert!(matches!(result, ExitCircuitResult::Opened(_)));
+ }
+
+ #[test]
+ fn stats_record_probe_execution_result_increments_correctly() {
+ let mut stats = Stats::new(Instant::now());
+
+ stats.record_probe_execution_result(ExecutionResult::Success);
+ assert_eq!(stats.probes_successes, 1);
+ assert_eq!(stats.probes_failures, 0);
+
+ stats.record_probe_execution_result(ExecutionResult::Failure);
+ assert_eq!(stats.probes_successes, 1);
+ assert_eq!(stats.probes_failures, 1);
+ }
+
+ #[test]
+ fn stats_record_allow_result_increments_correctly() {
+ let mut stats = Stats::new(Instant::now());
+
+ stats.record_allow_result(AllowProbeResult::Accepted);
+ assert_eq!(stats.probes_total, 1);
+ assert_eq!(stats.rejected, 0);
+
+ stats.record_allow_result(AllowProbeResult::Rejected);
+ assert_eq!(stats.probes_total, 1);
+ assert_eq!(stats.rejected, 1);
+ }
+
+ #[test]
+ fn stats_opened_for_calculates_duration_correctly() {
+ let opened_at = Instant::now();
+ let stats = Stats::new(opened_at);
+
+ // Simulate some time passing
+ let later = opened_at + Duration::from_secs(10);
+
+ assert_eq!(stats.opened_duration(later), Duration::from_secs(10));
+ }
+
+ #[test]
+ fn exit_when_half_open_with_pending_probe_returns_unchanged() {
+ use crate::circuit_breaker::engine::probing::{HealthProbeOptions, ProbeOptions};
+
+ let settings = EngineOptions {
+ break_duration: Duration::from_secs(5),
+ health_metrics_builder: HealthMetricsBuilder::new(Duration::from_secs(30), 0.1, 10),
+ // Use a HealthProbe with long sampling duration so it returns Pending
+ probes: ProbesOptions::new([ProbeOptions::HealthProbe(HealthProbeOptions::new(
+ Duration::from_secs(60),
+ 0.2,
+ 1.0,
+ ))]),
+ };
+ let control = ClockControl::new();
+ let clock = control.to_clock();
+ let engine = EngineCore::new(settings, clock);
+
+ // Force to open state
+ open_engine(&engine);
+
+ // Advance time to transition to half-open
+ control.advance(Duration::from_secs(6));
+ engine.enter();
+
+ // Record success - should return Unchanged because HealthProbe is still sampling
+ let result = engine.exit(ExecutionResult::Success, ExecutionMode::Probe);
+ assert!(matches!(result, ExitCircuitResult::Unchanged));
+ }
+}
diff --git a/crates/seatbelt/src/circuit_breaker/engine/engine_fake.rs b/crates/seatbelt/src/circuit_breaker/engine/engine_fake.rs
new file mode 100644
index 00000000..e35a9cfa
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/engine/engine_fake.rs
@@ -0,0 +1,27 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use crate::circuit_breaker::{CircuitEngine, EnterCircuitResult, ExecutionMode, ExecutionResult, ExitCircuitResult};
+
+/// Fake engine to be used in tests.
+#[derive(Debug)]
+pub(crate) struct EngineFake {
+ enter_result: EnterCircuitResult,
+ exit_result: ExitCircuitResult,
+}
+
+impl EngineFake {
+ pub fn new(enter_result: EnterCircuitResult, exit_result: ExitCircuitResult) -> Self {
+ Self { enter_result, exit_result }
+ }
+}
+
+impl CircuitEngine for EngineFake {
+ fn enter(&self) -> EnterCircuitResult {
+ self.enter_result.clone()
+ }
+
+ fn exit(&self, _result: ExecutionResult, _mode: ExecutionMode) -> ExitCircuitResult {
+ self.exit_result.clone()
+ }
+}
diff --git a/crates/seatbelt/src/circuit_breaker/engine/engine_telemetry.rs b/crates/seatbelt/src/circuit_breaker/engine/engine_telemetry.rs
new file mode 100644
index 00000000..1cf92831
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/engine/engine_telemetry.rs
@@ -0,0 +1,297 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use std::borrow::Cow;
+
+use tick::Clock;
+
+#[cfg(any(feature = "metrics", feature = "logs", test))]
+use crate::circuit_breaker::CircuitState;
+#[cfg(any(feature = "metrics", test))]
+use crate::circuit_breaker::telemetry::*;
+use crate::circuit_breaker::{CircuitEngine, EnterCircuitResult, ExecutionMode, ExecutionResult, ExitCircuitResult};
+
+use crate::utils::TelemetryHelper;
+#[cfg(any(feature = "metrics", test))]
+use crate::utils::{EVENT_NAME, PIPELINE_NAME, STRATEGY_NAME};
+
+/// Wrapper around a circuit engine to add telemetry capabilities.
+#[derive(Debug)]
+pub(crate) struct EngineTelemetry {
+ inner: T,
+ #[cfg(any(feature = "metrics", feature = "logs", test))]
+ pub(super) telemetry: TelemetryHelper,
+ #[cfg(any(feature = "metrics", feature = "logs", test))]
+ pub(super) partition_key: Cow<'static, str>,
+ #[cfg(any(feature = "metrics", feature = "logs", test))]
+ pub(super) clock: Clock,
+}
+
+impl EngineTelemetry {
+ #[cfg(any(feature = "metrics", feature = "logs", test))]
+ pub fn new(inner: T, telemetry: TelemetryHelper, partition_key: Cow<'static, str>, clock: Clock) -> Self {
+ Self {
+ inner,
+ telemetry,
+ partition_key,
+ clock,
+ }
+ }
+
+ #[cfg(not(any(feature = "metrics", feature = "logs", test)))]
+ pub fn new(inner: T, _telemetry: TelemetryHelper, _partition_key: Cow<'static, str>, _clock: Clock) -> Self {
+ Self { inner }
+ }
+}
+
+impl CircuitEngine for EngineTelemetry {
+ fn enter(&self) -> EnterCircuitResult {
+ let enter_result = self.inner.enter();
+
+ if matches!(enter_result, EnterCircuitResult::Rejected) {
+ #[cfg(any(feature = "metrics", test))]
+ if self.telemetry.metrics_enabled() {
+ self.telemetry.report_metrics(&[
+ opentelemetry::KeyValue::new(PIPELINE_NAME, self.telemetry.pipeline_name.clone()),
+ opentelemetry::KeyValue::new(STRATEGY_NAME, self.telemetry.strategy_name.clone()),
+ opentelemetry::KeyValue::new(EVENT_NAME, CIRCUIT_REJECTED_EVENT_NAME),
+ opentelemetry::KeyValue::new(CIRCUIT_STATE, CircuitState::Open.as_str()),
+ opentelemetry::KeyValue::new(CIRCUIT_PARTITION, self.partition_key.clone()),
+ ]);
+ }
+
+ #[cfg(any(feature = "logs", test))]
+ if self.telemetry.logs_enabled {
+ tracing::event!(
+ name: "seatbelt.circuit_breaker.rejected",
+ tracing::Level::WARN,
+ pipeline.name = %self.telemetry.pipeline_name,
+ strategy.name = %self.telemetry.strategy_name,
+ circuit_breaker.state = CircuitState::Open.as_str(),
+ circuit_breaker.partition = %self.partition_key,
+ );
+ }
+ }
+
+ enter_result
+ }
+
+ fn exit(&self, result: ExecutionResult, mode: ExecutionMode) -> ExitCircuitResult {
+ if mode == ExecutionMode::Probe {
+ #[cfg(any(feature = "metrics", test))]
+ if self.telemetry.metrics_enabled() {
+ self.telemetry.report_metrics(&[
+ opentelemetry::KeyValue::new(PIPELINE_NAME, self.telemetry.pipeline_name.clone()),
+ opentelemetry::KeyValue::new(STRATEGY_NAME, self.telemetry.strategy_name.clone()),
+ opentelemetry::KeyValue::new(EVENT_NAME, CIRCUIT_PROBE_EVENT_NAME),
+ opentelemetry::KeyValue::new(CIRCUIT_STATE, CircuitState::HalfOpen.as_str()),
+ opentelemetry::KeyValue::new(CIRCUIT_PARTITION, self.partition_key.clone()),
+ opentelemetry::KeyValue::new(CIRCUIT_PROBE_RESULT, result.as_str()),
+ ]);
+ }
+
+ #[cfg(any(feature = "logs", test))]
+ if self.telemetry.logs_enabled {
+ tracing::event!(
+ name: "seatbelt.circuit_breaker.probe",
+ tracing::Level::INFO,
+ pipeline.name = %self.telemetry.pipeline_name,
+ strategy.name = %self.telemetry.strategy_name,
+ circuit_breaker.state = CircuitState::HalfOpen.as_str(),
+ circuit_breaker.partition = %self.partition_key,
+ circuit_breaker.probe.result = result.as_str(),
+ );
+ }
+ }
+
+ let exit_result = self.inner.exit(result, mode);
+
+ // Emit telemetry events for circuit state changes
+ match exit_result {
+ ExitCircuitResult::Opened(health) => {
+ #[cfg(any(feature = "metrics", test))]
+ if self.telemetry.metrics_enabled() {
+ self.telemetry.report_metrics(&[
+ opentelemetry::KeyValue::new(PIPELINE_NAME, self.telemetry.pipeline_name.clone()),
+ opentelemetry::KeyValue::new(STRATEGY_NAME, self.telemetry.strategy_name.clone()),
+ opentelemetry::KeyValue::new(EVENT_NAME, CIRCUIT_OPENED_EVENT_NAME),
+ opentelemetry::KeyValue::new(CIRCUIT_STATE, CircuitState::Open.as_str()),
+ opentelemetry::KeyValue::new(CIRCUIT_PARTITION, self.partition_key.clone()),
+ ]);
+ }
+
+ #[cfg(any(feature = "logs", test))]
+ if self.telemetry.logs_enabled {
+ tracing::event!(
+ name: "seatbelt.circuit_breaker.opened",
+ tracing::Level::WARN,
+ pipeline.name = %self.telemetry.pipeline_name,
+ strategy.name = %self.telemetry.strategy_name,
+ circuit_breaker.state = CircuitState::Open.as_str(),
+ circuit_breaker.partition = %self.partition_key,
+ circuit_breaker.health.failure_rate = health.failure_rate(),
+ circuit_breaker.health.throughput = health.throughput(),
+ );
+ }
+
+ _ = health;
+ }
+ ExitCircuitResult::Closed(ref stats) => {
+ #[cfg(any(feature = "metrics", test))]
+ if self.telemetry.metrics_enabled() {
+ self.telemetry.report_metrics(&[
+ opentelemetry::KeyValue::new(PIPELINE_NAME, self.telemetry.pipeline_name.clone()),
+ opentelemetry::KeyValue::new(STRATEGY_NAME, self.telemetry.strategy_name.clone()),
+ opentelemetry::KeyValue::new(EVENT_NAME, CIRCUIT_CLOSED_EVENT_NAME),
+ opentelemetry::KeyValue::new(CIRCUIT_STATE, CircuitState::Closed.as_str()),
+ opentelemetry::KeyValue::new(CIRCUIT_PARTITION, self.partition_key.clone()),
+ ]);
+ }
+
+ #[cfg(any(feature = "logs", test))]
+ if self.telemetry.logs_enabled {
+ tracing::event!(
+ name: "seatbelt.circuit_breaker.closed",
+ tracing::Level::INFO,
+ pipeline.name = %self.telemetry.pipeline_name,
+ strategy.name = %self.telemetry.strategy_name,
+ circuit_breaker.state = CircuitState::Closed.as_str(),
+ circuit_breaker.open.duration = stats.opened_duration(self.clock.instant()).as_secs(),
+ circuit_breaker.partition = %self.partition_key,
+ circuit_breaker.probes.total = stats.probes_total,
+ circuit_breaker.probes.successfull = stats.probes_successes,
+ circuit_breaker.probes.failed = stats.probes_failures,
+ circuit_breaker.probes.lost = stats.probes_lost,
+ circuit_breaker.rejections = stats.rejected,
+ circuit_breaker.re_opened = stats.re_opened,
+ );
+ }
+
+ _ = stats;
+ }
+ ExitCircuitResult::Reopened | ExitCircuitResult::Unchanged => {
+ // We do not report a telemetry event for reopening the circuit
+ // as it is redundant because it is always preceded by an "opened"
+ // event, or when there is no state change.
+ }
+ }
+
+ exit_result
+ }
+}
+
+#[cfg_attr(coverage_nightly, coverage(off))]
+#[cfg(test)]
+#[cfg(not(miri))]
+mod tests {
+ use std::time::Instant;
+
+ use opentelemetry::KeyValue;
+
+ use super::*;
+ use crate::circuit_breaker::{EngineFake, HealthInfo, Stats};
+ use crate::metrics::{create_meter, create_resilience_event_counter};
+ use crate::testing::MetricTester;
+
+ #[test]
+ fn enter_rejected_ensure_telemetry() {
+ let (tester, telemetry_engine) = create_engine(EngineFake::new(
+ EnterCircuitResult::Rejected,
+ ExitCircuitResult::Closed(Stats::new(Instant::now())),
+ ));
+
+ let _ = telemetry_engine.enter();
+
+ tester.assert_attributes(
+ &[
+ KeyValue::new(PIPELINE_NAME, "test_pipeline"),
+ KeyValue::new(STRATEGY_NAME, "test_strategy"),
+ KeyValue::new(EVENT_NAME, CIRCUIT_REJECTED_EVENT_NAME),
+ KeyValue::new(CIRCUIT_PARTITION, "test_partition"),
+ KeyValue::new(CIRCUIT_STATE, CircuitState::Open.as_str()),
+ ],
+ Some(5),
+ );
+ }
+
+ #[test]
+ fn exit_probe_ensure_telemetry() {
+ let (tester, telemetry_engine) = create_engine(EngineFake::new(
+ EnterCircuitResult::Accepted {
+ mode: ExecutionMode::Normal,
+ },
+ ExitCircuitResult::Unchanged,
+ ));
+
+ let _ = telemetry_engine.exit(ExecutionResult::Success, ExecutionMode::Probe);
+
+ tester.assert_attributes(
+ &[
+ KeyValue::new(PIPELINE_NAME, "test_pipeline"),
+ KeyValue::new(STRATEGY_NAME, "test_strategy"),
+ KeyValue::new(EVENT_NAME, CIRCUIT_PROBE_EVENT_NAME),
+ KeyValue::new(CIRCUIT_PARTITION, "test_partition"),
+ KeyValue::new(CIRCUIT_STATE, CircuitState::HalfOpen.as_str()),
+ KeyValue::new(CIRCUIT_PROBE_RESULT, ExecutionResult::Success.as_str()),
+ ],
+ Some(6),
+ );
+ }
+
+ #[test]
+ fn circuit_closed_ensure_telemetry() {
+ let (tester, telemetry_engine) = create_engine(EngineFake::new(
+ EnterCircuitResult::Accepted {
+ mode: ExecutionMode::Normal,
+ },
+ ExitCircuitResult::Closed(Stats::new(Instant::now())),
+ ));
+
+ let _ = telemetry_engine.exit(ExecutionResult::Success, ExecutionMode::Normal);
+
+ tester.assert_attributes(
+ &[
+ KeyValue::new(PIPELINE_NAME, "test_pipeline"),
+ KeyValue::new(STRATEGY_NAME, "test_strategy"),
+ KeyValue::new(EVENT_NAME, CIRCUIT_CLOSED_EVENT_NAME),
+ KeyValue::new(CIRCUIT_PARTITION, "test_partition"),
+ KeyValue::new(CIRCUIT_STATE, CircuitState::Closed.as_str()),
+ ],
+ Some(5),
+ );
+ }
+
+ #[test]
+ fn circuit_opened_ensure_telemetry() {
+ let (tester, telemetry_engine) = create_engine(EngineFake::new(
+ EnterCircuitResult::Accepted {
+ mode: ExecutionMode::Normal,
+ },
+ ExitCircuitResult::Opened(HealthInfo::new(1, 0, 0.75, 100)),
+ ));
+
+ let _ = telemetry_engine.exit(ExecutionResult::Failure, ExecutionMode::Normal);
+ tester.assert_attributes(
+ &[
+ KeyValue::new(PIPELINE_NAME, "test_pipeline"),
+ KeyValue::new(STRATEGY_NAME, "test_strategy"),
+ KeyValue::new(EVENT_NAME, CIRCUIT_OPENED_EVENT_NAME),
+ KeyValue::new(CIRCUIT_PARTITION, "test_partition"),
+ KeyValue::new(CIRCUIT_STATE, CircuitState::Open.as_str()),
+ ],
+ Some(5),
+ );
+ }
+
+ fn create_engine(engine: EngineFake) -> (MetricTester, EngineTelemetry) {
+ let tester = MetricTester::new();
+ let telemetry = TelemetryHelper {
+ pipeline_name: "test_pipeline".into(),
+ strategy_name: "test_strategy".into(),
+ event_reporter: Some(create_resilience_event_counter(&create_meter(tester.meter_provider()))),
+ logs_enabled: true,
+ };
+ let telemetry_engine = EngineTelemetry::new(engine, telemetry, "test_partition".into(), Clock::new_frozen());
+ (tester, telemetry_engine)
+ }
+}
diff --git a/crates/seatbelt/src/circuit_breaker/engine/engines.rs b/crates/seatbelt/src/circuit_breaker/engine/engines.rs
new file mode 100644
index 00000000..b4ee1729
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/engine/engines.rs
@@ -0,0 +1,97 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use tick::Clock;
+
+use crate::circuit_breaker::constants::ERR_POISONED_LOCK;
+use crate::circuit_breaker::{Engine, EngineCore, EngineOptions, EngineTelemetry, PartitionKey};
+use crate::utils::TelemetryHelper;
+
+/// Manages circuit breaker engines for different partition keys.
+#[derive(Debug)]
+pub(crate) struct Engines {
+ map: Mutex>>,
+ engine_options: EngineOptions,
+ clock: Clock,
+ telemetry: TelemetryHelper,
+}
+
+impl Engines {
+ pub fn new(engine_options: EngineOptions, clock: Clock, telemetry: TelemetryHelper) -> Self {
+ Self {
+ map: Mutex::new(HashMap::new()),
+ engine_options,
+ clock,
+ telemetry,
+ }
+ }
+
+ pub fn get_engine(&self, key: &PartitionKey) -> Arc {
+ let mut map = self.map.lock().expect(ERR_POISONED_LOCK);
+
+ if let Some(engine) = map.get(key) {
+ return Arc::clone(engine);
+ }
+
+ let engine = Arc::new(self.create_engine(key));
+ map.insert(key.clone(), Arc::clone(&engine));
+ engine
+ }
+
+ #[cfg(test)]
+ fn len(&self) -> usize {
+ let map = self.map.lock().expect(ERR_POISONED_LOCK);
+ map.len()
+ }
+
+ fn create_engine(&self, key: &PartitionKey) -> Engine {
+ EngineTelemetry::new(
+ EngineCore::new(self.engine_options.clone(), self.clock.clone()),
+ self.telemetry.clone(),
+ key.clone().into(),
+ self.clock.clone(),
+ )
+ }
+}
+
+#[cfg_attr(coverage_nightly, coverage(off))]
+#[cfg(test)]
+mod tests {
+ use std::time::Duration;
+
+ use super::*;
+ use crate::circuit_breaker::HealthMetricsBuilder;
+ use crate::circuit_breaker::engine::probing::ProbesOptions;
+ use crate::metrics::create_resilience_event_counter;
+
+ #[test]
+ fn get_engine_ok() {
+ let telemetry = TelemetryHelper {
+ pipeline_name: "pipeline".into(),
+ strategy_name: "strategy".into(),
+ event_reporter: Some(create_resilience_event_counter(&opentelemetry::global::meter("test"))),
+ logs_enabled: false,
+ };
+ let engines = Engines::new(
+ EngineOptions {
+ break_duration: Duration::from_secs(60),
+ health_metrics_builder: HealthMetricsBuilder::new(Duration::from_millis(100), 0.5, 5),
+ probes: ProbesOptions::quick(Duration::from_secs(1)),
+ },
+ Clock::new_frozen(),
+ telemetry,
+ );
+
+ assert!(Arc::ptr_eq(
+ &engines.get_engine(&PartitionKey::from("test")),
+ &engines.get_engine(&PartitionKey::from("test"))
+ ));
+ assert_eq!(engines.len(), 1);
+
+ _ = engines.get_engine(&PartitionKey::from("test2"));
+ assert_eq!(engines.len(), 2);
+ }
+}
diff --git a/crates/seatbelt/src/circuit_breaker/engine/mod.rs b/crates/seatbelt/src/circuit_breaker/engine/mod.rs
new file mode 100644
index 00000000..940cfdb8
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/engine/mod.rs
@@ -0,0 +1,112 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use std::fmt::Debug;
+use std::time::Duration;
+
+use crate::circuit_breaker::{ExecutionResult, HealthInfo, HealthMetricsBuilder};
+
+pub(super) mod probing;
+
+#[cfg(any(feature = "metrics", feature = "logs", test))]
+#[derive(Debug, Copy, Clone)]
+pub(crate) enum CircuitState {
+ Closed,
+ Open,
+ HalfOpen,
+}
+
+#[cfg(any(feature = "metrics", feature = "logs", test))]
+impl CircuitState {
+ pub fn as_str(self) -> &'static str {
+ match self {
+ Self::Closed => "closed",
+ Self::Open => "open",
+ Self::HalfOpen => "half_open",
+ }
+ }
+}
+
+/// Result of attempting to enter the circuit.
+#[derive(Debug, Clone)]
+pub(crate) enum EnterCircuitResult {
+ /// The operation is allowed to proceed.
+ ///
+ /// The `probe` indicates that this is a test operation used to evaluate whether
+ /// the circuit can be closed again.
+ Accepted { mode: ExecutionMode },
+
+ /// Operation is rejected due to open circuit.
+ Rejected,
+}
+
+#[derive(Debug, Clone)]
+pub(crate) enum ExitCircuitResult {
+ /// The state remains unchanged.
+ Unchanged,
+
+ /// Circuit transitioned to Open state.
+ Opened(HealthInfo),
+
+ /// Circuit re-transitioned to Open state due to a failure in Half-Open state.
+ Reopened,
+
+ /// Circuit transitioned back to Closed state.
+ Closed(Stats),
+}
+
+/// Configuration options for the circuit breaker engine.
+#[derive(Debug, Clone)]
+pub(crate) struct EngineOptions {
+ pub break_duration: Duration,
+ pub health_metrics_builder: HealthMetricsBuilder,
+ pub probes: probing::ProbesOptions,
+}
+
+/// Determines the mode of execution for an operation.
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub(crate) enum ExecutionMode {
+ /// Regular operation.
+ Normal,
+
+ /// A probe operation to test the health of the underlying service.
+ Probe,
+}
+
+// Type alias for the default engine with telemetry.
+pub type Engine = EngineTelemetry;
+
+/// Trait defining the behavior of a circuit breaker engine.
+pub(crate) trait CircuitEngine: Debug + Send + Sync + 'static {
+ fn enter(&self) -> EnterCircuitResult;
+
+ fn exit(&self, result: ExecutionResult, mode: ExecutionMode) -> ExitCircuitResult;
+}
+
+mod engine_core;
+pub(crate) use engine_core::*;
+
+#[cfg_attr(coverage_nightly, coverage(off))]
+#[cfg(all(test, not(miri)))]
+mod engine_fake;
+#[cfg(all(test, not(miri)))]
+pub(crate) use engine_fake::*;
+
+mod engine_telemetry;
+pub(crate) use engine_telemetry::*;
+
+mod engines;
+pub(crate) use engines::*;
+
+#[cfg_attr(coverage_nightly, coverage(off))]
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_circuit_state_as_str() {
+ assert_eq!(CircuitState::Closed.as_str(), "closed");
+ assert_eq!(CircuitState::Open.as_str(), "open");
+ assert_eq!(CircuitState::HalfOpen.as_str(), "half_open");
+ }
+}
diff --git a/crates/seatbelt/src/circuit_breaker/engine/probing/health_probe.rs b/crates/seatbelt/src/circuit_breaker/engine/probing/health_probe.rs
new file mode 100644
index 00000000..b1496ec6
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/engine/probing/health_probe.rs
@@ -0,0 +1,201 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use std::time::Instant;
+
+use crate::circuit_breaker::engine::probing::{AllowProbeResult, HealthProbeOptions, ProbeOperation, ProbingResult};
+use crate::circuit_breaker::{ExecutionResult, HealthMetrics, HealthStatus};
+use crate::rnd::Rnd;
+
+#[derive(Debug)]
+pub(crate) struct HealthProbe {
+ options: HealthProbeOptions,
+ metrics: HealthMetrics,
+ fallback_after: Option,
+ sample_until: Option,
+ rnd: Rnd,
+}
+
+impl ProbeOperation for HealthProbe {
+ fn allow_probe(&mut self, now: Instant) -> AllowProbeResult {
+ // Sampling starts with the first probe attempt. Make sure relevant timestamps are set.
+ let sample_until = *self.sample_until.get_or_insert_with(|| now + self.options.stage_duration());
+
+ // Fallback probe is allowed only after the sampling duration has elapsed.
+ let fallback_after = *self.fallback_after.get_or_insert(sample_until);
+
+ // Allow probe based on the probing ratio.
+ if self.rnd.next_f64() < self.options.probing_ratio {
+ return AllowProbeResult::Accepted;
+ }
+
+ // Allow fallback probe to get through if we are past the sampling duration.
+ // This can happen if the traffic is very low and no probes were allowed
+ // by the rate sampling. This allows making progress in low-traffic scenarios
+ // as a last resort.
+ if now > fallback_after {
+ // Allow additional fallback probes only after another sampling duration
+ // in case allowed probe did not result in a recorded execution (e.g., due to timeout).
+ self.fallback_after = Some(now + self.options.stage_duration());
+ return AllowProbeResult::Accepted;
+ }
+
+ AllowProbeResult::Rejected
+ }
+
+ fn record(&mut self, result: ExecutionResult, now: Instant) -> ProbingResult {
+ // Always record the result
+ self.metrics.record(result, now);
+
+ // If we are still sampling, we cannot make a decision yet
+ if self.keep_sampling(now) {
+ return ProbingResult::Pending;
+ }
+
+ // Sampling duration elapsed, use the health metrics to determine the result
+ match self.metrics.health_info().status() {
+ HealthStatus::Healthy => ProbingResult::Success,
+ HealthStatus::Unhealthy => ProbingResult::Failure,
+ }
+ }
+}
+
+impl HealthProbe {
+ pub fn new(options: HealthProbeOptions) -> Self {
+ Self {
+ metrics: options.builder.build(),
+ options,
+ fallback_after: None,
+ sample_until: None,
+ rnd: Rnd::Real,
+ }
+ }
+
+ fn keep_sampling(&self, now: Instant) -> bool {
+ match self.sample_until {
+ None => true,
+ Some(until) if now < until => true,
+ _ => false,
+ }
+ }
+}
+
+#[cfg_attr(coverage_nightly, coverage(off))]
+#[cfg(test)]
+mod tests {
+ use std::time::Duration;
+
+ use super::*;
+
+ #[test]
+ fn allow_probe_fallback() {
+ let options = HealthProbeOptions::new(Duration::from_secs(5), 0.5, 0.1);
+ let mut probe = HealthProbe::new(options);
+ probe.rnd = Rnd::new_fixed(0.5);
+ let now = Instant::now();
+
+ assert_eq!(probe.allow_probe(now), AllowProbeResult::Rejected);
+
+ let later = now + Duration::from_secs(5);
+ assert_eq!(probe.allow_probe(later), AllowProbeResult::Rejected);
+
+ // Allowed, because we are past the sampling duration and no probes were allowed yet
+ let later = now + Duration::from_secs(5) + Duration::from_micros(1);
+ assert_eq!(probe.allow_probe(later), AllowProbeResult::Accepted);
+
+ // Not allowed, because we already let the fallback probe through
+ assert_eq!(probe.allow_probe(later), AllowProbeResult::Rejected);
+
+ // Allowed again
+ let later = now + Duration::from_secs(10) + Duration::from_micros(2);
+ assert_eq!(probe.allow_probe(later), AllowProbeResult::Accepted);
+ }
+
+ #[test]
+ fn allow_probe_rejected_when_at_ratio() {
+ let options = HealthProbeOptions::new(Duration::from_secs(5), 0.5, 0.1);
+ let mut probe = HealthProbe::new(options);
+ probe.rnd = Rnd::new_fixed(0.1);
+
+ assert_eq!(probe.allow_probe(Instant::now()), AllowProbeResult::Rejected);
+ }
+
+ #[test]
+ fn record_not_allowed_before() {
+ let options = HealthProbeOptions::new(Duration::from_secs(5), 0.99, 0.1);
+ let mut probe = HealthProbe::new(options);
+ let now = Instant::now();
+
+ assert_eq!(probe.record(ExecutionResult::Success, now), ProbingResult::Pending,);
+
+ assert_eq!(probe.record(ExecutionResult::Success, now), ProbingResult::Pending,);
+
+ let status = probe.metrics.health_info();
+ assert_eq!(status.status(), HealthStatus::Healthy);
+ assert_eq!(status.throughput(), 2);
+ }
+
+ #[test]
+ fn allow_then_record_after_sampling_period_healthy() {
+ let options = HealthProbeOptions::new(Duration::from_secs(5), 0.1, 1.0);
+ let mut probe = HealthProbe::new(options);
+ let now = Instant::now();
+
+ assert_eq!(probe.allow_probe(now), AllowProbeResult::Accepted);
+
+ // At the edge of a sampling period, success
+ assert_eq!(
+ probe.record(ExecutionResult::Success, now + Duration::from_secs(5)),
+ ProbingResult::Success,
+ );
+
+ assert_eq!(
+ probe.record(ExecutionResult::Success, now + Duration::from_secs(10)),
+ ProbingResult::Success,
+ );
+ }
+
+ #[test]
+ fn allow_then_record_after_sampling_period_unhealthy() {
+ let options = HealthProbeOptions::new(Duration::from_secs(5), 0.1, 1.0);
+ let mut probe = HealthProbe::new(options);
+ let now = Instant::now();
+
+ assert_eq!(probe.allow_probe(now), AllowProbeResult::Accepted);
+ assert_eq!(
+ probe.record(ExecutionResult::Failure, now + Duration::from_secs(10)),
+ ProbingResult::Failure,
+ );
+ }
+
+ #[test]
+ fn record_multiple_ensure_health_evaluated() {
+ let options = HealthProbeOptions::new(Duration::from_secs(5), 0.6, 1.0);
+ let mut probe = HealthProbe::new(options);
+ let now = Instant::now();
+
+ assert_eq!(probe.allow_probe(now), AllowProbeResult::Accepted);
+ assert_eq!(
+ probe.record(ExecutionResult::Success, now + Duration::from_secs(1)),
+ ProbingResult::Pending,
+ );
+ assert_eq!(
+ probe.record(ExecutionResult::Failure, now + Duration::from_secs(2)),
+ ProbingResult::Pending,
+ );
+ assert_eq!(
+ probe.record(ExecutionResult::Success, now + Duration::from_secs(6)),
+ ProbingResult::Success,
+ );
+
+ assert_eq!(
+ probe.record(ExecutionResult::Failure, now + Duration::from_secs(6)),
+ ProbingResult::Success,
+ );
+
+ assert_eq!(
+ probe.record(ExecutionResult::Failure, now + Duration::from_secs(6)),
+ ProbingResult::Failure,
+ );
+ }
+}
diff --git a/crates/seatbelt/src/circuit_breaker/engine/probing/mod.rs b/crates/seatbelt/src/circuit_breaker/engine/probing/mod.rs
new file mode 100644
index 00000000..58a90dc4
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/engine/probing/mod.rs
@@ -0,0 +1,186 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+//! Probing mechanisms for circuit breakers.
+//!
+//! Probing is used to test if a service has recovered after a failure.
+//! Different probing strategies can be implemented by implementing the `ProbeOperation` trait.
+//!
+//! - Various probes can be combined in sequence using the [`Probes`] struct.
+//! - Unified view over various probe types is provided by the [`Probe`] enum.
+
+use std::fmt::Debug;
+use std::time::Instant;
+
+use crate::circuit_breaker::{EnterCircuitResult, ExecutionMode, ExecutionResult};
+
+mod health_probe;
+mod options;
+mod probes;
+mod single_probe;
+
+pub(crate) use health_probe::*;
+pub(crate) use options::*;
+pub(crate) use probes::*;
+pub(crate) use single_probe::*;
+
+/// Result of a probing attempt.
+#[derive(Debug, Copy, Clone, PartialEq)]
+pub(crate) enum ProbingResult {
+ /// Probing succeeded, no more probing needed.
+ Success,
+
+ /// Probing failed, circuit should remain open.
+ Failure,
+
+ /// Probing is still in progress, more probes are needed.
+ Pending,
+}
+
+#[derive(Debug, Copy, Clone, PartialEq)]
+pub(crate) enum AllowProbeResult {
+ Accepted,
+ Rejected,
+}
+
+impl From for EnterCircuitResult {
+ fn from(value: AllowProbeResult) -> Self {
+ match value {
+ AllowProbeResult::Accepted => Self::Accepted {
+ mode: ExecutionMode::Probe,
+ },
+ AllowProbeResult::Rejected => Self::Rejected,
+ }
+ }
+}
+
+/// Trait defining the behavior of a probing mechanism in a circuit breaker.
+pub(crate) trait ProbeOperation: Send + Sync + Debug + 'static {
+ fn allow_probe(&mut self, now: Instant) -> AllowProbeResult;
+
+ fn record(&mut self, result: ExecutionResult, now: Instant) -> ProbingResult;
+}
+
+/// View over multiple probe types.
+#[derive(Debug)]
+pub(crate) enum Probe {
+ Single(SingleProbe),
+ Health(HealthProbe),
+}
+
+impl Probe {
+ pub fn new(options: ProbeOptions) -> Self {
+ match options {
+ ProbeOptions::SingleProbe { cooldown } => Self::Single(SingleProbe::new(cooldown)),
+ ProbeOptions::HealthProbe(options) => Self::Health(HealthProbe::new(options)),
+ }
+ }
+}
+
+impl ProbeOperation for Probe {
+ fn allow_probe(&mut self, now: Instant) -> AllowProbeResult {
+ match self {
+ Self::Single(probe) => probe.allow_probe(now),
+ Self::Health(health) => health.allow_probe(now),
+ }
+ }
+
+ /// Record the result of a probing attempt.
+ ///
+ /// Once the probe reports success or failure, it is considered complete and
+ /// should never be used again.
+ fn record(&mut self, result: ExecutionResult, now: Instant) -> ProbingResult {
+ match self {
+ Self::Single(probe) => probe.record(result, now),
+ Self::Health(health) => health.record(result, now),
+ }
+ }
+}
+
+#[cfg_attr(coverage_nightly, coverage(off))]
+#[cfg(test)]
+mod tests {
+ use std::time::Duration;
+
+ use super::*;
+
+ #[test]
+ fn probe_new_creates_single_probe() {
+ let cooldown = Duration::from_secs(5);
+ let probe = Probe::new(ProbeOptions::SingleProbe { cooldown });
+ assert!(matches!(probe, Probe::Single(duration) if duration.probe_cooldown() == cooldown));
+ }
+
+ #[test]
+ fn probe_allow_probe_delegates_to_inner() {
+ let mut probe = Probe::new(ProbeOptions::SingleProbe {
+ cooldown: Duration::from_secs(5),
+ });
+ let now = Instant::now();
+
+ assert_eq!(probe.allow_probe(now), AllowProbeResult::Accepted);
+ assert_eq!(probe.allow_probe(now), AllowProbeResult::Rejected);
+ }
+
+ #[test]
+ fn probe_record_delegates_to_inner() {
+ let mut probe = Probe::new(ProbeOptions::SingleProbe {
+ cooldown: Duration::from_secs(5),
+ });
+ let now = Instant::now();
+
+ assert_eq!(probe.record(ExecutionResult::Success, now), ProbingResult::Success);
+ assert_eq!(probe.record(ExecutionResult::Failure, now), ProbingResult::Failure);
+ }
+
+ #[test]
+ fn allow_probe_result_to_enter_circuit_result_ok() {
+ assert!(matches!(
+ EnterCircuitResult::from(AllowProbeResult::Accepted),
+ EnterCircuitResult::Accepted {
+ mode: ExecutionMode::Probe
+ }
+ ));
+
+ assert!(matches!(
+ EnterCircuitResult::from(AllowProbeResult::Rejected),
+ EnterCircuitResult::Rejected
+ ));
+ }
+
+ #[test]
+ fn probe_new_creates_health_probe() {
+ let options = HealthProbeOptions::new(Duration::from_secs(10), 0.2, 0.5);
+ let probe = Probe::new(ProbeOptions::HealthProbe(options));
+ assert!(matches!(probe, Probe::Health(_)));
+ }
+
+ #[test]
+ fn probe_health_allow_probe_delegates_to_inner() {
+ let options = HealthProbeOptions::new(Duration::from_secs(5), 0.2, 1.0);
+ let mut probe = Probe::new(ProbeOptions::HealthProbe(options));
+ let now = Instant::now();
+
+ // With probing_ratio=1.0, all probes should be accepted
+ assert_eq!(probe.allow_probe(now), AllowProbeResult::Accepted);
+ }
+
+ #[test]
+ fn probe_health_record_delegates_to_inner() {
+ let options = HealthProbeOptions::new(Duration::from_secs(5), 0.2, 1.0);
+ let mut probe = Probe::new(ProbeOptions::HealthProbe(options));
+ let now = Instant::now();
+
+ // allow_probe initializes the sampling period
+ assert_eq!(probe.allow_probe(now), AllowProbeResult::Accepted);
+
+ // Record before sampling period ends returns Pending
+ assert_eq!(probe.record(ExecutionResult::Success, now), ProbingResult::Pending);
+
+ // Record after sampling period with success returns Success
+ assert_eq!(
+ probe.record(ExecutionResult::Success, now + Duration::from_secs(5)),
+ ProbingResult::Success
+ );
+ }
+}
diff --git a/crates/seatbelt/src/circuit_breaker/engine/probing/options.rs b/crates/seatbelt/src/circuit_breaker/engine/probing/options.rs
new file mode 100644
index 00000000..a06ba905
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/engine/probing/options.rs
@@ -0,0 +1,218 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use std::time::Duration;
+use std::vec::IntoIter;
+
+use crate::circuit_breaker::HealthMetricsBuilder;
+
+/// The minimum throughput during probing stage is set to 1, so at least one request must come
+/// through in each probing stage to evaluate the health.
+const MIN_THROUGHPUT: u32 = 1;
+
+/// Options for a single probe type.
+#[derive(Debug, Clone)]
+pub(crate) enum ProbeOptions {
+ /// A single probe that allows one probe.
+ ///
+ /// After the initial probe is allowed, it enters a cool-down period during which
+ /// no further probes are allowed.
+ SingleProbe { cooldown: Duration },
+
+ /// A health-based probe that uses health metrics to determine the health of the system.
+ HealthProbe(HealthProbeOptions),
+}
+
+/// Configuration options for the probing mechanism.
+#[derive(Debug, Clone)]
+pub(crate) struct ProbesOptions {
+ probes: Vec,
+}
+
+impl ProbesOptions {
+ pub fn quick(cooldown: Duration) -> Self {
+ Self::new([ProbeOptions::SingleProbe { cooldown }])
+ }
+
+ pub fn reliable(stage_duration: Duration, failure_threshold: f32) -> Self {
+ Self::gradual(&[0.001, 0.01, 0.05, 0.1, 0.25, 0.5], stage_duration, failure_threshold)
+ }
+
+ pub fn gradual(probing_ratio: &[f64], stage_duration: Duration, failure_threshold: f32) -> Self {
+ // Start with a single probe
+ let initial = std::iter::once(ProbeOptions::SingleProbe { cooldown: stage_duration });
+
+ // Then continue with health-based probes
+ let health = probing_ratio
+ .iter()
+ .map(|probing_ratio| ProbeOptions::HealthProbe(HealthProbeOptions::new(stage_duration, failure_threshold, *probing_ratio)));
+
+ Self::new(initial.chain(health))
+ }
+
+ pub fn new(probes: impl IntoIterator- ) -> Self {
+ let probes: Vec = probes.into_iter().collect();
+ assert!(!probes.is_empty(), "the probes list cannot be empty");
+ Self { probes }
+ }
+
+ pub fn probes(&self) -> IntoIter {
+ self.probes.clone().into_iter()
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct HealthProbeOptions {
+ pub(super) builder: HealthMetricsBuilder,
+ pub(super) probing_ratio: f64,
+}
+
+impl HealthProbeOptions {
+ pub fn new(stage_duration: Duration, failure_threshold: f32, probing_ratio: f64) -> Self {
+ assert!(probing_ratio > 0.0 && probing_ratio <= 1.0, "probing_ratio must be in (0.0, 1.0]");
+ assert!((0.0..1.0).contains(&failure_threshold), "failure_threshold must be in [0.0, 1.0)");
+ assert!(stage_duration > Duration::ZERO, "stage_duration must be greater than zero");
+
+ Self {
+ // The min throughput is set to 0, so if no requests come in during the probing stage,
+ // the health will be considered healthy by default.
+ builder: HealthMetricsBuilder::new(stage_duration, failure_threshold, MIN_THROUGHPUT),
+ probing_ratio,
+ }
+ }
+
+ pub fn stage_duration(&self) -> Duration {
+ self.builder.sampling_duration
+ }
+
+ #[cfg(test)]
+ pub fn failure_threshold(&self) -> f32 {
+ self.builder.failure_threshold
+ }
+}
+
+#[cfg_attr(coverage_nightly, coverage(off))]
+#[cfg(test)]
+mod tests {
+ use static_assertions::assert_impl_all;
+
+ use super::*;
+
+ assert_impl_all!(ProbeOptions: Clone, std::fmt::Debug);
+ assert_impl_all!(ProbesOptions: Clone, std::fmt::Debug);
+ #[test]
+ fn single_probe_constructor_creates_correct_options() {
+ let cooldown = Duration::from_secs(15);
+ let options = ProbesOptions::quick(cooldown);
+ let probes: Vec<_> = options.probes().collect();
+
+ assert_eq!(probes.len(), 1);
+ assert!(matches!(
+ &probes[0],
+ ProbeOptions::SingleProbe { cooldown: c } if *c == Duration::from_secs(15)
+ ));
+ }
+
+ #[test]
+ fn new_accepts_multiple_probes() {
+ let options = ProbesOptions::new([
+ ProbeOptions::SingleProbe {
+ cooldown: Duration::from_secs(10),
+ },
+ ProbeOptions::SingleProbe {
+ cooldown: Duration::from_secs(20),
+ },
+ ProbeOptions::SingleProbe {
+ cooldown: Duration::from_secs(30),
+ },
+ ]);
+
+ let probes: Vec<_> = options.probes().collect();
+ assert_eq!(probes.len(), 3);
+ assert!(matches!(&probes[0], ProbeOptions::SingleProbe { cooldown } if *cooldown == Duration::from_secs(10)));
+ assert!(matches!(&probes[1], ProbeOptions::SingleProbe { cooldown } if *cooldown == Duration::from_secs(20)));
+ assert!(matches!(&probes[2], ProbeOptions::SingleProbe { cooldown } if *cooldown == Duration::from_secs(30)));
+ }
+
+ #[test]
+ fn clone_preserves_probe_count() {
+ let options = ProbesOptions::quick(Duration::from_secs(25));
+ let cloned = options.clone();
+
+ assert_eq!(options.probes().count(), cloned.probes().count());
+ }
+
+ #[test]
+ fn probes_iterator_is_reusable() {
+ let options = ProbesOptions::quick(Duration::from_secs(30));
+
+ assert_eq!(options.probes().count(), 1);
+ assert_eq!(options.probes().count(), 1);
+ }
+
+ #[test]
+ #[should_panic(expected = "the probes list cannot be empty")]
+ fn new_panics_with_empty_iterator() {
+ let _ = ProbesOptions::new(Vec::::new());
+ }
+
+ #[test]
+ #[expect(clippy::float_cmp, reason = "Test")]
+ fn health_probe_options_ctor_ok() {
+ let sampling_duration = Duration::from_secs(60);
+ let failure_threshold = 0.2;
+ let probing_ratio = 0.1;
+
+ let options = HealthProbeOptions::new(sampling_duration, failure_threshold, probing_ratio);
+
+ assert_eq!(options.stage_duration(), sampling_duration);
+ assert_eq!(options.probing_ratio, probing_ratio);
+ assert_eq!(options.builder.failure_threshold, failure_threshold);
+ assert_eq!(options.builder.min_throughput, 1);
+ }
+
+ #[should_panic(expected = "stage_duration must be greater than zero")]
+ #[test]
+ fn health_probe_options_ctor_sampling_duration() {
+ let _ = HealthProbeOptions::new(Duration::ZERO, 0.1, 0.5);
+ }
+
+ #[should_panic(expected = "failure_threshold must be in [0.0, 1.0)")]
+ #[test]
+ fn health_probe_options_ctor_failure_threshold() {
+ let _ = HealthProbeOptions::new(Duration::from_secs(10), 1.0, 0.5);
+ }
+
+ #[should_panic(expected = "probing_ratio must be in (0.0, 1.0]")]
+ #[test]
+ fn health_probe_options_ctor_probing_ratio() {
+ let _ = HealthProbeOptions::new(Duration::from_secs(10), 0.1, 0.0);
+ }
+
+ #[test]
+ #[expect(clippy::float_cmp, reason = "Test")]
+ fn probes_options_reliable_ok() {
+ let options = ProbesOptions::reliable(Duration::from_secs(30), 0.2);
+ let probes: Vec<_> = options.probes().collect();
+
+ assert_eq!(probes.len(), 7);
+ assert!(matches!(
+ &probes[0],
+ ProbeOptions::SingleProbe { cooldown } if *cooldown == Duration::from_secs(30)
+ ));
+
+ let expected_ratios = [0.001, 0.01, 0.05, 0.1, 0.25, 0.5];
+ for (i, ratio) in expected_ratios.iter().enumerate() {
+ let probe = &probes[i + 1];
+
+ match probe {
+ ProbeOptions::HealthProbe(options) => {
+ assert_eq!(options.builder.sampling_duration, Duration::from_secs(30));
+ assert_eq!(options.builder.failure_threshold, 0.2);
+ assert_eq!(options.probing_ratio, *ratio);
+ }
+ ProbeOptions::SingleProbe { .. } => panic!("expected HealthProbe"),
+ }
+ }
+ }
+}
diff --git a/crates/seatbelt/src/circuit_breaker/engine/probing/probes.rs b/crates/seatbelt/src/circuit_breaker/engine/probing/probes.rs
new file mode 100644
index 00000000..2b9529fd
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/engine/probing/probes.rs
@@ -0,0 +1,97 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use std::time::Instant;
+use std::vec;
+
+use super::{AllowProbeResult, Probe, ProbeOperation, ProbeOptions, ProbesOptions, ProbingResult};
+use crate::circuit_breaker::ExecutionResult;
+
+/// Manages a sequence of probes.
+#[derive(Debug)]
+pub(crate) struct Probes {
+ probes: vec::IntoIter,
+ current: Probe,
+}
+
+impl Probes {
+ pub fn new(options: &ProbesOptions) -> Self {
+ let mut probes = options.probes();
+ let probe = probes.next().expect("probes are never empty because ProbesOptions enforces that");
+
+ Self {
+ probes,
+ current: Probe::new(probe),
+ }
+ }
+
+ pub fn allow_probe(&mut self, now: Instant) -> AllowProbeResult {
+ self.current.allow_probe(now)
+ }
+
+ pub fn record(&mut self, result: ExecutionResult, now: Instant) -> ProbingResult {
+ match self.current.record(result, now) {
+ ProbingResult::Success => {
+ // check if there are more probes to try
+ match self.probes.next() {
+ Some(probe) => {
+ self.current = Probe::new(probe);
+ ProbingResult::Pending
+ }
+ None => ProbingResult::Success,
+ }
+ }
+ ProbingResult::Pending => ProbingResult::Pending,
+ ProbingResult::Failure => ProbingResult::Failure,
+ }
+ }
+}
+
+#[cfg_attr(coverage_nightly, coverage(off))]
+#[cfg(test)]
+mod tests {
+
+ use std::time::Duration;
+
+ use tick::Clock;
+
+ use super::*;
+ use crate::circuit_breaker::engine::probing::HealthProbeOptions;
+
+ #[test]
+ fn multiple_probes_ok() {
+ let options = ProbesOptions::new([
+ ProbeOptions::SingleProbe {
+ cooldown: Duration::from_secs(1),
+ },
+ ProbeOptions::SingleProbe {
+ cooldown: Duration::from_secs(2),
+ },
+ ]);
+ let mut probes = Probes::new(&options);
+ let now = Instant::now();
+
+ assert_eq!(probes.allow_probe(now), AllowProbeResult::Accepted);
+ assert_eq!(probes.allow_probe(now), AllowProbeResult::Rejected);
+ assert_eq!(probes.record(ExecutionResult::Success, now), ProbingResult::Pending);
+
+ assert_eq!(probes.allow_probe(now), AllowProbeResult::Accepted);
+ assert_eq!(probes.record(ExecutionResult::Success, now), ProbingResult::Success);
+
+ assert!(probes.probes.next().is_none());
+ }
+
+ #[test]
+ fn record_returns_pending_when_probe_returns_pending() {
+ let now = Clock::new_frozen().instant();
+
+ let options = ProbesOptions::new([ProbeOptions::HealthProbe(HealthProbeOptions::new(Duration::from_secs(5), 0.2, 1.0))]);
+ let mut probes = Probes::new(&options);
+
+ // Initialize sampling period
+ assert_eq!(probes.allow_probe(now), AllowProbeResult::Accepted);
+
+ // Record during sampling period returns Pending
+ assert_eq!(probes.record(ExecutionResult::Success, now), ProbingResult::Pending);
+ }
+}
diff --git a/crates/seatbelt/src/circuit_breaker/engine/probing/single_probe.rs b/crates/seatbelt/src/circuit_breaker/engine/probing/single_probe.rs
new file mode 100644
index 00000000..523b5bd0
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/engine/probing/single_probe.rs
@@ -0,0 +1,111 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use std::time::{Duration, Instant};
+
+use super::{AllowProbeResult, ProbeOperation, ProbingResult};
+use crate::circuit_breaker::ExecutionResult;
+
+/// Allows a single probe to get in and based on the result either closes the circuit
+/// or goes back to open state.
+#[derive(Debug, Clone)]
+pub(crate) struct SingleProbe {
+ probe_cooldown: Duration,
+ entered_at: Option,
+}
+
+impl SingleProbe {
+ pub fn new(probe_cooldown: Duration) -> Self {
+ Self {
+ probe_cooldown,
+ entered_at: None,
+ }
+ }
+
+ #[cfg(test)]
+ pub fn probe_cooldown(&self) -> Duration {
+ self.probe_cooldown
+ }
+}
+
+impl ProbeOperation for SingleProbe {
+ fn allow_probe(&mut self, now: Instant) -> AllowProbeResult {
+ match self.entered_at {
+ // First probe attempt - record the timestamp to start the cool-down period
+ None => {
+ self.entered_at = Some(now);
+ AllowProbeResult::Accepted
+ }
+ // Cool-down has elapsed, allow the probe and reset the cool-down timer.
+ // We allow additional probe after the cool-down period to handle the case
+ // where the probe result is not recorded due to future being dropped.
+ Some(entered_at) if now.saturating_duration_since(entered_at) > self.probe_cooldown => {
+ self.entered_at = Some(now);
+ AllowProbeResult::Accepted
+ }
+ Some(_) => AllowProbeResult::Rejected,
+ }
+ }
+
+ fn record(&mut self, result: ExecutionResult, _now: Instant) -> ProbingResult {
+ match result {
+ ExecutionResult::Success => ProbingResult::Success,
+ ExecutionResult::Failure => ProbingResult::Failure,
+ }
+ }
+}
+
+#[cfg_attr(coverage_nightly, coverage(off))]
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn allow_probe_accepts_single_probe() {
+ let mut probe = SingleProbe::new(Duration::from_secs(5));
+ let now = Instant::now();
+
+ // The first probe should be accepted
+ assert_eq!(probe.allow_probe(now), AllowProbeResult::Accepted);
+
+ // The second probe immediately should be rejected
+ assert_eq!(probe.allow_probe(now), AllowProbeResult::Rejected);
+
+ // After 3 seconds, still should be rejected
+ let later = now + Duration::from_secs(3);
+ assert_eq!(probe.allow_probe(later), AllowProbeResult::Rejected);
+
+ // After cooldown, the probe should be accepted again
+ let later = now + Duration::from_secs(6);
+ assert_eq!(probe.allow_probe(later), AllowProbeResult::Accepted);
+ }
+
+ #[test]
+ fn allow_probe_check_bounds() {
+ let mut probe = SingleProbe::new(Duration::from_secs(5));
+ let now = Instant::now();
+
+ // The first probe should be accepted
+ assert_eq!(probe.allow_probe(now), AllowProbeResult::Accepted);
+
+ // After exactly cool-down duration, the probe should still be rejected
+ let later = now + Duration::from_secs(5);
+ assert_eq!(probe.allow_probe(later), AllowProbeResult::Rejected);
+
+ // After cool-down + 1 microsecond, the probe should be accepted
+ let later = now + Duration::from_secs(5) + Duration::from_micros(1);
+ assert_eq!(probe.allow_probe(later), AllowProbeResult::Accepted);
+ }
+
+ #[test]
+ fn record_ensure_correct_result() {
+ let mut probe = SingleProbe::new(Duration::from_secs(5));
+ let now = Instant::now();
+
+ // Record a success
+ assert_eq!(probe.record(ExecutionResult::Success, now), ProbingResult::Success);
+
+ // Record a failure
+ assert_eq!(probe.record(ExecutionResult::Failure, now), ProbingResult::Failure);
+ }
+}
diff --git a/crates/seatbelt/src/circuit_breaker/execution_result.rs b/crates/seatbelt/src/circuit_breaker/execution_result.rs
new file mode 100644
index 00000000..2d38f8f7
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/execution_result.rs
@@ -0,0 +1,56 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use crate::RecoveryInfo;
+
+/// An evaluated execution result.
+///
+/// From the perspective of a circuit breaker, an execution can either
+/// succeed or fail. This enum captures that binary outcome.
+#[derive(Debug, PartialEq, Copy, Clone)]
+pub(crate) enum ExecutionResult {
+ Success,
+ Failure,
+}
+
+#[cfg(any(feature = "logs", feature = "metrics", test))]
+impl ExecutionResult {
+ pub fn as_str(self) -> &'static str {
+ match self {
+ Self::Success => "success",
+ Self::Failure => "failure",
+ }
+ }
+}
+
+impl ExecutionResult {
+ pub fn from_recovery(recovery: &RecoveryInfo) -> Self {
+ match recovery.kind() {
+ crate::RecoveryKind::Retry | crate::RecoveryKind::Unavailable => Self::Failure,
+ _ => Self::Success,
+ }
+ }
+}
+
+#[cfg_attr(coverage_nightly, coverage(off))]
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_execution_result_from_recovery() {
+ assert_eq!(ExecutionResult::from_recovery(&RecoveryInfo::retry()), ExecutionResult::Failure);
+ assert_eq!(
+ ExecutionResult::from_recovery(&RecoveryInfo::unavailable()),
+ ExecutionResult::Failure
+ );
+ assert_eq!(ExecutionResult::from_recovery(&RecoveryInfo::never()), ExecutionResult::Success);
+ assert_eq!(ExecutionResult::from_recovery(&RecoveryInfo::unknown()), ExecutionResult::Success);
+ }
+
+ #[test]
+ fn test_execution_result_as_str() {
+ assert_eq!(ExecutionResult::Success.as_str(), "success");
+ assert_eq!(ExecutionResult::Failure.as_str(), "failure");
+ }
+}
diff --git a/crates/seatbelt/src/circuit_breaker/half_open_mode.rs b/crates/seatbelt/src/circuit_breaker/half_open_mode.rs
new file mode 100644
index 00000000..13de619b
--- /dev/null
+++ b/crates/seatbelt/src/circuit_breaker/half_open_mode.rs
@@ -0,0 +1,173 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use std::time::Duration;
+
+use crate::circuit_breaker::constants::MIN_SAMPLING_DURATION;
+use crate::circuit_breaker::engine::probing::ProbesOptions;
+
+/// Defines the behavior of the circuit breaker when transitioning from half-open to closed state.
+///
+/// The half-open state is a transitional phase where the circuit breaker allows a limited number of
+/// requests to pass through to test if the underlying service has recovered. The chosen mode
+/// determines how aggressively the circuit breaker probes the service during this phase.
+///
+/// Currently, two modes are supported:
+///
+/// - [`HalfOpenMode::quick`]: Allows a single probe request to determine if the service has recovered.
+/// - [`HalfOpenMode::reliable`]: Gradually increases the percentage of probing requests over multiple stages (default).
+#[derive(Debug, Clone, PartialEq)]
+pub struct HalfOpenMode {
+ inner: Mode,
+}
+
+impl HalfOpenMode {
+ /// Allow quick recovery from half-open state with a single probe.
+ ///
+ /// This approach is less reliable compared to the [`HalfOpenMode::reliable`] mode, but
+ /// can close the circuit faster.
+ ///
+ /// The downside of this approach is that it relies on a single execution to determine
+ /// the health of the service. If that execution happens to succeed by chance, the circuit
+ /// closes and later requests may fail again, leading to instability and re-opening the circuit
+ /// again.
+ #[must_use]
+ pub fn quick() -> Self {
+ Self { inner: Mode::Quick }
+ }
+
+ /// Gradually increase the percentage of probing requests over multiple stages.
+ ///
+ /// This approach allows more requests to pass through in a controlled manner,
+ /// increasing the probing rate over time. This can help more reliably evaluate the
+ /// health of the underlying service over time rather than relying on a single execution.
+ ///
+ /// The pre-configured ratios for each probing stage are:
+ /// `0.1%, 1%, 5%, 10%, 25%, 50%`
+ ///
+ /// Each probing stage advances after the stage duration has elapsed, and the health
+ /// metrics indicate that the failure rate is below the configured threshold. If any probing stage
+ /// fails, the circuit reopens immediately and the cycle starts over.
+ ///
+ /// # Arguments
+ ///
+ /// - `stage_duration` - Optional custom stage duration for each probing stage. If not provided,
+ /// the value of [`break_duration`][crate::circuit_breaker::CircuitLayer::break_duration] is used. The provided stage
+ /// duration is clamped to a minimum of 1 second.
+ #[must_use]
+ pub fn reliable(stage_duration: impl Into