From f9ae4195a7964ef6b98016b7bcd368a0c2db8ccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petri=20=C3=84mm=C3=A4l=C3=A4?= Date: Fri, 9 Jan 2026 16:31:35 +0200 Subject: [PATCH] Add possibility to pass transaction object to migrate function When used, there might be some other database operations done in transaction which require also migrations to be run in the same transaction. --- .../database-utils/src/migration-runner.ts | 93 ++++++++++--------- 1 file changed, 51 insertions(+), 42 deletions(-) diff --git a/packages/database-utils/src/migration-runner.ts b/packages/database-utils/src/migration-runner.ts index 796eebc..07763a3 100644 --- a/packages/database-utils/src/migration-runner.ts +++ b/packages/database-utils/src/migration-runner.ts @@ -1,6 +1,6 @@ import { isValid, parse } from 'date-fns' import { promises as fs } from 'fs' -import pg, { Connection, Pgrm } from 'pg-using-bluebird' +import pg, { Connection } from 'pg-using-bluebird' import { Logger } from 'winston' export const parseDateFromMigrationFilePath = (str: string, logger: Logger) => { @@ -15,16 +15,14 @@ export const parseDateFromMigrationFilePath = (str: string, logger: Logger) => { return parsedDate } -const createMigrationTable = (pgrm: Pgrm) => - pgrm.withTransaction(tx => - tx.queryAsync(` +const createMigrationTable = (tx: Connection) => + tx.queryAsync(` CREATE TABLE IF NOT EXISTS schemaversion ( schemaversion_id BIGSERIAL PRIMARY KEY, filename TEXT UNIQUE NOT NULL, applied TIMESTAMP WITH TIME ZONE NOT NULL ) `) - ) const lockSchemaTable = (tx: Connection) => tx.queryAsync('LOCK TABLE schemaversion IN ACCESS EXCLUSIVE MODE NOWAIT') @@ -57,7 +55,6 @@ const getUnappliedMigrationFiles = async ( const applyMigration = async ( logger: Logger, - pgrm: Pgrm, lockTx: Connection, specPath: string, quiet: boolean, @@ -86,13 +83,15 @@ export const migrate = async ({ dbUrl, dbNames, quiet = false, - migrationDir = `${process.cwd()}/db/migrations` + migrationDir = `${process.cwd()}/db/migrations`, + tx }: { logger: Logger dbUrl: string dbNames: string[] quiet?: boolean migrationDir?: string + tx?: Connection }) => { const isValidService = dbNames.some(name => dbUrl.includes(name)) if (!isValidService || (dbUrl.includes('aws') && !process.env.YTL_AWS_STACK)) { @@ -106,7 +105,11 @@ export const migrate = async ({ const pgrm = pg({ dbUrl }) try { - await createMigrationTable(pgrm) + if (tx) { + await createMigrationTable(tx) + } else { + await pgrm.withTransaction(tx => createMigrationTable(tx)) + } } catch (error) { const stack = String((error as Error).stack) logger.error(`Fatal error occured while creating migration table (schemaversion): ${stack}`) @@ -114,40 +117,11 @@ export const migrate = async ({ } try { - await pgrm.withTransaction(async (lockTx: Connection) => { - try { - if (!(await lockSchemaTable(lockTx))) { - logger.warn('Could not lock schema table. Someone else is probably running the migrations.') - return - } - } catch (error) { - if ((error as NodeJS.ErrnoException).code === '55P03') { - logger.warn( - 'Error 55P03 lock_not_available while locking schema table. Ignoring error without trying to migrate as someone else is probably running the migrations.' - ) - return - } else { - logger.error(`Unknown error while locking the schema table: ${String(error)}`) - throw error - } - } - const files = await getUnappliedMigrationFiles(logger, lockTx, migrationDir) - if (!quiet) { - logger.info(`Applying total of ${files.length} migrations`) - } - - try { - for (const file of files) { - await applyMigration(logger, pgrm, lockTx, file, quiet, migrationDir) - } - } catch (e) { - if (e instanceof Error) { - logger.error(e.message) - } else { - logger.error(e) - } - } - }) + if (tx) { + await runMigrationsInTransaction(tx) + } else { + await pgrm.withTransaction((lockTx: Connection) => runMigrationsInTransaction(lockTx)) + } } catch (error) { if (error instanceof Error) { logger.error(`Fatal error occured while running migrations: ${error.message}`) @@ -156,4 +130,39 @@ export const migrate = async ({ } throw error } + + async function runMigrationsInTransaction(lockTx: Connection) { + try { + if (!(await lockSchemaTable(lockTx))) { + logger.warn('Could not lock schema table. Someone else is probably running the migrations.') + return + } + } catch (error) { + if ((error as NodeJS.ErrnoException).code === '55P03') { + logger.warn( + 'Error 55P03 lock_not_available while locking schema table. Ignoring error without trying to migrate as someone else is probably running the migrations.' + ) + return + } else { + logger.error(`Unknown error while locking the schema table: ${String(error)}`) + throw error + } + } + const files = await getUnappliedMigrationFiles(logger, lockTx, migrationDir) + if (!quiet) { + logger.info(`Applying total of ${files.length} migrations`) + } + + try { + for (const file of files) { + await applyMigration(logger, lockTx, file, quiet, migrationDir) + } + } catch (e) { + if (e instanceof Error) { + logger.error(e.message) + } else { + logger.error(e) + } + } + } }