KTOR-9398 Client Core. Cancel streaming on exception #5437
📝 Walkthrough
HttpStatement execution paths now capture exceptions and pass an optional
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
PS: Should we keep binary compatibility for the |
🧹 Nitpick comments (1)
ktor-client/ktor-client-tests/jvm/src/io/ktor/client/tests/HttpClientTest.kt (1)
210-232: Test covers exception propagation well; consider adding cancellation verification.

The test correctly validates that exceptions thrown inside the execute block are properly propagated. The withTimeout(2000) ensures the test won't hang if cancellation fails.

However, the test name suggests it verifies "ImmediatelyCancels" but only validates exception propagation. To fully verify immediate cancellation, you could add an assertion that the withTimeout didn't trigger (i.e., the exception was thrown before the 2-second timeout), or verify server-side that the connection was closed promptly.

💡 Optional: Add timing assertion to verify immediate cancellation

 @Test
 fun testStreamingResponseExceptionImmediatelyCancels() = runBlocking {
     val client = HttpClient(factory) {
         install(HttpTimeout) {
             requestTimeoutMillis = HttpTimeoutConfig.INFINITE_TIMEOUT_MS
             socketTimeoutMillis = HttpTimeoutConfig.INFINITE_TIMEOUT_MS
             connectTimeoutMillis = HttpTimeoutConfig.INFINITE_TIMEOUT_MS
         }
     }

+    val startTime = System.currentTimeMillis()
     val exception = assertThrows<IllegalStateException> {
         withTimeout(2000) {
             client.prepareGet("http://localhost:$serverPort/sse/delay/60000").execute { response ->
                 // Headers are received immediately
                 assertEquals(HttpStatusCode.OK, response.status)
                 // Throw an exception while waiting for the body
                 throw IllegalStateException("Test exception from execute block")
             }
         }
     }

     assertEquals("Test exception from execute block", exception.message)
+    // Verify cancellation happened promptly, not after timeout
+    assertTrue(System.currentTimeMillis() - startTime < 1000, "Exception should propagate immediately")
 }
e5l
left a comment
Code Review
1. body<T, R> overload not updated (Critical)
The execute(block) overload now passes callFailure to cleanup(), but the body<T, R>(block) overload at HttpStatement.kt lines 138–150 still calls response.cleanup() without a cause:
public suspend inline fun <reified T, R> body(
    crossinline block: suspend (response: T) -> R
): R = unwrapRequestTimeoutException {
    val response: HttpResponse = fetchStreamingResponse()
    try {
        ...
    } finally {
        response.cleanup() // <-- no cause passed
    }
}

This overload has the exact same hanging problem this PR fixes for execute. If a user calls statement.body<ByteReadChannel> { channel -> throw IllegalStateException() }, they'll still hang.
2. join() safety after cancel() is engine-dependent
In cleanup(cause), after cancel(CancellationException(...)) the code calls join(). Whether cancel() on the CompletableJob propagates cancellation to the underlying network channel depends on the engine implementation. If rawContent.cancel() in the if (!isSaved) guard is the only thing that actually interrupts pending I/O, and the job cancellation doesn't automatically propagate to the channel, join() could still hang for engines where the channel doesn't observe job cancellation.
Consider cancelling rawContent unconditionally (or at least before join()) on the exception path to guarantee the I/O is interrupted regardless of engine.
3. CancellationException message could be more actionable
Per project guidelines ("Make error messages actionable; include the problematic value/context"), the message "Request failed" in:
cancel(CancellationException("Request failed", cause))
could include more context (e.g., the request URL or the cause's message) to aid debugging when inspecting the job's cancellation cause from a parent coroutine.
4. Test coverage
The new test is only in the JVM engine-specific test file. Consider adding a common-level test in HttpStatementTest.kt to verify the cancellation logic across all engines. The body<T, R> overload fix (issue #1) would also need its own test.
Overall the approach is correct — cancelling vs completing the job on exception is the right fix. The main gap is the body<T, R> overload which has the identical bug but wasn't updated.
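To make the cancel-versus-complete distinction from this review concrete, here is a minimal sketch using plain kotlinx.coroutines rather than Ktor internals. The helpers `finishAndJoin` and `startPendingRead` are hypothetical names introduced only for illustration, and the `delay` stands in for a body read that never finishes on its own.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper, not Ktor code: finishes `callJob` the way cleanup(cause) is
// described in this review and reports whether join() returned within `timeoutMs`.
suspend fun finishAndJoin(callJob: CompletableJob, cause: Throwable?, timeoutMs: Long): Boolean {
    if (cause != null) {
        callJob.cancel(CancellationException("Request failed", cause))
    } else {
        callJob.complete()
    }
    return withTimeoutOrNull(timeoutMs) { callJob.join(); true } ?: false
}

// Hypothetical helper: starts a child of a fresh job that stands in for a pending body read.
fun CoroutineScope.startPendingRead(): CompletableJob {
    val callJob = Job()
    launch(callJob) { delay(60_000) }
    return callJob
}

fun main() = runBlocking {
    // Exception path: cancel() propagates to the child, so join() returns promptly.
    val cancelled = startPendingRead()
    println(finishAndJoin(cancelled, IllegalStateException("boom"), 500)) // prints "true"

    // Normal path: complete() waits for the child, so join() is still pending at the timeout.
    val completed = startPendingRead()
    println(finishAndJoin(completed, null, 500)) // prints "false"
    completed.cancel() // release the stand-in read
}
```

The sketch only shows the coroutine-level behaviour. It relies on the child observing job cancellation (here through `delay`), which is the engine-dependent assumption raised in point 2.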
Yes, since it's marked with |
30800db to a063a62
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@ktor-server/ktor-server-test-host/jvm/src/io/ktor/server/testing/client/TestEngineWebsocketSession.kt`:
- Around line 31-41: The run() suspend function can call cont.resume/... after
the continuation is already completed, causing IllegalStateException; fix
TestEngineWebsocketSession.run by registering cont.invokeOnCancellation to set a
cancellation flag (e.g., var cancelled = false; cont.invokeOnCancellation {
cancelled = true }) and then in the outgoing.invokeOnClose callback guard the
resume/resumeWithException calls with if (!cancelled) { ... } so you only resume
when the continuation hasn't been externally completed.
...er/ktor-server-test-host/jvm/src/io/ktor/server/testing/client/TestEngineWebsocketSession.kt
suspendCancellableCoroutine { cont ->
    outgoing.invokeOnClose { ex ->
        if (ex == null) {
            cont.resume(Unit)
 @PublishedApi
 @OptIn(InternalAPI::class)
-internal suspend fun HttpResponse.cleanup() {
+internal suspend fun HttpResponse.cleanup(cause: Throwable? = null) {
Just commenting here because I opened the ticket on YouTrack and I'm interested in the details of the fix:
Wouldn't it be better to remove the default null value of this cause param, and pass in null explicitly in the call in fetchResponse(), such as not to add a cleanup$default method to the published JVM API?
Makes sense
Thank you for the comment!
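The `cleanup$default` concern in this thread can be checked on Kotlin/JVM with a few lines of reflection. This is a sketch only: `WithDefault`, `WithoutDefault`, and `jvmMethodNames` are hypothetical stand-ins, not Ktor code, and they use plain member functions instead of a `@PublishedApi internal` extension.

```kotlin
// Hypothetical stand-ins for the two signature styles discussed above; not Ktor code.
object WithDefault {
    fun cleanup(cause: Throwable? = null) {}
}

object WithoutDefault {
    fun cleanup(cause: Throwable?) {}
}

// Names of the methods emitted for a class on the JVM, including synthetic ones.
fun jvmMethodNames(type: Class<*>): Set<String> =
    type.declaredMethods.map { it.name }.toSet()

fun main() {
    // Expected to include both "cleanup" and the synthetic "cleanup$default".
    println(jvmMethodNames(WithDefault::class.java))
    // Expected to include only "cleanup".
    println(jvmMethodNames(WithoutDefault::class.java))
}
```

Dropping the default value and passing `null` explicitly at the call site keeps the extra synthetic method out of the compiled class, which is the effect the comment asks for.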
🧹 Nitpick comments (1)
ktor-client/ktor-client-core/common/src/io/ktor/client/statement/HttpStatement.kt (1)
229-235: Preserve existing cancellation exceptions instead of always wrapping.

When cause is already a CancellationException, pass it through unchanged to keep cancellation diagnostics cleaner. Kotlin coroutines documentation explicitly recommends not wrapping existing cancellation exceptions, as this preserves the original cancellation context and avoids masking the root cause.

♻️ Proposed tweak

 internal suspend fun HttpResponse.cleanup(cause: Throwable?) {
     val job = coroutineContext.job as CompletableJob
     job.apply {
         if (cause != null) {
-            cancel(CancellationException("Exception occurred during request execution", cause))
+            val cancellation = cause as? CancellationException
+                ?: CancellationException("Exception occurred during request execution", cause)
+            cancel(cancellation)
         } else {
             complete()
         }
gervaisj
left a comment
I think it's good to go, but I have been thinking about a slight extension to the issue, if your workflow permits adding to the scope a bit.
Specifically, what if, instead of throwing an exception, body or execute simply completes? (like in the snippet below)
client.prepareGet("...").body { channel: ByteReadChannel ->
    while (true) {
        // imagine a line is sent by the server only every minute
        val line = channel.readLine()
        if (line == "text I'm interested in")
            return@body
    }
}

Then, wouldn't it be nice to have the same behavior as when throwing an exception, i.e. have the streaming call cancel immediately, without waiting for another frame to arrive?
I feel this would match the already documented behavior of execute:
The response object should not be accessed outside of [block] as it will be canceled upon block completion.
A similar line is also present on body. Note that "block completion" here makes no distinction between exceptional and normal completion.
So I've tried to see if cleanup could always cancel the call in this commit: gervaisj@76a525c (which is on this branch: gervaisj/client-always-cancel-streaming).
The only issue I ran across was the tests in ServerSentEventsTest failing, but this was because HttpClient.processSession (which is used by all SSE functions) did not respect body's contract of not using the response after completion of the block. That should be fixed in the commit referenced above, along with 2 new test cases, which are basically the same as those added by @zibet27, but for normal completion of block instead of throwing an exception.
Agree here, afaiu it's intended behaviour and "works" on some engines. I tried to run your tests on this branch, and it succeeded on OkHttp, Apache and Android, but not on CIO and Java @e5l @bjhham?
Looks good to me!
Without cancelling the call unconditionally (like is done in my commit), then yes those tests won't work because we fall in the
Sure!
@zibet27 Sorry, maybe there was a misunderstanding, were you waiting for an action on my part? Is there something missing for this PR?
@gervaisj No, I'm waiting for approvals
osipxd
left a comment
Looks good! I think we should include this fix in a minor release so as not to introduce new API in a patch release.
 job.apply {
-    complete()
+    if (cause != null) {
+        cancel(CancellationException("Exception occurred during request execution", cause))
There is a chance the cause is already CancellationException. Probably we shouldn't wrap it with another CancellationException in this case.
Please add a test checking this behavior if you decide to fix it.
HttpResponse.cleanup is not tested directly anywhere
Should we?
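One way to test the "don't double-wrap" behaviour without calling HttpResponse.cleanup directly would be to pull the mapping into a small pure function. The sketch below assumes such a helper; the name `toCancellation` is hypothetical and does not exist in Ktor, and the message string is the one from the diff above.

```kotlin
import kotlin.coroutines.cancellation.CancellationException

// Hypothetical helper, not part of Ktor: reuses an existing CancellationException
// and wraps any other throwable so the original failure stays available as `cause`.
fun toCancellation(cause: Throwable): CancellationException =
    cause as? CancellationException
        ?: CancellationException("Exception occurred during request execution", cause)

fun main() {
    val original = CancellationException("cancelled by caller")
    println(toCancellation(original) === original) // prints "true": passed through unchanged

    val wrapped = toCancellation(IllegalStateException("boom"))
    println(wrapped.cause is IllegalStateException) // prints "true": wrapped with the cause kept
}
```

A test built on this shape only needs to assert identity for the cancellation case and the preserved `cause` for every other exception type.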
 public fun <init> (Lio/ktor/client/request/HttpRequestBuilder;Lio/ktor/client/HttpClient;)V
-public final fun cleanup (Lio/ktor/client/statement/HttpResponse;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+public final fun cleanup (Lio/ktor/client/statement/HttpResponse;Ljava/lang/Throwable;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+public final synthetic fun cleanup (Lio/ktor/client/statement/HttpResponse;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Let's switch the base branch to main
1a0c6be to 4dd9571
Subsystem
Client Core
Motivation
KTOR-9398 Streaming call not cancelled when exception is thrown from HttpStatement.execute/body
Solution
Response.cleanup