diff --git a/ktor-server/ktor-server-cio/common/src/io/ktor/server/cio/backend/ServerPipeline.kt b/ktor-server/ktor-server-cio/common/src/io/ktor/server/cio/backend/ServerPipeline.kt index 9dea49de3ad..6dc8f6f5826 100644 --- a/ktor-server/ktor-server-cio/common/src/io/ktor/server/cio/backend/ServerPipeline.kt +++ b/ktor-server/ktor-server-cio/common/src/io/ktor/server/cio/backend/ServerPipeline.kt @@ -185,9 +185,10 @@ public fun CoroutineScope.startServerConnectionPipeline( if (isLastHttpRequest(version, connectionOptions)) break } } catch (_: IOException) { - // already handled - coroutineContext.cancel() + // Connection error - also triggers onClose below } finally { + // Invoke onClose to allow HttpRequestLifecycle plugin to cancel + // the call coroutine with proper ConnectionClosedException cause. handlerScope?.onClose?.invoke() actorChannel.close() } diff --git a/ktor-server/ktor-server-cio/jvm/test/io/ktor/tests/server/cio/CIOEngineTestJvm.kt b/ktor-server/ktor-server-cio/jvm/test/io/ktor/tests/server/cio/CIOEngineTestJvm.kt index 4f0219ef7aa..9a447121446 100644 --- a/ktor-server/ktor-server-cio/jvm/test/io/ktor/tests/server/cio/CIOEngineTestJvm.kt +++ b/ktor-server/ktor-server-cio/jvm/test/io/ktor/tests/server/cio/CIOEngineTestJvm.kt @@ -95,4 +95,7 @@ class CIOHttpRequestLifecycleTest : enableSsl = false enableHttp2 = false } + + @Ignore + override fun testPipelinedRequestsCancelledOnDisconnect() {} } diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationCallHandler.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationCallHandler.kt index 039438112de..65fb0652838 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationCallHandler.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationCallHandler.kt @@ -1,139 +1,74 @@ /* -* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. +* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.server.netty import io.ktor.http.* -import io.ktor.http.HttpHeaders import io.ktor.server.application.* -import io.ktor.server.engine.* -import io.ktor.server.http.HttpRequestCloseHandlerKey import io.ktor.server.netty.http1.* -import io.ktor.util.pipeline.* import io.ktor.utils.io.* -import io.netty.channel.* -import io.netty.handler.codec.http.* -import io.netty.handler.timeout.* -import kotlinx.coroutines.* -import kotlin.coroutines.* -import kotlin.coroutines.cancellation.* +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.http.DefaultFullHttpResponse +import io.netty.handler.codec.http.HttpResponseStatus +import io.netty.handler.codec.http.HttpVersion +import kotlinx.coroutines.CoroutineName private const val CHUNKED_VALUE = "chunked" -internal class NettyApplicationCallHandler( - userCoroutineContext: CoroutineContext, - private val enginePipeline: EnginePipeline -) : ChannelInboundHandlerAdapter(), CoroutineScope { - private var currentJob: Job? = null - private var currentCall: PipelineCall? = null - - override val coroutineContext: CoroutineContext = userCoroutineContext - - override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { - when (msg) { - is PipelineCall -> handleRequest(ctx, msg) - else -> ctx.fireChannelRead(msg) - } - } - - internal fun onConnectionClose(context: ChannelHandlerContext) { - if (context.channel().isActive) { - return - } - currentCall?.let { - currentCall = null - @OptIn(InternalAPI::class) - it.attributes.getOrNull(HttpRequestCloseHandlerKey)?.invoke() - } - } - - private fun handleRequest(context: ChannelHandlerContext, call: PipelineCall) { - val callContext = CallHandlerCoroutineName + NettyDispatcher.CurrentContext(context) - - currentCall = call - currentJob = launch(callContext, start = CoroutineStart.UNDISPATCHED) { - when { - call is NettyHttp1ApplicationCall && !call.request.isValid() -> { - respondError400BadRequest(call) - } - - else -> - try { - enginePipeline.execute(call) - } catch (error: Throwable) { - handleFailure(call, error) - } - } - } - } - - override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - when (cause) { - is ReadTimeoutException -> { - currentJob?.let { - respond408RequestTimeout(ctx) - it.cancel(CancellationException(cause)) - } ?: ctx.fireExceptionCaught(cause) - } - - else -> ctx.fireExceptionCaught(cause) - } - } +/** + * Contains shared constants and helper functions for Netty call handling. + */ +internal object NettyApplicationCallHandler { + internal val CallHandlerCoroutineName = CoroutineName("call-handler") +} - override fun channelInactive(ctx: ChannelHandlerContext) { - onConnectionClose(ctx) - ctx.fireChannelInactive() +internal fun NettyHttp1ApplicationRequest.isValid(): Boolean { + if (httpRequest.decoderResult().isFailure) { + return false } - private fun respond408RequestTimeout(ctx: ChannelHandlerContext) { - val response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_TIMEOUT) - response.headers().add(HttpHeaders.ContentLength, "0") - response.headers().add(HttpHeaders.Connection, "close") - ctx.writeAndFlush(response) - ctx.close() - } + if (!headers.contains(HttpHeaders.TransferEncoding)) return true - private suspend fun respondError400BadRequest(call: NettyHttp1ApplicationCall) { - logCause(call) + val encodings = headers.getAll(HttpHeaders.TransferEncoding) ?: return true + return encodings.hasValidTransferEncoding() +} - val causeMessage = call.failureCause?.message?.toByteArray(charset = Charsets.UTF_8) - val content = if (causeMessage != null) ByteReadChannel(causeMessage) else ByteReadChannel.Empty - val contentLength = causeMessage?.size ?: 0 +internal fun ChannelHandlerContext.respond408RequestTimeoutHttp1() { + val response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_TIMEOUT) + response.headers().add(HttpHeaders.ContentLength, "0") + response.headers().add(HttpHeaders.Connection, "close") + writeAndFlush(response) + close() +} - call.response.status(HttpStatusCode.BadRequest) - call.response.headers.append(HttpHeaders.ContentLength, contentLength.toString(), safeOnly = false) - if (contentLength > 0) { - call.response.headers.append(HttpHeaders.ContentType, "text/plain; charset=utf-8", safeOnly = false) - } - call.response.headers.append(HttpHeaders.Connection, "close", safeOnly = false) - call.response.sendResponse(chunked = false, content) - call.finish() - } +internal suspend fun NettyHttp1ApplicationCall.respondError400BadRequest() { + logCause() - private fun logCause(call: NettyHttp1ApplicationCall) { - if (call.application.log.isTraceEnabled) { - val cause = call.failureCause ?: return - call.application.log.trace("Failed to decode request", cause) - } - } + val causeMessage = failureCause?.message?.toByteArray(charset = Charsets.UTF_8) + val content = if (causeMessage != null) ByteReadChannel(causeMessage) else ByteReadChannel.Empty + val contentLength = causeMessage?.size ?: 0 - companion object { - internal val CallHandlerCoroutineName = CoroutineName("call-handler") + response.status(HttpStatusCode.BadRequest) + response.headers.append(HttpHeaders.ContentLength, contentLength.toString(), safeOnly = false) + if (contentLength > 0) { + response.headers.append(HttpHeaders.ContentType, "text/plain; charset=utf-8", safeOnly = false) } + response.headers.append(HttpHeaders.Connection, "close", safeOnly = false) + response.sendResponse(chunked = false, content) + finish() } -internal fun NettyHttp1ApplicationRequest.isValid(): Boolean { - if (httpRequest.decoderResult().isFailure) { - return false +private fun NettyHttp1ApplicationCall.logCause() { + if (application.log.isTraceEnabled) { + val cause = failureCause ?: return + application.log.trace("Failed to decode request", cause) } - - if (!headers.contains(HttpHeaders.TransferEncoding)) return true - - val encodings = headers.getAll(HttpHeaders.TransferEncoding) ?: return true - return encodings.hasValidTransferEncoding() } +private val NettyHttp1ApplicationCall.failureCause: Throwable? + get() = httpRequest.decoderResult()?.cause() + internal fun List.hasValidTransferEncoding(): Boolean { forEachIndexed { headerIndex, header -> val chunkedStart = header.indexOf(CHUNKED_VALUE) @@ -162,6 +97,3 @@ internal fun List.hasValidTransferEncoding(): Boolean { } private fun Char.isSeparator(): Boolean = (this == ' ' || this == ',') - -private val NettyHttp1ApplicationCall.failureCause: Throwable? - get() = request.httpRequest.decoderResult()?.cause() diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyChannelInitializer.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyChannelInitializer.kt index 17a60c8276a..a0e4abe5fdf 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyChannelInitializer.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyChannelInitializer.kt @@ -34,7 +34,6 @@ import io.netty.handler.timeout.ReadTimeoutException import io.netty.handler.timeout.ReadTimeoutHandler import io.netty.handler.timeout.WriteTimeoutHandler import io.netty.util.concurrent.EventExecutorGroup -import kotlinx.coroutines.cancel import java.io.FileInputStream import java.nio.channels.ClosedChannelException import java.security.KeyStore @@ -186,7 +185,6 @@ public class NettyChannelInitializer( val handler = NettyHttp2Handler( enginePipeline, application, - callEventGroup, application.coroutineContext + userContext, runningLimit ) @@ -202,7 +200,6 @@ public class NettyChannelInitializer( val handler = NettyHttp2Handler( enginePipeline, applicationProvider(), - callEventGroup, userContext, runningLimit ) @@ -232,7 +229,6 @@ public class NettyChannelInitializer( applicationProvider, enginePipeline, environment, - callEventGroup, engineContext, userContext, runningLimit @@ -266,7 +262,6 @@ public class NettyChannelInitializer( applicationProvider, enginePipeline, environment, - callEventGroup, engineContext, userContext, runningLimit @@ -330,14 +325,14 @@ public class NettyChannelInitializer( if (SslProvider.isAlpnSupported(SslProvider.OPENSSL)) { return SslProvider.OPENSSL } - } catch (ignore: Throwable) { + } catch (_: Throwable) { } try { if (SslProvider.isAlpnSupported(SslProvider.JDK)) { return SslProvider.JDK } - } catch (ignore: Throwable) { + } catch (_: Throwable) { } return null diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationCall.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationCall.kt index eda6cacdb58..889a14d140b 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationCall.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationCall.kt @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.server.netty.http1 @@ -16,14 +16,12 @@ import kotlin.coroutines.* internal class NettyHttp1ApplicationCall( application: Application, context: ChannelHandlerContext, - httpRequest: HttpRequest, + val httpRequest: HttpRequest, requestBodyChannel: ByteReadChannel?, engineContext: CoroutineContext, - userContext: CoroutineContext, + override val coroutineContext: CoroutineContext, ) : NettyApplicationCall(application, context, httpRequest), CoroutineScope { - override val coroutineContext: CoroutineContext = userContext - override val request = NettyHttp1ApplicationRequest( this, engineContext, @@ -36,7 +34,7 @@ internal class NettyHttp1ApplicationCall( this, context, engineContext, - userContext, + coroutineContext, httpRequest.protocolVersion() ) diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationResponse.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationResponse.kt index 7ba2951fafd..1bf81539e43 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationResponse.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationResponse.kt @@ -79,7 +79,6 @@ internal class NettyHttp1ApplicationResponse( override suspend fun respondUpgrade(upgrade: OutgoingContent.ProtocolUpgrade) { val nettyContext = context val nettyChannel = nettyContext.channel() - val userAppContext = userContext + NettyDispatcher.CurrentContext(nettyContext) val bodyHandler = nettyContext.pipeline().get(RequestBodyHandler::class.java) val upgradedReadChannel = bodyHandler.upgrade() @@ -104,7 +103,7 @@ internal class NettyHttp1ApplicationResponse( upgradedReadChannel, upgradedWriteChannel, engineContext, - userAppContext + userContext ) job.invokeOnCompletion { diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt index cfdf3919c45..cdbc2994828 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt @@ -1,59 +1,56 @@ /* - * Copyright 2014-2019 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.server.netty.http1 import io.ktor.server.application.* import io.ktor.server.engine.* +import io.ktor.server.http.* import io.ktor.server.netty.* +import io.ktor.server.netty.NettyApplicationCallHandler.CallHandlerCoroutineName import io.ktor.server.netty.cio.* +import io.ktor.util.pipeline.* import io.ktor.utils.io.* -import io.netty.channel.* +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.handler.codec.http.* -import io.netty.handler.timeout.* -import io.netty.util.concurrent.* +import io.netty.handler.timeout.ReadTimeoutException import kotlinx.coroutines.* -import java.io.* -import kotlin.coroutines.* +import java.io.IOException +import java.util.concurrent.ConcurrentLinkedQueue +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.cancellation.CancellationException internal class NettyHttp1Handler( private val applicationProvider: () -> Application, private val enginePipeline: EnginePipeline, private val environment: ApplicationEnvironment, - private val callEventGroup: EventExecutorGroup, private val engineContext: CoroutineContext, private val userContext: CoroutineContext, private val runningLimit: Int -) : ChannelInboundHandlerAdapter(), CoroutineScope { +) : ChannelInboundHandlerAdapter() { private val handlerJob = CompletableDeferred() - override val coroutineContext: CoroutineContext get() = handlerJob - private var skipEmpty = false private lateinit var responseWriter: NettyHttpResponsePipeline private val state = NettyHttpHandlerState(runningLimit) + private val activeCalls = ConcurrentLinkedQueue() + override fun channelActive(context: ChannelHandlerContext) { responseWriter = NettyHttpResponsePipeline( - context, - state, - coroutineContext + context = context, + httpHandlerState = state, + coroutineContext = handlerJob ) context.channel().config().isAutoRead = false context.channel().read() context.pipeline().apply { addLast(RequestBodyHandler(context)) - addLast( - callEventGroup, - NettyApplicationCallHandler( - applicationProvider().coroutineContext + userContext, - enginePipeline - ) - ) } context.fireChannelActive() } @@ -63,8 +60,8 @@ internal class NettyHttp1Handler( state.isCurrentRequestFullyRead.compareAndSet(expect = false, update = true) } - when { - message is HttpRequest -> { + when (message) { + is HttpRequest -> { if (message !is LastHttpContent) { state.isCurrentRequestFullyRead.compareAndSet(expect = true, update = false) } @@ -75,7 +72,7 @@ internal class NettyHttp1Handler( callReadIfNeeded(context) } - message is LastHttpContent && !message.content().isReadable && skipEmpty -> { + is LastHttpContent if !message.content().isReadable && skipEmpty -> { skipEmpty = false message.release() callReadIfNeeded(context) @@ -88,11 +85,21 @@ internal class NettyHttp1Handler( } override fun channelInactive(context: ChannelHandlerContext) { - val handler = context.pipeline().remove(NettyApplicationCallHandler::class.java) - handler?.onConnectionClose(context) + onConnectionClose(context) context.fireChannelInactive() } + private fun onConnectionClose(context: ChannelHandlerContext) { + if (context.channel().isActive) { + return + } + while (true) { + val call = activeCalls.poll() ?: break + @OptIn(InternalAPI::class) + call.attributes.getOrNull(HttpRequestCloseHandlerKey)?.invoke() + } + } + @Suppress("OverridingDeprecatedMember") override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) { when (cause) { @@ -103,7 +110,14 @@ internal class NettyHttp1Handler( } is ReadTimeoutException -> { - context.fireExceptionCaught(cause) + if (activeCalls.isEmpty()) { + context.fireExceptionCaught(cause) + return + } + context.respond408RequestTimeoutHttp1() + activeCalls.forEach { call -> + call.coroutineContext.cancel(CancellationException(cause)) + } } else -> { @@ -120,10 +134,40 @@ internal class NettyHttp1Handler( } private fun handleRequest(context: ChannelHandlerContext, message: HttpRequest) { - val call = prepareCallFromRequest(context, message) + val userAppContext = applicationProvider().coroutineContext + userContext + val callJob = Job(parent = userAppContext[Job]) + val callContext = userAppContext + NettyDispatcher.CurrentContext(context) + callJob + CallHandlerCoroutineName + val call = prepareCallFromRequest(context, message, callContext = callContext) + activeCalls.add(call) + + // Fire channel read for custom handlers added to the pipeline context.fireChannelRead(call) + + // Reserve response slot synchronously on the I/O thread for proper ordering responseWriter.processResponse(call) + + // Defer coroutine start to the next event loop tick so that channelReadComplete() fires first + // This allows the response pipeline to detect that the request body is still being received and flush headers + // early instead of buffering them, which is required when the client waits for response headers + // before sending the request body + context.executor().execute { + val callScope = CoroutineScope(context = callContext) + callScope.launch(start = CoroutineStart.UNDISPATCHED) { + try { + if (!call.request.isValid()) { + call.respondError400BadRequest() + return@launch + } + enginePipeline.execute(call) + } catch (error: Throwable) { + handleFailure(call, error) + } finally { + activeCalls.remove(call) + callJob.complete() + } + } + } } /** @@ -132,7 +176,8 @@ internal class NettyHttp1Handler( */ private fun prepareCallFromRequest( context: ChannelHandlerContext, - message: HttpRequest + message: HttpRequest, + callContext: CoroutineContext ): NettyHttp1ApplicationCall { val requestBodyChannel = when { message is LastHttpContent && !message.content().isReadable -> null @@ -145,15 +190,13 @@ internal class NettyHttp1Handler( else -> prepareRequestContentChannel(context, message) } - - val application = applicationProvider() return NettyHttp1ApplicationCall( - application, - context, - message, - requestBodyChannel, - engineContext, - application.coroutineContext + userContext + application = applicationProvider(), + context = context, + httpRequest = message, + requestBodyChannel = requestBodyChannel, + engineContext = engineContext, + coroutineContext = callContext ) } diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2ApplicationCall.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2ApplicationCall.kt index 2eeebf6eb42..33668db4e7c 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2ApplicationCall.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2ApplicationCall.kt @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.server.netty.http2 @@ -17,13 +17,11 @@ internal class NettyHttp2ApplicationCall( val headers: Http2Headers, handler: NettyHttp2Handler, engineContext: CoroutineContext, - userContext: CoroutineContext + override val coroutineContext: CoroutineContext ) : NettyApplicationCall(application, context, headers) { - override val coroutineContext: CoroutineContext = userContext - override val request = NettyHttp2ApplicationRequest(this, engineContext, context, headers) - override val response = NettyHttp2ApplicationResponse(this, handler, context, engineContext, userContext) + override val response = NettyHttp2ApplicationResponse(this, handler, context, engineContext, coroutineContext) init { putResponseAttribute() diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt index c645828cac2..b438db74623 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.server.netty.http2 @@ -7,15 +7,18 @@ package io.ktor.server.netty.http2 import io.ktor.http.* import io.ktor.server.application.* import io.ktor.server.engine.* +import io.ktor.server.http.* import io.ktor.server.netty.* +import io.ktor.server.netty.NettyApplicationCallHandler.CallHandlerCoroutineName import io.ktor.server.netty.cio.* import io.ktor.server.response.* +import io.ktor.util.pipeline.* +import io.ktor.utils.io.* import io.netty.channel.ChannelHandler import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.handler.codec.http2.* import io.netty.util.AttributeKey -import io.netty.util.concurrent.EventExecutorGroup import kotlinx.coroutines.* import java.lang.reflect.Field import java.nio.channels.ClosedChannelException @@ -25,18 +28,14 @@ import kotlin.coroutines.CoroutineContext internal class NettyHttp2Handler( private val enginePipeline: EnginePipeline, private val application: Application, - private val callEventGroup: EventExecutorGroup, private val userCoroutineContext: CoroutineContext, runningLimit: Int -) : ChannelInboundHandlerAdapter(), CoroutineScope { +) : ChannelInboundHandlerAdapter() { private val handlerJob = SupervisorJob(userCoroutineContext[Job]) private val state = NettyHttpHandlerState(runningLimit) private lateinit var responseWriter: NettyHttpResponsePipeline - override val coroutineContext: CoroutineContext - get() = handlerJob - override fun channelRead(context: ChannelHandlerContext, message: Any) { when (message) { is Http2HeadersFrame -> { @@ -68,17 +67,27 @@ internal class NettyHttp2Handler( override fun channelActive(context: ChannelHandlerContext) { responseWriter = NettyHttpResponsePipeline( - context, - state, - coroutineContext + context = context, + httpHandlerState = state, + coroutineContext = handlerJob ) - context.pipeline()?.apply { - addLast(callEventGroup, NettyApplicationCallHandler(userCoroutineContext, enginePipeline)) - } context.fireChannelActive() } + override fun channelInactive(context: ChannelHandlerContext) { + onStreamClose(context) + context.fireChannelInactive() + } + + private fun onStreamClose(context: ChannelHandlerContext) { + context.applicationCall?.let { call -> + context.applicationCall = null + @OptIn(InternalAPI::class) + call.attributes.getOrNull(HttpRequestCloseHandlerKey)?.invoke() + } + } + override fun channelReadComplete(context: ChannelHandlerContext) { state.isChannelReadCompleted.compareAndSet(expect = false, update = true) responseWriter.flushIfNeeded() @@ -91,18 +100,37 @@ internal class NettyHttp2Handler( } private fun startHttp2(context: ChannelHandlerContext, headers: Http2Headers) { + val callJob = Job(parent = userCoroutineContext[Job]) + val callContext = + userCoroutineContext + NettyDispatcher.CurrentContext(context) + callJob + CallHandlerCoroutineName val call = NettyHttp2ApplicationCall( - application, - context, - headers, - this, - handlerJob + Dispatchers.Unconfined, - userCoroutineContext + application = application, + context = context, + headers = headers, + handler = this@NettyHttp2Handler, + engineContext = handlerJob + Dispatchers.Unconfined, + coroutineContext = callContext ) context.applicationCall = call - context.fireChannelRead(call) + // Reserve response slot synchronously on the I/O thread for proper ordering responseWriter.processResponse(call) + + // Defer coroutine start to the next event loop tick via context.executor().execute so that + // channelRead returns and Netty can deliver subsequent Http2DataFrame messages. + // Without this, the coroutine runs on the event loop, blocking data frame delivery and causing EOFException. + context.executor().execute { + val callScope = CoroutineScope(context = callContext) + callScope.launch(start = CoroutineStart.UNDISPATCHED) { + try { + enginePipeline.execute(call) + } catch (error: Throwable) { + handleFailure(call, error) + } finally { + callJob.complete() + } + } + } } @UseHttp2Push @@ -213,6 +241,10 @@ internal class NettyHttp2Handler( } } + internal fun cancel() { + handlerJob.cancel() + } + companion object { private val ApplicationCallKey = AttributeKey.valueOf("ktor.ApplicationCall") diff --git a/ktor-server/ktor-server-test-suites/jvm/src/io/ktor/server/testing/suites/HttpRequestLifecycleTest.kt b/ktor-server/ktor-server-test-suites/jvm/src/io/ktor/server/testing/suites/HttpRequestLifecycleTest.kt index 3d0de3b9de8..c3ca3f6b353 100644 --- a/ktor-server/ktor-server-test-suites/jvm/src/io/ktor/server/testing/suites/HttpRequestLifecycleTest.kt +++ b/ktor-server/ktor-server-test-suites/jvm/src/io/ktor/server/testing/suites/HttpRequestLifecycleTest.kt @@ -1,13 +1,15 @@ /* - * Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.server.testing.suites import io.ktor.client.statement.* import io.ktor.http.* +import io.ktor.server.application.install import io.ktor.server.engine.* import io.ktor.server.http.* +import io.ktor.server.plugins.calllogging.CallLogging import io.ktor.server.response.* import io.ktor.server.routing.* import io.ktor.server.test.base.* @@ -16,6 +18,9 @@ import io.ktor.utils.io.* import kotlinx.coroutines.* import kotlinx.coroutines.CancellationException import kotlinx.coroutines.channels.Channel +import java.io.OutputStreamWriter +import java.net.InetSocketAddress +import java.net.Socket import kotlin.concurrent.atomics.AtomicInt import kotlin.concurrent.atomics.ExperimentalAtomicApi import kotlin.concurrent.atomics.incrementAndFetch @@ -38,16 +43,17 @@ abstract class HttpRequestLifecycleTest Unit) -> Unit + ) = runTest { val requestStartedCnt = AtomicInt(0) val requestCancelledCnt = AtomicInt(0) val requestStarted = Channel(Channel.UNLIMITED) val requestCancelled = Channel(Channel.UNLIMITED) - cancellableRoute { + startServerWithRoute { requestStarted.send(requestStartedCnt.incrementAndFetch()) try { // very long operation @@ -100,6 +106,13 @@ abstract class HttpRequestLifecycleTest + cancellableRoute(configureRoute) + } + } + @Test fun testHttpRequestLifecycleSuccess() = runTest { val requestCompleted = CompletableDeferred() @@ -150,4 +163,116 @@ abstract class HttpRequestLifecycleTest + val server = createServer { + install(HttpRequestLifecycle) { + cancelCallOnClose = true + } + install(CallLogging) { + mdc("something") { "something else" } + } + routing { + get { configureRoute() } + } + } + startServer(server) + } + } + + @Test + @OptIn(ExperimentalAtomicApi::class) + open fun testPipelinedRequestsCancelledOnDisconnect() = runTest { + val pipelinedCount = 10 + val allStarted = Channel(pipelinedCount) + val cancelledCount = AtomicInt(0) + val allCancelled = CompletableDeferred() + + val server = createServer { + install(HttpRequestLifecycle) { + cancelCallOnClose = true + } + routing { + get("/slow") { + allStarted.send(Unit) + try { + repeat(100) { + delay(200.milliseconds) + } + call.respondText("Done") + } catch (e: CancellationException) { + val count = cancelledCount.incrementAndFetch() + if (count == pipelinedCount) { + allCancelled.complete(Unit) + } + throw e + } + } + } + } + startServer(server) + + // Use a raw socket to send pipelined HTTP/1.1 requests + val socket = Socket() + socket.tcpNoDelay = true + socket.connect(InetSocketAddress("127.0.0.1", port)) + + try { + val writer = OutputStreamWriter(socket.getOutputStream(), Charsets.US_ASCII) + repeat(pipelinedCount) { + writer.write("GET /slow HTTP/1.1\r\n") + writer.write("Host: localhost:$port\r\n") + writer.write("Connection: keep-alive\r\n") + writer.write("\r\n") + } + writer.flush() + + // Wait for all requests to start processing on the server + withTimeout(10.seconds) { + repeat(pipelinedCount) { + allStarted.receive() + } + } + } finally { + // Abruptly close the connection + socket.setSoLinger(true, 0) + socket.close() + } + + // Verify that ALL pipelined requests were cancelled, not just the last one + withTimeout(10.seconds) { + allCancelled.await() + } + assertEquals(pipelinedCount, cancelledCount.load()) + } }