Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 218 additions & 33 deletions async/unstable_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@

import { Deque } from "@std/data-structures/unstable-deque";

const RESOLVED: Promise<void> = Promise.resolve();

/** Internal node for the FIFO sender waiting queue. */
interface SenderNode<T> {
value: T;
res: () => void;
rej: (reason: unknown) => void;
cancelled: boolean;
}

/** Internal node for the FIFO receiver waiting queue. */
interface ReceiverNode<T> {
res: (value: T) => void;
rej: (reason: unknown) => void;
cancelled: boolean;
}

/**
Expand Down Expand Up @@ -80,6 +84,37 @@ export class ChannelClosedError extends Error {
}
}

/**
* Result of a non-blocking {@linkcode Channel.tryReceive} call. Discriminate
* on the `state` field:
*
* - `"ok"` — a value was available and is provided in `value`.
* - `"empty"` — the channel is open but no value is immediately available.
* - `"closed"` — the channel has been closed and no buffered values remain.
*
* @experimental **UNSTABLE**: New API, yet to be vetted.
*
* @typeParam T The type of the value received from the channel.
*/
export type ChannelReceiveResult<T> =
| { state: "ok"; value: T }
| { state: "empty" }
| { state: "closed" };

/**
* Options for blocking {@linkcode Channel} operations.
*
* @experimental **UNSTABLE**: New API, yet to be vetted.
*/
export interface ChannelOptions {
/**
* An {@linkcode AbortSignal} to cancel a pending `send` or `receive`.
* When the signal is aborted, the operation rejects with the signal's
* {@linkcode AbortSignal.reason}.
*/
signal?: AbortSignal;
}

/**
* An async channel for communicating between concurrent tasks with optional
* bounded buffering and backpressure.
Expand Down Expand Up @@ -166,26 +201,60 @@ export class Channel<T>
* ch.close();
* ```
*
* @example Cancelling with an AbortSignal
* ```ts
* import { Channel } from "@std/async/unstable-channel";
* import { assertRejects } from "@std/assert";
*
* const ch = new Channel<number>();
* const controller = new AbortController();
* const p = ch.send(42, { signal: controller.signal });
* controller.abort(new Error("cancelled"));
* await assertRejects(() => p, Error, "cancelled");
* ```
*
* @param value The value to send into the channel.
* @param options Optional settings for the send operation.
* @throws {ChannelClosedError} If the channel is closed. The error's
* `value` property carries the unsent value for recovery.
*/
send(value: T): Promise<void> {
send(value: T, options?: ChannelOptions): Promise<void> {
if (this.#closed) {
return Promise.reject(
new ChannelClosedError("Cannot send to a closed channel", value),
);
}

if (this.#deliverToReceiver(value)) return Promise.resolve();
if (this.#deliverToReceiver(value)) return RESOLVED;

if (this.#buffer.length < this.#capacity) {
this.#buffer.pushBack(value);
return Promise.resolve();
return RESOLVED;
}

if (options?.signal?.aborted) {
return Promise.reject(options.signal.reason);
}

return new Promise<void>((res, rej) => {
this.#senders.pushBack({ value, res, rej });
const node: SenderNode<T> = { value, res, rej, cancelled: false };
this.#senders.pushBack(node);
const signal = options?.signal;
if (signal) {
const onAbort = () => {
node.cancelled = true;
rej(signal.reason);
};
signal.addEventListener("abort", onAbort, { once: true });
node.res = () => {
signal.removeEventListener("abort", onAbort);
res();
};
node.rej = (reason: unknown) => {
signal.removeEventListener("abort", onAbort);
rej(reason);
};
}
});
}

Expand All @@ -205,18 +274,58 @@ export class Channel<T>
* ch.close();
* ```
*
* @example Cancelling with an AbortSignal
* ```ts
* import { Channel } from "@std/async/unstable-channel";
* import { assertRejects } from "@std/assert";
*
* const ch = new Channel<number>();
* const controller = new AbortController();
* const p = ch.receive({ signal: controller.signal });
* controller.abort(new Error("cancelled"));
* await assertRejects(() => p, Error, "cancelled");
* ```
*
* @param options Optional settings for the receive operation.
* @returns A promise that resolves with the next value from the channel.
* @throws {ChannelClosedError} If the channel is closed and empty (no
* `value` property). If `close(reason)` was called, rejects with
* `reason` instead.
*/
receive(): Promise<T> {
receive(options?: ChannelOptions): Promise<T> {
if (this.#buffer.length > 0) return Promise.resolve(this.#dequeue());
if (!this.#senders.isEmpty()) return Promise.resolve(this.#takeSender());

const sender = this.#nextSender();
if (sender) {
sender.res();
return Promise.resolve(sender.value);
}

if (this.#closed) return Promise.reject(this.#receiveError());

if (options?.signal?.aborted) {
return Promise.reject(options.signal.reason);
}

return new Promise<T>((res, rej) => {
this.#receivers.pushBack({ res, rej });
const node: ReceiverNode<T> = { res, rej, cancelled: false };
this.#receivers.pushBack(node);
const signal = options?.signal;
if (signal) {
const onAbort = () => {
node.cancelled = true;
rej(signal.reason);
};
signal.addEventListener("abort", onAbort, { once: true });
node.res = (value: T) => {
signal.removeEventListener("abort", onAbort);
res(value);
};
node.rej = (reason: unknown) => {
signal.removeEventListener("abort", onAbort);
rej(reason);
};
}
});
}

Expand Down Expand Up @@ -248,13 +357,13 @@ export class Channel<T>
}

/**
* Non-blocking receive. The discriminated union avoids ambiguity when `T`
* itself can be `undefined`.
* Non-blocking receive. Discriminate on the `state` field to determine the
* outcome without ambiguity, even when `T` itself can be `undefined`.
*
* @returns `{ ok: true, value }` if a value was available, or
* `{ ok: false }` if the buffer is empty or the channel is closed. A
* close reason passed to {@linkcode Channel.close} is not surfaced here;
* use {@linkcode Channel.receive} to observe it.
* @returns A {@linkcode ChannelReceiveResult} — `{ state: "ok", value }`
* if a value was available, `{ state: "empty" }` if the channel is open
* but no value is ready, or `{ state: "closed" }` if the channel has
* been closed and no buffered values remain.
*
* @example Usage
* ```ts
Expand All @@ -263,16 +372,23 @@ export class Channel<T>
*
* const ch = new Channel<number>(1);
* await ch.send(42);
* assertEquals(ch.tryReceive(), { ok: true, value: 42 });
* assertEquals(ch.tryReceive(), { ok: false });
* assertEquals(ch.tryReceive(), { state: "ok", value: 42 });
* assertEquals(ch.tryReceive(), { state: "empty" });
* ch.close();
* assertEquals(ch.tryReceive(), { state: "closed" });
* ```
*/
tryReceive(): { ok: true; value: T } | { ok: false } {
if (this.#buffer.length > 0) return { ok: true, value: this.#dequeue() };
if (!this.#senders.isEmpty()) {
return { ok: true, value: this.#takeSender() };
tryReceive(): ChannelReceiveResult<T> {
if (this.#buffer.length > 0) {
return { state: "ok", value: this.#dequeue() };
}
const sender = this.#nextSender();
if (sender) {
sender.res();
return { state: "ok", value: sender.value };
}
return { ok: false };
if (this.#closed) return { state: "closed" };
return { state: "empty" };
}

/**
Expand All @@ -290,11 +406,26 @@ export class Channel<T>
* ch.close();
* assert(ch.closed);
* ```
*/
close(): void;
/**
* Closes the channel with a reason. All pending and future `receive()`
* calls reject with `reason` after draining buffered values. Pending
* `send()` calls reject with {@linkcode ChannelClosedError}. Idempotent.
*
* @param args If a reason argument is provided, all pending and future
* `receive()` calls reject with that reason (after draining buffered
* values). Enables error propagation from producer to consumer.
* @example Usage
* ```ts
* import { Channel } from "@std/async/unstable-channel";
* import { assertRejects } from "@std/assert";
*
* const ch = new Channel<number>();
* ch.close(new Error("upstream failure"));
* await assertRejects(() => ch.receive(), Error, "upstream failure");
* ```
*
* @param reason The reason to reject pending and future receivers with.
*/
close(reason: unknown): void;
close(...args: [reason: unknown] | []): void {
if (this.#closed) return;
this.#closed = true;
Expand All @@ -305,6 +436,7 @@ export class Channel<T>

let sender: SenderNode<T> | undefined;
while ((sender = this.#senders.popFront()) !== undefined) {
if (sender.cancelled) continue;
sender.rej(
new ChannelClosedError(
"Cannot send to a closed channel",
Expand All @@ -315,6 +447,7 @@ export class Channel<T>

let receiver: ReceiverNode<T> | undefined;
while ((receiver = this.#receivers.popFront()) !== undefined) {
if (receiver.cancelled) continue;
receiver.rej(this.#receiveError());
}
}
Expand Down Expand Up @@ -415,6 +548,47 @@ export class Channel<T>
}
}

/**
* Creates a {@linkcode ReadableStream} that yields values from this
* channel. The stream closes when the channel closes after draining
* buffered values. If the channel was closed with a reason, the stream
* errors with that reason. Cancelling the stream closes the channel.
*
* @example Usage
* ```ts
* import { Channel } from "@std/async/unstable-channel";
* import { assertEquals } from "@std/assert";
*
* const ch = new Channel<number>(4);
* await ch.send(1);
* await ch.send(2);
* ch.close();
*
* const values = await Array.fromAsync(ch.toReadableStream());
* assertEquals(values, [1, 2]);
* ```
*
* @returns A readable stream of channel values.
*/
toReadableStream(): ReadableStream<T> {
return new ReadableStream<T>({
pull: async (controller) => {
try {
controller.enqueue(await this.receive());
} catch (e) {
if (e instanceof ChannelClosedError && !this.#hasCloseReason) {
controller.close();
} else {
controller.error(e);
}
}
},
cancel: () => {
this.close();
},
});
}

/**
* Calls {@linkcode Channel.close}. Enables `using` for automatic cleanup.
*
Expand Down Expand Up @@ -448,12 +622,30 @@ export class Channel<T>
*/
[Symbol.asyncDispose](): Promise<void> {
this.close();
return Promise.resolve();
return RESOLVED;
}

/** Pops the next non-cancelled sender from the queue. */
#nextSender(): SenderNode<T> | undefined {
let sender: SenderNode<T> | undefined;
while ((sender = this.#senders.popFront()) !== undefined) {
if (!sender.cancelled) return sender;
}
return undefined;
}

/** Pops the next non-cancelled receiver from the queue. */
#nextReceiver(): ReceiverNode<T> | undefined {
let receiver: ReceiverNode<T> | undefined;
while ((receiver = this.#receivers.popFront()) !== undefined) {
if (!receiver.cancelled) return receiver;
}
return undefined;
}

/** Hands `value` to the next waiting receiver, if any. */
#deliverToReceiver(value: T): boolean {
const receiver = this.#receivers.popFront();
const receiver = this.#nextReceiver();
if (!receiver) return false;
receiver.res(value);
return true;
Expand All @@ -465,21 +657,14 @@ export class Channel<T>
*/
#dequeue(): T {
const value = this.#buffer.popFront()!;
const sender = this.#senders.popFront();
const sender = this.#nextSender();
if (sender) {
this.#buffer.pushBack(sender.value);
sender.res();
}
return value;
}

/** Takes a value directly from the head of the sender queue (unbuffered path). */
#takeSender(): T {
const sender = this.#senders.popFront()!;
sender.res();
return sender.value;
}

#receiveError(): unknown {
if (this.#hasCloseReason) return this.#closeReason;
return new ChannelClosedError("Cannot receive from a closed channel");
Expand Down
Loading
Loading