diff --git a/app/services/db.server.ts b/app/services/db.server.ts index 7da46012..5ba80e7c 100644 --- a/app/services/db.server.ts +++ b/app/services/db.server.ts @@ -13,6 +13,7 @@ import { import type * as DB from './type' const debug = createDebug('app:db') +const SQLITE_BUSY_TIMEOUT_MS = 5000 export { sql } export type { DB, Insertable, Selectable, Updateable } @@ -23,6 +24,7 @@ if (!process.env.DATABASE_URL) { const filename = `${process.env.NODE_ENV === 'production' ? '' : '.'}${new URL(process.env.DATABASE_URL).pathname}` const database = new SQLite(filename) database.pragma('journal_mode = WAL') +database.pragma(`busy_timeout = ${SQLITE_BUSY_TIMEOUT_MS}`) database.pragma('wal_autocheckpoint = 1000') export const dialect = new SqliteDialect({ database }) export const db = new Kysely({ diff --git a/app/services/jobs/analyze-worker.ts b/app/services/jobs/analyze-worker.ts new file mode 100644 index 00000000..60e2cd5e --- /dev/null +++ b/app/services/jobs/analyze-worker.ts @@ -0,0 +1,57 @@ +/** + * Worker thread for CPU-intensive PR analysis. + * + * IMPORTANT: module.register() must be called before any ~/... imports. 
+ */ +if (import.meta.url.endsWith('.ts')) { + const { register } = await import('node:module') + register('./path-alias-hooks.mjs', import.meta.url) +} + +import 'dotenv/config' +import { parentPort, workerData } from 'node:worker_threads' +import type { OrganizationId } from '~/app/types/organization' + +interface WorkerInput { + organizationId: string + repositoryId: string + releaseDetectionMethod: string + releaseDetectionKey: string + excludedUsers: string + filterPrNumbers?: number[] +} + +const input = workerData as WorkerInput +const orgId = input.organizationId as OrganizationId + +const [{ buildPullRequests }, { createStore }] = (await Promise.all([ + import('~/batch/github/pullrequest'), + import('~/batch/github/store'), +])) as [ + { + buildPullRequests: typeof import('~/batch/github/pullrequest').buildPullRequests + }, + { createStore: typeof import('~/batch/github/store').createStore }, +] + +const store = createStore({ + organizationId: orgId, + repositoryId: input.repositoryId, +}) +await store.preloadAll() + +const result: Awaited<ReturnType<typeof buildPullRequests>> = + await buildPullRequests( + { + organizationId: orgId, + repositoryId: input.repositoryId, + releaseDetectionMethod: input.releaseDetectionMethod, + releaseDetectionKey: input.releaseDetectionKey, + excludedUsers: input.excludedUsers, + }, + await store.loader.pullrequests(), + store.loader, + input.filterPrNumbers ? 
new Set(input.filterPrNumbers) : undefined, + ) + +parentPort?.postMessage(result) diff --git a/app/services/jobs/crawl.server.ts b/app/services/jobs/crawl.server.ts index 49bd3523..2f3978da 100644 --- a/app/services/jobs/crawl.server.ts +++ b/app/services/jobs/crawl.server.ts @@ -148,7 +148,7 @@ export const crawlJob = defineJob({ step.log.info('No updated PRs, skipping analyze.') await step.run('finalize', async () => { const tenantDb = getTenantDb(orgId) - await sql`PRAGMA wal_checkpoint(TRUNCATE)`.execute(tenantDb) + await sql`PRAGMA wal_checkpoint(PASSIVE)`.execute(tenantDb) clearOrgCache(orgId) }) return { fetchedRepos: repoCount, pullCount: 0 } diff --git a/app/services/jobs/path-alias-hooks.mjs b/app/services/jobs/path-alias-hooks.mjs new file mode 100644 index 00000000..333130af --- /dev/null +++ b/app/services/jobs/path-alias-hooks.mjs @@ -0,0 +1,65 @@ +import { existsSync, statSync } from 'node:fs' +import path from 'node:path' +import { fileURLToPath, pathToFileURL } from 'node:url' + +const here = path.dirname(fileURLToPath(import.meta.url)) +const projectRoot = path.resolve(here, '../../..') + +function resolveCandidate(base) { + const candidates = [ + `${base}.ts`, + `${base}.tsx`, + `${base}.js`, + `${base}.mjs`, + path.join(base, 'index.ts'), + path.join(base, 'index.tsx'), + path.join(base, 'index.js'), + path.join(base, 'index.mjs'), + base, + ] + + for (const candidate of candidates) { + if (existsSync(candidate) && statSync(candidate).isFile()) return candidate + } + + return null +} + +function resolveProjectPath(specifier) { + return resolveCandidate(path.join(projectRoot, specifier.slice(2))) +} + +function resolveLocalPath(specifier, parentURL) { + const parentPath = parentURL.startsWith('file:') + ? fileURLToPath(parentURL) + : parentURL + const base = specifier.startsWith('/') + ? 
specifier + : path.resolve(path.dirname(parentPath), specifier) + + return resolveCandidate(base) +} + +export function resolve(specifier, context, nextResolve) { + if (specifier.startsWith('~/')) { + const resolved = resolveProjectPath(specifier) + if (!resolved) { + throw new Error(`Unable to resolve alias import: ${specifier}`) + } + return nextResolve(pathToFileURL(resolved).href, context) + } + + if ( + (specifier.startsWith('./') || + specifier.startsWith('../') || + specifier.startsWith('/')) && + context.parentURL + ) { + const resolved = resolveLocalPath(specifier, context.parentURL) + if (resolved) { + return nextResolve(pathToFileURL(resolved).href, context) + } + } + + return nextResolve(specifier, context) +} diff --git a/app/services/jobs/run-in-worker.test.ts b/app/services/jobs/run-in-worker.test.ts new file mode 100644 index 00000000..a2838a3f --- /dev/null +++ b/app/services/jobs/run-in-worker.test.ts @@ -0,0 +1,23 @@ +import path from 'node:path' +import { describe, expect, test } from 'vitest' +import { getWorkerRuntime } from './run-in-worker' + +describe('getWorkerRuntime', () => { + test('uses bundled worker files in production', () => { + const runtime = getWorkerRuntime('analyze-worker', 'production') + + expect(runtime.workerPath).toBe( + path.resolve(process.cwd(), 'build/workers/analyze-worker.js'), + ) + expect(runtime.workerOptions.execArgv).toBeUndefined() + }) + + test('uses ts worker files in development', () => { + const runtime = getWorkerRuntime('upsert-worker', 'development') + + expect(runtime.workerPath).toBe( + path.resolve(process.cwd(), 'app/services/jobs/upsert-worker.ts'), + ) + expect(runtime.workerOptions.execArgv).toEqual(['--import', 'tsx']) + }) +}) diff --git a/app/services/jobs/run-in-worker.ts b/app/services/jobs/run-in-worker.ts new file mode 100644 index 00000000..72aca7e2 --- /dev/null +++ b/app/services/jobs/run-in-worker.ts @@ -0,0 +1,199 @@ +import path from 'node:path' +import { Worker, type WorkerOptions } 
from 'node:worker_threads' +import { logger } from '~/batch/helper/logger' + +interface AnalyzeWorkerInput { + organizationId: string + repositoryId: string + releaseDetectionMethod: string + releaseDetectionKey: string + excludedUsers: string + filterPrNumbers?: number[] +} + +export interface SqliteBusyEvent { + entrypoint: WorkerEntrypoint + organizationId: string + attempt: number + delayMs: number + errorMessage: string + gaveUp: boolean +} + +interface RunWorkerOptions { + onSqliteBusy?: (event: SqliteBusyEvent) => void +} + +/** + * Run PR analysis in a worker thread to keep the main event loop responsive. + * The worker opens its own SQLite connection and runs buildPullRequests independently. + */ +export function runAnalyzeInWorker<TResult>( + input: AnalyzeWorkerInput, + options?: RunWorkerOptions, +): Promise<TResult> { + return runWorker<TResult>('analyze-worker', input, options) +} + +interface UpsertWorkerInput<TPull, TReview, TReviewer> { + organizationId: string + pulls: TPull[] + reviews: TReview[] + reviewers: TReviewer[] +} + +export function runUpsertInWorker<TPull, TReview, TReviewer>( + input: UpsertWorkerInput<TPull, TReview, TReviewer>, + options?: RunWorkerOptions, +): Promise<{ ok: true }> { + return runWorker<{ ok: true }>('upsert-worker', input, options) +} + +type WorkerEntrypoint = 'analyze-worker' | 'upsert-worker' +const SQLITE_BUSY_RETRY_LIMIT = 3 +const SQLITE_BUSY_RETRY_DELAYS_MS = [150, 400, 1000] + +export function getWorkerRuntime( + entrypoint: WorkerEntrypoint, + nodeEnv = process.env.NODE_ENV, +) { + const isProduction = nodeEnv === 'production' + const projectRoot = process.cwd() + const filename = isProduction ? `${entrypoint}.js` : `${entrypoint}.ts` + const workerPath = isProduction + ? path.resolve(projectRoot, 'build/workers', filename) + : path.resolve(projectRoot, 'app/services/jobs', filename) + + const workerOptions: WorkerOptions = { + execArgv: isProduction ? 
undefined : ['--import', 'tsx'], + } + + return { workerPath, workerOptions } +} + +function runWorker<T>( + entrypoint: WorkerEntrypoint, + workerData: unknown, + options?: RunWorkerOptions, +): Promise<T> { + return runWorkerWithRetry<T>(entrypoint, workerData, options) +} + +async function runWorkerWithRetry<T>( + entrypoint: WorkerEntrypoint, + workerData: unknown, + options?: RunWorkerOptions, +): Promise<T> { + let lastError: unknown + + for (let attempt = 0; attempt <= SQLITE_BUSY_RETRY_LIMIT; attempt++) { + try { + return await runWorkerOnce<T>(entrypoint, workerData) + } catch (error) { + lastError = error + if (!isSqliteBusyError(error) || attempt === SQLITE_BUSY_RETRY_LIMIT) { + if (isSqliteBusyError(error)) { + const event = createBusyEvent( + entrypoint, + workerData, + attempt, + 0, + error, + ) + logger.warn(formatBusyLog(event)) + options?.onSqliteBusy?.(event) + } + throw error + } + + const delayMs = SQLITE_BUSY_RETRY_DELAYS_MS[attempt] ?? 1000 + const event = createBusyEvent( + entrypoint, + workerData, + attempt, + delayMs, + error, + ) + logger.warn(formatBusyLog(event)) + options?.onSqliteBusy?.(event) + await delay(delayMs) + } + } + + throw lastError +} + +function runWorkerOnce<T>( + entrypoint: WorkerEntrypoint, + workerData: unknown, +): Promise<T> { + const { workerPath, workerOptions } = getWorkerRuntime(entrypoint) + return new Promise((resolve, reject) => { + const worker = new Worker(workerPath, { + workerData, + ...workerOptions, + }) + worker.on('message', (result: T) => { + resolve(result) + worker.terminate() + }) + worker.on('error', (err) => { + reject(err) + worker.terminate() + }) + worker.on('exit', (code) => { + if (code !== 0) { + reject(new Error(`Worker exited with code ${code}`)) + } + }) + }) +} + +function isSqliteBusyError(error: unknown) { + const message = error instanceof Error ? 
error.message : String(error) + return ( + message.includes('SQLITE_BUSY') || message.includes('database is locked') + ) +} + +function delay(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +function createBusyEvent( + entrypoint: WorkerEntrypoint, + workerData: unknown, + attempt: number, + delayMs: number, + error: unknown, +): SqliteBusyEvent { + return { + entrypoint, + organizationId: getWorkerOrganizationId(workerData), + attempt, + delayMs, + errorMessage: error instanceof Error ? error.message : String(error), + gaveUp: delayMs === 0, + } +} + +function formatBusyLog(event: SqliteBusyEvent) { + const attemptLabel = event.gaveUp + ? `giving up after ${event.attempt} retries` + : `retry ${event.attempt + 1}/${SQLITE_BUSY_RETRY_LIMIT}` + const waitLabel = event.delayMs > 0 ? `, waiting ${event.delayMs}ms` : '' + return `[sqlite-busy] ${event.entrypoint} org=${event.organizationId} ${attemptLabel}${waitLabel}: ${event.errorMessage}` +} + +function getWorkerOrganizationId(workerData: unknown) { + if ( + workerData && + typeof workerData === 'object' && + 'organizationId' in workerData && + typeof workerData.organizationId === 'string' + ) { + return workerData.organizationId + } + + return 'unknown' +} diff --git a/app/services/jobs/shared-steps.server.ts b/app/services/jobs/shared-steps.server.ts index 38bb3ab5..c0fa4a90 100644 --- a/app/services/jobs/shared-steps.server.ts +++ b/app/services/jobs/shared-steps.server.ts @@ -10,15 +10,20 @@ import { exportPulls, exportReviewResponses, } from '~/batch/bizlogic/export-spreadsheet' -import { upsertAnalyzedData } from '~/batch/db' -import { buildPullRequests } from '~/batch/github/pullrequest' -import { createStore } from '~/batch/github/store' import type { AnalyzedReview, AnalyzedReviewResponse, AnalyzedReviewer, } from '~/batch/github/types' import { classifyPullRequests } from '~/batch/usecases/classify-pull-requests' +import { runAnalyzeInWorker, runUpsertInWorker } from 
'./run-in-worker' + +interface AnalyzeResult { + pulls: Selectable[] + reviews: AnalyzedReview[] + reviewers: AnalyzedReviewer[] + reviewResponses: AnalyzedReviewResponse[] +} interface OrganizationData { organizationSetting: Pick< @@ -44,6 +49,26 @@ interface AnalyzeAndFinalizeOptions { steps?: JobSteps } +function formatDurationMs(durationMs: number) { + if (durationMs < 1000) return `${durationMs}ms` + return `${(durationMs / 1000).toFixed(1)}s` +} + +async function runTimedStep<T>( + step: StepContext, + name: string, + action: () => Promise<T>, +) { + const startedAt = Date.now() + try { + return await action() + } finally { + step.log.info( + `${name} completed in ${formatDurationMs(Date.now() - startedAt)}`, + ) + } +} + /** * analyze → upsert → classify → export → finalize の共通パイプライン。 * durably の step context を受け取り、ステップ名付きで実行する。 @@ -71,17 +96,12 @@ export async function analyzeAndFinalizeSteps( if (skipRepo?.(repo.id)) continue const result = await step.run(`analyze:${repo.repo}`, async () => { - step.progress(i + 1, repoCount, `Analyzing ${repo.repo}...`) - - const store = createStore({ - organizationId: orgId, - repositoryId: repo.id, - }) - await store.preloadAll() + return await runTimedStep(step, `analyze:${repo.repo}`, async () => { + step.progress(i + 1, repoCount, `Analyzing ${repo.repo}...`) - const orgSetting = organization.organizationSetting - return await buildPullRequests( - { + const orgSetting = organization.organizationSetting + const prNumbers = filterPrNumbers?.get(repo.id) + return await runAnalyzeInWorker<AnalyzeResult>({ organizationId: orgId, repositoryId: repo.id, releaseDetectionMethod: @@ -89,11 +109,9 @@ releaseDetectionKey: repo.releaseDetectionKey ?? orgSetting.releaseDetectionKey, excludedUsers: orgSetting.excludedUsers, - }, - await store.loader.pullrequests(), - store.loader, - filterPrNumbers?.get(repo.id), - ) + filterPrNumbers: prNumbers ? 
[...prNumbers] : undefined, + }) + }) }) allPulls.push(...result.pulls) allReviews.push(...result.reviews) @@ -104,11 +122,14 @@ export async function analyzeAndFinalizeSteps( // Upsert if (runUpsert) { await step.run('upsert', async () => { - step.progress(0, 0, 'Upserting to database...') - await upsertAnalyzedData(orgId, { - pulls: allPulls, - reviews: allReviews, - reviewers: allReviewers, + await runTimedStep(step, 'upsert', async () => { + step.progress(0, 0, 'Upserting to database...') + await runUpsertInWorker({ + organizationId: orgId, + pulls: allPulls, + reviews: allReviews, + reviewers: allReviewers, + }) }) }) } @@ -116,8 +137,10 @@ export async function analyzeAndFinalizeSteps( // Classify if (runClassify) { await step.run('classify', async () => { - step.progress(0, 0, 'Classifying PRs...') - await classifyPullRequests(orgId) + await runTimedStep(step, 'classify', async () => { + step.progress(0, 0, 'Classifying PRs...') + await classifyPullRequests(orgId) + }) }) } @@ -125,22 +148,26 @@ export async function analyzeAndFinalizeSteps( const { exportSetting } = organization if (runExport && exportSetting) { await step.run('export', async () => { - step.progress(0, 0, 'Exporting to spreadsheet...') - try { - await exportPulls(exportSetting, allPulls) - await exportReviewResponses(exportSetting, allReviewResponses) - } catch (e) { - step.log.warn(`Export failed: ${e instanceof Error ? e.message : e}`) - } + await runTimedStep(step, 'export', async () => { + step.progress(0, 0, 'Exporting to spreadsheet...') + try { + await exportPulls(exportSetting, allPulls) + await exportReviewResponses(exportSetting, allReviewResponses) + } catch (e) { + step.log.warn(`Export failed: ${e instanceof Error ? 
e.message : e}`) + } + }) }) } // Finalize await step.run('finalize', async () => { - step.progress(0, 0, 'Finalizing...') - const tenantDb = getTenantDb(orgId) - await sql`PRAGMA wal_checkpoint(TRUNCATE)`.execute(tenantDb) - clearOrgCache(orgId) + await runTimedStep(step, 'finalize', async () => { + step.progress(0, 0, 'Finalizing...') + const tenantDb = getTenantDb(orgId) + await sql`PRAGMA wal_checkpoint(PASSIVE)`.execute(tenantDb) + clearOrgCache(orgId) + }) }) return { pullCount: allPulls.length } diff --git a/app/services/jobs/upsert-worker.ts b/app/services/jobs/upsert-worker.ts new file mode 100644 index 00000000..bae3e307 --- /dev/null +++ b/app/services/jobs/upsert-worker.ts @@ -0,0 +1,35 @@ +/** + * Worker thread for DB-heavy analyzed-data upsert. + * + * IMPORTANT: module.register() must be called before any ~/... imports. + */ +if (import.meta.url.endsWith('.ts')) { + const { register } = await import('node:module') + register('./path-alias-hooks.mjs', import.meta.url) +} + +import 'dotenv/config' +import type { Selectable } from 'kysely' +import { parentPort, workerData } from 'node:worker_threads' +import type { TenantDB } from '~/app/services/tenant-db.server' +import type { OrganizationId } from '~/app/types/organization' +import type { AnalyzedReview, AnalyzedReviewer } from '~/batch/github/types' + +interface WorkerInput { + organizationId: string + pulls: Selectable[] + reviews: AnalyzedReview[] + reviewers: AnalyzedReviewer[] +} + +const input = workerData as WorkerInput + +const { upsertAnalyzedData } = await import('~/batch/db') + +await upsertAnalyzedData(input.organizationId as OrganizationId, { + pulls: input.pulls, + reviews: input.reviews, + reviewers: input.reviewers, +}) + +parentPort?.postMessage({ ok: true as const }) diff --git a/app/services/tenant-db.server.ts b/app/services/tenant-db.server.ts index ae214c15..54c71978 100644 --- a/app/services/tenant-db.server.ts +++ b/app/services/tenant-db.server.ts @@ -17,6 +17,7 @@ export 
type { TenantDB } export type { OrganizationId } from '~/app/types/organization' const debug = createDebug('app:tenant-db') +const SQLITE_BUSY_TIMEOUT_MS = 5000 const tenantDbCache = new Map< string, @@ -39,6 +40,7 @@ function ensureTenantDb(organizationId: OrganizationId) { const filename = getTenantDbPath(organizationId) const database = new SQLite(filename, { fileMustExist: true }) database.pragma('journal_mode = WAL') + database.pragma(`busy_timeout = ${SQLITE_BUSY_TIMEOUT_MS}`) database.pragma('wal_autocheckpoint = 1000') const kysely = new Kysely({ diff --git a/batch/db/mutations.ts b/batch/db/mutations.ts index 919595bd..7e68a022 100644 --- a/batch/db/mutations.ts +++ b/batch/db/mutations.ts @@ -167,6 +167,51 @@ export async function upsertPullRequestReviewers( }) } +export async function batchReplacePullRequestReviewers( + organizationId: OrganizationId, + rows: AnalyzedReviewer[], + chunkSize = 100, +) { + if (rows.length === 0) return + + const tenantDb = getTenantDb(organizationId) + + await tenantDb.transaction().execute(async (trx) => { + for (let i = 0; i < rows.length; i += chunkSize) { + const chunk = rows.slice(i, i + chunkSize) + + for (const row of chunk) { + await trx + .deleteFrom('pullRequestReviewers') + .where('repositoryId', '=', row.repositoryId) + .where('pullRequestNumber', '=', row.pullRequestNumber) + .execute() + } + + const values = chunk.flatMap((row) => { + const seen = new Set() + return row.reviewers.flatMap((reviewer) => { + if (!reviewer.login) return [] + if (seen.has(reviewer.login)) return [] + seen.add(reviewer.login) + return [ + { + pullRequestNumber: row.pullRequestNumber, + repositoryId: row.repositoryId, + reviewer: reviewer.login, + requestedAt: reviewer.requestedAt, + }, + ] + }) + }) + + if (values.length > 0) { + await trx.insertInto('pullRequestReviewers').values(values).execute() + } + } + }) +} + /** * batch で発見した GitHub ユーザーを companyGithubUsers に自動登録する。 * isActive: 0(無効)で挿入し、既存レコードは一切上書きしない。 @@ -251,13 +296,6 
@@ export async function upsertAnalyzedData( // Upsert reviewers logger.info('upsert reviewers started...', organizationId) - for (const reviewer of data.reviewers) { - await upsertPullRequestReviewers( - organizationId, - reviewer.repositoryId, - reviewer.pullRequestNumber, - reviewer.reviewers, - ) - } + await batchReplacePullRequestReviewers(organizationId, data.reviewers) logger.info('upsert reviewers completed.', organizationId) } diff --git a/package.json b/package.json index 8c0a3aa4..9dcfbacf 100644 --- a/package.json +++ b/package.json @@ -4,8 +4,9 @@ "sideEffects": false, "type": "module", "scripts": { - "build": "run-s build:react-router build:job build:db-scripts", + "build": "run-s build:react-router build:job build:workers build:db-scripts", "build:job": "esbuild --platform=node --format=esm ./batch/job-scheduler.ts --outdir=build --bundle --packages=external", + "build:workers": "esbuild --platform=node --format=esm ./app/services/jobs/analyze-worker.ts ./app/services/jobs/upsert-worker.ts --outdir=build/workers --bundle --packages=external", "build:db-scripts": "esbuild --platform=node --format=esm ./db/apply-tenant-migrations.ts --outdir=build/db --bundle --packages=external", "build:react-router": "react-router build", "dev": "react-router dev",