Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/stream/consumption.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ export class StreamConsumption<T, E> extends StreamBase {
* @param options.atoms - By default, only `ok` values are serialised, however enabling this
* will serialise all values.
*
* @note this will skip `undefined` values as they cannot be serialised.
*
* @see {@link Stream#toReadable} if serialisation is not required
* @group Consumption
*/
Expand All @@ -109,6 +111,11 @@ export class StreamConsumption<T, E> extends StreamBase {
continue;
}

// Skip undefined values (they cannot be serialised into JSON)
if (atom.value === undefined) {
continue;
}

if (sentItems > 0) {
if (options?.single) {
// Monitor for multiple values being sent when only one is desired
Expand Down
32 changes: 32 additions & 0 deletions test/consumption.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,32 @@ import $ from "../src";
import { Readable } from "node:stream";

describe.concurrent("stream consumption", () => {
describe.concurrent("serialise", () => {
test("simple number values", async ({ expect }) => {
expect.assertions(1);

const jsonStream = $.from([1, 2, 3]).serialise();
const json = await streamToString(jsonStream);
expect(json).toEqual("[1,2,3]");
});

test("null values", async ({ expect }) => {
expect.assertions(1);

const jsonStream = $.from([1, null, 3]).serialise();
const json = await streamToString(jsonStream);
expect(json).toEqual("[1,null,3]");
});

test("skip undefined values", async ({ expect }) => {
expect.assertions(1);

const jsonStream = $.from([1, undefined, 3]).serialise();
const json = await streamToString(jsonStream);
expect(json).toEqual("[1,3]");
});
});

describe.concurrent("toArray", () => {
test("values", async ({ expect }) => {
expect.assertions(1);
Expand Down Expand Up @@ -215,6 +241,12 @@ function promisifyStream(stream: Readable): Promise<unknown[]> {
});
}

async function streamToString(stream: Readable): Promise<string> {
const textDecoder = new TextDecoder();
const chunks = await stream.toArray();
return chunks.map((chunk) => textDecoder.decode(chunk)).join("");
}

async function emptyStream(stream: Readable): Promise<{ data: unknown[]; errors: unknown[] }> {
const errors: unknown[] = [];
const data: unknown[] = [];
Expand Down
Loading