diff --git a/src/stream/consumption.ts b/src/stream/consumption.ts index 14adc0f..0a859ff 100644 --- a/src/stream/consumption.ts +++ b/src/stream/consumption.ts @@ -86,6 +86,8 @@ export class StreamConsumption extends StreamBase { * @param options.atoms - By default, only `ok` values are serialised, however enabling this * will serialise all values. * + * @note this will skip `undefined` values as they cannot be serialised. + * * @see {@link Stream#toReadable} if serialisation is not required * @group Consumption */ @@ -109,6 +111,11 @@ export class StreamConsumption extends StreamBase { continue; } + // Skip undefined values (they cannot be serialised into JSON) + if (atom.value === undefined) { + continue; + } + if (sentItems > 0) { if (options?.single) { // Monitor for multiple values being sent when only one is desired diff --git a/test/consumption.test.ts b/test/consumption.test.ts index 30dcbb3..b90ce68 100644 --- a/test/consumption.test.ts +++ b/test/consumption.test.ts @@ -3,6 +3,32 @@ import $ from "../src"; import { Readable } from "node:stream"; describe.concurrent("stream consumption", () => { + describe.concurrent("serialise", () => { + test("simple number values", async ({ expect }) => { + expect.assertions(1); + + const jsonStream = $.from([1, 2, 3]).serialise(); + const json = await streamToString(jsonStream); + expect(json).toEqual("[1,2,3]"); + }); + + test("null values", async ({ expect }) => { + expect.assertions(1); + + const jsonStream = $.from([1, null, 3]).serialise(); + const json = await streamToString(jsonStream); + expect(json).toEqual("[1,null,3]"); + }); + + test("skip undefined values", async ({ expect }) => { + expect.assertions(1); + + const jsonStream = $.from([1, undefined, 3]).serialise(); + const json = await streamToString(jsonStream); + expect(json).toEqual("[1,3]"); + }); + }); + describe.concurrent("toArray", () => { test("values", async ({ expect }) => { expect.assertions(1); @@ -215,6 +241,12 @@ function promisifyStream(stream: Readable): Promise { }); } +async function streamToString(stream: Readable): Promise { + const textDecoder = new TextDecoder(); + const chunks = await stream.toArray(); + return chunks.map((chunk) => textDecoder.decode(chunk)).join(""); +} + async function emptyStream(stream: Readable): Promise<{ data: unknown[]; errors: unknown[] }> { const errors: unknown[] = []; const data: unknown[] = [];