From 6df5c30fa46ccffaef9e2a7aa2c242c9b6a98efa Mon Sep 17 00:00:00 2001 From: Gleb Fomin Date: Fri, 24 Jan 2025 17:34:05 +0500 Subject: [PATCH 1/9] feat(services): add pub-sub --- services/pub-sub/.npmignore | 1 + services/pub-sub/README.md | 137 +++++++++++++++++++++++++++ services/pub-sub/package.json | 34 +++++++ services/pub-sub/src/index.ts | 3 + services/pub-sub/src/pubSub.tests.ts | 87 +++++++++++++++++ services/pub-sub/src/pubSub.ts | 108 +++++++++++++++++++++ services/pub-sub/tsconfig.build.json | 4 + services/pub-sub/tsconfig.json | 11 +++ 8 files changed, 385 insertions(+) create mode 100644 services/pub-sub/.npmignore create mode 100644 services/pub-sub/README.md create mode 100644 services/pub-sub/package.json create mode 100644 services/pub-sub/src/index.ts create mode 100644 services/pub-sub/src/pubSub.tests.ts create mode 100644 services/pub-sub/src/pubSub.ts create mode 100644 services/pub-sub/tsconfig.build.json create mode 100644 services/pub-sub/tsconfig.json diff --git a/services/pub-sub/.npmignore b/services/pub-sub/.npmignore new file mode 100644 index 00000000..85de9cf9 --- /dev/null +++ b/services/pub-sub/.npmignore @@ -0,0 +1 @@ +src diff --git a/services/pub-sub/README.md b/services/pub-sub/README.md new file mode 100644 index 00000000..0713649d --- /dev/null +++ b/services/pub-sub/README.md @@ -0,0 +1,137 @@ +# `@byndyusoft-ui/pub-sub` + +> A performant Pub/Sub interface with controlled instance management + +### Installation + +```bash +npm i @byndyusoft-ui/pub-sub +``` + +## Usage + +#### Import the class + +```ts +import PubSub from '@byndyusoft-ui/pub-sub'; +``` + +#### Define your channels +Create a type that defines the channels and their corresponding callback signatures. + +```ts +type ChannelsType = { + addTodo: (data: TodoType) => void; + removeTodo: (todoId: number) => void; + removeAll: () => void; +}; +``` + +#### Create an instance +Use the `getInstance` method to create or retrieve a singleton instance of `PubSub`. + +```ts +const pubSubInstance = PubSub.getInstance(); +``` + +#### Subscribe and unsubscribe to a channel +Remove a specific callback from a channel to stop receiving notifications. + +```ts +const addTodoCallback = (data: TodoType) => { + console.log('Added new todo:', data); +}; + +const removeTodoCallback = (todoId: number) => { + console.log(`Removed todo: ${todoId}`); +}; + +const removeAllCallback = () => { + console.log('All todos deleted'); +}; + +// subscribe +pubSubInstance.subscribe('addTodo', addTodoCallback); +pubSubInstance.subscribe('removeTodo', removeTodoCallback); +pubSubInstance.subscribe('removeAll', removeAllCallback); + +// unsubscribe +pubSubInstance.unsubscribe('addTodo', addTodoCallback); +pubSubInstance.unsubscribe('removeTodo', removeTodoCallback); +pubSubInstance.unsubscribe('removeAll', removeAllCallback); + +``` + +#### Publish to a channel + +```ts +pubSubInstance.publish('addTodo', { id: 1, text: 'Some todo'}); +pubSubInstance.publish('removeTodo', 1); +pubSubInstance.publish('removeAll'); +``` + +#### Publish asynchronously +Use publishAsync to publish data and handle asynchronous subscribers. + +```ts + +pubSubInstance.subscribe('asyncMessage', async (data) => { + await new Promise((resolve) => setTimeout(resolve, 1000)); + console.log(`Async received: ${data}`); +}); + +await pubSubInstance.publishAsync('asyncMessage', 'This is asynchronous!'); +``` + +#### Reset all subscriptions +Clear all channels and their associated subscribers. + +```ts +pubSubInstance.reset(); +``` + +#### Singleton Instances +`PubSub` supports multiple named singleton instances using instanceKey. +This allows you to create isolated instances for different parts of your application. + +> Note: If instanceKey is not provided, the instance with the default name `"_default"` will be used. + +Usage example: +```ts +import PubSub from '@byndyusoft-ui/pub-sub'; + +// Get the instance with the default name "_default" +const defaultPubSub = PubSub.getInstance(); + +// Get an instance with a custom name +const customPubSub = PubSub.getInstance('custom'); + +// Instances are isolated from each other +defaultPubSub.subscribe('event', (data) => { + console.log(`Default instance: ${data}`); +}); + +customPubSub.subscribe('event', (data) => { + console.log(`Custom instance: ${data}`); +}); + +// Publish events in different instances +defaultPubSub.publish('event', 'Hello from default!'); +customPubSub.publish('event', 'Hello from custom!'); +``` + +#### Adapter for Interfaces +If you're using `interface` instead of `type`, you can use the helper type +`ChannelsRecordAdapter` to ensure compatibility with the index signature: + +```ts +import { type ChannelsRecordAdapter } from '@byndyusoft-ui/pub-sub' + +interface TodoChannels { + addTodo: (data: TodoType) => void; + removeTodo: (todoId: number) => void; + removeAll: () => void; +} + +const pubSubInstance = PubSub.getInstance>(); +``` diff --git a/services/pub-sub/package.json b/services/pub-sub/package.json new file mode 100644 index 00000000..b6ccbb9e --- /dev/null +++ b/services/pub-sub/package.json @@ -0,0 +1,34 @@ +{ + "name": "@byndyusoft-ui/pub-sub", + "version": "0.0.1", + "description": "Byndyusoft UI Service", + "keywords": [ + "byndyusoft", + "byndyusoft-ui", + "channels", + "publish", + "subscribe", + "Pub/Sub" + ], + "author": "Gleb Fomin ", + "homepage": "https://github.com/Byndyusoft/ui/tree/master/services/pub-sub#readme", + "license": "Apache-2.0", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "repository": { + "type": "git", + "url": "git+https://github.com/Byndyusoft/ui.git" + }, + "scripts": { + "build": "tsc --project tsconfig.build.json", + "clean": "rimraf dist", + "lint": "eslint src --config ../../eslint.config.js", + "test": "jest --config ../../jest.config.js --roots services/pub-sub/src" + }, + "bugs": { + "url": "https://github.com/Byndyusoft/ui/issues" + }, + "publishConfig": { + "access": "public" + } +} diff --git a/services/pub-sub/src/index.ts b/services/pub-sub/src/index.ts new file mode 100644 index 00000000..71ac867a --- /dev/null +++ b/services/pub-sub/src/index.ts @@ -0,0 +1,3 @@ +export { default } from './pubSub'; + +export type { ChannelsRecordAdapter } from './pubSub'; diff --git a/services/pub-sub/src/pubSub.tests.ts b/services/pub-sub/src/pubSub.tests.ts new file mode 100644 index 00000000..caccc734 --- /dev/null +++ b/services/pub-sub/src/pubSub.tests.ts @@ -0,0 +1,87 @@ +import PubSub from './pubSub'; + +type TChannels = { + testChannel: (data?: string) => void; + asyncChannel: (data?: string) => Promise; +}; + +describe('services/pub-sub', () => { + let pubSub: PubSub; + + beforeEach(() => { + pubSub = PubSub.getInstance(); + }); + + afterEach(() => { + pubSub.reset(); + }); + + test('should create a new instance and get the same instance for the same key', () => { + const instance1 = PubSub.getInstance('instance1'); + const instance2 = PubSub.getInstance('instance1'); + const instance3 = PubSub.getInstance('instance2'); + + expect(instance1).toBe(instance2); + expect(instance1).not.toBe(instance3); + }); + + test('should subscribe and publish to a channel', () => { + const callback = jest.fn(); + pubSub.subscribe('testChannel', callback); + + pubSub.publish('testChannel', 'Hello, World!'); + + expect(callback).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledWith('Hello, World!'); + }); + + test('should not call callback if no subscribers', () => { + const callback = jest.fn(); + pubSub.publish('testChannel'); + + expect(callback).not.toHaveBeenCalled(); + }); + + test('should unsubscribe from a channel', () => { + const callback = jest.fn(); + pubSub.subscribe('testChannel', callback); + pubSub.unsubscribe('testChannel', callback); + + pubSub.publish('testChannel'); + + expect(callback).not.toHaveBeenCalled(); + }); + + test('should warn if no subscribers are present for a channel', () => { + console.warn = jest.fn(); + + pubSub.publish('testChannel', 'No one is listening'); + + expect(console.warn).toHaveBeenCalledWith('No subscribers for channel: testChannel'); + }); + + test('should handle async subscribe callbacks', async () => { + const asyncCallback = jest.fn().mockResolvedValue(undefined); + pubSub.subscribe('asyncChannel', asyncCallback); + + await pubSub.publishAsync('asyncChannel', 'Async data'); + + expect(asyncCallback).toHaveBeenCalledTimes(1); + expect(asyncCallback).toHaveBeenCalledWith('Async data'); + }); + + test('should reset all subscriptions', () => { + const callback1 = jest.fn(); + const callback2 = jest.fn(); + + pubSub.subscribe('testChannel', callback1); + pubSub.subscribe('testChannel', callback2); + + pubSub.reset(); + + pubSub.publish('testChannel'); + + expect(callback1).not.toHaveBeenCalled(); + expect(callback2).not.toHaveBeenCalled(); + }); +}); diff --git a/services/pub-sub/src/pubSub.ts b/services/pub-sub/src/pubSub.ts new file mode 100644 index 00000000..35a9030e --- /dev/null +++ b/services/pub-sub/src/pubSub.ts @@ -0,0 +1,108 @@ +type TDefaultChannels = Record void>; + +type TChannelMap = Map< + keyof ChannelsRecord, + Set +>; + +type TPubSubInstances = Map; + +type TChannelData = Parameters< + ChannelsRecord[ChannelKey] +>[0]; + +type ChannelsRecordAdapter = { [K in keyof T]: T[K] }; + +const DEFAULT_NAME_INSTANCE = '_default'; + +class PubSub { + private static instances: TPubSubInstances = new Map(); + private channels: TChannelMap = new Map(); + + private constructor() {} + + /** + * Getting an instance of a class. + */ + static getInstance( + instanceKey: string = DEFAULT_NAME_INSTANCE + ): PubSub { + if (!this.instances.get(instanceKey)) { + this.instances.set(instanceKey, new PubSub()); + } + + return this.instances.get(instanceKey) as PubSub; + } + + /** + * Subscribe to the channel. + */ + subscribe( + channel: ChannelKey, + callback: ChannelsRecord[ChannelKey] + ): void { + if (!this.channels.has(channel)) { + this.channels.set(channel, new Set()); + } + (this.channels.get(channel) as Set).add(callback); + } + + /** + * Unsubscribe from the channel. + */ + unsubscribe( + channel: ChannelKey, + callback: ChannelsRecord[ChannelKey] + ): void { + const channelSet = this.channels.get(channel); + if (channelSet) { + channelSet.delete(callback); + if (channelSet.size === 0) { + this.channels.delete(channel); + } + } + } + + /** + * Publishing to the channel. + */ + publish( + channel: ChannelKey, + data?: TChannelData + ): void { + const channelSet = this.channels.get(channel); + if (channelSet) { + for (const callback of channelSet) { + callback(data); + } + } else { + console.warn(`No subscribers for channel: ${channel as string}`); + } + } + + async publishAsync( + channel: ChannelKey, + data?: TChannelData + ): Promise { + const channelSet = this.channels.get(channel); + if (channelSet) { + for (const callback of channelSet) { + if (callback) { + await callback(data); + } + } + } else { + console.warn(`No subscribers for channel: ${channel as string}`); + } + } + + /** + * Reset all subscriptions. + */ + reset(): void { + this.channels.clear(); + } +} + +export type { ChannelsRecordAdapter }; +export default PubSub; diff --git a/services/pub-sub/tsconfig.build.json b/services/pub-sub/tsconfig.build.json new file mode 100644 index 00000000..b4b36060 --- /dev/null +++ b/services/pub-sub/tsconfig.build.json @@ -0,0 +1,4 @@ +{ + "extends": "./tsconfig.json", + "exclude": ["src/*.tests.ts"] +} diff --git a/services/pub-sub/tsconfig.json b/services/pub-sub/tsconfig.json new file mode 100644 index 00000000..5b7870da --- /dev/null +++ b/services/pub-sub/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "declaration": true, + "declarationDir": "dist", + "outDir": "dist", + "module": "commonjs", + "target": "es6" + }, + "include": ["src"] +} From 9140df76d75d2ffa6d47aeb44d0816603e3092b8 Mon Sep 17 00:00:00 2001 From: Gleb Fomin Date: Mon, 27 Jan 2025 09:42:24 +0500 Subject: [PATCH 2/9] chore(pub-sub): move types to a separate file --- services/pub-sub/src/index.ts | 2 +- services/pub-sub/src/pubSub.ts | 16 +--------------- services/pub-sub/src/pubSub.types.ts | 14 ++++++++++++++ 3 files changed, 16 insertions(+), 16 deletions(-) create mode 100644 services/pub-sub/src/pubSub.types.ts diff --git a/services/pub-sub/src/index.ts b/services/pub-sub/src/index.ts index 71ac867a..ebf70700 100644 --- a/services/pub-sub/src/index.ts +++ b/services/pub-sub/src/index.ts @@ -1,3 +1,3 @@ export { default } from './pubSub'; -export type { ChannelsRecordAdapter } from './pubSub'; +export type { ChannelsRecordAdapter } from './pubSub.types'; diff --git a/services/pub-sub/src/pubSub.ts b/services/pub-sub/src/pubSub.ts index 35a9030e..a2b83178 100644 --- a/services/pub-sub/src/pubSub.ts +++ b/services/pub-sub/src/pubSub.ts @@ -1,17 +1,4 @@ -type TDefaultChannels = Record void>; - -type TChannelMap = Map< - keyof ChannelsRecord, - Set ->; - -type TPubSubInstances = Map; - -type TChannelData = Parameters< - ChannelsRecord[ChannelKey] ->[0]; - -type ChannelsRecordAdapter = { [K in keyof T]: T[K] }; +import { TChannelData, TChannelMap, TDefaultChannels, TPubSubInstances } from './pubSub.types'; const DEFAULT_NAME_INSTANCE = '_default'; @@ -104,5 +91,4 @@ class PubSub { } } -export type { ChannelsRecordAdapter }; export default PubSub; diff --git a/services/pub-sub/src/pubSub.types.ts b/services/pub-sub/src/pubSub.types.ts new file mode 100644 index 00000000..df78ebb8 --- /dev/null +++ b/services/pub-sub/src/pubSub.types.ts @@ -0,0 +1,14 @@ +export type TDefaultChannels = Record void>; + +export type TChannelMap = Map< + keyof ChannelsRecord, + Set +>; + +export type TPubSubInstances = Map; + +export type TChannelData = Parameters< + ChannelsRecord[ChannelKey] +>[0]; + +export type ChannelsRecordAdapter = { [K in keyof T]: T[K] }; From 194d918d829bf3052c4405668367add0c4c46169 Mon Sep 17 00:00:00 2001 From: Gleb Fomin Date: Fri, 21 Feb 2025 15:02:51 +0500 Subject: [PATCH 3/9] feat(pub-sub): remove singleton and streamline class/types --- services/pub-sub/README.md | 63 +--------------------------- services/pub-sub/src/index.ts | 2 - services/pub-sub/src/pubSub.tests.ts | 15 +------ services/pub-sub/src/pubSub.ts | 42 ++++++------------- services/pub-sub/src/pubSub.types.ts | 17 ++++---- 5 files changed, 23 insertions(+), 116 deletions(-) diff --git a/services/pub-sub/README.md b/services/pub-sub/README.md index 0713649d..67ccf5e2 100644 --- a/services/pub-sub/README.md +++ b/services/pub-sub/README.md @@ -28,37 +28,23 @@ type ChannelsType = { ``` #### Create an instance -Use the `getInstance` method to create or retrieve a singleton instance of `PubSub`. - ```ts -const pubSubInstance = PubSub.getInstance(); +const pubSubInstance = new PubSub(); ``` #### Subscribe and unsubscribe to a channel -Remove a specific callback from a channel to stop receiving notifications. ```ts const addTodoCallback = (data: TodoType) => { console.log('Added new todo:', data); }; -const removeTodoCallback = (todoId: number) => { - console.log(`Removed todo: ${todoId}`); -}; - -const removeAllCallback = () => { - console.log('All todos deleted'); -}; // subscribe pubSubInstance.subscribe('addTodo', addTodoCallback); -pubSubInstance.subscribe('removeTodo', removeTodoCallback); -pubSubInstance.subscribe('removeAll', removeAllCallback); // unsubscribe pubSubInstance.unsubscribe('addTodo', addTodoCallback); -pubSubInstance.unsubscribe('removeTodo', removeTodoCallback); -pubSubInstance.unsubscribe('removeAll', removeAllCallback); ``` @@ -66,8 +52,6 @@ pubSubInstance.unsubscribe('removeAll', removeAllCallback); ```ts pubSubInstance.publish('addTodo', { id: 1, text: 'Some todo'}); -pubSubInstance.publish('removeTodo', 1); -pubSubInstance.publish('removeAll'); ``` #### Publish asynchronously @@ -90,48 +74,3 @@ Clear all channels and their associated subscribers. pubSubInstance.reset(); ``` -#### Singleton Instances -`PubSub` supports multiple named singleton instances using instanceKey. -This allows you to create isolated instances for different parts of your application. - -> Note: If instanceKey is not provided, the instance with the default name `"_default"` will be used. - -Usage example: -```ts -import PubSub from '@byndyusoft-ui/pub-sub'; - -// Get the instance with the default name "_default" -const defaultPubSub = PubSub.getInstance(); - -// Get an instance with a custom name -const customPubSub = PubSub.getInstance('custom'); - -// Instances are isolated from each other -defaultPubSub.subscribe('event', (data) => { - console.log(`Default instance: ${data}`); -}); - -customPubSub.subscribe('event', (data) => { - console.log(`Custom instance: ${data}`); -}); - -// Publish events in different instances -defaultPubSub.publish('event', 'Hello from default!'); -customPubSub.publish('event', 'Hello from custom!'); -``` - -#### Adapter for Interfaces -If you're using `interface` instead of `type`, you can use the helper type -`ChannelsRecordAdapter` to ensure compatibility with the index signature: - -```ts -import { type ChannelsRecordAdapter } from '@byndyusoft-ui/pub-sub' - -interface TodoChannels { - addTodo: (data: TodoType) => void; - removeTodo: (todoId: number) => void; - removeAll: () => void; -} - -const pubSubInstance = PubSub.getInstance>(); -``` diff --git a/services/pub-sub/src/index.ts b/services/pub-sub/src/index.ts index ebf70700..5bb500b0 100644 --- a/services/pub-sub/src/index.ts +++ b/services/pub-sub/src/index.ts @@ -1,3 +1 @@ export { default } from './pubSub'; - -export type { ChannelsRecordAdapter } from './pubSub.types'; diff --git a/services/pub-sub/src/pubSub.tests.ts b/services/pub-sub/src/pubSub.tests.ts index caccc734..464bd0b3 100644 --- a/services/pub-sub/src/pubSub.tests.ts +++ b/services/pub-sub/src/pubSub.tests.ts @@ -6,25 +6,12 @@ type TChannels = { }; describe('services/pub-sub', () => { - let pubSub: PubSub; - - beforeEach(() => { - pubSub = PubSub.getInstance(); - }); + const pubSub = new PubSub(); afterEach(() => { pubSub.reset(); }); - test('should create a new instance and get the same instance for the same key', () => { - const instance1 = PubSub.getInstance('instance1'); - const instance2 = PubSub.getInstance('instance1'); - const instance3 = PubSub.getInstance('instance2'); - - expect(instance1).toBe(instance2); - expect(instance1).not.toBe(instance3); - }); - test('should subscribe and publish to a channel', () => { const callback = jest.fn(); pubSub.subscribe('testChannel', callback); diff --git a/services/pub-sub/src/pubSub.ts b/services/pub-sub/src/pubSub.ts index a2b83178..74857c6c 100644 --- a/services/pub-sub/src/pubSub.ts +++ b/services/pub-sub/src/pubSub.ts @@ -1,26 +1,8 @@ -import { TChannelData, TChannelMap, TDefaultChannels, TPubSubInstances } from './pubSub.types'; +import { TChannelData, TChannelMap, TDefaultChannels } from './pubSub.types'; -const DEFAULT_NAME_INSTANCE = '_default'; - -class PubSub { - private static instances: TPubSubInstances = new Map(); +class PubSub> { private channels: TChannelMap = new Map(); - private constructor() {} - - /** - * Getting an instance of a class. - */ - static getInstance( - instanceKey: string = DEFAULT_NAME_INSTANCE - ): PubSub { - if (!this.instances.get(instanceKey)) { - this.instances.set(instanceKey, new PubSub()); - } - - return this.instances.get(instanceKey) as PubSub; - } - /** * Subscribe to the channel. */ @@ -63,7 +45,7 @@ class PubSub { callback(data); } } else { - console.warn(`No subscribers for channel: ${channel as string}`); + console.warn(`No subscribers for channel: ${String(channel)}`); } } @@ -72,15 +54,15 @@ class PubSub { data?: TChannelData ): Promise { const channelSet = this.channels.get(channel); - if (channelSet) { - for (const callback of channelSet) { - if (callback) { - await callback(data); - } - } - } else { - console.warn(`No subscribers for channel: ${channel as string}`); + + if (!channelSet) { + console.warn(`No subscribers for channel: ${String(channel)}`); + return; } + + const promises = Array.from(channelSet).map(callback => Promise.resolve(callback(data))); + + await Promise.all(promises); } /** @@ -92,3 +74,5 @@ class PubSub { } export default PubSub; + +const instance = new PubSub(); diff --git a/services/pub-sub/src/pubSub.types.ts b/services/pub-sub/src/pubSub.types.ts index df78ebb8..f77fbb40 100644 --- a/services/pub-sub/src/pubSub.types.ts +++ b/services/pub-sub/src/pubSub.types.ts @@ -1,14 +1,13 @@ -export type TDefaultChannels = Record void>; +type TChannelHandler = (data?: any) => void; -export type TChannelMap = Map< +export type TDefaultChannels = { [K in keyof ChannelsRecord]: TChannelHandler }; + +export type TChannelMap> = Map< keyof ChannelsRecord, Set >; -export type TPubSubInstances = Map; - -export type TChannelData = Parameters< - ChannelsRecord[ChannelKey] ->[0]; - -export type ChannelsRecordAdapter = { [K in keyof T]: T[K] }; +export type TChannelData< + ChannelsRecord extends TDefaultChannels, + ChannelKey extends keyof ChannelsRecord +> = Parameters[0]; From 73fb12d0ea57df4c1f5a3a732a0c0571c0535c14 Mon Sep 17 00:00:00 2001 From: Gleb Fomin Date: Fri, 21 Feb 2025 15:08:48 +0500 Subject: [PATCH 4/9] feat(pub-sub): added `unsubscribeAll` method --- services/pub-sub/src/pubSub.tests.ts | 16 ++++++++++++++++ services/pub-sub/src/pubSub.ts | 13 +++++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/services/pub-sub/src/pubSub.tests.ts b/services/pub-sub/src/pubSub.tests.ts index 464bd0b3..4954fb2b 100644 --- a/services/pub-sub/src/pubSub.tests.ts +++ b/services/pub-sub/src/pubSub.tests.ts @@ -71,4 +71,20 @@ describe('services/pub-sub', () => { expect(callback1).not.toHaveBeenCalled(); expect(callback2).not.toHaveBeenCalled(); }); + + test('should unsubscribe all callbacks for all channels using unsubscribeAll', () => { + const callback1 = jest.fn(); + const callback2 = jest.fn(); + + pubSub.subscribe('testChannel', callback1); + pubSub.subscribe('asyncChannel', callback2); + + pubSub.unsubscribeAll(); + + pubSub.publish('testChannel', 'Test data'); + pubSub.publish('asyncChannel', 'Test data'); + + expect(callback1).not.toHaveBeenCalled(); + expect(callback2).not.toHaveBeenCalled(); + }); }); diff --git a/services/pub-sub/src/pubSub.ts b/services/pub-sub/src/pubSub.ts index 74857c6c..4d78e7ff 100644 --- a/services/pub-sub/src/pubSub.ts +++ b/services/pub-sub/src/pubSub.ts @@ -32,6 +32,17 @@ class PubSub> { } } + /** + * Unsubscribe all callbacks for a specific channel or all channels. + */ + unsubscribeAll = (channel?: ChannelKey): void => { + if (channel) { + this.channels.delete(channel); + } else { + this.channels.clear(); + } + }; + /** * Publishing to the channel. */ @@ -74,5 +85,3 @@ class PubSub> { } export default PubSub; - -const instance = new PubSub(); From 130b64a4b0d68c18826c8c6a22bfed46975c0680 Mon Sep 17 00:00:00 2001 From: Gleb Fomin Date: Fri, 21 Feb 2025 15:33:44 +0500 Subject: [PATCH 5/9] feat(pub-sub): added `subscribeOnce` method --- services/pub-sub/src/pubSub.tests.ts | 14 ++++++++++++++ services/pub-sub/src/pubSub.ts | 15 +++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/services/pub-sub/src/pubSub.tests.ts b/services/pub-sub/src/pubSub.tests.ts index 4954fb2b..cf046bef 100644 --- a/services/pub-sub/src/pubSub.tests.ts +++ b/services/pub-sub/src/pubSub.tests.ts @@ -87,4 +87,18 @@ describe('services/pub-sub', () => { expect(callback1).not.toHaveBeenCalled(); expect(callback2).not.toHaveBeenCalled(); }); + + test('should call subscribeOnce callback only once', () => { + const callback = jest.fn(); + pubSub.subscribeOnce('testChannel', callback); + + // First publish should trigger the callback. + pubSub.publish('testChannel', 'Test message 1'); + + // Subsequent publish should not trigger the callback. + pubSub.publish('testChannel', 'Test message 2'); + + expect(callback).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledWith('Test message 1'); + }); }); diff --git a/services/pub-sub/src/pubSub.ts b/services/pub-sub/src/pubSub.ts index 4d78e7ff..9be968b0 100644 --- a/services/pub-sub/src/pubSub.ts +++ b/services/pub-sub/src/pubSub.ts @@ -32,6 +32,21 @@ class PubSub> { } } + /** + * After the first execution, the callback is automatically unsubscribed. + */ + subscribeOnce( + channel: ChannelKey, + callback: ChannelsRecord[ChannelKey] + ): void { + const onceCallback: ChannelsRecord[ChannelKey] = ((data?: TChannelData) => { + this.unsubscribe(channel, onceCallback); + return callback(data); + }) as ChannelsRecord[ChannelKey]; + + this.subscribe(channel, onceCallback); + } + /** * Unsubscribe all callbacks for a specific channel or all channels. */ From d8e72d2874db4d6c71d41521e8556583125dd017 Mon Sep 17 00:00:00 2001 From: Gleb Fomin Date: Fri, 21 Feb 2025 16:20:43 +0500 Subject: [PATCH 6/9] feat(pub-sub): added `allSubscribes` method --- services/pub-sub/src/pubSub.tests.ts | 20 ++++++++++++++++++++ services/pub-sub/src/pubSub.ts | 12 +++++++++++- services/pub-sub/src/pubSub.types.ts | 5 +++++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/services/pub-sub/src/pubSub.tests.ts b/services/pub-sub/src/pubSub.tests.ts index cf046bef..e9ffc1b8 100644 --- a/services/pub-sub/src/pubSub.tests.ts +++ b/services/pub-sub/src/pubSub.tests.ts @@ -101,4 +101,24 @@ describe('services/pub-sub', () => { expect(callback).toHaveBeenCalledTimes(1); expect(callback).toHaveBeenCalledWith('Test message 1'); }); + + test('should return all subscriptions info', () => { + const callback1 = jest.fn(); + const callback2 = jest.fn(); + + pubSub.subscribe('testChannel', callback1); + pubSub.subscribe('testChannel', callback2); + pubSub.subscribe('asyncChannel', callback1); + + const result = pubSub.allSubscribes(); + + const testChannelInfo = result.find(item => item.channel === 'testChannel'); + const asyncChannelInfo = result.find(item => item.channel === 'asyncChannel'); + + expect(testChannelInfo).toBeDefined(); + expect(testChannelInfo!.subscribers).toBe(2); + + expect(asyncChannelInfo).toBeDefined(); + expect(asyncChannelInfo!.subscribers).toBe(1); + }); }); diff --git a/services/pub-sub/src/pubSub.ts b/services/pub-sub/src/pubSub.ts index 9be968b0..8f2db3e1 100644 --- a/services/pub-sub/src/pubSub.ts +++ b/services/pub-sub/src/pubSub.ts @@ -1,4 +1,4 @@ -import { TChannelData, TChannelMap, TDefaultChannels } from './pubSub.types'; +import { TAllSubscribesResult, TChannelData, TChannelMap, TDefaultChannels } from './pubSub.types'; class PubSub> { private channels: TChannelMap = new Map(); @@ -91,6 +91,16 @@ class PubSub> { await Promise.all(promises); } + /** + * Returns an array containing information about all current subscriptions. + */ + allSubscribes(): TAllSubscribesResult { + return Array.from(this.channels.entries()).map(([channel, subscribers]) => ({ + channel, + subscribers: subscribers.size + })); + } + /** * Reset all subscriptions. */ diff --git a/services/pub-sub/src/pubSub.types.ts b/services/pub-sub/src/pubSub.types.ts index f77fbb40..e1cba6f1 100644 --- a/services/pub-sub/src/pubSub.types.ts +++ b/services/pub-sub/src/pubSub.types.ts @@ -11,3 +11,8 @@ export type TChannelData< ChannelsRecord extends TDefaultChannels, ChannelKey extends keyof ChannelsRecord > = Parameters[0]; + +export type TAllSubscribesResult = Array<{ + channel: keyof ChannelsRecord; + subscribers: number; +}>; From c04f8adaeee52dca55c925119fd065a8ec7d0832 Mon Sep 17 00:00:00 2001 From: Gleb Fomin Date: Fri, 21 Feb 2025 16:26:24 +0500 Subject: [PATCH 7/9] chore(pub-sub): tests --- services/pub-sub/src/pubSub.tests.ts | 12 +++++++----- services/pub-sub/src/pubSub.types.ts | 1 + 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/services/pub-sub/src/pubSub.tests.ts b/services/pub-sub/src/pubSub.tests.ts index e9ffc1b8..82a839b2 100644 --- a/services/pub-sub/src/pubSub.tests.ts +++ b/services/pub-sub/src/pubSub.tests.ts @@ -1,12 +1,12 @@ import PubSub from './pubSub'; -type TChannels = { +interface IChannels { testChannel: (data?: string) => void; asyncChannel: (data?: string) => Promise; -}; +} describe('services/pub-sub', () => { - const pubSub = new PubSub(); + const pubSub = new PubSub(); afterEach(() => { pubSub.reset(); @@ -116,9 +116,11 @@ describe('services/pub-sub', () => { const asyncChannelInfo = result.find(item => item.channel === 'asyncChannel'); expect(testChannelInfo).toBeDefined(); - expect(testChannelInfo!.subscribers).toBe(2); + + expect(testChannelInfo?.subscribers).toBe(2); expect(asyncChannelInfo).toBeDefined(); - expect(asyncChannelInfo!.subscribers).toBe(1); + + expect(asyncChannelInfo?.subscribers).toBe(1); }); }); diff --git a/services/pub-sub/src/pubSub.types.ts b/services/pub-sub/src/pubSub.types.ts index e1cba6f1..02011afe 100644 --- a/services/pub-sub/src/pubSub.types.ts +++ b/services/pub-sub/src/pubSub.types.ts @@ -1,3 +1,4 @@ +// eslint-disable-next-line @typescript-eslint/no-explicit-any type TChannelHandler = (data?: any) => void; export type TDefaultChannels = { [K in keyof ChannelsRecord]: TChannelHandler }; From 9304573804c2d12d9e3564cce404f15e16eed020 Mon Sep 17 00:00:00 2001 From: Gleb Fomin Date: Fri, 21 Feb 2025 16:34:41 +0500 Subject: [PATCH 8/9] chore(pub-sub): updated readme --- services/pub-sub/README.md | 55 +++++++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 6 deletions(-) diff --git a/services/pub-sub/README.md b/services/pub-sub/README.md index 67ccf5e2..510ba67f 100644 --- a/services/pub-sub/README.md +++ b/services/pub-sub/README.md @@ -24,6 +24,8 @@ type ChannelsType = { addTodo: (data: TodoType) => void; removeTodo: (todoId: number) => void; removeAll: () => void; + // For async callbacks: + asyncMessage: (data: string) => Promise; }; ``` @@ -32,28 +34,58 @@ type ChannelsType = { const pubSubInstance = new PubSub(); ``` -#### Subscribe and unsubscribe to a channel - +#### Subscribe & Unsubscribe +Basic Subscription ```ts const addTodoCallback = (data: TodoType) => { console.log('Added new todo:', data); }; - // subscribe pubSubInstance.subscribe('addTodo', addTodoCallback); // unsubscribe pubSubInstance.unsubscribe('addTodo', addTodoCallback); +``` + +#### One-Time Subscription +Use `subscribeOnce` to subscribe to an event that should be handled only once: +```ts +pubSubInstance.subscribeOnce('addTodo', (data) => { + console.log('This callback will only be executed once:', data); +}); ``` -#### Publish to a channel +#### Unsubscribe All +Remove all callbacks from a specific channel or from all channels: + +```ts +// Unsubscribe all from a specific channel +pubSubInstance.unsubscribeAll('addTodo'); + +// Unsubscribe all from all channels +pubSubInstance.unsubscribeAll(); +``` + +#### Publish Events + +Synchronous Publish ```ts pubSubInstance.publish('addTodo', { id: 1, text: 'Some todo'}); ``` +Asynchronous Publish +Use `publishAsync` to publish data and wait for asynchronous subscribers: + +```ts +pubSubInstance.subscribe('asyncMessage', async (data) => { + await new Promise((resolve) => setTimeout(resolve, 1000)); + console.log(`Async received: ${data}`); +}); +``` + #### Publish asynchronously Use publishAsync to publish data and handle asynchronous subscribers. @@ -67,8 +99,19 @@ pubSubInstance.subscribe('asyncMessage', async (data) => { await pubSubInstance.publishAsync('asyncMessage', 'This is asynchronous!'); ``` -#### Reset all subscriptions -Clear all channels and their associated subscribers. + +#### Get All Subscriptions +For debugging or monitoring, you can retrieve current subscriptions: + +```ts +const subscriptions = pubSubInstance.allSubscribes(); +console.log(subscriptions); +// Output example: +// [ { channel: 'addTodo', subscribers: 2 }, { channel: 'asyncMessage', subscribers: 1 } ] +``` + +#### Reset Subscriptions +Clear all channels and their subscribers: ```ts pubSubInstance.reset(); From 87f20f4e9b3aec9364f3c542d20b48ff4c137800 Mon Sep 17 00:00:00 2001 From: Nikolai Kuznetsov Date: Tue, 4 Mar 2025 18:36:55 +0500 Subject: [PATCH 9/9] =?UTF-8?q?=D1=80=D0=B5=D1=84=D0=B0=D0=BA=D1=82=D0=BE?= =?UTF-8?q?=D1=80=D0=B8=D0=BD=D0=B3=20PubSub?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package-lock.json | 8 ++ services/pub-sub/src/pubSub.tests.ts | 25 ++--- services/pub-sub/src/pubSub.ts | 134 +++++++++------------------ services/pub-sub/src/pubSub.types.ts | 19 ---- 4 files changed, 58 insertions(+), 128 deletions(-) delete mode 100644 services/pub-sub/src/pubSub.types.ts diff --git a/package-lock.json b/package-lock.json index 6852563a..657956e2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2319,6 +2319,10 @@ "resolved": "components/portal", "link": true }, + "node_modules/@byndyusoft-ui/pub-sub": { + "resolved": "services/pub-sub", + "link": true + }, "node_modules/@byndyusoft-ui/reset-css": { "resolved": "styles/reset-css", "link": true @@ -23675,6 +23679,10 @@ "version": "0.3.0", "license": "Apache-2.0" }, + "services/pub-sub": { + "version": "0.0.1", + "license": "Apache-2.0" + }, "styles/keyframes-css": { "name": "@byndyusoft-ui/keyframes-css", "version": "0.0.1", diff --git a/services/pub-sub/src/pubSub.tests.ts b/services/pub-sub/src/pubSub.tests.ts index 82a839b2..92450f2d 100644 --- a/services/pub-sub/src/pubSub.tests.ts +++ b/services/pub-sub/src/pubSub.tests.ts @@ -1,12 +1,7 @@ import PubSub from './pubSub'; -interface IChannels { - testChannel: (data?: string) => void; - asyncChannel: (data?: string) => Promise; -} - describe('services/pub-sub', () => { - const pubSub = new PubSub(); + const pubSub = new PubSub(); afterEach(() => { pubSub.reset(); @@ -39,14 +34,6 @@ describe('services/pub-sub', () => { expect(callback).not.toHaveBeenCalled(); }); - test('should warn if no subscribers are present for a channel', () => { - console.warn = jest.fn(); - - pubSub.publish('testChannel', 'No one is listening'); - - expect(console.warn).toHaveBeenCalledWith('No subscribers for channel: testChannel'); - }); - test('should handle async subscribe callbacks', async () => { const asyncCallback = jest.fn().mockResolvedValue(undefined); pubSub.subscribe('asyncChannel', asyncCallback); @@ -110,17 +97,17 @@ describe('services/pub-sub', () => { pubSub.subscribe('testChannel', callback2); pubSub.subscribe('asyncChannel', callback1); - const result = pubSub.allSubscribes(); + const result = pubSub.getAllSubscribers(); - const testChannelInfo = result.find(item => item.channel === 'testChannel'); - const asyncChannelInfo = result.find(item => item.channel === 'asyncChannel'); + const testChannelInfo = result.find(item => item.event === 'testChannel'); + const asyncChannelInfo = result.find(item => item.event === 'asyncChannel'); expect(testChannelInfo).toBeDefined(); - expect(testChannelInfo?.subscribers).toBe(2); + expect(testChannelInfo?.subscribers.length).toBe(2); expect(asyncChannelInfo).toBeDefined(); - expect(asyncChannelInfo?.subscribers).toBe(1); + expect(asyncChannelInfo?.subscribers.length).toBe(1); }); }); diff --git a/services/pub-sub/src/pubSub.ts b/services/pub-sub/src/pubSub.ts index 8f2db3e1..e839bd2f 100644 --- a/services/pub-sub/src/pubSub.ts +++ b/services/pub-sub/src/pubSub.ts @@ -1,112 +1,66 @@ -import { TAllSubscribesResult, TChannelData, TChannelMap, TDefaultChannels } from './pubSub.types'; +type Callback = (data: unknown) => void; -class PubSub> { - private channels: TChannelMap = new Map(); +export default class PubSub { + private events: Map> = new Map(); - /** - * Subscribe to the channel. - */ - subscribe( - channel: ChannelKey, - callback: ChannelsRecord[ChannelKey] - ): void { - if (!this.channels.has(channel)) { - this.channels.set(channel, new Set()); + subscribe(event: string, callback: Callback): void { + if (!this.events.has(event)) { + this.events.set(event, new Set()); } - (this.channels.get(channel) as Set).add(callback); + this.events.get(event)!.add(callback); } - /** - * Unsubscribe from the channel. - */ - unsubscribe( - channel: ChannelKey, - callback: ChannelsRecord[ChannelKey] - ): void { - const channelSet = this.channels.get(channel); - if (channelSet) { - channelSet.delete(callback); - if (channelSet.size === 0) { - this.channels.delete(channel); - } - } + subscribeOnce(event: string, callback: Callback): void { + const onceCallback: Callback = (data: unknown) => { + callback(data); // Execute the callback + this.unsubscribe(event, onceCallback); // Unsubscribe after execution + }; + this.subscribe(event, onceCallback); } - /** - * After the first execution, the callback is automatically unsubscribed. - */ - subscribeOnce( - channel: ChannelKey, - callback: ChannelsRecord[ChannelKey] - ): void { - const onceCallback: ChannelsRecord[ChannelKey] = ((data?: TChannelData) => { - this.unsubscribe(channel, onceCallback); - return callback(data); - }) as ChannelsRecord[ChannelKey]; - - this.subscribe(channel, onceCallback); + publish(event: string, data: unknown = null): void { + if (this.events.has(event)) { + this.events.get(event)!.forEach(callback => callback(data)); + } } - /** - * Unsubscribe all callbacks for a specific channel or all channels. - */ - unsubscribeAll = (channel?: ChannelKey): void => { - if (channel) { - this.channels.delete(channel); - } else { - this.channels.clear(); + async publishAsync(event: string, data: unknown = null): Promise { + if (this.events.has(event)) { + const callbacks = Array.from(this.events.get(event)!); + // Execute all callbacks concurrently + await Promise.all(callbacks.map(callback => callback(data))); } - }; + } - /** - * Publishing to the channel. - */ - publish( - channel: ChannelKey, - data?: TChannelData - ): void { - const channelSet = this.channels.get(channel); - if (channelSet) { - for (const callback of channelSet) { - callback(data); + unsubscribe(event: string, callback: Callback): void { + if (this.events.has(event)) { + const callbacks = this.events.get(event)!; + callbacks.delete(callback); + + // Clean up the event if no callbacks are left + if (callbacks.size === 0) { + this.events.delete(event); } - } else { - console.warn(`No subscribers for channel: ${String(channel)}`); } } - async publishAsync( - channel: ChannelKey, - data?: TChannelData - ): Promise { - const channelSet = this.channels.get(channel); - - if (!channelSet) { - console.warn(`No subscribers for channel: ${String(channel)}`); - return; + unsubscribeAll(event?: string): void { + if (event) { + if (this.events.has(event)) { + this.events.delete(event); + } + } else { + this.events.clear(); } - - const promises = Array.from(channelSet).map(callback => Promise.resolve(callback(data))); - - await Promise.all(promises); } - /** - * Returns an array containing information about all current subscriptions. - */ - allSubscribes(): TAllSubscribesResult { - return Array.from(this.channels.entries()).map(([channel, subscribers]) => ({ - channel, - subscribers: subscribers.size - })); + reset(): void { + this.events.clear(); } - /** - * Reset all subscriptions. - */ - reset(): void { - this.channels.clear(); + getAllSubscribers(): { event: string; subscribers: Callback[] }[] { + return Array.from(this.events.entries()).map(([event, callbacks]) => { + return { event, subscribers: Array.from(callbacks) }; + }); } } - -export default PubSub; diff --git a/services/pub-sub/src/pubSub.types.ts b/services/pub-sub/src/pubSub.types.ts deleted file mode 100644 index 02011afe..00000000 --- a/services/pub-sub/src/pubSub.types.ts +++ /dev/null @@ -1,19 +0,0 @@ -// eslint-disable-next-line @typescript-eslint/no-explicit-any -type TChannelHandler = (data?: any) => void; - -export type TDefaultChannels = { [K in keyof ChannelsRecord]: TChannelHandler }; - -export type TChannelMap> = Map< - keyof ChannelsRecord, - Set ->; - -export type TChannelData< - ChannelsRecord extends TDefaultChannels, - ChannelKey extends keyof ChannelsRecord -> = Parameters[0]; - -export type TAllSubscribesResult = Array<{ - channel: keyof ChannelsRecord; - subscribers: number; -}>;