Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 51 additions & 42 deletions packages/database-utils/src/migration-runner.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { isValid, parse } from 'date-fns'
import { promises as fs } from 'fs'
import pg, { Connection, Pgrm } from 'pg-using-bluebird'
import pg, { Connection } from 'pg-using-bluebird'
import { Logger } from 'winston'

export const parseDateFromMigrationFilePath = (str: string, logger: Logger) => {
Expand All @@ -15,16 +15,14 @@ export const parseDateFromMigrationFilePath = (str: string, logger: Logger) => {
return parsedDate
}

const createMigrationTable = (pgrm: Pgrm) =>
pgrm.withTransaction(tx =>
tx.queryAsync(`
const createMigrationTable = (tx: Connection) =>
tx.queryAsync(`
CREATE TABLE IF NOT EXISTS schemaversion (
schemaversion_id BIGSERIAL PRIMARY KEY,
filename TEXT UNIQUE NOT NULL,
applied TIMESTAMP WITH TIME ZONE NOT NULL
)
`)
)

const lockSchemaTable = (tx: Connection) => tx.queryAsync('LOCK TABLE schemaversion IN ACCESS EXCLUSIVE MODE NOWAIT')

Expand Down Expand Up @@ -57,7 +55,6 @@ const getUnappliedMigrationFiles = async (

const applyMigration = async (
logger: Logger,
pgrm: Pgrm,
lockTx: Connection,
specPath: string,
quiet: boolean,
Expand Down Expand Up @@ -86,13 +83,15 @@ export const migrate = async ({
dbUrl,
dbNames,
quiet = false,
migrationDir = `${process.cwd()}/db/migrations`
migrationDir = `${process.cwd()}/db/migrations`,
tx
}: {
logger: Logger
dbUrl: string
dbNames: string[]
quiet?: boolean
migrationDir?: string
tx?: Connection
}) => {
const isValidService = dbNames.some(name => dbUrl.includes(name))
if (!isValidService || (dbUrl.includes('aws') && !process.env.YTL_AWS_STACK)) {
Expand All @@ -106,48 +105,23 @@ export const migrate = async ({

const pgrm = pg({ dbUrl })
try {
await createMigrationTable(pgrm)
if (tx) {
await createMigrationTable(tx)
} else {
await pgrm.withTransaction(tx => createMigrationTable(tx))
}
} catch (error) {
const stack = String((error as Error).stack)
logger.error(`Fatal error occured while creating migration table (schemaversion): ${stack}`)
throw error
}

try {
await pgrm.withTransaction(async (lockTx: Connection) => {
try {
if (!(await lockSchemaTable(lockTx))) {
logger.warn('Could not lock schema table. Someone else is probably running the migrations.')
return
}
} catch (error) {
if ((error as NodeJS.ErrnoException).code === '55P03') {
logger.warn(
'Error 55P03 lock_not_available while locking schema table. Ignoring error without trying to migrate as someone else is probably running the migrations.'
)
return
} else {
logger.error(`Unknown error while locking the schema table: ${String(error)}`)
throw error
}
}
const files = await getUnappliedMigrationFiles(logger, lockTx, migrationDir)
if (!quiet) {
logger.info(`Applying total of ${files.length} migrations`)
}

try {
for (const file of files) {
await applyMigration(logger, pgrm, lockTx, file, quiet, migrationDir)
}
} catch (e) {
if (e instanceof Error) {
logger.error(e.message)
} else {
logger.error(e)
}
}
})
if (tx) {
await runMigrationsInTransaction(tx)
} else {
await pgrm.withTransaction((lockTx: Connection) => runMigrationsInTransaction(lockTx))
}
} catch (error) {
if (error instanceof Error) {
logger.error(`Fatal error occured while running migrations: ${error.message}`)
Expand All @@ -156,4 +130,39 @@ export const migrate = async ({
}
throw error
}

async function runMigrationsInTransaction(lockTx: Connection) {
try {
if (!(await lockSchemaTable(lockTx))) {
logger.warn('Could not lock schema table. Someone else is probably running the migrations.')
return
}
} catch (error) {
if ((error as NodeJS.ErrnoException).code === '55P03') {
logger.warn(
'Error 55P03 lock_not_available while locking schema table. Ignoring error without trying to migrate as someone else is probably running the migrations.'
)
return
} else {
logger.error(`Unknown error while locking the schema table: ${String(error)}`)
throw error
}
}
const files = await getUnappliedMigrationFiles(logger, lockTx, migrationDir)
if (!quiet) {
logger.info(`Applying total of ${files.length} migrations`)
}

try {
for (const file of files) {
await applyMigration(logger, lockTx, file, quiet, migrationDir)
}
} catch (e) {
if (e instanceof Error) {
logger.error(e.message)
} else {
logger.error(e)
}
}
}
}