Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/workerd/server/tests/container-client/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
load("@aspect_rules_js//js:defs.bzl", "js_binary")
load("@rules_shell//shell:sh_test.bzl", "sh_test")
load("//:build/wd_test.bzl", "wd_test")

wd_test(
Expand All @@ -10,3 +12,30 @@ wd_test(
"requires-network", # Accesses unix://var/run/docker.sock
],
)

js_binary(
name = "websocket-close-propagation-client",
entry_point = "websocket-close-propagation-client.js",
)

sh_test(
name = "websocket-close-propagation-test",
size = "enormous",
srcs = ["websocket-close-propagation-test.sh"],
args = [
"$(location //src/workerd/server:workerd_cross)",
"$(location :websocket-close-propagation-client)",
"$(location websocket-close-propagation.capnp.in)",
"$(location websocket-close-propagation-worker.js)",
],
data = [
"websocket-close-propagation.capnp.in",
"websocket-close-propagation-worker.js",
":websocket-close-propagation-client",
"//src/workerd/server:workerd_cross",
],
tags = [
"requires-container-engine",
"requires-network",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) 2025 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

'use strict';

const assert = require('node:assert/strict');

const url = process.argv[2];
assert.ok(url, 'Usage: websocket-close-propagation-client.js <ws-url>');

const CLOSE_TIMEOUT_MS = 5000;
const CONNECT_RETRY_DEADLINE_MS = Date.now() + 5000;

function connectOnce() {
return new Promise((resolve, reject) => {
const ws = new WebSocket(url);
let settled = false;

const timeout = setTimeout(() => {
if (!settled) {
settled = true;
reject(
new Error(
`Timed out after ${CLOSE_TIMEOUT_MS}ms waiting for close event`
)
);
}
}, CLOSE_TIMEOUT_MS);

function done(fn) {
if (!settled) {
settled = true;
clearTimeout(timeout);
fn();
}
}

ws.addEventListener('open', () => {
ws.send('hello');
});

ws.addEventListener('message', (event) => {
if (event.data !== 'Echo: hello') {
done(() => reject(new Error(`Unexpected message: ${event.data}`)));
return;
}

// The echo response above proves data flows end-to-end. Initiate a
// clean close — the close event should propagate back to the client
// the same way data does.
ws.close(1000, 'client closing');
});

ws.addEventListener('close', (event) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You always need to make sure to reciprocate the close:

      ws.close(1000, 'closing in response to client close');

That is likely the bug you have

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah this is a gap in my repro, but even with reciprocation, I've verified the close event doesn't propagate back to the client. And my earliest test for this on the sandbox-sdk was a server-initiated close scenario, which would still remain.

I can try to expand on the repro to add these cases if that'd be useful!

done(() => {
if (event.code !== 1000) {
reject(
new Error(
`Expected close code 1000, got ${event.code} (reason: ${event.reason})`
)
);
} else {
resolve({ code: event.code, reason: event.reason });
}
});
});

ws.addEventListener('error', (event) => {
done(() => reject(new Error(event?.message ?? 'WebSocket error')));
});
});
}

(async () => {
for (;;) {
try {
const result = await connectOnce();
console.log(`Closed with code=${result.code} reason=${result.reason}`);
process.exit(0);
} catch (err) {
const message = err?.message ?? String(err);
const isConnectError =
message.includes('ECONNREFUSED') ||
message.includes('network error') ||
message.includes('non-101');

if (isConnectError && Date.now() < CONNECT_RETRY_DEADLINE_MS) {
await new Promise((r) => setTimeout(r, 100));
continue;
}

console.error(message);
process.exit(1);
}
}
})();
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/usr/bin/env bash
# Copyright (c) 2025 Cloudflare, Inc.
# Licensed under the Apache 2.0 license found in the LICENSE file or at:
# https://opensource.org/licenses/Apache-2.0

set -euo pipefail

WORKERD_BINARY="$1"
CLIENT_BINARY="$2"
TEMPLATE_CAPNP="$3"
WORKER_JS="$4"

TMPDIR="${TEST_TMPDIR:-$(mktemp -d)}"
RUNDIR="$TMPDIR/ws-close-propagation"
mkdir -p "$RUNDIR"

cleanup() {
if [[ -n "${WORKERD_PID:-}" ]]; then
kill "$WORKERD_PID" 2>/dev/null || true
wait "$WORKERD_PID" 2>/dev/null || true
fi
}
trap cleanup EXIT

cp "$WORKER_JS" "$RUNDIR/websocket-close-propagation-worker.js"

LOG="$RUNDIR/workerd.log"

for attempt in $(seq 1 30); do
PORT=$(( (RANDOM % 20000) + 30000 ))
sed -e "s/__PORT__/${PORT}/g" \
"$TEMPLATE_CAPNP" > "$RUNDIR/config.capnp"

"$WORKERD_BINARY" serve "$RUNDIR/config.capnp" --experimental --verbose \
--directory-path=TEST_TMPDIR="$TMPDIR" \
>"$LOG" 2>&1 &
WORKERD_PID=$!

sleep 0.5
if ! kill -0 "$WORKERD_PID" 2>/dev/null; then
wait "$WORKERD_PID" 2>/dev/null || true
unset WORKERD_PID
continue
fi

URL="ws://127.0.0.1:${PORT}/ws"
if "$CLIENT_BINARY" "$URL"; then
exit 0
fi

echo "WebSocket close propagation test failed (${URL})" >&2
echo "--- workerd log tail ---" >&2
tail -200 "$LOG" >&2 || true
exit 1
done

echo "failed to start workerd after repeated attempts" >&2
echo "--- workerd log tail ---" >&2
tail -200 "$LOG" >&2 || true
exit 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2025 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import { DurableObject } from 'cloudflare:workers';
import { scheduler } from 'node:timers/promises';

export default {
async fetch(request, env) {
const upgrade = request.headers.get('Upgrade');
if (upgrade?.toLowerCase() !== 'websocket') {
return new Response('expected websocket', { status: 400 });
}

const id = env.MY_CONTAINER.idFromName('repro');
const stub = env.MY_CONTAINER.get(id);
return stub.fetch(request);
},
};

export class ContainerProxy extends DurableObject {
async fetch(request) {
const { container } = this.ctx;

if (!container.running) {
container.start({
env: { WS_ENABLED: 'true' },
enableInternet: true,
});
}

// Proxy the websocket upgrade into the container and return it to the
// eyeball client. Close events should propagate back through the same
// path as data.
const maxRetries = 6;
for (let i = 1; i <= maxRetries; i++) {
try {
return await container.getTcpPort(8080).fetch('http://container/ws', {
headers: {
Upgrade: 'websocket',
Connection: 'Upgrade',
'Sec-WebSocket-Key': 'x3JJHMbDL1EzLkh9GBhXDw==',
'Sec-WebSocket-Version': '13',
},
});
} catch (e) {
if (!e.message.includes('container port not found')) {
throw e;
}
console.info(
`Retrying getTcpPort(8080) for the ${i} time due to an error ${e.message}`
);
console.info(e);
if (i === maxRetries) {
console.error(
`Failed to connect to container for WebSocket. Retried ${i} times`
);
throw e;
}
await scheduler.wait(1000);
}
}
throw new Error('unreachable');
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using Workerd = import "/workerd/workerd.capnp";

const config :Workerd.Config = (
services = [
(name = "main", worker = .mainWorker),
(name = "internet", network = (allow = ["private"])),
(name = "TEST_TMPDIR", disk = (writable = true)),
],

sockets = [
( name = "http",
address = "127.0.0.1:__PORT__",
http = (),
service = "main"
),
],
);

const mainWorker :Workerd.Worker = (
modules = [
(name = "worker", esModule = embed "websocket-close-propagation-worker.js"),
],
compatibilityDate = "2026-02-03",
compatibilityFlags = ["nodejs_compat", "experimental"],
containerEngine = (localDocker = (socketPath = "unix:/var/run/docker.sock")),
durableObjectNamespaces = [
( className = "ContainerProxy",
uniqueKey = "container-client-ws-close-propagation",
container = (imageName = "cloudflare/workerd/container-client-test")
),
],
durableObjectStorage = (localDisk = "TEST_TMPDIR"),
bindings = [
(name = "MY_CONTAINER", durableObjectNamespace = "ContainerProxy"),
],
);
Loading