Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 91 additions & 1 deletion src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,8 @@ export class AcpClient {

const input = Writable.toWeb(child.stdin);
const output = Readable.toWeb(child.stdout) as ReadableStream<Uint8Array>;
const stream = this.createTappedStream(ndJsonStream(input, output));
const filteredOutput = this.filterAcpOutputStream(output);
const stream = this.createTappedStream(ndJsonStream(input, filteredOutput));

const connection = new ClientSideConnection(
() => ({
Expand Down Expand Up @@ -1066,6 +1067,19 @@ export class AcpClient {
return this.suppressReplaySessionUpdateMessages && isSessionUpdateNotification(message);
};

const isValidAcpMessage = (value: AnyMessage): boolean => {
if (!value || typeof value !== "object") {
return false;
}
return (
"jsonrpc" in value ||
"method" in value ||
"id" in value ||
"result" in value ||
"error" in value
);
};

const readable = new ReadableStream<AnyMessage>({
async start(controller) {
const reader = base.readable.getReader();
Expand All @@ -1078,6 +1092,9 @@ export class AcpClient {
if (!value) {
continue;
}
if (!isValidAcpMessage(value)) {
continue;
}
if (!shouldSuppressInboundReplaySessionUpdate(value)) {
onAcpOutputMessage()?.("inbound", value);
onAcpMessage()?.("inbound", value);
Expand Down Expand Up @@ -1674,4 +1691,77 @@ export class AcpClient {

throw new Error(`Timed out waiting for session replay drain after ${normalizedTimeoutMs}ms`);
}

private filterAcpOutputStream(output: ReadableStream<Uint8Array>): ReadableStream<Uint8Array> {
const textDecoder = new TextDecoder();
const textEncoder = new TextEncoder();
let buffer = "";
let nonJsonLineCount = 0;
const maxNonJsonLines = 10;

return new ReadableStream<Uint8Array>({
async start(controller) {
const reader = output.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
// Flush any remaining buffered content at EOF
if (buffer.trim().length > 0) {
try {
JSON.parse(buffer.trim());
controller.enqueue(textEncoder.encode(buffer + "\n"));
} catch {
nonJsonLineCount += 1;
if (nonJsonLineCount > maxNonJsonLines) {
throw new Error(
`Agent stdout exceeded ${maxNonJsonLines} non-JSON lines without completing ACP handshake. ` +
"This indicates the adapter does not support stdio ACP mode.",
);
}
}
}
break;
}
if (!value) {
continue;
}

buffer += textDecoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";

for (const line of lines) {
const trimmedLine = line.trim();
if (!trimmedLine) {
continue;
}

// Try to parse as JSON to check if it's a valid ACP message
try {
JSON.parse(trimmedLine);
// If it parses successfully, it's valid JSON - pass it through
const outputLine = trimmedLine + "\n";
controller.enqueue(textEncoder.encode(outputLine));
nonJsonLineCount = 0;
} catch {
// If it fails to parse, it's likely a log message - skip it silently
// This prevents "[iFlow ACP Agent] ..." messages from causing parse errors
nonJsonLineCount += 1;
if (nonJsonLineCount > maxNonJsonLines) {
throw new Error(
`Agent stdout exceeded ${maxNonJsonLines} non-JSON lines without completing ACP handshake. ` +
"This indicates the adapter does not support stdio ACP mode.",
);
}
}
}
}
} finally {
reader.releaseLock();
controller.close();
}
},
});
}
}