Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/happy-cli/src/claude/claudeRemoteLauncher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { EnhancedMode } from "./loop";
import { RawJSONLines } from "@/claude/types";
import { OutgoingMessageQueue } from "./utils/OutgoingMessageQueue";
import { getToolName } from "./utils/getToolName";
import { createNonBlockingStdout } from "@/utils/nonBlockingStdout";

interface PermissionsField {
date: number;
Expand All @@ -36,6 +37,9 @@ export async function claudeRemoteLauncher(session: Session): Promise<'switch' |

if (hasTTY) {
console.clear();
// Use non-blocking stdout wrapper to prevent event loop blocking
// when tmux detaches and the PTY buffer fills up
const inkStdout = createNonBlockingStdout();
inkInstance = render(React.createElement(RemoteModeDisplay, {
messageBuffer,
logPath: process.env.DEBUG ? session.logPath : undefined,
Expand All @@ -54,7 +58,8 @@ export async function claudeRemoteLauncher(session: Session): Promise<'switch' |
}
}), {
exitOnCtrlC: false,
patchConsole: false
patchConsole: false,
stdout: inkStdout
});
}

Expand Down
5 changes: 4 additions & 1 deletion packages/happy-cli/src/codex/runCodex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { notifyDaemonSessionStarted } from "@/daemon/controlClient";
import { registerKillSessionHandler } from "@/claude/registerKillSessionHandler";
import { delay } from "@/utils/time";
import { stopCaffeinate } from "@/utils/caffeinate";
import { createNonBlockingStdout } from "@/utils/nonBlockingStdout";
import { connectionState } from '@/utils/serverConnectionErrors';
import { setupOfflineReconnection } from '@/utils/setupOfflineReconnection';
import type { ApiSessionClient } from '@/api/apiSession';
Expand Down Expand Up @@ -325,6 +326,7 @@ export async function runCodex(opts: {

if (hasTTY) {
console.clear();
const inkStdout = createNonBlockingStdout();
inkInstance = render(React.createElement(CodexDisplay, {
messageBuffer,
logPath: process.env.DEBUG ? logger.getLogPath() : undefined,
Expand All @@ -336,7 +338,8 @@ export async function runCodex(opts: {
}
}), {
exitOnCtrlC: false,
patchConsole: false
patchConsole: false,
stdout: inkStdout
});
}

Expand Down
5 changes: 4 additions & 1 deletion packages/happy-cli/src/gemini/runGemini.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { MessageBuffer } from '@/ui/ink/messageBuffer';
import { notifyDaemonSessionStarted } from '@/daemon/controlClient';
import { registerKillSessionHandler } from '@/claude/registerKillSessionHandler';
import { stopCaffeinate } from '@/utils/caffeinate';
import { createNonBlockingStdout } from '@/utils/nonBlockingStdout';
import { connectionState } from '@/utils/serverConnectionErrors';
import { setupOfflineReconnection } from '@/utils/setupOfflineReconnection';
import type { ApiSessionClient } from '@/api/apiSession';
Expand Down Expand Up @@ -468,9 +469,11 @@ export async function runGemini(opts: {
});
};

const inkStdout = createNonBlockingStdout();
inkInstance = render(React.createElement(DisplayComponent), {
exitOnCtrlC: false,
patchConsole: false
patchConsole: false,
stdout: inkStdout
});

// Send initial model to UI so it displays correctly from start
Expand Down
221 changes: 221 additions & 0 deletions packages/happy-cli/src/utils/nonBlockingStdout.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'
import { Writable } from 'node:stream'

// Mock the logger before importing the module under test
vi.mock('@/ui/logger', () => ({
logger: {
debug: vi.fn(),
}
}))

import { createNonBlockingStdout } from './nonBlockingStdout'

describe('createNonBlockingStdout', () => {
let originalWrite: typeof process.stdout.write
let originalWritableNeedDrain: boolean
let originalColumns: number | undefined
let originalRows: number | undefined
let writtenChunks: string[]
let mockWriteReturn: boolean

beforeEach(() => {
writtenChunks = []
mockWriteReturn = true

// Save originals
originalWrite = process.stdout.write
originalWritableNeedDrain = process.stdout.writableNeedDrain
originalColumns = process.stdout.columns
originalRows = process.stdout.rows

// Mock process.stdout.write
process.stdout.write = vi.fn((...args: any[]) => {
const chunk = args[0]
writtenChunks.push(typeof chunk === 'string' ? chunk : chunk.toString())
return mockWriteReturn
}) as any

// Make writableNeedDrain configurable
Object.defineProperty(process.stdout, 'writableNeedDrain', {
value: false,
writable: true,
configurable: true,
})

// Mock columns and rows (not available in test env without a real TTY)
Object.defineProperty(process.stdout, 'columns', {
value: 120,
writable: true,
configurable: true,
})
Object.defineProperty(process.stdout, 'rows', {
value: 40,
writable: true,
configurable: true,
})
})

afterEach(() => {
process.stdout.write = originalWrite
Object.defineProperty(process.stdout, 'writableNeedDrain', {
value: originalWritableNeedDrain,
configurable: true,
})
Object.defineProperty(process.stdout, 'columns', {
value: originalColumns,
configurable: true,
})
Object.defineProperty(process.stdout, 'rows', {
value: originalRows,
configurable: true,
})
})

it('should forward writes when no backpressure', () => {
const stream = createNonBlockingStdout()
stream.write('hello')

expect(writtenChunks).toEqual(['hello'])
})

it('should drop writes when writableNeedDrain is true', () => {
const stream = createNonBlockingStdout()

// Simulate backpressure
Object.defineProperty(process.stdout, 'writableNeedDrain', {
value: true,
writable: true,
configurable: true,
})

const result = stream.write('should be dropped')

expect(writtenChunks).toEqual([])
expect(result).toBe(true) // Returns true so Ink doesn't queue
})

it('should call callback even when dropping writes', () => {
const stream = createNonBlockingStdout()
const cb = vi.fn()

Object.defineProperty(process.stdout, 'writableNeedDrain', {
value: true,
writable: true,
configurable: true,
})

stream.write('dropped', cb)

expect(cb).toHaveBeenCalled()
expect(writtenChunks).toEqual([])
})

it('should proxy columns from process.stdout', () => {
const stream = createNonBlockingStdout()

expect(stream.columns).toBe(120)
})

it('should proxy rows from process.stdout', () => {
const stream = createNonBlockingStdout()

expect(stream.rows).toBe(40)
})

it('should proxy isTTY from process.stdout', () => {
const stream = createNonBlockingStdout()

expect(stream.isTTY).toBe(process.stdout.isTTY)
})

it('should resume writes after backpressure clears', () => {
const stream = createNonBlockingStdout()

// Write normally
stream.write('first')
expect(writtenChunks).toEqual(['first'])

// Enter backpressure
Object.defineProperty(process.stdout, 'writableNeedDrain', {
value: true,
writable: true,
configurable: true,
})

stream.write('dropped')
expect(writtenChunks).toEqual(['first'])

// Clear backpressure
Object.defineProperty(process.stdout, 'writableNeedDrain', {
value: false,
writable: true,
configurable: true,
})

stream.write('resumed')
expect(writtenChunks).toEqual(['first', 'resumed'])
})

it('should handle write with encoding parameter', () => {
const stream = createNonBlockingStdout()
stream.write('hello', 'utf8')

expect(writtenChunks).toEqual(['hello'])
})

it('should drop multiple writes during backpressure', () => {
const stream = createNonBlockingStdout()

Object.defineProperty(process.stdout, 'writableNeedDrain', {
value: true,
writable: true,
configurable: true,
})

stream.write('a')
stream.write('b')
stream.write('c')

expect(writtenChunks).toEqual([])
})

it('should enter dropping mode when write returns false and resume on drain', () => {
const stream = createNonBlockingStdout()
mockWriteReturn = false

stream.write('first')
expect(writtenChunks).toEqual(['first'])

// Simulate drain event — should reset dropping state
process.stdout.emit('drain')

mockWriteReturn = true
stream.write('after-drain')
expect(writtenChunks).toEqual(['first', 'after-drain'])
})

it('should not accumulate drain listeners on repeated write-returns-false', () => {
const stream = createNonBlockingStdout()
const onceSpy = vi.spyOn(process.stdout, 'once')
mockWriteReturn = false

stream.write('a')
stream.write('b')
stream.write('c')

const drainCalls = onceSpy.mock.calls.filter(([event]) => event === 'drain')
expect(drainCalls).toHaveLength(1)

onceSpy.mockRestore()
})

it('should handle callback as second argument on normal path', () => {
const stream = createNonBlockingStdout()
const cb = vi.fn()

stream.write('hello', cb)

expect(writtenChunks).toEqual(['hello'])
expect(process.stdout.write).toHaveBeenCalledWith('hello', cb)
})
})
78 changes: 78 additions & 0 deletions packages/happy-cli/src/utils/nonBlockingStdout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Non-blocking stdout wrapper for Ink rendering.
*
* When tmux detaches, the PTY buffer fills up and process.stdout.write()
* blocks synchronously, freezing the Node.js event loop. This wrapper
* drops writes when backpressure is detected instead of blocking.
*
* Used by Ink's render() stdout option to prevent multiple Happy instances
* from stalling in detached tmux sessions.
*
* See: https://github.com/slopus/happy/issues/533
*/

import { WriteStream } from 'node:tty';
import { logger } from '@/ui/logger';

export function createNonBlockingStdout(): WriteStream {
let dropping = false;
let droppedWrites = 0;

// Create a proxy around process.stdout that intercepts write()
// to drop data when backpressure is detected (e.g. tmux detached).
// All other properties/methods delegate to the real stdout.
const proxy = new Proxy(process.stdout, {
get(target, prop, receiver) {
if (prop === 'write') {
return function write(chunk: any, encodingOrCallback?: any, callback?: any) {
// Resolve overloaded arguments
let encoding: BufferEncoding | undefined;
let cb: ((err?: Error | null) => void) | undefined;
if (typeof encodingOrCallback === 'function') {
cb = encodingOrCallback;
} else {
encoding = encodingOrCallback;
cb = callback;
}

// If stdout already has backpressure, drop the write.
// Returns true so Ink doesn't apply its own backpressure handling.
if (target.writableNeedDrain) {
if (!dropping) {
dropping = true;
logger.debug('[nonBlockingStdout] Backpressure detected, dropping writes (tmux likely detached)');
}
droppedWrites++;
cb?.();
return true;
}

const ok = encoding
? target.write(chunk, encoding, cb)
: target.write(chunk, cb);

if (!ok) {
if (!dropping) {
dropping = true;
logger.debug('[nonBlockingStdout] Write returned false, will drop until drain');
target.once('drain', () => {
logger.debug(`[nonBlockingStdout] Drain received, resuming writes (dropped ${droppedWrites} writes while detached)`);
dropping = false;
droppedWrites = 0;
});
}
} else if (dropping) {
logger.debug(`[nonBlockingStdout] Writes resumed (dropped ${droppedWrites} writes while detached)`);
dropping = false;
droppedWrites = 0;
}

return ok;
};
}
return Reflect.get(target, prop, receiver);
}
});

return proxy;
}