diff --git a/.changeset/itchy-seahorses-help.md b/.changeset/itchy-seahorses-help.md new file mode 100644 index 0000000..6d81f24 --- /dev/null +++ b/.changeset/itchy-seahorses-help.md @@ -0,0 +1,5 @@ +--- +"windpipe": minor +--- + +allow different error types when existing stream has `never` error diff --git a/src/index.ts b/src/index.ts index a8b083a..c95b12a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,6 +15,6 @@ export type { } from "./atom"; // Re-export useful utility types -export type { MaybePromise, Truthy, CallbackOrStream } from "./util"; +export type { MaybePromise, Truthy, CallbackOrStream, NodeCallback } from "./util"; export default Stream; diff --git a/src/stream/base.ts b/src/stream/base.ts index c52f62d..1d6e88a 100644 --- a/src/stream/base.ts +++ b/src/stream/base.ts @@ -1,7 +1,7 @@ import { normalise, type Atom, type MaybeAtom, error, exception } from "../atom"; import { Stream } from "."; import { Readable, Writable } from "stream"; -import { createNodeCallback, newSignal } from "../util"; +import { createNodeCallback, newSignal, type NodeCallback } from "../util"; /** * Unique type to represent the stream end marker. @@ -47,7 +47,7 @@ export class StreamBase { * * @group Creation */ - static from( + static from( value: | Promise> | Iterator> @@ -97,7 +97,7 @@ export class StreamBase { * * @group Creation */ - static fromCallback(cb: (next: (error: E, value: T) => unknown) => void): Stream { + static fromCallback(cb: (next: NodeCallback) => void): Stream { // Set up a next function const [promise, next] = createNodeCallback(); @@ -115,7 +115,7 @@ export class StreamBase { * * @group Creation */ - static fromPromise(promise: Promise>): Stream { + static fromPromise(promise: Promise>): Stream { let awaited = false; return Stream.fromNext(async () => { @@ -136,7 +136,7 @@ export class StreamBase { * * @group Creation */ - static fromIterator( + static fromIterator( iterator: Iterator> | AsyncIterator>, ): Stream { return Stream.fromNext(async () => { @@ -159,7 +159,7 @@ export class StreamBase { * * @group Creation */ - static fromIterable( + static fromIterable( iterable: Iterable> | AsyncIterable>, ): Stream { if (Symbol.iterator in iterable) { @@ -179,7 +179,7 @@ export class StreamBase { * * @group Creation */ - static fromArray(array: MaybeAtom[]): Stream { + static fromArray(array: MaybeAtom[]): Stream { // Clone the array so that shifting elements doesn't impact the original array. array = [...array]; @@ -197,7 +197,7 @@ export class StreamBase { * * @group Creation */ - static fromNext(next: () => Promise | StreamEnd>): Stream { + static fromNext(next: () => Promise | StreamEnd>): Stream { return new Stream( new Readable({ objectMode: true, @@ -232,7 +232,7 @@ export class StreamBase { * * @group Creation */ - static fromPusher(): { + static fromPusher(): { stream: Stream; push: (value: MaybeAtom) => void; done: () => void; @@ -297,7 +297,7 @@ export class StreamBase { * * @group Creation */ - static of(value: MaybeAtom): Stream { + static of(value: MaybeAtom): Stream { let consumed = false; return Stream.fromNext(async () => { if (!consumed) { @@ -339,7 +339,7 @@ export class StreamBase { * Create a stream and corresponding writable Node stream, where any writes to the writable * Node stream will be emitted on the returned stream. */ - static writable(): { stream: Stream; writable: Writable } { + static writable(): { stream: Stream; writable: Writable } { const buffer: (Atom | StreamEnd)[] = []; const queue: ((value: Atom | StreamEnd) => void)[] = []; diff --git a/src/stream/higher-order.ts b/src/stream/higher-order.ts index a2ff452..efabaab 100644 --- a/src/stream/higher-order.ts +++ b/src/stream/higher-order.ts @@ -14,6 +14,8 @@ function reject(value: T): { reject: T } { type FilterResult = { accept: A } | { reject: R }; +type IfNever = [T] extends [never] ? A : B; + export class HigherOrderStream extends StreamTransforms { /** * Base implementation of `flat*` operations. In general, all of these methods will filter over @@ -250,13 +252,13 @@ export class HigherOrderStream extends StreamTransforms { * * @group Higher Order */ - otherwise(cbOrStream: CallbackOrStream): Stream { + otherwise>(cbOrStream: CallbackOrStream): Stream { return this.consume(async function* (it) { // Count the items being emitted from the iterator let count = 0; for await (const atom of it) { count += 1; - yield atom; + yield atom as Atom; } // If nothing was emitted, then create the stream and emit it diff --git a/src/util.ts b/src/util.ts index c7b272a..62c471b 100644 --- a/src/util.ts +++ b/src/util.ts @@ -31,6 +31,8 @@ export async function exhaust(iterable: AsyncIterable) { } } +export type NodeCallback = (err: E | null, value?: T) => void; + /** * Creates a `next` function and associated promise to promise-ify a node style callback. The * `next` function must be passed as the callback to a function, and the resulting error or value @@ -40,7 +42,7 @@ export async function exhaust(iterable: AsyncIterable) { * promise, whilst the value of the callback (second parameter) will be emitted as an `Ok` atom on * the promise. */ -export function createNodeCallback(): [Promise>, (error: E, value: T) => void] { +export function createNodeCallback(): [Promise>, NodeCallback] { // Resolve function to be hoisted out of the promise let resolve: (atom: Atom) => void; @@ -50,10 +52,10 @@ export function createNodeCallback(): [Promise>, (error: E, val }); // Create the next callback - const next = (err: E, value: T) => { + const next: NodeCallback = (err, value) => { if (err) { resolve(Stream.error(err)); - } else { + } else if (value) { resolve(Stream.ok(value)); } }; diff --git a/test/benchmarks/index.bench.ts b/test/benchmarks/index.bench.ts index 82a1d68..99a90dc 100644 --- a/test/benchmarks/index.bench.ts +++ b/test/benchmarks/index.bench.ts @@ -52,7 +52,15 @@ describe("simple transform operations", () => { describe("sample data operations", () => { bench("windpipe", async () => { - await Stream.from([ + await Stream.from< + { + name: string; + id: number; + permissions: { read: boolean; write: boolean }; + balance: number; + }, + string + >([ { name: "test user 1", id: 1, diff --git a/test/creation.test.ts b/test/creation.test.ts index d29ddd0..e685ec2 100644 --- a/test/creation.test.ts +++ b/test/creation.test.ts @@ -1,5 +1,5 @@ import { afterEach, beforeEach, describe, test, vi } from "vitest"; -import $ from "../src"; +import $, { type NodeCallback } from "../src"; import { Readable } from "stream"; describe("stream creation", () => { @@ -100,12 +100,9 @@ describe("stream creation", () => { * @param success - Whether the method should succeed or fail. * @param cb - Node-style callback to pass error or value to. */ - function someNodeCallback( - success: boolean, - cb: (error: string | undefined, value?: number) => void, - ) { + function someNodeCallback(success: boolean, cb: NodeCallback) { if (success) { - cb(undefined, 123); + cb(null, 123); } else { cb("an error"); } @@ -114,7 +111,7 @@ describe("stream creation", () => { test("value returned from callback", async ({ expect }) => { expect.assertions(1); - const s = $.fromCallback((next) => { + const s = $.fromCallback((next) => { someNodeCallback(true, next); }); @@ -124,7 +121,7 @@ describe("stream creation", () => { test("error returned from callback", async ({ expect }) => { expect.assertions(1); - const s = $.fromCallback((next) => { + const s = $.fromCallback((next) => { someNodeCallback(false, next); }); diff --git a/test/higher-order.test.ts b/test/higher-order.test.ts index d0e83e9..6b3a3dd 100644 --- a/test/higher-order.test.ts +++ b/test/higher-order.test.ts @@ -125,6 +125,13 @@ describe.concurrent("higher order streams", () => { expect(await s.toArray({ atoms: true })).toEqual([$.exception("some error", [])]); }); + + test("stream with never error", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([]).otherwise($.ofError("some error")); + expect(await s.toArray({ atoms: true })).toEqual([$.error("some error")]); + }); }); describe.concurrent("cachedFlatMap", () => {