From fcec86c2574c11bd2a046f33d875e3a0a87b92bf Mon Sep 17 00:00:00 2001 From: zibet27 Date: Wed, 4 Mar 2026 11:12:52 +0100 Subject: [PATCH 1/6] Server Netty. Set proper to fix HttpRequestLifecycle --- .../netty/NettyApplicationCallHandler.kt | 158 +++++------------- .../netty/http1/NettyHttp1ApplicationCall.kt | 5 +- .../server/netty/http1/NettyHttp1Handler.kt | 103 ++++++++---- .../netty/http2/NettyHttp2ApplicationCall.kt | 3 +- .../server/netty/http2/NettyHttp2Handler.kt | 67 ++++++-- .../suites/HttpRequestLifecycleTest.kt | 64 ++++++- 6 files changed, 229 insertions(+), 171 deletions(-) diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationCallHandler.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationCallHandler.kt index 039438112de..65fb0652838 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationCallHandler.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationCallHandler.kt @@ -1,139 +1,74 @@ /* -* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. +* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.server.netty import io.ktor.http.* -import io.ktor.http.HttpHeaders import io.ktor.server.application.* -import io.ktor.server.engine.* -import io.ktor.server.http.HttpRequestCloseHandlerKey import io.ktor.server.netty.http1.* -import io.ktor.util.pipeline.* import io.ktor.utils.io.* -import io.netty.channel.* -import io.netty.handler.codec.http.* -import io.netty.handler.timeout.* -import kotlinx.coroutines.* -import kotlin.coroutines.* -import kotlin.coroutines.cancellation.* +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.http.DefaultFullHttpResponse +import io.netty.handler.codec.http.HttpResponseStatus +import io.netty.handler.codec.http.HttpVersion +import kotlinx.coroutines.CoroutineName private const val CHUNKED_VALUE = "chunked" -internal class NettyApplicationCallHandler( - userCoroutineContext: CoroutineContext, - private val enginePipeline: EnginePipeline -) : ChannelInboundHandlerAdapter(), CoroutineScope { - private var currentJob: Job? = null - private var currentCall: PipelineCall? = null - - override val coroutineContext: CoroutineContext = userCoroutineContext - - override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { - when (msg) { - is PipelineCall -> handleRequest(ctx, msg) - else -> ctx.fireChannelRead(msg) - } - } - - internal fun onConnectionClose(context: ChannelHandlerContext) { - if (context.channel().isActive) { - return - } - currentCall?.let { - currentCall = null - @OptIn(InternalAPI::class) - it.attributes.getOrNull(HttpRequestCloseHandlerKey)?.invoke() - } - } - - private fun handleRequest(context: ChannelHandlerContext, call: PipelineCall) { - val callContext = CallHandlerCoroutineName + NettyDispatcher.CurrentContext(context) - - currentCall = call - currentJob = launch(callContext, start = CoroutineStart.UNDISPATCHED) { - when { - call is NettyHttp1ApplicationCall && !call.request.isValid() -> { - respondError400BadRequest(call) - } - - else -> - try { - enginePipeline.execute(call) - } catch (error: Throwable) { - handleFailure(call, error) - } - } - } - } - - override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - when (cause) { - is ReadTimeoutException -> { - currentJob?.let { - respond408RequestTimeout(ctx) - it.cancel(CancellationException(cause)) - } ?: ctx.fireExceptionCaught(cause) - } - - else -> ctx.fireExceptionCaught(cause) - } - } +/** + * Contains shared constants and helper functions for Netty call handling. + */ +internal object NettyApplicationCallHandler { + internal val CallHandlerCoroutineName = CoroutineName("call-handler") +} - override fun channelInactive(ctx: ChannelHandlerContext) { - onConnectionClose(ctx) - ctx.fireChannelInactive() +internal fun NettyHttp1ApplicationRequest.isValid(): Boolean { + if (httpRequest.decoderResult().isFailure) { + return false } - private fun respond408RequestTimeout(ctx: ChannelHandlerContext) { - val response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_TIMEOUT) - response.headers().add(HttpHeaders.ContentLength, "0") - response.headers().add(HttpHeaders.Connection, "close") - ctx.writeAndFlush(response) - ctx.close() - } + if (!headers.contains(HttpHeaders.TransferEncoding)) return true - private suspend fun respondError400BadRequest(call: NettyHttp1ApplicationCall) { - logCause(call) + val encodings = headers.getAll(HttpHeaders.TransferEncoding) ?: return true + return encodings.hasValidTransferEncoding() +} - val causeMessage = call.failureCause?.message?.toByteArray(charset = Charsets.UTF_8) - val content = if (causeMessage != null) ByteReadChannel(causeMessage) else ByteReadChannel.Empty - val contentLength = causeMessage?.size ?: 0 +internal fun ChannelHandlerContext.respond408RequestTimeoutHttp1() { + val response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_TIMEOUT) + response.headers().add(HttpHeaders.ContentLength, "0") + response.headers().add(HttpHeaders.Connection, "close") + writeAndFlush(response) + close() +} - call.response.status(HttpStatusCode.BadRequest) - call.response.headers.append(HttpHeaders.ContentLength, contentLength.toString(), safeOnly = false) - if (contentLength > 0) { - call.response.headers.append(HttpHeaders.ContentType, "text/plain; charset=utf-8", safeOnly = false) - } - call.response.headers.append(HttpHeaders.Connection, "close", safeOnly = false) - call.response.sendResponse(chunked = false, content) - call.finish() - } +internal suspend fun NettyHttp1ApplicationCall.respondError400BadRequest() { + logCause() - private fun logCause(call: NettyHttp1ApplicationCall) { - if (call.application.log.isTraceEnabled) { - val cause = call.failureCause ?: return - call.application.log.trace("Failed to decode request", cause) - } - } + val causeMessage = failureCause?.message?.toByteArray(charset = Charsets.UTF_8) + val content = if (causeMessage != null) ByteReadChannel(causeMessage) else ByteReadChannel.Empty + val contentLength = causeMessage?.size ?: 0 - companion object { - internal val CallHandlerCoroutineName = CoroutineName("call-handler") + response.status(HttpStatusCode.BadRequest) + response.headers.append(HttpHeaders.ContentLength, contentLength.toString(), safeOnly = false) + if (contentLength > 0) { + response.headers.append(HttpHeaders.ContentType, "text/plain; charset=utf-8", safeOnly = false) } + response.headers.append(HttpHeaders.Connection, "close", safeOnly = false) + response.sendResponse(chunked = false, content) + finish() } -internal fun NettyHttp1ApplicationRequest.isValid(): Boolean { - if (httpRequest.decoderResult().isFailure) { - return false +private fun NettyHttp1ApplicationCall.logCause() { + if (application.log.isTraceEnabled) { + val cause = failureCause ?: return + application.log.trace("Failed to decode request", cause) } - - if (!headers.contains(HttpHeaders.TransferEncoding)) return true - - val encodings = headers.getAll(HttpHeaders.TransferEncoding) ?: return true - return encodings.hasValidTransferEncoding() } +private val NettyHttp1ApplicationCall.failureCause: Throwable? + get() = httpRequest.decoderResult()?.cause() + internal fun List.hasValidTransferEncoding(): Boolean { forEachIndexed { headerIndex, header -> val chunkedStart = header.indexOf(CHUNKED_VALUE) @@ -162,6 +97,3 @@ internal fun List.hasValidTransferEncoding(): Boolean { } private fun Char.isSeparator(): Boolean = (this == ' ' || this == ',') - -private val NettyHttp1ApplicationCall.failureCause: Throwable? - get() = request.httpRequest.decoderResult()?.cause() diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationCall.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationCall.kt index eda6cacdb58..aaee7fd66f0 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationCall.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationCall.kt @@ -16,13 +16,14 @@ import kotlin.coroutines.* internal class NettyHttp1ApplicationCall( application: Application, context: ChannelHandlerContext, - httpRequest: HttpRequest, + val httpRequest: HttpRequest, requestBodyChannel: ByteReadChannel?, engineContext: CoroutineContext, userContext: CoroutineContext, ) : NettyApplicationCall(application, context, httpRequest), CoroutineScope { - override val coroutineContext: CoroutineContext = userContext + override var coroutineContext: CoroutineContext = userContext + internal set override val request = NettyHttp1ApplicationRequest( this, diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt index cfdf3919c45..8cf5866c46a 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt @@ -1,21 +1,26 @@ /* - * Copyright 2014-2019 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.server.netty.http1 import io.ktor.server.application.* import io.ktor.server.engine.* +import io.ktor.server.http.* import io.ktor.server.netty.* +import io.ktor.server.netty.NettyApplicationCallHandler.CallHandlerCoroutineName import io.ktor.server.netty.cio.* +import io.ktor.util.pipeline.* import io.ktor.utils.io.* -import io.netty.channel.* +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.handler.codec.http.* -import io.netty.handler.timeout.* -import io.netty.util.concurrent.* +import io.netty.handler.timeout.ReadTimeoutException +import io.netty.util.concurrent.EventExecutorGroup import kotlinx.coroutines.* -import java.io.* -import kotlin.coroutines.* +import java.io.IOException +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.cancellation.CancellationException internal class NettyHttp1Handler( private val applicationProvider: () -> Application, @@ -25,35 +30,32 @@ internal class NettyHttp1Handler( private val engineContext: CoroutineContext, private val userContext: CoroutineContext, private val runningLimit: Int -) : ChannelInboundHandlerAdapter(), CoroutineScope { +) : ChannelInboundHandlerAdapter() { private val handlerJob = CompletableDeferred() - override val coroutineContext: CoroutineContext get() = handlerJob - private var skipEmpty = false private lateinit var responseWriter: NettyHttpResponsePipeline private val state = NettyHttpHandlerState(runningLimit) + @Volatile + private var currentJob: Job? = null + + @Volatile + private var currentCall: NettyHttp1ApplicationCall? = null + override fun channelActive(context: ChannelHandlerContext) { responseWriter = NettyHttpResponsePipeline( - context, - state, - coroutineContext + context = context, + httpHandlerState = state, + coroutineContext = handlerJob ) context.channel().config().isAutoRead = false context.channel().read() context.pipeline().apply { addLast(RequestBodyHandler(context)) - addLast( - callEventGroup, - NettyApplicationCallHandler( - applicationProvider().coroutineContext + userContext, - enginePipeline - ) - ) } context.fireChannelActive() } @@ -63,8 +65,8 @@ internal class NettyHttp1Handler( state.isCurrentRequestFullyRead.compareAndSet(expect = false, update = true) } - when { - message is HttpRequest -> { + when (message) { + is HttpRequest -> { if (message !is LastHttpContent) { state.isCurrentRequestFullyRead.compareAndSet(expect = true, update = false) } @@ -75,7 +77,7 @@ internal class NettyHttp1Handler( callReadIfNeeded(context) } - message is LastHttpContent && !message.content().isReadable && skipEmpty -> { + is LastHttpContent if !message.content().isReadable && skipEmpty -> { skipEmpty = false message.release() callReadIfNeeded(context) @@ -88,11 +90,21 @@ internal class NettyHttp1Handler( } override fun channelInactive(context: ChannelHandlerContext) { - val handler = context.pipeline().remove(NettyApplicationCallHandler::class.java) - handler?.onConnectionClose(context) + onConnectionClose(context) context.fireChannelInactive() } + private fun onConnectionClose(context: ChannelHandlerContext) { + if (context.channel().isActive) { + return + } + currentCall?.let { call -> + currentCall = null + @OptIn(InternalAPI::class) + call.attributes.getOrNull(HttpRequestCloseHandlerKey)?.invoke() + } + } + @Suppress("OverridingDeprecatedMember") override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) { when (cause) { @@ -103,7 +115,10 @@ internal class NettyHttp1Handler( } is ReadTimeoutException -> { - context.fireExceptionCaught(cause) + currentJob?.let { + context.respond408RequestTimeoutHttp1() + it.cancel(CancellationException(cause)) + } } else -> { @@ -122,8 +137,32 @@ internal class NettyHttp1Handler( private fun handleRequest(context: ChannelHandlerContext, message: HttpRequest) { val call = prepareCallFromRequest(context, message) + // Fire channel read for custom handlers added to the pipeline context.fireChannelRead(call) + + // Reserve response slot synchronously on the I / O thread for proper ordering + // call.coroutineContext should be updated with the proper value later responseWriter.processResponse(call) + + callEventGroup.execute { + val userScope = CoroutineScope(applicationProvider().coroutineContext + userContext) + val callContext = CallHandlerCoroutineName + NettyDispatcher.CurrentContext(context) + + currentCall = call + currentJob = userScope.launch(callContext, start = CoroutineStart.UNDISPATCHED) callJob@{ + call.coroutineContext = this@callJob.coroutineContext + + if (!call.request.isValid()) { + call.respondError400BadRequest() + return@callJob + } + try { + enginePipeline.execute(call) + } catch (error: Throwable) { + handleFailure(call, error) + } + } + } } /** @@ -145,15 +184,13 @@ internal class NettyHttp1Handler( else -> prepareRequestContentChannel(context, message) } - - val application = applicationProvider() return NettyHttp1ApplicationCall( - application, - context, - message, - requestBodyChannel, - engineContext, - application.coroutineContext + userContext + application = applicationProvider(), + context = context, + httpRequest = message, + requestBodyChannel = requestBodyChannel, + engineContext = engineContext, + userContext = userContext // initial context ) } diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2ApplicationCall.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2ApplicationCall.kt index 2eeebf6eb42..63a06ba7261 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2ApplicationCall.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2ApplicationCall.kt @@ -20,7 +20,8 @@ internal class NettyHttp2ApplicationCall( userContext: CoroutineContext ) : NettyApplicationCall(application, context, headers) { - override val coroutineContext: CoroutineContext = userContext + override var coroutineContext: CoroutineContext = userContext + internal set override val request = NettyHttp2ApplicationRequest(this, engineContext, context, headers) override val response = NettyHttp2ApplicationResponse(this, handler, context, engineContext, userContext) diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt index c645828cac2..0db9bb44dc4 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.server.netty.http2 @@ -7,9 +7,13 @@ package io.ktor.server.netty.http2 import io.ktor.http.* import io.ktor.server.application.* import io.ktor.server.engine.* +import io.ktor.server.http.* import io.ktor.server.netty.* +import io.ktor.server.netty.NettyApplicationCallHandler.CallHandlerCoroutineName import io.ktor.server.netty.cio.* import io.ktor.server.response.* +import io.ktor.util.pipeline.execute +import io.ktor.utils.io.InternalAPI import io.netty.channel.ChannelHandler import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter @@ -28,15 +32,12 @@ internal class NettyHttp2Handler( private val callEventGroup: EventExecutorGroup, private val userCoroutineContext: CoroutineContext, runningLimit: Int -) : ChannelInboundHandlerAdapter(), CoroutineScope { +) : ChannelInboundHandlerAdapter() { private val handlerJob = SupervisorJob(userCoroutineContext[Job]) private val state = NettyHttpHandlerState(runningLimit) private lateinit var responseWriter: NettyHttpResponsePipeline - override val coroutineContext: CoroutineContext - get() = handlerJob - override fun channelRead(context: ChannelHandlerContext, message: Any) { when (message) { is Http2HeadersFrame -> { @@ -61,6 +62,8 @@ internal class NettyHttp2Handler( val e = if (message.errorCode() == 0L) null else Http2ClosedChannelException(message.errorCode()) r.contentActor.close(e) } + // Handle connection close to the stream + onStreamClose(context) } else -> context.fireChannelRead(message) } @@ -68,17 +71,27 @@ internal class NettyHttp2Handler( override fun channelActive(context: ChannelHandlerContext) { responseWriter = NettyHttpResponsePipeline( - context, - state, - coroutineContext + context = context, + httpHandlerState = state, + coroutineContext = handlerJob ) - context.pipeline()?.apply { - addLast(callEventGroup, NettyApplicationCallHandler(userCoroutineContext, enginePipeline)) - } context.fireChannelActive() } + override fun channelInactive(context: ChannelHandlerContext) { + onStreamClose(context) + context.fireChannelInactive() + } + + private fun onStreamClose(context: ChannelHandlerContext) { + context.applicationCall?.let { call -> + context.applicationCall = null + @OptIn(InternalAPI::class) + call.attributes.getOrNull(HttpRequestCloseHandlerKey)?.invoke() + } + } + override fun channelReadComplete(context: ChannelHandlerContext) { state.isChannelReadCompleted.compareAndSet(expect = false, update = true) responseWriter.flushIfNeeded() @@ -92,17 +105,31 @@ internal class NettyHttp2Handler( private fun startHttp2(context: ChannelHandlerContext, headers: Http2Headers) { val call = NettyHttp2ApplicationCall( - application, - context, - headers, - this, - handlerJob + Dispatchers.Unconfined, - userCoroutineContext + application = application, + context = context, + headers = headers, + handler = this@NettyHttp2Handler, + engineContext = handlerJob + Dispatchers.Unconfined, + userContext = userCoroutineContext // initial context ) context.applicationCall = call - context.fireChannelRead(call) + // Reserve response slot synchronously on the I / O thread for proper ordering + // call.coroutineContext should be updated with the proper value later responseWriter.processResponse(call) + + callEventGroup.execute { + val userScope = CoroutineScope(userCoroutineContext) + val callContext = CallHandlerCoroutineName + NettyDispatcher.CurrentContext(context) + userScope.launch(callContext, start = CoroutineStart.UNDISPATCHED) callJob@{ + call.coroutineContext = this@callJob.coroutineContext + try { + enginePipeline.execute(call) + } catch (error: Throwable) { + handleFailure(call, error) + } + } + } } @UseHttp2Push @@ -213,6 +240,10 @@ internal class NettyHttp2Handler( } } + fun cancel() { + handlerJob.cancel() + } + companion object { private val ApplicationCallKey = AttributeKey.valueOf("ktor.ApplicationCall") diff --git a/ktor-server/ktor-server-test-suites/jvm/src/io/ktor/server/testing/suites/HttpRequestLifecycleTest.kt b/ktor-server/ktor-server-test-suites/jvm/src/io/ktor/server/testing/suites/HttpRequestLifecycleTest.kt index 3d0de3b9de8..480f5d71ebe 100644 --- a/ktor-server/ktor-server-test-suites/jvm/src/io/ktor/server/testing/suites/HttpRequestLifecycleTest.kt +++ b/ktor-server/ktor-server-test-suites/jvm/src/io/ktor/server/testing/suites/HttpRequestLifecycleTest.kt @@ -1,13 +1,15 @@ /* - * Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.server.testing.suites import io.ktor.client.statement.* import io.ktor.http.* +import io.ktor.server.application.install import io.ktor.server.engine.* import io.ktor.server.http.* +import io.ktor.server.plugins.calllogging.CallLogging import io.ktor.server.response.* import io.ktor.server.routing.* import io.ktor.server.test.base.* @@ -38,16 +40,17 @@ abstract class HttpRequestLifecycleTest Unit) -> Unit + ) = runTest { val requestStartedCnt = AtomicInt(0) val requestCancelledCnt = AtomicInt(0) val requestStarted = Channel(Channel.UNLIMITED) val requestCancelled = Channel(Channel.UNLIMITED) - cancellableRoute { + startServerWithRoute { requestStarted.send(requestStartedCnt.incrementAndFetch()) try { // very long operation @@ -100,6 +103,13 @@ abstract class HttpRequestLifecycleTest + cancellableRoute(configureRoute) + } + } + @Test fun testHttpRequestLifecycleSuccess() = runTest { val requestCompleted = CompletableDeferred() @@ -150,4 +160,50 @@ abstract class HttpRequestLifecycleTest + val server = createServer { + install(HttpRequestLifecycle) { + cancelCallOnClose = true + } + install(CallLogging) { + mdc("something") { "something else" } + } + routing { + get { configureRoute() } + } + } + startServer(server) + } + } } From bfc1c3ec779e36fb1634753a0e6bd4363166dbd5 Mon Sep 17 00:00:00 2001 From: zibet27 Date: Thu, 12 Mar 2026 18:22:12 +0100 Subject: [PATCH 2/6] use proper call context without mutation --- .../netty/http1/NettyHttp1ApplicationCall.kt | 9 ++-- .../http1/NettyHttp1ApplicationResponse.kt | 3 +- .../server/netty/http1/NettyHttp1Handler.kt | 46 ++++++++++--------- .../netty/http2/NettyHttp2ApplicationCall.kt | 9 ++-- .../server/netty/http2/NettyHttp2Handler.kt | 23 +++++----- 5 files changed, 42 insertions(+), 48 deletions(-) diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationCall.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationCall.kt index aaee7fd66f0..889a14d140b 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationCall.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationCall.kt @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.server.netty.http1 @@ -19,12 +19,9 @@ internal class NettyHttp1ApplicationCall( val httpRequest: HttpRequest, requestBodyChannel: ByteReadChannel?, engineContext: CoroutineContext, - userContext: CoroutineContext, + override val coroutineContext: CoroutineContext, ) : NettyApplicationCall(application, context, httpRequest), CoroutineScope { - override var coroutineContext: CoroutineContext = userContext - internal set - override val request = NettyHttp1ApplicationRequest( this, engineContext, @@ -37,7 +34,7 @@ internal class NettyHttp1ApplicationCall( this, context, engineContext, - userContext, + coroutineContext, httpRequest.protocolVersion() ) diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationResponse.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationResponse.kt index 7ba2951fafd..1bf81539e43 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationResponse.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationResponse.kt @@ -79,7 +79,6 @@ internal class NettyHttp1ApplicationResponse( override suspend fun respondUpgrade(upgrade: OutgoingContent.ProtocolUpgrade) { val nettyContext = context val nettyChannel = nettyContext.channel() - val userAppContext = userContext + NettyDispatcher.CurrentContext(nettyContext) val bodyHandler = nettyContext.pipeline().get(RequestBodyHandler::class.java) val upgradedReadChannel = bodyHandler.upgrade() @@ -104,7 +103,7 @@ internal class NettyHttp1ApplicationResponse( upgradedReadChannel, upgradedWriteChannel, engineContext, - userAppContext + userContext ) job.invokeOnCompletion { diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt index 8cf5866c46a..bc6e6ad409f 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt @@ -39,9 +39,6 @@ internal class NettyHttp1Handler( private val state = NettyHttpHandlerState(runningLimit) - @Volatile - private var currentJob: Job? = null - @Volatile private var currentCall: NettyHttp1ApplicationCall? = null @@ -115,10 +112,13 @@ internal class NettyHttp1Handler( } is ReadTimeoutException -> { - currentJob?.let { - context.respond408RequestTimeoutHttp1() - it.cancel(CancellationException(cause)) + val callContext = currentCall?.coroutineContext + if (callContext == null) { + context.fireExceptionCaught(cause) + return } + context.respond408RequestTimeoutHttp1() + callContext.cancel(CancellationException(cause)) } else -> { @@ -135,31 +135,32 @@ internal class NettyHttp1Handler( } private fun handleRequest(context: ChannelHandlerContext, message: HttpRequest) { - val call = prepareCallFromRequest(context, message) + val userAppContext = applicationProvider().coroutineContext + userContext + val callJob = Job(parent = userAppContext[Job]) + + val callContext = userAppContext + CallHandlerCoroutineName + callJob + val call = prepareCallFromRequest(context, message, callContext = callContext) + currentCall = call // Fire channel read for custom handlers added to the pipeline context.fireChannelRead(call) - // Reserve response slot synchronously on the I / O thread for proper ordering - // call.coroutineContext should be updated with the proper value later + // Reserve response slot synchronously on the I/O thread for proper ordering responseWriter.processResponse(call) callEventGroup.execute { - val userScope = CoroutineScope(applicationProvider().coroutineContext + userContext) - val callContext = CallHandlerCoroutineName + NettyDispatcher.CurrentContext(context) - - currentCall = call - currentJob = userScope.launch(callContext, start = CoroutineStart.UNDISPATCHED) callJob@{ - call.coroutineContext = this@callJob.coroutineContext - - if (!call.request.isValid()) { - call.respondError400BadRequest() - return@callJob - } + val callScope = CoroutineScope(context = callContext) + callScope.launch(start = CoroutineStart.UNDISPATCHED) { try { + if (!call.request.isValid()) { + call.respondError400BadRequest() + return@launch + } enginePipeline.execute(call) } catch (error: Throwable) { handleFailure(call, error) + } finally { + callJob.complete() } } } @@ -171,7 +172,8 @@ internal class NettyHttp1Handler( */ private fun prepareCallFromRequest( context: ChannelHandlerContext, - message: HttpRequest + message: HttpRequest, + callContext: CoroutineContext ): NettyHttp1ApplicationCall { val requestBodyChannel = when { message is LastHttpContent && !message.content().isReadable -> null @@ -190,7 +192,7 @@ internal class NettyHttp1Handler( httpRequest = message, requestBodyChannel = requestBodyChannel, engineContext = engineContext, - userContext = userContext // initial context + coroutineContext = callContext ) } diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2ApplicationCall.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2ApplicationCall.kt index 63a06ba7261..33668db4e7c 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2ApplicationCall.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2ApplicationCall.kt @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.server.netty.http2 @@ -17,14 +17,11 @@ internal class NettyHttp2ApplicationCall( val headers: Http2Headers, handler: NettyHttp2Handler, engineContext: CoroutineContext, - userContext: CoroutineContext + override val coroutineContext: CoroutineContext ) : NettyApplicationCall(application, context, headers) { - override var coroutineContext: CoroutineContext = userContext - internal set - override val request = NettyHttp2ApplicationRequest(this, engineContext, context, headers) - override val response = NettyHttp2ApplicationResponse(this, handler, context, engineContext, userContext) + override val response = NettyHttp2ApplicationResponse(this, handler, context, engineContext, coroutineContext) init { putResponseAttribute() diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt index 0db9bb44dc4..1c9a7a6e755 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt @@ -12,8 +12,8 @@ import io.ktor.server.netty.* import io.ktor.server.netty.NettyApplicationCallHandler.CallHandlerCoroutineName import io.ktor.server.netty.cio.* import io.ktor.server.response.* -import io.ktor.util.pipeline.execute -import io.ktor.utils.io.InternalAPI +import io.ktor.util.pipeline.* +import io.ktor.utils.io.* import io.netty.channel.ChannelHandler import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter @@ -62,8 +62,6 @@ internal class NettyHttp2Handler( val e = if (message.errorCode() == 0L) null else Http2ClosedChannelException(message.errorCode()) r.contentActor.close(e) } - // Handle connection close to the stream - onStreamClose(context) } else -> context.fireChannelRead(message) } @@ -104,29 +102,30 @@ internal class NettyHttp2Handler( } private fun startHttp2(context: ChannelHandlerContext, headers: Http2Headers) { + val callJob = Job(parent = userCoroutineContext[Job]) + val callContext = userCoroutineContext + CallHandlerCoroutineName + callJob val call = NettyHttp2ApplicationCall( application = application, context = context, headers = headers, handler = this@NettyHttp2Handler, engineContext = handlerJob + Dispatchers.Unconfined, - userContext = userCoroutineContext // initial context + coroutineContext = callContext ) context.applicationCall = call - // Reserve response slot synchronously on the I / O thread for proper ordering - // call.coroutineContext should be updated with the proper value later + // Reserve response slot synchronously on the I/O thread for proper ordering responseWriter.processResponse(call) callEventGroup.execute { - val userScope = CoroutineScope(userCoroutineContext) - val callContext = CallHandlerCoroutineName + NettyDispatcher.CurrentContext(context) - userScope.launch(callContext, start = CoroutineStart.UNDISPATCHED) callJob@{ - call.coroutineContext = this@callJob.coroutineContext + val callScope = CoroutineScope(context = callContext) + callScope.launch(callContext, start = CoroutineStart.UNDISPATCHED) { try { enginePipeline.execute(call) } catch (error: Throwable) { handleFailure(call, error) + } finally { + callJob.complete() } } } @@ -240,7 +239,7 @@ internal class NettyHttp2Handler( } } - fun cancel() { + internal fun cancel() { handlerJob.cancel() } From 2208798e38ec2a878ae03ead8fc1820e59f554c8 Mon Sep 17 00:00:00 2001 From: zibet27 Date: Fri, 13 Mar 2026 11:14:18 +0100 Subject: [PATCH 3/6] fix issue with http1 pipelining --- .../ktor/tests/server/cio/CIOEngineTestJvm.kt | 3 + .../server/netty/http1/NettyHttp1Handler.kt | 18 ++--- .../suites/HttpRequestLifecycleTest.kt | 69 +++++++++++++++++++ 3 files changed, 82 insertions(+), 8 deletions(-) diff --git a/ktor-server/ktor-server-cio/jvm/test/io/ktor/tests/server/cio/CIOEngineTestJvm.kt b/ktor-server/ktor-server-cio/jvm/test/io/ktor/tests/server/cio/CIOEngineTestJvm.kt index 4f0219ef7aa..9a447121446 100644 --- a/ktor-server/ktor-server-cio/jvm/test/io/ktor/tests/server/cio/CIOEngineTestJvm.kt +++ b/ktor-server/ktor-server-cio/jvm/test/io/ktor/tests/server/cio/CIOEngineTestJvm.kt @@ -95,4 +95,7 @@ class CIOHttpRequestLifecycleTest : enableSsl = false enableHttp2 = false } + + @Ignore + override fun testPipelinedRequestsCancelledOnDisconnect() {} } diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt index bc6e6ad409f..6b903175b3b 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt @@ -19,6 +19,7 @@ import io.netty.handler.timeout.ReadTimeoutException import io.netty.util.concurrent.EventExecutorGroup import kotlinx.coroutines.* import java.io.IOException +import java.util.concurrent.ConcurrentLinkedQueue import kotlin.coroutines.CoroutineContext import kotlin.coroutines.cancellation.CancellationException @@ -39,8 +40,7 @@ internal class NettyHttp1Handler( private val state = NettyHttpHandlerState(runningLimit) - @Volatile - private var currentCall: NettyHttp1ApplicationCall? = null + private val activeCalls = ConcurrentLinkedQueue() override fun channelActive(context: ChannelHandlerContext) { responseWriter = NettyHttpResponsePipeline( @@ -95,8 +95,8 @@ internal class NettyHttp1Handler( if (context.channel().isActive) { return } - currentCall?.let { call -> - currentCall = null + while (true) { + val call = activeCalls.poll() ?: break @OptIn(InternalAPI::class) call.attributes.getOrNull(HttpRequestCloseHandlerKey)?.invoke() } @@ -112,13 +112,14 @@ internal class NettyHttp1Handler( } is ReadTimeoutException -> { - val callContext = currentCall?.coroutineContext - if (callContext == null) { + if (activeCalls.isEmpty()) { context.fireExceptionCaught(cause) return } context.respond408RequestTimeoutHttp1() - callContext.cancel(CancellationException(cause)) + activeCalls.forEach { call -> + call.coroutineContext.cancel(CancellationException(cause)) + } } else -> { @@ -140,7 +141,7 @@ internal class NettyHttp1Handler( val callContext = userAppContext + CallHandlerCoroutineName + callJob val call = prepareCallFromRequest(context, message, callContext = callContext) - currentCall = call + activeCalls.add(call) // Fire channel read for custom handlers added to the pipeline context.fireChannelRead(call) @@ -160,6 +161,7 @@ internal class NettyHttp1Handler( } catch (error: Throwable) { handleFailure(call, error) } finally { + activeCalls.remove(call) callJob.complete() } } diff --git a/ktor-server/ktor-server-test-suites/jvm/src/io/ktor/server/testing/suites/HttpRequestLifecycleTest.kt b/ktor-server/ktor-server-test-suites/jvm/src/io/ktor/server/testing/suites/HttpRequestLifecycleTest.kt index 480f5d71ebe..c3ca3f6b353 100644 --- a/ktor-server/ktor-server-test-suites/jvm/src/io/ktor/server/testing/suites/HttpRequestLifecycleTest.kt +++ b/ktor-server/ktor-server-test-suites/jvm/src/io/ktor/server/testing/suites/HttpRequestLifecycleTest.kt @@ -18,6 +18,9 @@ import io.ktor.utils.io.* import kotlinx.coroutines.* import kotlinx.coroutines.CancellationException import kotlinx.coroutines.channels.Channel +import java.io.OutputStreamWriter +import java.net.InetSocketAddress +import java.net.Socket import kotlin.concurrent.atomics.AtomicInt import kotlin.concurrent.atomics.ExperimentalAtomicApi import kotlin.concurrent.atomics.incrementAndFetch @@ -206,4 +209,70 @@ abstract class HttpRequestLifecycleTest(pipelinedCount) + val cancelledCount = AtomicInt(0) + val allCancelled = CompletableDeferred() + + val server = createServer { + install(HttpRequestLifecycle) { + cancelCallOnClose = true + } + routing { + get("/slow") { + allStarted.send(Unit) + try { + repeat(100) { + delay(200.milliseconds) + } + call.respondText("Done") + } catch (e: CancellationException) { + val count = cancelledCount.incrementAndFetch() + if (count == pipelinedCount) { + allCancelled.complete(Unit) + } + throw e + } + } + } + } + startServer(server) + + // Use a raw socket to send pipelined HTTP/1.1 requests + val socket = Socket() + socket.tcpNoDelay = true + socket.connect(InetSocketAddress("127.0.0.1", port)) + + try { + val writer = OutputStreamWriter(socket.getOutputStream(), Charsets.US_ASCII) + repeat(pipelinedCount) { + writer.write("GET /slow HTTP/1.1\r\n") + writer.write("Host: localhost:$port\r\n") + writer.write("Connection: keep-alive\r\n") + writer.write("\r\n") + } + writer.flush() + + // Wait for all requests to start processing on the server + withTimeout(10.seconds) { + repeat(pipelinedCount) { + allStarted.receive() + } + } + } finally { + // Abruptly close the connection + socket.setSoLinger(true, 0) + socket.close() + } + + // Verify that ALL pipelined requests were cancelled, not just the last one + withTimeout(10.seconds) { + allCancelled.await() + } + assertEquals(pipelinedCount, cancelledCount.load()) + } } From baa3ba950d8cab5f9caf17c23e9e21002b23fe35 Mon Sep 17 00:00:00 2001 From: zibet27 Date: Mon, 16 Mar 2026 15:04:03 +0100 Subject: [PATCH 4/6] execute netty call using netty dispatcher --- .../src/io/ktor/server/netty/NettyChannelInitializer.kt | 9 ++------- .../src/io/ktor/server/netty/http1/NettyHttp1Handler.kt | 6 ++---- .../src/io/ktor/server/netty/http2/NettyHttp2Handler.kt | 9 ++++----- 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyChannelInitializer.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyChannelInitializer.kt index 17a60c8276a..a0e4abe5fdf 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyChannelInitializer.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyChannelInitializer.kt @@ -34,7 +34,6 @@ import io.netty.handler.timeout.ReadTimeoutException import io.netty.handler.timeout.ReadTimeoutHandler import io.netty.handler.timeout.WriteTimeoutHandler import io.netty.util.concurrent.EventExecutorGroup -import kotlinx.coroutines.cancel import java.io.FileInputStream import java.nio.channels.ClosedChannelException import java.security.KeyStore @@ -186,7 +185,6 @@ public class NettyChannelInitializer( val handler = NettyHttp2Handler( enginePipeline, application, - callEventGroup, application.coroutineContext + userContext, runningLimit ) @@ -202,7 +200,6 @@ public class NettyChannelInitializer( val handler = NettyHttp2Handler( enginePipeline, applicationProvider(), - callEventGroup, userContext, runningLimit ) @@ -232,7 +229,6 @@ public class NettyChannelInitializer( applicationProvider, enginePipeline, environment, - callEventGroup, engineContext, userContext, runningLimit @@ -266,7 +262,6 @@ public class NettyChannelInitializer( applicationProvider, enginePipeline, environment, - callEventGroup, engineContext, userContext, runningLimit @@ -330,14 +325,14 @@ public class NettyChannelInitializer( if (SslProvider.isAlpnSupported(SslProvider.OPENSSL)) { return SslProvider.OPENSSL } - } catch (ignore: Throwable) { + } catch (_: Throwable) { } try { if (SslProvider.isAlpnSupported(SslProvider.JDK)) { return SslProvider.JDK } - } catch (ignore: Throwable) { + } catch (_: Throwable) { } return null diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt index 6b903175b3b..4412b02a212 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt @@ -16,7 +16,6 @@ import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.handler.codec.http.* import io.netty.handler.timeout.ReadTimeoutException -import io.netty.util.concurrent.EventExecutorGroup import kotlinx.coroutines.* import java.io.IOException import java.util.concurrent.ConcurrentLinkedQueue @@ -27,7 +26,6 @@ internal class NettyHttp1Handler( private val applicationProvider: () -> Application, private val enginePipeline: EnginePipeline, private val environment: ApplicationEnvironment, - private val callEventGroup: EventExecutorGroup, private val engineContext: CoroutineContext, private val userContext: CoroutineContext, private val runningLimit: Int @@ -139,7 +137,7 @@ internal class NettyHttp1Handler( val userAppContext = applicationProvider().coroutineContext + userContext val callJob = Job(parent = userAppContext[Job]) - val callContext = userAppContext + CallHandlerCoroutineName + callJob + val callContext = userAppContext + NettyDispatcher.CurrentContext(context) + callJob + CallHandlerCoroutineName val call = prepareCallFromRequest(context, message, callContext = callContext) activeCalls.add(call) @@ -149,7 +147,7 @@ internal class NettyHttp1Handler( // Reserve response slot synchronously on the I/O thread for proper ordering responseWriter.processResponse(call) - callEventGroup.execute { + context.executor().execute { val callScope = CoroutineScope(context = callContext) callScope.launch(start = CoroutineStart.UNDISPATCHED) { try { diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt index 1c9a7a6e755..2382ab61c65 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt @@ -19,7 +19,6 @@ import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.handler.codec.http2.* import io.netty.util.AttributeKey -import io.netty.util.concurrent.EventExecutorGroup import kotlinx.coroutines.* import java.lang.reflect.Field import java.nio.channels.ClosedChannelException @@ -29,7 +28,6 @@ import kotlin.coroutines.CoroutineContext internal class NettyHttp2Handler( private val enginePipeline: EnginePipeline, private val application: Application, - private val callEventGroup: EventExecutorGroup, private val userCoroutineContext: CoroutineContext, runningLimit: Int ) : ChannelInboundHandlerAdapter() { @@ -103,7 +101,8 @@ internal class NettyHttp2Handler( private fun startHttp2(context: ChannelHandlerContext, headers: Http2Headers) { val callJob = Job(parent = userCoroutineContext[Job]) - val callContext = userCoroutineContext + CallHandlerCoroutineName + callJob + val callContext = + userCoroutineContext + NettyDispatcher.CurrentContext(context) + callJob + CallHandlerCoroutineName val call = NettyHttp2ApplicationCall( application = application, context = context, @@ -117,9 +116,9 @@ internal class NettyHttp2Handler( // Reserve response slot synchronously on the I/O thread for proper ordering responseWriter.processResponse(call) - callEventGroup.execute { + context.executor().execute { val callScope = CoroutineScope(context = callContext) - callScope.launch(callContext, start = CoroutineStart.UNDISPATCHED) { + callScope.launch(start = CoroutineStart.UNDISPATCHED) { try { enginePipeline.execute(call) } catch (error: Throwable) { From f9020dbea185a33ec289adc9b0c11092bf6b43d0 Mon Sep 17 00:00:00 2001 From: zibet27 Date: Mon, 16 Mar 2026 17:24:48 +0100 Subject: [PATCH 5/6] fix CIO call context already cancelled --- .../common/src/io/ktor/server/cio/backend/ServerPipeline.kt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ktor-server/ktor-server-cio/common/src/io/ktor/server/cio/backend/ServerPipeline.kt b/ktor-server/ktor-server-cio/common/src/io/ktor/server/cio/backend/ServerPipeline.kt index 9dea49de3ad..6dc8f6f5826 100644 --- a/ktor-server/ktor-server-cio/common/src/io/ktor/server/cio/backend/ServerPipeline.kt +++ b/ktor-server/ktor-server-cio/common/src/io/ktor/server/cio/backend/ServerPipeline.kt @@ -185,9 +185,10 @@ public fun CoroutineScope.startServerConnectionPipeline( if (isLastHttpRequest(version, connectionOptions)) break } } catch (_: IOException) { - // already handled - coroutineContext.cancel() + // Connection error - also triggers onClose below } finally { + // Invoke onClose to allow HttpRequestLifecycle plugin to cancel + // the call coroutine with proper ConnectionClosedException cause. handlerScope?.onClose?.invoke() actorChannel.close() } From 92959afd5a8461addf4cf2e2129714fb93a745fc Mon Sep 17 00:00:00 2001 From: zibet27 Date: Tue, 17 Mar 2026 11:55:09 +0100 Subject: [PATCH 6/6] explain why context.executor() is needed --- .../jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt | 4 ++++ .../jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt | 3 +++ 2 files changed, 7 insertions(+) diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt index 4412b02a212..cdbc2994828 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt @@ -147,6 +147,10 @@ internal class NettyHttp1Handler( // Reserve response slot synchronously on the I/O thread for proper ordering responseWriter.processResponse(call) + // Defer coroutine start to the next event loop tick so that channelReadComplete() fires first + // This allows the response pipeline to detect that the request body is still being received and flush headers + // early instead of buffering them, which is required when the client waits for response headers + // before sending the request body context.executor().execute { val callScope = CoroutineScope(context = callContext) callScope.launch(start = CoroutineStart.UNDISPATCHED) { diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt index 2382ab61c65..b438db74623 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt @@ -116,6 +116,9 @@ internal class NettyHttp2Handler( // Reserve response slot synchronously on the I/O thread for proper ordering responseWriter.processResponse(call) + // Defer coroutine start to the next event loop tick via context.executor().execute so that + // channelRead returns and Netty can deliver subsequent Http2DataFrame messages. + // Without this, the coroutine runs on the event loop, blocking data frame delivery and causing EOFException. context.executor().execute { val callScope = CoroutineScope(context = callContext) callScope.launch(start = CoroutineStart.UNDISPATCHED) {