From 44ffbab163dedf540743b6218c675149893d4e98 Mon Sep 17 00:00:00 2001 From: Che <30403707+Che-Zhu@users.noreply.github.com> Date: Mon, 9 Mar 2026 20:41:45 +0800 Subject: [PATCH 1/2] feat: implement real github import pipeline --- .../_components/import-github-dialog.tsx | 72 +++++- lib/actions/github.ts | 2 + lib/actions/project.ts | 190 +++++++++++---- lib/events/sandbox/sandboxListener.ts | 11 + lib/jobs/database/databaseReconcile.ts | 12 +- lib/jobs/project-import/index.ts | 2 + .../project-import/projectImportReconcile.ts | 228 ++++++++++++++++++ lib/jobs/sandbox/sandboxReconcile.ts | 12 +- lib/repo/project-import.ts | 150 ++++++++++++ lib/startup/index.ts | 16 ++ lib/util/ttyd-exec.ts | 54 +++-- prisma/schema.prisma | 14 ++ 12 files changed, 675 insertions(+), 88 deletions(-) create mode 100644 lib/jobs/project-import/index.ts create mode 100644 lib/jobs/project-import/projectImportReconcile.ts create mode 100644 lib/repo/project-import.ts diff --git a/app/(dashboard)/projects/_components/import-github-dialog.tsx b/app/(dashboard)/projects/_components/import-github-dialog.tsx index 3a281d2..9e8e121 100644 --- a/app/(dashboard)/projects/_components/import-github-dialog.tsx +++ b/app/(dashboard)/projects/_components/import-github-dialog.tsx @@ -3,6 +3,8 @@ import { useCallback, useEffect, useState } from 'react' import { FaGithub } from 'react-icons/fa' import { MdRefresh } from 'react-icons/md' +import type { ProjectImportStatus } from '@prisma/client' +import { useRouter } from 'next/navigation' import { toast } from 'sonner' import { Button } from '@/components/ui/button' @@ -14,8 +16,9 @@ import { getInstallations, type GitHubRepo, } from '@/lib/actions/github' -import { createProject } from '@/lib/actions/project' +import { importProjectFromGitHub } from '@/lib/actions/project' import { env } from '@/lib/env' +import { GET } from '@/lib/fetch-client' type Step = 'check-github-app' | 'select-repo' @@ -25,26 +28,31 @@ interface ImportGitHubDialogProps { } export function ImportGitHubDialog({ open, onOpenChange }: ImportGitHubDialogProps) { + const router = useRouter() const [step, setStep] = useState('check-github-app') const [isLoading, setIsLoading] = useState(false) const [searchQuery, setSearchQuery] = useState('') // Step 1 state const [hasInstallation, setHasInstallation] = useState(false) + const [installationId, setInstallationId] = useState(null) // Step 2 state const [repos, setRepos] = useState([]) const [selectedRepo, setSelectedRepo] = useState(null) const [isCreating, setIsCreating] = useState(false) + const [importProjectId, setImportProjectId] = useState(null) const resetState = useCallback(() => { setStep('check-github-app') setIsLoading(true) setSearchQuery('') setHasInstallation(false) + setInstallationId(null) setRepos([]) setSelectedRepo(null) setIsCreating(false) + setImportProjectId(null) }, []) const checkIdentity = useCallback(async () => { @@ -53,8 +61,10 @@ export function ImportGitHubDialog({ open, onOpenChange }: ImportGitHubDialogPro // Directly check for GitHub App installation const installResult = await getInstallations() if (installResult.success && installResult.data.length > 0) { + const firstInstallationId = installResult.data[0].installationId setHasInstallation(true) - const repoResult = await getInstallationRepos(installResult.data[0].installationId.toString()) + setInstallationId(firstInstallationId) + const repoResult = await getInstallationRepos(firstInstallationId.toString()) if (repoResult.success) { setRepos(repoResult.data) setStep('select-repo') @@ -77,6 +87,41 @@ export function ImportGitHubDialog({ open, onOpenChange }: ImportGitHubDialogPro } }, [open, resetState, checkIdentity]) + useEffect(() => { + if (!open || !importProjectId) { + return + } + + const pollImportStatus = async () => { + try { + const project = await GET<{ importStatus: ProjectImportStatus }>( + `/api/projects/${importProjectId}` + ) + + if (project.importStatus === 'READY') { + toast.success('Repository imported successfully') + onOpenChange(false) + setImportProjectId(null) + router.refresh() + return + } + + if (project.importStatus === 'FAILED') { + toast.error('Repository import failed. An empty project was created instead.') + onOpenChange(false) + setImportProjectId(null) + router.refresh() + } + } catch (error) { + console.error('Failed to poll import status:', error) + } + } + + const timer = setInterval(pollImportStatus, 3000) + void pollImportStatus() + return () => clearInterval(timer) + }, [importProjectId, onOpenChange, open, router]) + const handleInstallApp = () => { const appName = env.NEXT_PUBLIC_GITHUB_APP_NAME if (!appName) { @@ -131,20 +176,27 @@ export function ImportGitHubDialog({ open, onOpenChange }: ImportGitHubDialogPro } const handleImport = async () => { - if (!selectedRepo) return + if (!selectedRepo || !installationId) return setIsCreating(true) try { - const result = await createProject(selectedRepo.name) + const result = await importProjectFromGitHub({ + installationId, + repoId: selectedRepo.id, + repoName: selectedRepo.name, + repoFullName: selectedRepo.full_name, + defaultBranch: selectedRepo.default_branch, + }) + if (result.success) { - toast.success(`Project "${selectedRepo.name}" created successfully!`) - onOpenChange(false) + toast.success(`Project "${selectedRepo.name}" is being imported...`) + setImportProjectId(result.data.id) } else { - toast.error(result.error || 'Failed to create project') + toast.error(result.error || 'Failed to import project') } } catch (error) { - console.error('Failed to create project:', error) - toast.error('Failed to create project') + console.error('Failed to import project:', error) + toast.error('Failed to import project') } finally { setIsCreating(false) } @@ -252,7 +304,7 @@ export function ImportGitHubDialog({ open, onOpenChange }: ImportGitHubDialogPro {isCreating ? ( <> - Creating... + Importing... ) : ( 'Import' diff --git a/lib/actions/github.ts b/lib/actions/github.ts index 54bb659..87a58ab 100644 --- a/lib/actions/github.ts +++ b/lib/actions/github.ts @@ -26,6 +26,7 @@ export interface GitHubRepo { id: number name: string full_name: string + default_branch: string description: string | null private: boolean language: string | null @@ -81,6 +82,7 @@ export async function getInstallationRepos( id: repo.id, name: repo.name, full_name: repo.full_name, + default_branch: repo.default_branch, description: repo.description, private: repo.private, language: repo.language, diff --git a/lib/actions/project.ts b/lib/actions/project.ts index 85780ff..3b85013 100644 --- a/lib/actions/project.ts +++ b/lib/actions/project.ts @@ -7,7 +7,7 @@ * instead of API Routes directly. */ -import type { Project } from '@prisma/client' +import type { Project, ProjectImportStatus } from '@prisma/client' import { auth } from '@/lib/auth' import { EnvironmentCategory } from '@/lib/const' @@ -16,6 +16,8 @@ import { getK8sServiceForUser } from '@/lib/k8s/k8s-service-helper' import { KubernetesUtils } from '@/lib/k8s/kubernetes-utils' import { VERSIONS } from '@/lib/k8s/versions' import { logger as baseLogger } from '@/lib/logger' +import { getInstallationByGitHubId } from '@/lib/repo/github' +import { listInstallationRepos } from '@/lib/services/github-app' import { generateRandomString } from '@/lib/util/common' import type { ActionResult } from './types' @@ -54,39 +56,39 @@ function validateProjectName(name: string): { valid: boolean; error?: string } { return { valid: true } } -/** - * Create a new project with database and sandbox. - * - * @param name - Project name - * @param description - Optional project description - */ -export async function createProject( - name: string, +type CreateProjectWithSandboxOptions = { + userId: string + name: string description?: string -): Promise> { - const session = await auth() - - if (!session) { - return { success: false, error: 'Unauthorized' } - } - - // Validate project name format - const nameValidation = validateProjectName(name) - if (!nameValidation.valid) { - return { success: false, error: nameValidation.error || 'Invalid project name format' } + importData?: { + importStatus: ProjectImportStatus + githubAppInstallationId: string + githubRepoId: number + githubRepoFullName: string + githubRepoDefaultBranch?: string } +} - logger.info(`Creating project: ${name} for user: ${session.user.id}`) +/** + * Shared project creation flow used by both "New Project" and "Import from GitHub". + * Creates Project + Sandbox + required environment variables in one transaction. + */ +async function createProjectWithSandbox({ + userId, + name, + description, + importData, +}: CreateProjectWithSandboxOptions): Promise> { + logger.info(`Creating project: ${name} for user: ${userId}`) - // Get K8s service for user let k8sService let namespace try { - k8sService = await getK8sServiceForUser(session.user.id) + k8sService = await getK8sServiceForUser(userId) namespace = k8sService.getDefaultNamespace() } catch (error) { if (error instanceof Error && error.message.includes('does not have KUBECONFIG configured')) { - logger.warn(`Project creation failed - missing kubeconfig for user: ${session.user.id}`) + logger.warn(`Project creation failed - missing kubeconfig for user: ${userId}`) return { success: false, error: 'Please configure your kubeconfig before creating a project', @@ -95,28 +97,31 @@ export async function createProject( throw error } - // Generate K8s compatible names const k8sProjectName = KubernetesUtils.toK8sProjectName(name) const randomSuffix = KubernetesUtils.generateRandomString() - const ttydAuthToken = generateRandomString(24) // 24 chars = ~143 bits entropy for terminal auth - const fileBrowserUsername = `fb-${randomSuffix}` // filebrowser username - const fileBrowserPassword = generateRandomString(16) // 16 char random password + const ttydAuthToken = generateRandomString(24) + const fileBrowserUsername = `fb-${randomSuffix}` + const fileBrowserPassword = generateRandomString(16) const sandboxName = `${k8sProjectName}-${randomSuffix}` - // Create project with sandbox in a transaction const result = await prisma.$transaction( async (tx) => { - // 1. Create Project with status CREATING const project = await tx.project.create({ data: { name, description, - userId: session.user.id, + userId, status: 'CREATING', + importStatus: importData?.importStatus ?? 'READY', + githubAppInstallationId: importData?.githubAppInstallationId, + githubRepoId: importData?.githubRepoId, + githubRepoFullName: importData?.githubRepoFullName, + githubRepoDefaultBranch: importData?.githubRepoDefaultBranch, + importError: null, + importLockedUntil: null, }, }) - // 2. Create Sandbox record - lockedUntil is null so reconcile job can process immediately const sandbox = await tx.sandbox.create({ data: { projectId: project.id, @@ -124,8 +129,7 @@ export async function createProject( k8sNamespace: namespace, sandboxName: sandboxName, status: 'CREATING', - lockedUntil: null, // Unlocked - ready for reconcile job to process - // Resource configuration from versions + lockedUntil: null, runtimeImage: VERSIONS.RUNTIME_IMAGE, cpuRequest: VERSIONS.RESOURCES.SANDBOX.requests.cpu, cpuLimit: VERSIONS.RESOURCES.SANDBOX.limits.cpu, @@ -134,19 +138,17 @@ export async function createProject( }, }) - // 3. Create Environment record for ttyd access token - const ttydEnv = await tx.environment.create({ + await tx.environment.create({ data: { projectId: project.id, key: 'TTYD_ACCESS_TOKEN', value: ttydAuthToken, category: EnvironmentCategory.TTYD, - isSecret: true, // Mark as secret since it's an access token + isSecret: true, }, }) - // 4. Create Environment records for filebrowser credentials - const fileBrowserUsernameEnv = await tx.environment.create({ + await tx.environment.create({ data: { projectId: project.id, key: 'FILE_BROWSER_USERNAME', @@ -156,32 +158,116 @@ export async function createProject( }, }) - const fileBrowserPasswordEnv = await tx.environment.create({ + await tx.environment.create({ data: { projectId: project.id, key: 'FILE_BROWSER_PASSWORD', value: fileBrowserPassword, category: EnvironmentCategory.FILE_BROWSER, - isSecret: true, // Mark as secret since it's a password + isSecret: true, }, }) - return { - project, - sandbox, - ttydEnv, - fileBrowserUsernameEnv, - fileBrowserPasswordEnv, - } + return { project, sandbox } }, { timeout: 20000, } ) - logger.info( - `Project created: ${result.project.id} with sandbox: ${result.sandbox.id}` - ) - + logger.info(`Project created: ${result.project.id} with sandbox: ${result.sandbox.id}`) return { success: true, data: result.project } } + +/** + * Create a new project with database and sandbox. + * + * @param name - Project name + * @param description - Optional project description + */ +export async function createProject( + name: string, + description?: string +): Promise> { + const session = await auth() + + if (!session) { + return { success: false, error: 'Unauthorized' } + } + + // Validate project name format + const nameValidation = validateProjectName(name) + if (!nameValidation.valid) { + return { success: false, error: nameValidation.error || 'Invalid project name format' } + } + + return createProjectWithSandbox({ + userId: session.user.id, + name, + description, + }) +} + +export interface ImportProjectPayload { + installationId: number + repoId: number + repoName: string + repoFullName: string + defaultBranch: string + description?: string +} + +/** + * Create project in import mode. This only creates project + sandbox metadata and returns immediately. + * The background import reconcile job performs the actual clone when sandbox becomes RUNNING. + */ +export async function importProjectFromGitHub( + payload: ImportProjectPayload +): Promise> { + const session = await auth() + + if (!session) { + return { success: false, error: 'Unauthorized' } + } + + if (!payload.repoName || !payload.repoFullName || !payload.defaultBranch) { + return { success: false, error: 'Repository metadata is required' } + } + + const nameValidation = validateProjectName(payload.repoName) + if (!nameValidation.valid) { + return { success: false, error: nameValidation.error || 'Invalid project name format' } + } + + const installation = await getInstallationByGitHubId(payload.installationId) + if (!installation || installation.userId !== session.user.id) { + return { success: false, error: 'Installation not found' } + } + + try { + const repos = await listInstallationRepos(installation.installationId) + const matchedRepo = repos.find( + (repo) => repo.id === payload.repoId && repo.full_name === payload.repoFullName + ) + + if (!matchedRepo) { + return { success: false, error: 'Repository not found in selected installation' } + } + } catch (error) { + logger.error(`Failed to verify repository for import: ${error}`) + return { success: false, error: 'Failed to verify repository access' } + } + + return createProjectWithSandbox({ + userId: session.user.id, + name: payload.repoName, + description: payload.description, + importData: { + importStatus: 'PENDING', + githubAppInstallationId: installation.id, + githubRepoId: payload.repoId, + githubRepoFullName: payload.repoFullName, + githubRepoDefaultBranch: payload.defaultBranch, + }, + }) +} diff --git a/lib/events/sandbox/sandboxListener.ts b/lib/events/sandbox/sandboxListener.ts index b456f9d..269ce86 100644 --- a/lib/events/sandbox/sandboxListener.ts +++ b/lib/events/sandbox/sandboxListener.ts @@ -1,3 +1,4 @@ +import { triggerProjectImportForProject } from '@/lib/jobs/project-import' import { getK8sServiceForUser } from '@/lib/k8s/k8s-service-helper' import { logger as baseLogger } from '@/lib/logger' import { getProjectEnvironments } from '@/lib/repo/environment' @@ -117,6 +118,7 @@ async function handleStartSandbox(payload: SandboxEventPayload): Promise { await updateSandboxStatus(sandbox.id, 'RUNNING') await projectStatusReconcile(project.id) logger.info(`Sandbox ${sandbox.id} is now RUNNING`) + void triggerImportOnSandboxRunning(project.id) } else { logger.info(`Sandbox ${sandbox.id} is still starting (K8s status: ${k8sStatus})`) // Keep status as STARTING, may need to poll again @@ -306,6 +308,7 @@ async function handleUpdateSandbox(payload: SandboxEventPayload): Promise await updateSandboxStatus(sandbox.id, 'RUNNING') await projectStatusReconcile(project.id) logger.info(`Sandbox ${sandbox.id} is now RUNNING`) + void triggerImportOnSandboxRunning(project.id) } else if (k8sStatus === 'STARTING') { // Pod is restarting due to env var changes - change status to STARTING await updateSandboxStatus(sandbox.id, 'STARTING') @@ -349,3 +352,11 @@ export function registerSandboxListeners(): void { // Auto-register listeners when module is imported registerSandboxListeners() + +async function triggerImportOnSandboxRunning(projectId: string): Promise { + try { + await triggerProjectImportForProject(projectId) + } catch (error) { + logger.error(`Failed to trigger project import for ${projectId}: ${error}`) + } +} diff --git a/lib/jobs/database/databaseReconcile.ts b/lib/jobs/database/databaseReconcile.ts index 017d3f5..d55fd9b 100644 --- a/lib/jobs/database/databaseReconcile.ts +++ b/lib/jobs/database/databaseReconcile.ts @@ -8,18 +8,22 @@ const logger = baseLogger.child({ module: 'lib/jobs/database/databaseReconcile' // Maximum number of databases to process per reconcile cycle const MAX_DATABASES_PER_CYCLE = parseInt(process.env.MAX_DATABASES_PER_RECONCILE || '10', 10) +const RECONCILE_INTERVAL_SECONDS = parseInt( + process.env.DATABASE_RECONCILE_INTERVAL_SECONDS || '10', + 10 +) /** * Database reconciliation job - * Runs every 3 seconds to find databases in transition states and emit events + * Runs on interval to find databases in transition states and emit events */ export function startDatabaseReconcileJob() { logger.info('Starting database reconcile job') logger.info(`Lock duration: ${LOCK_DURATION_SECONDS} seconds`) logger.info(`Max databases per cycle: ${MAX_DATABASES_PER_CYCLE}`) + logger.info(`Reconcile interval: ${RECONCILE_INTERVAL_SECONDS} seconds`) - // Run every 3 seconds - const job = new Cron('*/3 * * * * *', async () => { + const job = new Cron(`*/${RECONCILE_INTERVAL_SECONDS} * * * * *`, async () => { try { await reconcileDatabases() } catch (error) { @@ -27,7 +31,7 @@ export function startDatabaseReconcileJob() { } }) - logger.info(' Database reconcile job started (every 3 seconds)') + logger.info(`✅ Database reconcile job started (every ${RECONCILE_INTERVAL_SECONDS} seconds)`) return job } diff --git a/lib/jobs/project-import/index.ts b/lib/jobs/project-import/index.ts new file mode 100644 index 0000000..c0cc926 --- /dev/null +++ b/lib/jobs/project-import/index.ts @@ -0,0 +1,2 @@ +export { startProjectImportReconcileJob } from './projectImportReconcile' +export { triggerProjectImportForProject } from './projectImportReconcile' diff --git a/lib/jobs/project-import/projectImportReconcile.ts b/lib/jobs/project-import/projectImportReconcile.ts new file mode 100644 index 0000000..5910e97 --- /dev/null +++ b/lib/jobs/project-import/projectImportReconcile.ts @@ -0,0 +1,228 @@ +import { Cron } from 'croner' + +import { logger as baseLogger } from '@/lib/logger' +import { + acquireAndLockImportProjects, + getImportProjectById, + type ImportProjectWithRelations, + LOCK_DURATION_SECONDS, + setProjectImportState, + tryClaimImportExecutionLock, +} from '@/lib/repo/project-import' +import { getInstallationToken } from '@/lib/services/github-app' +import { getSandboxTtydContext } from '@/lib/util/ttyd-context' +import { execCommand } from '@/lib/util/ttyd-exec' + +const logger = baseLogger.child({ module: 'lib/jobs/project-import/projectImportReconcile' }) + +const MAX_IMPORTS_PER_CYCLE = parseInt(process.env.MAX_PROJECT_IMPORTS_PER_RECONCILE || '5', 10) +const RECONCILE_INTERVAL_SECONDS = parseInt( + process.env.PROJECT_IMPORT_RECONCILE_INTERVAL_SECONDS || '10', + 10 +) +const CLONING_LOCK_DURATION_SECONDS = 300 +const CLONE_MAX_ATTEMPTS = parseInt(process.env.PROJECT_IMPORT_CLONE_MAX_ATTEMPTS || '3', 10) +const IMPORT_EXEC_TIMEOUT_MS = parseInt(process.env.PROJECT_IMPORT_EXEC_TIMEOUT_MS || '90000', 10) + +type ImportTriggerSource = 'reconcile' | 'sandbox-running-event' + +export function startProjectImportReconcileJob() { + logger.info('Starting project import reconcile job') + logger.info(`Lock duration: ${LOCK_DURATION_SECONDS} seconds`) + logger.info(`Max projects per cycle: ${MAX_IMPORTS_PER_CYCLE}`) + logger.info(`Reconcile interval: ${RECONCILE_INTERVAL_SECONDS} seconds`) + + const job = new Cron(`*/${RECONCILE_INTERVAL_SECONDS} * * * * *`, async () => { + try { + await reconcileProjectImports() + } catch (error) { + logger.error(`Project import reconcile job error: ${error}`) + } + }) + + logger.info( + `✅ Project import reconcile job started (every ${RECONCILE_INTERVAL_SECONDS} seconds)` + ) + return job +} + +async function reconcileProjectImports() { + const projects = await acquireAndLockImportProjects(MAX_IMPORTS_PER_CYCLE) + + if (projects.length === 0) { + return + } + + logger.info(`Acquired ${projects.length} projects for import reconcile`) + + for (const project of projects) { + try { + await handleSingleProjectImport(project, 'reconcile') + } catch (error) { + logger.error(`Failed to process project import for ${project.id}: ${error}`) + await setProjectImportState(project.id, { + importStatus: 'FAILED', + importError: String(error), + importLockedUntil: null, + }) + } + } +} + +export async function triggerProjectImportForProject(projectId: string): Promise { + const project = await getImportProjectById(projectId) + if (!project) { + return + } + + if (project.importStatus !== 'PENDING' && project.importStatus !== 'CLONING') { + return + } + + const sandbox = project.sandboxes[0] + if (!sandbox || sandbox.status !== 'RUNNING') { + return + } + + const claimed = await tryClaimImportExecutionLock(project.id, CLONING_LOCK_DURATION_SECONDS) + if (!claimed) { + return + } + + const projectAfterLock = await getImportProjectById(project.id) + if (!projectAfterLock) { + return + } + + await handleSingleProjectImport(projectAfterLock, 'sandbox-running-event') +} + +async function handleSingleProjectImport( + project: ImportProjectWithRelations, + source: ImportTriggerSource +): Promise { + if (!project.githubRepoFullName || !project.githubAppInstallation || !project.githubRepoDefaultBranch) { + await setProjectImportState(project.id, { + importStatus: 'FAILED', + importError: 'Missing repository metadata for import', + importLockedUntil: null, + }) + return + } + + const sandbox = project.sandboxes[0] + if (!sandbox) { + await setProjectImportState(project.id, { + importStatus: 'FAILED', + importError: 'Sandbox not found', + importLockedUntil: null, + }) + return + } + + if (sandbox.status !== 'RUNNING') { + if (project.importStatus === 'CLONING') { + await setProjectImportState(project.id, { + importStatus: 'PENDING', + importLockedUntil: null, + }) + } + return + } + + if (source === 'reconcile') { + await setProjectImportState(project.id, { + importStatus: 'CLONING', + importError: null, + importLockedUntil: new Date(Date.now() + CLONING_LOCK_DURATION_SECONDS * 1000), + }) + } + + const cloneResult = await cloneRepoToSandboxWithRetry(project, sandbox.id) + if (cloneResult.success) { + await setProjectImportState(project.id, { + importStatus: 'READY', + importError: null, + importLockedUntil: null, + }) + return + } + + await setProjectImportState(project.id, { + importStatus: 'FAILED', + importError: cloneResult.error, + importLockedUntil: null, + }) +} + +async function cloneRepoToSandboxWithRetry( + project: ImportProjectWithRelations, + sandboxId: string +): Promise<{ success: true } | { success: false; error: string }> { + let lastError = 'Unknown clone error' + + for (let attempt = 1; attempt <= CLONE_MAX_ATTEMPTS; attempt++) { + const attemptStart = Date.now() + try { + logger.info(`Import clone attempt ${attempt}/${CLONE_MAX_ATTEMPTS} for project ${project.id}`) + await cloneRepoToSandbox(project, sandboxId) + logger.info( + `Import clone succeeded on attempt ${attempt} for project ${project.id} in ${Date.now() - attemptStart}ms` + ) + return { success: true } + } catch (error) { + lastError = error instanceof Error ? error.message : String(error) + logger.warn( + `Import clone attempt ${attempt}/${CLONE_MAX_ATTEMPTS} failed for project ${project.id} after ${Date.now() - attemptStart}ms: ${lastError}` + ) + if (attempt < CLONE_MAX_ATTEMPTS) { + await sleep(1500) + } + } + } + + return { + success: false, + error: `Clone failed after ${CLONE_MAX_ATTEMPTS} attempts: ${lastError}`, + } +} + +async function cloneRepoToSandbox(project: ImportProjectWithRelations, sandboxId: string): Promise { + const installationToken = await getInstallationToken(project.githubAppInstallation!.installationId) + const { ttyd } = await getSandboxTtydContext(sandboxId, project.user.id) + + const authUrl = `https://x-access-token:${installationToken}@github.com/${project.githubRepoFullName}.git` + const escapedAuthUrl = shellEscapeSingleQuoted(authUrl) + const escapedBranch = shellEscapeSingleQuoted(project.githubRepoDefaultBranch!) + const repoName = project.githubRepoFullName.split('/').at(-1) || 'repo' + const importDirName = repoName.replace(/[^a-zA-Z0-9._-]/g, '-') + const uniqueImportDirName = `${importDirName}-${project.id}` + const escapedImportDirName = shellEscapeSingleQuoted(uniqueImportDirName) + + const cloneCommand = [ + 'set -e', + 'mkdir -p import', + `target_dir='import/${escapedImportDirName}'`, + 'tmp_dir=$(mktemp -d)', + `GIT_TERMINAL_PROMPT=0 GIT_ASKPASS=/bin/echo git clone --depth 1 --branch '${escapedBranch}' '${escapedAuthUrl}' "$tmp_dir/repo"`, + 'mkdir -p "$target_dir"', + 'cp -a "$tmp_dir/repo"/. "$target_dir"/', + 'rm -rf "$tmp_dir"', + ].join(' && ') + + await execCommand( + ttyd.baseUrl, + ttyd.accessToken, + cloneCommand, + IMPORT_EXEC_TIMEOUT_MS, + ttyd.authorization + ) +} + +function shellEscapeSingleQuoted(input: string): string { + return input.replace(/'/g, `'\\''`) +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} diff --git a/lib/jobs/sandbox/sandboxReconcile.ts b/lib/jobs/sandbox/sandboxReconcile.ts index 9f70eea..fc2a4b7 100644 --- a/lib/jobs/sandbox/sandboxReconcile.ts +++ b/lib/jobs/sandbox/sandboxReconcile.ts @@ -8,18 +8,22 @@ const logger = baseLogger.child({ module: 'lib/jobs/sandbox/sandboxReconcile' }) // Maximum number of sandboxes to process per reconcile cycle const MAX_SANDBOXES_PER_CYCLE = parseInt(process.env.MAX_SANDBOXES_PER_RECONCILE || '10', 10) +const RECONCILE_INTERVAL_SECONDS = parseInt( + process.env.SANDBOX_RECONCILE_INTERVAL_SECONDS || '10', + 10 +) /** * Sandbox reconciliation job - * Runs every 3 seconds to find sandboxes in transition states and emit events + * Runs on interval to find sandboxes in transition states and emit events */ export function startSandboxReconcileJob() { logger.info('Starting sandbox reconcile job') logger.info(`Lock duration: ${LOCK_DURATION_SECONDS} seconds`) logger.info(`Max sandboxes per cycle: ${MAX_SANDBOXES_PER_CYCLE}`) + logger.info(`Reconcile interval: ${RECONCILE_INTERVAL_SECONDS} seconds`) - // Run every 3 seconds - const job = new Cron('*/3 * * * * *', async () => { + const job = new Cron(`*/${RECONCILE_INTERVAL_SECONDS} * * * * *`, async () => { try { await reconcileSandboxes() } catch (error) { @@ -27,7 +31,7 @@ export function startSandboxReconcileJob() { } }) - logger.info('✅ Sandbox reconcile job started (every 3 seconds)') + logger.info(`✅ Sandbox reconcile job started (every ${RECONCILE_INTERVAL_SECONDS} seconds)`) return job } diff --git a/lib/repo/project-import.ts b/lib/repo/project-import.ts new file mode 100644 index 0000000..6753c56 --- /dev/null +++ b/lib/repo/project-import.ts @@ -0,0 +1,150 @@ +import type { Project } from '@prisma/client' + +import { prisma } from '@/lib/db' +import { logger as baseLogger } from '@/lib/logger' + +const logger = baseLogger.child({ module: 'lib/repo/project-import' }) + +const LOCK_DURATION_SECONDS = parseInt(process.env.PROJECT_IMPORT_LOCK_DURATION_SECONDS || '5', 10) + +type ImportProjectWithRelations = Project & { + user: { + id: string + } + githubAppInstallation: { + installationId: number + } | null + sandboxes: Array<{ + id: string + status: string + }> +} + +const importProjectWithRelationsInclude = { + user: { + select: { + id: true, + }, + }, + githubAppInstallation: { + select: { + installationId: true, + }, + }, + sandboxes: { + select: { + id: true, + status: true, + }, + orderBy: { + createdAt: 'asc', + }, + }, +} as const + +export async function acquireAndLockImportProjects( + limit: number = 10, + baseLockSeconds: number = LOCK_DURATION_SECONDS, + randomOffsetSeconds: number = 2 +): Promise { + try { + const now = new Date() + const randomSeconds = Math.random() * randomOffsetSeconds + const lockUntil = new Date(now.getTime() + (baseLockSeconds + randomSeconds) * 1000) + + const lockedProjects = await prisma.$transaction(async (tx) => { + return await tx.$queryRaw` + UPDATE "Project" + SET + "importLockedUntil" = ${lockUntil}, + "updatedAt" = NOW() + WHERE "id" IN ( + SELECT "id" + FROM "Project" + WHERE "importStatus" IN ('PENDING', 'CLONING') + AND ("importLockedUntil" IS NULL OR "importLockedUntil" <= ${now}) + ORDER BY "updatedAt" ASC + LIMIT ${limit} + FOR UPDATE SKIP LOCKED + ) + RETURNING * + ` + }) + + if (lockedProjects.length === 0) { + return [] + } + + const projectsWithRelations = await prisma.project.findMany({ + where: { + id: { + in: lockedProjects.map((project) => project.id), + }, + }, + include: importProjectWithRelationsInclude, + }) + + return projectsWithRelations + } catch (error) { + logger.error(`Error acquiring and locking import projects: ${error}`) + return [] + } +} + +export async function getImportProjectById(projectId: string): Promise { + return await prisma.project.findUnique({ + where: { + id: projectId, + }, + include: importProjectWithRelationsInclude, + }) +} + +export async function tryClaimImportExecutionLock( + projectId: string, + lockSeconds: number +): Promise { + const now = new Date() + const lockUntil = new Date(now.getTime() + lockSeconds * 1000) + + const result = await prisma.project.updateMany({ + where: { + id: projectId, + importStatus: { + in: ['PENDING', 'CLONING'], + }, + OR: [{ importLockedUntil: null }, { importLockedUntil: { lte: now } }], + }, + data: { + importStatus: 'CLONING', + importError: null, + importLockedUntil: lockUntil, + updatedAt: new Date(), + }, + }) + + return result.count > 0 +} + +export async function setProjectImportState( + projectId: string, + data: { + importStatus: 'PENDING' | 'CLONING' | 'READY' | 'FAILED' + importError?: string | null + importLockedUntil?: Date | null + } +): Promise { + await prisma.project.update({ + where: { id: projectId }, + data: { + importStatus: data.importStatus, + importError: data.importError ?? null, + importLockedUntil: + data.importLockedUntil === undefined ? undefined : data.importLockedUntil, + updatedAt: new Date(), + }, + }) +} + +export { LOCK_DURATION_SECONDS } +export type { ImportProjectWithRelations } diff --git a/lib/startup/index.ts b/lib/startup/index.ts index dcc8cab..3523368 100644 --- a/lib/startup/index.ts +++ b/lib/startup/index.ts @@ -77,10 +77,17 @@ async function startBackgroundJobs() { const sandboxJob = startSandboxReconcileJob() logger.info('✅ Sandbox reconcile job started') + // Start project import reconciliation job + // Runs every 3 seconds to process GitHub imports when sandboxes become RUNNING + const { startProjectImportReconcileJob } = await import('@/lib/jobs/project-import') + const projectImportJob = startProjectImportReconcileJob() + logger.info('✅ Project import reconcile job started') + // Store job references for potential cleanup (optional) // In most cases, these jobs will run for the lifetime of the server globalThis.__databaseReconcileJob = databaseJob globalThis.__sandboxReconcileJob = sandboxJob + globalThis.__projectImportReconcileJob = projectImportJob logger.info('All background jobs started successfully') } catch (error) { @@ -116,6 +123,13 @@ export function cleanup() { logger.info('✅ Sandbox reconcile job stopped') } + // Stop project import reconcile job if it exists + if (globalThis.__projectImportReconcileJob) { + globalThis.__projectImportReconcileJob.stop() + globalThis.__projectImportReconcileJob = undefined + logger.info('✅ Project import reconcile job stopped') + } + logger.info('✅ Cleanup completed') } catch (error) { logger.error(`❌ Cleanup failed: ${error}`) @@ -127,4 +141,6 @@ declare global { var __databaseReconcileJob: Cron | undefined var __sandboxReconcileJob: Cron | undefined + + var __projectImportReconcileJob: Cron | undefined } diff --git a/lib/util/ttyd-exec.ts b/lib/util/ttyd-exec.ts index d4ac941..d789cd1 100644 --- a/lib/util/ttyd-exec.ts +++ b/lib/util/ttyd-exec.ts @@ -261,11 +261,13 @@ export async function executeTtydCommand(options: TtydExecOptions): Promise { let outputBuffer = '' let isResolved = false let timeoutHandle: ReturnType | null = null + let fallbackSendHandle: ReturnType | null = null let ws: UnifiedWebSocket | null = null const cleanup = () => { @@ -273,6 +275,10 @@ export async function executeTtydCommand(options: TtydExecOptions): Promise { - if (!ws || ws.readyState !== WS_OPEN) return - - // Send the wrapped command - const wrappedCommand = wrapCommand(command, markerId) - const inputPayload = new Uint8Array(wrappedCommand.length * 3 + 1) - inputPayload[0] = ClientCommand.INPUT.charCodeAt(0) - const stats = textEncoder.encodeInto(wrappedCommand, inputPayload.subarray(1)) - ws.send(inputPayload.subarray(0, (stats.written as number) + 1)) - - // Send Enter key - const enterPayload = new Uint8Array(2) - enterPayload[0] = ClientCommand.INPUT.charCodeAt(0) - enterPayload[1] = 0x0d // Carriage return - ws.send(enterPayload) - }, 100) + // Fallback: if shell banner/prompt does not arrive soon, still send command. + fallbackSendHandle = setTimeout(() => { + if (!ws || ws.readyState !== WS_OPEN || commandSent) return + sendWrappedCommand(ws) + }, 1000) }) // Handle message event @@ -443,6 +437,12 @@ export async function executeTtydCommand(options: TtydExecOptions): Promise Date: Tue, 10 Mar 2026 10:07:03 +0800 Subject: [PATCH 2/2] fix: satisfy strict null checks in import clone path --- lib/jobs/project-import/projectImportReconcile.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/jobs/project-import/projectImportReconcile.ts b/lib/jobs/project-import/projectImportReconcile.ts index 5910e97..c67ac3a 100644 --- a/lib/jobs/project-import/projectImportReconcile.ts +++ b/lib/jobs/project-import/projectImportReconcile.ts @@ -194,7 +194,8 @@ async function cloneRepoToSandbox(project: ImportProjectWithRelations, sandboxId const authUrl = `https://x-access-token:${installationToken}@github.com/${project.githubRepoFullName}.git` const escapedAuthUrl = shellEscapeSingleQuoted(authUrl) const escapedBranch = shellEscapeSingleQuoted(project.githubRepoDefaultBranch!) - const repoName = project.githubRepoFullName.split('/').at(-1) || 'repo' + const repoFullName = project.githubRepoFullName! + const repoName = repoFullName.split('/').at(-1) || 'repo' const importDirName = repoName.replace(/[^a-zA-Z0-9._-]/g, '-') const uniqueImportDirName = `${importDirName}-${project.id}` const escapedImportDirName = shellEscapeSingleQuoted(uniqueImportDirName)