Merged
1 change: 0 additions & 1 deletion .gitignore
@@ -57,4 +57,3 @@ lab/output/

# opensrc - source code for packages
opensrc/
-AGENTS.md
42 changes: 42 additions & 0 deletions AGENTS.md
@@ -0,0 +1,42 @@
# AGENTS.md

Instructions for AI coding agents working with this codebase.

## Communication

- Do not present guesses as facts. Label uncertainty explicitly.
- If you were wrong, say so briefly and correct it directly.
- Do not argue defensively or justify an incorrect claim before correcting it.
- Keep the tone neutral and practical. Avoid condescension.
- Use polite Japanese (`です`/`ます`) when responding in Japanese.
- Do not use unnatural validation, praise, or evaluative filler such as telling the user their reaction is valid or that an idea is highly effective unless that judgment is necessary.
- Do not end responses by hinting at additional useful context without giving it. If adjacent context matters, include it briefly in the same response.
- Do not present a conclusion as if it were obvious only after the user suggests it. If you have a concrete conclusion, state it first and directly.
- If the user questions a factual claim, do not double down from memory. Check the code, docs, or other primary evidence before answering.
- Do not use vague hedge words to leave yourself escape hatches. State what is confirmed, state what is unconfirmed, and make the condition explicit when an answer depends on one.
- When blocked by environment limits, state the limit plainly and move to the best available workaround.

For all other project-specific guidance, conventions, and workflow details, see `CLAUDE.md`.

<!-- opensrc:start -->

## Source Code Reference

Source code for dependencies is available in `opensrc/` for deeper understanding of implementation details.

See `opensrc/sources.json` for the list of available packages and their versions.

Use this source code when you need to understand how a package works internally, not just its types/interface.

### Fetching Additional Source Code

To fetch source code for a package or repository you need to understand, run:

```bash
npx opensrc <package> # npm package (e.g., npx opensrc zod)
npx opensrc pypi:<package> # Python package (e.g., npx opensrc pypi:requests)
npx opensrc crates:<package> # Rust crate (e.g., npx opensrc crates:serde)
npx opensrc <owner>/<repo> # GitHub repo (e.g., npx opensrc vercel/ai)
```

<!-- opensrc:end -->
2 changes: 2 additions & 0 deletions app/services/durably.server.ts
@@ -2,6 +2,7 @@ import { createDurably, createDurablyHandler } from '@coji/durably'
import SQLite from 'better-sqlite3'
import { SqliteDialect } from 'kysely'
import { getSession, getUserOrganizations } from '~/app/libs/auth.server'
+import { backfillJob } from '~/app/services/jobs/backfill.server'
import { crawlJob } from '~/app/services/jobs/crawl.server'
import { recalculateJob } from '~/app/services/jobs/recalculate.server'

@@ -17,6 +18,7 @@ function createDurablyInstance() {
leaseMs: 300_000, // 5 minutes (default 30s is too short for large orgs)
leaseRenewIntervalMs: 30_000,
jobs: {
+backfill: backfillJob,
crawl: crawlJob,
recalculate: recalculateJob,
},
57 changes: 57 additions & 0 deletions app/services/jobs/backfill.server.ts
@@ -0,0 +1,57 @@
import { defineJob } from '@coji/durably'
import { z } from 'zod'
import type { OrganizationId } from '~/app/types/organization'
import { getOrganization } from '~/batch/db/queries'
import { backfillRepo } from '~/batch/github/backfill-repo'

export const backfillJob = defineJob({
name: 'backfill',
input: z.object({
organizationId: z.string(),
files: z.boolean().default(false),
}),
output: z.object({
repositoryCount: z.number(),
}),
run: async (step, input) => {
const orgId = input.organizationId as OrganizationId

const organization = await step.run('load-organization', async () => {
step.progress(0, 0, 'Loading organization...')
const org = await getOrganization(orgId)
if (!org.integration?.privateToken) {
throw new Error('No integration or token configured')
}
return {
repositories: org.repositories,
}
})

const org = await getOrganization(orgId)
const token = org.integration?.privateToken
if (!token) {
throw new Error('No integration token')
}

const repoCount = organization.repositories.length

for (let i = 0; i < organization.repositories.length; i++) {
const repository = organization.repositories[i]
const repoLabel = `${repository.owner}/${repository.repo}`

await step.run(`backfill:${repoLabel}`, async () => {
step.progress(i + 1, repoCount, `Backfilling ${repoLabel}...`)
await backfillRepo(
orgId,
repository,
{ privateToken: token },
{
files: input.files,
},
)
})
}

return { repositoryCount: repoCount }
},
})
5 changes: 1 addition & 4 deletions app/services/jobs/crawl.server.ts
@@ -1,5 +1,4 @@
import { defineJob } from '@coji/durably'
-import { sql } from 'kysely'
import { z } from 'zod'
import { clearOrgCache } from '~/app/services/cache.server'
import { getTenantDb } from '~/app/services/tenant-db.server'
@@ -146,9 +145,7 @@ export const crawlJob = defineJob({
// Skip analyze if no updates (and not a refresh)
if (!input.refresh && updatedPrNumbers.size === 0) {
step.log.info('No updated PRs, skipping analyze.')
-await step.run('finalize', async () => {
-const tenantDb = getTenantDb(orgId)
-await sql`PRAGMA wal_checkpoint(PASSIVE)`.execute(tenantDb)
+await step.run('finalize', () => {
clearOrgCache(orgId)
})
return { fetchedRepos: repoCount, pullCount: 0 }
4 changes: 2 additions & 2 deletions app/services/jobs/run-in-worker.test.ts
@@ -13,10 +13,10 @@ describe('getWorkerRuntime', () => {
})

test('uses ts worker files in development', () => {
-const runtime = getWorkerRuntime('upsert-worker', 'development')
+const runtime = getWorkerRuntime('analyze-worker', 'development')

expect(runtime.workerPath).toBe(
-path.resolve(process.cwd(), 'app/services/jobs/upsert-worker.ts'),
+path.resolve(process.cwd(), 'app/services/jobs/analyze-worker.ts'),
)
expect(runtime.workerOptions.execArgv).toEqual(['--import', 'tsx'])
})
17 changes: 1 addition & 16 deletions app/services/jobs/run-in-worker.ts
@@ -34,22 +34,7 @@ export function runAnalyzeInWorker<T>(
): Promise<T> {
return runWorker<T>('analyze-worker', input, options)
}

-interface UpsertWorkerInput<TPull, TReview, TReviewer> {
-organizationId: string
-pulls: TPull[]
-reviews: TReview[]
-reviewers: TReviewer[]
-}
-
-export function runUpsertInWorker<TPull, TReview, TReviewer>(
-input: UpsertWorkerInput<TPull, TReview, TReviewer>,
-options?: RunWorkerOptions,
-): Promise<{ ok: true }> {
-return runWorker<{ ok: true }>('upsert-worker', input, options)
-}
-
-type WorkerEntrypoint = 'analyze-worker' | 'upsert-worker'
+type WorkerEntrypoint = 'analyze-worker'
const SQLITE_BUSY_RETRY_LIMIT = 3
const SQLITE_BUSY_RETRY_DELAYS_MS = [150, 400, 1000]
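
The two constants above suggest a bounded retry with a fixed delay schedule when a worker hits `SQLITE_BUSY`. The rest of `run-in-worker.ts` is collapsed in this diff, so the following is only a minimal sketch of how such a loop might look, not the actual implementation. `retryOnBusy`, `isBusyError`, and the injectable `sleep` are hypothetical names, and the event fields (`entrypoint`, `delayMs`, `gaveUp`) are inferred from how `summarizeBusyEvents` reads them in `shared-steps.server.ts`.

```typescript
// Hypothetical sketch: bounded retry on SQLITE_BUSY with a fixed delay schedule.
const SQLITE_BUSY_RETRY_LIMIT = 3
const SQLITE_BUSY_RETRY_DELAYS_MS = [150, 400, 1000]

// Shape inferred from the fields the diff reads; the real type may differ.
interface SqliteBusyEvent {
  entrypoint: string
  delayMs: number
  gaveUp: boolean
}

// Assumption: busy errors carry `code === 'SQLITE_BUSY'`.
function isBusyError(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { code?: unknown }).code === 'SQLITE_BUSY'
  )
}

async function retryOnBusy<T>(
  entrypoint: string,
  task: () => Promise<T>,
  onSqliteBusy: (event: SqliteBusyEvent) => void = () => {},
  // Injectable so callers (and tests) can skip real waiting.
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await task()
    } catch (error) {
      // Anything other than a busy error is not retried.
      if (!isBusyError(error)) throw error
      if (attempt >= SQLITE_BUSY_RETRY_LIMIT) {
        // Retries exhausted: report once more, then rethrow.
        onSqliteBusy({ entrypoint, delayMs: 0, gaveUp: true })
        throw error
      }
      const delayMs = SQLITE_BUSY_RETRY_DELAYS_MS[attempt]
      onSqliteBusy({ entrypoint, delayMs, gaveUp: false })
      await sleep(delayMs)
    }
  }
}
```

A caller would pass a callback such as `(event) => sqliteBusyEvents.push(event)` to collect events for later reporting, which matches the `onSqliteBusy` option used in `shared-steps.server.ts`.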

62 changes: 45 additions & 17 deletions app/services/jobs/shared-steps.server.ts
@@ -2,21 +2,23 @@
* Shared analyze → upsert → classify → export → finalize steps used by the crawl and recalculate jobs
*/
import type { StepContext } from '@coji/durably'
-import { sql, type Selectable } from 'kysely'
+import type { Selectable } from 'kysely'
import { clearOrgCache } from '~/app/services/cache.server'
-import { getTenantDb, type TenantDB } from '~/app/services/tenant-db.server'
+import type { TenantDB } from '~/app/services/tenant-db.server'
import type { OrganizationId } from '~/app/types/organization'
import {
exportPulls,
exportReviewResponses,
} from '~/batch/bizlogic/export-spreadsheet'
+import { upsertAnalyzedData } from '~/batch/db'
import type {
AnalyzedReview,
AnalyzedReviewResponse,
AnalyzedReviewer,
} from '~/batch/github/types'
import { classifyPullRequests } from '~/batch/usecases/classify-pull-requests'
-import { runAnalyzeInWorker, runUpsertInWorker } from './run-in-worker'
+import type { SqliteBusyEvent } from './run-in-worker'
+import { runAnalyzeInWorker } from './run-in-worker'

interface AnalyzeResult {
pulls: Selectable<TenantDB.PullRequests>[]
@@ -54,6 +56,23 @@ function formatDurationMs(durationMs: number) {
return `${(durationMs / 1000).toFixed(1)}s`
}

+function summarizeBusyEvents(events: SqliteBusyEvent[]) {
+if (events.length === 0) return null
+
+const retries = events.filter((event) => !event.gaveUp)
+const gaveUp = events.filter((event) => event.gaveUp)
+const totalWaitMs = retries.reduce((sum, event) => sum + event.delayMs, 0)
+const counts = new Map<string, number>()
+for (const event of events) {
+counts.set(event.entrypoint, (counts.get(event.entrypoint) ?? 0) + 1)
+}
+const byEntrypoint = [...counts.entries()]
+.map(([entrypoint, count]) => `${entrypoint}:${count}`)
+.join(', ')
+
+return `sqlite busy events=${events.length} retries=${retries.length} gaveUp=${gaveUp.length} totalWait=${formatDurationMs(totalWaitMs)} byWorker=[${byEntrypoint}]`
+}

async function runTimedStep<T>(
step: StepContext,
name: string,
@@ -90,6 +109,7 @@ export async function analyzeAndFinalizeSteps(
const allReviews: AnalyzedReview[] = []
const allReviewers: AnalyzedReviewer[] = []
const allReviewResponses: AnalyzedReviewResponse[] = []
+const sqliteBusyEvents: SqliteBusyEvent[] = []

for (let i = 0; i < organization.repositories.length; i++) {
const repo = organization.repositories[i]
@@ -101,16 +121,21 @@ export async function analyzeAndFinalizeSteps(

const orgSetting = organization.organizationSetting
const prNumbers = filterPrNumbers?.get(repo.id)
-return await runAnalyzeInWorker<AnalyzeResult>({
-organizationId: orgId,
-repositoryId: repo.id,
-releaseDetectionMethod:
-repo.releaseDetectionMethod ?? orgSetting.releaseDetectionMethod,
-releaseDetectionKey:
-repo.releaseDetectionKey ?? orgSetting.releaseDetectionKey,
-excludedUsers: orgSetting.excludedUsers,
-filterPrNumbers: prNumbers ? [...prNumbers] : undefined,
-})
+return await runAnalyzeInWorker<AnalyzeResult>(
+{
+organizationId: orgId,
+repositoryId: repo.id,
+releaseDetectionMethod:
+repo.releaseDetectionMethod ?? orgSetting.releaseDetectionMethod,
+releaseDetectionKey:
+repo.releaseDetectionKey ?? orgSetting.releaseDetectionKey,
+excludedUsers: orgSetting.excludedUsers,
+filterPrNumbers: prNumbers ? [...prNumbers] : undefined,
+},
+{
+onSqliteBusy: (event) => sqliteBusyEvents.push(event),
+},
+)
})
})
allPulls.push(...result.pulls)
@@ -124,8 +149,7 @@ export async function analyzeAndFinalizeSteps(
await step.run('upsert', async () => {
await runTimedStep(step, 'upsert', async () => {
step.progress(0, 0, 'Upserting to database...')
-await runUpsertInWorker({
-organizationId: orgId,
+await upsertAnalyzedData(orgId, {
pulls: allPulls,
reviews: allReviews,
reviewers: allReviewers,
@@ -164,11 +188,15 @@ export async function analyzeAndFinalizeSteps(
await step.run('finalize', async () => {
await runTimedStep(step, 'finalize', async () => {
step.progress(0, 0, 'Finalizing...')
-const tenantDb = getTenantDb(orgId)
-await sql`PRAGMA wal_checkpoint(PASSIVE)`.execute(tenantDb)
clearOrgCache(orgId)
+await Promise.resolve()
})
})

+const busySummary = summarizeBusyEvents(sqliteBusyEvents)
+if (busySummary) {
+step.log.warn(busySummary)
+}

return { pullCount: allPulls.length }
}
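
To make the warning emitted via `step.log.warn` concrete, here is a self-contained sketch that runs `summarizeBusyEvents` on a few sample events. The `SqliteBusyEvent` interface is redeclared with only the three fields the function reads, `formatDurationMs` is reduced to the one line visible in this diff (the real function may have other branches), and the sample events are made up for illustration.

```typescript
// Assumed shape: only the fields summarizeBusyEvents reads in the diff.
interface SqliteBusyEvent {
  entrypoint: string
  delayMs: number
  gaveUp: boolean
}

// Simplified: the diff shows only this final line of the real function.
function formatDurationMs(durationMs: number): string {
  return `${(durationMs / 1000).toFixed(1)}s`
}

function summarizeBusyEvents(events: SqliteBusyEvent[]): string | null {
  if (events.length === 0) return null

  const retries = events.filter((event) => !event.gaveUp)
  const gaveUp = events.filter((event) => event.gaveUp)
  // Only retried events contribute waiting time.
  const totalWaitMs = retries.reduce((sum, event) => sum + event.delayMs, 0)
  // Count events per worker entrypoint.
  const counts = new Map<string, number>()
  for (const event of events) {
    counts.set(event.entrypoint, (counts.get(event.entrypoint) ?? 0) + 1)
  }
  const byEntrypoint = Array.from(counts.entries())
    .map(([entrypoint, count]) => `${entrypoint}:${count}`)
    .join(', ')

  return `sqlite busy events=${events.length} retries=${retries.length} gaveUp=${gaveUp.length} totalWait=${formatDurationMs(totalWaitMs)} byWorker=[${byEntrypoint}]`
}

// Hypothetical sample: two retries followed by one give-up.
const sampleEvents: SqliteBusyEvent[] = [
  { entrypoint: 'analyze-worker', delayMs: 400, gaveUp: false },
  { entrypoint: 'analyze-worker', delayMs: 1000, gaveUp: false },
  { entrypoint: 'analyze-worker', delayMs: 0, gaveUp: true },
]

const summary = summarizeBusyEvents(sampleEvents)
console.log(summary)
// sqlite busy events=3 retries=2 gaveUp=1 totalWait=1.4s byWorker=[analyze-worker:3]
```

Returning `null` for an empty list lets the caller skip the warning entirely when no busy events occurred.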
35 changes: 0 additions & 35 deletions app/services/jobs/upsert-worker.ts

This file was deleted.

2 changes: 2 additions & 0 deletions app/services/tenant-db.server.ts
@@ -18,6 +18,7 @@ export type { OrganizationId } from '~/app/types/organization'

const debug = createDebug('app:tenant-db')
const SQLITE_BUSY_TIMEOUT_MS = 5000
+const SQLITE_WAL_JOURNAL_SIZE_LIMIT_BYTES = 64 * 1024 * 1024

const tenantDbCache = new Map<
string,
@@ -42,6 +43,7 @@ function ensureTenantDb(organizationId: OrganizationId) {
database.pragma('journal_mode = WAL')
database.pragma(`busy_timeout = ${SQLITE_BUSY_TIMEOUT_MS}`)
database.pragma('wal_autocheckpoint = 1000')
+database.pragma(`journal_size_limit = ${SQLITE_WAL_JOURNAL_SIZE_LIMIT_BYTES}`)

const kysely = new Kysely<TenantDB.DB>({
dialect: new SqliteDialect({ database }),
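
For a rough sense of the sizes behind the `wal_autocheckpoint` and `journal_size_limit` pragmas in this hunk, the arithmetic can be sketched as follows. The 4096-byte page size is an assumption (it is SQLite's default, and no `page_size` pragma appears in this diff), and `WAL_AUTOCHECKPOINT_PAGES`, `ASSUMED_PAGE_SIZE_BYTES`, and `toMiB` are names introduced only for illustration.

```typescript
// Value taken from the diff.
const SQLITE_WAL_JOURNAL_SIZE_LIMIT_BYTES = 64 * 1024 * 1024
// wal_autocheckpoint is expressed in pages, not bytes.
const WAL_AUTOCHECKPOINT_PAGES = 1000
// Assumption: SQLite's default page size; the diff does not set page_size.
const ASSUMED_PAGE_SIZE_BYTES = 4096

const toMiB = (bytes: number): number => bytes / (1024 * 1024)

// Approximate WAL size at which an automatic checkpoint is attempted.
const autocheckpointBytes = WAL_AUTOCHECKPOINT_PAGES * ASSUMED_PAGE_SIZE_BYTES

console.log(
  `journal_size_limit = ${SQLITE_WAL_JOURNAL_SIZE_LIMIT_BYTES} bytes (${toMiB(SQLITE_WAL_JOURNAL_SIZE_LIMIT_BYTES)} MiB)`,
)
console.log(
  `auto-checkpoint threshold is about ${toMiB(autocheckpointBytes).toFixed(1)} MiB of WAL`,
)
```

Under these assumptions an automatic checkpoint is attempted once the WAL reaches roughly 3.9 MiB, while `journal_size_limit` bounds how large a WAL file SQLite leaves on disk after a checkpoint (64 MiB here). It does not cap how far the WAL can grow while checkpoints are blocked.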
2 changes: 1 addition & 1 deletion batch/cli.ts
@@ -104,7 +104,7 @@ const backfill = command(
},
help: {
description:
-'Re-fetch PR metadata to fill missing fields in raw data. Run recalculate after this.',
+'Re-fetch PR metadata to fill missing fields in raw data. Runs as a durable job. Run recalculate after this.',
},
},
async (argv) => {