Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ktor-client/ktor-client-core/api/ktor-client-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -1622,7 +1622,8 @@ public final class io/ktor/client/statement/HttpResponsePipeline$Phases {

public final class io/ktor/client/statement/HttpStatement {
public fun <init> (Lio/ktor/client/request/HttpRequestBuilder;Lio/ktor/client/HttpClient;)V
public final fun cleanup (Lio/ktor/client/statement/HttpResponse;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun cleanup (Lio/ktor/client/statement/HttpResponse;Ljava/lang/Throwable;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final synthetic fun cleanup (Lio/ktor/client/statement/HttpResponse;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's switch the base branch to main

public final fun execute (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun execute (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun fetchResponse (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
1 change: 1 addition & 0 deletions ktor-client/ktor-client-core/api/ktor-client-core.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,7 @@ final class io.ktor.client.statement/HttpStatement { // io.ktor.client.statement

final fun toString(): kotlin/String // io.ktor.client.statement/HttpStatement.toString|toString(){}[0]
final suspend fun (io.ktor.client.statement/HttpResponse).cleanup() // io.ktor.client.statement/HttpStatement.cleanup|cleanup@io.ktor.client.statement.HttpResponse(){}[0]
final suspend fun (io.ktor.client.statement/HttpResponse).cleanup(kotlin/Throwable?) // io.ktor.client.statement/HttpStatement.cleanup|cleanup@io.ktor.client.statement.HttpResponse(kotlin.Throwable?){}[0]
final suspend fun <#A1: kotlin/Any?> execute(kotlin.coroutines/SuspendFunction1<io.ktor.client.statement/HttpResponse, #A1>): #A1 // io.ktor.client.statement/HttpStatement.execute|execute(kotlin.coroutines.SuspendFunction1<io.ktor.client.statement.HttpResponse,0:0>){0§<kotlin.Any?>}[0]
final suspend fun execute(): io.ktor.client.statement/HttpResponse // io.ktor.client.statement/HttpStatement.execute|execute(){}[0]
final suspend fun fetchResponse(): io.ktor.client.statement/HttpResponse // io.ktor.client.statement/HttpStatement.fetchResponse|fetchResponse(){}[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.ktor.client.call.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.utils.io.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.job
import kotlinx.coroutines.withContext
Expand Down Expand Up @@ -66,6 +67,7 @@ public class HttpStatement(
public suspend fun <T> execute(block: suspend (response: HttpResponse) -> T): T = unwrapRequestTimeoutException {
val response = fetchStreamingResponse()

var callFailure: Throwable? = null
try {
return if (useEngineDispatcher) {
withContext(response.coroutineContext[ContinuationInterceptor]!!) {
Expand All @@ -74,8 +76,11 @@ public class HttpStatement(
} else {
block(response)
}
} catch (cause: Throwable) {
callFailure = cause
throw cause
} finally {
response.cleanup()
response.cleanup(callFailure)
}
}

Expand Down Expand Up @@ -159,6 +164,7 @@ public class HttpStatement(
crossinline block: suspend (response: T) -> R
): R = unwrapRequestTimeoutException {
val response: HttpResponse = fetchStreamingResponse()
var callFailure: Throwable? = null
try {
return if (useEngineDispatcher) {
withContext(response.coroutineContext[ContinuationInterceptor]!!) {
Expand All @@ -169,8 +175,11 @@ public class HttpStatement(
val result = response.body<T>()
block(result)
}
} catch (cause: Throwable) {
callFailure = cause
throw cause
} finally {
response.cleanup()
response.cleanup(callFailure)
}
}

Expand Down Expand Up @@ -199,21 +208,33 @@ public class HttpStatement(
// Save the body again to make sure that it is replayable after pipeline execution
// We need this because wrongly implemented plugins could make response body non-replayable
val result = call.save().response
call.response.cleanup()
call.response.cleanup(cause = null)

return result
}

@PublishedApi
@OptIn(InternalAPI::class)
@Deprecated("Use cleanup(cause) instead", level = DeprecationLevel.HIDDEN)
internal suspend fun HttpResponse.cleanup(): Unit = cleanup(cause = null)

/**
* Completes [HttpResponse] and releases resources.
*
* @param cause If not null, cancels the response job with this cause to immediately interrupt
* any pending network operations.
*/
@PublishedApi
@OptIn(InternalAPI::class)
internal suspend fun HttpResponse.cleanup() {
internal suspend fun HttpResponse.cleanup(cause: Throwable?) {
val job = coroutineContext.job as CompletableJob

job.apply {
complete()
when (cause) {
null -> complete()
is CancellationException -> cancel(cause)
else -> cancel(CancellationException("Exception occurred during request execution", cause))
}
// If the response is saved, the underlying channel is already closed and
// calling `rawContent` would create a new one
if (!isSaved) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.client.tests
Expand All @@ -10,9 +10,11 @@ import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.client.test.base.*
import io.ktor.client.tests.utils.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.Job
import kotlinx.coroutines.job
import kotlinx.coroutines.withTimeout
import kotlinx.io.readByteArray
import kotlin.test.*

Expand Down Expand Up @@ -71,4 +73,37 @@ class HttpStatementTest : ClientLoader() {
}
}
}

// Darwin/DarwinLegacy: NSURLSession buffers the first 512 bytes before calling didReceiveResponse/didReceiveData,
// so the test times out waiting for enough data to arrive unless the content type is octet/stream or application/json.
// See: https://developer.apple.com/forums/thread/64875
@Test
fun testStreamingResponseExceptionCancelsImmediately() = clientTests {
test { client ->
val exception = assertFailsWith<IllegalStateException> {
withTimeout(2000) {
client.prepareGet("$TEST_SERVER/content/stream?delay=60000").execute {
// Headers are received, throw exception while waiting for the body
throw IllegalStateException("Test exception from execute block")
}
}
}
assertEquals("Test exception from execute block", exception.message)
}
}

@Test
fun testStreamingResponseExceptionInBodyCancelsImmediately() = clientTests {
test { client ->
val exception = assertFailsWith<IllegalStateException> {
withTimeout(2000) {
client.prepareGet("$TEST_SERVER/content/stream?delay=60000").body<ByteReadChannel, Unit> {
// Throw exception while a channel is open
throw IllegalStateException("Test exception from body block")
}
}
}
assertEquals("Test exception from body block", exception.message)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.server.testing.client
Expand All @@ -14,8 +14,7 @@ internal class TestEngineWebsocketSession(
override val incoming: ReceiveChannel<Frame>,
override val outgoing: SendChannel<Frame>
) : WebSocketSession {
private val socketJob = Job(callContext[Job])
override val coroutineContext: CoroutineContext = callContext + socketJob + CoroutineName("test-ws")
override val coroutineContext: CoroutineContext = callContext + CoroutineName("test-ws")

override var masking: Boolean
get() = true
Expand All @@ -30,14 +29,15 @@ internal class TestEngineWebsocketSession(
override suspend fun flush() {}

suspend fun run() {
outgoing.invokeOnClose {
if (it != null) {
socketJob.completeExceptionally(it)
} else {
socketJob.complete()
suspendCancellableCoroutine { cont ->
outgoing.invokeOnClose { ex ->
when {
!cont.isActive -> return@invokeOnClose
ex == null -> cont.resume(Unit)
else -> cont.resumeWithException(ex)
}
}
}
socketJob.join()
}

@Deprecated(
Expand Down
3 changes: 3 additions & 0 deletions ktor-test-server/src/main/kotlin/test/server/tests/Content.kt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ internal fun Application.contentTestServer() {
val delay = call.parameters["delay"]?.toLong() ?: 0L
call.respond(
object : OutgoingContent.WriteChannelContent() {
override val contentType: ContentType
get() = ContentType.Application.OctetStream

override suspend fun writeTo(channel: ByteWriteChannel) {
while (true) {
channel.writeInt(42)
Expand Down
Loading