From b1bf618ae9feb2049a994ef003e362dc8cadd567 Mon Sep 17 00:00:00 2001 From: rabsef-bicyrm Date: Wed, 28 Jan 2026 14:28:01 -0800 Subject: [PATCH] feat: Add Codex resume and switching - Add Codex resume picker/rollout scanning and session mapping - Bridge Codex MCP to mobile/local UI with optional ACP mirror - Harden message handling, ink sanitize, and queue reset behavior --- src/api/api.ts | 7 +- src/api/apiSession.ts | 11 +- src/api/rpc/RpcHandlerManager.ts | 18 +- .../rpc/__tests__/RpcHandlerManager.test.ts | 51 + .../__tests__/rolloutScannerPreview.test.ts | 327 +++++++ src/codex/codexLocalLauncher.ts | 195 ++++ src/codex/codexMcpClient.ts | 19 +- src/codex/codexRemoteLauncher.ts | 445 +++++++++ src/codex/loop.ts | 82 ++ src/codex/mode.ts | 6 + src/codex/runCodex.ts | 910 ++++-------------- src/codex/sessionController.ts | 6 + src/codex/types.ts | 1 + src/codex/utils/codexAcp.ts | 58 ++ src/codex/utils/codexSessionMap.ts | 74 ++ src/codex/utils/permissionHandler.ts | 40 +- src/codex/utils/ready.ts | 27 + src/codex/utils/resume.ts | 14 + src/codex/utils/resumePicker.ts | 102 ++ src/codex/utils/rolloutScanner.ts | 793 +++++++++++++++ src/index.ts | 51 +- src/ui/ink/CodexDisplay.tsx | 47 +- src/ui/ink/CodexResumeSelector.tsx | 178 ++++ src/ui/ink/RemoteModeDisplay.tsx | 7 +- src/utils/BasePermissionHandler.ts | 124 ++- src/utils/MessageQueue2.test.ts | 22 +- src/utils/MessageQueue2.ts | 10 +- src/utils/__tests__/inkSanitize.test.ts | 29 + src/utils/inkSanitize.ts | 21 + 29 files changed, 2904 insertions(+), 771 deletions(-) create mode 100644 src/api/rpc/__tests__/RpcHandlerManager.test.ts create mode 100644 src/codex/__tests__/rolloutScannerPreview.test.ts create mode 100644 src/codex/codexLocalLauncher.ts create mode 100644 src/codex/codexRemoteLauncher.ts create mode 100644 src/codex/loop.ts create mode 100644 src/codex/mode.ts create mode 100644 src/codex/sessionController.ts create mode 100644 src/codex/utils/codexAcp.ts create mode 100644 src/codex/utils/codexSessionMap.ts create mode 100644 src/codex/utils/ready.ts create mode 100644 src/codex/utils/resume.ts create mode 100644 src/codex/utils/resumePicker.ts create mode 100644 src/codex/utils/rolloutScanner.ts create mode 100644 src/ui/ink/CodexResumeSelector.tsx create mode 100644 src/utils/__tests__/inkSanitize.test.ts create mode 100644 src/utils/inkSanitize.ts diff --git a/src/api/api.ts b/src/api/api.ts index fc381180..678e9774 100644 --- a/src/api/api.ts +++ b/src/api/api.ts @@ -39,14 +39,15 @@ export class ApiClient { let encryptionVariant: 'legacy' | 'dataKey'; if (this.credential.encryption.type === 'dataKey') { - // Generate new encryption key - encryptionKey = getRandomBytes(32); + // Use a stable per-machine key so resuming an existing session tag can decrypt + // previously-encrypted metadata and agent state. + encryptionKey = this.credential.encryption.machineKey; encryptionVariant = 'dataKey'; // Derive and encrypt data encryption key // const contentDataKey = await deriveKey(this.secret, 'Happy EnCoder', ['content']); // const publicKey = libsodiumPublicKeyFromSecretKey(contentDataKey); - let encryptedDataKey = libsodiumEncryptForPublicKey(encryptionKey, this.credential.encryption.publicKey); + let encryptedDataKey = libsodiumEncryptForPublicKey(this.credential.encryption.machineKey, this.credential.encryption.publicKey); dataEncryptionKey = new Uint8Array(encryptedDataKey.length + 1); dataEncryptionKey.set([0], 0); // Version byte dataEncryptionKey.set(encryptedDataKey, 1); // Data key diff --git a/src/api/apiSession.ts b/src/api/apiSession.ts index 187ce5b8..7ebfd325 100644 --- a/src/api/apiSession.ts +++ b/src/api/apiSession.ts @@ -72,7 +72,16 @@ export class ApiSessionClient extends EventEmitter { encryptionVariant: this.encryptionVariant, logger: (msg, data) => logger.debug(msg, data) }); - registerCommonHandlers(this.rpcHandlerManager, this.metadata.path); + + // Some legacy/invalid sessions may fail to decrypt metadata (null). Avoid crashing the CLI; + // the session can still function for remote messaging and will receive metadata updates later. + if (this.metadata?.path) { + registerCommonHandlers(this.rpcHandlerManager, this.metadata.path); + } else { + logger.debug('[API] Session metadata unavailable; skipping common handler registration', { + sessionId: this.sessionId, + }); + } // // Create socket diff --git a/src/api/rpc/RpcHandlerManager.ts b/src/api/rpc/RpcHandlerManager.ts index 36ffd096..01233b07 100644 --- a/src/api/rpc/RpcHandlerManager.ts +++ b/src/api/rpc/RpcHandlerManager.ts @@ -56,7 +56,21 @@ export class RpcHandlerManager { request: RpcRequest, ): Promise { try { - const handler = this.handlers.get(request.method); + let handler = this.handlers.get(request.method); + + // Compatibility: some clients may send unscoped methods (e.g. "permission") + // instead of the usual scoped `${scopePrefix}:${method}` form. + if (!handler && !request.method.includes(':')) { + const prefixedMethod = this.getPrefixedMethod(request.method); + handler = this.handlers.get(prefixedMethod); + if (handler) { + this.logger('[RPC] Matched unscoped method to scoped handler', { + method: request.method, + prefixedMethod, + }); + request = { ...request, method: prefixedMethod }; + } + } if (!handler) { this.logger('[RPC] [ERROR] Method not found', { method: request.method }); @@ -135,4 +149,4 @@ export class RpcHandlerManager { */ export function createRpcHandlerManager(config: RpcHandlerConfig): RpcHandlerManager { return new RpcHandlerManager(config); -} \ No newline at end of file +} diff --git a/src/api/rpc/__tests__/RpcHandlerManager.test.ts b/src/api/rpc/__tests__/RpcHandlerManager.test.ts new file mode 100644 index 00000000..fb128871 --- /dev/null +++ b/src/api/rpc/__tests__/RpcHandlerManager.test.ts @@ -0,0 +1,51 @@ +import { describe, it, expect } from 'vitest'; +import { RpcHandlerManager } from '../RpcHandlerManager'; +import { decodeBase64, decrypt, encodeBase64, encrypt } from '@/api/encryption'; + +describe('RpcHandlerManager', () => { + it('routes unscoped method names to scoped handlers', async () => { + const key = new Uint8Array(32).fill(7); + const encryptionVariant: 'legacy' = 'legacy'; + + const manager = new RpcHandlerManager({ + scopePrefix: 'sid123', + encryptionKey: key, + encryptionVariant, + logger: () => {}, + }); + + let seen: unknown = null; + manager.registerHandler('permission', async (params) => { + seen = params; + return { ok: true }; + }); + + const encryptedParams = encodeBase64(encrypt(key, encryptionVariant, { id: 'p1', approved: true })); + const response = await manager.handleRequest({ method: 'permission', params: encryptedParams }); + const decrypted = decrypt(key, encryptionVariant, decodeBase64(response)); + + expect(seen).toEqual({ id: 'p1', approved: true }); + expect(decrypted).toEqual({ ok: true }); + }); + + it('accepts already-scoped method names', async () => { + const key = new Uint8Array(32).fill(9); + const encryptionVariant: 'legacy' = 'legacy'; + + const manager = new RpcHandlerManager({ + scopePrefix: 'sid123', + encryptionKey: key, + encryptionVariant, + logger: () => {}, + }); + + manager.registerHandler('permission', async () => ({ ok: true })); + + const encryptedParams = encodeBase64(encrypt(key, encryptionVariant, { id: 'p1', approved: true })); + const response = await manager.handleRequest({ method: 'sid123:permission', params: encryptedParams }); + const decrypted = decrypt(key, encryptionVariant, decodeBase64(response)); + + expect(decrypted).toEqual({ ok: true }); + }); +}); + diff --git a/src/codex/__tests__/rolloutScannerPreview.test.ts b/src/codex/__tests__/rolloutScannerPreview.test.ts new file mode 100644 index 00000000..9f3a1fd8 --- /dev/null +++ b/src/codex/__tests__/rolloutScannerPreview.test.ts @@ -0,0 +1,327 @@ +import { describe, expect, it } from 'vitest'; + +import { mkdtemp, mkdir, rm, writeFile } from 'node:fs/promises'; +import os from 'node:os'; +import { join } from 'node:path'; + +import { listCodexResumeSessions } from '../utils/rolloutScanner'; + +describe('rolloutScanner preview sanitization', () => { + it('strips ANSI escape codes and control characters from resume previews', async () => { + const originalCodexHome = process.env.CODEX_HOME; + + const tmpRoot = await mkdtemp(join(os.tmpdir(), 'happy-cli-codex-preview-')); + try { + const projectDir = join(tmpRoot, 'project'); + const sessionsDir = join(tmpRoot, 'sessions'); + await mkdir(projectDir, { recursive: true }); + await mkdir(sessionsDir, { recursive: true }); + + process.env.CODEX_HOME = tmpRoot; + + const sessionId = '019bbb78-fd0a-7be1-b731-684e43c306cf'; + const rawMessage = [ + 'Hello', + '\u001b[31mRED\u001b[0m', + '\u001b]0;title\u0007', + '\u0000null', + 'world', + ].join(' '); + + const rolloutFile = join( + sessionsDir, + 'rollout-2026-01-15T00-00-00-00000000-0000-0000-0000-000000000000.jsonl' + ); + + await writeFile( + rolloutFile, + [ + JSON.stringify({ + type: 'session_meta', + payload: { + meta: { + id: sessionId, + cwd: projectDir, + git: { branch: 'master' }, + }, + }, + }), + JSON.stringify({ + type: 'event_msg', + payload: { type: 'user_message', message: rawMessage }, + }), + ].join('\n') + '\n' + ); + + const entries = await listCodexResumeSessions({ workingDirectory: projectDir }); + expect(entries).toHaveLength(1); + + const preview = entries[0]?.preview ?? ''; + expect(preview).toContain('Hello RED'); + expect(preview).toContain('world'); + expect(preview).not.toMatch(/[\u001B\u009B]/); + expect(preview).not.toMatch(/[\u0000-\u001F\u007F-\u009F]/); + } finally { + process.env.CODEX_HOME = originalCodexHome; + await rm(tmpRoot, { recursive: true, force: true }); + } + }); + + it('uses the first meaningful user message (not injected AGENTS.md)', async () => { + const originalCodexHome = process.env.CODEX_HOME; + + const tmpRoot = await mkdtemp(join(os.tmpdir(), 'happy-cli-codex-preview-latest-')); + try { + const projectDir = join(tmpRoot, 'project'); + const sessionsDir = join(tmpRoot, 'sessions'); + await mkdir(projectDir, { recursive: true }); + await mkdir(sessionsDir, { recursive: true }); + + process.env.CODEX_HOME = tmpRoot; + + const sessionId = '019bbb78-fd0a-7be1-b731-684e43c306cf'; + const injectedAgents = '# AGENTS.md instructions for /path\n\nfoo\n'; + const realPrompt = 'Please perform the same cleanup for undated-46740309.'; + + const rolloutFile = join( + sessionsDir, + 'rollout-2026-01-15T00-00-00-00000000-0000-0000-0000-000000000000.jsonl' + ); + + await writeFile( + rolloutFile, + [ + JSON.stringify({ + type: 'session_meta', + payload: { + meta: { + id: sessionId, + cwd: projectDir, + git: { branch: 'master' }, + }, + }, + }), + JSON.stringify({ + type: 'event_msg', + payload: { type: 'user_message', message: injectedAgents }, + }), + JSON.stringify({ + type: 'event_msg', + payload: { type: 'user_message', message: realPrompt }, + }), + ].join('\n') + '\n' + ); + + const entries = await listCodexResumeSessions({ workingDirectory: projectDir }); + expect(entries).toHaveLength(1); + expect(entries[0]?.preview).toContain('Please perform the same cleanup'); + } finally { + process.env.CODEX_HOME = originalCodexHome; + await rm(tmpRoot, { recursive: true, force: true }); + } + }); + + it('keeps the preview stable even when later lines include huge tool output', async () => { + const originalCodexHome = process.env.CODEX_HOME; + + const tmpRoot = await mkdtemp(join(os.tmpdir(), 'happy-cli-codex-preview-head-fallback-')); + try { + const projectDir = join(tmpRoot, 'project'); + const sessionsDir = join(tmpRoot, 'sessions'); + await mkdir(projectDir, { recursive: true }); + await mkdir(sessionsDir, { recursive: true }); + + process.env.CODEX_HOME = tmpRoot; + + const sessionId = '019bbb78-fd0a-7be1-b731-684e43c306cf'; + const injectedAgents = '# AGENTS.md instructions for /path\n\nfoo\n'; + const realPrompt = 'Codex please form a commit on this repo and push to origin.'; + + // Make the file large by adding a big tool output after the user's prompt. + const bigOutput = 'X'.repeat(1200 * 1024); + + const rolloutFile = join( + sessionsDir, + 'rollout-2026-01-15T00-00-00-00000000-0000-0000-0000-000000000000.jsonl' + ); + + await writeFile( + rolloutFile, + [ + JSON.stringify({ + type: 'session_meta', + payload: { + meta: { + id: sessionId, + cwd: projectDir, + git: { branch: 'master' }, + }, + }, + }), + JSON.stringify({ + type: 'event_msg', + payload: { type: 'user_message', message: injectedAgents }, + }), + JSON.stringify({ + type: 'event_msg', + payload: { type: 'user_message', message: realPrompt }, + }), + JSON.stringify({ + type: 'response_item', + payload: { + type: 'function_call_output', + call_id: 'call_big', + output: bigOutput, + }, + }), + JSON.stringify({ + type: 'response_item', + payload: { + type: 'message', + role: 'assistant', + content: [{ type: 'output_text', text: 'ok' }], + }, + }), + ].join('\n') + '\n' + ); + + const entries = await listCodexResumeSessions({ workingDirectory: projectDir }); + expect(entries).toHaveLength(1); + expect(entries[0]?.preview).toContain('Codex please form a commit'); + } finally { + process.env.CODEX_HOME = originalCodexHome; + await rm(tmpRoot, { recursive: true, force: true }); + } + }); + + it('matches Codex filtering: excludes rollouts without a user event in the head scan window', async () => { + const originalCodexHome = process.env.CODEX_HOME; + + const tmpRoot = await mkdtemp(join(os.tmpdir(), 'happy-cli-codex-preview-head-filter-')); + try { + const projectDir = join(tmpRoot, 'project'); + const sessionsDir = join(tmpRoot, 'sessions'); + await mkdir(projectDir, { recursive: true }); + await mkdir(sessionsDir, { recursive: true }); + + process.env.CODEX_HOME = tmpRoot; + + const sessionId = '019bbb78-fd0a-7be1-b731-684e43c306cf'; + + const rolloutFile = join( + sessionsDir, + 'rollout-2026-01-15T00-00-00-00000000-0000-0000-0000-000000000000.jsonl' + ); + + // 11 records total: + // - session_meta + 9 assistant messages => first 10 records contain NO user event + // - user message appears only at record 11, so Codex would exclude this rollout + const records: string[] = [ + JSON.stringify({ + type: 'session_meta', + payload: { + meta: { + id: sessionId, + cwd: projectDir, + git: { branch: 'master' }, + }, + }, + }), + ]; + + for (let i = 0; i < 9; i++) { + records.push( + JSON.stringify({ + type: 'response_item', + payload: { + type: 'message', + role: 'assistant', + content: [{ type: 'output_text', text: `assistant-${i}` }], + }, + }) + ); + } + + records.push( + JSON.stringify({ + type: 'event_msg', + payload: { type: 'user_message', message: 'this is too late' }, + }) + ); + + await writeFile(rolloutFile, records.join('\n') + '\n'); + + const entries = await listCodexResumeSessions({ workingDirectory: projectDir }); + expect(entries).toHaveLength(0); + } finally { + process.env.CODEX_HOME = originalCodexHome; + await rm(tmpRoot, { recursive: true, force: true }); + } + }); + + it('matches Codex filtering: excludes rollouts that only have response_item user messages (no user_message event)', async () => { + const originalCodexHome = process.env.CODEX_HOME; + + const tmpRoot = await mkdtemp(join(os.tmpdir(), 'happy-cli-codex-preview-event-msg-only-')); + try { + const projectDir = join(tmpRoot, 'project'); + const sessionsDir = join(tmpRoot, 'sessions'); + await mkdir(projectDir, { recursive: true }); + await mkdir(sessionsDir, { recursive: true }); + + process.env.CODEX_HOME = tmpRoot; + + const sessionId = '019bbb78-fd0a-7be1-b731-684e43c306cf'; + + const rolloutFile = join( + sessionsDir, + 'rollout-2026-01-15T00-00-00-00000000-0000-0000-0000-000000000000.jsonl' + ); + + // First 10 records include a user message as a response_item, but there is NO event_msg:user_message. + // Codex excludes these from its resume list. + const records: string[] = [ + JSON.stringify({ + type: 'session_meta', + payload: { + meta: { + id: sessionId, + cwd: projectDir, + git: { branch: 'master' }, + }, + }, + }), + JSON.stringify({ + type: 'response_item', + payload: { + type: 'message', + role: 'user', + content: [{ type: 'input_text', text: 'hello from response_item' }], + }, + }), + ]; + + // Pad to 10 total records with assistant messages + while (records.length < 10) { + records.push( + JSON.stringify({ + type: 'response_item', + payload: { + type: 'message', + role: 'assistant', + content: [{ type: 'output_text', text: 'ok' }], + }, + }) + ); + } + + await writeFile(rolloutFile, records.join('\n') + '\n'); + + const entries = await listCodexResumeSessions({ workingDirectory: projectDir }); + expect(entries).toHaveLength(0); + } finally { + process.env.CODEX_HOME = originalCodexHome; + await rm(tmpRoot, { recursive: true, force: true }); + } + }); +}); diff --git a/src/codex/codexLocalLauncher.ts b/src/codex/codexLocalLauncher.ts new file mode 100644 index 00000000..c1e3838b --- /dev/null +++ b/src/codex/codexLocalLauncher.ts @@ -0,0 +1,195 @@ +import { spawn } from 'node:child_process'; +import type { UUID } from 'node:crypto'; + +import { logger } from '@/ui/logger'; +import { MessageQueue2 } from '@/utils/MessageQueue2'; +import type { CodexMode } from './mode'; +import { createCodexRolloutScanner, findLatestCodexRolloutForCwd, findSessionFileById } from './utils/rolloutScanner'; +import { extractResumeSessionId } from './utils/resume'; +import { ensureHappySessionTagForCodexSession } from './utils/codexSessionMap'; +import type { SessionController } from './sessionController'; +import { codexMessageToAcp, type CodexMessage } from './utils/codexAcp'; + +export type CodexLocalReason = 'switch' | 'exit'; + +export interface CodexLocalResult { + reason: CodexLocalReason; + resumeFile?: string | null; +} + +export interface CodexLocalOptions { + sessionController: SessionController; + path: string; + resumeArgs?: string[]; + resumeSessionId?: string; + sessionTag?: UUID; + messageQueue: MessageQueue2; +} + +export async function codexLocalLauncher(opts: CodexLocalOptions): Promise { + logger.debug('[codex-local] Starting local launcher'); + + const { getSession, onSessionSwap } = opts.sessionController; + let session = getSession(); + let lastRolloutFile: string | null = null; + const resumeSessionId = opts.resumeSessionId ?? extractResumeSessionId(opts.resumeArgs); + + if (resumeSessionId) { + lastRolloutFile = await findSessionFileById(resumeSessionId); + } + + const sendCodexMessage = (message: CodexMessage) => { + session.sendCodexMessage(message); + if (process.env.HAPPY_CODEX_ACP === '1') { + const acpMessage = codexMessageToAcp(message); + if (acpMessage) { + session.sendAgentMessage('codex', acpMessage); + } + } + }; + + const scanner = await createCodexRolloutScanner({ + workingDirectory: opts.path, + allowAll: opts.resumeArgs?.includes('--all') ?? false, + resumeSessionId: resumeSessionId ?? undefined, + onActiveSessionFile: (file, sessionId) => { + lastRolloutFile = file; + if (sessionId && opts.sessionTag) { + void ensureHappySessionTagForCodexSession(sessionId, opts.sessionTag).catch((error) => { + logger.debug('[codex-local] Failed to store session tag mapping', error); + }); + } + }, + onCodexMessage: (message) => { + sendCodexMessage(message); + }, + }); + + let exitReason: CodexLocalReason | null = null; + const processAbortController = new AbortController(); + let childExit: Promise | null = null; + + async function abortProcess() { + if (!processAbortController.signal.aborted) { + processAbortController.abort(); + } + if (childExit) { + await childExit; + } + } + + async function doSwitch() { + logger.debug('[codex-local] Switching to remote mode'); + if (!exitReason) { + exitReason = 'switch'; + } + await abortProcess(); + } + + async function doAbort() { + logger.debug('[codex-local] Abort requested'); + if (!exitReason) { + exitReason = 'switch'; + } + opts.messageQueue.reset(); + await abortProcess(); + } + + // Switch to remote when messages arrive + opts.messageQueue.setOnMessage(() => { + void doSwitch(); + }); + + const bindSession = (nextSession: typeof session) => { + session = nextSession; + session.rpcHandlerManager.registerHandler('abort', doAbort); + session.rpcHandlerManager.registerHandler('switch', doSwitch); + }; + + bindSession(session); + const unsubscribe = onSessionSwap((nextSession) => { + bindSession(nextSession); + }); + + if (opts.messageQueue.size() > 0 && !exitReason) { + exitReason = 'switch'; + } + + try { + let nextArgs = opts.resumeArgs; + while (true) { + if (exitReason) { + break; + } + + const args = nextArgs ?? []; + nextArgs = undefined; + logger.debug('[codex-local] Spawning codex', args); + + const env = { ...process.env }; + if (env.NO_COLOR) { + delete env.NO_COLOR; + logger.debug('[codex-local] Clearing NO_COLOR for local codex'); + } + if (process.stdout.isTTY && !env.FORCE_COLOR) { + env.FORCE_COLOR = '1'; + } + + if (process.stdin.isTTY) { + try { process.stdin.setRawMode(false); } catch { } + } + if (process.stdout.isTTY) { + process.stdout.write('\x1b[0m\x1b[?25h\x1b[39m\x1b[49m'); + } + + childExit = new Promise((resolve) => { + const child = spawn('codex', args, { + stdio: 'inherit', + cwd: opts.path, + env, + }); + + const abortHandler = () => { + if (!child.killed) { + child.kill('SIGTERM'); + } + }; + + processAbortController.signal.addEventListener('abort', abortHandler); + + child.on('exit', () => { + processAbortController.signal.removeEventListener('abort', abortHandler); + resolve(); + }); + child.on('error', () => { + processAbortController.signal.removeEventListener('abort', abortHandler); + resolve(); + }); + }); + + await childExit; + + if (!exitReason) { + exitReason = 'exit'; + } + } + } finally { + childExit = null; + opts.messageQueue.setOnMessage(null); + session.rpcHandlerManager.registerHandler('abort', async () => { }); + session.rpcHandlerManager.registerHandler('switch', async () => { }); + unsubscribe(); + await scanner.cleanup(); + } + + if (!lastRolloutFile) { + const shouldPreferMtime = opts.resumeArgs?.includes('resume') || opts.resumeArgs?.includes('--resume'); + lastRolloutFile = await findLatestCodexRolloutForCwd( + opts.path, + opts.resumeArgs?.includes('--all') ?? false, + { preferMtime: shouldPreferMtime } + ); + } + + return { reason: exitReason || 'exit', resumeFile: lastRolloutFile }; +} diff --git a/src/codex/codexMcpClient.ts b/src/codex/codexMcpClient.ts index ed101394..e1891b8a 100644 --- a/src/codex/codexMcpClient.ts +++ b/src/codex/codexMcpClient.ts @@ -30,6 +30,9 @@ function getCodexMcpCommand(): string | null { const versionStr = match[1]; const [major, minor, patch] = versionStr.split(/[-.]/).map(Number); + // Dev builds may report 0.0.0 but still require mcp-server. + if (major === 0 && minor === 0 && patch === 0) return 'mcp-server'; + // Version >= 0.43.0-alpha.5 has mcp-server if (major > 0 || minor > 43) return 'mcp-server'; if (minor === 43 && patch === 0) { @@ -146,7 +149,7 @@ export class CodexMcpClient { if (!this.permissionHandler) { logger.debug('[CodexMCP] No permission handler set, denying by default'); return { - decision: 'denied' as const, + action: 'decline' as const, }; } @@ -162,14 +165,20 @@ export class CodexMcpClient { ); logger.debug('[CodexMCP] Permission result:', result); - return { - decision: result.decision + + if (result.decision === 'approved' || result.decision === 'approved_for_session') { + return { action: 'accept' as const, content: {} }; } + + if (result.decision === 'abort') { + return { action: 'cancel' as const }; + } + + return { action: 'decline' as const }; } catch (error) { logger.debug('[CodexMCP] Error handling permission request:', error); return { - decision: 'denied' as const, - reason: error instanceof Error ? error.message : 'Permission request failed' + action: 'decline' as const, }; } } diff --git a/src/codex/codexRemoteLauncher.ts b/src/codex/codexRemoteLauncher.ts new file mode 100644 index 00000000..91230cbc --- /dev/null +++ b/src/codex/codexRemoteLauncher.ts @@ -0,0 +1,445 @@ +import { render } from 'ink'; +import React from 'react'; +import { randomUUID } from 'node:crypto'; +import type { UUID } from 'node:crypto'; + +import { ApiClient } from '@/api/api'; +import { MessageQueue2 } from '@/utils/MessageQueue2'; +import { logger } from '@/ui/logger'; +import { CodexMcpClient } from './codexMcpClient'; +import { CodexPermissionHandler } from './utils/permissionHandler'; +import { ReasoningProcessor } from './utils/reasoningProcessor'; +import { DiffProcessor } from './utils/diffProcessor'; +import type { CodexMode } from './mode'; +import { emitReadyIfIdle } from './utils/ready'; +import { findSessionFileById, readSessionMeta } from './utils/rolloutScanner'; +import { CodexSessionConfig } from './types'; +import { CHANGE_TITLE_INSTRUCTION } from '@/gemini/constants'; +import { RemoteModeDisplay } from '@/ui/ink/RemoteModeDisplay'; +import { MessageBuffer } from '@/ui/ink/messageBuffer'; +import { ensureHappySessionTagForCodexSession } from './utils/codexSessionMap'; +import type { SessionController } from './sessionController'; +import { codexMessageToAcp, type CodexMessage } from './utils/codexAcp'; + +export async function codexRemoteLauncher(opts: { + sessionController: SessionController; + api: ApiClient; + messageQueue: MessageQueue2; + mcpServers: Record; + onThinkingChange: (thinking: boolean) => void; + resumeFile?: string; + resumeSessionId?: string; + sessionTag?: UUID; +}): Promise<{ reason: 'switch' | 'exit'; resumeArgs?: string[] }> { + logger.debug('[codex-remote] Starting remote launcher'); + + const { api, messageQueue, mcpServers, onThinkingChange } = opts; + const { getSession, onSessionSwap } = opts.sessionController; + let session = getSession(); + + // Configure terminal + const hasTTY = process.stdout.isTTY && process.stdin.isTTY; + let messageBuffer = new MessageBuffer(); + let inkInstance: any = null; + + let exitReason: 'switch' | 'exit' | null = null; + let activeRolloutFile: string | null = opts.resumeFile ?? null; + const resumeSessionId = opts.resumeSessionId; + + if (!activeRolloutFile && resumeSessionId) { + activeRolloutFile = await findSessionFileById(resumeSessionId); + } + + if (hasTTY) { + console.clear(); + inkInstance = render(React.createElement(RemoteModeDisplay, { + messageBuffer, + logPath: process.env.DEBUG ? logger.logFilePath : undefined, + onExit: async () => { + logger.debug('[codex-remote] Exiting client via Ctrl-C'); + if (!exitReason) { + exitReason = 'exit'; + } + shouldExit = true; + await doAbort(); + }, + onSwitchToLocal: () => { + logger.debug('[codex-remote] Switching to local mode'); + void doSwitch(); + }, + title: 'Codex', + }), { exitOnCtrlC: false, patchConsole: false }); + } + + if (hasTTY) { + process.stdin.resume(); + if (process.stdin.isTTY) { + process.stdin.setRawMode(true); + } + process.stdin.setEncoding('utf8'); + } + + const client = new CodexMcpClient(); + const permissionHandler = new CodexPermissionHandler(session); + const sendCodexMessage = (message: CodexMessage) => { + const activeSession = getSession(); + activeSession.sendCodexMessage(message); + if (process.env.HAPPY_CODEX_ACP === '1') { + const acpMessage = codexMessageToAcp(message); + if (acpMessage) { + activeSession.sendAgentMessage('codex', acpMessage); + } + } + }; + const reasoningProcessor = new ReasoningProcessor((message) => { + sendCodexMessage(message); + }); + const diffProcessor = new DiffProcessor((message) => { + sendCodexMessage(message); + }); + + client.setPermissionHandler(permissionHandler); + + let thinking = false; + let shouldExit = false; + let wasCreated = false; + let currentModeHash: string | null = null; + let first = true; + let turnAbortController = new AbortController(); + let mappedSessionId: string | null = null; + + const maybeStoreSessionId = (sessionId?: string | null) => { + if (!sessionId || !opts.sessionTag) { + return; + } + if (mappedSessionId === sessionId) { + return; + } + mappedSessionId = sessionId; + void ensureHappySessionTagForCodexSession(sessionId, opts.sessionTag).catch((error) => { + logger.debug('[codex-remote] Failed to store session tag mapping', error); + }); + }; + + const sendReady = () => { + const activeSession = getSession(); + activeSession.sendSessionEvent({ type: 'ready' }); + try { + api.push().sendToAllDevices("It's ready!", 'Codex is waiting for your command', { sessionId: activeSession.sessionId }); + } catch (pushError) { + logger.debug('[Codex] Failed to send ready push', pushError); + } + }; + + const handleAbort = async () => { + logger.debug('[codex-remote] Abort requested - stopping current task'); + try { + turnAbortController.abort(); + messageQueue.reset(); + permissionHandler.reset(); + reasoningProcessor.abort(); + diffProcessor.reset(); + thinking = false; + onThinkingChange(false); + } finally { + turnAbortController = new AbortController(); + } + }; + + const doAbort = async () => { + await handleAbort(); + }; + + const doSwitch = async () => { + if (!exitReason) { + exitReason = 'switch'; + } + shouldExit = true; + await handleAbort(); + }; + + const bindSession = (nextSession: typeof session) => { + session = nextSession; + permissionHandler.updateSession(nextSession); + nextSession.rpcHandlerManager.registerHandler('abort', doAbort); + nextSession.rpcHandlerManager.registerHandler('switch', doSwitch); + }; + + bindSession(session); + const unsubscribe = onSessionSwap((nextSession) => { + bindSession(nextSession); + }); + + client.setHandler((msg) => { + logger.debug(`[Codex] MCP message: ${JSON.stringify(msg)}`); + + if (msg.type === 'agent_message') { + messageBuffer.addMessage(msg.message, 'assistant'); + } else if (msg.type === 'agent_reasoning_delta') { + // Skip reasoning deltas in the UI to reduce noise + } else if (msg.type === 'agent_reasoning') { + messageBuffer.addMessage(`[Thinking] ${msg.text.substring(0, 100)}...`, 'system'); + } else if (msg.type === 'exec_command_begin') { + messageBuffer.addMessage(`Executing: ${msg.command}`, 'tool'); + } else if (msg.type === 'exec_command_end') { + const output = msg.output || msg.error || 'Command completed'; + const truncatedOutput = output.substring(0, 200); + messageBuffer.addMessage( + `Result: ${truncatedOutput}${output.length > 200 ? '...' : ''}`, + 'result' + ); + } else if (msg.type === 'task_started') { + messageBuffer.addMessage('Starting task...', 'status'); + } else if (msg.type === 'task_complete') { + messageBuffer.addMessage('Task completed', 'status'); + sendReady(); + } else if (msg.type === 'turn_aborted') { + messageBuffer.addMessage('Turn aborted', 'status'); + sendReady(); + } + + if (msg.type === 'task_started') { + if (!thinking) { + thinking = true; + onThinkingChange(true); + } + } + if (msg.type === 'task_complete' || msg.type === 'turn_aborted') { + if (thinking) { + thinking = false; + onThinkingChange(false); + } + diffProcessor.reset(); + } + if (msg.type === 'agent_reasoning_section_break') { + reasoningProcessor.handleSectionBreak(); + } + if (msg.type === 'agent_reasoning_delta') { + reasoningProcessor.processDelta(msg.delta); + } + if (msg.type === 'agent_reasoning') { + reasoningProcessor.complete(msg.text); + } + if (msg.type === 'agent_message') { + sendCodexMessage({ + type: 'message', + message: msg.message, + id: randomUUID(), + }); + } + if (msg.type === 'exec_command_begin' || msg.type === 'exec_approval_request') { + let { call_id, type, ...inputs } = msg; + sendCodexMessage({ + type: 'tool-call', + name: 'CodexBash', + callId: call_id, + input: inputs, + id: randomUUID(), + }); + } + if (msg.type === 'exec_command_end') { + let { call_id, type, ...output } = msg; + sendCodexMessage({ + type: 'tool-call-result', + callId: call_id, + output: output, + id: randomUUID(), + }); + } + if (msg.type === 'token_count') { + sendCodexMessage({ + ...msg, + id: randomUUID(), + }); + } + if (msg.type === 'patch_apply_begin') { + let { call_id, auto_approved, changes } = msg; + const changeCount = Object.keys(changes).length; + const filesMsg = changeCount === 1 ? '1 file' : `${changeCount} files`; + messageBuffer.addMessage(`Modifying ${filesMsg}...`, 'tool'); + sendCodexMessage({ + type: 'tool-call', + name: 'CodexPatch', + callId: call_id, + input: { + auto_approved, + changes, + }, + id: randomUUID(), + }); + } + if (msg.type === 'patch_apply_end') { + let { call_id, stdout, stderr, success } = msg; + if (success) { + const message = stdout || 'Files modified successfully'; + messageBuffer.addMessage(message.substring(0, 200), 'result'); + } else { + const errorMsg = stderr || 'Failed to modify files'; + messageBuffer.addMessage(`Error: ${errorMsg.substring(0, 200)}`, 'result'); + } + sendCodexMessage({ + type: 'tool-call-result', + callId: call_id, + output: { + stdout, + stderr, + success, + }, + id: randomUUID(), + }); + } + if (msg.type === 'turn_diff') { + if (msg.unified_diff) { + diffProcessor.processDiff(msg.unified_diff); + } + } + + if (!activeRolloutFile) { + const sessionId = client.getSessionId(); + if (sessionId) { + maybeStoreSessionId(sessionId); + void findSessionFileById(sessionId).then((file) => { + if (file) { + activeRolloutFile = file; + } + }); + } + } + }); + + try { + await client.connect(); + + while (!shouldExit) { + const waitSignal = turnAbortController.signal; + const message = await messageQueue.waitForMessagesAndGetAsString(waitSignal); + if (!message) { + if (waitSignal.aborted && !shouldExit) { + logger.debug('[codex-remote] Wait aborted while idle; continuing'); + continue; + } + break; + } + + if (wasCreated && currentModeHash && message.hash !== currentModeHash) { + logger.debug('[codex-remote] Mode changed - restarting codex session'); + client.clearSession(); + wasCreated = false; + currentModeHash = null; + } + + messageBuffer.addMessage(message.message, 'user'); + currentModeHash = message.hash; + + try { + const approvalPolicy = (() => { + switch (message.mode.permissionMode) { + case 'default': return 'untrusted' as const; + case 'read-only': return 'never' as const; + case 'safe-yolo': return 'on-failure' as const; + case 'yolo': return 'on-failure' as const; + default: return 'untrusted' as const; + } + })(); + const sandbox = (() => { + switch (message.mode.permissionMode) { + case 'default': return 'workspace-write' as const; + case 'read-only': return 'read-only' as const; + case 'safe-yolo': return 'workspace-write' as const; + case 'yolo': return 'danger-full-access' as const; + default: return 'workspace-write' as const; + } + })(); + + if (!wasCreated) { + const shouldChangeTitle = !opts.resumeFile && !opts.resumeSessionId; + const startConfig: CodexSessionConfig = { + prompt: first && shouldChangeTitle + ? message.message + '\n\n' + CHANGE_TITLE_INSTRUCTION + : message.message, + sandbox, + 'approval-policy': approvalPolicy, + config: { mcp_servers: mcpServers }, + cwd: process.cwd(), + }; + if (message.mode.model) { + startConfig.model = message.mode.model; + } + if (opts.resumeFile) { + startConfig['resume-path'] = opts.resumeFile; + logger.debug('[codex-remote] Resuming from rollout:', opts.resumeFile); + } + + await client.startSession(startConfig, { signal: turnAbortController.signal }); + if (!activeRolloutFile) { + const sessionId = client.getSessionId(); + if (sessionId) { + maybeStoreSessionId(sessionId); + activeRolloutFile = await findSessionFileById(sessionId); + } + } + wasCreated = true; + first = false; + } else { + await client.continueSession(message.message, { signal: turnAbortController.signal }); + } + } catch (error) { + const isAbortError = error instanceof Error && error.name === 'AbortError'; + if (isAbortError) { + messageBuffer.addMessage('Aborted by user', 'status'); + getSession().sendSessionEvent({ type: 'message', message: 'Aborted by user' }); + wasCreated = false; + currentModeHash = null; + } else { + messageBuffer.addMessage('Process exited unexpectedly', 'status'); + getSession().sendSessionEvent({ type: 'message', message: 'Process exited unexpectedly' }); + } + } finally { + permissionHandler.reset(); + reasoningProcessor.abort(); + diffProcessor.reset(); + thinking = false; + onThinkingChange(false); + emitReadyIfIdle({ + pending: null, + queueSize: () => messageQueue.size(), + shouldExit, + sendReady, + }); + } + } + } finally { + const activeSession = getSession(); + activeSession.rpcHandlerManager.registerHandler('abort', async () => { }); + activeSession.rpcHandlerManager.registerHandler('switch', async () => { }); + + if (hasTTY && process.stdin.isTTY) { + try { process.stdin.setRawMode(false); } catch { } + } + if (hasTTY) { + try { process.stdin.pause(); } catch { } + } + inkInstance?.unmount?.(); + await client.disconnect(); + unsubscribe(); + } + + const reason: 'switch' | 'exit' = exitReason === 'switch' ? 'switch' : 'exit'; + let resumeArgs: string[] | undefined; + if (exitReason === 'switch') { + let sessionId = resumeSessionId ?? client.getSessionId() ?? undefined; + if (!resumeSessionId && activeRolloutFile) { + const meta = await readSessionMeta(activeRolloutFile); + if (meta?.id) { + sessionId = meta.id; + } + } + if (sessionId) { + maybeStoreSessionId(sessionId); + resumeArgs = ['resume', sessionId]; + } else { + resumeArgs = ['resume', '--last']; + } + } + return { reason, resumeArgs }; +} diff --git a/src/codex/loop.ts b/src/codex/loop.ts new file mode 100644 index 00000000..32b01175 --- /dev/null +++ b/src/codex/loop.ts @@ -0,0 +1,82 @@ +import { ApiClient } from '@/api/api'; +import { MessageQueue2 } from '@/utils/MessageQueue2'; +import { logger } from '@/ui/logger'; +import { codexLocalLauncher } from './codexLocalLauncher'; +import { codexRemoteLauncher } from './codexRemoteLauncher'; +import type { CodexMode } from './mode'; +import type { UUID } from 'node:crypto'; +import type { SessionController } from './sessionController'; + +export type ControlMode = 'local' | 'remote'; + +export interface CodexLoopOptions { + path: string; + startingMode?: ControlMode; + resumeArgs?: string[]; + resumeSessionId?: string; + sessionTag?: UUID; + sessionController: SessionController; + api: ApiClient; + mcpServers: Record; + messageQueue: MessageQueue2; + onModeChange: (mode: ControlMode) => void; + onThinkingChange: (thinking: boolean) => void; +} + +export async function codexLoop(opts: CodexLoopOptions) { + let mode: ControlMode = opts.startingMode ?? 'local'; + let resumeArgs: string[] | undefined = opts.resumeArgs; + let resumeFile: string | undefined; + const resumeSessionId = opts.resumeSessionId; + + while (true) { + logger.debug(`[codex-loop] Iteration with mode: ${mode}`); + + if (mode === 'local') { + const result = await codexLocalLauncher({ + sessionController: opts.sessionController, + path: opts.path, + resumeArgs, + resumeSessionId, + sessionTag: opts.sessionTag, + messageQueue: opts.messageQueue, + }); + + // Consume resume args after first local spawn + resumeArgs = undefined; + + if (result.reason === 'exit') { + return; + } + + resumeFile = result.resumeFile ?? resumeFile; + mode = 'remote'; + opts.onModeChange(mode); + continue; + } + + if (mode === 'remote') { + const remoteResult = await codexRemoteLauncher({ + sessionController: opts.sessionController, + api: opts.api, + messageQueue: opts.messageQueue, + mcpServers: opts.mcpServers, + onThinkingChange: opts.onThinkingChange, + resumeFile, + resumeSessionId, + sessionTag: opts.sessionTag, + }); + + resumeFile = undefined; + + if (remoteResult.reason === 'exit') { + return; + } + + mode = 'local'; + opts.onModeChange(mode); + resumeArgs = remoteResult.resumeArgs; + continue; + } + } +} diff --git a/src/codex/mode.ts b/src/codex/mode.ts new file mode 100644 index 00000000..07efb82d --- /dev/null +++ b/src/codex/mode.ts @@ -0,0 +1,6 @@ +export type PermissionMode = 'default' | 'read-only' | 'safe-yolo' | 'yolo'; + +export interface CodexMode { + permissionMode: PermissionMode; + model?: string; +} diff --git a/src/codex/runCodex.ts b/src/codex/runCodex.ts index fdaf9b29..74749bca 100644 --- a/src/codex/runCodex.ts +++ b/src/codex/runCodex.ts @@ -1,125 +1,106 @@ -import { render } from "ink"; -import React from "react"; -import { ApiClient } from '@/api/api'; -import { CodexMcpClient } from './codexMcpClient'; -import { CodexPermissionHandler } from './utils/permissionHandler'; -import { ReasoningProcessor } from './utils/reasoningProcessor'; -import { DiffProcessor } from './utils/diffProcessor'; import { randomUUID } from 'node:crypto'; +import { join } from 'node:path'; + +import { ApiClient } from '@/api/api'; +import type { ApiSessionClient } from '@/api/apiSession'; import { logger } from '@/ui/logger'; import { Credentials, readSettings } from '@/persistence'; import { initialMachineMetadata } from '@/daemon/run'; -import { configuration } from '@/configuration'; -import packageJson from '../../package.json'; -import os from 'node:os'; +import { notifyDaemonSessionStarted } from '@/daemon/controlClient'; +import { registerKillSessionHandler } from '@/claude/registerKillSessionHandler'; +import { startHappyServer } from '@/claude/utils/startHappyServer'; +import { projectPath } from '@/projectPath'; +import { startCaffeinate, stopCaffeinate } from '@/utils/caffeinate'; import { MessageQueue2 } from '@/utils/MessageQueue2'; import { hashObject } from '@/utils/deterministicJson'; -import { projectPath } from '@/projectPath'; -import { resolve, join } from 'node:path'; import { createSessionMetadata } from '@/utils/createSessionMetadata'; -import fs from 'node:fs'; -import { startHappyServer } from '@/claude/utils/startHappyServer'; -import { MessageBuffer } from "@/ui/ink/messageBuffer"; -import { CodexDisplay } from "@/ui/ink/CodexDisplay"; -import { trimIdent } from "@/utils/trimIdent"; -import type { CodexSessionConfig } from './types'; -import { CHANGE_TITLE_INSTRUCTION } from '@/gemini/constants'; -import { notifyDaemonSessionStarted } from "@/daemon/controlClient"; -import { registerKillSessionHandler } from "@/claude/registerKillSessionHandler"; -import { delay } from "@/utils/time"; -import { stopCaffeinate } from "@/utils/caffeinate"; import { connectionState } from '@/utils/serverConnectionErrors'; import { setupOfflineReconnection } from '@/utils/setupOfflineReconnection'; -import type { ApiSessionClient } from '@/api/apiSession'; +import { codexLoop } from './loop'; +import type { CodexMode, PermissionMode } from './mode'; +import { extractResumeSessionId } from './utils/resume'; +import { ensureHappySessionTagForCodexSession, getHappySessionTagForCodexSession } from './utils/codexSessionMap'; +import type { SessionController } from './sessionController'; +export { emitReadyIfIdle } from './utils/ready'; +export type { CodexMode, PermissionMode } from './mode'; -type ReadyEventOptions = { - pending: unknown; - queueSize: () => number; - shouldExit: boolean; - sendReady: () => void; - notify?: () => void; -}; - -/** - * Notify connected clients when Codex finishes processing and the queue is idle. - * Returns true when a ready event was emitted. - */ -export function emitReadyIfIdle({ pending, queueSize, shouldExit, sendReady, notify }: ReadyEventOptions): boolean { - if (shouldExit) { - return false; - } - if (pending) { - return false; - } - if (queueSize() > 0) { - return false; - } - - sendReady(); - notify?.(); - return true; -} - -/** - * Main entry point for the codex command with ink UI - */ export async function runCodex(opts: { credentials: Credentials; startedBy?: 'daemon' | 'terminal'; + resumeArgs?: string[]; + startingMode?: 'local' | 'remote'; }): Promise { - // Use shared PermissionMode type for cross-agent compatibility - type PermissionMode = import('@/api/types').PermissionMode; - interface EnhancedMode { - permissionMode: PermissionMode; - model?: string; - } + logger.debug(`[CODEX] ===== CODEX MODE STARTING =====`); - // - // Define session - // - - const sessionTag = randomUUID(); + const workingDirectory = process.cwd(); + const sessionTagFallback = randomUUID(); // Set backend for offline warnings (before any API calls) connectionState.setBackend('Codex'); const api = await ApiClient.create(opts.credentials); - // Log startup options - logger.debug(`[codex] Starting with options: startedBy=${opts.startedBy || 'terminal'}`); + // Resolve initial mode + let mode: 'local' | 'remote' = opts.startingMode ?? (opts.startedBy === 'daemon' ? 'remote' : 'local'); + const resumeSessionId = extractResumeSessionId(opts.resumeArgs); + let sessionTag = sessionTagFallback; + if (resumeSessionId) { + const existingTag = await getHappySessionTagForCodexSession(resumeSessionId); + if (existingTag) { + sessionTag = existingTag; + } + sessionTag = await ensureHappySessionTagForCodexSession(resumeSessionId, sessionTag); + } - // - // Machine - // + // Validate daemon spawn requirements + if (opts.startedBy === 'daemon' && mode === 'local') { + logger.debug('Daemon spawn requested with local mode - forcing remote mode'); + mode = 'remote'; + } + // Get machine ID from settings const settings = await readSettings(); - let machineId = settings?.machineId; + const machineId = settings?.machineId; if (!machineId) { - console.error(`[START] No machine ID found in settings, which is unexpected since authAndSetupMachineIfNeeded should have created it. Please report this issue on https://github.com/slopus/happy-cli/issues`); + console.error( + `[START] No machine ID found in settings, which is unexpected since authAndSetupMachineIfNeeded should have created it. Please report this issue on https://github.com/slopus/happy-cli/issues` + ); process.exit(1); } logger.debug(`Using machineId: ${machineId}`); + + // Create machine if it doesn't exist await api.getOrCreateMachine({ machineId, - metadata: initialMachineMetadata + metadata: initialMachineMetadata, }); - // - // Create session - // - - const { state, metadata } = createSessionMetadata({ + const { state: baseState, metadata } = createSessionMetadata({ flavor: 'codex', machineId, - startedBy: opts.startedBy + startedBy: opts.startedBy, }); + + const state = { + ...baseState, + controlledByUser: mode === 'local', + }; + const response = await api.getOrCreateSession({ tag: sessionTag, metadata, state }); + logger.debug(`Session created: ${response?.id ?? 'offline'}`); - // Handle server unreachable case - create offline stub with hot reconnection let session: ApiSessionClient; - // Permission handler declared here so it can be updated in onSessionSwap callback - // (assigned later at line ~385 after client setup) - let permissionHandler: CodexPermissionHandler; + const sessionSwapListeners = new Set<(nextSession: ApiSessionClient) => void>(); + const sessionController: SessionController = { + getSession: () => session, + onSessionSwap: (listener) => { + sessionSwapListeners.add(listener); + return () => { + sessionSwapListeners.delete(listener); + }; + }, + }; + const { session: initialSession, reconnectionHandle } = setupOfflineReconnection({ api, sessionTag, @@ -128,11 +109,10 @@ export async function runCodex(opts: { response, onSessionSwap: (newSession) => { session = newSession; - // Update permission handler with new session to avoid stale reference - if (permissionHandler) { - permissionHandler.updateSession(newSession); + for (const listener of sessionSwapListeners) { + listener(newSession); } - } + }, }); session = initialSession; @@ -151,651 +131,163 @@ export async function runCodex(opts: { } } - const messageQueue = new MessageQueue2((mode) => hashObject({ - permissionMode: mode.permissionMode, - model: mode.model, - })); - - // Track current overrides to apply per message - // Use shared PermissionMode type from api/types for cross-agent compatibility - let currentPermissionMode: import('@/api/types').PermissionMode | undefined = undefined; - let currentModel: string | undefined = undefined; - - session.onUserMessage((message) => { - // Resolve permission mode (accept all modes, will be mapped in switch statement) - let messagePermissionMode = currentPermissionMode; - if (message.meta?.permissionMode) { - messagePermissionMode = message.meta.permissionMode as import('@/api/types').PermissionMode; - currentPermissionMode = messagePermissionMode; - logger.debug(`[Codex] Permission mode updated from user message to: ${currentPermissionMode}`); - } else { - logger.debug(`[Codex] User message received with no permission mode override, using current: ${currentPermissionMode ?? 'default (effective)'}`); - } - - // Resolve model; explicit null resets to default (undefined) - let messageModel = currentModel; - if (message.meta?.hasOwnProperty('model')) { - messageModel = message.meta.model || undefined; - currentModel = messageModel; - logger.debug(`[Codex] Model updated from user message: ${messageModel || 'reset to default'}`); - } else { - logger.debug(`[Codex] User message received with no model override, using current: ${currentModel || 'default'}`); - } - - const enhancedMode: EnhancedMode = { - permissionMode: messagePermissionMode || 'default', - model: messageModel, - }; - messageQueue.push(message.content.text, enhancedMode); - }); - let thinking = false; - session.keepAlive(thinking, 'remote'); - // Periodic keep-alive; store handle so we can clear on exit - const keepAliveInterval = setInterval(() => { - session.keepAlive(thinking, 'remote'); - }, 2000); - - const sendReady = () => { - session.sendSessionEvent({ type: 'ready' }); - try { - api.push().sendToAllDevices( - "It's ready!", - 'Codex is waiting for your command', - { sessionId: session.sessionId } - ); - } catch (pushError) { - logger.debug('[Codex] Failed to send ready push', pushError); - } - }; + // Start Happy MCP server + const happyServer = await startHappyServer(session); + logger.debug(`[START] Happy MCP server started at ${happyServer.url}`); - // Debug helper: log active handles/requests if DEBUG is enabled - function logActiveHandles(tag: string) { - if (!process.env.DEBUG) return; - const anyProc: any = process as any; - const handles = typeof anyProc._getActiveHandles === 'function' ? anyProc._getActiveHandles() : []; - const requests = typeof anyProc._getActiveRequests === 'function' ? anyProc._getActiveRequests() : []; - logger.debug(`[codex][handles] ${tag}: handles=${handles.length} requests=${requests.length}`); - try { - const kinds = handles.map((h: any) => (h && h.constructor ? h.constructor.name : typeof h)); - logger.debug(`[codex][handles] kinds=${JSON.stringify(kinds)}`); - } catch { } - } + // Build MCP servers for Codex + const bridgeCommand = join(projectPath(), 'bin', 'happy-mcp.mjs'); + const mcpServers = { + happy: { + command: bridgeCommand, + args: ['--url', happyServer.url], + }, + } as const; - // - // Abort handling - // IMPORTANT: There are two different operations: - // 1. Abort (handleAbort): Stops the current inference/task but keeps the session alive - // - Used by the 'abort' RPC from mobile app - // - Similar to Claude Code's abort behavior - // - Allows continuing with new prompts after aborting - // 2. Kill (handleKillSession): Terminates the entire process - // - Used by the 'killSession' RPC - // - Completely exits the CLI process - // - - let abortController = new AbortController(); - let shouldExit = false; - let storedSessionIdForResume: string | null = null; - - /** - * Handles aborting the current task/inference without exiting the process. - * This is the equivalent of Claude Code's abort - it stops what's currently - * happening but keeps the session alive for new prompts. - */ - async function handleAbort() { - logger.debug('[Codex] Abort requested - stopping current task'); - try { - // Store the current session ID before aborting for potential resume - if (client.hasActiveSession()) { - storedSessionIdForResume = client.storeSessionForResume(); - logger.debug('[Codex] Stored session for resume:', storedSessionIdForResume); - } - - abortController.abort(); - reasoningProcessor.abort(); - logger.debug('[Codex] Abort completed - session remains active'); - } catch (error) { - logger.debug('[Codex] Error during abort:', error); - } finally { - abortController = new AbortController(); - } + // Start caffeinate to prevent sleep on macOS + const caffeinateStarted = startCaffeinate(); + if (caffeinateStarted) { + logger.infoDeveloper('Sleep prevention enabled (macOS)'); } - /** - * Handles session termination and process exit. - * This is called when the session needs to be completely killed (not just aborted). - * Abort stops the current inference but keeps the session alive. - * Kill terminates the entire process. - */ - const handleKillSession = async () => { - logger.debug('[Codex] Kill session requested - terminating process'); - await handleAbort(); - logger.debug('[Codex] Abort completed, proceeding with termination'); + // Message queue for remote prompts + const messageQueue = new MessageQueue2((mode) => + hashObject({ + permissionMode: mode.permissionMode, + model: mode.model, + }) + ); - try { - // Update lifecycle state to archived before closing - if (session) { - session.updateMetadata((currentMetadata) => ({ - ...currentMetadata, - lifecycleState: 'archived', - lifecycleStateSince: Date.now(), - archivedBy: 'cli', - archiveReason: 'User terminated' - })); - - // Send session death message - session.sendSessionDeath(); - await session.flush(); - await session.close(); + // Forward messages to queue + let currentPermissionMode: PermissionMode | undefined = undefined; + let currentModel: string | undefined = undefined; + const attachMessageHandler = (currentSession: ApiSessionClient) => { + currentSession.onUserMessage((message) => { + let messagePermissionMode = currentPermissionMode; + if (message.meta?.permissionMode) { + const validModes: PermissionMode[] = ['default', 'read-only', 'safe-yolo', 'yolo']; + if (validModes.includes(message.meta.permissionMode as PermissionMode)) { + messagePermissionMode = message.meta.permissionMode as PermissionMode; + currentPermissionMode = messagePermissionMode; + logger.debug(`[Codex] Permission mode updated from user message to: ${currentPermissionMode}`); + } else { + logger.debug(`[Codex] Invalid permission mode received: ${message.meta.permissionMode}`); + } + } else { + logger.debug(`[Codex] User message received with no permission mode override, using current: ${currentPermissionMode ?? 'default (effective)'}`); } - // Force close Codex transport (best-effort) so we don't leave stray processes - try { - await client.forceCloseSession(); - } catch (e) { - logger.debug('[Codex] Error while force closing Codex session during termination', e); + let messageModel = currentModel; + if (message.meta?.hasOwnProperty('model')) { + messageModel = message.meta.model || undefined; + currentModel = messageModel; + logger.debug(`[Codex] Model updated from user message: ${messageModel || 'reset to default'}`); + } else { + logger.debug(`[Codex] User message received with no model override, using current: ${currentModel || 'default'}`); } - // Stop caffeinate - stopCaffeinate(); + const enhancedMode: CodexMode = { + permissionMode: messagePermissionMode || 'default', + model: messageModel, + }; + messageQueue.push(message.content.text, enhancedMode); + }); + }; - // Stop Happy MCP server - happyServer.stop(); + attachMessageHandler(session); + sessionController.onSessionSwap((nextSession) => { + attachMessageHandler(nextSession); + }); - logger.debug('[Codex] Session termination complete, exiting'); - process.exit(0); - } catch (error) { - logger.debug('[Codex] Error during session termination:', error); - process.exit(1); - } + // Keep-alive tracking + let thinking = false; + const sendKeepAlive = () => sessionController.getSession().keepAlive(thinking, mode); + sendKeepAlive(); + const keepAliveInterval = setInterval(() => sendKeepAlive(), 2000); + + const onModeChange = (newMode: 'local' | 'remote') => { + mode = newMode; + const activeSession = sessionController.getSession(); + activeSession.sendSessionEvent({ type: 'switch', mode: newMode }); + activeSession.updateAgentState((currentState) => ({ + ...currentState, + controlledByUser: newMode === 'local', + })); + sendKeepAlive(); }; - // Register abort handler - session.rpcHandlerManager.registerHandler('abort', handleAbort); - - registerKillSessionHandler(session.rpcHandlerManager, handleKillSession); - - // - // Initialize Ink UI - // - - const messageBuffer = new MessageBuffer(); - const hasTTY = process.stdout.isTTY && process.stdin.isTTY; - let inkInstance: any = null; - - if (hasTTY) { - console.clear(); - inkInstance = render(React.createElement(CodexDisplay, { - messageBuffer, - logPath: process.env.DEBUG ? logger.getLogPath() : undefined, - onExit: async () => { - // Exit the agent - logger.debug('[codex]: Exiting agent via Ctrl-C'); - shouldExit = true; - await handleAbort(); - } - }), { - exitOnCtrlC: false, - patchConsole: false - }); - } + const onThinkingChange = (isThinking: boolean) => { + thinking = isThinking; + sendKeepAlive(); + }; - if (hasTTY) { - process.stdin.resume(); - if (process.stdin.isTTY) { - process.stdin.setRawMode(true); + let shuttingDown = false; + const cleanup = async () => { + if (shuttingDown) { + return; } - process.stdin.setEncoding("utf8"); - } - - // - // Start Context - // - - const client = new CodexMcpClient(); - - // Helper: find Codex session transcript for a given sessionId - function findCodexResumeFile(sessionId: string | null): string | null { - if (!sessionId) return null; + shuttingDown = true; + const forceExitTimer = setTimeout(() => { + logger.debug('[CODEX] Force exit after shutdown timeout'); + process.exit(0); + }, 2000); try { - const codexHomeDir = process.env.CODEX_HOME || join(os.homedir(), '.codex'); - const rootDir = join(codexHomeDir, 'sessions'); - - // Recursively collect all files under the sessions directory - function collectFilesRecursive(dir: string, acc: string[] = []): string[] { - let entries: fs.Dirent[]; - try { - entries = fs.readdirSync(dir, { withFileTypes: true }); - } catch { - return acc; - } - for (const entry of entries) { - const full = join(dir, entry.name); - if (entry.isDirectory()) { - collectFilesRecursive(full, acc); - } else if (entry.isFile()) { - acc.push(full); - } - } - return acc; - } - - const candidates = collectFilesRecursive(rootDir) - .filter(full => full.endsWith(`-${sessionId}.jsonl`)) - .filter(full => { - try { return fs.statSync(full).isFile(); } catch { return false; } - }) - .sort((a, b) => { - const sa = fs.statSync(a).mtimeMs; - const sb = fs.statSync(b).mtimeMs; - return sb - sa; // newest first - }); - return candidates[0] || null; - } catch { - return null; - } - } - permissionHandler = new CodexPermissionHandler(session); - const reasoningProcessor = new ReasoningProcessor((message) => { - // Callback to send messages directly from the processor - session.sendCodexMessage(message); - }); - const diffProcessor = new DiffProcessor((message) => { - // Callback to send messages directly from the processor - session.sendCodexMessage(message); - }); - client.setPermissionHandler(permissionHandler); - client.setHandler((msg) => { - logger.debug(`[Codex] MCP message: ${JSON.stringify(msg)}`); - - // Add messages to the ink UI buffer based on message type - if (msg.type === 'agent_message') { - messageBuffer.addMessage(msg.message, 'assistant'); - } else if (msg.type === 'agent_reasoning_delta') { - // Skip reasoning deltas in the UI to reduce noise - } else if (msg.type === 'agent_reasoning') { - messageBuffer.addMessage(`[Thinking] ${msg.text.substring(0, 100)}...`, 'system'); - } else if (msg.type === 'exec_command_begin') { - messageBuffer.addMessage(`Executing: ${msg.command}`, 'tool'); - } else if (msg.type === 'exec_command_end') { - const output = msg.output || msg.error || 'Command completed'; - const truncatedOutput = output.substring(0, 200); - messageBuffer.addMessage( - `Result: ${truncatedOutput}${output.length > 200 ? '...' : ''}`, - 'result' - ); - } else if (msg.type === 'task_started') { - messageBuffer.addMessage('Starting task...', 'status'); - } else if (msg.type === 'task_complete') { - messageBuffer.addMessage('Task completed', 'status'); - sendReady(); - } else if (msg.type === 'turn_aborted') { - messageBuffer.addMessage('Turn aborted', 'status'); - sendReady(); - } - - if (msg.type === 'task_started') { - if (!thinking) { - logger.debug('thinking started'); - thinking = true; - session.keepAlive(thinking, 'remote'); - } - } - if (msg.type === 'task_complete' || msg.type === 'turn_aborted') { - if (thinking) { - logger.debug('thinking completed'); - thinking = false; - session.keepAlive(thinking, 'remote'); - } - // Reset diff processor on task end or abort - diffProcessor.reset(); - } - if (msg.type === 'agent_reasoning_section_break') { - // Reset reasoning processor for new section - reasoningProcessor.handleSectionBreak(); - } - if (msg.type === 'agent_reasoning_delta') { - // Process reasoning delta - tool calls are sent automatically via callback - reasoningProcessor.processDelta(msg.delta); - } - if (msg.type === 'agent_reasoning') { - // Complete the reasoning section - tool results or reasoning messages sent via callback - reasoningProcessor.complete(msg.text); - } - if (msg.type === 'agent_message') { - session.sendCodexMessage({ - type: 'message', - message: msg.message, - id: randomUUID() - }); - } - if (msg.type === 'exec_command_begin' || msg.type === 'exec_approval_request') { - let { call_id, type, ...inputs } = msg; - session.sendCodexMessage({ - type: 'tool-call', - name: 'CodexBash', - callId: call_id, - input: inputs, - id: randomUUID() - }); - } - if (msg.type === 'exec_command_end') { - let { call_id, type, ...output } = msg; - session.sendCodexMessage({ - type: 'tool-call-result', - callId: call_id, - output: output, - id: randomUUID() - }); - } - if (msg.type === 'token_count') { - session.sendCodexMessage({ - ...msg, - id: randomUUID() - }); - } - if (msg.type === 'patch_apply_begin') { - // Handle the start of a patch operation - let { call_id, auto_approved, changes } = msg; - - // Add UI feedback for patch operation - const changeCount = Object.keys(changes).length; - const filesMsg = changeCount === 1 ? '1 file' : `${changeCount} files`; - messageBuffer.addMessage(`Modifying ${filesMsg}...`, 'tool'); - - // Send tool call message - session.sendCodexMessage({ - type: 'tool-call', - name: 'CodexPatch', - callId: call_id, - input: { - auto_approved, - changes - }, - id: randomUUID() - }); - } - if (msg.type === 'patch_apply_end') { - // Handle the end of a patch operation - let { call_id, stdout, stderr, success } = msg; - - // Add UI feedback for completion - if (success) { - const message = stdout || 'Files modified successfully'; - messageBuffer.addMessage(message.substring(0, 200), 'result'); - } else { - const errorMsg = stderr || 'Failed to modify files'; - messageBuffer.addMessage(`Error: ${errorMsg.substring(0, 200)}`, 'result'); - } - - // Send tool call result message - session.sendCodexMessage({ - type: 'tool-call-result', - callId: call_id, - output: { - stdout, - stderr, - success - }, - id: randomUUID() - }); - } - if (msg.type === 'turn_diff') { - // Handle turn_diff messages and track unified_diff changes - if (msg.unified_diff) { - diffProcessor.processDiff(msg.unified_diff); - } + const activeSession = sessionController.getSession(); + activeSession.sendSessionDeath(); + await activeSession.flush(); + await activeSession.close(); + } catch (error) { + logger.debug('[CODEX] Error during cleanup:', error); } - }); - // Start Happy MCP server (HTTP) and prepare STDIO bridge config for Codex - const happyServer = await startHappyServer(session); - const bridgeCommand = join(projectPath(), 'bin', 'happy-mcp.mjs'); - const mcpServers = { - happy: { - command: bridgeCommand, - args: ['--url', happyServer.url] + if (reconnectionHandle) { + reconnectionHandle.cancel(); } - } as const; - let first = true; - - try { - logger.debug('[codex]: client.connect begin'); - await client.connect(); - logger.debug('[codex]: client.connect done'); - let wasCreated = false; - let currentModeHash: string | null = null; - let pending: { message: string; mode: EnhancedMode; isolate: boolean; hash: string } | null = null; - // If we restart (e.g., mode change), use this to carry a resume file - let nextExperimentalResume: string | null = null; - - while (!shouldExit) { - logActiveHandles('loop-top'); - // Get next batch; respect mode boundaries like Claude - let message: { message: string; mode: EnhancedMode; isolate: boolean; hash: string } | null = pending; - pending = null; - if (!message) { - // Capture the current signal to distinguish idle-abort from queue close - const waitSignal = abortController.signal; - const batch = await messageQueue.waitForMessagesAndGetAsString(waitSignal); - if (!batch) { - // If wait was aborted (e.g., remote abort with no active inference), ignore and continue - if (waitSignal.aborted && !shouldExit) { - logger.debug('[codex]: Wait aborted while idle; ignoring and continuing'); - continue; - } - logger.debug(`[codex]: batch=${!!batch}, shouldExit=${shouldExit}`); - break; - } - message = batch; - } - - // Defensive check for TS narrowing - if (!message) { - break; - } + stopCaffeinate(); + happyServer.stop(); + clearInterval(keepAliveInterval); - // If a session exists and mode changed, restart on next iteration - if (wasCreated && currentModeHash && message.hash !== currentModeHash) { - logger.debug('[Codex] Mode changed – restarting Codex session'); - messageBuffer.addMessage('═'.repeat(40), 'status'); - messageBuffer.addMessage('Starting new Codex session (mode changed)...', 'status'); - // Capture previous sessionId and try to find its transcript to resume - try { - const prevSessionId = client.getSessionId(); - nextExperimentalResume = findCodexResumeFile(prevSessionId); - if (nextExperimentalResume) { - logger.debug(`[Codex] Found resume file for session ${prevSessionId}: ${nextExperimentalResume}`); - messageBuffer.addMessage('Resuming previous context…', 'status'); - } else { - logger.debug('[Codex] No resume file found for previous session'); - } - } catch (e) { - logger.debug('[Codex] Error while searching resume file', e); - } - client.clearSession(); - wasCreated = false; - currentModeHash = null; - pending = message; - // Reset processors/permissions like end-of-turn cleanup - permissionHandler.reset(); - reasoningProcessor.abort(); - diffProcessor.reset(); - thinking = false; - session.keepAlive(thinking, 'remote'); - continue; - } + logger.debug('[CODEX] Cleanup complete, exiting'); + clearTimeout(forceExitTimer); + process.exit(0); + }; - // Display user messages in the UI - messageBuffer.addMessage(message.message, 'user'); - currentModeHash = message.hash; - - try { - // Map permission mode to approval policy and sandbox for startSession - const approvalPolicy = (() => { - switch (message.mode.permissionMode) { - // Codex native modes - case 'default': return 'untrusted' as const; // Ask for non-trusted commands - case 'read-only': return 'never' as const; // Never ask, read-only enforced by sandbox - case 'safe-yolo': return 'on-failure' as const; // Auto-run, ask only on failure - case 'yolo': return 'on-failure' as const; // Auto-run, ask only on failure - // Defensive fallback for Claude-specific modes (backward compatibility) - case 'bypassPermissions': return 'on-failure' as const; // Full access: map to yolo behavior - case 'acceptEdits': return 'on-request' as const; // Let model decide (closest to auto-approve edits) - case 'plan': return 'untrusted' as const; // Conservative: ask for non-trusted - default: return 'untrusted' as const; // Safe fallback - } - })(); - const sandbox = (() => { - switch (message.mode.permissionMode) { - // Codex native modes - case 'default': return 'workspace-write' as const; // Can write in workspace - case 'read-only': return 'read-only' as const; // Read-only filesystem - case 'safe-yolo': return 'workspace-write' as const; // Can write in workspace - case 'yolo': return 'danger-full-access' as const; // Full system access - // Defensive fallback for Claude-specific modes - case 'bypassPermissions': return 'danger-full-access' as const; // Full access: map to yolo - case 'acceptEdits': return 'workspace-write' as const; // Can edit files in workspace - case 'plan': return 'workspace-write' as const; // Can write for planning - default: return 'workspace-write' as const; // Safe default - } - })(); - - if (!wasCreated) { - const startConfig: CodexSessionConfig = { - prompt: first ? message.message + '\n\n' + CHANGE_TITLE_INSTRUCTION : message.message, - sandbox, - 'approval-policy': approvalPolicy, - config: { mcp_servers: mcpServers } - }; - if (message.mode.model) { - startConfig.model = message.mode.model; - } - - // Check for resume file from multiple sources - let resumeFile: string | null = null; - - // Priority 1: Explicit resume file from mode change - if (nextExperimentalResume) { - resumeFile = nextExperimentalResume; - nextExperimentalResume = null; // consume once - logger.debug('[Codex] Using resume file from mode change:', resumeFile); - } - // Priority 2: Resume from stored abort session - else if (storedSessionIdForResume) { - const abortResumeFile = findCodexResumeFile(storedSessionIdForResume); - if (abortResumeFile) { - resumeFile = abortResumeFile; - logger.debug('[Codex] Using resume file from aborted session:', resumeFile); - messageBuffer.addMessage('Resuming from aborted session...', 'status'); - } - storedSessionIdForResume = null; // consume once - } - - // Apply resume file if found - if (resumeFile) { - (startConfig.config as any).experimental_resume = resumeFile; - } - - await client.startSession( - startConfig, - { signal: abortController.signal } - ); - wasCreated = true; - first = false; - } else { - const response = await client.continueSession( - message.message, - { signal: abortController.signal } - ); - logger.debug('[Codex] continueSession response:', response); - } - } catch (error) { - logger.warn('Error in codex session:', error); - const isAbortError = error instanceof Error && error.name === 'AbortError'; - - if (isAbortError) { - messageBuffer.addMessage('Aborted by user', 'status'); - session.sendSessionEvent({ type: 'message', message: 'Aborted by user' }); - // Abort cancels the current task/inference but keeps the Codex session alive. - // Do not clear session state here; the next user message should continue on the - // existing session if possible. - } else { - messageBuffer.addMessage('Process exited unexpectedly', 'status'); - session.sendSessionEvent({ type: 'message', message: 'Process exited unexpectedly' }); - // For unexpected exits, try to store session for potential recovery - if (client.hasActiveSession()) { - storedSessionIdForResume = client.storeSessionForResume(); - logger.debug('[Codex] Stored session after unexpected error:', storedSessionIdForResume); - } - } - } finally { - // Reset permission handler, reasoning processor, and diff processor - permissionHandler.reset(); - reasoningProcessor.abort(); // Use abort to properly finish any in-progress tool calls - diffProcessor.reset(); - thinking = false; - session.keepAlive(thinking, 'remote'); - emitReadyIfIdle({ - pending, - queueSize: () => messageQueue.size(), - shouldExit, - sendReady, - }); - logActiveHandles('after-turn'); - } - } + // Handle termination signals + process.on('SIGTERM', cleanup); + process.on('SIGINT', cleanup); - } finally { - // Clean up resources when main loop exits - logger.debug('[codex]: Final cleanup start'); - logActiveHandles('cleanup-start'); + process.on('uncaughtException', (error) => { + logger.debug('[CODEX] Uncaught exception:', error); + cleanup(); + }); - // Cancel offline reconnection if still running - if (reconnectionHandle) { - logger.debug('[codex]: Cancelling offline reconnection'); - reconnectionHandle.cancel(); - } + process.on('unhandledRejection', (reason) => { + logger.debug('[CODEX] Unhandled rejection:', reason); + cleanup(); + }); - try { - logger.debug('[codex]: sendSessionDeath'); - session.sendSessionDeath(); - logger.debug('[codex]: flush begin'); - await session.flush(); - logger.debug('[codex]: flush done'); - logger.debug('[codex]: session.close begin'); - await session.close(); - logger.debug('[codex]: session.close done'); - } catch (e) { - logger.debug('[codex]: Error while closing session', e); - } - logger.debug('[codex]: client.forceCloseSession begin'); - await client.forceCloseSession(); - logger.debug('[codex]: client.forceCloseSession done'); - // Stop Happy MCP server - logger.debug('[codex]: happyServer.stop'); - happyServer.stop(); + const attachKillHandler = (currentSession: ApiSessionClient) => { + registerKillSessionHandler(currentSession.rpcHandlerManager, cleanup); + }; + attachKillHandler(session); + sessionController.onSessionSwap((nextSession) => { + attachKillHandler(nextSession); + }); - // Clean up ink UI - if (process.stdin.isTTY) { - logger.debug('[codex]: setRawMode(false)'); - try { process.stdin.setRawMode(false); } catch { } - } - // Stop reading from stdin so the process can exit - if (hasTTY) { - logger.debug('[codex]: stdin.pause()'); - try { process.stdin.pause(); } catch { } - } - // Clear periodic keep-alive to avoid keeping event loop alive - logger.debug('[codex]: clearInterval(keepAlive)'); - clearInterval(keepAliveInterval); - if (inkInstance) { - logger.debug('[codex]: inkInstance.unmount()'); - inkInstance.unmount(); - } - messageBuffer.clear(); + // Start codex loop + await codexLoop({ + path: workingDirectory, + startingMode: mode, + resumeArgs: opts.resumeArgs, + resumeSessionId: resumeSessionId ?? undefined, + sessionTag, + sessionController, + api, + mcpServers, + messageQueue, + onModeChange, + onThinkingChange, + }); - logActiveHandles('cleanup-end'); - logger.debug('[codex]: Final cleanup completed'); - } + await cleanup(); } diff --git a/src/codex/sessionController.ts b/src/codex/sessionController.ts new file mode 100644 index 00000000..ac53f356 --- /dev/null +++ b/src/codex/sessionController.ts @@ -0,0 +1,6 @@ +import type { ApiSessionClient } from '@/api/apiSession'; + +export interface SessionController { + getSession: () => ApiSessionClient; + onSessionSwap: (listener: (session: ApiSessionClient) => void) => () => void; +} diff --git a/src/codex/types.ts b/src/codex/types.ts index efaffb67..65e7b395 100644 --- a/src/codex/types.ts +++ b/src/codex/types.ts @@ -6,6 +6,7 @@ export interface CodexSessionConfig { prompt: string; 'approval-policy'?: 'untrusted' | 'on-failure' | 'on-request' | 'never'; 'base-instructions'?: string; + 'resume-path'?: string; config?: Record; cwd?: string; 'include-plan-tool'?: boolean; diff --git a/src/codex/utils/codexAcp.ts b/src/codex/utils/codexAcp.ts new file mode 100644 index 00000000..361957b8 --- /dev/null +++ b/src/codex/utils/codexAcp.ts @@ -0,0 +1,58 @@ +import { logger } from '@/ui/logger'; +import type { ACPMessageData } from '@/api/apiSession'; + +export type CodexMessage = + | { type: 'message'; message: string; id?: string } + | { type: 'reasoning'; message: string; id?: string } + | { type: 'tool-call'; name: string; callId: string; input: unknown; id?: string } + | { type: 'tool-call-result'; callId: string; output: unknown; id?: string; isError?: boolean } + | { type: 'token_count'; [key: string]: unknown } + | { type: string; [key: string]: unknown }; + +export function codexMessageToAcp(message: CodexMessage): ACPMessageData | null { + switch (message.type) { + case 'message': + if (typeof message.message !== 'string') { + logger.debug('[codex] Missing message text; dropping message', { message }); + return null; + } + return { type: 'message', message: message.message }; + case 'reasoning': + if (typeof message.message !== 'string') { + logger.debug('[codex] Missing reasoning text; dropping message', { message }); + return null; + } + return { type: 'reasoning', message: message.message }; + case 'tool-call': { + if (typeof message.callId !== 'string' || typeof message.name !== 'string' || typeof message.id !== 'string') { + logger.debug('[codex] Missing tool-call fields; dropping message', { message }); + return null; + } + return { + type: 'tool-call', + callId: message.callId, + name: message.name, + input: message.input, + id: message.id, + }; + } + case 'tool-call-result': { + if (typeof message.callId !== 'string' || typeof message.id !== 'string') { + logger.debug('[codex] Missing tool-call-result callId; dropping message', { message }); + return null; + } + return { + type: 'tool-result', + callId: message.callId, + output: message.output, + id: message.id, + ...(typeof message.isError === 'boolean' ? { isError: message.isError } : {}), + }; + } + case 'token_count': + return message as ACPMessageData; + default: + logger.debug('[codex] Unsupported message type for ACP', { type: message.type }); + return null; + } +} diff --git a/src/codex/utils/codexSessionMap.ts b/src/codex/utils/codexSessionMap.ts new file mode 100644 index 00000000..7e2fe9c7 --- /dev/null +++ b/src/codex/utils/codexSessionMap.ts @@ -0,0 +1,74 @@ +import type { UUID } from 'node:crypto'; +import { existsSync } from 'node:fs'; +import { mkdir, readFile, rename, writeFile } from 'node:fs/promises'; +import { join } from 'node:path'; + +import { configuration } from '@/configuration'; +import { logger } from '@/ui/logger'; + +type CodexSessionTagMap = Record; + +const mapFile = join(configuration.happyHomeDir, 'codex-session-map.json'); +const tmpFile = `${mapFile}.tmp`; + +async function readMap(): Promise { + if (!existsSync(mapFile)) return {}; + + try { + const raw = await readFile(mapFile, 'utf8'); + const data = JSON.parse(raw); + if (!data || typeof data !== 'object') return {}; + return normalizeMap(data as Record); + } catch (error) { + logger.debug('[codex-session-map] Failed to read map, starting fresh', error); + return {}; + } +} + +const uuidPattern = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; + +function isValidTag(value: unknown): value is UUID { + return typeof value === 'string' && uuidPattern.test(value); +} + +function normalizeMap(data: Record): CodexSessionTagMap { + const map: CodexSessionTagMap = {}; + for (const [key, value] of Object.entries(data)) { + if (isValidTag(value)) { + map[key] = value; + } + } + return map; +} + +async function writeMap(map: CodexSessionTagMap): Promise { + await mkdir(configuration.happyHomeDir, { recursive: true }); + await writeFile(tmpFile, JSON.stringify(map, null, 2)); + await rename(tmpFile, mapFile); +} + +export async function getHappySessionTagForCodexSession(sessionId: string): Promise { + const map = await readMap(); + return map[sessionId] ?? null; +} + +export async function ensureHappySessionTagForCodexSession(sessionId: string, tag: UUID): Promise { + const map = await readMap(); + const existing = map[sessionId]; + + if (existing) { + if (existing !== tag) { + logger.debug('[codex-session-map] Existing tag differs; keeping stored tag', { + sessionId, + existing, + attempted: tag, + }); + } + return existing; + } + + map[sessionId] = tag; + await writeMap(map); + logger.debug('[codex-session-map] Stored session tag mapping', { sessionId, tag }); + return tag; +} diff --git a/src/codex/utils/permissionHandler.ts b/src/codex/utils/permissionHandler.ts index 0720211d..e7a29402 100644 --- a/src/codex/utils/permissionHandler.ts +++ b/src/codex/utils/permissionHandler.ts @@ -41,6 +41,9 @@ export class CodexPermissionHandler extends BasePermissionHandler { input: unknown ): Promise { return new Promise((resolve, reject) => { + const timeoutMs = Number(process.env.HAPPY_PERMISSION_TIMEOUT_MS ?? 120_000); + const startedAt = Date.now(); + // Store the pending request this.pendingRequests.set(toolCallId, { resolve, @@ -53,6 +56,41 @@ export class CodexPermissionHandler extends BasePermissionHandler { this.addPendingRequestToState(toolCallId, toolName, input); logger.debug(`${this.getLogPrefix()} Permission request sent for tool: ${toolName} (${toolCallId})`); + + // Avoid deadlocks if the client-side permission response never arrives (e.g. mobile dialog bug). + if (Number.isFinite(timeoutMs) && timeoutMs > 0) { + setTimeout(() => { + const pending = this.pendingRequests.get(toolCallId); + if (!pending) return; + + this.pendingRequests.delete(toolCallId); + + const result: PermissionResult = { decision: 'abort' }; + pending.resolve(result); + + this.session.updateAgentState((currentState) => { + const request = currentState.requests?.[toolCallId]; + if (!request) return currentState; + + const { [toolCallId]: _, ...remainingRequests } = currentState.requests || {}; + return { + ...currentState, + requests: remainingRequests, + completedRequests: { + ...currentState.completedRequests, + [toolCallId]: { + ...request, + completedAt: Date.now(), + status: 'canceled', + reason: `Permission timed out after ${Math.max(0, Date.now() - startedAt)}ms`, + }, + }, + }; + }); + + logger.debug(`${this.getLogPrefix()} Permission timed out for ${toolName} (${toolCallId})`); + }, timeoutMs).unref?.(); + } }); } -} \ No newline at end of file +} diff --git a/src/codex/utils/ready.ts b/src/codex/utils/ready.ts new file mode 100644 index 00000000..1b70962e --- /dev/null +++ b/src/codex/utils/ready.ts @@ -0,0 +1,27 @@ +export type ReadyEventOptions = { + pending: unknown; + queueSize: () => number; + shouldExit: boolean; + sendReady: () => void; + notify?: () => void; +}; + +/** + * Notify connected clients when Codex finishes processing and the queue is idle. + * Returns true when a ready event was emitted. + */ +export function emitReadyIfIdle({ pending, queueSize, shouldExit, sendReady, notify }: ReadyEventOptions): boolean { + if (shouldExit) { + return false; + } + if (pending) { + return false; + } + if (queueSize() > 0) { + return false; + } + + sendReady(); + notify?.(); + return true; +} diff --git a/src/codex/utils/resume.ts b/src/codex/utils/resume.ts new file mode 100644 index 00000000..da2bf08e --- /dev/null +++ b/src/codex/utils/resume.ts @@ -0,0 +1,14 @@ +export function extractResumeSessionId(resumeArgs?: string[]): string | null { + if (!resumeArgs || resumeArgs.length === 0) return null; + const resumeIndex = resumeArgs.indexOf('--resume') !== -1 + ? resumeArgs.indexOf('--resume') + : resumeArgs.indexOf('resume'); + if (resumeIndex === -1) return null; + const candidate = resumeArgs[resumeIndex + 1]; + if (!candidate || candidate.startsWith('-')) return null; + const uuidMatch = candidate.match(/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/i); + if (uuidMatch) { + return uuidMatch[0]; + } + return null; +} diff --git a/src/codex/utils/resumePicker.ts b/src/codex/utils/resumePicker.ts new file mode 100644 index 00000000..989b8152 --- /dev/null +++ b/src/codex/utils/resumePicker.ts @@ -0,0 +1,102 @@ +import React from 'react'; +import { render } from 'ink'; + +import { CodexResumeSelector } from '@/ui/ink/CodexResumeSelector'; +import { listCodexResumeSessions, CodexResumeEntry } from './rolloutScanner'; + +function enterAltScreen(stdout: NodeJS.WriteStream): () => void { + if (!stdout.isTTY) return () => undefined; + + // Use the terminal alternate screen buffer so the picker feels like a full-screen TUI + // and we don't leave partially-rendered content in the scrollback. + stdout.write('\u001b[?1049h\u001b[2J\u001b[H'); + + const restore = () => { + try { + stdout.write('\u001b[?1049l\u001b[?25h'); + } catch { + // ignore + } + }; + + // Ensure we restore even if the process exits unexpectedly while the picker is up. + process.once('exit', restore); + + return restore; +} + +export async function selectCodexResumeSession(opts: { + workingDirectory: string; + allowAll?: boolean; + limit?: number; +}): Promise { + const entries = await listCodexResumeSessions({ + workingDirectory: opts.workingDirectory, + allowAll: opts.allowAll, + limit: opts.limit, + }); + + if (entries.length === 0) { + console.log('No saved Codex sessions found for this directory.'); + return null; + } + + return await new Promise((resolve) => { + let hasResolved = false; + const restoreScreen = enterAltScreen(process.stdout); + let app: ReturnType | null = null; + + const cleanupSignals: Array<() => void> = []; + const registerSignal = (signal: NodeJS.Signals, handler: () => void) => { + process.once(signal, handler); + cleanupSignals.push(() => process.removeListener(signal, handler)); + }; + + const cleanup = () => { + cleanupSignals.forEach((fn) => fn()); + if (app) { + app.unmount(); + } + restoreScreen(); + }; + + const onSelect = (entry: CodexResumeEntry) => { + if (hasResolved) return; + hasResolved = true; + cleanup(); + resolve(entry); + }; + + const onCancel = () => { + if (hasResolved) return; + hasResolved = true; + cleanup(); + resolve(null); + }; + + const onSignal = () => { + if (hasResolved) return; + hasResolved = true; + cleanup(); + resolve(null); + }; + + registerSignal('SIGINT', onSignal); + registerSignal('SIGTERM', onSignal); + + try { + app = render( + React.createElement(CodexResumeSelector, { + entries, + showAll: Boolean(opts.allowAll), + onSelect, + onCancel, + }), + { exitOnCtrlC: false, patchConsole: false } + ); + } catch (error) { + cleanup(); + throw error; + } + }); +} diff --git a/src/codex/utils/rolloutScanner.ts b/src/codex/utils/rolloutScanner.ts new file mode 100644 index 00000000..8a1a3706 --- /dev/null +++ b/src/codex/utils/rolloutScanner.ts @@ -0,0 +1,793 @@ +import os from 'node:os'; +import { join, resolve } from 'node:path'; +import { readdir, readFile, stat, open } from 'node:fs/promises'; +import { randomUUID } from 'node:crypto'; + +import { logger } from '@/ui/logger'; +import { InvalidateSync } from '@/utils/sync'; +import { startFileWatcher } from '@/modules/watcher/startFileWatcher'; +import { sanitizeInkText } from '@/utils/inkSanitize'; + +interface RolloutScannerOptions { + workingDirectory: string; + onCodexMessage: (message: any) => void; + allowAll?: boolean; + resumeSessionId?: string; + onActiveSessionFile?: (file: string, sessionId: string | undefined) => void; +} + +export interface CodexResumeEntry { + id: string; + preview: string; + updatedAt?: Date; + cwd?: string; + gitBranch?: string; + path: string; +} + +interface FileState { + offset: number; + buffer: string; + cwd?: string; + sessionId?: string; + accepted: boolean; +} + +export async function createCodexRolloutScanner(opts: RolloutScannerOptions) { + const codexHomeDir = process.env.CODEX_HOME || join(os.homedir(), '.codex'); + const sessionsDir = join(codexHomeDir, 'sessions'); + const normalizedCwd = resolve(opts.workingDirectory); + const startTimeMs = Date.now(); + + const fileStates = new Map(); + const watchers = new Map void>(); + + const sync = new InvalidateSync(async () => { + const files = await listJsonlFiles(sessionsDir); + + for (const file of files) { + if (!fileStates.has(file)) { + await initializeFile(file); + } + } + + for (const [file] of fileStates) { + await processFileUpdates(file); + } + + for (const file of files) { + if (!watchers.has(file)) { + watchers.set(file, startFileWatcher(file, () => sync.invalidate())); + } + } + }); + + await sync.invalidateAndAwait(); + const intervalId = setInterval(() => sync.invalidate(), 3000); + + return { + cleanup: async () => { + clearInterval(intervalId); + for (const stop of watchers.values()) { + stop(); + } + watchers.clear(); + await sync.invalidateAndAwait(); + sync.stop(); + }, + }; + + async function initializeFile(file: string) { + let state: FileState = { + offset: 0, + buffer: '', + accepted: false, + }; + + const contents = await readFileSafe(file); + if (contents === null) { + return; + } + + const { lines, trailing } = splitLines(contents); + state.buffer = trailing; + + for (const line of lines) { + handleLine(file, state, line, true); + } + + const fileStat = await statSafe(file); + if (fileStat) { + state.offset = fileStat.size; + } + + fileStates.set(file, state); + } + + async function processFileUpdates(file: string) { + const state = fileStates.get(file); + if (!state) return; + + const fileStat = await statSafe(file); + if (!fileStat) return; + + if (fileStat.size <= state.offset) { + return; + } + + const chunk = await readFromOffset(file, state.offset); + if (!chunk) return; + + state.offset = fileStat.size; + + const combined = state.buffer + chunk; + const { lines, trailing } = splitLines(combined); + state.buffer = trailing; + + for (const line of lines) { + handleLine(file, state, line, false); + } + } + + function handleLine(file: string, state: FileState, line: string, initial: boolean) { + if (!line.trim()) return; + + let record: any; + try { + record = JSON.parse(line); + } catch { + return; + } + + if (record?.type === 'session_meta') { + const cwd = record?.payload?.cwd ?? record?.payload?.meta?.cwd; + const sessionId = record?.payload?.id ?? record?.payload?.meta?.id; + if (cwd) { + state.cwd = cwd; + } + if (sessionId) { + state.sessionId = sessionId; + } + + const matchesCwd = opts.allowAll || (state.cwd && resolve(state.cwd) === normalizedCwd); + const matchesResume = opts.resumeSessionId && sessionId && sessionId === opts.resumeSessionId; + const isRecent = !opts.resumeSessionId && isRecentTimestamp(record?.timestamp ?? record?.payload?.timestamp); + + if (matchesResume || (matchesCwd && isRecent)) { + state.accepted = true; + logger.debug(`[codex-rollout] Tracking session ${state.sessionId ?? 'unknown'} at ${file}`); + opts.onActiveSessionFile?.(file, state.sessionId); + } else if (state.cwd) { + state.accepted = false; + } + return; + } + + if (!state.accepted) { + const matchesCwd = opts.allowAll || (state.cwd && resolve(state.cwd) === normalizedCwd); + if (matchesCwd && shouldEmitRecord(record)) { + state.accepted = true; + logger.debug(`[codex-rollout] Tracking session ${state.sessionId ?? 'unknown'} at ${file}`); + opts.onActiveSessionFile?.(file, state.sessionId); + } else { + return; + } + } + + if (initial && !shouldEmitRecord(record)) { + return; + } + + emitFromRecord(record); + } + + function shouldEmitRecord(record: any): boolean { + const timestamp = record?.timestamp; + if (!timestamp) return false; + const ts = Date.parse(timestamp); + if (Number.isNaN(ts)) return false; + return ts >= startTimeMs - 1000; + } + + function isRecentTimestamp(timestamp: string | undefined): boolean { + if (!timestamp) return false; + const ts = Date.parse(timestamp); + if (Number.isNaN(ts)) return false; + return ts >= startTimeMs - 1000; + } + + function emitFromRecord(record: any) { + if (!record || typeof record !== 'object') return; + + switch (record.type) { + case 'response_item': + handleResponseItem(record.payload); + return; + case 'event_msg': + handleEventMsg(record.payload); + return; + default: + return; + } + } + + function handleEventMsg(event: any) { + if (!event || typeof event !== 'object') return; + if (event.type === 'token_count') { + opts.onCodexMessage({ ...event, id: randomUUID() }); + } + } + + function handleResponseItem(item: any) { + if (!item || typeof item !== 'object') return; + + switch (item.type) { + case 'message': { + if (item.role !== 'assistant' || !Array.isArray(item.content)) { + return; + } + const text = extractText(item.content, true); + if (!text) return; + opts.onCodexMessage({ + type: 'message', + message: text, + id: randomUUID(), + }); + return; + } + case 'function_call': { + const callId = item.call_id; + if (!callId) return; + const input = parseJsonInput(item.arguments); + opts.onCodexMessage({ + type: 'tool-call', + name: item.name, + callId, + input, + id: randomUUID(), + }); + return; + } + case 'function_call_output': { + const callId = item.call_id; + if (!callId) return; + opts.onCodexMessage({ + type: 'tool-call-result', + callId, + output: item.output, + id: randomUUID(), + }); + return; + } + case 'custom_tool_call': { + const callId = item.call_id; + if (!callId) return; + const input = parseJsonInput(item.input); + opts.onCodexMessage({ + type: 'tool-call', + name: item.name, + callId, + input, + id: randomUUID(), + }); + return; + } + case 'custom_tool_call_output': { + const callId = item.call_id; + if (!callId) return; + opts.onCodexMessage({ + type: 'tool-call-result', + callId, + output: item.output, + id: randomUUID(), + }); + return; + } + case 'local_shell_call': { + const callId = item.call_id ?? randomUUID(); + const action = item.action; + if (action?.type !== 'exec') return; + opts.onCodexMessage({ + type: 'tool-call', + name: 'CodexBash', + callId, + input: { + command: action.command, + cwd: action.working_directory, + timeout_ms: action.timeout_ms, + env: action.env, + user: action.user, + }, + id: randomUUID(), + }); + return; + } + case 'web_search_call': { + opts.onCodexMessage({ + type: 'tool-call', + name: 'web_search', + callId: randomUUID(), + input: item.action, + id: randomUUID(), + }); + return; + } + default: + return; + } + } +} + +export async function findLatestCodexRolloutForCwd( + workingDirectory: string, + allowAll: boolean, + opts?: { preferMtime?: boolean } +): Promise { + const codexHomeDir = process.env.CODEX_HOME || join(os.homedir(), '.codex'); + const sessionsDir = join(codexHomeDir, 'sessions'); + const normalizedCwd = resolve(workingDirectory); + const files = await listJsonlFiles(sessionsDir); + + let best: { file: string; ts: number } | null = null; + + for (const file of files) { + const meta = await readSessionMeta(file); + if (!meta) continue; + if (!allowAll && meta.cwd && resolve(meta.cwd) !== normalizedCwd) { + continue; + } + const ts = opts?.preferMtime + ? (await statSafe(file))?.mtimeMs ?? 0 + : parseRolloutTimestamp(file) ?? (await statSafe(file))?.mtimeMs ?? 0; + if (!best || ts > best.ts) { + best = { file, ts }; + } + } + + return best?.file ?? null; +} + +export async function findSessionFileById(sessionId: string): Promise { + const codexHomeDir = process.env.CODEX_HOME || join(os.homedir(), '.codex'); + const sessionsDir = join(codexHomeDir, 'sessions'); + const files = await listJsonlFiles(sessionsDir); + + for (const file of files) { + const meta = await readSessionMeta(file); + if (meta?.id === sessionId) { + return file; + } + } + + return null; +} + +export async function listCodexResumeSessions(opts: { + workingDirectory: string; + allowAll?: boolean; + limit?: number; +}): Promise { + const codexHomeDir = process.env.CODEX_HOME || join(os.homedir(), '.codex'); + const sessionsDir = join(codexHomeDir, 'sessions'); + const normalizedCwd = resolve(opts.workingDirectory); + const files = await listJsonlFiles(sessionsDir); + + const scored = await Promise.all( + files.map(async (file) => { + const stats = await statSafe(file); + const parsedTs = parseRolloutTimestamp(file); + const ts = parsedTs ?? stats?.mtimeMs ?? 0; + return { file, ts, mtimeMs: stats?.mtimeMs ?? null }; + }) + ); + + scored.sort((a, b) => b.ts - a.ts); + + const limit = opts.limit ?? 200; + const entries: CodexResumeEntry[] = []; + + for (const candidate of scored) { + if (entries.length >= limit) break; + const summary = await readSessionSummary(candidate.file); + if (!summary?.id) continue; + // Match Codex's `list_threads`: only include sessions that have + // session metadata AND a user event within the head-record scan window. + if (!summary.sawSessionMeta || !summary.sawUserEvent) continue; + if (!opts.allowAll) { + if (!summary.cwd) continue; + if (resolve(summary.cwd) !== normalizedCwd) continue; + } + const updatedAt = summary.updatedAt + ?? (candidate.mtimeMs ? new Date(candidate.mtimeMs) : undefined) + ?? (candidate.ts ? new Date(candidate.ts) : undefined); + const preview = normalizePreview(summary.preview); + entries.push({ + id: summary.id, + preview: preview || '(no message yet)', + updatedAt, + cwd: summary.cwd, + gitBranch: summary.gitBranch, + path: candidate.file, + }); + } + + return entries; +} + +export async function readSessionMeta(file: string): Promise<{ id?: string; cwd?: string } | null> { + return readSessionMetaInternal(file); +} + +export async function buildRolloutHistoryPrompt(opts: { + file: string; + maxChars?: number; + maxMessages?: number; +}): Promise { + const maxChars = opts.maxChars ?? 8000; + const maxMessages = opts.maxMessages ?? 24; + const contents = await readFileSafe(opts.file); + if (!contents) return null; + + const { lines } = splitLines(contents); + const messages: { role: 'user' | 'assistant'; text: string }[] = []; + + for (const line of lines) { + if (!line.trim()) continue; + let record: any; + try { + record = JSON.parse(line); + } catch { + continue; + } + const itemType = record?.type; + if (itemType === 'response_item') { + const payload = record.payload; + if (payload?.type === 'message' && Array.isArray(payload.content)) { + const role = payload.role === 'user' ? 'user' : 'assistant'; + const text = extractText(payload.content, role === 'assistant'); + if (text) { + messages.push({ role, text }); + } + } else if (payload?.type === 'compacted' && payload.message) { + messages.push({ role: 'assistant', text: payload.message }); + } + } else if (itemType === 'event_msg') { + const payload = record.payload; + if (payload?.type === 'user_message' && payload.message) { + messages.push({ role: 'user', text: payload.message }); + } else if (payload?.type === 'agent_message' && payload.message) { + messages.push({ role: 'assistant', text: payload.message }); + } + } + } + + if (messages.length === 0) return null; + + const trimmed = messages.slice(-maxMessages); + let body = ''; + for (const message of trimmed) { + const label = message.role === 'user' ? 'User' : 'Assistant'; + body += `${label}: ${message.text.trim()}\n`; + if (body.length > maxChars) { + body = body.slice(-maxChars); + break; + } + } + + if (!body.trim()) return null; + + return `Conversation so far:\\n${body.trim()}\\n\\nNew user message:\\n`; +} + +async function listJsonlFiles(root: string): Promise { + let entries: string[] = []; + let dirents: import('node:fs').Dirent[]; + try { + dirents = await readdir(root, { withFileTypes: true }) as import('node:fs').Dirent[]; + } catch { + return entries; + } + + for (const dirent of dirents) { + const full = join(root, dirent.name); + if (dirent.isDirectory()) { + entries = entries.concat(await listJsonlFiles(full)); + } else if (dirent.isFile() && full.endsWith('.jsonl')) { + entries.push(full); + } + } + + return entries; +} + +function splitLines(text: string): { lines: string[]; trailing: string } { + const parts = text.split('\n'); + let trailing = ''; + if (!text.endsWith('\n')) { + trailing = parts.pop() ?? ''; + } else if (parts.length && parts[parts.length - 1] === '') { + parts.pop(); + } + return { lines: parts, trailing }; +} + +function extractText(content: any[], preferOutput: boolean): string { + const texts: string[] = []; + for (const item of content) { + if (!item || typeof item !== 'object') continue; + if (preferOutput && item.type === 'output_text' && item.text) { + texts.push(item.text); + } else if (!preferOutput && item.type === 'input_text' && item.text) { + texts.push(item.text); + } + } + if (texts.length === 0 && preferOutput) { + for (const item of content) { + if (item?.text) { + texts.push(item.text); + } + } + } + return texts.join(''); +} + +function parseJsonInput(value: string): unknown { + if (typeof value !== 'string') { + return value; + } + try { + return JSON.parse(value); + } catch { + return value; + } +} + +async function readFileSafe(file: string): Promise { + try { + return await readFile(file, 'utf8'); + } catch { + return null; + } +} + +async function statSafe(file: string) { + try { + return await stat(file); + } catch { + return null; + } +} + +async function readFromOffset(file: string, offset: number): Promise { + let handle; + try { + handle = await open(file, 'r'); + const { size } = await handle.stat(); + if (size <= offset) { + return ''; + } + const length = size - offset; + const buffer = Buffer.alloc(length); + await handle.read(buffer, 0, length, offset); + return buffer.toString('utf8'); + } catch { + return null; + } finally { + await handle?.close().catch(() => undefined); + } +} + +async function readSessionMetaInternal(file: string): Promise<{ id?: string; cwd?: string } | null> { + const contents = await readFileSafe(file); + if (!contents) return null; + const { lines } = splitLines(contents); + for (const line of lines) { + if (!line.trim()) continue; + try { + const record = JSON.parse(line); + if (record?.type === 'session_meta') { + const payload = record?.payload ?? {}; + return { + id: payload?.id ?? payload?.meta?.id, + cwd: payload?.cwd ?? payload?.meta?.cwd, + }; + } + } catch { + continue; + } + } + return null; +} + +async function readSessionSummary(file: string): Promise<{ + id?: string; + cwd?: string; + gitBranch?: string; + preview?: string; + updatedAt?: Date; + sawSessionMeta: boolean; + sawUserEvent: boolean; +}> { + const head = await readHeadBytes(file, 1024 * 1024); + if (!head) return { sawSessionMeta: false, sawUserEvent: false }; + const { lines } = splitLines(head); + let id: string | undefined; + let cwd: string | undefined; + let gitBranch: string | undefined; + let preview: string | undefined; + let sawSessionMeta = false; + let sawUserEvent = false; + + // Mirror Codex's head scan behavior: only consider the first N JSONL records. + // Codex uses this to decide which rollouts are "real" resumable threads. + const headRecords = parseHeadRecords(lines, 10); + + for (const record of headRecords) { + if (record?.type === 'session_meta') { + const payload = record?.payload ?? {}; + const meta = payload?.meta ?? {}; + id = payload?.id ?? meta?.id ?? id; + cwd = payload?.cwd ?? meta?.cwd ?? cwd; + gitBranch = payload?.git?.branch ?? meta?.git?.branch ?? gitBranch; + sawSessionMeta = true; + continue; + } + + // Codex considers a rollout valid if it sees a user event in the head scan. + // Codex's `list_threads` uses the presence of a `user_message` event (not a `response_item`). + if (record?.type === 'event_msg' && record?.payload?.type === 'user_message') { + sawUserEvent = true; + } + } + + // Match Codex's picker: use the first meaningful user input as the preview. + // Skip AGENTS.md bootstrap and other non-user prompts like . + preview = readHeadPreviewMessageFromRecords(headRecords); + + return { id, cwd, gitBranch, preview, sawSessionMeta, sawUserEvent }; +} + +function parseRolloutTimestamp(file: string): number | null { + const match = file.match(/rollout-(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})-/); + if (!match) return null; + const raw = match[1]; + const iso = raw.replace(/-(\d{2})-(\d{2})$/, ':$1:$2'); + const date = new Date(`${iso}Z`); + const ts = date.getTime(); + return Number.isNaN(ts) ? null : ts; +} + +async function readHeadBytes(file: string, maxBytes: number): Promise { + let handle; + try { + handle = await open(file, 'r'); + const stats = await handle.stat(); + if (stats.size <= 0) return ''; + const length = Math.min(stats.size, maxBytes); + const buffer = Buffer.alloc(length); + await handle.read(buffer, 0, length, 0); + return buffer.toString('utf8'); + } catch { + return null; + } finally { + await handle?.close().catch(() => undefined); + } +} + +async function readTailBytes(file: string, maxBytes: number): Promise { + let handle; + try { + handle = await open(file, 'r'); + const stats = await handle.stat(); + if (stats.size <= 0) return ''; + const length = Math.min(stats.size, maxBytes); + const offset = Math.max(0, stats.size - length); + const buffer = Buffer.alloc(length); + await handle.read(buffer, 0, length, offset); + return buffer.toString('utf8'); + } catch { + return null; + } finally { + await handle?.close().catch(() => undefined); + } +} + +function looksLikeAgentsBootstrap(text: string): boolean { + const trimmed = text.trimStart(); + return ( + trimmed.startsWith('# AGENTS.md instructions') || + trimmed.startsWith('AGENTS.md instructions') || + trimmed.includes('') + ); +} + +function looksLikeEnvironmentBootstrap(text: string): boolean { + const trimmed = text.trimStart(); + return trimmed.startsWith(''); +} + +function readHeadPreviewMessage(lines: string[]): string | undefined { + for (const line of lines) { + if (!line.trim()) continue; + let record: any; + try { + record = JSON.parse(line); + } catch { + continue; + } + + const itemType = record?.type; + if (itemType === 'response_item') { + const payload = record?.payload; + if (payload?.type === 'message' && payload?.role === 'user' && Array.isArray(payload?.content)) { + const raw = extractText(payload.content, false); + const normalized = normalizePreview(raw); + if (!normalized) continue; + if (looksLikeAgentsBootstrap(normalized)) continue; + if (looksLikeEnvironmentBootstrap(normalized)) continue; + return normalized; + } + } else if (itemType === 'event_msg') { + const payload = record?.payload; + if (payload?.type === 'user_message' && typeof payload?.message === 'string') { + const normalized = normalizePreview(payload.message); + if (!normalized) continue; + if (looksLikeAgentsBootstrap(normalized)) continue; + if (looksLikeEnvironmentBootstrap(normalized)) continue; + return normalized; + } + } + } + + return undefined; +} + +function parseHeadRecords(lines: string[], maxRecords: number): any[] { + const records: any[] = []; + for (const line of lines) { + if (records.length >= maxRecords) break; + if (!line.trim()) continue; + try { + records.push(JSON.parse(line)); + } catch { + continue; + } + } + return records; +} + +function readHeadPreviewMessageFromRecords(records: any[]): string | undefined { + for (const record of records) { + const itemType = record?.type; + if (itemType === 'response_item') { + const payload = record?.payload; + if (payload?.type === 'message' && payload?.role === 'user' && Array.isArray(payload?.content)) { + const raw = extractText(payload.content, false); + const normalized = normalizePreview(raw); + if (!normalized) continue; + if (looksLikeAgentsBootstrap(normalized)) continue; + if (looksLikeEnvironmentBootstrap(normalized)) continue; + return normalized; + } + } else if (itemType === 'event_msg') { + const payload = record?.payload; + if (payload?.type === 'user_message' && typeof payload?.message === 'string') { + const normalized = normalizePreview(payload.message); + if (!normalized) continue; + if (looksLikeAgentsBootstrap(normalized)) continue; + if (looksLikeEnvironmentBootstrap(normalized)) continue; + return normalized; + } + } + } + + return undefined; +} + +function normalizePreview(text?: string): string | undefined { + if (!text) return undefined; + const sanitized = sanitizeInkText(text); + return sanitized || undefined; +} + +// stripAnsiAndControls replaced by sanitizeInkText diff --git a/src/index.ts b/src/index.ts index c7ec6b15..19085726 100644 --- a/src/index.ts +++ b/src/index.ts @@ -88,16 +88,65 @@ import { execFileSync } from 'node:child_process' // Parse startedBy argument let startedBy: 'daemon' | 'terminal' | undefined = undefined; + let resumeArgs: string[] | undefined = undefined; + let startingMode: 'local' | 'remote' | undefined = undefined; + + // Support `happy codex resume [args...]` as a passthrough to `codex resume` + if (args[1] === 'resume') { + resumeArgs = ['resume', ...args.slice(2)]; + } + + // Support `happy codex --resume [session|latest]` as alias for (let i = 1; i < args.length; i++) { if (args[i] === '--started-by') { startedBy = args[++i] as 'daemon' | 'terminal'; } + if (args[i] === '--starting-mode') { + const next = args[++i]; + if (next === 'local' || next === 'remote') { + startingMode = next; + } + } + if (args[i] === '--resume' && !resumeArgs) { + const next = args[i + 1]; + if (next && !next.startsWith('--')) { + if (next === 'latest') { + resumeArgs = ['resume', '--last']; + } else { + resumeArgs = ['resume', next]; + } + i += 1; + } else { + resumeArgs = ['resume']; + } + } + } + + if (resumeArgs?.[0] === 'resume') { + const { extractResumeSessionId } = await import('@/codex/utils/resume'); + const resumeSessionId = extractResumeSessionId(resumeArgs); + const wantsLast = resumeArgs.includes('--last'); + const allowAll = resumeArgs.includes('--all'); + if (resumeArgs.includes('--all')) { + resumeArgs = resumeArgs.filter((arg) => arg !== '--all'); + } + if (!resumeSessionId && !wantsLast) { + const { selectCodexResumeSession } = await import('@/codex/utils/resumePicker'); + const selection = await selectCodexResumeSession({ + workingDirectory: process.cwd(), + allowAll, + }); + if (!selection) { + return; + } + resumeArgs = ['resume', selection.id]; + } } const { credentials } = await authAndSetupMachineIfNeeded(); - await runCodex({credentials, startedBy}); + await runCodex({ credentials, startedBy, resumeArgs, startingMode }); // Do not force exit here; allow instrumentation to show lingering handles } catch (error) { console.error(chalk.red('Error:'), error instanceof Error ? error.message : 'Unknown error') diff --git a/src/ui/ink/CodexDisplay.tsx b/src/ui/ink/CodexDisplay.tsx index f05f4b66..0528d34f 100644 --- a/src/ui/ink/CodexDisplay.tsx +++ b/src/ui/ink/CodexDisplay.tsx @@ -6,12 +6,16 @@ interface CodexDisplayProps { messageBuffer: MessageBuffer logPath?: string onExit?: () => void + mode: 'local' | 'remote' + onSubmit?: (message: string) => void + onRequestSwitch?: (mode: 'local' | 'remote') => void } -export const CodexDisplay: React.FC = ({ messageBuffer, logPath, onExit }) => { +export const CodexDisplay: React.FC = ({ messageBuffer, logPath, onExit, mode, onSubmit, onRequestSwitch }) => { const [messages, setMessages] = useState([]) const [confirmationMode, setConfirmationMode] = useState(false) const [actionInProgress, setActionInProgress] = useState(false) + const [inputBuffer, setInputBuffer] = useState('') const confirmationTimeoutRef = useRef(null) const { stdout } = useStdout() const terminalWidth = stdout.columns || 80 @@ -74,6 +78,37 @@ export const CodexDisplay: React.FC = ({ messageBuffer, logPa if (confirmationMode) { resetConfirmation() } + + // Allow space to request switch to local when in remote mode + if (mode === 'remote' && input === ' ') { + onRequestSwitch?.('local'); + return; + } + + // Local input handling + if (mode === 'local') { + if (key.return) { + const trimmed = inputBuffer.trim() + if (trimmed.length > 0) { + if (trimmed === '/remote') { + onRequestSwitch?.('remote') + } else if (trimmed === '/local') { + onRequestSwitch?.('local') + } else { + onSubmit?.(inputBuffer) + } + } + setInputBuffer('') + return + } + if (key.backspace || key.delete) { + setInputBuffer((prev) => prev.slice(0, -1)) + return + } + if (input) { + setInputBuffer((prev) => prev + input) + } + } }, [confirmationMode, actionInProgress, onExit, setConfirmationWithTimeout, resetConfirmation])) const getMessageColor = (type: BufferedMessage['type']): string => { @@ -162,8 +197,16 @@ export const CodexDisplay: React.FC = ({ messageBuffer, logPa 🤖 Codex Agent Running • Ctrl-C to exit + + Control: {mode === 'local' ? 'Local' : 'Remote'} + )} + {mode === 'local' && ( + + > {inputBuffer || ''} + + )} {process.env.DEBUG && logPath && ( Debug logs: {logPath} @@ -173,4 +216,4 @@ export const CodexDisplay: React.FC = ({ messageBuffer, logPa ) -} \ No newline at end of file +} diff --git a/src/ui/ink/CodexResumeSelector.tsx b/src/ui/ink/CodexResumeSelector.tsx new file mode 100644 index 00000000..51cc0482 --- /dev/null +++ b/src/ui/ink/CodexResumeSelector.tsx @@ -0,0 +1,178 @@ +import React, { useMemo, useState, useEffect } from 'react'; +import { Box, Text, useInput, useStdout } from 'ink'; + +import type { CodexResumeEntry } from '@/codex/utils/rolloutScanner'; +import { sanitizeInkText, truncateInkText } from '@/utils/inkSanitize'; + +interface CodexResumeSelectorProps { + entries: CodexResumeEntry[]; + showAll: boolean; + onSelect: (entry: CodexResumeEntry) => void; + onCancel: () => void; +} + +const MAX_PREVIEW = 120; + +export const CodexResumeSelector: React.FC = ({ + entries, + showAll, + onSelect, + onCancel, +}) => { + const [query, setQuery] = useState(''); + const [selectedIndex, setSelectedIndex] = useState(0); + const { stdout } = useStdout(); + + const safeEntries = useMemo(() => { + return entries.map((entry) => ({ + entry, + preview: sanitizeInkText(entry.preview ?? ''), + branch: sanitizeInkText(entry.gitBranch ?? '-'), + cwd: sanitizeInkText(entry.cwd ?? '-'), + id: sanitizeInkText(entry.id ?? ''), + })); + }, [entries]); + + const filtered = useMemo(() => { + const normalized = sanitizeInkText(query).toLowerCase(); + if (!normalized) return safeEntries; + return safeEntries.filter((item) => { + const haystack = [item.preview, item.branch, item.cwd, item.id] + .filter(Boolean) + .join(' ') + .toLowerCase(); + return haystack.includes(normalized); + }); + }, [safeEntries, query]); + + useEffect(() => { + if (selectedIndex >= filtered.length) { + setSelectedIndex(Math.max(0, filtered.length - 1)); + } + }, [filtered.length, selectedIndex]); + + useInput((input, key) => { + if (key.upArrow) { + setSelectedIndex((prev) => Math.max(0, prev - 1)); + return; + } + if (key.downArrow) { + setSelectedIndex((prev) => Math.min(filtered.length - 1, prev + 1)); + return; + } + if (key.return) { + const entry = filtered[selectedIndex]; + if (entry) { + onSelect(entry.entry); + } + return; + } + if (key.escape || (key.ctrl && input === 'c')) { + onCancel(); + return; + } + if (key.backspace || key.delete) { + setQuery((prev) => prev.slice(0, -1)); + setSelectedIndex(0); + return; + } + if (!key.ctrl && !key.meta && !key.shift && input && input.length === 1) { + setQuery((prev) => prev + input); + setSelectedIndex(0); + } + }); + + const rows = useMemo(() => { + return filtered.map((item) => { + const updated = formatRelativeTime(item.entry.updatedAt); + const preview = truncateInkText(item.preview, MAX_PREVIEW); + return { entry: item.entry, updated, branch: item.branch, cwd: item.cwd, preview }; + }); + }, [filtered]); + + const maxUpdated = Math.max('Updated'.length, ...rows.map((row) => row.updated.length)); + const maxBranch = Math.max('Branch'.length, ...rows.map((row) => row.branch.length)); + const maxCwd = Math.max('CWD'.length, ...rows.map((row) => row.cwd.length)); + const columns = stdout?.columns ?? null; + const maxPreviewWidth = useMemo(() => { + if (!columns) return MAX_PREVIEW; + // Row format: + // `${prefix} ${updated.padEnd(maxUpdated)} ${branch.padEnd(maxBranch)} ${cwd.padEnd(maxCwd)} ${preview}` + // so everything before preview consumes a fixed number of terminal columns. + const prefixAndSpaces = 2; // "> " + const betweenUpdatedAndBranch = 2; // two spaces + const betweenBranchAndCwdOrPreview = 1; // one space + const cwdSegment = showAll ? maxCwd + 1 : 0; // plus trailing space + const beforePreview = + prefixAndSpaces + + maxUpdated + + betweenUpdatedAndBranch + + maxBranch + + betweenBranchAndCwdOrPreview + + cwdSegment; + + // Leave at least a small preview so the UX isn't blank, and cap to a reasonable max. + return Math.min(MAX_PREVIEW, Math.max(10, columns - beforePreview)); + }, [columns, maxBranch, maxCwd, maxUpdated, showAll]); + + const totalRows = rows.length; + const usableRows = Math.max(5, (stdout?.rows ?? 24) - 6); + const start = Math.max( + 0, + Math.min(selectedIndex, Math.max(0, totalRows - usableRows)) + ); + const visible = rows.slice(start, start + usableRows); + + return ( + + Resume a previous session + {query ? `Search: ${sanitizeInkText(query)}` : 'Type to search'} + + + + {pad('Updated', maxUpdated)} {pad('Branch', maxBranch)}{' '} + {showAll ? `${pad('CWD', maxCwd)} ` : ''}Conversation + + {visible.length === 0 ? ( + No matching sessions. + ) : ( + visible.map((row, index) => { + const absoluteIndex = start + index; + const selected = absoluteIndex === selectedIndex; + const prefix = selected ? '>' : ' '; + const preview = truncateInkText(row.preview, maxPreviewWidth); + return ( + + {prefix} {pad(row.updated, maxUpdated)} {pad(row.branch, maxBranch)}{' '} + {showAll ? `${pad(row.cwd, maxCwd)} ` : ''}{preview} + + ); + }) + )} + + + + Up/Down to navigate, Enter to resume, Esc to cancel + + + ); +}; + +function pad(value: string, width: number): string { + if (value.length >= width) return value; + return value + ' '.repeat(width - value.length); +} + +function formatRelativeTime(date?: Date): string { + if (!date) return '-'; + const diffMs = Date.now() - date.getTime(); + const seconds = Math.max(0, Math.floor(diffMs / 1000)); + if (seconds < 60) return `${seconds} seconds ago`; + const minutes = Math.floor(seconds / 60); + if (minutes < 60) return `${minutes} minutes ago`; + const hours = Math.floor(minutes / 60); + if (hours < 24) return `${hours} hours ago`; + const days = Math.floor(hours / 24); + if (days < 30) return `${days} days ago`; + return date.toLocaleDateString('en-US', { month: 'short', day: 'numeric', year: 'numeric' }); +} diff --git a/src/ui/ink/RemoteModeDisplay.tsx b/src/ui/ink/RemoteModeDisplay.tsx index 2a569148..56fbdef3 100644 --- a/src/ui/ink/RemoteModeDisplay.tsx +++ b/src/ui/ink/RemoteModeDisplay.tsx @@ -7,9 +7,10 @@ interface RemoteModeDisplayProps { logPath?: string onExit?: () => void onSwitchToLocal?: () => void + title?: string } -export const RemoteModeDisplay: React.FC = ({ messageBuffer, logPath, onExit, onSwitchToLocal }) => { +export const RemoteModeDisplay: React.FC = ({ messageBuffer, logPath, onExit, onSwitchToLocal, title }) => { const [messages, setMessages] = useState([]) const [confirmationMode, setConfirmationMode] = useState<'exit' | 'switch' | null>(null) const [actionInProgress, setActionInProgress] = useState<'exiting' | 'switching' | null>(null) @@ -131,7 +132,7 @@ export const RemoteModeDisplay: React.FC = ({ messageBuf overflow="hidden" > - 📡 Remote Mode - Claude Messages + 📡 Remote Mode - {(title ?? 'Claude')} Messages {'─'.repeat(Math.min(terminalWidth - 4, 60))} @@ -199,4 +200,4 @@ export const RemoteModeDisplay: React.FC = ({ messageBuf ) -} \ No newline at end of file +} diff --git a/src/utils/BasePermissionHandler.ts b/src/utils/BasePermissionHandler.ts index 362a9ed5..3980e30e 100644 --- a/src/utils/BasePermissionHandler.ts +++ b/src/utils/BasePermissionHandler.ts @@ -47,6 +47,8 @@ export abstract class BasePermissionHandler { protected pendingRequests = new Map(); protected session: ApiSessionClient; private isResetting = false; + private messageListener: ((msg: unknown) => void) | null = null; + private messageListenerSession: ApiSessionClient | null = null; /** * Returns the log prefix for this handler. @@ -69,55 +71,97 @@ export abstract class BasePermissionHandler { this.setupRpcHandler(); } + private normalizePermissionResponse(raw: unknown): PermissionResponse | null { + if (!raw || typeof raw !== 'object') return null; + const record = raw as Record; + + const id = (record.id ?? record.permissionId) as unknown; + const approved = (record.approved ?? record.permissionApproved) as unknown; + const decision = record.decision as unknown; + + if (typeof id !== 'string') return null; + if (typeof approved !== 'boolean') return null; + + const normalized: PermissionResponse = { id, approved }; + if (decision === 'approved' || decision === 'approved_for_session' || decision === 'denied' || decision === 'abort') { + normalized.decision = decision; + } + + return normalized; + } + + private async handlePermissionResponse(raw: unknown): Promise { + const response = this.normalizePermissionResponse(raw); + if (!response) return; + + const pending = this.pendingRequests.get(response.id); + if (!pending) { + logger.debug(`${this.getLogPrefix()} Permission request not found or already resolved`, { id: response.id }); + return; + } + + // Remove from pending + this.pendingRequests.delete(response.id); + + // Resolve the permission request + const result: PermissionResult = response.approved + ? { decision: response.decision === 'approved_for_session' ? 'approved_for_session' : 'approved' } + : { decision: response.decision === 'denied' ? 'denied' : 'abort' }; + + pending.resolve(result); + + // Move request to completed in agent state + this.session.updateAgentState((currentState) => { + const request = currentState.requests?.[response.id]; + if (!request) return currentState; + + const { [response.id]: _, ...remainingRequests } = currentState.requests || {}; + + let res = { + ...currentState, + requests: remainingRequests, + completedRequests: { + ...currentState.completedRequests, + [response.id]: { + ...request, + completedAt: Date.now(), + status: response.approved ? 'approved' : 'denied', + decision: result.decision + } + } + } satisfies AgentState; + return res; + }); + + logger.debug(`${this.getLogPrefix()} Permission ${response.approved ? 'approved' : 'denied'} for ${pending.toolName}`); + } + /** * Setup RPC handler for permission responses. */ protected setupRpcHandler(): void { + // Remove previous listener when swapping sessions to avoid leaks / duplicate resolution. + if (this.messageListener && this.messageListenerSession) { + this.messageListenerSession.off('message', this.messageListener); + this.messageListener = null; + this.messageListenerSession = null; + } + this.session.rpcHandlerManager.registerHandler( 'permission', async (response) => { - const pending = this.pendingRequests.get(response.id); - if (!pending) { - logger.debug(`${this.getLogPrefix()} Permission request not found or already resolved`); - return; - } - - // Remove from pending - this.pendingRequests.delete(response.id); - - // Resolve the permission request - const result: PermissionResult = response.approved - ? { decision: response.decision === 'approved_for_session' ? 'approved_for_session' : 'approved' } - : { decision: response.decision === 'denied' ? 'denied' : 'abort' }; - - pending.resolve(result); - - // Move request to completed in agent state - this.session.updateAgentState((currentState) => { - const request = currentState.requests?.[response.id]; - if (!request) return currentState; - - const { [response.id]: _, ...remainingRequests } = currentState.requests || {}; - - let res = { - ...currentState, - requests: remainingRequests, - completedRequests: { - ...currentState.completedRequests, - [response.id]: { - ...request, - completedAt: Date.now(), - status: response.approved ? 'approved' : 'denied', - decision: result.decision - } - } - } satisfies AgentState; - return res; - }); - - logger.debug(`${this.getLogPrefix()} Permission ${response.approved ? 'approved' : 'denied'} for ${pending.toolName}`); + await this.handlePermissionResponse(response); } ); + + // Compatibility path: some clients may deliver permission responses as normal session messages + // rather than RPC calls. Handle those too to avoid deadlocking tool elicitation. + this.messageListener = async (msg: unknown) => { + // Intentionally quiet unless it looks like a permission response. + await this.handlePermissionResponse(msg); + }; + this.session.on('message', this.messageListener); + this.messageListenerSession = this.session; } /** diff --git a/src/utils/MessageQueue2.test.ts b/src/utils/MessageQueue2.test.ts index a9306325..ccf10383 100644 --- a/src/utils/MessageQueue2.test.ts +++ b/src/utils/MessageQueue2.test.ts @@ -347,6 +347,26 @@ describe('MessageQueue2', () => { expect(result?.message).toBe('immediate'); }); + it('should resolve pending waiters when reset is called', async () => { + const queue = new MessageQueue2(mode => mode); + + let resolved = false; + const waitPromise = queue.waitForMessagesAndGetAsString().then(result => { + resolved = true; + return result; + }); + + expect(resolved).toBe(false); + + queue.reset(); + + await new Promise(resolve => setTimeout(resolve, 0)); + + expect(resolved).toBe(true); + const result = await waitPromise; + expect(result).toBeNull(); + }); + it('should batch messages pushed with pushImmediate normally', async () => { const queue = new MessageQueue2<{ type: string }>((mode) => mode.type); @@ -456,4 +476,4 @@ describe('MessageQueue2', () => { expect(batch3?.message).toBe('after-isolated'); expect(batch3?.mode.type).toBe('B'); }); -}); \ No newline at end of file +}); diff --git a/src/utils/MessageQueue2.ts b/src/utils/MessageQueue2.ts index 45f254ed..ef08e2d3 100644 --- a/src/utils/MessageQueue2.ts +++ b/src/utils/MessageQueue2.ts @@ -184,8 +184,12 @@ export class MessageQueue2 { this.queue = []; this.closed = false; - // Clear waiter without calling it since we're not closing - this.waiter = null; + // Resolve any pending waiter so callers don't hang indefinitely + if (this.waiter) { + const waiter = this.waiter; + this.waiter = null; + waiter(false); + } } /** @@ -333,4 +337,4 @@ export class MessageQueue2 { logger.debug('[MessageQueue2] Waiting for messages...'); }); } -} \ No newline at end of file +} diff --git a/src/utils/__tests__/inkSanitize.test.ts b/src/utils/__tests__/inkSanitize.test.ts new file mode 100644 index 00000000..d695f3a4 --- /dev/null +++ b/src/utils/__tests__/inkSanitize.test.ts @@ -0,0 +1,29 @@ +import { describe, expect, it } from 'vitest'; + +import { sanitizeInkText, truncateInkText } from '../inkSanitize'; + +describe('sanitizeInkText', () => { + it('strips ANSI/control/bidi/zero-width and normalizes whitespace', () => { + const raw = [ + 'Hello', + '\u001b[31mRED\u001b[0m', + '\u202Eevil\u202C', + '\u200B', + '\u0000', + '\t', + 'world', + ].join(' '); + + expect(sanitizeInkText(raw)).toBe('Hello RED evil world'); + }); +}); + +describe('truncateInkText', () => { + it('truncates long strings with ellipsis', () => { + expect(truncateInkText('1234567890', 5)).toBe('12...'); + }); + + it('handles very small max values', () => { + expect(truncateInkText('abcdef', 2)).toBe('ab'); + }); +}); diff --git a/src/utils/inkSanitize.ts b/src/utils/inkSanitize.ts new file mode 100644 index 00000000..06c2e9e9 --- /dev/null +++ b/src/utils/inkSanitize.ts @@ -0,0 +1,21 @@ +const ANSI_ESCAPE_REGEX = /\u001B\[[0-?]*[ -/]*[@-~]|\u001B\][^\u0007]*(?:\u0007|\u001B\\)/g; +const CONTROL_REGEX = /[\u0000-\u001F\u007F-\u009F]/g; +const BIDI_REGEX = /[\u200E\u200F\u061C\u202A-\u202E\u2066-\u2069]/g; +const ZERO_WIDTH_REGEX = /[\u200B-\u200D\uFEFF]/g; + +export function sanitizeInkText(input: string | undefined): string { + if (!input) return ''; + const stripped = input + .replace(ANSI_ESCAPE_REGEX, '') + .replace(CONTROL_REGEX, '') + .replace(BIDI_REGEX, '') + .replace(ZERO_WIDTH_REGEX, ''); + return stripped.replace(/\s+/g, ' ').trim(); +} + +export function truncateInkText(text: string, max: number): string { + if (!text) return ''; + if (text.length <= max) return text; + if (max <= 3) return text.slice(0, max); + return `${text.slice(0, Math.max(0, max - 3))}...`; +}