diff --git a/ktor-client/ktor-client-core/api/ktor-client-core.api b/ktor-client/ktor-client-core/api/ktor-client-core.api index 1eb46bcd22d..b0546386904 100644 --- a/ktor-client/ktor-client-core/api/ktor-client-core.api +++ b/ktor-client/ktor-client-core/api/ktor-client-core.api @@ -1622,7 +1622,8 @@ public final class io/ktor/client/statement/HttpResponsePipeline$Phases { public final class io/ktor/client/statement/HttpStatement { public fun (Lio/ktor/client/request/HttpRequestBuilder;Lio/ktor/client/HttpClient;)V - public final fun cleanup (Lio/ktor/client/statement/HttpResponse;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun cleanup (Lio/ktor/client/statement/HttpResponse;Ljava/lang/Throwable;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final synthetic fun cleanup (Lio/ktor/client/statement/HttpResponse;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun execute (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun execute (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun fetchResponse (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/ktor-client/ktor-client-core/api/ktor-client-core.klib.api b/ktor-client/ktor-client-core/api/ktor-client-core.klib.api index 98cee781fb5..feae493a2f8 100644 --- a/ktor-client/ktor-client-core/api/ktor-client-core.klib.api +++ b/ktor-client/ktor-client-core/api/ktor-client-core.klib.api @@ -1175,6 +1175,7 @@ final class io.ktor.client.statement/HttpStatement { // io.ktor.client.statement final fun toString(): kotlin/String // io.ktor.client.statement/HttpStatement.toString|toString(){}[0] final suspend fun (io.ktor.client.statement/HttpResponse).cleanup() // io.ktor.client.statement/HttpStatement.cleanup|cleanup@io.ktor.client.statement.HttpResponse(){}[0] + final suspend fun (io.ktor.client.statement/HttpResponse).cleanup(kotlin/Throwable?) // io.ktor.client.statement/HttpStatement.cleanup|cleanup@io.ktor.client.statement.HttpResponse(kotlin.Throwable?){}[0] final suspend fun <#A1: kotlin/Any?> execute(kotlin.coroutines/SuspendFunction1): #A1 // io.ktor.client.statement/HttpStatement.execute|execute(kotlin.coroutines.SuspendFunction1){0ยง}[0] final suspend fun execute(): io.ktor.client.statement/HttpResponse // io.ktor.client.statement/HttpStatement.execute|execute(){}[0] final suspend fun fetchResponse(): io.ktor.client.statement/HttpResponse // io.ktor.client.statement/HttpStatement.fetchResponse|fetchResponse(){}[0] diff --git a/ktor-client/ktor-client-core/common/src/io/ktor/client/statement/HttpStatement.kt b/ktor-client/ktor-client-core/common/src/io/ktor/client/statement/HttpStatement.kt index 47d44bfdb58..c2c5137ef6c 100644 --- a/ktor-client/ktor-client-core/common/src/io/ktor/client/statement/HttpStatement.kt +++ b/ktor-client/ktor-client-core/common/src/io/ktor/client/statement/HttpStatement.kt @@ -9,6 +9,7 @@ import io.ktor.client.call.* import io.ktor.client.plugins.* import io.ktor.client.request.* import io.ktor.utils.io.* +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableJob import kotlinx.coroutines.job import kotlinx.coroutines.withContext @@ -66,6 +67,7 @@ public class HttpStatement( public suspend fun execute(block: suspend (response: HttpResponse) -> T): T = unwrapRequestTimeoutException { val response = fetchStreamingResponse() + var callFailure: Throwable? = null try { return if (useEngineDispatcher) { withContext(response.coroutineContext[ContinuationInterceptor]!!) { @@ -74,8 +76,11 @@ public class HttpStatement( } else { block(response) } + } catch (cause: Throwable) { + callFailure = cause + throw cause } finally { - response.cleanup() + response.cleanup(callFailure) } } @@ -159,6 +164,7 @@ public class HttpStatement( crossinline block: suspend (response: T) -> R ): R = unwrapRequestTimeoutException { val response: HttpResponse = fetchStreamingResponse() + var callFailure: Throwable? = null try { return if (useEngineDispatcher) { withContext(response.coroutineContext[ContinuationInterceptor]!!) { @@ -169,8 +175,11 @@ public class HttpStatement( val result = response.body() block(result) } + } catch (cause: Throwable) { + callFailure = cause + throw cause } finally { - response.cleanup() + response.cleanup(callFailure) } } @@ -199,21 +208,33 @@ public class HttpStatement( // Save the body again to make sure that it is replayable after pipeline execution // We need this because wrongly implemented plugins could make response body non-replayable val result = call.save().response - call.response.cleanup() + call.response.cleanup(cause = null) return result } + @PublishedApi + @OptIn(InternalAPI::class) + @Deprecated("Use cleanup(cause) instead", level = DeprecationLevel.HIDDEN) + internal suspend fun HttpResponse.cleanup(): Unit = cleanup(cause = null) + /** * Completes [HttpResponse] and releases resources. + * + * @param cause If not null, cancels the response job with this cause to immediately interrupt + * any pending network operations. */ @PublishedApi @OptIn(InternalAPI::class) - internal suspend fun HttpResponse.cleanup() { + internal suspend fun HttpResponse.cleanup(cause: Throwable?) { val job = coroutineContext.job as CompletableJob job.apply { - complete() + when (cause) { + null -> complete() + is CancellationException -> cancel(cause) + else -> cancel(CancellationException("Exception occurred during request execution", cause)) + } // If the response is saved, the underlying channel is already closed and // calling `rawContent` would create a new one if (!isSaved) { diff --git a/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/HttpStatementTest.kt b/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/HttpStatementTest.kt index 44e10b5448d..88d60348a13 100644 --- a/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/HttpStatementTest.kt +++ b/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/HttpStatementTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.client.tests @@ -10,9 +10,11 @@ import io.ktor.client.request.* import io.ktor.client.statement.* import io.ktor.client.test.base.* import io.ktor.client.tests.utils.* +import io.ktor.utils.io.* import io.ktor.utils.io.core.* import kotlinx.coroutines.Job import kotlinx.coroutines.job +import kotlinx.coroutines.withTimeout import kotlinx.io.readByteArray import kotlin.test.* @@ -71,4 +73,37 @@ class HttpStatementTest : ClientLoader() { } } } + + // Darwin/DarwinLegacy: NSURLSession buffers the first 512 bytes before calling didReceiveResponse/didReceiveData, + // so the test times out waiting for enough data to arrive unless the content type is octet/stream or application/json. + // See: https://developer.apple.com/forums/thread/64875 + @Test + fun testStreamingResponseExceptionCancelsImmediately() = clientTests { + test { client -> + val exception = assertFailsWith { + withTimeout(2000) { + client.prepareGet("$TEST_SERVER/content/stream?delay=60000").execute { + // Headers are received, throw exception while waiting for the body + throw IllegalStateException("Test exception from execute block") + } + } + } + assertEquals("Test exception from execute block", exception.message) + } + } + + @Test + fun testStreamingResponseExceptionInBodyCancelsImmediately() = clientTests { + test { client -> + val exception = assertFailsWith { + withTimeout(2000) { + client.prepareGet("$TEST_SERVER/content/stream?delay=60000").body { + // Throw exception while a channel is open + throw IllegalStateException("Test exception from body block") + } + } + } + assertEquals("Test exception from body block", exception.message) + } + } } diff --git a/ktor-server/ktor-server-test-host/jvm/src/io/ktor/server/testing/client/TestEngineWebsocketSession.kt b/ktor-server/ktor-server-test-host/jvm/src/io/ktor/server/testing/client/TestEngineWebsocketSession.kt index 29ef02c0d07..a4a6aa981a6 100644 --- a/ktor-server/ktor-server-test-host/jvm/src/io/ktor/server/testing/client/TestEngineWebsocketSession.kt +++ b/ktor-server/ktor-server-test-host/jvm/src/io/ktor/server/testing/client/TestEngineWebsocketSession.kt @@ -1,5 +1,5 @@ /* - * Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.server.testing.client @@ -14,8 +14,7 @@ internal class TestEngineWebsocketSession( override val incoming: ReceiveChannel, override val outgoing: SendChannel ) : WebSocketSession { - private val socketJob = Job(callContext[Job]) - override val coroutineContext: CoroutineContext = callContext + socketJob + CoroutineName("test-ws") + override val coroutineContext: CoroutineContext = callContext + CoroutineName("test-ws") override var masking: Boolean get() = true @@ -30,14 +29,15 @@ internal class TestEngineWebsocketSession( override suspend fun flush() {} suspend fun run() { - outgoing.invokeOnClose { - if (it != null) { - socketJob.completeExceptionally(it) - } else { - socketJob.complete() + suspendCancellableCoroutine { cont -> + outgoing.invokeOnClose { ex -> + when { + !cont.isActive -> return@invokeOnClose + ex == null -> cont.resume(Unit) + else -> cont.resumeWithException(ex) + } } } - socketJob.join() } @Deprecated( diff --git a/ktor-test-server/src/main/kotlin/test/server/tests/Content.kt b/ktor-test-server/src/main/kotlin/test/server/tests/Content.kt index 944cfc92063..c9eb4e7230b 100644 --- a/ktor-test-server/src/main/kotlin/test/server/tests/Content.kt +++ b/ktor-test-server/src/main/kotlin/test/server/tests/Content.kt @@ -110,6 +110,9 @@ internal fun Application.contentTestServer() { val delay = call.parameters["delay"]?.toLong() ?: 0L call.respond( object : OutgoingContent.WriteChannelContent() { + override val contentType: ContentType + get() = ContentType.Application.OctetStream + override suspend fun writeTo(channel: ByteWriteChannel) { while (true) { channel.writeInt(42)