diff --git a/ktor-client/ktor-client-core/js/src/io/ktor/client/engine/js/JsClientEngine.kt b/ktor-client/ktor-client-core/js/src/io/ktor/client/engine/js/JsClientEngine.kt index 9ad1b5a50e0..d1463ecf9a4 100644 --- a/ktor-client/ktor-client-core/js/src/io/ktor/client/engine/js/JsClientEngine.kt +++ b/ktor-client/ktor-client-core/js/src/io/ktor/client/engine/js/JsClientEngine.kt @@ -4,7 +4,7 @@ package io.ktor.client.engine.js -import io.ktor.client.FetchOptions +import io.ktor.client.* import io.ktor.client.engine.* import io.ktor.client.engine.js.compatibility.* import io.ktor.client.plugins.* @@ -132,26 +132,16 @@ internal class JsClientEngine( private suspend fun WebSocket.awaitConnection(): WebSocket = suspendCancellableCoroutine { continuation -> if (continuation.isCancelled) return@suspendCancellableCoroutine - lateinit var eventListener: (Event) -> Unit - eventListener = { event: Event -> - removeEventListener("open", callback = eventListener) - removeEventListener("error", callback = eventListener) + val disposable = addOneTimeEventListener("open", "error") { event -> when (event.type) { "open" -> continuation.resume(this) - "error" -> { - continuation.resumeWithException(WebSocketException(event.asString())) - } + "error" -> continuation.resumeWithException(WebSocketException(event.asString())) } } - addEventListener("open", callback = eventListener) - addEventListener("error", callback = eventListener) - - continuation.invokeOnCancellation { - removeEventListener("open", callback = eventListener) - removeEventListener("error", callback = eventListener) - - if (it != null) { + continuation.invokeOnCancellation { cause -> + disposable.dispose() + if (cause != null) { this@awaitConnection.close() } } diff --git a/ktor-client/ktor-client-core/js/src/io/ktor/client/plugins/websocket/JsWebSocketSession.kt b/ktor-client/ktor-client-core/js/src/io/ktor/client/plugins/websocket/JsWebSocketSession.kt index 408bd373ab2..25a548d1a6e 100644 --- a/ktor-client/ktor-client-core/js/src/io/ktor/client/plugins/websocket/JsWebSocketSession.kt +++ b/ktor-client/ktor-client-core/js/src/io/ktor/client/plugins/websocket/JsWebSocketSession.kt @@ -4,6 +4,7 @@ package io.ktor.client.plugins.websocket +import io.ktor.client.utils.* import io.ktor.utils.io.* import io.ktor.utils.io.core.* import io.ktor.websocket.* @@ -14,10 +15,8 @@ import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.consumeEach import org.khronos.webgl.ArrayBuffer import org.khronos.webgl.Int8Array -import org.w3c.dom.ARRAYBUFFER -import org.w3c.dom.BinaryType -import org.w3c.dom.MessageEvent -import org.w3c.dom.WebSocket +import org.w3c.dom.* +import org.w3c.dom.events.Event import kotlin.coroutines.CoroutineContext @OptIn(InternalAPI::class) @@ -59,12 +58,11 @@ internal class JsWebSocketSession( if (websocket.readyState == WebSocket.OPEN) { return block() } - websocket.addEventListener("open", callback = { block() }) + websocket.addOneTimeEventListener("open") { block() } } init { - val onMessage: (org.w3c.dom.events.Event) -> Unit = { e -> - val event = e.unsafeCast() + val onMessage = websocket.addEventListener("message") { event -> val frame: Frame = when (val data = event.data) { is ArrayBuffer -> Frame.Binary(true, Int8Array(data).unsafeCast()) is String -> Frame.Text(data) @@ -77,21 +75,19 @@ internal class JsWebSocketSession( _incoming.trySend(frame) } - val onError: (org.w3c.dom.events.Event) -> Unit = { e -> - val cause = WebSocketException("$e") + val onError = websocket.addEventListener("error") { event -> + val cause = WebSocketException("$event") _closeReason.completeExceptionally(cause) _incoming.close(cause) _outgoing.cancel() } - lateinit var onClose: (dynamic) -> Unit - onClose = { e -> - val reason = CloseReason(e.code as Short, e.reason as String) + websocket.addOneTimeEventListener("close") { event -> + val reason = CloseReason(event.code, event.reason) _closeReason.complete(reason) _incoming.trySend(Frame.Close(reason)) _incoming.close() _outgoing.cancel() - websocket.removeEventListener("close", callback = onClose) } // we must not throw exceptions before this @@ -106,14 +102,11 @@ internal class JsWebSocketSession( websocket.close(CloseReason.Codes.NORMAL.code, "Client failed") } } - websocket.removeEventListener("message", callback = onMessage) - websocket.removeEventListener("error", callback = onError) + onMessage.dispose() + onError.dispose() } websocket.binaryType = BinaryType.ARRAYBUFFER - websocket.addEventListener("message", callback = onMessage) - websocket.addEventListener("error", callback = onError) - websocket.addEventListener("close", callback = onClose) launch { _outgoing.consumeEach { diff --git a/ktor-client/ktor-client-core/wasmJs/src/io/ktor/client/engine/js/WasmJsClientEngine.kt b/ktor-client/ktor-client-core/wasmJs/src/io/ktor/client/engine/js/WasmJsClientEngine.kt index d487165590f..b82abcf4bc5 100644 --- a/ktor-client/ktor-client-core/wasmJs/src/io/ktor/client/engine/js/WasmJsClientEngine.kt +++ b/ktor-client/ktor-client-core/wasmJs/src/io/ktor/client/engine/js/WasmJsClientEngine.kt @@ -4,7 +4,7 @@ package io.ktor.client.engine.js -import io.ktor.client.FetchOptions +import io.ktor.client.* import io.ktor.client.engine.* import io.ktor.client.engine.js.compatibility.* import io.ktor.client.plugins.* @@ -16,7 +16,6 @@ import io.ktor.http.* import io.ktor.util.* import io.ktor.util.date.* import io.ktor.utils.io.* -import io.ktor.websocket.ChannelConfig import kotlinx.coroutines.* import org.w3c.dom.WebSocket import org.w3c.dom.events.Event @@ -141,26 +140,15 @@ internal class JsClientEngine( private suspend fun WebSocket.awaitConnection(): WebSocket = suspendCancellableCoroutine { continuation -> if (continuation.isCancelled) return@suspendCancellableCoroutine - lateinit var eventListener: (JsAny) -> Unit - eventListener = { it: JsAny -> - val event: Event = it.unsafeCast() - removeEventListener("open", callback = eventListener) - removeEventListener("error", callback = eventListener) + val disposable = addOneTimeEventListener("open", "error") { event -> when (event.type) { "open" -> continuation.resume(this) - "error" -> { - continuation.resumeWithException(WebSocketException(eventAsString(event))) - } + "error" -> continuation.resumeWithException(WebSocketException(eventAsString(event))) } } - addEventListener("open", callback = eventListener) - addEventListener("error", callback = eventListener) - continuation.invokeOnCancellation { - removeEventListener("open", callback = eventListener) - removeEventListener("error", callback = eventListener) - + disposable.dispose() if (it != null) { this@awaitConnection.close() } diff --git a/ktor-client/ktor-client-core/wasmJs/src/io/ktor/client/plugins/websocket/JsWebSocketSession.kt b/ktor-client/ktor-client-core/wasmJs/src/io/ktor/client/plugins/websocket/JsWebSocketSession.kt index 41cf2868a5f..c69bc8b788b 100644 --- a/ktor-client/ktor-client-core/wasmJs/src/io/ktor/client/plugins/websocket/JsWebSocketSession.kt +++ b/ktor-client/ktor-client-core/wasmJs/src/io/ktor/client/plugins/websocket/JsWebSocketSession.kt @@ -16,6 +16,7 @@ import kotlinx.coroutines.channels.consumeEach import org.khronos.webgl.ArrayBuffer import org.khronos.webgl.Uint8Array import org.w3c.dom.* +import org.w3c.dom.events.Event import kotlin.coroutines.CoroutineContext @Suppress("UNUSED_PARAMETER") @@ -64,13 +65,11 @@ internal class JsWebSocketSession( if (websocket.readyState == WebSocket.OPEN) { return block() } - websocket.addEventListener("open", callback = { _: JsAny -> block() }) + websocket.addOneTimeEventListener("open") { block() } } init { - val onMessage: (JsAny) -> Unit = { e -> - val event = e.unsafeCast() - + val onMessage = websocket.addEventListener("message") { event -> val data = event.data if (data == null) { val error = IllegalStateException("Empty message - no data for: ${event.type}") @@ -95,22 +94,19 @@ internal class JsWebSocketSession( _incoming.trySend(frame) } - val onError: (JsAny) -> Unit = { e -> - val cause = WebSocketException("$e") + val onError = websocket.addEventListener("error") { event -> + val cause = WebSocketException("$event") _closeReason.completeExceptionally(cause) _incoming.close(cause) _outgoing.cancel() } - lateinit var onClose: (JsAny) -> Unit - onClose = { e -> - val closeEvent = e.unsafeCast() - val reason = CloseReason(closeEvent.code, closeEvent.reason) + websocket.addOneTimeEventListener("close") { event -> + val reason = CloseReason(event.code, event.reason) _closeReason.complete(reason) _incoming.trySend(Frame.Close(reason)) _incoming.close() _outgoing.cancel() - websocket.removeEventListener("close", callback = onClose) } coroutineContext[Job]?.invokeOnCompletion { cause -> @@ -123,14 +119,11 @@ internal class JsWebSocketSession( websocket.close(CloseReason.Codes.NORMAL.code, "Client failed") } } - websocket.removeEventListener("message", callback = onMessage) - websocket.removeEventListener("error", callback = onError) + onMessage.dispose() + onError.dispose() } websocket.binaryType = BinaryType.ARRAYBUFFER - websocket.addEventListener("message", callback = onMessage) - websocket.addEventListener("error", callback = onError) - websocket.addEventListener("close", callback = onClose) launch { _outgoing.consumeEach { diff --git a/ktor-client/ktor-client-core/web/src/io/ktor/client/utils/JsUtils.web.kt b/ktor-client/ktor-client-core/web/src/io/ktor/client/utils/JsUtils.web.kt new file mode 100644 index 00000000000..c23c3863e63 --- /dev/null +++ b/ktor-client/ktor-client-core/web/src/io/ktor/client/utils/JsUtils.web.kt @@ -0,0 +1,64 @@ +/* + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package io.ktor.client.utils + +import kotlinx.coroutines.DisposableHandle +import org.w3c.dom.events.Event +import org.w3c.dom.events.EventTarget +import kotlin.js.JsAny +import kotlin.js.unsafeCast + +/** + * Registers event listeners for the given [events] and returns a [DisposableHandle] to remove them. + * + * The event is unsafe-cast to [T] before invoking [listener]. The cast is unchecked because JS + * event objects from some runtimes (e.g. the `ws` npm package) are plain objects that do not + * satisfy Kotlin `instanceof` checks for DOM event subclasses. + * + * @param events event names to listen for + * @param listener callback invoked when any of the events fire + * @return a handle to remove the listeners + */ +internal inline fun EventTarget.addEventListener( + vararg events: String, + crossinline listener: (T) -> Unit +): DisposableHandle { + // The callback parameter is JsAny, not Event, by design. + // Kotlin/WasmJS generates a JS adapter for each lambda based on its declared parameter type: + // a `(Event) -> Unit` adapter checks `x instanceof globalThis.Event` before invoking the lambda. + // The `ws` npm package defines its own `Event` hierarchy that does not extend `globalThis.Event` + // (see https://github.com/websockets/ws/issues/1818), so that check fails for every event fired + // by a Node.js WebSocket. + // Using `(JsAny) -> Unit` produces an adapter with no instanceof check, accepting any JS value. + // This is valid Kotlin: `(JsAny) -> Unit` is a subtype of `(Event) -> Unit` by contravariance. + // `unsafeCast()` inside the body is a no-op — it changes only the compile-time type with no + // runtime check — so it never fails regardless of the actual JS object type. + @Suppress("RemoveExplicitTypeArguments") // Compiler fails to infer type arguments otherwise + val callback = { event: JsAny -> listener(event.unsafeCast()) } + events.forEach { addEventListener(it, callback) } + return DisposableHandle { events.forEach { removeEventListener(it, callback) } } +} + +/** + * Registers a one-time event listener that removes itself after the first event fires. + * The listener is registered for all specified [events] and fires once for whichever event occurs first. + * + * The event is unsafe-cast to [T] before invoking [listener]. See [addEventListener] for why. + * + * @param events event names to listen for + * @param listener callback invoked when one of the events fires + * @return a handle to remove the listener before it fires + */ +internal inline fun EventTarget.addOneTimeEventListener( + vararg events: String, + crossinline listener: (T) -> Unit +): DisposableHandle { + lateinit var disposable: DisposableHandle + disposable = addEventListener(*events) { event -> + disposable.dispose() + listener(event) + } + return disposable +}