diff --git a/changelog.d/376.bugfix b/changelog.d/376.bugfix new file mode 100644 index 00000000..f2b101c1 --- /dev/null +++ b/changelog.d/376.bugfix @@ -0,0 +1 @@ +Fix incoming messages in DM rooms created after having left an older DM room with the same XMPP user. diff --git a/src/MatrixEventHandler.ts b/src/MatrixEventHandler.ts index 2369f907..8dd4da33 100644 --- a/src/MatrixEventHandler.ts +++ b/src/MatrixEventHandler.ts @@ -216,13 +216,33 @@ export class MatrixEventHandler { return; } - if (membershipEvent && roomType === MROOM_TYPE_GROUP) { - if (this.bridge.getBot().isRemoteUser(event.sender)) { - return; // Don't really care about remote users - } - if (["join", "leave"].includes(event.content.membership as string)) { - await this.handleJoinLeaveGroup(ctx, membershipEvent); - return; + if (membershipEvent) { + switch (roomType) { + case MROOM_TYPE_GROUP: + if (bridgeBot.isRemoteUser(event.sender)) { + return; // Don't really care about remote users + } + if (["join", "leave", "ban"].includes(membershipEvent.content.membership)) { + await this.handleJoinLeaveGroup(ctx, membershipEvent); + return; + } + break; + case MROOM_TYPE_IM: + if (membershipEvent.content.membership === "leave" && membershipEvent.sender === ctx.remote.get("matrixUser")) { + await this.store.removeRoomByRoomId(membershipEvent.room_id); + const protocol = this.purple.getProtocol(roomProtocol); + if (protocol) { + await this.bridge.getIntent( + protocol.getMxIdForProtocol( + ctx.remote.get("recipient"), + this.config.bridge.domain, + this.config.bridge.userPrefix, + ).getId() + ).leave(membershipEvent.room_id); + } + log.info(`Left and removed entry for IM room ${membershipEvent.room_id} because the user left`); + return; + } } } diff --git a/src/MatrixRoomHandler.ts b/src/MatrixRoomHandler.ts index c9c7bb7e..f097e073 100644 --- a/src/MatrixRoomHandler.ts +++ b/src/MatrixRoomHandler.ts @@ -34,6 +34,7 @@ export class MatrixRoomHandler { private readonly accountRoomLock = new Set(); private readonly remoteEventIdMapping = new Map(); // remote_id -> event_id private readonly roomCreationLock = new Map>(); + private readonly staleIMRoomLock = new Map>(); constructor( private readonly purple: IBifrostInstance, private readonly profileSync: ProfileSync, @@ -101,6 +102,105 @@ export class MatrixRoomHandler { purple.on("read-receipt", handleAsyncEvent(this.handleReadReceipt.bind(this))); } + /** + * Begin scanning for stale IM rooms, which will be removed asynchronously. + * + * @returns a Promise that resolves after having retrieved all IM rooms from the store + * and kicking off an asynchronous staleness scan for each of them. + */ + public async startStaleIMRoomScan(): Promise { + const rooms = await this.store.getRoomsOfType(MROOM_TYPE_IM); + log.info(`Got ${rooms.length} IM rooms`); + for (const room of rooms) { + if (!room.matrix) { + log.warn( + `Not checking IM room for remote recipient ${room.remote?.get("recipient") || ""} because it has no matrix component`, + ); + continue; + } + const waiter = this.dropStaleIMRoom(room); + const roomId = room.matrix.getId(); + this.staleIMRoomLock.set(roomId, waiter); + waiter.finally(() => this.staleIMRoomLock.delete(roomId)); + } + } + + /** + * Removes the Matrix room for a bridged IM chat from the store + * if it is determined to be stale by {@link isIMRoomStale}. + * + * @param room The bridged IM chat to check. + * @returns whether the chat's Matrix room was deemed as stale & was removed. + */ + private async dropStaleIMRoom(room: RoomBridgeStoreEntry): Promise { + const [roomId, remoteIntent] = await this.isIMRoomStale(room); + if (roomId) { + await this.store.removeRoomByRoomId(roomId); + if (remoteIntent) { + // May be done concurrently + void remoteIntent.leave(roomId); + } + return true; + } else { + return false; + } + } + + /** + * Checks whether a Matrix room for a bridged IM chat is no longer usable. + * + * @param room The bridged IM chat to check. + * @returns A two-element tuple: + * 1. The ID of the chat's Matrix room if it is stale, or an empty string otherwise. + * 2. The {@link Intent} of the room's remote user, if one was found. + */ + private async isIMRoomStale(room: RoomBridgeStoreEntry): Promise<[string, Intent | null]> { + const roomId = room.matrix.getId(); + const recipient = room.remote.get("recipient"); + if (!recipient) { + log.warn(`IM room ${roomId} is stale because it has no recipient`); + return [roomId, null]; + } + const protocol = this.purple.getProtocol(room.remote.get("protocol_id")); + if (!protocol) { + log.warn(`IM room ${roomId} is stale because it has no valid protocol`); + return [roomId, null]; + } + const remoteIntent = this.bridge.getIntent( + protocol.getMxIdForProtocol( + recipient, + this.config.bridge.domain, + this.config.bridge.userPrefix, + ).getId() + ); + const matrixUser = room.remote.get("matrixUser"); + if (!matrixUser) { + log.warn(`IM room ${roomId} is stale because it has no matrix user`); + return [roomId, remoteIntent]; + } + let content: Record; + try { + content = await remoteIntent.matrixClient.getRoomStateEventContent(roomId, "m.room.member", matrixUser); + } catch (ex) { + switch (ex.statusCode) { + case 403: + log.warn(`IM room ${roomId} is stale because remote intent isn't a room member and never was`); + return [roomId, remoteIntent]; + case 404: + log.warn(`IM room ${roomId} is stale because its matrix user has no membership`); + return [roomId, remoteIntent]; + default: + log.error(`IM room ${roomId} staleness unknown, failed to look up room state:`, ex); + return ["", null]; + } + } + if (content.membership === "leave") { + log.info(`IM room ${roomId} is stale because its matrix user left`); + return [roomId, remoteIntent]; + } + return ["", null]; + } + public async onChatJoined(ev: IConversationEvent) { if (this.purple.needsDedupe()) { this.deduplicator.incrementRoomUsers(ev.conv.name); @@ -127,11 +227,19 @@ export class MatrixRoomHandler { await (this.roomCreationLock.get(remoteId) || Promise.resolve()); log.info("room was created, no longer waiting"); } - const remoteEntries = await this.store.getIMRoom(matrixUser.getId(), data.account.protocol_id, data.sender); - if (remoteEntries != null && remoteEntries.matrix) { - return remoteEntries.matrix.getId(); + const remoteEntry = await this.store.getIMRoom(matrixUser.getId(), data.account.protocol_id, data.sender); + if (!remoteEntry?.matrix) { + return null; + } + const roomId = remoteEntry.matrix.getId(); + const staleIMRoomWaiter = this.staleIMRoomLock.get(roomId); + if (staleIMRoomWaiter) { + const isStale = await staleIMRoomWaiter; + if (isStale) { + return null; + } } - return null; + return roomId; } private async createOrGetIMRoom(data: IReceivedImMsg, matrixUser: MatrixUser, intent: Intent): Promise { diff --git a/src/Program.ts b/src/Program.ts index 064dca75..0642ff4d 100644 --- a/src/Program.ts +++ b/src/Program.ts @@ -349,6 +349,7 @@ class Program { log.info("Started appservice listener on port", port); await this.pingBridge(); await this.registerBot(); + await this.roomHandler.startStaleIMRoomScan(); log.info("Bridge has started."); try { await purple.start(); diff --git a/src/store/Types.ts b/src/store/Types.ts index 0e0764a1..3a86d6f4 100644 --- a/src/store/Types.ts +++ b/src/store/Types.ts @@ -8,8 +8,8 @@ export const MROOM_TYPE_GROUP = "group"; export const MUSER_TYPE_ACCOUNT = "account"; export const MUSER_TYPE_GHOST = "ghost"; -export type MROOM_TYPES = "user-admin"|"im"|"group"; -export type MUSER_TYPES = "account"|"ghost"; +export type MROOM_TYPES = typeof MROOM_TYPE_UADMIN | typeof MROOM_TYPE_IM | typeof MROOM_TYPE_GROUP; +export type MUSER_TYPES = typeof MUSER_TYPE_ACCOUNT | typeof MUSER_TYPE_GHOST; export interface IRemoteRoomData { protocol_id?: string; @@ -31,6 +31,12 @@ export interface IRemoteUserAdminData extends IRemoteRoomData { matrixUser?: string; } +export interface RoomTypeToRemoteRoomData { + [MROOM_TYPE_IM]: IRemoteImData; + [MROOM_TYPE_GROUP]: IRemoteGroupData; + [MROOM_TYPE_UADMIN]: IRemoteUserAdminData; +}; + export interface IMatrixUserData { accounts: {[key: string]: IRemoteUserAccount}; } diff --git a/src/store/postgres/PgDatastore.ts b/src/store/postgres/PgDatastore.ts index e249c299..89440f1d 100644 --- a/src/store/postgres/PgDatastore.ts +++ b/src/store/postgres/PgDatastore.ts @@ -16,10 +16,10 @@ limitations under the License. import { Pool } from "pg"; import { MatrixRoom, RemoteRoom, MatrixUser, Logger, RoomBridgeStoreEntry } from "matrix-appservice-bridge"; -import { IRemoteRoomData, IRemoteGroupData, MROOM_TYPES, - IRemoteImData, IRemoteUserAdminData, MROOM_TYPE_IM } from "../Types"; +import { IRemoteGroupData, MROOM_TYPES, RoomTypeToRemoteRoomData, + IRemoteImData, IRemoteUserAdminData, MROOM_TYPE_IM, MROOM_TYPE_GROUP, MROOM_TYPE_UADMIN } from "../Types"; import { BifrostProtocol } from "../../bifrost/Protocol"; -import { IAccountMinimal } from "../../bifrost/Events"; +import { IAccountMinimal, IChatJoinProperties } from "../../bifrost/Events"; import { BifrostRemoteUser } from "../BifrostRemoteUser"; import { IConfigDatastore } from "../../Config"; import { IStore } from "../Store"; @@ -32,18 +32,98 @@ export interface PgDataStoreOpts { max: number; } -const RoomTypeToTable = { - "im": "im_rooms", - "group": "group_rooms", - "user-admin": "admin_rooms", +const ROOM_TABLE_IM = "im_rooms"; +const ROOM_TABLE_GROUP = "group_rooms"; +const ROOM_TABLE_UADMIN = "admin_rooms"; +type ROOM_TABLES = typeof ROOM_TABLE_IM | typeof ROOM_TABLE_GROUP | typeof ROOM_TABLE_UADMIN; + +const RoomTypeToTable: Record = { + [MROOM_TYPE_IM]: ROOM_TABLE_IM, + [MROOM_TYPE_GROUP]: ROOM_TABLE_GROUP, + [MROOM_TYPE_UADMIN]: ROOM_TABLE_UADMIN, }; -const TableToRoomType = { - im_rooms: "im", - group_rooms: "group", - admin_rooms: "user-admin", +const TableToRoomType: Record = { + [ROOM_TABLE_IM]: MROOM_TYPE_IM, + [ROOM_TABLE_GROUP]: MROOM_TYPE_GROUP, + [ROOM_TABLE_UADMIN]: MROOM_TYPE_UADMIN, }; +type Json = string | number | boolean | null | Json[] | { [name: string]: Json }; + +interface RoomRow { + room_id: string; +} + +interface ImRoomRow extends RoomRow { + user_id: string; + remote_id: string; + protocol_id: string; +} + +interface GroupRoomRow extends RoomRow { + protocol_id?: string; + room_name?: string; + gateway?: boolean; + properties?: Json; +} + +interface AdminRoomRow extends RoomRow { + user_id?: string; +} + +interface RoomTypeToRow { + [MROOM_TYPE_IM]: ImRoomRow, + [MROOM_TYPE_GROUP]: GroupRoomRow, + [MROOM_TYPE_UADMIN]: AdminRoomRow, +} + +function rowToRemoteImData(row: ImRoomRow): IRemoteImData { + return { + matrixUser: new MatrixUser(row.user_id).userId, + recipient: row.remote_id, + protocol_id: row.protocol_id, + }; +} + +function rowToRemoteGroupData(row: GroupRoomRow): IRemoteGroupData { + return { + gateway: row.gateway, + room_name: row.room_name, + protocol_id: row.protocol_id, + properties: row.properties as IChatJoinProperties, + }; +} + +function rowToRemoteUserAdminData(row: AdminRoomRow): IRemoteUserAdminData { + return row.user_id ? { + matrixUser: new MatrixUser(row.user_id).userId, + } : {}; +} + +const RoomTypeToDataFunc: {[T in MROOM_TYPES]: (row: RoomTypeToRow[T]) => RoomTypeToRemoteRoomData[T]} = { + [MROOM_TYPE_IM]: rowToRemoteImData, + [MROOM_TYPE_GROUP]: rowToRemoteGroupData, + [MROOM_TYPE_UADMIN]: rowToRemoteUserAdminData, +} + +function rowToRoomBridgeStoreEntry(type: T, row: RoomTypeToRow[T]): RoomBridgeStoreEntry { + return dataToRoomBridgeStoreEntry(row.room_id, type, RoomTypeToDataFunc[type](row)); +} + +function rowToGroupRoomBridgeStoreEntry(row: GroupRoomRow): RoomBridgeStoreEntry { + return dataToRoomBridgeStoreEntry(row.room_id, MROOM_TYPE_GROUP, rowToRemoteGroupData(row)); +} + +function dataToRoomBridgeStoreEntry(roomId: string, type: T, data: RoomTypeToRemoteRoomData[T], remoteId = ""): RoomBridgeStoreEntry { + return { + matrix: new MatrixRoom(roomId, { extras: { type } }), + // Id is not always used. + remote: new RemoteRoom(remoteId, data as Record), + data: {}, + }; +} + export class PgDataStore implements IStore { public static LATEST_SCHEMA = 2; @@ -190,69 +270,49 @@ export class PgDataStore implements IStore { if (i === 0) { throw Error("No remoteData to compare with"); } - const statement = `SELECT * FROM group_rooms WHERE ${parts.join(" AND ")};`; - const res = await this.pgPool.query( + const statement = `SELECT * FROM ${ROOM_TABLE_GROUP} WHERE ${parts.join(" AND ")}`; + const res = await this.pgPool.query( statement, Object.values(remoteData), ); if (res.rowCount === 0) { return null; } - const row = res.rows[0]; - const remoteGroupData: IRemoteGroupData = { - gateway: row.gateway, - properties: row.properties, - room_name: row.room_name, - }; - return { - matrix: new MatrixRoom(row.room_id, { extras: { type: "group" } }), - // Id is not used. - remote: new RemoteRoom("", remoteGroupData as Record), - data: {} - }; + return rowToGroupRoomBridgeStoreEntry(res.rows[0]); } public async getAdminRoom(matrixUserId: string): Promise { - const res = await this.pgPool.query( - "SELECT room_id FROM admin_rooms WHERE user_id = $1", + const res = await this.pgPool.query( + `SELECT room_id FROM ${ROOM_TABLE_UADMIN} WHERE user_id = $1`, [ matrixUserId ], ); return res.rows[0]?.room_id || null; } public async getIMRoom(matrixUserId: string, protocolId: string, remoteUserId: string): Promise { - const res = await this.pgPool.query( - "SELECT room_id FROM im_rooms WHERE user_id = $1 AND remote_id = $2 AND protocol_id = $3", + const res = await this.pgPool.query( + `SELECT room_id FROM ${ROOM_TABLE_IM} WHERE user_id = $1 AND remote_id = $2 AND protocol_id = $3`, [ matrixUserId, remoteUserId, protocolId ], ); if (res.rowCount === 0) { return null; } - const row = res.rows[0]; - return { - matrix: new MatrixRoom(row.room_id, { extras: { type: MROOM_TYPE_IM } }), - remote: new RemoteRoom("", { - matrixUser: matrixUserId, - protocol_id: protocolId, - recipient: remoteUserId, - }), - data: {} - }; + return dataToRoomBridgeStoreEntry(res.rows[0].room_id, MROOM_TYPE_IM, { + matrixUser: matrixUserId, + protocol_id: protocolId, + recipient: remoteUserId, + }); } public async getAllIMRoomsForAccount(matrixUserId: string, protocolId: string): Promise { - const res = await this.pgPool.query( - "SELECT room_id, remote_id FROM im_rooms WHERE user_id = $1 AND protocol_id = $2", + const res = await this.pgPool.query>( + `SELECT room_id, remote_id FROM ${ROOM_TABLE_IM} WHERE user_id = $1 AND protocol_id = $2`, [ matrixUserId, protocolId ], ); - return res.rows.map(row => ({ - matrix: new MatrixRoom(row.room_id, { extras: { type: MROOM_TYPE_IM } }), - remote: new RemoteRoom("", { - matrixUser: matrixUserId, - protocol_id: protocolId, - recipient: row.remote_id, - }), - data: {} + return res.rows.map(row => dataToRoomBridgeStoreEntry(row.room_id, MROOM_TYPE_IM, { + matrixUser: matrixUserId, + protocol_id: protocolId, + recipient: row.remote_id, })); } @@ -270,21 +330,8 @@ export class PgDataStore implements IStore { public async getRoomsOfType(type: MROOM_TYPES): Promise { const tableName = RoomTypeToTable[type]; - const res = await this.pgPool.query(`SELECT * FROM ${tableName};`); - return res.rows.map((row) => { - const remoteData: IRemoteGroupData = { - gateway: row.gateway, - properties: row.properties, - room_name: row.room_name, - protocol_id: row.protocol_id, - }; - return { - matrix: new MatrixRoom(row.room_id, { extras: { type } }), - // Id is not used. - remote: new RemoteRoom("", remoteData as Record), - data: {} - }; - }); + const res = await this.pgPool.query(`SELECT * FROM ${tableName}`); + return res.rows.map((row) => rowToRoomBridgeStoreEntry(type, row)); } public async storeAccount(userId: string, protocol: BifrostProtocol, username: string, extraData?: any) { @@ -335,7 +382,7 @@ export class PgDataStore implements IStore { public async getRoomEntryByMatrixId(roomId: string): Promise { log.debug("Getting room", roomId); - const typeRes = await this.pgPool.query( + const typeRes = await this.pgPool.query<{table_name: ROOM_TABLES}>( "SELECT tableoid::regclass as table_name FROM rooms WHERE room_id = $1 LIMIT 1", [ roomId ], ); @@ -345,89 +392,60 @@ export class PgDataStore implements IStore { } const tableName = typeRes.rows[0].table_name; const type = TableToRoomType[tableName]; - const res = await this.pgPool.query( + if (!type) { + throw new Error("Room was of unknown type!"); + } + const res = await this.pgPool.query( `SELECT * FROM ${tableName} WHERE room_id = $1 LIMIT 1`, [ roomId ], ); if (!res.rowCount) { throw Error("Missing data for room that we did manage to select!"); } - const row = res.rows[0]; - let remoteData: IRemoteRoomData; - if (type === "group") { - remoteData = { - gateway: row.gateway, - room_name: row.room_name, - protocol_id: row.protocol_id, - properties: row.properties, - } as IRemoteGroupData; - } else if (type === "im") { - remoteData = { - matrixUser: new MatrixUser(row.user_id).userId, - recipient: row.remote_id, - protocol_id: row.protocol_id, - } as IRemoteImData; - } else if (type === "user-admin") { - remoteData = { - matrixUser: new MatrixUser(row.user_id).userId, - } as IRemoteUserAdminData; - } else { - throw Error("Room was of unknown type!"); - } - log.debug("Found room ", JSON.stringify(remoteData)); - return { - remote: new RemoteRoom("", remoteData as Record), - matrix: new MatrixRoom(roomId, { extras: { type }}), - data: {} - }; + const entry = rowToRoomBridgeStoreEntry(type, res.rows[0]); + log.debug("Found room ", JSON.stringify(entry.remote)); + return entry; } - public async storeRoom(matrixId: string, type: MROOM_TYPES, remoteId: string, remoteData: IRemoteRoomData) + public async storeRoom(matrixId: string, type: T, remoteId: string, remoteData: RoomTypeToRemoteRoomData[T]) : Promise { log.debug("Storing room", matrixId); let statement: string; - const res = { - remote: new RemoteRoom(remoteId, remoteData as Record), - matrix: new MatrixRoom(matrixId, { extras: { type } }), - data: {} - }; + const res = dataToRoomBridgeStoreEntry(matrixId, type, remoteData, remoteId); - if (type === "user-admin") { - const adminProps = { + if (type === MROOM_TYPE_UADMIN) { + const adminProps: AdminRoomRow = { room_id: matrixId, user_id: (remoteData as IRemoteUserAdminData).matrixUser, }; // We don't upsert here. - await this.pgPool.query("INSERT INTO admin_rooms (room_id, user_id) VALUES ($1, $2)", Object.values(adminProps)); + await this.pgPool.query(`INSERT INTO ${ROOM_TABLE_UADMIN} (room_id, user_id) VALUES ($1, $2)`, Object.values(adminProps)); return res; } - if (type === "im") { - const imData = (remoteData as IRemoteImData); - const imProps = { + if (type === MROOM_TYPE_IM) { + const imData = remoteData as IRemoteImData; + const imProps: ImRoomRow = { room_id: matrixId, user_id: imData.matrixUser, remote_id: imData.recipient, protocol_id: imData.protocol_id, }; - statement = PgDataStore.BuildUpsertStatement("im_rooms", "(room_id)", Object.keys(imProps)); + statement = PgDataStore.BuildUpsertStatement(ROOM_TABLE_IM, "(room_id)", Object.keys(imProps)); await this.pgPool.query(statement, Object.values(imProps)); return res; } - const props = { + const groupData = remoteData as IRemoteGroupData; + const props: GroupRoomRow = { room_id: matrixId, - protocol_id: remoteData.protocol_id || "", - room_name: "", - gateway: false, - properties: "{}", + protocol_id: remoteData.protocol_id ?? "", + room_name: groupData.room_name ?? "", + gateway: groupData.gateway ?? false, + properties: JSON.stringify(groupData.properties), }; - const groupData = remoteData as IRemoteGroupData; - props.gateway = groupData.gateway || false; - props.room_name = groupData.room_name || ""; - props.properties = JSON.stringify(groupData.properties); statement = PgDataStore.BuildUpsertStatement( - "group_rooms", "(room_id)", Object.keys(props), + ROOM_TABLE_GROUP, "(room_id)", Object.keys(props), ); await this.pgPool.query(statement, Object.values(props)); log.debug("Stored room", matrixId); @@ -494,7 +512,7 @@ export class PgDataStore implements IStore { private async updateSchemaVersion(version: number) { log.debug(`updateSchemaVersion: ${version}`); - await this.pgPool.query("UPDATE schema SET version = $1;", [version]); + await this.pgPool.query("UPDATE schema SET version = $1", [version]); } private async getSchemaVersion(): Promise {