Skip to content

Commit b64e33b

Browse files
committed
Take control of the user metadata
1 parent b7b772b commit b64e33b

File tree

7 files changed

+213
-37
lines changed

7 files changed

+213
-37
lines changed

packages/db-client/src/streams/appendToStream/index.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import { UnsupportedError } from "../../utils";
1515

1616
import { append } from "./append";
1717
import { batchAppend } from "./batchAppend";
18-
import { multiAppend } from "./multi";
18+
import { multiStreamAppend } from "./multiStreamAppend";
1919

2020
export interface AppendToStreamOptions extends BaseOptions {
2121
/**
@@ -44,8 +44,8 @@ declare module "../../Client" {
4444
options?: AppendToStreamOptions
4545
): Promise<AppendResult>;
4646

47-
multiAppend(
48-
requests: AppendStreamRequest[],
47+
multiStreamAppend<KnownEventType extends EventType = EventType>(
48+
requests: AppendStreamRequest<KnownEventType>[],
4949
options?: AppendToStreamOptions
5050
): Promise<MultiAppendResult>;
5151
}
@@ -83,13 +83,13 @@ Client.prototype.appendToStream = async function <
8383
});
8484
};
8585

86-
Client.prototype.multiAppend = async function (
86+
Client.prototype.multiStreamAppend = async function (
8787
this: Client,
8888
requests: AppendStreamRequest[],
8989
baseOptions: BaseOptions = {}
9090
): Promise<MultiAppendResult> {
9191
if (!(await this.supports(StreamsServiceService.multiStreamAppendSession))) {
9292
throw new UnsupportedError("multiStreamAppend", "25.10");
9393
}
94-
return multiAppend.call(this, requests, baseOptions);
94+
return multiStreamAppend.call(this, requests, baseOptions);
9595
};

packages/db-client/src/streams/appendToStream/multi.ts renamed to packages/db-client/src/streams/appendToStream/multiStreamAppend.ts

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import {
2-
AppendStreamRequest,
32
MultiAppendResult,
43
AppendStreamSuccess,
54
AppendStreamFailure,
65
UnknownErrorDetails,
76
BaseOptions,
7+
AppendStreamRequest,
88
} from "../../types";
99
import type { Client } from "../../Client";
1010
import grpc from "../../../generated/kurrentdb/protocols/v2/streams/streams_grpc_pb";
@@ -14,13 +14,25 @@ import {
1414
backpressuredWrite,
1515
convertToCommandError,
1616
convertToSchemaDataFormat,
17+
mapObjectToDynamicValueMap,
1718
} from "../../utils";
1819

19-
export const multiAppend = async function (
20+
export const multiStreamAppend = async function (
2021
this: Client,
2122
requests: AppendStreamRequest[],
2223
baseOptions: BaseOptions
2324
): Promise<MultiAppendResult> {
25+
if (
26+
requests.some((request) =>
27+
request.events.some(
28+
(event) => event.metadata && event.metadata.constructor === Uint8Array
29+
)
30+
)
31+
)
32+
throw new Error(
33+
"multiStreamAppend requires all event metadata to be in JSON format."
34+
);
35+
2436
return this.execute(
2537
grpc.StreamsServiceClient,
2638
"multiStreamAppend",
@@ -136,6 +148,15 @@ export const multiAppend = async function (
136148
record.getPropertiesMap().set("$schema.data-format", dataFormat);
137149
record.getPropertiesMap().set("$schema.name", schemaName);
138150

151+
if (event.metadata) {
152+
const metadataMap = mapObjectToDynamicValueMap(
153+
event.metadata as Record<string, unknown>
154+
);
155+
for (const [key, value] of metadataMap) {
156+
record.getPropertiesMap().set(key, value);
157+
}
158+
}
159+
139160
switch (event.contentType) {
140161
case "application/json": {
141162
const data = JSON.stringify(event.data);

packages/db-client/src/types/index.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -528,9 +528,11 @@ export interface FellBehind {
528528
position?: Position;
529529
}
530530

531-
export interface AppendStreamRequest {
531+
export interface AppendStreamRequest<
532+
KnownEventType extends EventType = EventType
533+
> {
532534
streamName: string;
533-
events: EventData[];
535+
events: EventData<KnownEventType>[];
534536
expectedState: AppendStreamState;
535537
}
536538

@@ -570,9 +572,10 @@ export interface BaseMultiAppendResult {
570572
success: boolean;
571573
}
572574

573-
export type MultiAppendResult =
574-
| ({ success: true; output: AppendStreamSuccess[] } & BaseMultiAppendResult)
575-
| ({ success: false; output: AppendStreamFailure[] } & BaseMultiAppendResult);
575+
export type MultiAppendResult = {
576+
success: boolean;
577+
output: AppendStreamSuccess[] | AppendStreamFailure[];
578+
} & BaseMultiAppendResult;
576579

577580
// Other listeners that are only supported in catch-up subscriptions
578581
export interface CatchupSubscription {

packages/db-client/src/utils/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ export * from "./grpcStreamIdentifier";
77
export * from "./grpcUUID";
88
export * from "./utilityTypes";
99
export * from "./isClientCancellationError";
10+
export * from "./mapToDynamicValue";
1011
export * from "./schema";
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import dynamic from "../../generated/kurrentdb/protocols/v2/core_pb";
2+
import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb";
3+
import { NullValue } from "google-protobuf/google/protobuf/struct_pb";
4+
5+
export const mapToDynamicValue = (source: unknown): dynamic.DynamicValue => {
6+
const dynamicValue = new dynamic.DynamicValue();
7+
8+
if (source === null || source === undefined) {
9+
dynamicValue.setNullValue(NullValue.NULL_VALUE);
10+
return dynamicValue;
11+
}
12+
13+
switch (typeof source) {
14+
case "string":
15+
dynamicValue.setStringValue(source);
16+
break;
17+
18+
case "boolean":
19+
dynamicValue.setBooleanValue(source);
20+
break;
21+
22+
case "number":
23+
if (
24+
Number.isInteger(source) &&
25+
source >= -2147483648 &&
26+
source <= 2147483647
27+
) {
28+
dynamicValue.setInt32Value(source);
29+
} else if (Number.isInteger(source)) {
30+
dynamicValue.setInt64Value(source);
31+
} else {
32+
dynamicValue.setDoubleValue(source);
33+
}
34+
break;
35+
36+
case "bigint":
37+
dynamicValue.setInt64Value(Number(source));
38+
break;
39+
40+
case "object":
41+
if (source instanceof Date) {
42+
const timestamp = new Timestamp();
43+
timestamp.fromDate(source);
44+
dynamicValue.setTimestampValue(timestamp);
45+
} else if (source instanceof Uint8Array) {
46+
dynamicValue.setBytesValue(source);
47+
} else if (Buffer.isBuffer(source)) {
48+
dynamicValue.setBytesValue(Uint8Array.from(source));
49+
} else {
50+
dynamicValue.setStringValue(JSON.stringify(source));
51+
}
52+
break;
53+
54+
default:
55+
dynamicValue.setStringValue(String(source));
56+
break;
57+
}
58+
59+
return dynamicValue;
60+
};
61+
62+
export const mapObjectToDynamicValueMap = (
63+
obj: Record<string, unknown>
64+
): Map<string, dynamic.DynamicValue> => {
65+
const map = new Map<string, dynamic.DynamicValue>();
66+
67+
for (const [key, value] of Object.entries(obj)) {
68+
map.set(key, mapToDynamicValue(value));
69+
}
70+
71+
return map;
72+
};

packages/test/src/streams/multiAppendStream.test.ts

Lines changed: 98 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
/** @jest-environment ./src/utils/enableVersionCheck.ts */
22

33
import {
4+
binaryTestEvents,
5+
collect,
46
createTestNode,
57
jsonTestEvents,
68
matchServerVersion,
@@ -10,13 +12,15 @@ import {
1012
import {
1113
KurrentDBClient,
1214
ANY,
13-
AppendStreamRequest,
1415
UnsupportedError,
16+
AppendStreamRequest,
17+
AppendStreamFailure,
18+
STREAM_EXISTS,
1519
} from "@kurrent/kurrentdb-client";
1620

1721
import { v4 } from "uuid";
1822

19-
describe("MultiAppendStream", () => {
23+
describe("multiAppend", () => {
2024
const supported = matchServerVersion`>=25.0`;
2125
const node = createTestNode();
2226
let client!: KurrentDBClient;
@@ -30,33 +34,103 @@ describe("MultiAppendStream", () => {
3034
await node.down();
3135
});
3236

33-
optionalDescribe(supported)(
34-
"should successfully append to multiple streams",
35-
() => {
36-
test("json events", async () => {
37-
const STREAM_NAME_1 = v4().toString();
38-
const STREAM_NAME_2 = v4().toString();
37+
test("json events", async () => {
38+
const STREAM_NAME = v4().toString();
3939

40-
const requests: AppendStreamRequest[] = [];
40+
const requests: AppendStreamRequest[] = [];
4141

42-
requests.push({
43-
streamName: STREAM_NAME_1,
44-
events: jsonTestEvents(),
45-
expectedState: ANY,
46-
});
42+
requests.push({
43+
streamName: STREAM_NAME,
44+
events: binaryTestEvents(),
45+
expectedState: ANY,
46+
});
47+
48+
try {
49+
await client.multiStreamAppend(requests);
50+
} catch (error) {
51+
expect(error).toBeInstanceOf(Error);
52+
expect(error.message).toBe(
53+
"multiStreamAppend requires server version 25.10 or higher."
54+
);
55+
}
56+
});
57+
58+
optionalDescribe(supported)("Supported (>=25.1)", () => {
59+
test("json events", async () => {
60+
const STREAM_NAME_1 = v4().toString();
61+
const STREAM_NAME_2 = v4().toString();
62+
const expectedMetadata = {
63+
timestamp: new Date().toISOString(),
64+
int: 1,
65+
float: 1.1,
66+
string: "test",
67+
};
68+
69+
const requests: AppendStreamRequest[] = [];
70+
71+
requests.push({
72+
streamName: STREAM_NAME_1,
73+
events: jsonTestEvents(4, "test", expectedMetadata),
74+
expectedState: ANY,
75+
});
76+
77+
requests.push({
78+
streamName: STREAM_NAME_2,
79+
events: jsonTestEvents(4, "test", expectedMetadata),
80+
expectedState: ANY,
81+
});
82+
83+
const result = await client.multiStreamAppend(requests);
84+
expect(result).toBeDefined();
85+
expect(result.success).toBeTruthy();
86+
87+
const stream1Events = await collect(client.readStream(STREAM_NAME_1));
88+
const stream2Events = await collect(client.readStream(STREAM_NAME_2));
89+
90+
expect(stream1Events.length).toBe(4);
91+
expect(stream2Events.length).toBe(4);
4792

48-
requests.push({
49-
streamName: STREAM_NAME_2,
50-
events: jsonTestEvents(),
51-
expectedState: ANY,
93+
for (const event of [...stream1Events, ...stream2Events]) {
94+
expect(event.event).toBeDefined();
95+
expect(event.event?.metadata).toEqual({
96+
"$schema.data-format": "Json",
97+
"$schema.name": "test",
98+
...expectedMetadata
5299
});
100+
}
101+
});
102+
});
103+
104+
optionalDescribe(supported)("Supported (>=25.1)", () => {
105+
test("stream revision conflict", async () => {
106+
const STREAM_NAME = v4().toString();
107+
108+
const requests: AppendStreamRequest[] = [];
53109

54-
const result = await client.multiAppend(requests);
55-
expect(result).toBeDefined();
56-
expect(result.success).toBeTruthy();
110+
requests.push({
111+
streamName: STREAM_NAME,
112+
events: jsonTestEvents(),
113+
expectedState: STREAM_EXISTS,
57114
});
58-
}
59-
);
115+
116+
const result = await client.multiStreamAppend(requests);
117+
expect(result).toBeDefined();
118+
expect(result.success).toBeFalsy();
119+
expect(result.output).toBeDefined();
120+
expect(result.output.length).toBe(1);
121+
122+
const failures = result.output as AppendStreamFailure[];
123+
expect(failures[0].streamName).toBe(STREAM_NAME);
124+
125+
expect(failures[0]).toMatchObject({
126+
streamName: expect.any(String),
127+
details: {
128+
type: "wrong_expected_revision",
129+
revision: BigInt(-1),
130+
},
131+
});
132+
});
133+
});
60134

61135
optionalDescribe(!supported)("not supported (<25.0)", () => {
62136
test("throw unsupported error", async () => {
@@ -77,7 +151,7 @@ describe("MultiAppendStream", () => {
77151
});
78152

79153
try {
80-
await client.multiAppend(requests);
154+
await client.multiStreamAppend(requests);
81155
} catch (error) {
82156
expect(error).toBeInstanceOf(UnsupportedError);
83157
}

packages/test/src/utils/testEvents.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,19 @@ export interface TestEventData {
55
index: number;
66
}
77

8-
export const jsonTestEvents = (count = 4, type = "test"): EventData[] =>
8+
export const jsonTestEvents = (
9+
count = 4,
10+
type = "test",
11+
metadata?: Record<string, unknown>
12+
): EventData[] =>
913
Array.from({ length: count }, (_, i) =>
1014
jsonEvent({
1115
type,
1216
data: {
1317
message: "test",
1418
index: i,
1519
},
20+
...(metadata && { metadata }),
1621
})
1722
);
1823

0 commit comments

Comments
 (0)