Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,10 @@ public fun CoroutineScope.startServerConnectionPipeline(
if (isLastHttpRequest(version, connectionOptions)) break
}
} catch (_: IOException) {
// already handled
coroutineContext.cancel()
// Connection error - also triggers onClose below
} finally {
// Invoke onClose to allow HttpRequestLifecycle plugin to cancel
// the call coroutine with proper ConnectionClosedException cause.
handlerScope?.onClose?.invoke()
actorChannel.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,7 @@ class CIOHttpRequestLifecycleTest :
enableSsl = false
enableHttp2 = false
}

@Ignore
override fun testPipelinedRequestsCancelledOnDisconnect() {}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you could log a new issue to handle this behaviour in CIO if it falls outside the scope here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Afaik, CIO doesn't support pipelining, does it?

}
Original file line number Diff line number Diff line change
@@ -1,139 +1,74 @@
/*
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.server.netty

import io.ktor.http.*
import io.ktor.http.HttpHeaders
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.http.HttpRequestCloseHandlerKey
import io.ktor.server.netty.http1.*
import io.ktor.util.pipeline.*
import io.ktor.utils.io.*
import io.netty.channel.*
import io.netty.handler.codec.http.*
import io.netty.handler.timeout.*
import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlin.coroutines.cancellation.*
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion
import kotlinx.coroutines.CoroutineName

private const val CHUNKED_VALUE = "chunked"

internal class NettyApplicationCallHandler(
userCoroutineContext: CoroutineContext,
private val enginePipeline: EnginePipeline
) : ChannelInboundHandlerAdapter(), CoroutineScope {
private var currentJob: Job? = null
private var currentCall: PipelineCall? = null

override val coroutineContext: CoroutineContext = userCoroutineContext

override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
when (msg) {
is PipelineCall -> handleRequest(ctx, msg)
else -> ctx.fireChannelRead(msg)
}
}

internal fun onConnectionClose(context: ChannelHandlerContext) {
if (context.channel().isActive) {
return
}
currentCall?.let {
currentCall = null
@OptIn(InternalAPI::class)
it.attributes.getOrNull(HttpRequestCloseHandlerKey)?.invoke()
}
}

private fun handleRequest(context: ChannelHandlerContext, call: PipelineCall) {
val callContext = CallHandlerCoroutineName + NettyDispatcher.CurrentContext(context)

currentCall = call
currentJob = launch(callContext, start = CoroutineStart.UNDISPATCHED) {
when {
call is NettyHttp1ApplicationCall && !call.request.isValid() -> {
respondError400BadRequest(call)
}

else ->
try {
enginePipeline.execute(call)
} catch (error: Throwable) {
handleFailure(call, error)
}
}
}
}

override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
when (cause) {
is ReadTimeoutException -> {
currentJob?.let {
respond408RequestTimeout(ctx)
it.cancel(CancellationException(cause))
} ?: ctx.fireExceptionCaught(cause)
}

else -> ctx.fireExceptionCaught(cause)
}
}
/**
* Contains shared constants and helper functions for Netty call handling.
*/
internal object NettyApplicationCallHandler {
internal val CallHandlerCoroutineName = CoroutineName("call-handler")
}

override fun channelInactive(ctx: ChannelHandlerContext) {
onConnectionClose(ctx)
ctx.fireChannelInactive()
internal fun NettyHttp1ApplicationRequest.isValid(): Boolean {
if (httpRequest.decoderResult().isFailure) {
return false
}

private fun respond408RequestTimeout(ctx: ChannelHandlerContext) {
val response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_TIMEOUT)
response.headers().add(HttpHeaders.ContentLength, "0")
response.headers().add(HttpHeaders.Connection, "close")
ctx.writeAndFlush(response)
ctx.close()
}
if (!headers.contains(HttpHeaders.TransferEncoding)) return true

private suspend fun respondError400BadRequest(call: NettyHttp1ApplicationCall) {
logCause(call)
val encodings = headers.getAll(HttpHeaders.TransferEncoding) ?: return true
return encodings.hasValidTransferEncoding()
}

val causeMessage = call.failureCause?.message?.toByteArray(charset = Charsets.UTF_8)
val content = if (causeMessage != null) ByteReadChannel(causeMessage) else ByteReadChannel.Empty
val contentLength = causeMessage?.size ?: 0
internal fun ChannelHandlerContext.respond408RequestTimeoutHttp1() {
val response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_TIMEOUT)
response.headers().add(HttpHeaders.ContentLength, "0")
response.headers().add(HttpHeaders.Connection, "close")
writeAndFlush(response)
close()
}

call.response.status(HttpStatusCode.BadRequest)
call.response.headers.append(HttpHeaders.ContentLength, contentLength.toString(), safeOnly = false)
if (contentLength > 0) {
call.response.headers.append(HttpHeaders.ContentType, "text/plain; charset=utf-8", safeOnly = false)
}
call.response.headers.append(HttpHeaders.Connection, "close", safeOnly = false)
call.response.sendResponse(chunked = false, content)
call.finish()
}
internal suspend fun NettyHttp1ApplicationCall.respondError400BadRequest() {
logCause()

private fun logCause(call: NettyHttp1ApplicationCall) {
if (call.application.log.isTraceEnabled) {
val cause = call.failureCause ?: return
call.application.log.trace("Failed to decode request", cause)
}
}
val causeMessage = failureCause?.message?.toByteArray(charset = Charsets.UTF_8)
val content = if (causeMessage != null) ByteReadChannel(causeMessage) else ByteReadChannel.Empty
val contentLength = causeMessage?.size ?: 0

companion object {
internal val CallHandlerCoroutineName = CoroutineName("call-handler")
response.status(HttpStatusCode.BadRequest)
response.headers.append(HttpHeaders.ContentLength, contentLength.toString(), safeOnly = false)
if (contentLength > 0) {
response.headers.append(HttpHeaders.ContentType, "text/plain; charset=utf-8", safeOnly = false)
}
response.headers.append(HttpHeaders.Connection, "close", safeOnly = false)
response.sendResponse(chunked = false, content)
finish()
}

internal fun NettyHttp1ApplicationRequest.isValid(): Boolean {
if (httpRequest.decoderResult().isFailure) {
return false
private fun NettyHttp1ApplicationCall.logCause() {
if (application.log.isTraceEnabled) {
val cause = failureCause ?: return
application.log.trace("Failed to decode request", cause)
}

if (!headers.contains(HttpHeaders.TransferEncoding)) return true

val encodings = headers.getAll(HttpHeaders.TransferEncoding) ?: return true
return encodings.hasValidTransferEncoding()
}

private val NettyHttp1ApplicationCall.failureCause: Throwable?
get() = httpRequest.decoderResult()?.cause()

internal fun List<String>.hasValidTransferEncoding(): Boolean {
forEachIndexed { headerIndex, header ->
val chunkedStart = header.indexOf(CHUNKED_VALUE)
Expand Down Expand Up @@ -162,6 +97,3 @@ internal fun List<String>.hasValidTransferEncoding(): Boolean {
}

private fun Char.isSeparator(): Boolean = (this == ' ' || this == ',')

private val NettyHttp1ApplicationCall.failureCause: Throwable?
get() = request.httpRequest.decoderResult()?.cause()
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import io.netty.handler.timeout.ReadTimeoutException
import io.netty.handler.timeout.ReadTimeoutHandler
import io.netty.handler.timeout.WriteTimeoutHandler
import io.netty.util.concurrent.EventExecutorGroup
import kotlinx.coroutines.cancel
import java.io.FileInputStream
import java.nio.channels.ClosedChannelException
import java.security.KeyStore
Expand Down Expand Up @@ -186,7 +185,6 @@ public class NettyChannelInitializer(
val handler = NettyHttp2Handler(
enginePipeline,
application,
callEventGroup,
application.coroutineContext + userContext,
runningLimit
)
Expand All @@ -202,7 +200,6 @@ public class NettyChannelInitializer(
val handler = NettyHttp2Handler(
enginePipeline,
applicationProvider(),
callEventGroup,
userContext,
runningLimit
)
Expand Down Expand Up @@ -232,7 +229,6 @@ public class NettyChannelInitializer(
applicationProvider,
enginePipeline,
environment,
callEventGroup,
engineContext,
userContext,
runningLimit
Expand Down Expand Up @@ -266,7 +262,6 @@ public class NettyChannelInitializer(
applicationProvider,
enginePipeline,
environment,
callEventGroup,
engineContext,
userContext,
runningLimit
Expand Down Expand Up @@ -330,14 +325,14 @@ public class NettyChannelInitializer(
if (SslProvider.isAlpnSupported(SslProvider.OPENSSL)) {
return SslProvider.OPENSSL
}
} catch (ignore: Throwable) {
} catch (_: Throwable) {
}

try {
if (SslProvider.isAlpnSupported(SslProvider.JDK)) {
return SslProvider.JDK
}
} catch (ignore: Throwable) {
} catch (_: Throwable) {
}

return null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.server.netty.http1
Expand All @@ -16,14 +16,12 @@ import kotlin.coroutines.*
internal class NettyHttp1ApplicationCall(
application: Application,
context: ChannelHandlerContext,
httpRequest: HttpRequest,
val httpRequest: HttpRequest,
requestBodyChannel: ByteReadChannel?,
engineContext: CoroutineContext,
userContext: CoroutineContext,
override val coroutineContext: CoroutineContext,
) : NettyApplicationCall(application, context, httpRequest), CoroutineScope {

override val coroutineContext: CoroutineContext = userContext

override val request = NettyHttp1ApplicationRequest(
this,
engineContext,
Expand All @@ -36,7 +34,7 @@ internal class NettyHttp1ApplicationCall(
this,
context,
engineContext,
userContext,
coroutineContext,
httpRequest.protocolVersion()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ internal class NettyHttp1ApplicationResponse(
override suspend fun respondUpgrade(upgrade: OutgoingContent.ProtocolUpgrade) {
val nettyContext = context
val nettyChannel = nettyContext.channel()
val userAppContext = userContext + NettyDispatcher.CurrentContext(nettyContext)

val bodyHandler = nettyContext.pipeline().get(RequestBodyHandler::class.java)
val upgradedReadChannel = bodyHandler.upgrade()
Expand All @@ -104,7 +103,7 @@ internal class NettyHttp1ApplicationResponse(
upgradedReadChannel,
upgradedWriteChannel,
engineContext,
userAppContext
userContext
)

job.invokeOnCompletion {
Expand Down
Loading
Loading