From a79ee47a7926b1e40f1b9c1d3e4dbad4210ffe7a Mon Sep 17 00:00:00 2001 From: Andrew Ferrazzutti Date: Wed, 25 Mar 2026 12:11:43 -0400 Subject: [PATCH 1/4] Fix bridging for recreated DM rooms Fix incoming messages in DM rooms created after having left an older DM room with the same XMPP user. Also make fixes to & improve typing of PgDatastore. --- src/MatrixEventHandler.ts | 34 +++- src/Program.ts | 61 ++++++- src/store/Types.ts | 10 +- src/store/postgres/PgDatastore.ts | 254 ++++++++++++++++-------------- 4 files changed, 231 insertions(+), 128 deletions(-) diff --git a/src/MatrixEventHandler.ts b/src/MatrixEventHandler.ts index 2369f907..8dd4da33 100644 --- a/src/MatrixEventHandler.ts +++ b/src/MatrixEventHandler.ts @@ -216,13 +216,33 @@ export class MatrixEventHandler { return; } - if (membershipEvent && roomType === MROOM_TYPE_GROUP) { - if (this.bridge.getBot().isRemoteUser(event.sender)) { - return; // Don't really care about remote users - } - if (["join", "leave"].includes(event.content.membership as string)) { - await this.handleJoinLeaveGroup(ctx, membershipEvent); - return; + if (membershipEvent) { + switch (roomType) { + case MROOM_TYPE_GROUP: + if (bridgeBot.isRemoteUser(event.sender)) { + return; // Don't really care about remote users + } + if (["join", "leave", "ban"].includes(membershipEvent.content.membership)) { + await this.handleJoinLeaveGroup(ctx, membershipEvent); + return; + } + break; + case MROOM_TYPE_IM: + if (membershipEvent.content.membership === "leave" && membershipEvent.sender === ctx.remote.get("matrixUser")) { + await this.store.removeRoomByRoomId(membershipEvent.room_id); + const protocol = this.purple.getProtocol(roomProtocol); + if (protocol) { + await this.bridge.getIntent( + protocol.getMxIdForProtocol( + ctx.remote.get("recipient"), + this.config.bridge.domain, + this.config.bridge.userPrefix, + ).getId() + ).leave(membershipEvent.room_id); + } + log.info(`Left and removed entry for IM room ${membershipEvent.room_id} because the user left`); + return; + } } } diff --git a/src/Program.ts b/src/Program.ts index 064dca75..322de1aa 100644 --- a/src/Program.ts +++ b/src/Program.ts @@ -14,7 +14,7 @@ import { XmppJsInstance } from "./xmppjs/XJSInstance"; import { Metrics } from "./Metrics"; import { AutoRegistration } from "./AutoRegistration"; import { GatewayHandler } from "./GatewayHandler"; -import { IRemoteUserAdminData, MROOM_TYPE_UADMIN } from "./store/Types"; +import { IRemoteUserAdminData, MROOM_TYPE_IM, MROOM_TYPE_UADMIN } from "./store/Types"; import * as fs from "fs"; import { webcrypto } from "node:crypto"; @@ -182,6 +182,64 @@ class Program { } } + private async dropStaleIMRooms(): Promise { + const rooms = await this.store.getRoomsOfType(MROOM_TYPE_IM); + log.info(`Got ${rooms.length} IM rooms`); + // Errors here aren't fatal, so use Promise.allSettled instead of Promise.all + await Promise.allSettled(rooms.map(async (room) => { + if (!room.matrix) { + log.warn(`Not checking IM room because it has no matrix component`); + return; + } + const roomId = room.matrix.getId(); + if (!room.remote) { + log.warn(`Not checking IM room ${roomId} because it has no remote links`); + return; + } + const recipient = room.remote.get("recipient"); + if (!recipient) { + log.warn(`Dropping IM room ${roomId} because it has no recipient`); + await this.store.removeRoomByRoomId(roomId); + return; + } + const protocol = this.purple.getProtocol(room.remote.get("protocol_id")); + if (!protocol) { + log.warn(`Dropping IM room ${roomId} because it has no valid protocol`); + await this.store.removeRoomByRoomId(roomId); + return; + } + const remoteIntent = this.bridge.getIntent( + protocol.getMxIdForProtocol( + recipient, + this.config.bridge.domain, + this.config.bridge.userPrefix, + ).getId() + ); + const matrixUser = room.remote.get("matrixUser"); + if (!matrixUser) { + log.warn(`Dropping and leaving IM room ${roomId} because it has no matrix user`); + await this.store.removeRoomByRoomId(roomId); + await remoteIntent.leave(roomId); + return; + } + let content: Record; + try { + content = await remoteIntent.matrixClient.getRoomStateEventContent(roomId, "m.room.member", matrixUser); + } catch (ex) { + log.warn(`Dropping and leaving IM room ${roomId} because we could not look up room state: ${ex}`); + await this.store.removeRoomByRoomId(roomId); + await remoteIntent.leave(roomId); + return; + } + if (content.membership === "leave") { + log.info(`Dropping and leaving IM room ${roomId} because its matrix user left`); + await this.store.removeRoomByRoomId(roomId); + await remoteIntent.leave(roomId); + return; + } + })); + } + private async runBridge(port: number, config: ConfigValue) { const checkOnly = process.env.BIFROST_CHECK_ONLY === "true"; this.cfg.ApplyConfig(config); @@ -349,6 +407,7 @@ class Program { log.info("Started appservice listener on port", port); await this.pingBridge(); await this.registerBot(); + await this.dropStaleIMRooms(); log.info("Bridge has started."); try { await purple.start(); diff --git a/src/store/Types.ts b/src/store/Types.ts index 0e0764a1..3a86d6f4 100644 --- a/src/store/Types.ts +++ b/src/store/Types.ts @@ -8,8 +8,8 @@ export const MROOM_TYPE_GROUP = "group"; export const MUSER_TYPE_ACCOUNT = "account"; export const MUSER_TYPE_GHOST = "ghost"; -export type MROOM_TYPES = "user-admin"|"im"|"group"; -export type MUSER_TYPES = "account"|"ghost"; +export type MROOM_TYPES = typeof MROOM_TYPE_UADMIN | typeof MROOM_TYPE_IM | typeof MROOM_TYPE_GROUP; +export type MUSER_TYPES = typeof MUSER_TYPE_ACCOUNT | typeof MUSER_TYPE_GHOST; export interface IRemoteRoomData { protocol_id?: string; @@ -31,6 +31,12 @@ export interface IRemoteUserAdminData extends IRemoteRoomData { matrixUser?: string; } +export interface RoomTypeToRemoteRoomData { + [MROOM_TYPE_IM]: IRemoteImData; + [MROOM_TYPE_GROUP]: IRemoteGroupData; + [MROOM_TYPE_UADMIN]: IRemoteUserAdminData; +}; + export interface IMatrixUserData { accounts: {[key: string]: IRemoteUserAccount}; } diff --git a/src/store/postgres/PgDatastore.ts b/src/store/postgres/PgDatastore.ts index e249c299..89440f1d 100644 --- a/src/store/postgres/PgDatastore.ts +++ b/src/store/postgres/PgDatastore.ts @@ -16,10 +16,10 @@ limitations under the License. import { Pool } from "pg"; import { MatrixRoom, RemoteRoom, MatrixUser, Logger, RoomBridgeStoreEntry } from "matrix-appservice-bridge"; -import { IRemoteRoomData, IRemoteGroupData, MROOM_TYPES, - IRemoteImData, IRemoteUserAdminData, MROOM_TYPE_IM } from "../Types"; +import { IRemoteGroupData, MROOM_TYPES, RoomTypeToRemoteRoomData, + IRemoteImData, IRemoteUserAdminData, MROOM_TYPE_IM, MROOM_TYPE_GROUP, MROOM_TYPE_UADMIN } from "../Types"; import { BifrostProtocol } from "../../bifrost/Protocol"; -import { IAccountMinimal } from "../../bifrost/Events"; +import { IAccountMinimal, IChatJoinProperties } from "../../bifrost/Events"; import { BifrostRemoteUser } from "../BifrostRemoteUser"; import { IConfigDatastore } from "../../Config"; import { IStore } from "../Store"; @@ -32,18 +32,98 @@ export interface PgDataStoreOpts { max: number; } -const RoomTypeToTable = { - "im": "im_rooms", - "group": "group_rooms", - "user-admin": "admin_rooms", +const ROOM_TABLE_IM = "im_rooms"; +const ROOM_TABLE_GROUP = "group_rooms"; +const ROOM_TABLE_UADMIN = "admin_rooms"; +type ROOM_TABLES = typeof ROOM_TABLE_IM | typeof ROOM_TABLE_GROUP | typeof ROOM_TABLE_UADMIN; + +const RoomTypeToTable: Record = { + [MROOM_TYPE_IM]: ROOM_TABLE_IM, + [MROOM_TYPE_GROUP]: ROOM_TABLE_GROUP, + [MROOM_TYPE_UADMIN]: ROOM_TABLE_UADMIN, }; -const TableToRoomType = { - im_rooms: "im", - group_rooms: "group", - admin_rooms: "user-admin", +const TableToRoomType: Record = { + [ROOM_TABLE_IM]: MROOM_TYPE_IM, + [ROOM_TABLE_GROUP]: MROOM_TYPE_GROUP, + [ROOM_TABLE_UADMIN]: MROOM_TYPE_UADMIN, }; +type Json = string | number | boolean | null | Json[] | { [name: string]: Json }; + +interface RoomRow { + room_id: string; +} + +interface ImRoomRow extends RoomRow { + user_id: string; + remote_id: string; + protocol_id: string; +} + +interface GroupRoomRow extends RoomRow { + protocol_id?: string; + room_name?: string; + gateway?: boolean; + properties?: Json; +} + +interface AdminRoomRow extends RoomRow { + user_id?: string; +} + +interface RoomTypeToRow { + [MROOM_TYPE_IM]: ImRoomRow, + [MROOM_TYPE_GROUP]: GroupRoomRow, + [MROOM_TYPE_UADMIN]: AdminRoomRow, +} + +function rowToRemoteImData(row: ImRoomRow): IRemoteImData { + return { + matrixUser: new MatrixUser(row.user_id).userId, + recipient: row.remote_id, + protocol_id: row.protocol_id, + }; +} + +function rowToRemoteGroupData(row: GroupRoomRow): IRemoteGroupData { + return { + gateway: row.gateway, + room_name: row.room_name, + protocol_id: row.protocol_id, + properties: row.properties as IChatJoinProperties, + }; +} + +function rowToRemoteUserAdminData(row: AdminRoomRow): IRemoteUserAdminData { + return row.user_id ? { + matrixUser: new MatrixUser(row.user_id).userId, + } : {}; +} + +const RoomTypeToDataFunc: {[T in MROOM_TYPES]: (row: RoomTypeToRow[T]) => RoomTypeToRemoteRoomData[T]} = { + [MROOM_TYPE_IM]: rowToRemoteImData, + [MROOM_TYPE_GROUP]: rowToRemoteGroupData, + [MROOM_TYPE_UADMIN]: rowToRemoteUserAdminData, +} + +function rowToRoomBridgeStoreEntry(type: T, row: RoomTypeToRow[T]): RoomBridgeStoreEntry { + return dataToRoomBridgeStoreEntry(row.room_id, type, RoomTypeToDataFunc[type](row)); +} + +function rowToGroupRoomBridgeStoreEntry(row: GroupRoomRow): RoomBridgeStoreEntry { + return dataToRoomBridgeStoreEntry(row.room_id, MROOM_TYPE_GROUP, rowToRemoteGroupData(row)); +} + +function dataToRoomBridgeStoreEntry(roomId: string, type: T, data: RoomTypeToRemoteRoomData[T], remoteId = ""): RoomBridgeStoreEntry { + return { + matrix: new MatrixRoom(roomId, { extras: { type } }), + // Id is not always used. + remote: new RemoteRoom(remoteId, data as Record), + data: {}, + }; +} + export class PgDataStore implements IStore { public static LATEST_SCHEMA = 2; @@ -190,69 +270,49 @@ export class PgDataStore implements IStore { if (i === 0) { throw Error("No remoteData to compare with"); } - const statement = `SELECT * FROM group_rooms WHERE ${parts.join(" AND ")};`; - const res = await this.pgPool.query( + const statement = `SELECT * FROM ${ROOM_TABLE_GROUP} WHERE ${parts.join(" AND ")}`; + const res = await this.pgPool.query( statement, Object.values(remoteData), ); if (res.rowCount === 0) { return null; } - const row = res.rows[0]; - const remoteGroupData: IRemoteGroupData = { - gateway: row.gateway, - properties: row.properties, - room_name: row.room_name, - }; - return { - matrix: new MatrixRoom(row.room_id, { extras: { type: "group" } }), - // Id is not used. - remote: new RemoteRoom("", remoteGroupData as Record), - data: {} - }; + return rowToGroupRoomBridgeStoreEntry(res.rows[0]); } public async getAdminRoom(matrixUserId: string): Promise { - const res = await this.pgPool.query( - "SELECT room_id FROM admin_rooms WHERE user_id = $1", + const res = await this.pgPool.query( + `SELECT room_id FROM ${ROOM_TABLE_UADMIN} WHERE user_id = $1`, [ matrixUserId ], ); return res.rows[0]?.room_id || null; } public async getIMRoom(matrixUserId: string, protocolId: string, remoteUserId: string): Promise { - const res = await this.pgPool.query( - "SELECT room_id FROM im_rooms WHERE user_id = $1 AND remote_id = $2 AND protocol_id = $3", + const res = await this.pgPool.query( + `SELECT room_id FROM ${ROOM_TABLE_IM} WHERE user_id = $1 AND remote_id = $2 AND protocol_id = $3`, [ matrixUserId, remoteUserId, protocolId ], ); if (res.rowCount === 0) { return null; } - const row = res.rows[0]; - return { - matrix: new MatrixRoom(row.room_id, { extras: { type: MROOM_TYPE_IM } }), - remote: new RemoteRoom("", { - matrixUser: matrixUserId, - protocol_id: protocolId, - recipient: remoteUserId, - }), - data: {} - }; + return dataToRoomBridgeStoreEntry(res.rows[0].room_id, MROOM_TYPE_IM, { + matrixUser: matrixUserId, + protocol_id: protocolId, + recipient: remoteUserId, + }); } public async getAllIMRoomsForAccount(matrixUserId: string, protocolId: string): Promise { - const res = await this.pgPool.query( - "SELECT room_id, remote_id FROM im_rooms WHERE user_id = $1 AND protocol_id = $2", + const res = await this.pgPool.query>( + `SELECT room_id, remote_id FROM ${ROOM_TABLE_IM} WHERE user_id = $1 AND protocol_id = $2`, [ matrixUserId, protocolId ], ); - return res.rows.map(row => ({ - matrix: new MatrixRoom(row.room_id, { extras: { type: MROOM_TYPE_IM } }), - remote: new RemoteRoom("", { - matrixUser: matrixUserId, - protocol_id: protocolId, - recipient: row.remote_id, - }), - data: {} + return res.rows.map(row => dataToRoomBridgeStoreEntry(row.room_id, MROOM_TYPE_IM, { + matrixUser: matrixUserId, + protocol_id: protocolId, + recipient: row.remote_id, })); } @@ -270,21 +330,8 @@ export class PgDataStore implements IStore { public async getRoomsOfType(type: MROOM_TYPES): Promise { const tableName = RoomTypeToTable[type]; - const res = await this.pgPool.query(`SELECT * FROM ${tableName};`); - return res.rows.map((row) => { - const remoteData: IRemoteGroupData = { - gateway: row.gateway, - properties: row.properties, - room_name: row.room_name, - protocol_id: row.protocol_id, - }; - return { - matrix: new MatrixRoom(row.room_id, { extras: { type } }), - // Id is not used. - remote: new RemoteRoom("", remoteData as Record), - data: {} - }; - }); + const res = await this.pgPool.query(`SELECT * FROM ${tableName}`); + return res.rows.map((row) => rowToRoomBridgeStoreEntry(type, row)); } public async storeAccount(userId: string, protocol: BifrostProtocol, username: string, extraData?: any) { @@ -335,7 +382,7 @@ export class PgDataStore implements IStore { public async getRoomEntryByMatrixId(roomId: string): Promise { log.debug("Getting room", roomId); - const typeRes = await this.pgPool.query( + const typeRes = await this.pgPool.query<{table_name: ROOM_TABLES}>( "SELECT tableoid::regclass as table_name FROM rooms WHERE room_id = $1 LIMIT 1", [ roomId ], ); @@ -345,89 +392,60 @@ export class PgDataStore implements IStore { } const tableName = typeRes.rows[0].table_name; const type = TableToRoomType[tableName]; - const res = await this.pgPool.query( + if (!type) { + throw new Error("Room was of unknown type!"); + } + const res = await this.pgPool.query( `SELECT * FROM ${tableName} WHERE room_id = $1 LIMIT 1`, [ roomId ], ); if (!res.rowCount) { throw Error("Missing data for room that we did manage to select!"); } - const row = res.rows[0]; - let remoteData: IRemoteRoomData; - if (type === "group") { - remoteData = { - gateway: row.gateway, - room_name: row.room_name, - protocol_id: row.protocol_id, - properties: row.properties, - } as IRemoteGroupData; - } else if (type === "im") { - remoteData = { - matrixUser: new MatrixUser(row.user_id).userId, - recipient: row.remote_id, - protocol_id: row.protocol_id, - } as IRemoteImData; - } else if (type === "user-admin") { - remoteData = { - matrixUser: new MatrixUser(row.user_id).userId, - } as IRemoteUserAdminData; - } else { - throw Error("Room was of unknown type!"); - } - log.debug("Found room ", JSON.stringify(remoteData)); - return { - remote: new RemoteRoom("", remoteData as Record), - matrix: new MatrixRoom(roomId, { extras: { type }}), - data: {} - }; + const entry = rowToRoomBridgeStoreEntry(type, res.rows[0]); + log.debug("Found room ", JSON.stringify(entry.remote)); + return entry; } - public async storeRoom(matrixId: string, type: MROOM_TYPES, remoteId: string, remoteData: IRemoteRoomData) + public async storeRoom(matrixId: string, type: T, remoteId: string, remoteData: RoomTypeToRemoteRoomData[T]) : Promise { log.debug("Storing room", matrixId); let statement: string; - const res = { - remote: new RemoteRoom(remoteId, remoteData as Record), - matrix: new MatrixRoom(matrixId, { extras: { type } }), - data: {} - }; + const res = dataToRoomBridgeStoreEntry(matrixId, type, remoteData, remoteId); - if (type === "user-admin") { - const adminProps = { + if (type === MROOM_TYPE_UADMIN) { + const adminProps: AdminRoomRow = { room_id: matrixId, user_id: (remoteData as IRemoteUserAdminData).matrixUser, }; // We don't upsert here. - await this.pgPool.query("INSERT INTO admin_rooms (room_id, user_id) VALUES ($1, $2)", Object.values(adminProps)); + await this.pgPool.query(`INSERT INTO ${ROOM_TABLE_UADMIN} (room_id, user_id) VALUES ($1, $2)`, Object.values(adminProps)); return res; } - if (type === "im") { - const imData = (remoteData as IRemoteImData); - const imProps = { + if (type === MROOM_TYPE_IM) { + const imData = remoteData as IRemoteImData; + const imProps: ImRoomRow = { room_id: matrixId, user_id: imData.matrixUser, remote_id: imData.recipient, protocol_id: imData.protocol_id, }; - statement = PgDataStore.BuildUpsertStatement("im_rooms", "(room_id)", Object.keys(imProps)); + statement = PgDataStore.BuildUpsertStatement(ROOM_TABLE_IM, "(room_id)", Object.keys(imProps)); await this.pgPool.query(statement, Object.values(imProps)); return res; } - const props = { + const groupData = remoteData as IRemoteGroupData; + const props: GroupRoomRow = { room_id: matrixId, - protocol_id: remoteData.protocol_id || "", - room_name: "", - gateway: false, - properties: "{}", + protocol_id: remoteData.protocol_id ?? "", + room_name: groupData.room_name ?? "", + gateway: groupData.gateway ?? false, + properties: JSON.stringify(groupData.properties), }; - const groupData = remoteData as IRemoteGroupData; - props.gateway = groupData.gateway || false; - props.room_name = groupData.room_name || ""; - props.properties = JSON.stringify(groupData.properties); statement = PgDataStore.BuildUpsertStatement( - "group_rooms", "(room_id)", Object.keys(props), + ROOM_TABLE_GROUP, "(room_id)", Object.keys(props), ); await this.pgPool.query(statement, Object.values(props)); log.debug("Stored room", matrixId); @@ -494,7 +512,7 @@ export class PgDataStore implements IStore { private async updateSchemaVersion(version: number) { log.debug(`updateSchemaVersion: ${version}`); - await this.pgPool.query("UPDATE schema SET version = $1;", [version]); + await this.pgPool.query("UPDATE schema SET version = $1", [version]); } private async getSchemaVersion(): Promise { From 17b9d9a248017c3bd26321edd8943e09df3105c6 Mon Sep 17 00:00:00 2001 From: Andrew Ferrazzutti Date: Wed, 25 Mar 2026 13:27:30 -0400 Subject: [PATCH 2/4] Add changelog --- changelog.d/373.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/373.bugfix diff --git a/changelog.d/373.bugfix b/changelog.d/373.bugfix new file mode 100644 index 00000000..f2b101c1 --- /dev/null +++ b/changelog.d/373.bugfix @@ -0,0 +1 @@ +Fix incoming messages in DM rooms created after having left an older DM room with the same XMPP user. From 488fd7a9709acf65dc71d7e43b3a28f4d8fa1089 Mon Sep 17 00:00:00 2001 From: Andrew Ferrazzutti Date: Mon, 30 Mar 2026 09:24:44 -0400 Subject: [PATCH 3/4] Rename changelog file --- changelog.d/{373.bugfix => 376.bugfix} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{373.bugfix => 376.bugfix} (100%) diff --git a/changelog.d/373.bugfix b/changelog.d/376.bugfix similarity index 100% rename from changelog.d/373.bugfix rename to changelog.d/376.bugfix From e8fbcb6033921c185be4eb3091cc13dd6fcfb9e0 Mon Sep 17 00:00:00 2001 From: Andrew Ferrazzutti Date: Tue, 31 Mar 2026 16:02:43 -0400 Subject: [PATCH 4/4] Check IMs asynchronously; examine Matrix errors Don't block startup until all IM rooms are scanned for staleness. Instead, kick off a scan to be done asynchronously, and only block incoming IMs on a per-chat basis, until the scan on its associated Matrix room has finished. Also don't consider an IM room as stale if an attempt to look up membership state fails with anything other than a 403 or 404 response, as those are the only responses that definitively indicate something wrong with membership in the room. --- src/MatrixRoomHandler.ts | 116 +++++++++++++++++++++++++++++++++++++-- src/Program.ts | 62 +-------------------- 2 files changed, 114 insertions(+), 64 deletions(-) diff --git a/src/MatrixRoomHandler.ts b/src/MatrixRoomHandler.ts index c9c7bb7e..f097e073 100644 --- a/src/MatrixRoomHandler.ts +++ b/src/MatrixRoomHandler.ts @@ -34,6 +34,7 @@ export class MatrixRoomHandler { private readonly accountRoomLock = new Set(); private readonly remoteEventIdMapping = new Map(); // remote_id -> event_id private readonly roomCreationLock = new Map>(); + private readonly staleIMRoomLock = new Map>(); constructor( private readonly purple: IBifrostInstance, private readonly profileSync: ProfileSync, @@ -101,6 +102,105 @@ export class MatrixRoomHandler { purple.on("read-receipt", handleAsyncEvent(this.handleReadReceipt.bind(this))); } + /** + * Begin scanning for stale IM rooms, which will be removed asynchronously. + * + * @returns a Promise that resolves after having retrieved all IM rooms from the store + * and kicking off an asynchronous staleness scan for each of them. + */ + public async startStaleIMRoomScan(): Promise { + const rooms = await this.store.getRoomsOfType(MROOM_TYPE_IM); + log.info(`Got ${rooms.length} IM rooms`); + for (const room of rooms) { + if (!room.matrix) { + log.warn( + `Not checking IM room for remote recipient ${room.remote?.get("recipient") || ""} because it has no matrix component`, + ); + continue; + } + const waiter = this.dropStaleIMRoom(room); + const roomId = room.matrix.getId(); + this.staleIMRoomLock.set(roomId, waiter); + waiter.finally(() => this.staleIMRoomLock.delete(roomId)); + } + } + + /** + * Removes the Matrix room for a bridged IM chat from the store + * if it is determined to be stale by {@link isIMRoomStale}. + * + * @param room The bridged IM chat to check. + * @returns whether the chat's Matrix room was deemed as stale & was removed. + */ + private async dropStaleIMRoom(room: RoomBridgeStoreEntry): Promise { + const [roomId, remoteIntent] = await this.isIMRoomStale(room); + if (roomId) { + await this.store.removeRoomByRoomId(roomId); + if (remoteIntent) { + // May be done concurrently + void remoteIntent.leave(roomId); + } + return true; + } else { + return false; + } + } + + /** + * Checks whether a Matrix room for a bridged IM chat is no longer usable. + * + * @param room The bridged IM chat to check. + * @returns A two-element tuple: + * 1. The ID of the chat's Matrix room if it is stale, or an empty string otherwise. + * 2. The {@link Intent} of the room's remote user, if one was found. + */ + private async isIMRoomStale(room: RoomBridgeStoreEntry): Promise<[string, Intent | null]> { + const roomId = room.matrix.getId(); + const recipient = room.remote.get("recipient"); + if (!recipient) { + log.warn(`IM room ${roomId} is stale because it has no recipient`); + return [roomId, null]; + } + const protocol = this.purple.getProtocol(room.remote.get("protocol_id")); + if (!protocol) { + log.warn(`IM room ${roomId} is stale because it has no valid protocol`); + return [roomId, null]; + } + const remoteIntent = this.bridge.getIntent( + protocol.getMxIdForProtocol( + recipient, + this.config.bridge.domain, + this.config.bridge.userPrefix, + ).getId() + ); + const matrixUser = room.remote.get("matrixUser"); + if (!matrixUser) { + log.warn(`IM room ${roomId} is stale because it has no matrix user`); + return [roomId, remoteIntent]; + } + let content: Record; + try { + content = await remoteIntent.matrixClient.getRoomStateEventContent(roomId, "m.room.member", matrixUser); + } catch (ex) { + switch (ex.statusCode) { + case 403: + log.warn(`IM room ${roomId} is stale because remote intent isn't a room member and never was`); + return [roomId, remoteIntent]; + case 404: + log.warn(`IM room ${roomId} is stale because its matrix user has no membership`); + return [roomId, remoteIntent]; + default: + log.error(`IM room ${roomId} staleness unknown, failed to look up room state:`, ex); + return ["", null]; + } + } + if (content.membership === "leave") { + log.info(`IM room ${roomId} is stale because its matrix user left`); + return [roomId, remoteIntent]; + } + return ["", null]; + } + public async onChatJoined(ev: IConversationEvent) { if (this.purple.needsDedupe()) { this.deduplicator.incrementRoomUsers(ev.conv.name); @@ -127,11 +227,19 @@ export class MatrixRoomHandler { await (this.roomCreationLock.get(remoteId) || Promise.resolve()); log.info("room was created, no longer waiting"); } - const remoteEntries = await this.store.getIMRoom(matrixUser.getId(), data.account.protocol_id, data.sender); - if (remoteEntries != null && remoteEntries.matrix) { - return remoteEntries.matrix.getId(); + const remoteEntry = await this.store.getIMRoom(matrixUser.getId(), data.account.protocol_id, data.sender); + if (!remoteEntry?.matrix) { + return null; + } + const roomId = remoteEntry.matrix.getId(); + const staleIMRoomWaiter = this.staleIMRoomLock.get(roomId); + if (staleIMRoomWaiter) { + const isStale = await staleIMRoomWaiter; + if (isStale) { + return null; + } } - return null; + return roomId; } private async createOrGetIMRoom(data: IReceivedImMsg, matrixUser: MatrixUser, intent: Intent): Promise { diff --git a/src/Program.ts b/src/Program.ts index 322de1aa..0642ff4d 100644 --- a/src/Program.ts +++ b/src/Program.ts @@ -14,7 +14,7 @@ import { XmppJsInstance } from "./xmppjs/XJSInstance"; import { Metrics } from "./Metrics"; import { AutoRegistration } from "./AutoRegistration"; import { GatewayHandler } from "./GatewayHandler"; -import { IRemoteUserAdminData, MROOM_TYPE_IM, MROOM_TYPE_UADMIN } from "./store/Types"; +import { IRemoteUserAdminData, MROOM_TYPE_UADMIN } from "./store/Types"; import * as fs from "fs"; import { webcrypto } from "node:crypto"; @@ -182,64 +182,6 @@ class Program { } } - private async dropStaleIMRooms(): Promise { - const rooms = await this.store.getRoomsOfType(MROOM_TYPE_IM); - log.info(`Got ${rooms.length} IM rooms`); - // Errors here aren't fatal, so use Promise.allSettled instead of Promise.all - await Promise.allSettled(rooms.map(async (room) => { - if (!room.matrix) { - log.warn(`Not checking IM room because it has no matrix component`); - return; - } - const roomId = room.matrix.getId(); - if (!room.remote) { - log.warn(`Not checking IM room ${roomId} because it has no remote links`); - return; - } - const recipient = room.remote.get("recipient"); - if (!recipient) { - log.warn(`Dropping IM room ${roomId} because it has no recipient`); - await this.store.removeRoomByRoomId(roomId); - return; - } - const protocol = this.purple.getProtocol(room.remote.get("protocol_id")); - if (!protocol) { - log.warn(`Dropping IM room ${roomId} because it has no valid protocol`); - await this.store.removeRoomByRoomId(roomId); - return; - } - const remoteIntent = this.bridge.getIntent( - protocol.getMxIdForProtocol( - recipient, - this.config.bridge.domain, - this.config.bridge.userPrefix, - ).getId() - ); - const matrixUser = room.remote.get("matrixUser"); - if (!matrixUser) { - log.warn(`Dropping and leaving IM room ${roomId} because it has no matrix user`); - await this.store.removeRoomByRoomId(roomId); - await remoteIntent.leave(roomId); - return; - } - let content: Record; - try { - content = await remoteIntent.matrixClient.getRoomStateEventContent(roomId, "m.room.member", matrixUser); - } catch (ex) { - log.warn(`Dropping and leaving IM room ${roomId} because we could not look up room state: ${ex}`); - await this.store.removeRoomByRoomId(roomId); - await remoteIntent.leave(roomId); - return; - } - if (content.membership === "leave") { - log.info(`Dropping and leaving IM room ${roomId} because its matrix user left`); - await this.store.removeRoomByRoomId(roomId); - await remoteIntent.leave(roomId); - return; - } - })); - } - private async runBridge(port: number, config: ConfigValue) { const checkOnly = process.env.BIFROST_CHECK_ONLY === "true"; this.cfg.ApplyConfig(config); @@ -407,7 +349,7 @@ class Program { log.info("Started appservice listener on port", port); await this.pingBridge(); await this.registerBot(); - await this.dropStaleIMRooms(); + await this.roomHandler.startStaleIMRoomScan(); log.info("Bridge has started."); try { await purple.start();