From 6a3e70f43457eb5171f43766b86893b96df12828 Mon Sep 17 00:00:00 2001 From: Tomas Zijdemans Date: Sun, 5 Apr 2026 15:24:34 +0200 Subject: [PATCH 1/2] fix --- async/unstable_channel.ts | 251 ++++++++++++++++++++++++++++----- async/unstable_channel_test.ts | 180 +++++++++++++++++++++-- 2 files changed, 387 insertions(+), 44 deletions(-) diff --git a/async/unstable_channel.ts b/async/unstable_channel.ts index b82837a685bd..48e1d445ee17 100644 --- a/async/unstable_channel.ts +++ b/async/unstable_channel.ts @@ -3,17 +3,21 @@ import { Deque } from "@std/data-structures/unstable-deque"; +const RESOLVED: Promise = Promise.resolve(); + /** Internal node for the FIFO sender waiting queue. */ interface SenderNode { value: T; res: () => void; rej: (reason: unknown) => void; + cancelled: boolean; } /** Internal node for the FIFO receiver waiting queue. */ interface ReceiverNode { res: (value: T) => void; rej: (reason: unknown) => void; + cancelled: boolean; } /** @@ -80,6 +84,37 @@ export class ChannelClosedError extends Error { } } +/** + * Result of a non-blocking {@linkcode Channel.tryReceive} call. Discriminate + * on the `state` field: + * + * - `"ok"` — a value was available and is provided in `value`. + * - `"empty"` — the channel is open but no value is immediately available. + * - `"closed"` — the channel has been closed and no buffered values remain. + * + * @experimental **UNSTABLE**: New API, yet to be vetted. + * + * @typeParam T The type of the value received from the channel. + */ +export type ChannelReceiveResult = + | { state: "ok"; value: T } + | { state: "empty" } + | { state: "closed" }; + +/** + * Options for blocking {@linkcode Channel} operations. + * + * @experimental **UNSTABLE**: New API, yet to be vetted. + */ +export interface ChannelOptions { + /** + * An {@linkcode AbortSignal} to cancel a pending `send` or `receive`. + * When the signal is aborted, the operation rejects with the signal's + * {@linkcode AbortSignal.reason}. + */ + signal?: AbortSignal; +} + /** * An async channel for communicating between concurrent tasks with optional * bounded buffering and backpressure. @@ -166,26 +201,60 @@ export class Channel * ch.close(); * ``` * + * @example Cancelling with an AbortSignal + * ```ts + * import { Channel } from "@std/async/unstable-channel"; + * import { assertRejects } from "@std/assert"; + * + * const ch = new Channel(); + * const controller = new AbortController(); + * const p = ch.send(42, { signal: controller.signal }); + * controller.abort(new Error("cancelled")); + * await assertRejects(() => p, Error, "cancelled"); + * ``` + * * @param value The value to send into the channel. + * @param options Optional settings for the send operation. * @throws {ChannelClosedError} If the channel is closed. The error's * `value` property carries the unsent value for recovery. */ - send(value: T): Promise { + send(value: T, options?: ChannelOptions): Promise { if (this.#closed) { return Promise.reject( new ChannelClosedError("Cannot send to a closed channel", value), ); } - if (this.#deliverToReceiver(value)) return Promise.resolve(); + if (this.#deliverToReceiver(value)) return RESOLVED; if (this.#buffer.length < this.#capacity) { this.#buffer.pushBack(value); - return Promise.resolve(); + return RESOLVED; + } + + if (options?.signal?.aborted) { + return Promise.reject(options.signal.reason); } return new Promise((res, rej) => { - this.#senders.pushBack({ value, res, rej }); + const node: SenderNode = { value, res, rej, cancelled: false }; + this.#senders.pushBack(node); + const signal = options?.signal; + if (signal) { + const onAbort = () => { + node.cancelled = true; + rej(signal.reason); + }; + signal.addEventListener("abort", onAbort, { once: true }); + node.res = () => { + signal.removeEventListener("abort", onAbort); + res(); + }; + node.rej = (reason: unknown) => { + signal.removeEventListener("abort", onAbort); + rej(reason); + }; + } }); } @@ -205,18 +274,58 @@ export class Channel * ch.close(); * ``` * + * @example Cancelling with an AbortSignal + * ```ts + * import { Channel } from "@std/async/unstable-channel"; + * import { assertRejects } from "@std/assert"; + * + * const ch = new Channel(); + * const controller = new AbortController(); + * const p = ch.receive({ signal: controller.signal }); + * controller.abort(new Error("cancelled")); + * await assertRejects(() => p, Error, "cancelled"); + * ``` + * + * @param options Optional settings for the receive operation. * @returns A promise that resolves with the next value from the channel. * @throws {ChannelClosedError} If the channel is closed and empty (no * `value` property). If `close(reason)` was called, rejects with * `reason` instead. */ - receive(): Promise { + receive(options?: ChannelOptions): Promise { if (this.#buffer.length > 0) return Promise.resolve(this.#dequeue()); - if (!this.#senders.isEmpty()) return Promise.resolve(this.#takeSender()); + + const sender = this.#nextSender(); + if (sender) { + sender.res(); + return Promise.resolve(sender.value); + } + if (this.#closed) return Promise.reject(this.#receiveError()); + if (options?.signal?.aborted) { + return Promise.reject(options.signal.reason); + } + return new Promise((res, rej) => { - this.#receivers.pushBack({ res, rej }); + const node: ReceiverNode = { res, rej, cancelled: false }; + this.#receivers.pushBack(node); + const signal = options?.signal; + if (signal) { + const onAbort = () => { + node.cancelled = true; + rej(signal.reason); + }; + signal.addEventListener("abort", onAbort, { once: true }); + node.res = (value: T) => { + signal.removeEventListener("abort", onAbort); + res(value); + }; + node.rej = (reason: unknown) => { + signal.removeEventListener("abort", onAbort); + rej(reason); + }; + } }); } @@ -248,13 +357,13 @@ export class Channel } /** - * Non-blocking receive. The discriminated union avoids ambiguity when `T` - * itself can be `undefined`. + * Non-blocking receive. Discriminate on the `state` field to determine the + * outcome without ambiguity, even when `T` itself can be `undefined`. * - * @returns `{ ok: true, value }` if a value was available, or - * `{ ok: false }` if the buffer is empty or the channel is closed. A - * close reason passed to {@linkcode Channel.close} is not surfaced here; - * use {@linkcode Channel.receive} to observe it. + * @returns A {@linkcode ChannelReceiveResult} — `{ state: "ok", value }` + * if a value was available, `{ state: "empty" }` if the channel is open + * but no value is ready, or `{ state: "closed" }` if the channel has + * been closed and no buffered values remain. * * @example Usage * ```ts @@ -263,16 +372,23 @@ export class Channel * * const ch = new Channel(1); * await ch.send(42); - * assertEquals(ch.tryReceive(), { ok: true, value: 42 }); - * assertEquals(ch.tryReceive(), { ok: false }); + * assertEquals(ch.tryReceive(), { state: "ok", value: 42 }); + * assertEquals(ch.tryReceive(), { state: "empty" }); + * ch.close(); + * assertEquals(ch.tryReceive(), { state: "closed" }); * ``` */ - tryReceive(): { ok: true; value: T } | { ok: false } { - if (this.#buffer.length > 0) return { ok: true, value: this.#dequeue() }; - if (!this.#senders.isEmpty()) { - return { ok: true, value: this.#takeSender() }; + tryReceive(): ChannelReceiveResult { + if (this.#buffer.length > 0) { + return { state: "ok", value: this.#dequeue() }; + } + const sender = this.#nextSender(); + if (sender) { + sender.res(); + return { state: "ok", value: sender.value }; } - return { ok: false }; + if (this.#closed) return { state: "closed" }; + return { state: "empty" }; } /** @@ -290,11 +406,26 @@ export class Channel * ch.close(); * assert(ch.closed); * ``` + */ + close(): void; + /** + * Closes the channel with a reason. All pending and future `receive()` + * calls reject with `reason` after draining buffered values. Pending + * `send()` calls reject with {@linkcode ChannelClosedError}. Idempotent. * - * @param args If a reason argument is provided, all pending and future - * `receive()` calls reject with that reason (after draining buffered - * values). Enables error propagation from producer to consumer. + * @example Usage + * ```ts + * import { Channel } from "@std/async/unstable-channel"; + * import { assertRejects } from "@std/assert"; + * + * const ch = new Channel(); + * ch.close(new Error("upstream failure")); + * await assertRejects(() => ch.receive(), Error, "upstream failure"); + * ``` + * + * @param reason The reason to reject pending and future receivers with. */ + close(reason: unknown): void; close(...args: [reason: unknown] | []): void { if (this.#closed) return; this.#closed = true; @@ -305,6 +436,7 @@ export class Channel let sender: SenderNode | undefined; while ((sender = this.#senders.popFront()) !== undefined) { + if (sender.cancelled) continue; sender.rej( new ChannelClosedError( "Cannot send to a closed channel", @@ -315,6 +447,7 @@ export class Channel let receiver: ReceiverNode | undefined; while ((receiver = this.#receivers.popFront()) !== undefined) { + if (receiver.cancelled) continue; receiver.rej(this.#receiveError()); } } @@ -415,6 +548,47 @@ export class Channel } } + /** + * Creates a {@linkcode ReadableStream} that yields values from this + * channel. The stream closes when the channel closes after draining + * buffered values. If the channel was closed with a reason, the stream + * errors with that reason. Cancelling the stream closes the channel. + * + * @example Usage + * ```ts + * import { Channel } from "@std/async/unstable-channel"; + * import { assertEquals } from "@std/assert"; + * + * const ch = new Channel(4); + * await ch.send(1); + * await ch.send(2); + * ch.close(); + * + * const values = await Array.fromAsync(ch.toReadableStream()); + * assertEquals(values, [1, 2]); + * ``` + * + * @returns A readable stream of channel values. + */ + toReadableStream(): ReadableStream { + return new ReadableStream({ + pull: async (controller) => { + try { + controller.enqueue(await this.receive()); + } catch (e) { + if (e instanceof ChannelClosedError && !this.#hasCloseReason) { + controller.close(); + } else { + controller.error(e); + } + } + }, + cancel: () => { + this.close(); + }, + }); + } + /** * Calls {@linkcode Channel.close}. Enables `using` for automatic cleanup. * @@ -448,12 +622,30 @@ export class Channel */ [Symbol.asyncDispose](): Promise { this.close(); - return Promise.resolve(); + return RESOLVED; + } + + /** Pops the next non-cancelled sender from the queue. */ + #nextSender(): SenderNode | undefined { + let sender: SenderNode | undefined; + while ((sender = this.#senders.popFront()) !== undefined) { + if (!sender.cancelled) return sender; + } + return undefined; + } + + /** Pops the next non-cancelled receiver from the queue. */ + #nextReceiver(): ReceiverNode | undefined { + let receiver: ReceiverNode | undefined; + while ((receiver = this.#receivers.popFront()) !== undefined) { + if (!receiver.cancelled) return receiver; + } + return undefined; } /** Hands `value` to the next waiting receiver, if any. */ #deliverToReceiver(value: T): boolean { - const receiver = this.#receivers.popFront(); + const receiver = this.#nextReceiver(); if (!receiver) return false; receiver.res(value); return true; @@ -465,7 +657,7 @@ export class Channel */ #dequeue(): T { const value = this.#buffer.popFront()!; - const sender = this.#senders.popFront(); + const sender = this.#nextSender(); if (sender) { this.#buffer.pushBack(sender.value); sender.res(); @@ -473,13 +665,6 @@ export class Channel return value; } - /** Takes a value directly from the head of the sender queue (unbuffered path). */ - #takeSender(): T { - const sender = this.#senders.popFront()!; - sender.res(); - return sender.value; - } - #receiveError(): unknown { if (this.#hasCloseReason) return this.#closeReason; return new ChannelClosedError("Cannot receive from a closed channel"); diff --git a/async/unstable_channel_test.ts b/async/unstable_channel_test.ts index 0b74b86a0316..24aa56671264 100644 --- a/async/unstable_channel_test.ts +++ b/async/unstable_channel_test.ts @@ -270,29 +270,38 @@ Deno.test("Channel.trySend() delivers to waiting receiver", async () => { Deno.test("Channel.tryReceive() returns value when buffered", async () => { const ch = new Channel(2); await ch.send(1); - const result = ch.tryReceive(); - assertEquals(result, { ok: true, value: 1 }); + assertEquals(ch.tryReceive(), { state: "ok", value: 1 }); }); -Deno.test("Channel.tryReceive() returns ok:false when empty or closed", () => { - const empty = new Channel(2); - assertEquals(empty.tryReceive(), { ok: false }); +Deno.test("Channel.tryReceive() returns empty when no value is available", () => { + const ch = new Channel(2); + assertEquals(ch.tryReceive(), { state: "empty" }); +}); - const closed = new Channel(); - closed.close(); - assertEquals(closed.tryReceive(), { ok: false }); +Deno.test("Channel.tryReceive() returns closed on closed empty channel", () => { + const ch = new Channel(); + ch.close(); + assertEquals(ch.tryReceive(), { state: "closed" }); +}); + +Deno.test("Channel.tryReceive() drains buffer before reporting closed", async () => { + const ch = new Channel(2); + await ch.send(1); + ch.close(); + assertEquals(ch.tryReceive(), { state: "ok", value: 1 }); + assertEquals(ch.tryReceive(), { state: "closed" }); }); Deno.test("Channel.tryReceive() handles undefined as a valid value", async () => { const ch = new Channel(1); await ch.send(undefined); - assertEquals(ch.tryReceive(), { ok: true, value: undefined }); + assertEquals(ch.tryReceive(), { state: "ok", value: undefined }); }); Deno.test("Channel.tryReceive() drains from waiting sender on unbuffered channel", async () => { const ch = new Channel(); const sendPromise = ch.send(7); - assertEquals(ch.tryReceive(), { ok: true, value: 7 }); + assertEquals(ch.tryReceive(), { state: "ok", value: 7 }); await sendPromise; }); @@ -300,12 +309,109 @@ Deno.test("Channel.tryReceive() promotes blocked sender into buffer", async () = const ch = new Channel(1); await ch.send(1); const p = ch.send(2); - assertEquals(ch.tryReceive(), { ok: true, value: 1 }); + assertEquals(ch.tryReceive(), { state: "ok", value: 1 }); assertEquals(ch.size, 1); await p; assertEquals(await ch.receive(), 2); }); +// -- AbortSignal -- + +Deno.test("Channel.send() rejects when signal is already aborted", async () => { + const ch = new Channel(); + await assertRejects( + () => ch.send(42, { signal: AbortSignal.abort("stopped") }), + ); + assertFalse(ch.closed); +}); + +Deno.test("Channel.send() rejects when signal aborts while waiting", async () => { + const ch = new Channel(); + const controller = new AbortController(); + const p = ch.send(42, { signal: controller.signal }); + controller.abort(new Error("cancelled")); + await assertRejects(() => p, Error, "cancelled"); +}); + +Deno.test("Channel.send() ignores signal on immediate delivery", async () => { + const ch = new Channel(1); + const controller = new AbortController(); + await ch.send(42, { signal: controller.signal }); + assertEquals(ch.size, 1); + controller.abort(); +}); + +Deno.test("Channel.send() delivers to receiver even with signal attached", async () => { + const ch = new Channel(); + const recvP = ch.receive(); + const controller = new AbortController(); + await ch.send(42, { signal: controller.signal }); + assertEquals(await recvP, 42); +}); + +Deno.test("Channel.receive() rejects when signal is already aborted", async () => { + const ch = new Channel(); + await assertRejects( + () => ch.receive({ signal: AbortSignal.abort("stopped") }), + ); + assertFalse(ch.closed); +}); + +Deno.test("Channel.receive() rejects when signal aborts while waiting", async () => { + const ch = new Channel(); + const controller = new AbortController(); + const p = ch.receive({ signal: controller.signal }); + controller.abort(new Error("cancelled")); + await assertRejects(() => p, Error, "cancelled"); +}); + +Deno.test("Channel.receive() ignores signal on immediate delivery", async () => { + const ch = new Channel(1); + await ch.send(42); + const controller = new AbortController(); + assertEquals(await ch.receive({ signal: controller.signal }), 42); + controller.abort(); +}); + +Deno.test("Channel.close() rejects signal-attached sender with ChannelClosedError", async () => { + const ch = new Channel(); + const controller = new AbortController(); + const p = ch.send(7, { signal: controller.signal }); + ch.close(); + const err = await assertRejects(() => p, ChannelClosedError); + assertEquals(err.value, 7); +}); + +Deno.test("Channel.close() rejects signal-attached receiver", async () => { + const ch = new Channel(); + const controller = new AbortController(); + const p = ch.receive({ signal: controller.signal }); + ch.close(); + await assertRejects(() => p, ChannelClosedError); +}); + +Deno.test("Channel.send() skips cancelled sender during delivery", async () => { + const ch = new Channel(); + const c1 = new AbortController(); + const p1 = ch.send(1, { signal: c1.signal }); + const p2 = ch.send(2); + c1.abort(new Error("abort1")); + await assertRejects(() => p1, Error, "abort1"); + assertEquals(await ch.receive(), 2); + await p2; +}); + +Deno.test("Channel.receive() skips cancelled receiver during delivery", async () => { + const ch = new Channel(); + const c1 = new AbortController(); + const p1 = ch.receive({ signal: c1.signal }); + const p2 = ch.receive(); + c1.abort(new Error("abort1")); + await assertRejects(() => p1, Error, "abort1"); + await ch.send(42); + assertEquals(await p2, 42); +}); + // -- Async iteration -- Deno.test("Channel async iteration drains until closed", async () => { @@ -375,6 +481,58 @@ Deno.test("Channel async iteration works with concurrent producer", async () => assertEquals(values, [0, 1, 2, 3, 4]); }); +// -- toReadableStream -- + +Deno.test("Channel.toReadableStream() yields buffered values then closes", async () => { + const ch = new Channel(4); + await ch.send(1); + await ch.send(2); + ch.close(); + const values = await Array.fromAsync(ch.toReadableStream()); + assertEquals(values, [1, 2]); +}); + +Deno.test("Channel.toReadableStream() works with concurrent producer", async () => { + const ch = new Channel(2); + + (async () => { + for (let i = 0; i < 5; i++) { + await ch.send(i); + } + ch.close(); + })(); + + const values = await Array.fromAsync(ch.toReadableStream()); + assertEquals(values, [0, 1, 2, 3, 4]); +}); + +Deno.test("Channel.toReadableStream() errors on close with reason", async () => { + const ch = new Channel(2); + await ch.send(1); + const reason = new Error("fail"); + ch.close(reason); + + const values: number[] = []; + let caught: unknown; + try { + for await (const v of ch.toReadableStream()) { + values.push(v); + } + } catch (e) { + caught = e; + } + assertEquals(values, [1]); + assertInstanceOf(caught, Error); + assertEquals((caught as Error).message, "fail"); +}); + +Deno.test("Channel.toReadableStream() cancel closes the channel", async () => { + const ch = new Channel(2); + const stream = ch.toReadableStream(); + await stream.cancel(); + assert(ch.closed); +}); + // -- Disposable -- Deno.test("Channel[Symbol.dispose]() closes the channel", () => { From a3761201c1163bf4db9c969de6ad6c41885b8477 Mon Sep 17 00:00:00 2001 From: Tomas Zijdemans Date: Sun, 5 Apr 2026 16:25:34 +0200 Subject: [PATCH 2/2] coverage --- async/unstable_channel_test.ts | 41 ++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/async/unstable_channel_test.ts b/async/unstable_channel_test.ts index 24aa56671264..a9771192707c 100644 --- a/async/unstable_channel_test.ts +++ b/async/unstable_channel_test.ts @@ -412,6 +412,47 @@ Deno.test("Channel.receive() skips cancelled receiver during delivery", async () assertEquals(await p2, 42); }); +Deno.test("Channel.send() resolves signal-attached sender when receiver arrives", async () => { + const ch = new Channel(); + const controller = new AbortController(); + const p = ch.send(42, { signal: controller.signal }); + assertEquals(await ch.receive(), 42); + await p; + assertFalse(ch.closed); +}); + +Deno.test("Channel.receive() resolves signal-attached receiver when sender arrives", async () => { + const ch = new Channel(); + const controller = new AbortController(); + const p = ch.receive({ signal: controller.signal }); + await ch.send(99); + assertEquals(await p, 99); + assertFalse(ch.closed); +}); + +Deno.test("Channel.close() skips cancelled sender in drain loop", async () => { + const ch = new Channel(); + const c1 = new AbortController(); + const p1 = ch.send(1, { signal: c1.signal }); + const p2 = ch.send(2); + c1.abort(new Error("abort-before-close")); + await assertRejects(() => p1, Error, "abort-before-close"); + ch.close(); + const err = await assertRejects(() => p2, ChannelClosedError); + assertEquals(err.value, 2); +}); + +Deno.test("Channel.close() skips cancelled receiver in drain loop", async () => { + const ch = new Channel(); + const c1 = new AbortController(); + const p1 = ch.receive({ signal: c1.signal }); + const p2 = ch.receive(); + c1.abort(new Error("abort-before-close")); + await assertRejects(() => p1, Error, "abort-before-close"); + ch.close(); + await assertRejects(() => p2, ChannelClosedError); +}); + // -- Async iteration -- Deno.test("Channel async iteration drains until closed", async () => {