Skip to content

Commit 930a398

Browse files
committed
Improve PinnedByCorrelation and add docs
1 parent 8dff513 commit 930a398

File tree

6 files changed

+132
-116
lines changed

6 files changed

+132
-116
lines changed

docs/api/persistent-subscriptions.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,27 @@ The main aim of this strategy is to decrease the likelihood of concurrency and
203203
ordering issues while maintaining load balancing. This is **not a guarantee**,
204204
and you should handle the usual ordering and concurrency issues.
205205

206+
### PinnedByCorrelation
207+
208+
The PinnedByCorrelation strategy is a consumer strategy available for persistent subscriptions
209+
It ensures that events with the same correlation id are consistently delivered to the same
210+
consumer within a subscription group.
211+
212+
:::note
213+
This strategy requires database version 21.10.1 or later. You can only create a persistent subscription
214+
with this strategy. To change the strategy, you must delete the existing subscription and create a
215+
new one with the desired settings.
216+
:::
217+
218+
## Updating a subscription group
219+
220+
You can edit the settings of an existing subscription group while it is running,
221+
you don't need to delete and recreate it to change settings. When you update the
222+
subscription group, it resets itself internally, dropping the connections and
223+
having them reconnect. You must have admin permissions to update a persistent
224+
subscription group.
225+
226+
206227
## Updating a subscription group
207228

208229
You can edit the settings of an existing subscription group while it is running,

packages/db-client/src/persistentSubscription/createPersistentSubscriptionToAll.ts

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Empty } from "../../generated/kurrentdb/protocols/v1/shared_pb";
22
import { CreateReq } from "../../generated/kurrentdb/protocols/v1/persistentsubscriptions_pb";
3+
import semver from "semver";
34
import {
45
PersistentSubscriptionsClient,
56
PersistentSubscriptionsService,
@@ -8,7 +9,14 @@ import {
89
import type { BaseOptions, Filter } from "../types";
910
import { debug, convertToCommandError, UnsupportedError } from "../utils";
1011
import { Client } from "../Client";
11-
import { END, EVENT_TYPE, START, STREAM_NAME } from "../constants";
12+
import {
13+
END,
14+
EVENT_TYPE,
15+
PINNED_BY_CORRELATION,
16+
ROUND_ROBIN,
17+
START,
18+
STREAM_NAME,
19+
} from "../constants";
1220

1321
import { settingsToGRPC } from "./utils/settingsToGRPC";
1422
import type { PersistentSubscriptionToAllSettings } from "./utils/persistentSubscriptionSettings";
@@ -46,20 +54,26 @@ Client.prototype.createPersistentSubscriptionToAll = async function (
4654
settings: PersistentSubscriptionToAllSettings,
4755
{ filter, ...baseOptions }: CreatePersistentSubscriptionToAllOptions = {}
4856
): Promise<void> {
49-
const capabilities = await this.capabilities;
57+
const { serverVersion, supports } = await this.capabilities;
5058

51-
if (!capabilities.supports(PersistentSubscriptionsService.create, "all")) {
59+
if (!supports(PersistentSubscriptionsService.create, "all")) {
5260
throw new UnsupportedError("createPersistentSubscriptionToAll", "21.10");
5361
}
5462

63+
if (
64+
semver.lt(serverVersion, "21.10.1") &&
65+
settings.consumerStrategyName === PINNED_BY_CORRELATION
66+
) {
67+
console.warn(
68+
`Consumer strategy "${PINNED_BY_CORRELATION}" requires server version ${serverVersion} or higher. "${ROUND_ROBIN}" will be used instead.`
69+
);
70+
settings.consumerStrategyName = ROUND_ROBIN;
71+
}
72+
5573
const req = new CreateReq();
5674
const options = new CreateReq.Options();
5775
const allOptions = new CreateReq.AllOptions();
58-
const reqSettings = settingsToGRPC(
59-
settings,
60-
CreateReq.Settings,
61-
capabilities
62-
);
76+
const reqSettings = settingsToGRPC(settings, CreateReq.Settings);
6377

6478
switch (settings.startFrom) {
6579
case START: {

packages/db-client/src/persistentSubscription/createPersistentSubscriptionToStream.ts

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import { PersistentSubscriptionsClient } from "../../generated/kurrentdb/protoco
44
import type { BaseOptions } from "../types";
55
import { debug, convertToCommandError, createStreamIdentifier } from "../utils";
66
import { Client } from "../Client";
7-
import { END, START } from "../constants";
7+
import { END, PINNED_BY_CORRELATION, ROUND_ROBIN, START } from "../constants";
8+
import semver from "semver";
89

910
import { settingsToGRPC } from "./utils/settingsToGRPC";
1011
import type { PersistentSubscriptionToStreamSettings } from "./utils/persistentSubscriptionSettings";
@@ -37,16 +38,22 @@ Client.prototype.createPersistentSubscriptionToStream = async function (
3738
settings: PersistentSubscriptionToStreamSettings,
3839
baseOptions: BaseOptions = {}
3940
): Promise<void> {
40-
const capabilities = await this.capabilities;
41+
const { serverVersion } = await this.capabilities;
42+
43+
if (
44+
semver.lt(serverVersion, "21.10.1") &&
45+
settings.consumerStrategyName === PINNED_BY_CORRELATION
46+
) {
47+
console.warn(
48+
`Consumer strategy "${PINNED_BY_CORRELATION}" requires server version ${serverVersion} or higher. "${ROUND_ROBIN}" will be used instead.`
49+
);
50+
settings.consumerStrategyName = ROUND_ROBIN;
51+
}
4152

4253
const req = new CreateReq();
4354
const options = new CreateReq.Options();
4455
const identifier = createStreamIdentifier(streamName);
45-
const reqSettings = settingsToGRPC(
46-
settings,
47-
CreateReq.Settings,
48-
capabilities
49-
);
56+
const reqSettings = settingsToGRPC(settings, CreateReq.Settings);
5057

5158
// Add deprecated revision option for pre-21.10 support
5259
switch (settings.startFrom) {

packages/db-client/src/persistentSubscription/utils/settingsToGRPC.ts

Lines changed: 29 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ import {
22
CreateReq,
33
UpdateReq,
44
} from "../../../generated/kurrentdb/protocols/v1/persistentsubscriptions_pb";
5-
import { ServerFeatures } from "../../Client/ServerFeatures";
6-
import semver from "semver";
75

86
import {
97
DISPATCH_TO_SINGLE,
@@ -17,16 +15,14 @@ import type {
1715
PersistentSubscriptionToStreamSettings,
1816
PersistentSubscriptionToAllSettings,
1917
} from "./persistentSubscriptionSettings";
20-
import { UnsupportedError } from "../../utils";
2118

2219
type GRPCSettings = typeof CreateReq.Settings | typeof UpdateReq.Settings;
2320

2421
export const settingsToGRPC = <T extends GRPCSettings>(
2522
settings:
2623
| PersistentSubscriptionToStreamSettings
2724
| PersistentSubscriptionToAllSettings,
28-
ReqSettings: T,
29-
capabilities?: ServerFeatures
25+
ReqSettings: T
3026
): InstanceType<T> => {
3127
const reqSettings = new ReqSettings() as InstanceType<T>;
3228

@@ -53,40 +49,37 @@ export const settingsToGRPC = <T extends GRPCSettings>(
5349
reqSettings.setReadBatchSize(settings.readBatchSize);
5450
reqSettings.setHistoryBufferSize(settings.historyBufferSize);
5551

56-
if (
57-
capabilities &&
58-
semver.satisfies(capabilities.serverVersion, ">=21.10.1") &&
59-
reqSettings instanceof CreateReq.Settings
60-
) {
61-
reqSettings.setConsumerStrategy(settings.consumerStrategyName);
62-
} else {
63-
switch (settings.consumerStrategyName) {
64-
case DISPATCH_TO_SINGLE: {
65-
reqSettings.setNamedConsumerStrategy(
66-
CreateReq.ConsumerStrategy.DISPATCHTOSINGLE
67-
);
68-
break;
69-
}
70-
case PINNED: {
71-
reqSettings.setNamedConsumerStrategy(CreateReq.ConsumerStrategy.PINNED);
72-
break;
73-
}
74-
case ROUND_ROBIN: {
75-
reqSettings.setNamedConsumerStrategy(
76-
CreateReq.ConsumerStrategy.ROUNDROBIN
77-
);
78-
break;
79-
}
80-
case PINNED_BY_CORRELATION: {
81-
throw new UnsupportedError(PINNED_BY_CORRELATION, "21.10.1");
82-
}
83-
default: {
84-
console.warn(
85-
`Unknown consumerStrategyName ${settings.consumerStrategyName}.`
86-
);
52+
switch (settings.consumerStrategyName) {
53+
case DISPATCH_TO_SINGLE: {
54+
reqSettings.setNamedConsumerStrategy(
55+
CreateReq.ConsumerStrategy.DISPATCHTOSINGLE
56+
);
57+
break;
58+
}
59+
case PINNED: {
60+
reqSettings.setNamedConsumerStrategy(CreateReq.ConsumerStrategy.PINNED);
61+
break;
62+
}
63+
case ROUND_ROBIN: {
64+
reqSettings.setNamedConsumerStrategy(
65+
CreateReq.ConsumerStrategy.ROUNDROBIN
66+
);
67+
break;
68+
}
69+
case PINNED_BY_CORRELATION: {
70+
if (reqSettings instanceof CreateReq.Settings) {
71+
reqSettings.setConsumerStrategy(settings.consumerStrategyName);
8772
break;
73+
} else {
74+
throw new Error("'PinnedByCorrelation' is not supported for updates.");
8875
}
8976
}
77+
default: {
78+
console.warn(
79+
`Unknown consumerStrategyName ${settings.consumerStrategyName}.`
80+
);
81+
break;
82+
}
9083
}
9184

9285
return reqSettings;

packages/test/src/persistentSubscription/createPersistentSubscriptionToAll.test.ts

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ import {
1515
START,
1616
UnsupportedError,
1717
PINNED_BY_CORRELATION,
18+
ROUND_ROBIN,
19+
PINNED,
20+
DISPATCH_TO_SINGLE,
1821
} from "@kurrent/kurrentdb-client";
1922

2023
describe("createPersistentSubscriptionToAll", () => {
@@ -88,41 +91,29 @@ describe("createPersistentSubscriptionToAll", () => {
8891
).resolves.toBeUndefined();
8992
});
9093

91-
test("valid consumer strategy", async () => {
92-
const GROUP_NAME = "group_name_valid_consumer_strategy";
94+
test.each([
95+
PINNED_BY_CORRELATION,
96+
PINNED,
97+
ROUND_ROBIN,
98+
DISPATCH_TO_SINGLE,
99+
])("consumer strategy: %s", async (strategy) => {
100+
const GROUP_NAME = `group_name_valid_consumer_strategy_${strategy.toLowerCase()}`;
93101
await expect(
94102
client.createPersistentSubscriptionToAll(
95103
GROUP_NAME,
96104
persistentSubscriptionToAllSettingsFromDefaults({
97-
consumerStrategyName: PINNED_BY_CORRELATION,
105+
consumerStrategyName: strategy,
98106
})
99107
)
100108
).resolves.toBeUndefined();
101109

102-
let persistentSubscriptions =
103-
await client.listAllPersistentSubscriptions();
110+
let persistentSubscription =
111+
await client.getPersistentSubscriptionToAllInfo(GROUP_NAME);
104112

105-
persistentSubscriptions = persistentSubscriptions.filter(
106-
(ps) => ps.groupName === GROUP_NAME
113+
expect(persistentSubscription.groupName).toBe(GROUP_NAME);
114+
expect(persistentSubscription.settings.consumerStrategyName).toBe(
115+
strategy
107116
);
108-
109-
expect(persistentSubscriptions).toHaveLength(1);
110-
expect(persistentSubscriptions[0].groupName).toBe(GROUP_NAME);
111-
expect(persistentSubscriptions[0].settings.consumerStrategyName).toBe(
112-
PINNED_BY_CORRELATION
113-
);
114-
});
115-
116-
test("invalid consumer strategy", async () => {
117-
const GROUP_NAME = "group_name_invalid_consumer_strategy";
118-
await expect(
119-
client.createPersistentSubscriptionToAll(
120-
GROUP_NAME,
121-
persistentSubscriptionToAllSettingsFromDefaults({
122-
consumerStrategyName: "strategy_does_not_exists",
123-
})
124-
)
125-
).rejects.toThrow(Error);
126117
});
127118

128119
test("with a filter", async () => {

packages/test/src/persistentSubscription/createPersistentSubscriptionToStream.test.ts

Lines changed: 30 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import { createTestNode } from "@test-utils";
22

33
import {
4+
DISPATCH_TO_SINGLE,
45
KurrentDBClient,
56
PersistentSubscriptionExistsError,
67
persistentSubscriptionToStreamSettingsFromDefaults,
8+
PINNED,
79
PINNED_BY_CORRELATION,
10+
ROUND_ROBIN,
811
START,
912
} from "@kurrent/kurrentdb-client";
1013

@@ -63,46 +66,33 @@ describe("createPersistentSubscriptionToStream", () => {
6366
).resolves.toBeUndefined();
6467
});
6568

66-
test.only("valid consumer strategy", async () => {
67-
const STREAM_NAME = "stream_name_from_revision";
68-
const GROUP_NAME = "group_name_valid_consumer_strategy";
69-
await expect(
70-
client.createPersistentSubscriptionToStream(
71-
STREAM_NAME,
72-
GROUP_NAME,
73-
persistentSubscriptionToStreamSettingsFromDefaults({
74-
consumerStrategyName: PINNED_BY_CORRELATION,
75-
})
76-
)
77-
).resolves.toBeUndefined();
78-
79-
let persistentSubscriptions =
80-
await client.listAllPersistentSubscriptions();
81-
82-
persistentSubscriptions = persistentSubscriptions.filter(
83-
(ps) => ps.groupName === GROUP_NAME && ps.eventSource === STREAM_NAME
84-
);
85-
86-
expect(persistentSubscriptions).toHaveLength(1);
87-
expect(persistentSubscriptions[0].eventSource).toBe(STREAM_NAME);
88-
expect(persistentSubscriptions[0].settings.consumerStrategyName).toBe(
89-
PINNED_BY_CORRELATION
90-
);
91-
});
92-
93-
test("invalid consumer strategy", async () => {
94-
const STREAM_NAME = "stream_name_from_revision";
95-
const GROUP_NAME = "group_name_invalid_consumer_strategy";
96-
await expect(
97-
client.createPersistentSubscriptionToStream(
98-
STREAM_NAME,
99-
GROUP_NAME,
100-
persistentSubscriptionToStreamSettingsFromDefaults({
101-
consumerStrategyName: "strategy_does_not_exists",
102-
})
103-
)
104-
).rejects.toThrow(Error);
105-
});
69+
test.each([PINNED_BY_CORRELATION, PINNED, ROUND_ROBIN, DISPATCH_TO_SINGLE])(
70+
"consumer strategy: %s",
71+
async (strategy) => {
72+
const STREAM_NAME = `stream_name_from_revision_${strategy.toLowerCase()}`;
73+
const GROUP_NAME = `group_name_valid_consumer_strategy_${strategy.toLowerCase()}`;
74+
await expect(
75+
client.createPersistentSubscriptionToStream(
76+
STREAM_NAME,
77+
GROUP_NAME,
78+
persistentSubscriptionToStreamSettingsFromDefaults({
79+
consumerStrategyName: strategy,
80+
})
81+
)
82+
).resolves.toBeUndefined();
83+
84+
let persistentSubscription =
85+
await client.getPersistentSubscriptionToStreamInfo(
86+
STREAM_NAME,
87+
GROUP_NAME
88+
);
89+
90+
expect(persistentSubscription.eventSource).toBe(STREAM_NAME);
91+
expect(persistentSubscription.settings.consumerStrategyName).toBe(
92+
strategy
93+
);
94+
}
95+
);
10696
});
10797

10898
test("should throw an error if subscription exists", async () => {

0 commit comments

Comments
 (0)