From 552b182826a165e27a31a07f197181c572cc18b1 Mon Sep 17 00:00:00 2001 From: Subhanshu Bansal Date: Mon, 30 Mar 2026 23:46:39 +0530 Subject: [PATCH] Added Circuit Breaker Module --- .../ktor-client-circuit-breaker/README.md | 146 ++++++ .../api/ktor-client-circuit-breaker.api | 29 ++ .../api/ktor-client-circuit-breaker.klib.api | 45 ++ .../build.gradle.kts | 18 + .../plugins/circuitbreaker/CircuitBreaker.kt | 366 +++++++++++++++ .../circuitbreaker/CircuitBreakerTest.kt | 438 ++++++++++++++++++ settings.gradle.kts | 1 + 7 files changed, 1043 insertions(+) create mode 100644 ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/README.md create mode 100644 ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/api/ktor-client-circuit-breaker.api create mode 100644 ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/api/ktor-client-circuit-breaker.klib.api create mode 100644 ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/build.gradle.kts create mode 100644 ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/common/src/io/ktor/client/plugins/circuitbreaker/CircuitBreaker.kt create mode 100644 ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/common/test/io/ktor/client/plugins/circuitbreaker/CircuitBreakerTest.kt diff --git a/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/README.md b/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/README.md new file mode 100644 index 00000000000..231042c9f88 --- /dev/null +++ b/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/README.md @@ -0,0 +1,146 @@ +# ktor-client-circuit-breaker + +A Ktor HTTP client plugin implementing the [Circuit Breaker](https://learn.microsoft.com/en-us/azure/architecture/patterns/circuit-breaker) pattern to prevent cascading failures in distributed systems. + +## Motivation + +The Ktor client ships with `HttpRequestRetry` and `HttpTimeout`, but has no built-in protection against repeatedly calling a failing downstream service. Without a circuit breaker, a single unhealthy dependency can exhaust connection pools, saturate thread pools, and cascade failures through the call graph. This plugin fills that gap. + +## How it works + +Each named circuit breaker is a state machine with three states: + +``` + failures >= threshold + ┌────────┐ ┌──────┐ + │ CLOSED ├─────────────────►│ OPEN │ + └───┬────┘ └──┬───┘ + │ │ resetTimeout elapsed + │ all trial requests │ + │ succeed ▼ + │ ┌───────────┐ + └────────────────────┤ HALF-OPEN │ + └─────┬─────┘ + │ any trial request fails + │ + ▼ + ┌──────┐ + │ OPEN │ + └──────┘ +``` + +- **Closed** -- Normal operation. Requests pass through. Consecutive failures are counted. A successful response resets the counter. When the counter reaches `failureThreshold`, the circuit trips to Open. +- **Open** -- The circuit is tripped. All requests are immediately rejected with `CircuitBreakerOpenException` without hitting the network. After `resetTimeout` elapses, the circuit transitions to Half-Open. +- **Half-Open** -- The circuit allows up to `halfOpenRequests` trial requests through. If all succeed, the circuit closes. If any fails, the circuit re-opens. + +## Installation + +Add the dependency (published alongside `ktor-client-core`): + +```kotlin +dependencies { + implementation("io.ktor:ktor-client-circuit-breaker:$ktor_version") +} +``` + +## Usage + +### Basic configuration + +```kotlin +val client = HttpClient(CIO) { + install(CircuitBreaker) { + register("payment-service") { + failureThreshold = 5 // open after 5 consecutive failures + resetTimeout = 30.seconds // wait 30s before probing + halfOpenRequests = 3 // allow 3 trial requests in half-open + } + } +} +``` + +Tag each request with the circuit breaker it belongs to: + +```kotlin +val response = client.get("https://payment.example.com/api/charge") { + circuitBreaker("payment-service") +} +``` + +### Multiple services + +```kotlin +install(CircuitBreaker) { + register("payment-service") { + failureThreshold = 5 + resetTimeout = 30.seconds + } + register("inventory-service") { + failureThreshold = 10 + resetTimeout = 1.minutes + } +} +``` + +Each circuit is independent -- tripping `payment-service` does not affect `inventory-service`. + +### Automatic routing by host + +Instead of tagging every request manually, route by host: + +```kotlin +install(CircuitBreaker) { + routeRequests { request -> request.url.host } + global { + failureThreshold = 5 + resetTimeout = 30.seconds + } +} +``` + +An explicit `circuitBreaker("name")` attribute on a request always takes priority over the router. + +### Custom failure detection + +By default, responses with status code >= 500 are treated as failures. Customize this per circuit: + +```kotlin +register("strict-service") { + failureThreshold = 3 + resetTimeout = 10.seconds + isFailure { response -> + response.status.value >= 400 + } +} +``` + +Exceptions thrown during the request (network errors, timeouts) always count as failures regardless of this predicate. + +### Handling rejections + +When the circuit is open, requests throw `CircuitBreakerOpenException`: + +```kotlin +try { + client.get("https://payment.example.com/api/charge") { + circuitBreaker("payment-service") + } +} catch (e: CircuitBreakerOpenException) { + // Circuit is open -- return a fallback or cached response + println("${e.circuitBreakerName} is unavailable (reset in ${e.resetTimeout})") +} +``` + +## Configuration reference + +| Property | Default | Description | +|---|---|---| +| `failureThreshold` | `5` | Consecutive failures required to trip the circuit | +| `resetTimeout` | `60s` | Duration the circuit stays open before transitioning to half-open | +| `halfOpenRequests` | `3` | Number of trial requests in the half-open state | +| `isFailure { }` | `status >= 500` | Predicate to classify a response as a failure | + +## Interaction with other plugins + +- **HttpRequestRetry** -- Install `HttpRequestRetry` *before* `CircuitBreaker` so the circuit breaker wraps the retry logic. This way the circuit sees the final outcome after all retries, and `CircuitBreakerOpenException` is not retried. +- **HttpTimeout** -- Timeout exceptions are counted as circuit breaker failures. diff --git a/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/api/ktor-client-circuit-breaker.api b/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/api/ktor-client-circuit-breaker.api new file mode 100644 index 00000000000..6f4fce43c9c --- /dev/null +++ b/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/api/ktor-client-circuit-breaker.api @@ -0,0 +1,29 @@ +public final class io/ktor/client/plugins/circuitbreaker/CircuitBreakerConfig { + public fun ()V + public final fun global (Lkotlin/jvm/functions/Function1;)V + public final fun register (Ljava/lang/String;Lkotlin/jvm/functions/Function1;)V + public final fun routeRequests (Lkotlin/jvm/functions/Function1;)V +} + +public final class io/ktor/client/plugins/circuitbreaker/CircuitBreakerKt { + public static final fun circuitBreaker (Lio/ktor/client/request/HttpRequestBuilder;Ljava/lang/String;)V + public static final fun getCircuitBreaker ()Lio/ktor/client/plugins/api/ClientPlugin; +} + +public final class io/ktor/client/plugins/circuitbreaker/CircuitBreakerOpenException : java/lang/IllegalStateException { + public synthetic fun (Ljava/lang/String;JLkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun getCircuitBreakerName ()Ljava/lang/String; + public final fun getResetTimeout-UwyO8pc ()J +} + +public final class io/ktor/client/plugins/circuitbreaker/ServiceCircuitBreakerConfig { + public fun ()V + public final fun getFailureThreshold ()I + public final fun getHalfOpenRequests ()I + public final fun getResetTimeout-UwyO8pc ()J + public final fun isFailure (Lkotlin/jvm/functions/Function1;)V + public final fun setFailureThreshold (I)V + public final fun setHalfOpenRequests (I)V + public final fun setResetTimeout-LRDsOJo (J)V +} + diff --git a/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/api/ktor-client-circuit-breaker.klib.api b/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/api/ktor-client-circuit-breaker.klib.api new file mode 100644 index 00000000000..c6412081f93 --- /dev/null +++ b/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/api/ktor-client-circuit-breaker.klib.api @@ -0,0 +1,45 @@ +// Klib ABI Dump +// Targets: [androidNativeArm32, androidNativeArm64, androidNativeX64, androidNativeX86, iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, watchosArm32, watchosArm64, watchosDeviceArm64, watchosSimulatorArm64, watchosX64] +// Rendering settings: +// - Signature version: 2 +// - Show manifest properties: true +// - Show declarations: true + +// Library unique name: +final class io.ktor.client.plugins.circuitbreaker/CircuitBreakerConfig { // io.ktor.client.plugins.circuitbreaker/CircuitBreakerConfig|null[0] + constructor () // io.ktor.client.plugins.circuitbreaker/CircuitBreakerConfig.|(){}[0] + + final fun global(kotlin/Function1) // io.ktor.client.plugins.circuitbreaker/CircuitBreakerConfig.global|global(kotlin.Function1){}[0] + final fun register(kotlin/String, kotlin/Function1) // io.ktor.client.plugins.circuitbreaker/CircuitBreakerConfig.register|register(kotlin.String;kotlin.Function1){}[0] + final fun routeRequests(kotlin/Function1) // io.ktor.client.plugins.circuitbreaker/CircuitBreakerConfig.routeRequests|routeRequests(kotlin.Function1){}[0] +} + +final class io.ktor.client.plugins.circuitbreaker/CircuitBreakerOpenException : kotlin/IllegalStateException { // io.ktor.client.plugins.circuitbreaker/CircuitBreakerOpenException|null[0] + constructor (kotlin/String, kotlin.time/Duration) // io.ktor.client.plugins.circuitbreaker/CircuitBreakerOpenException.|(kotlin.String;kotlin.time.Duration){}[0] + + final val circuitBreakerName // io.ktor.client.plugins.circuitbreaker/CircuitBreakerOpenException.circuitBreakerName|{}circuitBreakerName[0] + final fun (): kotlin/String // io.ktor.client.plugins.circuitbreaker/CircuitBreakerOpenException.circuitBreakerName.|(){}[0] + final val resetTimeout // io.ktor.client.plugins.circuitbreaker/CircuitBreakerOpenException.resetTimeout|{}resetTimeout[0] + final fun (): kotlin.time/Duration // io.ktor.client.plugins.circuitbreaker/CircuitBreakerOpenException.resetTimeout.|(){}[0] +} + +final class io.ktor.client.plugins.circuitbreaker/ServiceCircuitBreakerConfig { // io.ktor.client.plugins.circuitbreaker/ServiceCircuitBreakerConfig|null[0] + constructor () // io.ktor.client.plugins.circuitbreaker/ServiceCircuitBreakerConfig.|(){}[0] + + final var failureThreshold // io.ktor.client.plugins.circuitbreaker/ServiceCircuitBreakerConfig.failureThreshold|{}failureThreshold[0] + final fun (): kotlin/Int // io.ktor.client.plugins.circuitbreaker/ServiceCircuitBreakerConfig.failureThreshold.|(){}[0] + final fun (kotlin/Int) // io.ktor.client.plugins.circuitbreaker/ServiceCircuitBreakerConfig.failureThreshold.|(kotlin.Int){}[0] + final var halfOpenRequests // io.ktor.client.plugins.circuitbreaker/ServiceCircuitBreakerConfig.halfOpenRequests|{}halfOpenRequests[0] + final fun (): kotlin/Int // io.ktor.client.plugins.circuitbreaker/ServiceCircuitBreakerConfig.halfOpenRequests.|(){}[0] + final fun (kotlin/Int) // io.ktor.client.plugins.circuitbreaker/ServiceCircuitBreakerConfig.halfOpenRequests.|(kotlin.Int){}[0] + final var resetTimeout // io.ktor.client.plugins.circuitbreaker/ServiceCircuitBreakerConfig.resetTimeout|{}resetTimeout[0] + final fun (): kotlin.time/Duration // io.ktor.client.plugins.circuitbreaker/ServiceCircuitBreakerConfig.resetTimeout.|(){}[0] + final fun (kotlin.time/Duration) // io.ktor.client.plugins.circuitbreaker/ServiceCircuitBreakerConfig.resetTimeout.|(kotlin.time.Duration){}[0] + + final fun isFailure(kotlin/Function1) // io.ktor.client.plugins.circuitbreaker/ServiceCircuitBreakerConfig.isFailure|isFailure(kotlin.Function1){}[0] +} + +final val io.ktor.client.plugins.circuitbreaker/CircuitBreaker // io.ktor.client.plugins.circuitbreaker/CircuitBreaker|{}CircuitBreaker[0] + final fun (): io.ktor.client.plugins.api/ClientPlugin // io.ktor.client.plugins.circuitbreaker/CircuitBreaker.|(){}[0] + +final fun (io.ktor.client.request/HttpRequestBuilder).io.ktor.client.plugins.circuitbreaker/circuitBreaker(kotlin/String) // io.ktor.client.plugins.circuitbreaker/circuitBreaker|circuitBreaker@io.ktor.client.request.HttpRequestBuilder(kotlin.String){}[0] diff --git a/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/build.gradle.kts b/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/build.gradle.kts new file mode 100644 index 00000000000..dc4e75f5b4f --- /dev/null +++ b/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/build.gradle.kts @@ -0,0 +1,18 @@ +/* + * Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +description = "Ktor client Circuit Breaker support" + +plugins { + id("ktorbuild.project.client-plugin") +} + +kotlin { + sourceSets { + commonTest.dependencies { + implementation(projects.ktorClientMock) + implementation(projects.ktorTestDispatcher) + } + } +} diff --git a/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/common/src/io/ktor/client/plugins/circuitbreaker/CircuitBreaker.kt b/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/common/src/io/ktor/client/plugins/circuitbreaker/CircuitBreaker.kt new file mode 100644 index 00000000000..01cbb29ea13 --- /dev/null +++ b/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/common/src/io/ktor/client/plugins/circuitbreaker/CircuitBreaker.kt @@ -0,0 +1,366 @@ +/* + * Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package io.ktor.client.plugins.circuitbreaker + +import io.ktor.client.plugins.api.* +import io.ktor.client.request.* +import io.ktor.client.statement.* +import io.ktor.util.* +import io.ktor.util.logging.* +import io.ktor.utils.io.* +import kotlinx.coroutines.sync.* +import kotlin.time.* +import kotlin.time.Duration.Companion.seconds + +private val LOGGER = KtorSimpleLogger("io.ktor.client.plugins.circuitbreaker.CircuitBreaker") + +/** + * Configuration for an individual service circuit breaker. + * + * The circuit breaker operates as a state machine with three states: + * - **Closed**: Requests flow normally. Consecutive failures are tracked. + * - **Open**: All requests are rejected with [CircuitBreakerOpenException]. + * - **Half-Open**: A limited number of trial requests are allowed through to probe service health. + */ +@KtorDsl +public class ServiceCircuitBreakerConfig { + /** + * Number of consecutive failures required to trip the circuit from Closed to Open. + */ + public var failureThreshold: Int = 5 + + /** + * Duration the circuit remains in the Open state before transitioning to Half-Open. + */ + public var resetTimeout: Duration = 60.seconds + + /** + * Number of trial requests allowed in the Half-Open state. + * If all trial requests succeed, the circuit closes. + * If any trial request fails, the circuit re-opens. + */ + public var halfOpenRequests: Int = 3 + + internal var failurePredicate: (HttpResponse) -> Boolean = { it.status.value >= 500 } + + /** + * Defines a custom predicate to classify a response as a failure. + * By default, any response with a status code >= 500 is treated as a failure. + * + * ```kotlin + * register("payment-service") { + * isFailure { response -> + * response.status.value >= 400 + * } + * } + * ``` + */ + public fun isFailure(predicate: (HttpResponse) -> Boolean) { + failurePredicate = predicate + } + + internal fun validate() { + require(failureThreshold > 0) { "failureThreshold must be positive, got $failureThreshold" } + require(halfOpenRequests > 0) { "halfOpenRequests must be positive, got $halfOpenRequests" } + require(resetTimeout.isPositive()) { "resetTimeout must be positive, got $resetTimeout" } + } +} + +/** + * Configuration for the [CircuitBreaker] client plugin. + * + * Use [register] to define named circuit breakers for different services: + * ```kotlin + * install(CircuitBreaker) { + * register("payment-service") { + * failureThreshold = 5 + * resetTimeout = 30.seconds + * halfOpenRequests = 3 + * } + * register("inventory-service") { + * failureThreshold = 10 + * resetTimeout = 1.minutes + * } + * } + * ``` + * + * Tag individual requests with a circuit breaker name using [circuitBreaker]: + * ```kotlin + * client.get("https://api.example.com/pay") { + * circuitBreaker("payment-service") + * } + * ``` + */ +@KtorDsl +public class CircuitBreakerConfig { + internal val circuits: MutableMap = mutableMapOf() + internal var globalConfig: ServiceCircuitBreakerConfig = ServiceCircuitBreakerConfig() + internal var requestRouter: ((HttpRequestBuilder) -> String?)? = null + internal var timeSource: TimeSource.WithComparableMarks = TimeSource.Monotonic + + /** + * Registers a named circuit breaker with custom configuration. + * + * @param name unique identifier for this circuit breaker, used to associate requests via [circuitBreaker] + * @param block configuration block for this circuit breaker's behavior + */ + public fun register(name: String, block: ServiceCircuitBreakerConfig.() -> Unit) { + circuits[name] = ServiceCircuitBreakerConfig().apply(block) + } + + /** + * Configures default settings for circuit breakers that are created dynamically + * (for example, via [routeRequests] for names not explicitly [register]ed). + */ + public fun global(block: ServiceCircuitBreakerConfig.() -> Unit) { + globalConfig.apply(block) + } + + /** + * Installs a request router that automatically determines which circuit breaker to apply + * to each request. The router receives the [HttpRequestBuilder] and returns the circuit + * breaker name, or `null` to skip circuit breaking. + * + * An explicit [circuitBreaker] attribute on a request takes priority over this router. + * + * ```kotlin + * install(CircuitBreaker) { + * routeRequests { request -> request.url.host } + * } + * ``` + */ + public fun routeRequests(router: (HttpRequestBuilder) -> String?) { + requestRouter = router + } +} + +internal enum class CircuitState { + CLOSED, + OPEN, + HALF_OPEN +} + +internal class CircuitBreakerInstance( + private val name: String, + private val config: ServiceCircuitBreakerConfig, + private val timeSource: TimeSource.WithComparableMarks +) { + private val mutex = Mutex() + private var state: CircuitState = CircuitState.CLOSED + private var consecutiveFailures: Int = 0 + private var halfOpenSuccesses: Int = 0 + private var halfOpenAttempts: Int = 0 + private var lastOpenedMark: ComparableTimeMark? = null + + suspend fun acquirePermission() { + mutex.withLock { + when (state) { + CircuitState.CLOSED -> {} + + CircuitState.OPEN -> { + val mark = lastOpenedMark + if (mark != null && mark.elapsedNow() >= config.resetTimeout) { + LOGGER.trace("Circuit breaker '$name': OPEN -> HALF_OPEN (reset timeout elapsed)") + transitionTo(CircuitState.HALF_OPEN) + halfOpenAttempts++ + } else { + throw CircuitBreakerOpenException(name, config.resetTimeout) + } + } + + CircuitState.HALF_OPEN -> { + if (halfOpenAttempts >= config.halfOpenRequests) { + throw CircuitBreakerOpenException(name, config.resetTimeout) + } + halfOpenAttempts++ + } + } + } + } + + suspend fun recordSuccess() { + mutex.withLock { + when (state) { + CircuitState.CLOSED -> consecutiveFailures = 0 + + CircuitState.HALF_OPEN -> { + halfOpenSuccesses++ + if (halfOpenSuccesses >= config.halfOpenRequests) { + LOGGER.trace( + "Circuit breaker '$name': HALF_OPEN -> CLOSED (all trial requests succeeded)" + ) + transitionTo(CircuitState.CLOSED) + } + } + + CircuitState.OPEN -> {} + } + } + } + + suspend fun recordFailure() { + mutex.withLock { + when (state) { + CircuitState.CLOSED -> { + consecutiveFailures++ + LOGGER.trace( + "Circuit breaker '$name': failure $consecutiveFailures/${config.failureThreshold}" + ) + if (consecutiveFailures >= config.failureThreshold) { + LOGGER.trace("Circuit breaker '$name': CLOSED -> OPEN (failure threshold reached)") + transitionTo(CircuitState.OPEN) + } + } + + CircuitState.HALF_OPEN -> { + LOGGER.trace("Circuit breaker '$name': HALF_OPEN -> OPEN (trial request failed)") + transitionTo(CircuitState.OPEN) + } + + CircuitState.OPEN -> {} + } + } + } + + private fun transitionTo(newState: CircuitState) { + state = newState + when (newState) { + CircuitState.CLOSED -> { + consecutiveFailures = 0 + halfOpenSuccesses = 0 + halfOpenAttempts = 0 + lastOpenedMark = null + } + + CircuitState.OPEN -> { + lastOpenedMark = timeSource.markNow() + halfOpenSuccesses = 0 + halfOpenAttempts = 0 + } + + CircuitState.HALF_OPEN -> { + halfOpenSuccesses = 0 + halfOpenAttempts = 0 + } + } + } +} + +/** + * Thrown when a request is rejected because the associated circuit breaker is in the Open state. + * + * @property circuitBreakerName the name of the circuit breaker that rejected the request + * @property resetTimeout the configured duration the circuit stays open before allowing trial requests + */ +public class CircuitBreakerOpenException( + public val circuitBreakerName: String, + public val resetTimeout: Duration +) : IllegalStateException( + "Circuit breaker '$circuitBreakerName' is OPEN. " + + "Requests are rejected until the reset timeout ($resetTimeout) elapses." +) + +private val CircuitBreakerNameKey: AttributeKey = AttributeKey("CircuitBreakerName") + +/** + * Associates this request with the specified named circuit breaker. + * + * The circuit breaker should be registered via [CircuitBreakerConfig.register], + * or it will be created dynamically using the [global][CircuitBreakerConfig.global] defaults. + * + * @param name the circuit breaker name to use for this request + */ +public fun HttpRequestBuilder.circuitBreaker(name: String) { + attributes.put(CircuitBreakerNameKey, name) +} + +/** + * A client plugin implementing the + * [Circuit Breaker](https://learn.microsoft.com/en-us/azure/architecture/patterns/circuit-breaker) + * pattern to prevent cascading failures in distributed systems. + * + * The circuit breaker monitors consecutive failures for each named service. When the failure count + * reaches [ServiceCircuitBreakerConfig.failureThreshold], the circuit trips to the Open state and + * subsequent requests are immediately rejected with [CircuitBreakerOpenException]. After + * [ServiceCircuitBreakerConfig.resetTimeout] elapses, the circuit enters the Half-Open state, + * allowing [ServiceCircuitBreakerConfig.halfOpenRequests] trial requests. If all trial requests + * succeed, the circuit closes and normal operation resumes. If any trial request fails, the circuit + * re-opens. + * + * Usage: + * ```kotlin + * val client = HttpClient { + * install(CircuitBreaker) { + * register("payment-service") { + * failureThreshold = 5 + * resetTimeout = 30.seconds + * halfOpenRequests = 3 + * } + * } + * } + * + * client.get("https://payment.example.com/api/charge") { + * circuitBreaker("payment-service") + * } + * ``` + * + * @see CircuitBreakerConfig + * @see CircuitBreakerOpenException + */ +public val CircuitBreaker: ClientPlugin = createClientPlugin( + "CircuitBreaker", + ::CircuitBreakerConfig +) { + val config = pluginConfig + val timeSource = config.timeSource + + config.circuits.values.forEach { it.validate() } + config.globalConfig.validate() + + val instances = mutableMapOf() + val instancesMutex = Mutex() + + for ((name, serviceConfig) in config.circuits) { + instances[name] = CircuitBreakerInstance(name, serviceConfig, timeSource) + } + + suspend fun getInstance(name: String): CircuitBreakerInstance { + instances[name]?.let { return it } + return instancesMutex.withLock { + instances.getOrPut(name) { + CircuitBreakerInstance(name, config.globalConfig, timeSource) + } + } + } + + fun resolveCircuitName(request: HttpRequestBuilder): String? { + request.attributes.getOrNull(CircuitBreakerNameKey)?.let { return it } + config.requestRouter?.invoke(request)?.let { return it } + return null + } + + on(Send) { request -> + val circuitName = resolveCircuitName(request) ?: return@on proceed(request) + val instance = getInstance(circuitName) + val serviceConfig = config.circuits[circuitName] ?: config.globalConfig + + instance.acquirePermission() + + val call = try { + proceed(request) + } catch (cause: Throwable) { + instance.recordFailure() + throw cause + } + + if (serviceConfig.failurePredicate(call.response)) { + instance.recordFailure() + } else { + instance.recordSuccess() + } + + call + } +} diff --git a/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/common/test/io/ktor/client/plugins/circuitbreaker/CircuitBreakerTest.kt b/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/common/test/io/ktor/client/plugins/circuitbreaker/CircuitBreakerTest.kt new file mode 100644 index 00000000000..4175529fd6b --- /dev/null +++ b/ktor-client/ktor-client-plugins/ktor-client-circuit-breaker/common/test/io/ktor/client/plugins/circuitbreaker/CircuitBreakerTest.kt @@ -0,0 +1,438 @@ +/* + * Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package io.ktor.client.plugins.circuitbreaker + +import io.ktor.client.* +import io.ktor.client.engine.mock.* +import io.ktor.client.request.* +import io.ktor.http.* +import io.ktor.test.dispatcher.* +import kotlin.test.* +import kotlin.time.* +import kotlin.time.Duration.Companion.seconds + +class CircuitBreakerTest { + + @Test + fun `requests pass through a closed circuit`() = testSuspend { + val client = createTestClient(respondWith = { HttpStatusCode.OK }) { + register("svc") { + failureThreshold = 5 + resetTimeout = 30.seconds + } + } + + repeat(10) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + client.close() + } + + @Test + fun `circuit opens after reaching failure threshold`() = testSuspend { + val client = createTestClient(respondWith = { HttpStatusCode.InternalServerError }) { + register("svc") { + failureThreshold = 3 + resetTimeout = 30.seconds + } + } + + repeat(3) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + assertFailsWith { + client.get("http://svc/") { circuitBreaker("svc") } + } + + client.close() + } + + @Test + fun `success resets consecutive failure count`() = testSuspend { + var status = HttpStatusCode.InternalServerError + val client = createTestClient(respondWith = { status }) { + register("svc") { + failureThreshold = 3 + resetTimeout = 30.seconds + } + } + + repeat(2) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + status = HttpStatusCode.OK + client.get("http://svc/") { circuitBreaker("svc") } + + status = HttpStatusCode.InternalServerError + repeat(2) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + status = HttpStatusCode.OK + client.get("http://svc/") { circuitBreaker("svc") } + + client.close() + } + + @Test + fun `circuit transitions from open to half-open after reset timeout`() = testSuspend { + val timeSource = TestTimeSource() + var status = HttpStatusCode.InternalServerError + + val client = createTestClient(timeSource = timeSource, respondWith = { status }) { + register("svc") { + failureThreshold = 2 + resetTimeout = 30.seconds + halfOpenRequests = 1 + } + } + + repeat(2) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + assertFailsWith { + client.get("http://svc/") { circuitBreaker("svc") } + } + + timeSource += 31.seconds + status = HttpStatusCode.OK + + client.get("http://svc/") { circuitBreaker("svc") } + + client.close() + } + + @Test + fun `circuit closes after all half-open requests succeed`() = testSuspend { + val timeSource = TestTimeSource() + var status = HttpStatusCode.InternalServerError + + val client = createTestClient(timeSource = timeSource, respondWith = { status }) { + register("svc") { + failureThreshold = 2 + resetTimeout = 10.seconds + halfOpenRequests = 3 + } + } + + repeat(2) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + timeSource += 11.seconds + status = HttpStatusCode.OK + + repeat(3) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + repeat(10) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + client.close() + } + + @Test + fun `circuit re-opens when a half-open request fails`() = testSuspend { + val timeSource = TestTimeSource() + var status = HttpStatusCode.InternalServerError + + val client = createTestClient(timeSource = timeSource, respondWith = { status }) { + register("svc") { + failureThreshold = 2 + resetTimeout = 10.seconds + halfOpenRequests = 3 + } + } + + repeat(2) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + timeSource += 11.seconds + + client.get("http://svc/") { circuitBreaker("svc") } + + assertFailsWith { + client.get("http://svc/") { circuitBreaker("svc") } + } + + client.close() + } + + @Test + fun `requests without circuit breaker tag bypass the plugin`() = testSuspend { + val client = createTestClient(respondWith = { HttpStatusCode.InternalServerError }) { + register("svc") { + failureThreshold = 1 + resetTimeout = 30.seconds + } + } + + repeat(10) { + val response = client.get("http://svc/") + assertEquals(HttpStatusCode.InternalServerError, response.status) + } + + client.close() + } + + @Test + fun `separate circuits operate independently`() = testSuspend { + var svcAStatus = HttpStatusCode.InternalServerError + var svcBStatus = HttpStatusCode.OK + + val client = HttpClient(MockEngine) { + engine { + addHandler { request -> + val status = if (request.url.host == "svc-a") svcAStatus else svcBStatus + respond("", status) + } + } + install(CircuitBreaker) { + register("svc-a") { + failureThreshold = 2 + resetTimeout = 30.seconds + } + register("svc-b") { + failureThreshold = 2 + resetTimeout = 30.seconds + } + } + } + + repeat(2) { + client.get("http://svc-a/") { circuitBreaker("svc-a") } + } + + assertFailsWith { + client.get("http://svc-a/") { circuitBreaker("svc-a") } + } + + repeat(5) { + client.get("http://svc-b/") { circuitBreaker("svc-b") } + } + + client.close() + } + + @Test + fun `custom failure predicate determines failures`() = testSuspend { + val client = createTestClient(respondWith = { HttpStatusCode.BadRequest }) { + register("svc") { + failureThreshold = 2 + resetTimeout = 30.seconds + isFailure { response -> response.status.value >= 400 } + } + } + + repeat(2) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + assertFailsWith { + client.get("http://svc/") { circuitBreaker("svc") } + } + + client.close() + } + + @Test + fun `custom request router assigns circuit breakers`() = testSuspend { + var status = HttpStatusCode.InternalServerError + + val client = HttpClient(MockEngine) { + engine { + addHandler { respond("", status) } + } + install(CircuitBreaker) { + routeRequests { request -> request.url.host } + global { + failureThreshold = 2 + resetTimeout = 30.seconds + } + } + } + + repeat(2) { + client.get("http://my-host/") + } + + assertFailsWith { + client.get("http://my-host/") + } + + status = HttpStatusCode.OK + client.get("http://other-host/") + + client.close() + } + + @Test + fun `explicit circuit breaker attribute takes priority over router`() = testSuspend { + val timeSource = TestTimeSource() + var status = HttpStatusCode.InternalServerError + + val client = HttpClient(MockEngine) { + engine { + addHandler { respond("", status) } + } + install(CircuitBreaker) { + this.timeSource = timeSource + routeRequests { request -> request.url.host } + register("explicit") { + failureThreshold = 1 + resetTimeout = 30.seconds + } + global { + failureThreshold = 100 + resetTimeout = 30.seconds + } + } + } + + client.get("http://my-host/") { circuitBreaker("explicit") } + + assertFailsWith { + client.get("http://my-host/") { circuitBreaker("explicit") } + } + + client.get("http://my-host/") + + client.close() + } + + @Test + fun `exception contains circuit breaker name and reset timeout`() = testSuspend { + val client = createTestClient(respondWith = { HttpStatusCode.InternalServerError }) { + register("payment-service") { + failureThreshold = 1 + resetTimeout = 45.seconds + } + } + + client.get("http://svc/") { circuitBreaker("payment-service") } + + val exception = assertFailsWith { + client.get("http://svc/") { circuitBreaker("payment-service") } + } + assertEquals("payment-service", exception.circuitBreakerName) + assertEquals(45.seconds, exception.resetTimeout) + assertContains(exception.message!!, "payment-service") + + client.close() + } + + @Test + fun `invalid configuration is rejected`() { + assertFailsWith { + HttpClient(MockEngine) { + engine { addHandler { respond("") } } + install(CircuitBreaker) { + register("svc") { failureThreshold = 0 } + } + } + } + + assertFailsWith { + HttpClient(MockEngine) { + engine { addHandler { respond("") } } + install(CircuitBreaker) { + register("svc") { halfOpenRequests = -1 } + } + } + } + + assertFailsWith { + HttpClient(MockEngine) { + engine { addHandler { respond("") } } + install(CircuitBreaker) { + register("svc") { resetTimeout = Duration.ZERO } + } + } + } + } + + @Test + fun `4xx responses do not trip the circuit by default`() = testSuspend { + val client = createTestClient(respondWith = { HttpStatusCode.BadRequest }) { + register("svc") { + failureThreshold = 2 + resetTimeout = 30.seconds + } + } + + repeat(10) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + client.close() + } + + @Test + fun `circuit breaker recovers after full cycle`() = testSuspend { + val timeSource = TestTimeSource() + var status = HttpStatusCode.InternalServerError + + val client = createTestClient(timeSource = timeSource, respondWith = { status }) { + register("svc") { + failureThreshold = 2 + resetTimeout = 10.seconds + halfOpenRequests = 2 + } + } + + repeat(2) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + assertFailsWith { + client.get("http://svc/") { circuitBreaker("svc") } + } + + timeSource += 11.seconds + status = HttpStatusCode.OK + + repeat(2) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + repeat(5) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + status = HttpStatusCode.InternalServerError + repeat(2) { + client.get("http://svc/") { circuitBreaker("svc") } + } + + assertFailsWith { + client.get("http://svc/") { circuitBreaker("svc") } + } + + client.close() + } + + private fun createTestClient( + timeSource: TimeSource.WithComparableMarks = TimeSource.Monotonic, + respondWith: () -> HttpStatusCode, + configure: CircuitBreakerConfig.() -> Unit + ): HttpClient { + return HttpClient(MockEngine) { + engine { + addHandler { respond("", respondWith()) } + } + install(CircuitBreaker) { + this.timeSource = timeSource + configure() + } + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index fe016796b01..213a64842c1 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -145,6 +145,7 @@ projects { +"ktor-client-auth" +"ktor-client-bom-remover" +"ktor-client-call-id" + +"ktor-client-circuit-breaker" +"ktor-client-content-negotiation" including { +"ktor-client-content-negotiation-tests" }