diff --git a/docs/api/persistent-subscriptions.md b/docs/api/persistent-subscriptions.md index 5d3ce6e2..cf1bbe2f 100644 --- a/docs/api/persistent-subscriptions.md +++ b/docs/api/persistent-subscriptions.md @@ -203,6 +203,27 @@ The main aim of this strategy is to decrease the likelihood of concurrency and ordering issues while maintaining load balancing. This is **not a guarantee**, and you should handle the usual ordering and concurrency issues. +### PinnedByCorrelation + +The PinnedByCorrelation strategy is a consumer strategy available for persistent subscriptions +It ensures that events with the same correlation id are consistently delivered to the same +consumer within a subscription group. + +:::note +This strategy requires database version 21.10.1 or later. You can only create a persistent subscription +with this strategy. To change the strategy, you must delete the existing subscription and create a +new one with the desired settings. +::: + +## Updating a subscription group + +You can edit the settings of an existing subscription group while it is running, +you don't need to delete and recreate it to change settings. When you update the +subscription group, it resets itself internally, dropping the connections and +having them reconnect. You must have admin permissions to update a persistent +subscription group. + + ## Updating a subscription group You can edit the settings of an existing subscription group while it is running, diff --git a/packages/db-client/src/persistentSubscription/createPersistentSubscriptionToAll.ts b/packages/db-client/src/persistentSubscription/createPersistentSubscriptionToAll.ts index 5839b4b9..c80cf5ba 100644 --- a/packages/db-client/src/persistentSubscription/createPersistentSubscriptionToAll.ts +++ b/packages/db-client/src/persistentSubscription/createPersistentSubscriptionToAll.ts @@ -1,5 +1,6 @@ import { Empty } from "../../generated/kurrentdb/protocols/v1/shared_pb"; import { CreateReq } from "../../generated/kurrentdb/protocols/v1/persistentsubscriptions_pb"; +import semver from "semver"; import { PersistentSubscriptionsClient, PersistentSubscriptionsService, @@ -8,7 +9,14 @@ import { import type { BaseOptions, Filter } from "../types"; import { debug, convertToCommandError, UnsupportedError } from "../utils"; import { Client } from "../Client"; -import { END, EVENT_TYPE, START, STREAM_NAME } from "../constants"; +import { + END, + EVENT_TYPE, + PINNED_BY_CORRELATION, + ROUND_ROBIN, + START, + STREAM_NAME, +} from "../constants"; import { settingsToGRPC } from "./utils/settingsToGRPC"; import type { PersistentSubscriptionToAllSettings } from "./utils/persistentSubscriptionSettings"; @@ -46,20 +54,26 @@ Client.prototype.createPersistentSubscriptionToAll = async function ( settings: PersistentSubscriptionToAllSettings, { filter, ...baseOptions }: CreatePersistentSubscriptionToAllOptions = {} ): Promise { - const capabilities = await this.capabilities; + const { serverVersion, supports } = await this.capabilities; - if (!capabilities.supports(PersistentSubscriptionsService.create, "all")) { + if (!supports(PersistentSubscriptionsService.create, "all")) { throw new UnsupportedError("createPersistentSubscriptionToAll", "21.10"); } + if ( + semver.lt(serverVersion, "21.10.1") && + settings.consumerStrategyName === PINNED_BY_CORRELATION + ) { + console.warn( + `Consumer strategy "${PINNED_BY_CORRELATION}" requires server version ${serverVersion} or higher. "${ROUND_ROBIN}" will be used instead.` + ); + settings.consumerStrategyName = ROUND_ROBIN; + } + const req = new CreateReq(); const options = new CreateReq.Options(); const allOptions = new CreateReq.AllOptions(); - const reqSettings = settingsToGRPC( - settings, - CreateReq.Settings, - capabilities - ); + const reqSettings = settingsToGRPC(settings, CreateReq.Settings); switch (settings.startFrom) { case START: { diff --git a/packages/db-client/src/persistentSubscription/createPersistentSubscriptionToStream.ts b/packages/db-client/src/persistentSubscription/createPersistentSubscriptionToStream.ts index 349be5cb..ac4b24bd 100644 --- a/packages/db-client/src/persistentSubscription/createPersistentSubscriptionToStream.ts +++ b/packages/db-client/src/persistentSubscription/createPersistentSubscriptionToStream.ts @@ -4,7 +4,8 @@ import { PersistentSubscriptionsClient } from "../../generated/kurrentdb/protoco import type { BaseOptions } from "../types"; import { debug, convertToCommandError, createStreamIdentifier } from "../utils"; import { Client } from "../Client"; -import { END, START } from "../constants"; +import { END, PINNED_BY_CORRELATION, ROUND_ROBIN, START } from "../constants"; +import semver from "semver"; import { settingsToGRPC } from "./utils/settingsToGRPC"; import type { PersistentSubscriptionToStreamSettings } from "./utils/persistentSubscriptionSettings"; @@ -37,16 +38,22 @@ Client.prototype.createPersistentSubscriptionToStream = async function ( settings: PersistentSubscriptionToStreamSettings, baseOptions: BaseOptions = {} ): Promise { - const capabilities = await this.capabilities; + const { serverVersion } = await this.capabilities; + + if ( + semver.lt(serverVersion, "21.10.1") && + settings.consumerStrategyName === PINNED_BY_CORRELATION + ) { + console.warn( + `Consumer strategy "${PINNED_BY_CORRELATION}" requires server version ${serverVersion} or higher. "${ROUND_ROBIN}" will be used instead.` + ); + settings.consumerStrategyName = ROUND_ROBIN; + } const req = new CreateReq(); const options = new CreateReq.Options(); const identifier = createStreamIdentifier(streamName); - const reqSettings = settingsToGRPC( - settings, - CreateReq.Settings, - capabilities - ); + const reqSettings = settingsToGRPC(settings, CreateReq.Settings); // Add deprecated revision option for pre-21.10 support switch (settings.startFrom) { diff --git a/packages/db-client/src/persistentSubscription/utils/settingsToGRPC.ts b/packages/db-client/src/persistentSubscription/utils/settingsToGRPC.ts index c22afb22..2b8b1235 100644 --- a/packages/db-client/src/persistentSubscription/utils/settingsToGRPC.ts +++ b/packages/db-client/src/persistentSubscription/utils/settingsToGRPC.ts @@ -2,8 +2,6 @@ import { CreateReq, UpdateReq, } from "../../../generated/kurrentdb/protocols/v1/persistentsubscriptions_pb"; -import { ServerFeatures } from "../../Client/ServerFeatures"; -import semver from "semver"; import { DISPATCH_TO_SINGLE, @@ -17,7 +15,6 @@ import type { PersistentSubscriptionToStreamSettings, PersistentSubscriptionToAllSettings, } from "./persistentSubscriptionSettings"; -import { UnsupportedError } from "../../utils"; type GRPCSettings = typeof CreateReq.Settings | typeof UpdateReq.Settings; @@ -25,8 +22,7 @@ export const settingsToGRPC = ( settings: | PersistentSubscriptionToStreamSettings | PersistentSubscriptionToAllSettings, - ReqSettings: T, - capabilities?: ServerFeatures + ReqSettings: T ): InstanceType => { const reqSettings = new ReqSettings() as InstanceType; @@ -53,40 +49,37 @@ export const settingsToGRPC = ( reqSettings.setReadBatchSize(settings.readBatchSize); reqSettings.setHistoryBufferSize(settings.historyBufferSize); - if ( - capabilities && - semver.satisfies(capabilities.serverVersion, ">=21.10.1") && - reqSettings instanceof CreateReq.Settings - ) { - reqSettings.setConsumerStrategy(settings.consumerStrategyName); - } else { - switch (settings.consumerStrategyName) { - case DISPATCH_TO_SINGLE: { - reqSettings.setNamedConsumerStrategy( - CreateReq.ConsumerStrategy.DISPATCHTOSINGLE - ); - break; - } - case PINNED: { - reqSettings.setNamedConsumerStrategy(CreateReq.ConsumerStrategy.PINNED); - break; - } - case ROUND_ROBIN: { - reqSettings.setNamedConsumerStrategy( - CreateReq.ConsumerStrategy.ROUNDROBIN - ); - break; - } - case PINNED_BY_CORRELATION: { - throw new UnsupportedError(PINNED_BY_CORRELATION, "21.10.1"); - } - default: { - console.warn( - `Unknown consumerStrategyName ${settings.consumerStrategyName}.` - ); + switch (settings.consumerStrategyName) { + case DISPATCH_TO_SINGLE: { + reqSettings.setNamedConsumerStrategy( + CreateReq.ConsumerStrategy.DISPATCHTOSINGLE + ); + break; + } + case PINNED: { + reqSettings.setNamedConsumerStrategy(CreateReq.ConsumerStrategy.PINNED); + break; + } + case ROUND_ROBIN: { + reqSettings.setNamedConsumerStrategy( + CreateReq.ConsumerStrategy.ROUNDROBIN + ); + break; + } + case PINNED_BY_CORRELATION: { + if (reqSettings instanceof CreateReq.Settings) { + reqSettings.setConsumerStrategy(settings.consumerStrategyName); break; + } else { + throw new Error("'PinnedByCorrelation' is not supported for updates."); } } + default: { + console.warn( + `Unknown consumerStrategyName ${settings.consumerStrategyName}.` + ); + break; + } } return reqSettings; diff --git a/packages/test/src/persistentSubscription/createPersistentSubscriptionToAll.test.ts b/packages/test/src/persistentSubscription/createPersistentSubscriptionToAll.test.ts index dd4ac544..1ea3941b 100644 --- a/packages/test/src/persistentSubscription/createPersistentSubscriptionToAll.test.ts +++ b/packages/test/src/persistentSubscription/createPersistentSubscriptionToAll.test.ts @@ -15,6 +15,9 @@ import { START, UnsupportedError, PINNED_BY_CORRELATION, + ROUND_ROBIN, + PINNED, + DISPATCH_TO_SINGLE, } from "@kurrent/kurrentdb-client"; describe("createPersistentSubscriptionToAll", () => { @@ -88,41 +91,29 @@ describe("createPersistentSubscriptionToAll", () => { ).resolves.toBeUndefined(); }); - test("valid consumer strategy", async () => { - const GROUP_NAME = "group_name_valid_consumer_strategy"; + test.each([ + PINNED_BY_CORRELATION, + PINNED, + ROUND_ROBIN, + DISPATCH_TO_SINGLE, + ])("consumer strategy: %s", async (strategy) => { + const GROUP_NAME = `group_name_valid_consumer_strategy_${strategy.toLowerCase()}`; await expect( client.createPersistentSubscriptionToAll( GROUP_NAME, persistentSubscriptionToAllSettingsFromDefaults({ - consumerStrategyName: PINNED_BY_CORRELATION, + consumerStrategyName: strategy, }) ) ).resolves.toBeUndefined(); - let persistentSubscriptions = - await client.listAllPersistentSubscriptions(); + let persistentSubscription = + await client.getPersistentSubscriptionToAllInfo(GROUP_NAME); - persistentSubscriptions = persistentSubscriptions.filter( - (ps) => ps.groupName === GROUP_NAME + expect(persistentSubscription.groupName).toBe(GROUP_NAME); + expect(persistentSubscription.settings.consumerStrategyName).toBe( + strategy ); - - expect(persistentSubscriptions).toHaveLength(1); - expect(persistentSubscriptions[0].groupName).toBe(GROUP_NAME); - expect(persistentSubscriptions[0].settings.consumerStrategyName).toBe( - PINNED_BY_CORRELATION - ); - }); - - test("invalid consumer strategy", async () => { - const GROUP_NAME = "group_name_invalid_consumer_strategy"; - await expect( - client.createPersistentSubscriptionToAll( - GROUP_NAME, - persistentSubscriptionToAllSettingsFromDefaults({ - consumerStrategyName: "strategy_does_not_exists", - }) - ) - ).rejects.toThrow(Error); }); test("with a filter", async () => { diff --git a/packages/test/src/persistentSubscription/createPersistentSubscriptionToStream.test.ts b/packages/test/src/persistentSubscription/createPersistentSubscriptionToStream.test.ts index cce143ce..55891326 100644 --- a/packages/test/src/persistentSubscription/createPersistentSubscriptionToStream.test.ts +++ b/packages/test/src/persistentSubscription/createPersistentSubscriptionToStream.test.ts @@ -1,10 +1,13 @@ import { createTestNode } from "@test-utils"; import { + DISPATCH_TO_SINGLE, KurrentDBClient, PersistentSubscriptionExistsError, persistentSubscriptionToStreamSettingsFromDefaults, + PINNED, PINNED_BY_CORRELATION, + ROUND_ROBIN, START, } from "@kurrent/kurrentdb-client"; @@ -63,46 +66,33 @@ describe("createPersistentSubscriptionToStream", () => { ).resolves.toBeUndefined(); }); - test.only("valid consumer strategy", async () => { - const STREAM_NAME = "stream_name_from_revision"; - const GROUP_NAME = "group_name_valid_consumer_strategy"; - await expect( - client.createPersistentSubscriptionToStream( - STREAM_NAME, - GROUP_NAME, - persistentSubscriptionToStreamSettingsFromDefaults({ - consumerStrategyName: PINNED_BY_CORRELATION, - }) - ) - ).resolves.toBeUndefined(); - - let persistentSubscriptions = - await client.listAllPersistentSubscriptions(); - - persistentSubscriptions = persistentSubscriptions.filter( - (ps) => ps.groupName === GROUP_NAME && ps.eventSource === STREAM_NAME - ); - - expect(persistentSubscriptions).toHaveLength(1); - expect(persistentSubscriptions[0].eventSource).toBe(STREAM_NAME); - expect(persistentSubscriptions[0].settings.consumerStrategyName).toBe( - PINNED_BY_CORRELATION - ); - }); - - test("invalid consumer strategy", async () => { - const STREAM_NAME = "stream_name_from_revision"; - const GROUP_NAME = "group_name_invalid_consumer_strategy"; - await expect( - client.createPersistentSubscriptionToStream( - STREAM_NAME, - GROUP_NAME, - persistentSubscriptionToStreamSettingsFromDefaults({ - consumerStrategyName: "strategy_does_not_exists", - }) - ) - ).rejects.toThrow(Error); - }); + test.each([PINNED_BY_CORRELATION, PINNED, ROUND_ROBIN, DISPATCH_TO_SINGLE])( + "consumer strategy: %s", + async (strategy) => { + const STREAM_NAME = `stream_name_from_revision_${strategy.toLowerCase()}`; + const GROUP_NAME = `group_name_valid_consumer_strategy_${strategy.toLowerCase()}`; + await expect( + client.createPersistentSubscriptionToStream( + STREAM_NAME, + GROUP_NAME, + persistentSubscriptionToStreamSettingsFromDefaults({ + consumerStrategyName: strategy, + }) + ) + ).resolves.toBeUndefined(); + + let persistentSubscription = + await client.getPersistentSubscriptionToStreamInfo( + STREAM_NAME, + GROUP_NAME + ); + + expect(persistentSubscription.eventSource).toBe(STREAM_NAME); + expect(persistentSubscription.settings.consumerStrategyName).toBe( + strategy + ); + } + ); }); test("should throw an error if subscription exists", async () => {