From 8accfe4460b33bb7d2bf94d65c2786e31d8daca6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e=20Kooi?= Date: Fri, 9 Jan 2026 12:37:30 +0100 Subject: [PATCH 1/3] Use in-process event emitter instead of Redis pub/sub --- package.json | 1 + src/SocketServer.js | 50 +++++++++++------------------------ src/Uwave.js | 8 +++++- src/plugins/configStore.js | 54 ++++++++++---------------------------- 4 files changed, 38 insertions(+), 75 deletions(-) diff --git a/package.json b/package.json index 83fd585d..5b72f2bf 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ "cookie": "^1.0.1", "cookie-parser": "^1.4.4", "cors": "^2.8.5", + "emittery": "^1.2.0", "escape-string-regexp": "^5.0.0", "explain-error": "^1.0.4", "express": "^5.0.0", diff --git a/src/SocketServer.js b/src/SocketServer.js index 192ff499..a3eff387 100644 --- a/src/SocketServer.js +++ b/src/SocketServer.js @@ -1,6 +1,5 @@ import { promisify } from 'node:util'; import lodash from 'lodash'; -import sjson from 'secure-json-parse'; import { WebSocketServer } from 'ws'; import Ajv from 'ajv'; import { stdSerializers } from 'pino'; @@ -96,8 +95,6 @@ class SocketServer { #logger; - #redisSubscription; - #wss; #closing = false; @@ -134,6 +131,8 @@ class SocketServer { */ #serverActions; + #unsubscribe; + /** * Create a socket server. * @@ -157,7 +156,6 @@ class SocketServer { req: stdSerializers.req, }, }); - this.#redisSubscription = uw.redis.duplicate(); this.options = { /** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */ @@ -176,11 +174,8 @@ class SocketServer { port: options.server ? undefined : options.port, }); - uw.use(() => this.#redisSubscription.subscribe('uwave')); - this.#redisSubscription.on('message', (channel, command) => { - // this returns a promise, but we don't handle the error case: - // there is not much we can do, so just let node.js crash w/ an unhandled rejection - this.onServerMessage(channel, command); + this.#unsubscribe = uw.events.onAny((command, data) => { + this.#onServerMessage(command, data); }); this.#wss.on('error', (error) => { @@ -678,33 +673,20 @@ class SocketServer { } /** - * Handle command messages coming in from Redis. + * Handle command messages coming in from elsewhere in the app. * Some commands are intended to broadcast immediately to all connected * clients, but others require special action. * - * @param {string} channel - * @param {string} rawCommand - * @returns {Promise} - * @private + * @template {keyof import('./redisMessages.js').ServerActionParameters} K + * @param {K} command + * @param {import('./redisMessages.js').ServerActionParameters[K]} data */ - async onServerMessage(channel, rawCommand) { - /** - * @type {{ command: string, data: import('type-fest').JsonValue }|undefined} - */ - const json = sjson.safeParse(rawCommand); - if (!json) { - return; - } - const { command, data } = json; + #onServerMessage(command, data) { + this.#logger.trace({ channel: command, command, data }, 'server message'); - this.#logger.trace({ channel, command, data }, 'server message'); - - if (has(this.#serverActions, command)) { - const action = this.#serverActions[command]; - if (action !== undefined) { // the types for `ServerActions` allow undefined, so... - // @ts-expect-error TS2345 `data` is validated - action(data); - } + const action = this.#serverActions[command]; + if (action !== undefined) { + action(data); } } @@ -714,9 +696,10 @@ class SocketServer { * @returns {Promise} */ async destroy() { - clearInterval(this.#pinger); - this.#closing = true; + + this.#unsubscribe(); + clearInterval(this.#pinger); clearInterval(this.#guestCountInterval); for (const connection of this.#connections) { @@ -725,7 +708,6 @@ class SocketServer { const closeWsServer = promisify(this.#wss.close.bind(this.#wss)); await closeWsServer(); - await this.#redisSubscription.quit(); } /** diff --git a/src/Uwave.js b/src/Uwave.js index c6d39ec1..47360b7b 100644 --- a/src/Uwave.js +++ b/src/Uwave.js @@ -21,6 +21,7 @@ import waitlist from './plugins/waitlist.js'; import passport from './plugins/passport.js'; import migrations from './plugins/migrations.js'; import { SqliteDateColumnsPlugin, connect as connectSqlite } from './utils/sqlite.js'; +import Emittery from 'emittery'; const DEFAULT_SQLITE_PATH = './uwave.sqlite'; const DEFAULT_REDIS_URL = 'redis://localhost:6379'; @@ -125,6 +126,11 @@ class UwaveServer extends EventEmitter { // @ts-expect-error TS2564 Definitely assigned in a plugin socketServer; + /** @type {Emittery} */ + events = new Emittery({ + debug: { name: 'u-wave-core' }, + }); + /** * @type {Map} */ @@ -294,7 +300,7 @@ class UwaveServer extends EventEmitter { * @param {import('./redisMessages.js').ServerActionParameters[CommandName]} data */ publish(command, data) { - return this.redis.publish('uwave', JSON.stringify({ command, data })); + return this.events.emit(command, data); } async listen() { diff --git a/src/plugins/configStore.js b/src/plugins/configStore.js index a359f5dd..590f2d20 100644 --- a/src/plugins/configStore.js +++ b/src/plugins/configStore.js @@ -4,7 +4,6 @@ import EventEmitter from 'node:events'; import Ajv from 'ajv/dist/2019.js'; import formats from 'ajv-formats'; import jsonMergePatch from 'json-merge-patch'; -import sjson from 'secure-json-parse'; import ValidationError from '../errors/ValidationError.js'; import { sql } from 'kysely'; import { fromJson, json, jsonb } from '../utils/sqlite.js'; @@ -29,12 +28,12 @@ class ConfigStore { #logger; - #subscriber; - #ajv; #emitter = new EventEmitter(); + #unsubscribe; + /** @type {Map>} */ #validators = new Map(); @@ -44,7 +43,6 @@ class ConfigStore { constructor(uw) { this.#uw = uw; this.#logger = uw.logger.child({ ns: 'uwave:config' }); - this.#subscriber = uw.redis.duplicate(); this.#ajv = new Ajv({ useDefaults: true, // Allow unknown keywords (`uw:xyz`) @@ -59,40 +57,16 @@ class ConfigStore { fs.readFileSync(new URL('../schemas/definitions.json', import.meta.url), 'utf8'), )); - this.#subscriber.on('message', (_channel, command) => { - this.#onServerMessage(command); - }); - - uw.use(async () => this.#subscriber.subscribe('uwave')); - } - - /** - * @param {string} rawCommand - */ - async #onServerMessage(rawCommand) { - /** - * @type {undefined|{ - * command: string, - * data: import('../redisMessages.js').ServerActionParameters['configStore:update'], - * }} - */ - const json = sjson.safeParse(rawCommand); - if (!json) { - return; - } - const { command, data } = json; - if (command !== CONFIG_UPDATE_MESSAGE) { - return; - } - - this.#logger.trace({ command, data }, 'handle config update'); + this.#unsubscribe = uw.events.on(CONFIG_UPDATE_MESSAGE, async (data) => { + this.#logger.trace({ data }, 'handle config update'); - try { - const updatedSettings = await this.get(data.key); - this.#emitter.emit(data.key, updatedSettings, data.user, data.patch); - } catch (error) { - this.#logger.error({ err: error }, 'could not retrieve settings after update'); - } + try { + const updatedSettings = await this.get(data.key); + this.#emitter.emit(data.key, updatedSettings, data.user, data.patch); + } catch (error) { + this.#logger.error({ err: error }, 'could not retrieve settings after update'); + } + }); } /** @@ -258,8 +232,8 @@ class ConfigStore { }; } - async destroy() { - await this.#subscriber.quit(); + destroy() { + this.#unsubscribe(); } } @@ -268,7 +242,7 @@ class ConfigStore { */ async function configStorePlugin(uw) { uw.config = new ConfigStore(uw); - uw.onClose(() => uw.config.destroy()); + uw.onClose(async () => uw.config.destroy()); } export default configStorePlugin; From b2397af76cae5a8cb9d98c693390732eb4989835 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e=20Kooi?= Date: Sat, 10 Jan 2026 15:39:56 +0100 Subject: [PATCH 2/3] Use async emitter for sockets stuff --- src/SocketServer.js | 14 +++++--------- src/sockets/AuthedConnection.js | 14 +++++++++++--- src/sockets/GuestConnection.js | 12 +++++++++--- src/sockets/LostConnection.js | 7 +++++-- 4 files changed, 30 insertions(+), 17 deletions(-) diff --git a/src/SocketServer.js b/src/SocketServer.js index a3eff387..9bb420be 100644 --- a/src/SocketServer.js +++ b/src/SocketServer.js @@ -512,12 +512,12 @@ class SocketServer { /** * Get a LostConnection for a user, if one exists. * - * @param {User} user + * @param {string} sessionID * @private */ - getLostConnection(user) { + getLostConnection(sessionID) { return this.#connections.find((connection) => ( - connection instanceof LostConnection && connection.user.id === user.id + connection instanceof LostConnection && connection.sessionID === sessionID )); } @@ -534,7 +534,7 @@ class SocketServer { connection.on('close', () => { this.remove(connection); }); - connection.on('authenticate', async (user, sessionID, lastEventID) => { + connection.on('authenticate', async ({ user, sessionID, lastEventID }) => { const isReconnect = await connection.isReconnect(sessionID); this.#logger.info({ userId: user.id, isReconnect, lastEventID }, 'authenticated socket'); if (isReconnect) { @@ -575,11 +575,7 @@ class SocketServer { }); connection.on( 'command', - /** - * @param {string} command - * @param {import('type-fest').JsonValue} data - */ - (command, data) => { + ({ command, data }) => { this.#logger.trace({ userId: user.id, command, data }, 'command'); if (has(this.#clientActions, command)) { // Ignore incorrect input diff --git a/src/sockets/AuthedConnection.js b/src/sockets/AuthedConnection.js index 96b1039d..34d8b120 100644 --- a/src/sockets/AuthedConnection.js +++ b/src/sockets/AuthedConnection.js @@ -1,4 +1,4 @@ -import EventEmitter from 'node:events'; +import Emittery from 'emittery'; import Ultron from 'ultron'; import WebSocket from 'ws'; import sjson from 'secure-json-parse'; @@ -8,7 +8,13 @@ import { fromJson, json } from '../utils/sqlite.js'; const PING_TIMEOUT = 5_000; const DEAD_TIMEOUT = 30_000; -class AuthedConnection extends EventEmitter { +/** + * @augments {Emittery<{ + * command: { command: string, data: import('type-fest').JsonValue }, + * close: { banned: boolean, lastEventID: string | null }, + * }>} + */ +class AuthedConnection extends Emittery { #events; #logger; @@ -20,6 +26,8 @@ class AuthedConnection extends EventEmitter { /** @type {string|null} */ #lastEventID = null; + banned = false; + /** * @param {import('../Uwave.js').default} uw * @param {import('ws').WebSocket} socket @@ -97,7 +105,7 @@ class AuthedConnection extends EventEmitter { this.#lastMessage = Date.now(); const { command, data } = sjson.safeParse(raw) ?? {}; if (command) { - this.emit('command', command, data); + this.emit('command', { command, data }); } } diff --git a/src/sockets/GuestConnection.js b/src/sockets/GuestConnection.js index da9231d8..0f62608f 100644 --- a/src/sockets/GuestConnection.js +++ b/src/sockets/GuestConnection.js @@ -1,4 +1,4 @@ -import EventEmitter from 'node:events'; +import Emittery from 'emittery'; import { ulid } from 'ulid'; import Ultron from 'ultron'; import WebSocket from 'ws'; @@ -6,7 +6,13 @@ import WebSocket from 'ws'; const PING_TIMEOUT = 5_000; const DEAD_TIMEOUT = 30_000; -class GuestConnection extends EventEmitter { +/** + * @augments {Emittery<{ + * close: undefined, + * authenticate: { user: import('../schema.js').User, sessionID: string, lastEventID: string | null } + * }>} + */ +class GuestConnection extends Emittery { #events; #logger; @@ -69,7 +75,7 @@ class GuestConnection extends EventEmitter { throw new Error('You have been banned'); } - this.emit('authenticate', userModel, sessionID, null); + await this.emit('authenticate', { user: userModel, sessionID, lastEventID: null }); } /** diff --git a/src/sockets/LostConnection.js b/src/sockets/LostConnection.js index ed8c4483..97c6540e 100644 --- a/src/sockets/LostConnection.js +++ b/src/sockets/LostConnection.js @@ -1,6 +1,9 @@ -import EventEmitter from 'node:events'; +import Emittery from 'emittery'; -class LostConnection extends EventEmitter { +/** + * @augments {Emittery<{ close: undefined }>} + */ +class LostConnection extends Emittery { #logger; #expiresAt; From 4807d9b248dd5267be287253aec747162540ccaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e=20Kooi?= Date: Sun, 18 Jan 2026 18:02:28 +0100 Subject: [PATCH 3/3] Record WS messages consistently in tests --- src/sockets/GuestConnection.js | 6 +++- test/booth.mjs | 18 +++------- test/chat.mjs | 60 ++++++---------------------------- test/sockets.mjs | 16 +++------ test/utils/plugin.mjs | 14 +++++++- 5 files changed, 37 insertions(+), 77 deletions(-) diff --git a/src/sockets/GuestConnection.js b/src/sockets/GuestConnection.js index 0f62608f..2930dc95 100644 --- a/src/sockets/GuestConnection.js +++ b/src/sockets/GuestConnection.js @@ -9,7 +9,11 @@ const DEAD_TIMEOUT = 30_000; /** * @augments {Emittery<{ * close: undefined, - * authenticate: { user: import('../schema.js').User, sessionID: string, lastEventID: string | null } + * authenticate: { + * user: import('../schema.js').User, + * sessionID: string, + * lastEventID: string | null, + * }, * }>} */ class GuestConnection extends Emittery { diff --git a/test/booth.mjs b/test/booth.mjs index 03fa76a6..f81f9a71 100644 --- a/test/booth.mjs +++ b/test/booth.mjs @@ -119,10 +119,6 @@ describe('Booth', () => { const token = await uw.test.createTestSessionToken(user); const ws = await uw.test.connectToWebSocketAs(user); - const receivedMessages = []; - ws.on('message', (data, isBinary) => { - receivedMessages.push(JSON.parse(isBinary ? data.toString() : data)); - }); // Prep the DJ account to be able to join the waitlist const { playlist } = await uw.playlists.createPlaylist(dj, { name: 'vote' }); @@ -153,11 +149,11 @@ describe('Booth', () => { .expect(200); await retryFor(500, () => { - assert(receivedMessages.some((message) => message.command === 'vote' && message.data.value === -1)); + assert(ws.messages.some((message) => message.command === 'vote' && message.data.value === -1)); }); // Resubmit vote without changing - receivedMessages.length = 0; + ws.messages.length = 0; await supertest(uw.server) .put(`/api/booth/${historyID}/vote`) .set('Cookie', `uwsession=${token}`) @@ -168,7 +164,7 @@ describe('Booth', () => { // without waiting the whole time limit await delay(200); assert( - !receivedMessages.some((message) => message.command === 'vote' && message.data.value === -1), + !ws.messages.some((message) => message.command === 'vote' && message.data.value === -1), 'should not have re-emitted the vote', ); @@ -179,7 +175,7 @@ describe('Booth', () => { .expect(200); await retryFor(500, () => { - assert(receivedMessages.some((message) => message.command === 'vote' && message.data.value === 1)); + assert(ws.messages.some((message) => message.command === 'vote' && message.data.value === 1)); }); djWs.close(); @@ -382,10 +378,6 @@ describe('Booth', () => { const item = await uw.source('test-source').getOne(dj, 'SELF_FAVORITE'); await uw.playlists.addPlaylistItems(playlist, [item]); const ws = await uw.test.connectToWebSocketAs(dj); - const receivedMessages = []; - ws.on('message', (data, isBinary) => { - receivedMessages.push(JSON.parse(isBinary ? data.toString() : data)); - }); // Prep the favoriter account to grab the song const favoriterToken = await uw.test.createTestSessionToken(favoriter); @@ -429,7 +421,7 @@ describe('Booth', () => { }); // Check that an event was emitted - sinon.assert.match(receivedMessages, sinon.match.some(sinon.match({ + sinon.assert.match(ws.messages, sinon.match.some(sinon.match({ command: 'favorite', data: { userID: favoriter.id }, }))); diff --git a/test/chat.mjs b/test/chat.mjs index cdf08ae6..48d9f215 100644 --- a/test/chat.mjs +++ b/test/chat.mjs @@ -25,15 +25,10 @@ describe('Chat', () => { const ws = await uw.test.connectToWebSocketAs(user); - const receivedMessages = []; - ws.on('message', (data) => { - receivedMessages.push(JSON.parse(data)); - }); - ws.send(JSON.stringify({ command: 'sendChat', data: 'Message text' })); await retryFor(1500, () => { - assert(receivedMessages.some((message) => message.command === 'chatMessage' && message.data.userID === user.id && message.data.message === 'Message text')); + assert(ws.messages.some((message) => message.command === 'chatMessage' && message.data.userID === user.id && message.data.message === 'Message text')); }); }); @@ -52,17 +47,12 @@ describe('Chat', () => { const ws = await uw.test.connectToWebSocketAs(user); const mutedWs = await uw.test.connectToWebSocketAs(mutedUser); - const receivedMessages = []; - ws.on('message', (data) => { - receivedMessages.push(JSON.parse(data)); - }); - ws.send(JSON.stringify({ command: 'sendChat', data: 'unmuted' })); mutedWs.send(JSON.stringify({ command: 'sendChat', data: 'muted' })); await retryFor(1500, () => { - assert(receivedMessages.some((message) => message.command === 'chatMessage' && message.data.userID === user.id)); - assert(!receivedMessages.some((message) => message.command === 'chatMessage' && message.data.userID === mutedUser.id)); + assert(ws.messages.some((message) => message.command === 'chatMessage' && message.data.userID === user.id)); + assert(!ws.messages.some((message) => message.command === 'chatMessage' && message.data.userID === mutedUser.id)); }); }); }); @@ -80,11 +70,6 @@ describe('Chat', () => { const ws = await uw.test.connectToWebSocketAs(user); - const receivedMessages = []; - ws.on('message', (data) => { - receivedMessages.push(JSON.parse(data)); - }); - // TODO: is it important to serialize this stuff on the server side // so it always gets recorded in the same order? ws.send(JSON.stringify({ command: 'sendChat', data: 'a' })); @@ -95,7 +80,7 @@ describe('Chat', () => { await retryFor(1500, () => { assert.strictEqual( - receivedMessages.filter((message) => message.command === 'chatMessage' && message.data.userID === user.id).length, + ws.messages.filter((message) => message.command === 'chatMessage' && message.data.userID === user.id).length, 3, ); }); @@ -192,14 +177,9 @@ describe('Chat', () => { message: sinon.match.string, }); - const receivedMessages = []; - ws.on('message', (data) => { - receivedMessages.push(JSON.parse(data)); - }); - ws.send(JSON.stringify({ command: 'sendChat', data: 'HTTP message text' })); await retryFor(5_000, () => ( - receivedMessages.some((message) => ( + ws.messages.some((message) => ( message.command === 'chatMessage' && message.data.userID === user.id && message.data.message === 'HTTP message text' @@ -227,11 +207,6 @@ describe('Chat', () => { // We do need to be connected to be allowed to send a message const mutedWs = await uw.test.connectToWebSocketAs(mutedUser); - const receivedMessages = []; - adminWs.on('message', (data) => { - receivedMessages.push(JSON.parse(data)); - }); - const res = await supertest(uw.server) .post('/api/chat') .set('Cookie', `uwsession=${mutedToken}`) @@ -248,8 +223,8 @@ describe('Chat', () => { .expect(200); await retryFor(1500, () => { - assert(receivedMessages.some((message) => message.command === 'chatMessage' && message.data.userID === adminUser.id)); - assert(!receivedMessages.some((message) => message.command === 'chatMessage' && message.data.userID === mutedUser.id)); + assert(adminWs.messages.some((message) => message.command === 'chatMessage' && message.data.userID === adminUser.id)); + assert(!adminWs.messages.some((message) => message.command === 'chatMessage' && message.data.userID === mutedUser.id)); }); mutedWs.close(); @@ -290,18 +265,13 @@ describe('Chat', () => { const otherUser = await uw.test.createUser(); const ws = await uw.test.connectToWebSocketAs(otherUser); - const receivedMessages = []; - ws.on('message', (data) => { - receivedMessages.push(JSON.parse(data)); - }); - await supertest(uw.server) .delete('/api/chat') .set('Cookie', `uwsession=${token}`) .expect(200); await retryFor(1500, () => { - sinon.assert.match(receivedMessages, sinon.match.some(sinon.match.has('command', 'chatDelete'))); + sinon.assert.match(ws.messages, sinon.match.some(sinon.match.has('command', 'chatDelete'))); }); }); }); @@ -342,18 +312,13 @@ describe('Chat', () => { const otherUser = await uw.test.createUser(); const ws = await uw.test.connectToWebSocketAs(otherUser); - const receivedMessages = []; - ws.on('message', (data) => { - receivedMessages.push(JSON.parse(data)); - }); - await supertest(uw.server) .delete(`/api/chat/user/${otherUser.id}`) .set('Cookie', `uwsession=${token}`) .expect(200); await retryFor(1500, () => { - sinon.assert.match(receivedMessages, sinon.match.some(sinon.match({ + sinon.assert.match(ws.messages, sinon.match.some(sinon.match({ command: 'chatDeleteByUser', data: sinon.match({ userID: otherUser.id, @@ -399,18 +364,13 @@ describe('Chat', () => { const otherUser = await uw.test.createUser(); const ws = await uw.test.connectToWebSocketAs(otherUser); - const receivedMessages = []; - ws.on('message', (data) => { - receivedMessages.push(JSON.parse(data)); - }); - await supertest(uw.server) .delete(`/api/chat/${messageID}`) .set('Cookie', `uwsession=${token}`) .expect(200); await retryFor(1500, () => { - sinon.assert.match(receivedMessages, sinon.match.some(sinon.match({ + sinon.assert.match(ws.messages, sinon.match.some(sinon.match({ command: 'chatDeleteByID', data: sinon.match({ _id: messageID, diff --git a/test/sockets.mjs b/test/sockets.mjs index 98646ffc..00b1d9c4 100644 --- a/test/sockets.mjs +++ b/test/sockets.mjs @@ -25,20 +25,15 @@ describe('Sockets', () => { const ws = await uw.test.connectToWebSocketAs(user, userSession); const wsChatter = await uw.test.connectToWebSocketAs(chatter); - const receivedMessages = []; - ws.on('message', (data) => { - receivedMessages.push(JSON.parse(data)); - }); - wsChatter.send(JSON.stringify({ command: 'sendChat', data: 'a' })); wsChatter.send(JSON.stringify({ command: 'sendChat', data: 'b' })); await retryFor(1500, () => { - sinon.assert.match(receivedMessages, sinon.match.some(sinon.match({ + sinon.assert.match(ws.messages, sinon.match.some(sinon.match({ command: 'chatMessage', data: { userID: chatter.id, message: 'a' }, }))); - sinon.assert.match(receivedMessages, sinon.match.some(sinon.match({ + sinon.assert.match(ws.messages, sinon.match.some(sinon.match({ command: 'chatMessage', data: { userID: chatter.id, message: 'b' }, }))); @@ -55,16 +50,13 @@ describe('Sockets', () => { // Reconnect & receive the messages const ws2 = await uw.test.connectToWebSocketAs(user, userSession); - ws2.on('message', (data) => { - receivedMessages.push(JSON.parse(data)); - }); await retryFor(1500, () => { - sinon.assert.match(receivedMessages, sinon.match.some(sinon.match({ + sinon.assert.match(ws2.messages, sinon.match.some(sinon.match({ command: 'chatMessage', data: { userID: chatter.id, message: 'c' }, }))); - sinon.assert.match(receivedMessages, sinon.match.some(sinon.match({ + sinon.assert.match(ws2.messages, sinon.match.some(sinon.match({ command: 'chatMessage', data: { userID: chatter.id, message: 'd' }, }))); diff --git a/test/utils/plugin.mjs b/test/utils/plugin.mjs index 3555031a..e034bb56 100644 --- a/test/utils/plugin.mjs +++ b/test/utils/plugin.mjs @@ -3,6 +3,18 @@ import events from 'events'; import jwt from 'jsonwebtoken'; import WebSocket from 'ws'; +class RecordingWebSocket extends WebSocket { + messages = []; + + constructor(url) { + super(url); + + this.on('message', (data, isBinary) => { + this.messages.push(JSON.parse(isBinary ? data.toString() : data)); + }); + } +} + async function testPlugin(uw) { let i = Date.now(); function createUser() { @@ -25,7 +37,7 @@ async function testPlugin(uw) { const token = await uw.socketServer.authRegistry.createAuthToken(user, session ?? randomUUID()); - const ws = new WebSocket(`ws://localhost:${port}`); + const ws = new RecordingWebSocket(`ws://localhost:${port}`); await events.once(ws, 'open'); ws.send(token);