Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/itchy-seahorses-help.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

allow different error types when existing stream has `never` error
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ export type {
} from "./atom";

// Re-export useful utility types
export type { MaybePromise, Truthy, CallbackOrStream } from "./util";
export type { MaybePromise, Truthy, CallbackOrStream, NodeCallback } from "./util";

export default Stream;
22 changes: 11 additions & 11 deletions src/stream/base.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { normalise, type Atom, type MaybeAtom, error, exception } from "../atom";
import { Stream } from ".";
import { Readable, Writable } from "stream";
import { createNodeCallback, newSignal } from "../util";
import { createNodeCallback, newSignal, type NodeCallback } from "../util";

/**
* Unique type to represent the stream end marker.
Expand Down Expand Up @@ -47,7 +47,7 @@ export class StreamBase {
*
* @group Creation
*/
static from<T, E>(
static from<T, E = never>(
value:
| Promise<MaybeAtom<T, E>>
| Iterator<MaybeAtom<T, E>>
Expand Down Expand Up @@ -97,7 +97,7 @@ export class StreamBase {
*
* @group Creation
*/
static fromCallback<T, E>(cb: (next: (error: E, value: T) => unknown) => void): Stream<T, E> {
static fromCallback<T, E = never>(cb: (next: NodeCallback<T, E>) => void): Stream<T, E> {
// Set up a next function
const [promise, next] = createNodeCallback<T, E>();

Expand All @@ -115,7 +115,7 @@ export class StreamBase {
*
* @group Creation
*/
static fromPromise<T, E>(promise: Promise<MaybeAtom<T, E>>): Stream<T, E> {
static fromPromise<T, E = never>(promise: Promise<MaybeAtom<T, E>>): Stream<T, E> {
let awaited = false;

return Stream.fromNext(async () => {
Expand All @@ -136,7 +136,7 @@ export class StreamBase {
*
* @group Creation
*/
static fromIterator<T, E>(
static fromIterator<T, E = never>(
iterator: Iterator<MaybeAtom<T, E>> | AsyncIterator<MaybeAtom<T, E>>,
): Stream<T, E> {
return Stream.fromNext(async () => {
Expand All @@ -159,7 +159,7 @@ export class StreamBase {
*
* @group Creation
*/
static fromIterable<T, E>(
static fromIterable<T, E = never>(
iterable: Iterable<MaybeAtom<T, E>> | AsyncIterable<MaybeAtom<T, E>>,
): Stream<T, E> {
if (Symbol.iterator in iterable) {
Expand All @@ -179,7 +179,7 @@ export class StreamBase {
*
* @group Creation
*/
static fromArray<T, E>(array: MaybeAtom<T, E>[]): Stream<T, E> {
static fromArray<T, E = never>(array: MaybeAtom<T, E>[]): Stream<T, E> {
// Clone the array so that shifting elements doesn't impact the original array.
array = [...array];

Expand All @@ -197,7 +197,7 @@ export class StreamBase {
*
* @group Creation
*/
static fromNext<T, E>(next: () => Promise<MaybeAtom<T, E> | StreamEnd>): Stream<T, E> {
static fromNext<T, E = never>(next: () => Promise<MaybeAtom<T, E> | StreamEnd>): Stream<T, E> {
return new Stream(
new Readable({
objectMode: true,
Expand Down Expand Up @@ -232,7 +232,7 @@ export class StreamBase {
*
* @group Creation
*/
static fromPusher<T, E>(): {
static fromPusher<T, E = never>(): {
stream: Stream<T, E>;
push: (value: MaybeAtom<T, E>) => void;
done: () => void;
Expand Down Expand Up @@ -297,7 +297,7 @@ export class StreamBase {
*
* @group Creation
*/
static of<T, E>(value: MaybeAtom<T, E>): Stream<T, E> {
static of<T, E = never>(value: MaybeAtom<T, E>): Stream<T, E> {
let consumed = false;
return Stream.fromNext(async () => {
if (!consumed) {
Expand Down Expand Up @@ -339,7 +339,7 @@ export class StreamBase {
* Create a stream and corresponding writable Node stream, where any writes to the writable
* Node stream will be emitted on the returned stream.
*/
static writable<T, E>(): { stream: Stream<T, E>; writable: Writable } {
static writable<T, E = never>(): { stream: Stream<T, E>; writable: Writable } {
const buffer: (Atom<T, E> | StreamEnd)[] = [];
const queue: ((value: Atom<T, E> | StreamEnd) => void)[] = [];

Expand Down
6 changes: 4 additions & 2 deletions src/stream/higher-order.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ function reject<T>(value: T): { reject: T } {

type FilterResult<A, R> = { accept: A } | { reject: R };

type IfNever<T, A, B> = [T] extends [never] ? A : B;

export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
/**
* Base implementation of `flat*` operations. In general, all of these methods will filter over
Expand Down Expand Up @@ -250,13 +252,13 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
*
* @group Higher Order
*/
otherwise(cbOrStream: CallbackOrStream<T, E>): Stream<T, E> {
otherwise<F extends IfNever<E, unknown, E>>(cbOrStream: CallbackOrStream<T, F>): Stream<T, F> {
return this.consume(async function* (it) {
// Count the items being emitted from the iterator
let count = 0;
for await (const atom of it) {
count += 1;
yield atom;
yield atom as Atom<T, F>;
}

// If nothing was emitted, then create the stream and emit it
Expand Down
8 changes: 5 additions & 3 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ export async function exhaust(iterable: AsyncIterable<unknown>) {
}
}

export type NodeCallback<T, E> = (err: E | null, value?: T) => void;

/**
* Creates a `next` function and associated promise to promise-ify a node style callback. The
* `next` function must be passed as the callback to a function, and the resulting error or value
Expand All @@ -40,7 +42,7 @@ export async function exhaust(iterable: AsyncIterable<unknown>) {
* promise, whilst the value of the callback (second parameter) will be emitted as an `Ok` atom on
* the promise.
*/
export function createNodeCallback<T, E>(): [Promise<Atom<T, E>>, (error: E, value: T) => void] {
export function createNodeCallback<T, E>(): [Promise<Atom<T, E>>, NodeCallback<T, E>] {
// Resolve function to be hoisted out of the promise
let resolve: (atom: Atom<T, E>) => void;

Expand All @@ -50,10 +52,10 @@ export function createNodeCallback<T, E>(): [Promise<Atom<T, E>>, (error: E, val
});

// Create the next callback
const next = (err: E, value: T) => {
const next: NodeCallback<T, E> = (err, value) => {
if (err) {
resolve(Stream.error(err));
} else {
} else if (value) {
resolve(Stream.ok(value));
}
};
Expand Down
10 changes: 9 additions & 1 deletion test/benchmarks/index.bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,15 @@ describe("simple transform operations", () => {

describe("sample data operations", () => {
bench("windpipe", async () => {
await Stream.from([
await Stream.from<
{
name: string;
id: number;
permissions: { read: boolean; write: boolean };
balance: number;
},
string
>([
{
name: "test user 1",
id: 1,
Expand Down
13 changes: 5 additions & 8 deletions test/creation.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { afterEach, beforeEach, describe, test, vi } from "vitest";
import $ from "../src";
import $, { type NodeCallback } from "../src";
import { Readable } from "stream";

describe("stream creation", () => {
Expand Down Expand Up @@ -100,12 +100,9 @@ describe("stream creation", () => {
* @param success - Whether the method should succeed or fail.
* @param cb - Node-style callback to pass error or value to.
*/
function someNodeCallback(
success: boolean,
cb: (error: string | undefined, value?: number) => void,
) {
function someNodeCallback(success: boolean, cb: NodeCallback<number, string>) {
if (success) {
cb(undefined, 123);
cb(null, 123);
} else {
cb("an error");
}
Expand All @@ -114,7 +111,7 @@ describe("stream creation", () => {
test("value returned from callback", async ({ expect }) => {
expect.assertions(1);

const s = $.fromCallback((next) => {
const s = $.fromCallback<number, string>((next) => {
someNodeCallback(true, next);
});

Expand All @@ -124,7 +121,7 @@ describe("stream creation", () => {
test("error returned from callback", async ({ expect }) => {
expect.assertions(1);

const s = $.fromCallback((next) => {
const s = $.fromCallback<number, string>((next) => {
someNodeCallback(false, next);
});

Expand Down
7 changes: 7 additions & 0 deletions test/higher-order.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ describe.concurrent("higher order streams", () => {

expect(await s.toArray({ atoms: true })).toEqual([$.exception("some error", [])]);
});

test("stream with never error", async ({ expect }) => {
expect.assertions(1);

const s = $.from<number, never>([]).otherwise($.ofError("some error"));
expect(await s.toArray({ atoms: true })).toEqual([$.error("some error")]);
});
});

describe.concurrent("cachedFlatMap", () => {
Expand Down