diff --git a/packages/database-utils/src/migration-runner.ts b/packages/database-utils/src/migration-runner.ts index 796eebc..07763a3 100644 --- a/packages/database-utils/src/migration-runner.ts +++ b/packages/database-utils/src/migration-runner.ts @@ -1,6 +1,6 @@ import { isValid, parse } from 'date-fns' import { promises as fs } from 'fs' -import pg, { Connection, Pgrm } from 'pg-using-bluebird' +import pg, { Connection } from 'pg-using-bluebird' import { Logger } from 'winston' export const parseDateFromMigrationFilePath = (str: string, logger: Logger) => { @@ -15,16 +15,14 @@ export const parseDateFromMigrationFilePath = (str: string, logger: Logger) => { return parsedDate } -const createMigrationTable = (pgrm: Pgrm) => - pgrm.withTransaction(tx => - tx.queryAsync(` +const createMigrationTable = (tx: Connection) => + tx.queryAsync(` CREATE TABLE IF NOT EXISTS schemaversion ( schemaversion_id BIGSERIAL PRIMARY KEY, filename TEXT UNIQUE NOT NULL, applied TIMESTAMP WITH TIME ZONE NOT NULL ) `) - ) const lockSchemaTable = (tx: Connection) => tx.queryAsync('LOCK TABLE schemaversion IN ACCESS EXCLUSIVE MODE NOWAIT') @@ -57,7 +55,6 @@ const getUnappliedMigrationFiles = async ( const applyMigration = async ( logger: Logger, - pgrm: Pgrm, lockTx: Connection, specPath: string, quiet: boolean, @@ -86,13 +83,15 @@ export const migrate = async ({ dbUrl, dbNames, quiet = false, - migrationDir = `${process.cwd()}/db/migrations` + migrationDir = `${process.cwd()}/db/migrations`, + tx }: { logger: Logger dbUrl: string dbNames: string[] quiet?: boolean migrationDir?: string + tx?: Connection }) => { const isValidService = dbNames.some(name => dbUrl.includes(name)) if (!isValidService || (dbUrl.includes('aws') && !process.env.YTL_AWS_STACK)) { @@ -106,7 +105,11 @@ export const migrate = async ({ const pgrm = pg({ dbUrl }) try { - await createMigrationTable(pgrm) + if (tx) { + await createMigrationTable(tx) + } else { + await pgrm.withTransaction(tx => createMigrationTable(tx)) + } } catch (error) { const stack = String((error as Error).stack) logger.error(`Fatal error occured while creating migration table (schemaversion): ${stack}`) @@ -114,40 +117,11 @@ export const migrate = async ({ } try { - await pgrm.withTransaction(async (lockTx: Connection) => { - try { - if (!(await lockSchemaTable(lockTx))) { - logger.warn('Could not lock schema table. Someone else is probably running the migrations.') - return - } - } catch (error) { - if ((error as NodeJS.ErrnoException).code === '55P03') { - logger.warn( - 'Error 55P03 lock_not_available while locking schema table. Ignoring error without trying to migrate as someone else is probably running the migrations.' - ) - return - } else { - logger.error(`Unknown error while locking the schema table: ${String(error)}`) - throw error - } - } - const files = await getUnappliedMigrationFiles(logger, lockTx, migrationDir) - if (!quiet) { - logger.info(`Applying total of ${files.length} migrations`) - } - - try { - for (const file of files) { - await applyMigration(logger, pgrm, lockTx, file, quiet, migrationDir) - } - } catch (e) { - if (e instanceof Error) { - logger.error(e.message) - } else { - logger.error(e) - } - } - }) + if (tx) { + await runMigrationsInTransaction(tx) + } else { + await pgrm.withTransaction((lockTx: Connection) => runMigrationsInTransaction(lockTx)) + } } catch (error) { if (error instanceof Error) { logger.error(`Fatal error occured while running migrations: ${error.message}`) @@ -156,4 +130,39 @@ export const migrate = async ({ } throw error } + + async function runMigrationsInTransaction(lockTx: Connection) { + try { + if (!(await lockSchemaTable(lockTx))) { + logger.warn('Could not lock schema table. Someone else is probably running the migrations.') + return + } + } catch (error) { + if ((error as NodeJS.ErrnoException).code === '55P03') { + logger.warn( + 'Error 55P03 lock_not_available while locking schema table. Ignoring error without trying to migrate as someone else is probably running the migrations.' + ) + return + } else { + logger.error(`Unknown error while locking the schema table: ${String(error)}`) + throw error + } + } + const files = await getUnappliedMigrationFiles(logger, lockTx, migrationDir) + if (!quiet) { + logger.info(`Applying total of ${files.length} migrations`) + } + + try { + for (const file of files) { + await applyMigration(logger, lockTx, file, quiet, migrationDir) + } + } catch (e) { + if (e instanceof Error) { + logger.error(e.message) + } else { + logger.error(e) + } + } + } }