Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/services/db.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
import type * as DB from './type'

const debug = createDebug('app:db')
const SQLITE_BUSY_TIMEOUT_MS = 5000

export { sql }
export type { DB, Insertable, Selectable, Updateable }
Expand All @@ -23,6 +24,7 @@ if (!process.env.DATABASE_URL) {
const filename = `${process.env.NODE_ENV === 'production' ? '' : '.'}${new URL(process.env.DATABASE_URL).pathname}`
const database = new SQLite(filename)
database.pragma('journal_mode = WAL')
database.pragma(`busy_timeout = ${SQLITE_BUSY_TIMEOUT_MS}`)
database.pragma('wal_autocheckpoint = 1000')
export const dialect = new SqliteDialect({ database })
export const db = new Kysely<DB.DB>({
Expand Down
57 changes: 57 additions & 0 deletions app/services/jobs/analyze-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Worker thread entry point for CPU-intensive PR analysis.
*
* IMPORTANT: module.register() must be called before any ~/... imports.
* The ~/... runtime modules further down are therefore loaded with dynamic
* import() calls that run after this block.
*/
// Register the alias hooks only when this file is executing as TypeScript
// source (its URL ends in .ts). Presumably the bundled .js build already has
// the aliases resolved — TODO confirm against the build configuration.
if (import.meta.url.endsWith('.ts')) {
const { register } = await import('node:module')
// The hook module specifier is resolved relative to this file's URL.
register('./path-alias-hooks.mjs', import.meta.url)
}

import 'dotenv/config'
import { parentPort, workerData } from 'node:worker_threads'
import type { OrganizationId } from '~/app/types/organization'

/**
* Shape this worker expects to receive as `workerData` from the parent thread.
* The fields are forwarded to createStore() and buildPullRequests() below.
*/
interface WorkerInput {
// Cast to OrganizationId before it is used.
organizationId: string
repositoryId: string
releaseDetectionMethod: string
releaseDetectionKey: string
excludedUsers: string
// Optional PR numbers; converted to a Set before being handed to buildPullRequests().
filterPrNumbers?: number[]
}

// workerData is not validated at runtime; the cast trusts the parent thread
// to have sent a WorkerInput.
const input = workerData as WorkerInput
const orgId = input.organizationId as OrganizationId

// Load the ~/... modules with dynamic import() (both in parallel) so that they
// are resolved only after the register() call at the top of this file has run.
// The tuple assertion restates the export types used by the destructuring.
const [{ buildPullRequests }, { createStore }] = (await Promise.all([
import('~/batch/github/pullrequest'),
import('~/batch/github/store'),
])) as [
{
buildPullRequests: typeof import('~/batch/github/pullrequest').buildPullRequests
},
{ createStore: typeof import('~/batch/github/store').createStore },
]

// Store scoped to the organization and repository named in the worker input.
const store = createStore({
organizationId: orgId,
repositoryId: input.repositoryId,
})
// NOTE(review): the return value of preloadAll() is not awaited — confirm it is synchronous.
store.preloadAll()

// Run the analysis with: the settings from the input, the pull requests read
// through the store's loader, the loader itself, and an optional Set built
// from filterPrNumbers (undefined when no filter was supplied).
const result: Awaited<ReturnType<typeof buildPullRequests>> =
await buildPullRequests(
{
organizationId: orgId,
repositoryId: input.repositoryId,
releaseDetectionMethod: input.releaseDetectionMethod,
releaseDetectionKey: input.releaseDetectionKey,
excludedUsers: input.excludedUsers,
},
await store.loader.pullrequests(),
store.loader,
input.filterPrNumbers ? new Set(input.filterPrNumbers) : undefined,
)

// Hand the result back to the parent thread. The optional chaining means
// nothing is sent when parentPort is not available.
parentPort?.postMessage(result)
2 changes: 1 addition & 1 deletion app/services/jobs/crawl.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ export const crawlJob = defineJob({
step.log.info('No updated PRs, skipping analyze.')
await step.run('finalize', async () => {
const tenantDb = getTenantDb(orgId)
await sql`PRAGMA wal_checkpoint(TRUNCATE)`.execute(tenantDb)
await sql`PRAGMA wal_checkpoint(PASSIVE)`.execute(tenantDb)
clearOrgCache(orgId)
})
return { fetchedRepos: repoCount, pullCount: 0 }
Expand Down
65 changes: 65 additions & 0 deletions app/services/jobs/path-alias-hooks.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { existsSync, statSync } from 'node:fs'
import path from 'node:path'
import { fileURLToPath, pathToFileURL } from 'node:url'

// Directory that contains this hooks module.
const here = path.dirname(fileURLToPath(import.meta.url))
// Three directory levels above this module; base directory for `~/` alias imports.
const projectRoot = path.resolve(here, '../../..')

/**
 * Find the first existing regular file for a (usually extension-less) module path.
 *
 * Probing order: `<base>.ts`, `<base>.tsx`, `<base>.js`, `<base>.mjs`, then
 * `<base>/index` with the same four extensions, and finally `base` exactly as given.
 *
 * @param {string} base Filesystem path to probe.
 * @returns {string | null} The first path that exists and is a file, or null.
 */
function resolveCandidate(base) {
  const extensions = ['.ts', '.tsx', '.js', '.mjs']
  const probes = [
    ...extensions.map((ext) => `${base}${ext}`),
    ...extensions.map((ext) => path.join(base, `index${ext}`)),
    base,
  ]

  // Directories are skipped: a probe only counts when it is a regular file.
  const match = probes.find(
    (probe) => existsSync(probe) && statSync(probe).isFile(),
  )

  return match ?? null
}

/**
 * Map a `~/...` alias specifier onto a file below the project root.
 *
 * @param {string} specifier Import specifier that starts with `~/`.
 * @returns {string | null} Resolved file path, or null when nothing matches.
 */
function resolveProjectPath(specifier) {
  // Drop the two-character "~/" prefix; the rest is relative to the project root.
  const relativePart = specifier.slice(2)
  const absoluteBase = path.join(projectRoot, relativePart)
  return resolveCandidate(absoluteBase)
}

/**
 * Resolve a relative or absolute path specifier against the importing module.
 *
 * @param {string} specifier Specifier starting with `./`, `../` or `/`.
 * @param {string} parentURL URL (or plain path) of the importing module.
 * @returns {string | null} Resolved file path, or null when nothing matches.
 */
function resolveLocalPath(specifier, parentURL) {
  // file: URLs are converted to filesystem paths; anything else is used verbatim.
  let parentPath = parentURL
  if (parentURL.startsWith('file:')) {
    parentPath = fileURLToPath(parentURL)
  }

  // Specifiers with a leading slash are probed as-is.
  if (specifier.startsWith('/')) {
    return resolveCandidate(specifier)
  }

  const parentDir = path.dirname(parentPath)
  return resolveCandidate(path.resolve(parentDir, specifier))
}

/**
 * Module resolution hook: handles `~/` project aliases and extension-less
 * relative/absolute path imports, delegating everything else unchanged.
 *
 * @param {string} specifier Import specifier as written in the importing module.
 * @param {{ parentURL?: string }} context Resolution context supplied by the loader.
 * @param {Function} nextResolve Next resolver in the hook chain.
 * @returns Result of `nextResolve` for the (possibly rewritten) specifier.
 * @throws {Error} When a `~/` specifier matches no file.
 */
export function resolve(specifier, context, nextResolve) {
  // Continue the hook chain with a concrete file path expressed as a file: URL.
  const continueWithFile = (filePath) =>
    nextResolve(pathToFileURL(filePath).href, context)

  if (specifier.startsWith('~/')) {
    const aliased = resolveProjectPath(specifier)
    if (aliased) return continueWithFile(aliased)
    throw new Error(`Unable to resolve alias import: ${specifier}`)
  }

  const looksLikePath = ['./', '../', '/'].some((prefix) =>
    specifier.startsWith(prefix),
  )
  if (looksLikePath && context.parentURL) {
    const local = resolveLocalPath(specifier, context.parentURL)
    // An unmatched path specifier is not an error here; it falls through below.
    if (local) return continueWithFile(local)
  }

  return nextResolve(specifier, context)
}
23 changes: 23 additions & 0 deletions app/services/jobs/run-in-worker.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import path from 'node:path'
import { describe, expect, test } from 'vitest'
import { getWorkerRuntime } from './run-in-worker'

// Verifies that getWorkerRuntime picks the worker file and exec arguments
// that match the given NODE_ENV value.
describe('getWorkerRuntime', () => {
  // Absolute path below the current working directory.
  const fromCwd = (relativePath: string) =>
    path.resolve(process.cwd(), relativePath)

  test('uses bundled worker files in production', () => {
    const { workerPath, workerOptions } = getWorkerRuntime(
      'analyze-worker',
      'production',
    )

    expect(workerPath).toBe(fromCwd('build/workers/analyze-worker.js'))
    expect(workerOptions.execArgv).toBeUndefined()
  })

  test('uses ts worker files in development', () => {
    const { workerPath, workerOptions } = getWorkerRuntime(
      'upsert-worker',
      'development',
    )

    expect(workerPath).toBe(fromCwd('app/services/jobs/upsert-worker.ts'))
    expect(workerOptions.execArgv).toEqual(['--import', 'tsx'])
  })
})
199 changes: 199 additions & 0 deletions app/services/jobs/run-in-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
import path from 'node:path'
import { Worker, type WorkerOptions } from 'node:worker_threads'
import { logger } from '~/batch/helper/logger'

/**
* Payload that runAnalyzeInWorker() passes as `workerData` to the
* `analyze-worker` entrypoint.
*/
interface AnalyzeWorkerInput {
// Also read by the retry logic to label SQLITE_BUSY log lines and events.
organizationId: string
repositoryId: string
releaseDetectionMethod: string
releaseDetectionKey: string
excludedUsers: string
// Optional list of PR numbers — presumably limits the analysis to these PRs; confirm in the worker.
filterPrNumbers?: number[]
}

/**
* Describes one SQLITE_BUSY failure of a worker run. Instances are written to
* the warning log and passed to RunWorkerOptions.onSqliteBusy.
*/
export interface SqliteBusyEvent {
// Worker entrypoint whose run failed.
entrypoint: WorkerEntrypoint
// organizationId read from the worker input, or 'unknown' when it has none.
organizationId: string
// Zero-based index of the attempt that failed.
attempt: number
// Wait in milliseconds before the next attempt; 0 when no further attempt is made.
delayMs: number
// Message of the error that triggered this event.
errorMessage: string
// True when the run is abandoned; set exactly when delayMs is 0.
gaveUp: boolean
}

/** Optional hooks accepted by the run*InWorker functions. */
interface RunWorkerOptions {
// Invoked for every SQLITE_BUSY failure, including the final one where the run gives up.
onSqliteBusy?: (event: SqliteBusyEvent) => void
}

/**
 * Execute PR analysis on a worker thread so the main event loop stays responsive.
 * The worker opens its own SQLite connection and runs buildPullRequests independently.
 *
 * @param input Analysis settings, forwarded to the worker as `workerData`.
 * @param options Optional hooks, e.g. a callback for SQLITE_BUSY failures.
 * @returns The value the worker posts back. `T` is chosen by the caller and
 *   is not validated at runtime.
 */
export function runAnalyzeInWorker<T>(
  input: AnalyzeWorkerInput,
  options?: RunWorkerOptions,
): Promise<T> {
  const entrypoint: WorkerEntrypoint = 'analyze-worker'
  return runWorker<T>(entrypoint, input, options)
}

/**
* Payload that runUpsertInWorker() passes as `workerData` to the
* `upsert-worker` entrypoint. The row types are left generic here; this
* module only forwards them.
*/
interface UpsertWorkerInput<TPull, TReview, TReviewer> {
// Also read by the retry logic to label SQLITE_BUSY log lines and events.
organizationId: string
pulls: TPull[]
reviews: TReview[]
reviewers: TReviewer[]
}

/**
 * Run the `upsert-worker` entrypoint on a worker thread.
 *
 * @param input Rows to hand to the worker, forwarded as `workerData`.
 * @param options Optional hooks, e.g. a callback for SQLITE_BUSY failures.
 * @returns Resolves with the worker's posted message, typed as `{ ok: true }`.
 */
export function runUpsertInWorker<TPull, TReview, TReviewer>(
  input: UpsertWorkerInput<TPull, TReview, TReviewer>,
  options?: RunWorkerOptions,
): Promise<{ ok: true }> {
  const entrypoint: WorkerEntrypoint = 'upsert-worker'
  return runWorker<{ ok: true }>(entrypoint, input, options)
}

/** Names of the worker entry files that can be started by this module. */
type WorkerEntrypoint = 'analyze-worker' | 'upsert-worker'

/** Wait in milliseconds before each retry after a SQLITE_BUSY failure; one entry per retry. */
const SQLITE_BUSY_RETRY_DELAYS_MS: readonly number[] = [150, 400, 1000]

/**
 * Maximum number of retries after the first attempt. Derived from the delay
 * table so the two values cannot drift apart when a delay is added or removed.
 */
const SQLITE_BUSY_RETRY_LIMIT = SQLITE_BUSY_RETRY_DELAYS_MS.length

/**
 * Work out which worker file to start and with which options.
 *
 * In production the bundled `build/workers/<entrypoint>.js` is used with no
 * extra exec arguments. In every other environment the TypeScript source in
 * `app/services/jobs` is used and started with `--import tsx`.
 *
 * @param entrypoint Worker entry file name without extension.
 * @param nodeEnv Environment name; defaults to `process.env.NODE_ENV`.
 * @returns Absolute worker path (relative to the current working directory)
 *   and the options to spread into the Worker constructor.
 */
export function getWorkerRuntime(
  entrypoint: WorkerEntrypoint,
  nodeEnv = process.env.NODE_ENV,
) {
  const cwd = process.cwd()

  if (nodeEnv === 'production') {
    const workerOptions: WorkerOptions = { execArgv: undefined }
    return {
      workerPath: path.resolve(cwd, 'build/workers', `${entrypoint}.js`),
      workerOptions,
    }
  }

  const workerOptions: WorkerOptions = { execArgv: ['--import', 'tsx'] }
  return {
    workerPath: path.resolve(cwd, 'app/services/jobs', `${entrypoint}.ts`),
    workerOptions,
  }
}

/**
 * Start `entrypoint` on a worker thread, retrying when SQLite reports that
 * the database is busy.
 *
 * @param entrypoint Worker entry file name without extension.
 * @param workerData Payload handed to the worker.
 * @param options Optional hooks for SQLITE_BUSY notifications.
 * @returns The value posted by the worker.
 */
function runWorker<T>(
  entrypoint: WorkerEntrypoint,
  workerData: unknown,
  options?: RunWorkerOptions,
): Promise<T> {
  const pending = runWorkerWithRetry<T>(entrypoint, workerData, options)
  return pending
}

/**
 * Run a worker, retrying up to SQLITE_BUSY_RETRY_LIMIT times when it fails
 * with a SQLITE_BUSY error.
 *
 * Every SQLITE_BUSY failure is logged as a warning and reported through
 * `options.onSqliteBusy`, including the last one where the run gives up
 * (reported with `delayMs` 0). Any other error is rethrown immediately
 * without logging or notification.
 *
 * @param entrypoint Worker entry file name without extension.
 * @param workerData Payload handed to each worker attempt.
 * @param options Optional hooks for SQLITE_BUSY notifications.
 * @returns The value posted by the first successful worker run.
 * @throws The error of the failing attempt once it is not retried.
 */
async function runWorkerWithRetry<T>(
  entrypoint: WorkerEntrypoint,
  workerData: unknown,
  options?: RunWorkerOptions,
): Promise<T> {
  let lastError: unknown
  let attempt = 0

  while (attempt <= SQLITE_BUSY_RETRY_LIMIT) {
    try {
      return await runWorkerOnce<T>(entrypoint, workerData)
    } catch (error) {
      lastError = error

      // Only SQLITE_BUSY is considered transient.
      if (!isSqliteBusyError(error)) throw error

      const outOfRetries = attempt === SQLITE_BUSY_RETRY_LIMIT
      // A delay of 0 marks the event as "gave up".
      const delayMs = outOfRetries
        ? 0
        : (SQLITE_BUSY_RETRY_DELAYS_MS[attempt] ?? 1000)

      const event = createBusyEvent(
        entrypoint,
        workerData,
        attempt,
        delayMs,
        error,
      )
      logger.warn(formatBusyLog(event))
      options?.onSqliteBusy?.(event)

      if (outOfRetries) throw error
      await delay(delayMs)
    }

    attempt += 1
  }

  throw lastError
}

/**
 * Start one worker thread and wait for its outcome.
 *
 * The returned promise settles exactly once:
 * - resolves with the first message the worker posts,
 * - rejects with the worker's uncaught error,
 * - rejects when the worker exits before doing either. This includes a clean
 *   exit (code 0) without a message, which would otherwise leave the promise
 *   pending forever.
 *
 * @param entrypoint Worker entry file name without extension.
 * @param workerData Payload handed to the worker.
 * @returns The first message posted by the worker.
 * @throws The worker's error, or an Error describing a premature exit.
 */
function runWorkerOnce<T>(
  entrypoint: WorkerEntrypoint,
  workerData: unknown,
): Promise<T> {
  const { workerPath, workerOptions } = getWorkerRuntime(entrypoint)
  return new Promise<T>((resolve, reject) => {
    const worker = new Worker(workerPath, {
      workerData,
      ...workerOptions,
    })

    // Guards against settling twice: terminate() below triggers an 'exit'
    // event after the promise has already been resolved or rejected.
    let settled = false
    const settleOnce = (finish: () => void) => {
      if (settled) return
      settled = true
      finish()
    }

    worker.on('message', (result: T) => {
      settleOnce(() => resolve(result))
      // The result is in; the thread is no longer needed.
      void worker.terminate()
    })
    worker.on('error', (err) => {
      settleOnce(() => reject(err))
      void worker.terminate()
    })
    worker.on('exit', (code) => {
      settleOnce(() =>
        reject(
          new Error(
            code === 0
              ? 'Worker exited without posting a result'
              : `Worker exited with code ${code}`,
          ),
        ),
      )
    })
  })
}

/**
 * Decide whether an error means SQLite could not obtain a lock.
 *
 * An error counts as "busy" when either
 * - it carries a string `code` property starting with `SQLITE_BUSY`
 *   (covers extended codes such as `SQLITE_BUSY_SNAPSHOT`), or
 * - its message (or its string form for non-Error values) contains
 *   `SQLITE_BUSY` or `database is locked`.
 *
 * @param error Any thrown value.
 * @returns True when the failure is a SQLite busy/locked condition.
 */
function isSqliteBusyError(error: unknown): boolean {
  // Prefer the machine-readable code: it does not depend on message wording.
  if (typeof error === 'object' && error !== null && 'code' in error) {
    const { code } = error
    if (typeof code === 'string' && code.startsWith('SQLITE_BUSY')) {
      return true
    }
  }

  const message = error instanceof Error ? error.message : String(error)
  return (
    message.includes('SQLITE_BUSY') || message.includes('database is locked')
  )
}

/**
 * Promise-based sleep.
 *
 * @param ms Time to wait in milliseconds.
 * @returns A promise that resolves once the timer fires.
 */
function delay(ms: number) {
  return new Promise((wake) => {
    setTimeout(wake, ms)
  })
}

/**
 * Build the event object that describes one SQLITE_BUSY failure.
 *
 * @param entrypoint Worker entrypoint whose run failed.
 * @param workerData Worker payload; only its `organizationId` is read.
 * @param attempt Zero-based index of the failed attempt.
 * @param delayMs Wait before the next attempt; 0 marks the event as "gave up".
 * @param error The value thrown by the failed attempt.
 * @returns The populated event.
 */
function createBusyEvent(
  entrypoint: WorkerEntrypoint,
  workerData: unknown,
  attempt: number,
  delayMs: number,
  error: unknown,
): SqliteBusyEvent {
  const organizationId = getWorkerOrganizationId(workerData)
  const errorMessage = error instanceof Error ? error.message : String(error)
  const gaveUp = delayMs === 0

  return {
    entrypoint,
    organizationId,
    attempt,
    delayMs,
    errorMessage,
    gaveUp,
  }
}

/**
 * Render a SQLITE_BUSY event as a single log line.
 *
 * @param event Event to describe.
 * @returns Text of the form
 *   `[sqlite-busy] <entrypoint> org=<id> <attempt info>[, waiting <n>ms]: <message>`.
 */
function formatBusyLog(event: SqliteBusyEvent) {
  const { entrypoint, organizationId, attempt, delayMs, errorMessage, gaveUp } =
    event

  let attemptLabel: string
  if (gaveUp) {
    attemptLabel = `giving up after ${attempt} retries`
  } else {
    // attempt is zero-based; the log shows the one-based retry number.
    attemptLabel = `retry ${attempt + 1}/${SQLITE_BUSY_RETRY_LIMIT}`
  }

  const waitLabel = delayMs > 0 ? `, waiting ${delayMs}ms` : ''

  return `[sqlite-busy] ${entrypoint} org=${organizationId} ${attemptLabel}${waitLabel}: ${errorMessage}`
}

/**
 * Read `organizationId` from an arbitrary worker payload.
 *
 * @param workerData Payload of unknown shape.
 * @returns The `organizationId` when the payload is an object holding it as a
 *   string; otherwise the literal `'unknown'`.
 */
function getWorkerOrganizationId(workerData: unknown) {
  const fallback = 'unknown'

  if (!workerData || typeof workerData !== 'object') return fallback
  if (!('organizationId' in workerData)) return fallback

  return typeof workerData.organizationId === 'string'
    ? workerData.organizationId
    : fallback
}
Loading