Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/api/persistent-subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,27 @@ The main aim of this strategy is to decrease the likelihood of concurrency and
ordering issues while maintaining load balancing. This is **not a guarantee**,
and you should handle the usual ordering and concurrency issues.

### PinnedByCorrelation

The PinnedByCorrelation strategy is a consumer strategy available for persistent subscriptions
It ensures that events with the same correlation id are consistently delivered to the same
consumer within a subscription group.

:::note
This strategy requires database version 21.10.1 or later. You can only create a persistent subscription
with this strategy. To change the strategy, you must delete the existing subscription and create a
new one with the desired settings.
:::

## Updating a subscription group

You can edit the settings of an existing subscription group while it is running,
you don't need to delete and recreate it to change settings. When you update the
subscription group, it resets itself internally, dropping the connections and
having them reconnect. You must have admin permissions to update a persistent
subscription group.


## Updating a subscription group

You can edit the settings of an existing subscription group while it is running,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Empty } from "../../generated/kurrentdb/protocols/v1/shared_pb";
import { CreateReq } from "../../generated/kurrentdb/protocols/v1/persistentsubscriptions_pb";
import semver from "semver";
import {
PersistentSubscriptionsClient,
PersistentSubscriptionsService,
Expand All @@ -8,7 +9,14 @@ import {
import type { BaseOptions, Filter } from "../types";
import { debug, convertToCommandError, UnsupportedError } from "../utils";
import { Client } from "../Client";
import { END, EVENT_TYPE, START, STREAM_NAME } from "../constants";
import {
END,
EVENT_TYPE,
PINNED_BY_CORRELATION,
ROUND_ROBIN,
START,
STREAM_NAME,
} from "../constants";

import { settingsToGRPC } from "./utils/settingsToGRPC";
import type { PersistentSubscriptionToAllSettings } from "./utils/persistentSubscriptionSettings";
Expand Down Expand Up @@ -46,20 +54,26 @@ Client.prototype.createPersistentSubscriptionToAll = async function (
settings: PersistentSubscriptionToAllSettings,
{ filter, ...baseOptions }: CreatePersistentSubscriptionToAllOptions = {}
): Promise<void> {
const capabilities = await this.capabilities;
const { serverVersion, supports } = await this.capabilities;

if (!capabilities.supports(PersistentSubscriptionsService.create, "all")) {
if (!supports(PersistentSubscriptionsService.create, "all")) {
throw new UnsupportedError("createPersistentSubscriptionToAll", "21.10");
}

if (
semver.lt(serverVersion, "21.10.1") &&
settings.consumerStrategyName === PINNED_BY_CORRELATION
) {
console.warn(
`Consumer strategy "${PINNED_BY_CORRELATION}" requires server version ${serverVersion} or higher. "${ROUND_ROBIN}" will be used instead.`
);
settings.consumerStrategyName = ROUND_ROBIN;
}

const req = new CreateReq();
const options = new CreateReq.Options();
const allOptions = new CreateReq.AllOptions();
const reqSettings = settingsToGRPC(
settings,
CreateReq.Settings,
capabilities
);
const reqSettings = settingsToGRPC(settings, CreateReq.Settings);

switch (settings.startFrom) {
case START: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { PersistentSubscriptionsClient } from "../../generated/kurrentdb/protoco
import type { BaseOptions } from "../types";
import { debug, convertToCommandError, createStreamIdentifier } from "../utils";
import { Client } from "../Client";
import { END, START } from "../constants";
import { END, PINNED_BY_CORRELATION, ROUND_ROBIN, START } from "../constants";
import semver from "semver";

import { settingsToGRPC } from "./utils/settingsToGRPC";
import type { PersistentSubscriptionToStreamSettings } from "./utils/persistentSubscriptionSettings";
Expand Down Expand Up @@ -37,16 +38,22 @@ Client.prototype.createPersistentSubscriptionToStream = async function (
settings: PersistentSubscriptionToStreamSettings,
baseOptions: BaseOptions = {}
): Promise<void> {
const capabilities = await this.capabilities;
const { serverVersion } = await this.capabilities;

if (
semver.lt(serverVersion, "21.10.1") &&
settings.consumerStrategyName === PINNED_BY_CORRELATION
) {
console.warn(
`Consumer strategy "${PINNED_BY_CORRELATION}" requires server version ${serverVersion} or higher. "${ROUND_ROBIN}" will be used instead.`
);
settings.consumerStrategyName = ROUND_ROBIN;
}

const req = new CreateReq();
const options = new CreateReq.Options();
const identifier = createStreamIdentifier(streamName);
const reqSettings = settingsToGRPC(
settings,
CreateReq.Settings,
capabilities
);
const reqSettings = settingsToGRPC(settings, CreateReq.Settings);

// Add deprecated revision option for pre-21.10 support
switch (settings.startFrom) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ import {
CreateReq,
UpdateReq,
} from "../../../generated/kurrentdb/protocols/v1/persistentsubscriptions_pb";
import { ServerFeatures } from "../../Client/ServerFeatures";
import semver from "semver";

import {
DISPATCH_TO_SINGLE,
Expand All @@ -17,16 +15,14 @@ import type {
PersistentSubscriptionToStreamSettings,
PersistentSubscriptionToAllSettings,
} from "./persistentSubscriptionSettings";
import { UnsupportedError } from "../../utils";

type GRPCSettings = typeof CreateReq.Settings | typeof UpdateReq.Settings;

export const settingsToGRPC = <T extends GRPCSettings>(
settings:
| PersistentSubscriptionToStreamSettings
| PersistentSubscriptionToAllSettings,
ReqSettings: T,
capabilities?: ServerFeatures
ReqSettings: T
): InstanceType<T> => {
const reqSettings = new ReqSettings() as InstanceType<T>;

Expand All @@ -53,40 +49,37 @@ export const settingsToGRPC = <T extends GRPCSettings>(
reqSettings.setReadBatchSize(settings.readBatchSize);
reqSettings.setHistoryBufferSize(settings.historyBufferSize);

if (
capabilities &&
semver.satisfies(capabilities.serverVersion, ">=21.10.1") &&
reqSettings instanceof CreateReq.Settings
) {
reqSettings.setConsumerStrategy(settings.consumerStrategyName);
} else {
switch (settings.consumerStrategyName) {
case DISPATCH_TO_SINGLE: {
reqSettings.setNamedConsumerStrategy(
CreateReq.ConsumerStrategy.DISPATCHTOSINGLE
);
break;
}
case PINNED: {
reqSettings.setNamedConsumerStrategy(CreateReq.ConsumerStrategy.PINNED);
break;
}
case ROUND_ROBIN: {
reqSettings.setNamedConsumerStrategy(
CreateReq.ConsumerStrategy.ROUNDROBIN
);
break;
}
case PINNED_BY_CORRELATION: {
throw new UnsupportedError(PINNED_BY_CORRELATION, "21.10.1");
}
default: {
console.warn(
`Unknown consumerStrategyName ${settings.consumerStrategyName}.`
);
switch (settings.consumerStrategyName) {
case DISPATCH_TO_SINGLE: {
reqSettings.setNamedConsumerStrategy(
CreateReq.ConsumerStrategy.DISPATCHTOSINGLE
);
break;
}
case PINNED: {
reqSettings.setNamedConsumerStrategy(CreateReq.ConsumerStrategy.PINNED);
break;
}
case ROUND_ROBIN: {
reqSettings.setNamedConsumerStrategy(
CreateReq.ConsumerStrategy.ROUNDROBIN
);
break;
}
case PINNED_BY_CORRELATION: {
if (reqSettings instanceof CreateReq.Settings) {
reqSettings.setConsumerStrategy(settings.consumerStrategyName);
break;
} else {
throw new Error("'PinnedByCorrelation' is not supported for updates.");
}
}
default: {
console.warn(
`Unknown consumerStrategyName ${settings.consumerStrategyName}.`
);
break;
}
}

return reqSettings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import {
START,
UnsupportedError,
PINNED_BY_CORRELATION,
ROUND_ROBIN,
PINNED,
DISPATCH_TO_SINGLE,
} from "@kurrent/kurrentdb-client";

describe("createPersistentSubscriptionToAll", () => {
Expand Down Expand Up @@ -88,41 +91,29 @@ describe("createPersistentSubscriptionToAll", () => {
).resolves.toBeUndefined();
});

test("valid consumer strategy", async () => {
const GROUP_NAME = "group_name_valid_consumer_strategy";
test.each([
PINNED_BY_CORRELATION,
PINNED,
ROUND_ROBIN,
DISPATCH_TO_SINGLE,
])("consumer strategy: %s", async (strategy) => {
const GROUP_NAME = `group_name_valid_consumer_strategy_${strategy.toLowerCase()}`;
await expect(
client.createPersistentSubscriptionToAll(
GROUP_NAME,
persistentSubscriptionToAllSettingsFromDefaults({
consumerStrategyName: PINNED_BY_CORRELATION,
consumerStrategyName: strategy,
})
)
).resolves.toBeUndefined();

let persistentSubscriptions =
await client.listAllPersistentSubscriptions();
let persistentSubscription =
await client.getPersistentSubscriptionToAllInfo(GROUP_NAME);

persistentSubscriptions = persistentSubscriptions.filter(
(ps) => ps.groupName === GROUP_NAME
expect(persistentSubscription.groupName).toBe(GROUP_NAME);
expect(persistentSubscription.settings.consumerStrategyName).toBe(
strategy
);

expect(persistentSubscriptions).toHaveLength(1);
expect(persistentSubscriptions[0].groupName).toBe(GROUP_NAME);
expect(persistentSubscriptions[0].settings.consumerStrategyName).toBe(
PINNED_BY_CORRELATION
);
});

test("invalid consumer strategy", async () => {
const GROUP_NAME = "group_name_invalid_consumer_strategy";
await expect(
client.createPersistentSubscriptionToAll(
GROUP_NAME,
persistentSubscriptionToAllSettingsFromDefaults({
consumerStrategyName: "strategy_does_not_exists",
})
)
).rejects.toThrow(Error);
});

test("with a filter", async () => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { createTestNode } from "@test-utils";

import {
DISPATCH_TO_SINGLE,
KurrentDBClient,
PersistentSubscriptionExistsError,
persistentSubscriptionToStreamSettingsFromDefaults,
PINNED,
PINNED_BY_CORRELATION,
ROUND_ROBIN,
START,
} from "@kurrent/kurrentdb-client";

Expand Down Expand Up @@ -63,46 +66,33 @@ describe("createPersistentSubscriptionToStream", () => {
).resolves.toBeUndefined();
});

test.only("valid consumer strategy", async () => {
const STREAM_NAME = "stream_name_from_revision";
const GROUP_NAME = "group_name_valid_consumer_strategy";
await expect(
client.createPersistentSubscriptionToStream(
STREAM_NAME,
GROUP_NAME,
persistentSubscriptionToStreamSettingsFromDefaults({
consumerStrategyName: PINNED_BY_CORRELATION,
})
)
).resolves.toBeUndefined();

let persistentSubscriptions =
await client.listAllPersistentSubscriptions();

persistentSubscriptions = persistentSubscriptions.filter(
(ps) => ps.groupName === GROUP_NAME && ps.eventSource === STREAM_NAME
);

expect(persistentSubscriptions).toHaveLength(1);
expect(persistentSubscriptions[0].eventSource).toBe(STREAM_NAME);
expect(persistentSubscriptions[0].settings.consumerStrategyName).toBe(
PINNED_BY_CORRELATION
);
});

test("invalid consumer strategy", async () => {
const STREAM_NAME = "stream_name_from_revision";
const GROUP_NAME = "group_name_invalid_consumer_strategy";
await expect(
client.createPersistentSubscriptionToStream(
STREAM_NAME,
GROUP_NAME,
persistentSubscriptionToStreamSettingsFromDefaults({
consumerStrategyName: "strategy_does_not_exists",
})
)
).rejects.toThrow(Error);
});
test.each([PINNED_BY_CORRELATION, PINNED, ROUND_ROBIN, DISPATCH_TO_SINGLE])(
"consumer strategy: %s",
async (strategy) => {
const STREAM_NAME = `stream_name_from_revision_${strategy.toLowerCase()}`;
const GROUP_NAME = `group_name_valid_consumer_strategy_${strategy.toLowerCase()}`;
await expect(
client.createPersistentSubscriptionToStream(
STREAM_NAME,
GROUP_NAME,
persistentSubscriptionToStreamSettingsFromDefaults({
consumerStrategyName: strategy,
})
)
).resolves.toBeUndefined();

let persistentSubscription =
await client.getPersistentSubscriptionToStreamInfo(
STREAM_NAME,
GROUP_NAME
);

expect(persistentSubscription.eventSource).toBe(STREAM_NAME);
expect(persistentSubscription.settings.consumerStrategyName).toBe(
strategy
);
}
);
});

test("should throw an error if subscription exists", async () => {
Expand Down
Loading