Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.ktor.client.engine.js

import io.ktor.client.FetchOptions
import io.ktor.client.*
import io.ktor.client.engine.*
import io.ktor.client.engine.js.compatibility.*
import io.ktor.client.plugins.*
Expand Down Expand Up @@ -132,26 +132,16 @@ internal class JsClientEngine(
private suspend fun WebSocket.awaitConnection(): WebSocket = suspendCancellableCoroutine { continuation ->
if (continuation.isCancelled) return@suspendCancellableCoroutine

lateinit var eventListener: (Event) -> Unit
eventListener = { event: Event ->
removeEventListener("open", callback = eventListener)
removeEventListener("error", callback = eventListener)
val disposable = addOneTimeEventListener<Event>("open", "error") { event ->
when (event.type) {
"open" -> continuation.resume(this)
"error" -> {
continuation.resumeWithException(WebSocketException(event.asString()))
}
"error" -> continuation.resumeWithException(WebSocketException(event.asString()))
}
}

addEventListener("open", callback = eventListener)
addEventListener("error", callback = eventListener)

continuation.invokeOnCancellation {
removeEventListener("open", callback = eventListener)
removeEventListener("error", callback = eventListener)

if (it != null) {
continuation.invokeOnCancellation { cause ->
disposable.dispose()
if (cause != null) {
this@awaitConnection.close()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.ktor.client.plugins.websocket

import io.ktor.client.utils.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import io.ktor.websocket.*
Expand All @@ -14,10 +15,8 @@ import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consumeEach
import org.khronos.webgl.ArrayBuffer
import org.khronos.webgl.Int8Array
import org.w3c.dom.ARRAYBUFFER
import org.w3c.dom.BinaryType
import org.w3c.dom.MessageEvent
import org.w3c.dom.WebSocket
import org.w3c.dom.*
import org.w3c.dom.events.Event
import kotlin.coroutines.CoroutineContext

@OptIn(InternalAPI::class)
Expand Down Expand Up @@ -59,12 +58,11 @@ internal class JsWebSocketSession(
if (websocket.readyState == WebSocket.OPEN) {
return block()
}
websocket.addEventListener("open", callback = { block() })
websocket.addOneTimeEventListener<Event>("open") { block() }
}

init {
val onMessage: (org.w3c.dom.events.Event) -> Unit = { e ->
val event = e.unsafeCast<MessageEvent>()
val onMessage = websocket.addEventListener<MessageEvent>("message") { event ->
val frame: Frame = when (val data = event.data) {
is ArrayBuffer -> Frame.Binary(true, Int8Array(data).unsafeCast<ByteArray>())
is String -> Frame.Text(data)
Expand All @@ -77,21 +75,19 @@ internal class JsWebSocketSession(
_incoming.trySend(frame)
}

val onError: (org.w3c.dom.events.Event) -> Unit = { e ->
val cause = WebSocketException("$e")
val onError = websocket.addEventListener<ErrorEvent>("error") { event ->
val cause = WebSocketException("$event")
_closeReason.completeExceptionally(cause)
_incoming.close(cause)
_outgoing.cancel()
}

lateinit var onClose: (dynamic) -> Unit
onClose = { e ->
val reason = CloseReason(e.code as Short, e.reason as String)
websocket.addOneTimeEventListener<CloseEvent>("close") { event ->
val reason = CloseReason(event.code, event.reason)
_closeReason.complete(reason)
_incoming.trySend(Frame.Close(reason))
_incoming.close()
_outgoing.cancel()
websocket.removeEventListener("close", callback = onClose)
}

// we must not throw exceptions before this
Expand All @@ -106,14 +102,11 @@ internal class JsWebSocketSession(
websocket.close(CloseReason.Codes.NORMAL.code, "Client failed")
}
}
websocket.removeEventListener("message", callback = onMessage)
websocket.removeEventListener("error", callback = onError)
onMessage.dispose()
onError.dispose()
}

websocket.binaryType = BinaryType.ARRAYBUFFER
websocket.addEventListener("message", callback = onMessage)
websocket.addEventListener("error", callback = onError)
websocket.addEventListener("close", callback = onClose)

launch {
_outgoing.consumeEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.ktor.client.engine.js

import io.ktor.client.FetchOptions
import io.ktor.client.*
import io.ktor.client.engine.*
import io.ktor.client.engine.js.compatibility.*
import io.ktor.client.plugins.*
Expand All @@ -16,7 +16,6 @@ import io.ktor.http.*
import io.ktor.util.*
import io.ktor.util.date.*
import io.ktor.utils.io.*
import io.ktor.websocket.ChannelConfig
import kotlinx.coroutines.*
import org.w3c.dom.WebSocket
import org.w3c.dom.events.Event
Expand Down Expand Up @@ -141,26 +140,15 @@ internal class JsClientEngine(
private suspend fun WebSocket.awaitConnection(): WebSocket = suspendCancellableCoroutine { continuation ->
if (continuation.isCancelled) return@suspendCancellableCoroutine

lateinit var eventListener: (JsAny) -> Unit
eventListener = { it: JsAny ->
val event: Event = it.unsafeCast()
removeEventListener("open", callback = eventListener)
removeEventListener("error", callback = eventListener)
val disposable = addOneTimeEventListener<Event>("open", "error") { event ->
when (event.type) {
"open" -> continuation.resume(this)
"error" -> {
continuation.resumeWithException(WebSocketException(eventAsString(event)))
}
"error" -> continuation.resumeWithException(WebSocketException(eventAsString(event)))
}
}

addEventListener("open", callback = eventListener)
addEventListener("error", callback = eventListener)

continuation.invokeOnCancellation {
removeEventListener("open", callback = eventListener)
removeEventListener("error", callback = eventListener)

disposable.dispose()
if (it != null) {
this@awaitConnection.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import kotlinx.coroutines.channels.consumeEach
import org.khronos.webgl.ArrayBuffer
import org.khronos.webgl.Uint8Array
import org.w3c.dom.*
import org.w3c.dom.events.Event
import kotlin.coroutines.CoroutineContext

@Suppress("UNUSED_PARAMETER")
Expand Down Expand Up @@ -64,13 +65,11 @@ internal class JsWebSocketSession(
if (websocket.readyState == WebSocket.OPEN) {
return block()
}
websocket.addEventListener("open", callback = { _: JsAny -> block() })
websocket.addOneTimeEventListener<Event>("open") { block() }
}

init {
val onMessage: (JsAny) -> Unit = { e ->
val event = e.unsafeCast<MessageEvent>()

val onMessage = websocket.addEventListener<MessageEvent>("message") { event ->
val data = event.data
if (data == null) {
val error = IllegalStateException("Empty message - no data for: ${event.type}")
Expand All @@ -95,22 +94,19 @@ internal class JsWebSocketSession(
_incoming.trySend(frame)
}

val onError: (JsAny) -> Unit = { e ->
val cause = WebSocketException("$e")
val onError = websocket.addEventListener<ErrorEvent>("error") { event ->
val cause = WebSocketException("$event")
_closeReason.completeExceptionally(cause)
_incoming.close(cause)
_outgoing.cancel()
}

lateinit var onClose: (JsAny) -> Unit
onClose = { e ->
val closeEvent = e.unsafeCast<CloseEvent>()
val reason = CloseReason(closeEvent.code, closeEvent.reason)
websocket.addOneTimeEventListener<CloseEvent>("close") { event ->
val reason = CloseReason(event.code, event.reason)
_closeReason.complete(reason)
_incoming.trySend(Frame.Close(reason))
_incoming.close()
_outgoing.cancel()
websocket.removeEventListener("close", callback = onClose)
}

coroutineContext[Job]?.invokeOnCompletion { cause ->
Expand All @@ -123,14 +119,11 @@ internal class JsWebSocketSession(
websocket.close(CloseReason.Codes.NORMAL.code, "Client failed")
}
}
websocket.removeEventListener("message", callback = onMessage)
websocket.removeEventListener("error", callback = onError)
onMessage.dispose()
onError.dispose()
}

websocket.binaryType = BinaryType.ARRAYBUFFER
websocket.addEventListener("message", callback = onMessage)
websocket.addEventListener("error", callback = onError)
websocket.addEventListener("close", callback = onClose)

launch {
_outgoing.consumeEach {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.client.utils

import kotlinx.coroutines.DisposableHandle
import org.w3c.dom.events.Event
import org.w3c.dom.events.EventTarget
import kotlin.js.JsAny
import kotlin.js.unsafeCast

/**
* Registers event listeners for the given [events] and returns a [DisposableHandle] to remove them.
*
* The event is unsafe-cast to [T] before invoking [listener]. The cast is unchecked because JS
* event objects from some runtimes (e.g. the `ws` npm package) are plain objects that do not
* satisfy Kotlin `instanceof` checks for DOM event subclasses.
*
* @param events event names to listen for
* @param listener callback invoked when any of the events fire
* @return a handle to remove the listeners
*/
internal inline fun <reified T : Event> EventTarget.addEventListener(
vararg events: String,
crossinline listener: (T) -> Unit
): DisposableHandle {
// The callback parameter is JsAny, not Event, by design.
// Kotlin/WasmJS generates a JS adapter for each lambda based on its declared parameter type:
// a `(Event) -> Unit` adapter checks `x instanceof globalThis.Event` before invoking the lambda.
// The `ws` npm package defines its own `Event` hierarchy that does not extend `globalThis.Event`
// (see https://github.com/websockets/ws/issues/1818), so that check fails for every event fired
// by a Node.js WebSocket.
// Using `(JsAny) -> Unit` produces an adapter with no instanceof check, accepting any JS value.
// This is valid Kotlin: `(JsAny) -> Unit` is a subtype of `(Event) -> Unit` by contravariance.
// `unsafeCast<T>()` inside the body is a no-op — it changes only the compile-time type with no
// runtime check — so it never fails regardless of the actual JS object type.
@Suppress("RemoveExplicitTypeArguments") // Compiler fails to infer type arguments otherwise
val callback = { event: JsAny -> listener(event.unsafeCast<T>()) }
Comment on lines +28 to +39
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turned out to be not so simple 😅
But at least now this code is isolated and documented here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! This is much better for event listeners listening a single event. I will try.
However, for the same listener registered to listen two events we will still need this utility function

events.forEach { addEventListener(it, callback) }
return DisposableHandle { events.forEach { removeEventListener(it, callback) } }
}

/**
* Registers a one-time event listener that removes itself after the first event fires.
* The listener is registered for all specified [events] and fires once for whichever event occurs first.
*
* The event is unsafe-cast to [T] before invoking [listener]. See [addEventListener] for why.
*
* @param events event names to listen for
* @param listener callback invoked when one of the events fires
* @return a handle to remove the listener before it fires
*/
internal inline fun <reified T : Event> EventTarget.addOneTimeEventListener(
vararg events: String,
crossinline listener: (T) -> Unit
): DisposableHandle {
lateinit var disposable: DisposableHandle
disposable = addEventListener<T>(*events) { event ->
disposable.dispose()
listener(event)
}
return disposable
}
Loading