diff --git a/packages/db-client/package.json b/packages/db-client/package.json index 36c9cdc2..5a00ac8d 100644 --- a/packages/db-client/package.json +++ b/packages/db-client/package.json @@ -46,7 +46,7 @@ }, "dependencies": { "@grpc/grpc-js": "^1.12.4", - "@kurrent/bridge": "^0.1.0", + "@kurrent/bridge": "^0.1.1", "@types/debug": "^4.1.12", "@types/google-protobuf": "^3.15.12", "@types/node": "^22.10.2", diff --git a/packages/db-client/src/streams/readAll.ts b/packages/db-client/src/streams/readAll.ts index ae75873b..d8132a69 100644 --- a/packages/db-client/src/streams/readAll.ts +++ b/packages/db-client/src/streams/readAll.ts @@ -3,6 +3,7 @@ import type { ReadPosition, Direction, AllStreamResolvedEvent, + Filter, } from "../types"; import { FORWARDS, START } from "../constants"; import { Client } from "../Client"; @@ -34,6 +35,10 @@ export interface ReadAllOptions extends BaseOptions { * @defaultValue FORWARDS */ direction?: Direction; + /** + * Filters events or streams based upon a predicate. + */ + filter?: Filter; } declare module "../Client" { @@ -66,6 +71,7 @@ Client.prototype.readAll = function ( direction, requiresLeader: baseOptions.requiresLeader ?? true, credentials: baseOptions.credentials, + filter: baseOptions.filter, }; let stream; diff --git a/packages/test/src/streams/readAll.test.ts b/packages/test/src/streams/readAll.test.ts index ff45ff27..bd1509a3 100644 --- a/packages/test/src/streams/readAll.test.ts +++ b/packages/test/src/streams/readAll.test.ts @@ -7,6 +7,7 @@ import { AllStreamBinaryRecordedEvent, LinkEvent, KurrentDBClient, + streamNameFilter, } from "@kurrent/kurrentdb-client"; describe("readAll", () => { @@ -45,6 +46,27 @@ describe("readAll", () => { expect(notSystemStreams[0]).toBe(STREAM_NAME_A); }); + test("with filter", async () => { + let count = 0; + const notSystemStreams = []; + + for await (const { event } of client.readAll({ + filter: streamNameFilter({ + prefixes: [STREAM_NAME_A], + }), + })) { + count++; + + if (event && !event.streamId.startsWith("$")) { + notSystemStreams.push(event.streamId); + } + } + + expect(count).toEqual(4); + expect(notSystemStreams.length).toEqual(4); + expect(Array.from(new Set(notSystemStreams))).toEqual([STREAM_NAME_A]); + }); + test("from position", async () => { let eventToExtract!: AllStreamResolvedEvent; diff --git a/yarn.lock b/yarn.lock index ec925801..b73ca9bf 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1048,6 +1048,13 @@ __metadata: languageName: node linkType: hard +"@kurrent/bridge-darwin-arm64@npm:0.1.1": + version: 0.1.1 + resolution: "@kurrent/bridge-darwin-arm64@npm:0.1.1" + conditions: os=darwin & cpu=arm64 + languageName: node + linkType: hard + "@kurrent/bridge-darwin-x64@npm:0.1.0": version: 0.1.0 resolution: "@kurrent/bridge-darwin-x64@npm:0.1.0" @@ -1055,6 +1062,13 @@ __metadata: languageName: node linkType: hard +"@kurrent/bridge-darwin-x64@npm:0.1.1": + version: 0.1.1 + resolution: "@kurrent/bridge-darwin-x64@npm:0.1.1" + conditions: os=darwin & cpu=x64 + languageName: node + linkType: hard + "@kurrent/bridge-linux-arm64-gnu@npm:0.1.0": version: 0.1.0 resolution: "@kurrent/bridge-linux-arm64-gnu@npm:0.1.0" @@ -1062,6 +1076,13 @@ __metadata: languageName: node linkType: hard +"@kurrent/bridge-linux-arm64-gnu@npm:0.1.1": + version: 0.1.1 + resolution: "@kurrent/bridge-linux-arm64-gnu@npm:0.1.1" + conditions: os=linux & cpu=arm64 + languageName: node + linkType: hard + "@kurrent/bridge-linux-x64-gnu@npm:0.1.0": version: 0.1.0 resolution: "@kurrent/bridge-linux-x64-gnu@npm:0.1.0" @@ -1069,6 +1090,13 @@ __metadata: languageName: node linkType: hard +"@kurrent/bridge-linux-x64-gnu@npm:0.1.1": + version: 0.1.1 + resolution: "@kurrent/bridge-linux-x64-gnu@npm:0.1.1" + conditions: os=linux & cpu=x64 + languageName: node + linkType: hard + "@kurrent/bridge-win32-x64-msvc@npm:0.1.0": version: 0.1.0 resolution: "@kurrent/bridge-win32-x64-msvc@npm:0.1.0" @@ -1076,6 +1104,13 @@ __metadata: languageName: node linkType: hard +"@kurrent/bridge-win32-x64-msvc@npm:0.1.1": + version: 0.1.1 + resolution: "@kurrent/bridge-win32-x64-msvc@npm:0.1.1" + conditions: os=win32 & cpu=x64 + languageName: node + linkType: hard + "@kurrent/bridge@npm:^0.1.0": version: 0.1.0 resolution: "@kurrent/bridge@npm:0.1.0" @@ -1101,12 +1136,37 @@ __metadata: languageName: node linkType: hard +"@kurrent/bridge@npm:^0.1.1": + version: 0.1.1 + resolution: "@kurrent/bridge@npm:0.1.1" + dependencies: + "@kurrent/bridge-darwin-arm64": "npm:0.1.1" + "@kurrent/bridge-darwin-x64": "npm:0.1.1" + "@kurrent/bridge-linux-arm64-gnu": "npm:0.1.1" + "@kurrent/bridge-linux-x64-gnu": "npm:0.1.1" + "@kurrent/bridge-win32-x64-msvc": "npm:0.1.1" + "@neon-rs/load": "npm:^0.1.82" + dependenciesMeta: + "@kurrent/bridge-darwin-arm64": + optional: true + "@kurrent/bridge-darwin-x64": + optional: true + "@kurrent/bridge-linux-arm64-gnu": + optional: true + "@kurrent/bridge-linux-x64-gnu": + optional: true + "@kurrent/bridge-win32-x64-msvc": + optional: true + checksum: 10c0/4a4e20d1b57b52634cb984d8fa42ec8cc1082a0858a9159e14688b31944e3116fcc0678e6ef637ecc1448c9db6b396d14d17c51eb34ae6e1467b916884cad92a + languageName: node + linkType: hard + "@kurrent/kurrentdb-client@workspace:^, @kurrent/kurrentdb-client@workspace:packages/db-client": version: 0.0.0-use.local resolution: "@kurrent/kurrentdb-client@workspace:packages/db-client" dependencies: "@grpc/grpc-js": "npm:^1.12.4" - "@kurrent/bridge": "npm:^0.1.0" + "@kurrent/bridge": "npm:^0.1.1" "@types/debug": "npm:^4.1.12" "@types/google-protobuf": "npm:^3.15.12" "@types/node": "npm:^22.10.2"