diff --git a/.changeset/tiny-tables-mix.md b/.changeset/tiny-tables-mix.md new file mode 100644 index 0000000..5cfc909 --- /dev/null +++ b/.changeset/tiny-tables-mix.md @@ -0,0 +1,5 @@ +--- +"windpipe": minor +--- + +create `single` consumption method diff --git a/src/index.ts b/src/index.ts index c95b12a..d59bda7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,4 +17,6 @@ export type { // Re-export useful utility types export type { MaybePromise, Truthy, CallbackOrStream, NodeCallback } from "./util"; +export { WindpipeConsumptionError } from "./stream"; + export default Stream; diff --git a/src/stream/consumption.ts b/src/stream/consumption.ts index 0a859ff..195641e 100644 --- a/src/stream/consumption.ts +++ b/src/stream/consumption.ts @@ -3,6 +3,12 @@ import { isOk, type Atom } from "../atom"; import { StreamBase } from "./base"; import { exhaust } from "../util"; +export class WindpipeConsumptionError extends Error { + static noItems() { + return new WindpipeConsumptionError("no items found whilst consuming stream"); + } +} + export class StreamConsumption extends StreamBase { /** * Create an iterator that will emit each atom in the stream. @@ -53,6 +59,47 @@ export class StreamConsumption extends StreamBase { }; } + /** + * Pull the stream once and remove the first item. This will not consume the rest of the + * stream. + * + * @note This method can only be called once on a given stream. + */ + single(options: { atom: true; optional: true }): Promise | undefined>; + single(options: { atom: true; optional?: false }): Promise>; + single(options: { atom?: false; optional: true }): Promise; + single(options?: { atom?: false; optional?: false }): Promise; + async single({ + atom = false, + optional = false, + }: { + atom?: boolean; + optional?: boolean; + } = {}): Promise | undefined> { + const it = this[Symbol.asyncIterator](); + + const { value, done } = await it.next(); + + if (done) { + if (optional) { + // Fine to return undefined + return undefined; + } + + throw WindpipeConsumptionError.noItems(); + } + + if (atom) { + return value; + } + + if (isOk(value)) { + return value.value; + } + + throw value.value; + } + /** * Iterate through each atom in the stream, and return them as a single array. * diff --git a/src/stream/index.ts b/src/stream/index.ts index 68e7f1f..ab78f89 100644 --- a/src/stream/index.ts +++ b/src/stream/index.ts @@ -13,6 +13,7 @@ import { import { HigherOrderStream } from "./higher-order"; export type { StreamEnd } from "./base"; +export { WindpipeConsumptionError } from "./consumption"; /** * @template T - Type of the 'values' on the stream. diff --git a/test/consumption.test.ts b/test/consumption.test.ts index b90ce68..3c3c885 100644 --- a/test/consumption.test.ts +++ b/test/consumption.test.ts @@ -1,5 +1,5 @@ -import { describe, test } from "vitest"; -import $ from "../src"; +import { describe, test, vi } from "vitest"; +import $, { WindpipeConsumptionError } from "../src"; import { Readable } from "node:stream"; describe.concurrent("stream consumption", () => { @@ -29,6 +29,145 @@ describe.concurrent("stream consumption", () => { }); }); + describe.concurrent("single", () => { + describe("single ok atom", () => { + test("with no params", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.ok(1)); + + expect(s.single()).resolves.toEqual(1); + }); + + test("with optional false", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.ok(1)); + + expect(s.single({ optional: false })).resolves.toEqual(1); + }); + + test("with optional true", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.ok(1)); + + expect(s.single({ optional: true })).resolves.toEqual(1); + }); + + test("with atom false", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.ok(1)); + + expect(s.single({ atom: false })).resolves.toEqual(1); + }); + + test("with atom true", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.ok(1)); + + expect(s.single({ atom: true })).resolves.toEqual($.ok(1)); + }); + }); + + describe("single error atom", () => { + test("with no params", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.error(1)); + + expect(s.single()).rejects.toEqual(1); + }); + + test("with optional false", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.error(1)); + + expect(s.single({ optional: false })).rejects.toEqual(1); + }); + + test("with optional true", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.error(1)); + + expect(s.single({ optional: true })).rejects.toEqual(1); + }); + + test("with atom false", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.error(1)); + + expect(s.single({ atom: false })).rejects.toEqual(1); + }); + + test("with atom true", async ({ expect }) => { + expect.assertions(1); + + const s = $.of($.error(1)); + + expect(s.single({ atom: true })).resolves.toEqual($.error(1)); + }); + }); + + describe("empty stream", () => { + test("with no params", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([]); + + expect(s.single()).rejects.toThrow(WindpipeConsumptionError); + }); + + test("with optional false", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([]); + + expect(s.single({ optional: false })).rejects.toThrow(WindpipeConsumptionError); + }); + + test("with optional true", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([]); + + expect(s.single({ optional: true })).resolves.toEqual(undefined); + }); + + test("with atom false", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([]); + + expect(s.single({ atom: false })).rejects.toThrow(WindpipeConsumptionError); + }); + + test("with atom true", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([]); + + expect(s.single({ atom: true })).rejects.toThrow(WindpipeConsumptionError); + }); + }); + + test("single pull", async ({ expect }) => { + expect.assertions(2); + + const fn = vi.fn().mockReturnValue(Promise.resolve(1)); + + const s = $.fromNext(fn); + + expect(s.single()).resolves.toEqual(1); + expect(fn).toBeCalledTimes(1); + }); + }); + describe.concurrent("toArray", () => { test("values", async ({ expect }) => { expect.assertions(1);