From 5d20101e719b8d884276086c69695a6b116e1c76 Mon Sep 17 00:00:00 2001 From: Tom Anderson Date: Wed, 12 Feb 2025 09:41:48 +1100 Subject: [PATCH 1/2] feat: add the `single` consumption method --- .changeset/tiny-tables-mix.md | 5 ++ src/index.ts | 2 + src/stream/consumption.ts | 47 +++++++++++ src/stream/index.ts | 1 + test/consumption.test.ts | 143 +++++++++++++++++++++++++++++++++- 5 files changed, 196 insertions(+), 2 deletions(-) create mode 100644 .changeset/tiny-tables-mix.md diff --git a/.changeset/tiny-tables-mix.md b/.changeset/tiny-tables-mix.md new file mode 100644 index 0000000..5cfc909 --- /dev/null +++ b/.changeset/tiny-tables-mix.md @@ -0,0 +1,5 @@ +--- +"windpipe": minor +--- + +create `single` consumption method diff --git a/src/index.ts b/src/index.ts index c95b12a..d59bda7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,4 +17,6 @@ export type { // Re-export useful utility types export type { MaybePromise, Truthy, CallbackOrStream, NodeCallback } from "./util"; +export { WindpipeConsumptionError } from "./stream"; + export default Stream; diff --git a/src/stream/consumption.ts b/src/stream/consumption.ts index 0a859ff..6d5fd79 100644 --- a/src/stream/consumption.ts +++ b/src/stream/consumption.ts @@ -3,6 +3,12 @@ import { isOk, type Atom } from "../atom"; import { StreamBase } from "./base"; import { exhaust } from "../util"; +export class WindpipeConsumptionError extends Error { + static noItems() { + return new WindpipeConsumptionError("no items found whilst consuming stream"); + } +} + export class StreamConsumption extends StreamBase { /** * Create an iterator that will emit each atom in the stream. @@ -53,6 +59,47 @@ export class StreamConsumption extends StreamBase { }; } + /** + * Pull the stream once and remove the first item. This will not consume the rest of the + * stream. + * + * @note This method can only be called once. + */ + single(options: { atom: true; optional: true }): Promise | undefined>; + single(options: { atom: true; optional?: false }): Promise>; + single(options: { atom?: false; optional: true }): Promise; + single(options?: { atom?: false; optional?: false }): Promise; + async single({ + atom = false, + optional = false, + }: { + atom?: boolean; + optional?: boolean; + } = {}): Promise | undefined> { + const it = this[Symbol.asyncIterator](); + + const { value, done } = await it.next(); + + if (done) { + if (optional) { + // Fine to return undefined + return undefined; + } + + throw WindpipeConsumptionError.noItems(); + } + + if (atom) { + return value; + } + + if (isOk(value)) { + return value.value; + } + + throw value.value; + } + /** * Iterate through each atom in the stream, and return them as a single array. * diff --git a/src/stream/index.ts b/src/stream/index.ts index 68e7f1f..ab78f89 100644 --- a/src/stream/index.ts +++ b/src/stream/index.ts @@ -13,6 +13,7 @@ import { import { HigherOrderStream } from "./higher-order"; export type { StreamEnd } from "./base"; +export { WindpipeConsumptionError } from "./consumption"; /** * @template T - Type of the 'values' on the stream. diff --git a/test/consumption.test.ts b/test/consumption.test.ts index b90ce68..3c3c885 100644 --- a/test/consumption.test.ts +++ b/test/consumption.test.ts @@ -1,5 +1,5 @@ -import { describe, test } from "vitest"; -import $ from "../src"; +import { describe, test, vi } from "vitest"; +import $, { WindpipeConsumptionError } from "../src"; import { Readable } from "node:stream"; describe.concurrent("stream consumption", () => { @@ -29,6 +29,145 @@ describe.concurrent("stream consumption", () => { }); }); + describe.concurrent("single", () => { + describe("single ok atom", () => { + test("with no params", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.ok(1)); + + expect(s.single()).resolves.toEqual(1); + }); + + test("with optional false", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.ok(1)); + + expect(s.single({ optional: false })).resolves.toEqual(1); + }); + + test("with optional true", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.ok(1)); + + expect(s.single({ optional: true })).resolves.toEqual(1); + }); + + test("with atom false", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.ok(1)); + + expect(s.single({ atom: false })).resolves.toEqual(1); + }); + + test("with atom true", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.ok(1)); + + expect(s.single({ atom: true })).resolves.toEqual($.ok(1)); + }); + }); + + describe("single error atom", () => { + test("with no params", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.error(1)); + + expect(s.single()).rejects.toEqual(1); + }); + + test("with optional false", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.error(1)); + + expect(s.single({ optional: false })).rejects.toEqual(1); + }); + + test("with optional true", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.error(1)); + + expect(s.single({ optional: true })).rejects.toEqual(1); + }); + + test("with atom false", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.error(1)); + + expect(s.single({ atom: false })).rejects.toEqual(1); + }); + + test("with atom true", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.error(1)); + + expect(s.single({ atom: true })).resolves.toEqual($.error(1)); + }); + }); + + describe("empty stream", () => { + test("with no params", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([]); + + expect(s.single()).rejects.toThrow(WindpipeConsumptionError); + }); + + test("with optional false", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([]); + + expect(s.single({ optional: false })).rejects.toThrow(WindpipeConsumptionError); + }); + + test("with optional true", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([]); + + expect(s.single({ optional: true })).resolves.toEqual(undefined); + }); + + test("with atom false", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([]); + + expect(s.single({ atom: false })).rejects.toThrow(WindpipeConsumptionError); + }); + + test("with atom true", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([]); + + expect(s.single({ atom: true })).rejects.toThrow(WindpipeConsumptionError); + }); + }); + + test("single pull", async ({ expect }) => { + expect.assertions(2); + + const fn = vi.fn().mockReturnValue(Promise.resolve(1)); + + const s = $.fromNext(fn); + + expect(s.single()).resolves.toEqual(1); + expect(fn).toBeCalledTimes(1); + }); + }); + describe.concurrent("toArray", () => { test("values", async ({ expect }) => { expect.assertions(1); From 55dad68ba154e1099508739921b2071ba985df5d Mon Sep 17 00:00:00 2001 From: Tom Anderson Date: Wed, 12 Feb 2025 10:40:37 +1100 Subject: [PATCH 2/2] Update src/stream/consumption.ts Co-authored-by: Ewan Breakey --- src/stream/consumption.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/consumption.ts b/src/stream/consumption.ts index 6d5fd79..195641e 100644 --- a/src/stream/consumption.ts +++ b/src/stream/consumption.ts @@ -63,7 +63,7 @@ export class StreamConsumption extends StreamBase { * Pull the stream once and remove the first item. This will not consume the rest of the * stream. * - * @note This method can only be called once. + * @note This method can only be called once on a given stream. */ single(options: { atom: true; optional: true }): Promise | undefined>; single(options: { atom: true; optional?: false }): Promise>;