Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 27 additions & 27 deletions src/SocketServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { subMinutes } from './utils/date.js';

const { isEmpty } = lodash;

export const REDIS_ACTIVE_SESSIONS = 'users';
export const KEY_ACTIVE_SESSIONS = 'users';

const PING_INTERVAL = 10_000;
const GUEST_COUNT_INTERVAL = 2_000;
Expand Down Expand Up @@ -83,7 +83,7 @@ class SocketServer {
// We do need to clear the `users` list because the lost connection handlers
// will not do so.
uw.socketServer.#logger.warn({ err }, 'could not initialise lost connections');
await uw.redis.del(REDIS_ACTIVE_SESSIONS);
await uw.keyv.delete(KEY_ACTIVE_SESSIONS);
}
});

Expand Down Expand Up @@ -199,9 +199,7 @@ class SocketServer {
return;
}

this.#recountGuests().catch((error) => {
this.#logger.error({ err: error }, 'counting guests failed');
});
this.#recountGuests();
}, GUEST_COUNT_INTERVAL);

this.#clientActions = {
Expand Down Expand Up @@ -398,11 +396,15 @@ class SocketServer {
}
},
'user:join': async ({ userID }) => {
const { users, redis } = this.#uw;
const { users, keyv } = this.#uw;
const user = await users.getUser(userID);
if (user) {
// TODO this should not be the socket server code's responsibility
await redis.rpush(REDIS_ACTIVE_SESSIONS, user.id);
const userIDs = /** @type {import('./schema').UserID[] | null} */ (
await keyv.get(KEY_ACTIVE_SESSIONS)
) ?? [];
userIDs.push(user.id);
await keyv.set(KEY_ACTIVE_SESSIONS, userIDs);
this.broadcast('join', serializeUser(user));
}
},
Expand Down Expand Up @@ -460,10 +462,10 @@ class SocketServer {
* @private
*/
async initLostConnections() {
const { db, redis } = this.#uw;
const userIDs = /** @type {import('./schema').UserID[]} */ (
await redis.lrange(REDIS_ACTIVE_SESSIONS, 0, -1)
);
const { db, keyv } = this.#uw;
const userIDs = /** @type {import('./schema').UserID[] | null} */ (
await keyv.get(KEY_ACTIVE_SESSIONS)
) ?? [];
const disconnectedIDs = userIDs.filter((userID) => !this.connection(userID));

if (disconnectedIDs.length === 0) {
Expand Down Expand Up @@ -820,24 +822,22 @@ class SocketServer {
});
}

async getGuestCount() {
const { redis } = this.#uw;
const rawCount = await redis.get('http-api:guests');
if (typeof rawCount !== 'string' || !/^\d+$/.test(rawCount)) {
return 0;
}
return parseInt(rawCount, 10);
}
#lastGuestCount = 0;

async #recountGuests() {
const { redis } = this.#uw;
const guests = this.#connections
.filter((connection) => connection instanceof GuestConnection)
.length;
/** The number of unauthenticated connections. */
get guestCount() {
return this.#connections.reduce((acc, connection) => {
if (connection instanceof GuestConnection) {
return acc + 1;
}
return acc;
}, 0);
}

const lastGuestCount = await this.getGuestCount();
if (guests !== lastGuestCount) {
await redis.set('http-api:guests', guests);
#recountGuests() {
const guests = this.guestCount;
if (guests !== this.#lastGuestCount) {
this.#lastGuestCount = guests;
this.broadcast('guests', guests);
}
}
Expand Down
27 changes: 6 additions & 21 deletions src/controllers/now.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getBoothData } from './booth.js';
import { serializeCurrentUser, serializePlaylist, serializeUser } from '../utils/serialize.js';
import { legacyPlaylistItem } from './playlists.js';
import { REDIS_ACTIVE_SESSIONS } from '../SocketServer.js';
import { KEY_ACTIVE_SESSIONS } from '../SocketServer.js';

/**
* @typedef {import('../schema.js').UserID} UserID
Expand All @@ -23,20 +23,11 @@ async function getFirstItem(uw, playlist) {
return null;
}

/**
* @param {unknown} str
*/
function toInt(str) {
if (typeof str !== 'string') return 0;
if (!/^\d+$/.test(str)) return 0;
return parseInt(str, 10);
}

/**
* @param {import('../Uwave.js').default} uw
*/
async function getOnlineUsers(uw) {
const userIDs = /** @type {UserID[]} */ (await uw.redis.lrange(REDIS_ACTIVE_SESSIONS, 0, -1));
const userIDs = /** @type {UserID[] | null} */ (await uw.keyv.get(KEY_ACTIVE_SESSIONS)) ?? [];
if (userIDs.length === 0) {
return [];
}
Expand All @@ -45,14 +36,6 @@ async function getOnlineUsers(uw) {
return users.map(serializeUser);
}

/**
* @param {import('../Uwave.js').default} uw
*/
async function getGuestsCount(uw) {
const guests = await uw.redis.get('http-api:guests');
return toInt(guests);
}

/**
* @type {import('../types.js').Controller}
*/
Expand All @@ -62,9 +45,11 @@ async function getState(req) {
const { passport } = uw;
const { user, sessionID } = req;

// XXX: with sqlite there isn't really a point in making this all "parallel",
// but maybe it makes sense to keep so it'd reduce network waiting times with
// other databases in the future?
const motd = uw.motd.get();
const users = getOnlineUsers(uw);
const guests = getGuestsCount(uw);
const roles = uw.acl.getAllRoles();
const booth = getBoothData(uw);
const waitlist = uw.waitlist.getUserIDs();
Expand Down Expand Up @@ -97,7 +82,7 @@ async function getState(req) {
motd,
user: user ? serializeCurrentUser(user) : null,
users,
guests,
guests: uw.socketServer.guestCount,
roles,
booth,
waitlist,
Expand Down
11 changes: 9 additions & 2 deletions src/controllers/users.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import toItemResponse from '../utils/toItemResponse.js';
import toListResponse from '../utils/toListResponse.js';
import toPaginatedResponse from '../utils/toPaginatedResponse.js';
import { muteUser, unmuteUser } from './chat.js';
import { REDIS_ACTIVE_SESSIONS } from '../SocketServer.js';
import { KEY_ACTIVE_SESSIONS } from '../SocketServer.js';

/**
* @typedef {import('../schema').UserID} UserID
Expand Down Expand Up @@ -207,7 +207,14 @@ async function disconnectUser(uw, userID) {
}
}

await uw.redis.lrem(REDIS_ACTIVE_SESSIONS, 0, userID);
const userIDs = new Set(
/** @type {import('../schema.js').UserID[] | null} */ (
await uw.keyv.get(KEY_ACTIVE_SESSIONS)
) ?? [],
);
userIDs.delete(userID);

await uw.keyv.set(KEY_ACTIVE_SESSIONS, Array.from(userIDs));

uw.publish('user:leave', { userID });
}
Expand Down
10 changes: 7 additions & 3 deletions src/middleware/requireActiveConnection.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import httpErrors from 'http-errors';
import wrapMiddleware from '../utils/wrapMiddleware.js';
import { REDIS_ACTIVE_SESSIONS } from '../SocketServer.js';
import { KEY_ACTIVE_SESSIONS } from '../SocketServer.js';

const { BadRequest } = httpErrors;

Expand All @@ -10,8 +10,12 @@ function requireActiveConnection() {
* @param {import('../schema.js').User} user
*/
async function isConnected(uwave, user) {
const onlineIDs = await uwave.redis.lrange(REDIS_ACTIVE_SESSIONS, 0, -1);
return onlineIDs.indexOf(user.id) !== -1;
const onlineIDs = new Set(
/** @type {import('../schema.js').UserID[] | null} */ (
await uwave.keyv.get(KEY_ACTIVE_SESSIONS)
) ?? [],
);
return onlineIDs.has(user.id);
}

return wrapMiddleware(async (req) => {
Expand Down
4 changes: 1 addition & 3 deletions src/sockets/LostConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ class LostConnection extends EventEmitter {
// we can ensure that everyone still gets the full `timeout` duration to
// reconnect after a server restart, while also not filling up Redis with
// session IDs that left and will never return.
this.#uw.redis.multi()
.set(this.#key, lastEventID, 'EX', seconds * 10)
.exec();
this.#uw.redis.set(this.#key, lastEventID, 'EX', seconds * 10);
}

/**
Expand Down