diff --git a/.gitignore b/.gitignore index 7b769d1b..db5c4cc7 100644 --- a/.gitignore +++ b/.gitignore @@ -57,4 +57,3 @@ lab/output/ # opensrc - source code for packages opensrc/ -AGENTS.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..0b35df15 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,42 @@ +# AGENTS.md + +Instructions for AI coding agents working with this codebase. + +## Communication + +- Do not present guesses as facts. Label uncertainty explicitly. +- If you were wrong, say so briefly and correct it directly. +- Do not argue defensively or justify an incorrect claim before correcting it. +- Keep the tone neutral and practical. Avoid condescension. +- Use polite Japanese (`です`/`ます`) when responding in Japanese. +- Do not use unnatural validation, praise, or evaluative filler such as telling the user their reaction is valid or that an idea is highly effective unless that judgment is necessary. +- Do not end responses by hinting at additional useful context without giving it. If adjacent context matters, include it briefly in the same response. +- Do not present a conclusion as if it were obvious only after the user suggests it. If you have a concrete conclusion, state it first and directly. +- If the user questions a factual claim, do not double down from memory. Check the code, docs, or other primary evidence before answering. +- Do not use vague hedge words to leave yourself escape hatches. State what is confirmed, state what is unconfirmed, and make the condition explicit when an answer depends on one. +- When blocked by environment limits, state the limit plainly and move to the best available workaround. + +For all other project-specific guidance, conventions, and workflow details, see `CLAUDE.md`. + + + +## Source Code Reference + +Source code for dependencies is available in `opensrc/` for deeper understanding of implementation details. + +See `opensrc/sources.json` for the list of available packages and their versions. + +Use this source code when you need to understand how a package works internally, not just its types/interface. + +### Fetching Additional Source Code + +To fetch source code for a package or repository you need to understand, run: + +```bash +npx opensrc # npm package (e.g., npx opensrc zod) +npx opensrc pypi: # Python package (e.g., npx opensrc pypi:requests) +npx opensrc crates: # Rust crate (e.g., npx opensrc crates:serde) +npx opensrc / # GitHub repo (e.g., npx opensrc vercel/ai) +``` + + diff --git a/app/services/durably.server.ts b/app/services/durably.server.ts index cf1b4567..f8a9debb 100644 --- a/app/services/durably.server.ts +++ b/app/services/durably.server.ts @@ -2,6 +2,7 @@ import { createDurably, createDurablyHandler } from '@coji/durably' import SQLite from 'better-sqlite3' import { SqliteDialect } from 'kysely' import { getSession, getUserOrganizations } from '~/app/libs/auth.server' +import { backfillJob } from '~/app/services/jobs/backfill.server' import { crawlJob } from '~/app/services/jobs/crawl.server' import { recalculateJob } from '~/app/services/jobs/recalculate.server' @@ -17,6 +18,7 @@ function createDurablyInstance() { leaseMs: 300_000, // 5 minutes (default 30s is too short for large orgs) leaseRenewIntervalMs: 30_000, jobs: { + backfill: backfillJob, crawl: crawlJob, recalculate: recalculateJob, }, diff --git a/app/services/jobs/backfill.server.ts b/app/services/jobs/backfill.server.ts new file mode 100644 index 00000000..6cd0a253 --- /dev/null +++ b/app/services/jobs/backfill.server.ts @@ -0,0 +1,57 @@ +import { defineJob } from '@coji/durably' +import { z } from 'zod' +import type { OrganizationId } from '~/app/types/organization' +import { getOrganization } from '~/batch/db/queries' +import { backfillRepo } from '~/batch/github/backfill-repo' + +export const backfillJob = defineJob({ + name: 'backfill', + input: z.object({ + organizationId: z.string(), + files: z.boolean().default(false), + }), + output: z.object({ + repositoryCount: z.number(), + }), + run: async (step, input) => { + const orgId = input.organizationId as OrganizationId + + const organization = await step.run('load-organization', async () => { + step.progress(0, 0, 'Loading organization...') + const org = await getOrganization(orgId) + if (!org.integration?.privateToken) { + throw new Error('No integration or token configured') + } + return { + repositories: org.repositories, + } + }) + + const org = await getOrganization(orgId) + const token = org.integration?.privateToken + if (!token) { + throw new Error('No integration token') + } + + const repoCount = organization.repositories.length + + for (let i = 0; i < organization.repositories.length; i++) { + const repository = organization.repositories[i] + const repoLabel = `${repository.owner}/${repository.repo}` + + await step.run(`backfill:${repoLabel}`, async () => { + step.progress(i + 1, repoCount, `Backfilling ${repoLabel}...`) + await backfillRepo( + orgId, + repository, + { privateToken: token }, + { + files: input.files, + }, + ) + }) + } + + return { repositoryCount: repoCount } + }, +}) diff --git a/app/services/jobs/crawl.server.ts b/app/services/jobs/crawl.server.ts index 2f3978da..d694e99d 100644 --- a/app/services/jobs/crawl.server.ts +++ b/app/services/jobs/crawl.server.ts @@ -1,5 +1,4 @@ import { defineJob } from '@coji/durably' -import { sql } from 'kysely' import { z } from 'zod' import { clearOrgCache } from '~/app/services/cache.server' import { getTenantDb } from '~/app/services/tenant-db.server' @@ -146,9 +145,7 @@ export const crawlJob = defineJob({ // Skip analyze if no updates (and not a refresh) if (!input.refresh && updatedPrNumbers.size === 0) { step.log.info('No updated PRs, skipping analyze.') - await step.run('finalize', async () => { - const tenantDb = getTenantDb(orgId) - await sql`PRAGMA wal_checkpoint(PASSIVE)`.execute(tenantDb) + await step.run('finalize', () => { clearOrgCache(orgId) }) return { fetchedRepos: repoCount, pullCount: 0 } diff --git a/app/services/jobs/run-in-worker.test.ts b/app/services/jobs/run-in-worker.test.ts index a2838a3f..447c7463 100644 --- a/app/services/jobs/run-in-worker.test.ts +++ b/app/services/jobs/run-in-worker.test.ts @@ -13,10 +13,10 @@ describe('getWorkerRuntime', () => { }) test('uses ts worker files in development', () => { - const runtime = getWorkerRuntime('upsert-worker', 'development') + const runtime = getWorkerRuntime('analyze-worker', 'development') expect(runtime.workerPath).toBe( - path.resolve(process.cwd(), 'app/services/jobs/upsert-worker.ts'), + path.resolve(process.cwd(), 'app/services/jobs/analyze-worker.ts'), ) expect(runtime.workerOptions.execArgv).toEqual(['--import', 'tsx']) }) diff --git a/app/services/jobs/run-in-worker.ts b/app/services/jobs/run-in-worker.ts index 72aca7e2..a4c46388 100644 --- a/app/services/jobs/run-in-worker.ts +++ b/app/services/jobs/run-in-worker.ts @@ -34,22 +34,7 @@ export function runAnalyzeInWorker( ): Promise { return runWorker('analyze-worker', input, options) } - -interface UpsertWorkerInput { - organizationId: string - pulls: TPull[] - reviews: TReview[] - reviewers: TReviewer[] -} - -export function runUpsertInWorker( - input: UpsertWorkerInput, - options?: RunWorkerOptions, -): Promise<{ ok: true }> { - return runWorker<{ ok: true }>('upsert-worker', input, options) -} - -type WorkerEntrypoint = 'analyze-worker' | 'upsert-worker' +type WorkerEntrypoint = 'analyze-worker' const SQLITE_BUSY_RETRY_LIMIT = 3 const SQLITE_BUSY_RETRY_DELAYS_MS = [150, 400, 1000] diff --git a/app/services/jobs/shared-steps.server.ts b/app/services/jobs/shared-steps.server.ts index c0fa4a90..26d6d09b 100644 --- a/app/services/jobs/shared-steps.server.ts +++ b/app/services/jobs/shared-steps.server.ts @@ -2,21 +2,23 @@ * crawl と recalculate ジョブで共有する analyze → upsert → classify → export → finalize ステップ群 */ import type { StepContext } from '@coji/durably' -import { sql, type Selectable } from 'kysely' +import type { Selectable } from 'kysely' import { clearOrgCache } from '~/app/services/cache.server' -import { getTenantDb, type TenantDB } from '~/app/services/tenant-db.server' +import type { TenantDB } from '~/app/services/tenant-db.server' import type { OrganizationId } from '~/app/types/organization' import { exportPulls, exportReviewResponses, } from '~/batch/bizlogic/export-spreadsheet' +import { upsertAnalyzedData } from '~/batch/db' import type { AnalyzedReview, AnalyzedReviewResponse, AnalyzedReviewer, } from '~/batch/github/types' import { classifyPullRequests } from '~/batch/usecases/classify-pull-requests' -import { runAnalyzeInWorker, runUpsertInWorker } from './run-in-worker' +import type { SqliteBusyEvent } from './run-in-worker' +import { runAnalyzeInWorker } from './run-in-worker' interface AnalyzeResult { pulls: Selectable[] @@ -54,6 +56,23 @@ function formatDurationMs(durationMs: number) { return `${(durationMs / 1000).toFixed(1)}s` } +function summarizeBusyEvents(events: SqliteBusyEvent[]) { + if (events.length === 0) return null + + const retries = events.filter((event) => !event.gaveUp) + const gaveUp = events.filter((event) => event.gaveUp) + const totalWaitMs = retries.reduce((sum, event) => sum + event.delayMs, 0) + const counts = new Map() + for (const event of events) { + counts.set(event.entrypoint, (counts.get(event.entrypoint) ?? 0) + 1) + } + const byEntrypoint = [...counts.entries()] + .map(([entrypoint, count]) => `${entrypoint}:${count}`) + .join(', ') + + return `sqlite busy events=${events.length} retries=${retries.length} gaveUp=${gaveUp.length} totalWait=${formatDurationMs(totalWaitMs)} byWorker=[${byEntrypoint}]` +} + async function runTimedStep( step: StepContext, name: string, @@ -90,6 +109,7 @@ export async function analyzeAndFinalizeSteps( const allReviews: AnalyzedReview[] = [] const allReviewers: AnalyzedReviewer[] = [] const allReviewResponses: AnalyzedReviewResponse[] = [] + const sqliteBusyEvents: SqliteBusyEvent[] = [] for (let i = 0; i < organization.repositories.length; i++) { const repo = organization.repositories[i] @@ -101,16 +121,21 @@ export async function analyzeAndFinalizeSteps( const orgSetting = organization.organizationSetting const prNumbers = filterPrNumbers?.get(repo.id) - return await runAnalyzeInWorker({ - organizationId: orgId, - repositoryId: repo.id, - releaseDetectionMethod: - repo.releaseDetectionMethod ?? orgSetting.releaseDetectionMethod, - releaseDetectionKey: - repo.releaseDetectionKey ?? orgSetting.releaseDetectionKey, - excludedUsers: orgSetting.excludedUsers, - filterPrNumbers: prNumbers ? [...prNumbers] : undefined, - }) + return await runAnalyzeInWorker( + { + organizationId: orgId, + repositoryId: repo.id, + releaseDetectionMethod: + repo.releaseDetectionMethod ?? orgSetting.releaseDetectionMethod, + releaseDetectionKey: + repo.releaseDetectionKey ?? orgSetting.releaseDetectionKey, + excludedUsers: orgSetting.excludedUsers, + filterPrNumbers: prNumbers ? [...prNumbers] : undefined, + }, + { + onSqliteBusy: (event) => sqliteBusyEvents.push(event), + }, + ) }) }) allPulls.push(...result.pulls) @@ -124,8 +149,7 @@ export async function analyzeAndFinalizeSteps( await step.run('upsert', async () => { await runTimedStep(step, 'upsert', async () => { step.progress(0, 0, 'Upserting to database...') - await runUpsertInWorker({ - organizationId: orgId, + await upsertAnalyzedData(orgId, { pulls: allPulls, reviews: allReviews, reviewers: allReviewers, @@ -164,11 +188,15 @@ export async function analyzeAndFinalizeSteps( await step.run('finalize', async () => { await runTimedStep(step, 'finalize', async () => { step.progress(0, 0, 'Finalizing...') - const tenantDb = getTenantDb(orgId) - await sql`PRAGMA wal_checkpoint(PASSIVE)`.execute(tenantDb) clearOrgCache(orgId) + await Promise.resolve() }) }) + const busySummary = summarizeBusyEvents(sqliteBusyEvents) + if (busySummary) { + step.log.warn(busySummary) + } + return { pullCount: allPulls.length } } diff --git a/app/services/jobs/upsert-worker.ts b/app/services/jobs/upsert-worker.ts deleted file mode 100644 index bae3e307..00000000 --- a/app/services/jobs/upsert-worker.ts +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Worker thread for DB-heavy analyzed-data upsert. - * - * IMPORTANT: module.register() must be called before any ~/... imports. - */ -if (import.meta.url.endsWith('.ts')) { - const { register } = await import('node:module') - register('./path-alias-hooks.mjs', import.meta.url) -} - -import 'dotenv/config' -import type { Selectable } from 'kysely' -import { parentPort, workerData } from 'node:worker_threads' -import type { TenantDB } from '~/app/services/tenant-db.server' -import type { OrganizationId } from '~/app/types/organization' -import type { AnalyzedReview, AnalyzedReviewer } from '~/batch/github/types' - -interface WorkerInput { - organizationId: string - pulls: Selectable[] - reviews: AnalyzedReview[] - reviewers: AnalyzedReviewer[] -} - -const input = workerData as WorkerInput - -const { upsertAnalyzedData } = await import('~/batch/db') - -await upsertAnalyzedData(input.organizationId as OrganizationId, { - pulls: input.pulls, - reviews: input.reviews, - reviewers: input.reviewers, -}) - -parentPort?.postMessage({ ok: true as const }) diff --git a/app/services/tenant-db.server.ts b/app/services/tenant-db.server.ts index 54c71978..ee8a9173 100644 --- a/app/services/tenant-db.server.ts +++ b/app/services/tenant-db.server.ts @@ -18,6 +18,7 @@ export type { OrganizationId } from '~/app/types/organization' const debug = createDebug('app:tenant-db') const SQLITE_BUSY_TIMEOUT_MS = 5000 +const SQLITE_WAL_JOURNAL_SIZE_LIMIT_BYTES = 64 * 1024 * 1024 const tenantDbCache = new Map< string, @@ -42,6 +43,7 @@ function ensureTenantDb(organizationId: OrganizationId) { database.pragma('journal_mode = WAL') database.pragma(`busy_timeout = ${SQLITE_BUSY_TIMEOUT_MS}`) database.pragma('wal_autocheckpoint = 1000') + database.pragma(`journal_size_limit = ${SQLITE_WAL_JOURNAL_SIZE_LIMIT_BYTES}`) const kysely = new Kysely({ dialect: new SqliteDialect({ database }), diff --git a/batch/cli.ts b/batch/cli.ts index 1072f577..2a4190c0 100644 --- a/batch/cli.ts +++ b/batch/cli.ts @@ -104,7 +104,7 @@ const backfill = command( }, help: { description: - 'Re-fetch PR metadata to fill missing fields in raw data. Run recalculate after this.', + 'Re-fetch PR metadata to fill missing fields in raw data. Runs as a durable job. Run recalculate after this.', }, }, async (argv) => { diff --git a/batch/commands/backfill.ts b/batch/commands/backfill.ts index f40889f2..dae27f53 100644 --- a/batch/commands/backfill.ts +++ b/batch/commands/backfill.ts @@ -1,6 +1,5 @@ import consola from 'consola' -import invariant from 'tiny-invariant' -import { backfillRepo } from '~/batch/github/backfill-repo' +import { durably } from '~/app/services/durably.server' import { requireOrganization } from './helpers' import { shutdown } from './shutdown' @@ -13,18 +12,34 @@ export async function backfillCommand(props: BackfillCommandProps) { const result = await requireOrganization(props.organizationId) if (!result) return - const { orgId, organization } = result - invariant(organization.integration, 'integration should related') + const { orgId } = result try { - for (const repository of organization.repositories) { - await backfillRepo(orgId, repository, organization.integration, { - files: props.files, - }) - } + consola.info( + `Starting backfill for ${orgId}${props.files ? ' (files only)' : ''}...`, + ) - consola.success('backfill completed. Run `recalculate` to apply changes.') + const { output } = await durably.jobs.backfill.triggerAndWait( + { organizationId: orgId, files: props.files ?? false }, + { + concurrencyKey: `backfill:${orgId}`, + labels: { organizationId: orgId }, + onProgress: (p) => { + if (p.message) consola.info(p.message) + }, + onLog: (l) => { + if (l.level === 'error') consola.error(l.message) + else if (l.level === 'warn') consola.warn(l.message) + else consola.info(l.message) + }, + }, + ) + + consola.success( + `Backfill completed for ${output.repositoryCount} repositories. Run \`recalculate\` to apply changes.`, + ) } finally { + await durably.stop() await shutdown() } } diff --git a/batch/db/mutations.ts b/batch/db/mutations.ts index 7e68a022..b3acd3e6 100644 --- a/batch/db/mutations.ts +++ b/batch/db/mutations.ts @@ -1,4 +1,5 @@ import type { Insertable, Selectable } from 'kysely' +import { setImmediate as yieldToEventLoop } from 'node:timers/promises' import { getTenantDb, type TenantDB } from '~/app/services/tenant-db.server' import type { OrganizationId } from '~/app/types/organization' import type { AnalyzedReview, AnalyzedReviewer } from '../github/types' @@ -63,7 +64,7 @@ export function upsertPullRequestReview( export async function batchUpsertPullRequests( organizationId: OrganizationId, rows: Insertable[], - chunkSize = 50, + chunkSize = 20, ) { if (rows.length === 0) return const tenantDb = getTenantDb(organizationId) @@ -101,13 +102,15 @@ export async function batchUpsertPullRequests( })), ) .execute() + + await yieldToEventLoop() } } export async function batchUpsertPullRequestReviews( organizationId: OrganizationId, rows: Insertable[], - chunkSize = 100, + chunkSize = 50, ) { if (rows.length === 0) return const tenantDb = getTenantDb(organizationId) @@ -126,6 +129,8 @@ export async function batchUpsertPullRequestReviews( })), ) .execute() + + await yieldToEventLoop() } } @@ -170,16 +175,15 @@ export async function upsertPullRequestReviewers( export async function batchReplacePullRequestReviewers( organizationId: OrganizationId, rows: AnalyzedReviewer[], - chunkSize = 100, + chunkSize = 25, ) { if (rows.length === 0) return const tenantDb = getTenantDb(organizationId) - await tenantDb.transaction().execute(async (trx) => { - for (let i = 0; i < rows.length; i += chunkSize) { - const chunk = rows.slice(i, i + chunkSize) - + for (let i = 0; i < rows.length; i += chunkSize) { + const chunk = rows.slice(i, i + chunkSize) + await tenantDb.transaction().execute(async (trx) => { for (const row of chunk) { await trx .deleteFrom('pullRequestReviewers') @@ -208,8 +212,10 @@ export async function batchReplacePullRequestReviewers( if (values.length > 0) { await trx.insertInto('pullRequestReviewers').values(values).execute() } - } - }) + }) + + await yieldToEventLoop() + } } /** diff --git a/package.json b/package.json index 9dcfbacf..89e672ec 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "scripts": { "build": "run-s build:react-router build:job build:workers build:db-scripts", "build:job": "esbuild --platform=node --format=esm ./batch/job-scheduler.ts --outdir=build --bundle --packages=external", - "build:workers": "esbuild --platform=node --format=esm ./app/services/jobs/analyze-worker.ts ./app/services/jobs/upsert-worker.ts --outdir=build/workers --bundle --packages=external", + "build:workers": "esbuild --platform=node --format=esm ./app/services/jobs/analyze-worker.ts --outdir=build/workers --bundle --packages=external", "build:db-scripts": "esbuild --platform=node --format=esm ./db/apply-tenant-migrations.ts --outdir=build/db --bundle --packages=external", "build:react-router": "react-router build", "dev": "react-router dev",