Skip to content

Commit c7e503e

Browse files
committed
Take control of the user metadata
1 parent b7b772b commit c7e503e

File tree

7 files changed

+250
-45
lines changed

7 files changed

+250
-45
lines changed

packages/db-client/src/streams/appendToStream/index.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import { UnsupportedError } from "../../utils";
1515

1616
import { append } from "./append";
1717
import { batchAppend } from "./batchAppend";
18-
import { multiAppend } from "./multi";
18+
import { multiStreamAppend } from "./multiStreamAppend";
1919

2020
export interface AppendToStreamOptions extends BaseOptions {
2121
/**
@@ -44,9 +44,8 @@ declare module "../../Client" {
4444
options?: AppendToStreamOptions
4545
): Promise<AppendResult>;
4646

47-
multiAppend(
48-
requests: AppendStreamRequest[],
49-
options?: AppendToStreamOptions
47+
multiStreamAppend<KnownEventType extends EventType = EventType>(
48+
requests: AppendStreamRequest<KnownEventType>[]
5049
): Promise<MultiAppendResult>;
5150
}
5251
}
@@ -83,13 +82,12 @@ Client.prototype.appendToStream = async function <
8382
});
8483
};
8584

86-
Client.prototype.multiAppend = async function (
85+
Client.prototype.multiStreamAppend = async function (
8786
this: Client,
88-
requests: AppendStreamRequest[],
89-
baseOptions: BaseOptions = {}
87+
requests: AppendStreamRequest[]
9088
): Promise<MultiAppendResult> {
9189
if (!(await this.supports(StreamsServiceService.multiStreamAppendSession))) {
9290
throw new UnsupportedError("multiStreamAppend", "25.10");
9391
}
94-
return multiAppend.call(this, requests, baseOptions);
92+
return multiStreamAppend.call(this, requests);
9593
};

packages/db-client/src/streams/appendToStream/multi.ts renamed to packages/db-client/src/streams/appendToStream/multiStreamAppend.ts

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import {
2-
AppendStreamRequest,
32
MultiAppendResult,
43
AppendStreamSuccess,
54
AppendStreamFailure,
65
UnknownErrorDetails,
7-
BaseOptions,
6+
AppendStreamRequest,
87
} from "../../types";
98
import type { Client } from "../../Client";
109
import grpc from "../../../generated/kurrentdb/protocols/v2/streams/streams_grpc_pb";
@@ -14,21 +13,33 @@ import {
1413
backpressuredWrite,
1514
convertToCommandError,
1615
convertToSchemaDataFormat,
16+
mapObjectToDynamicValueMap,
1717
} from "../../utils";
1818

19-
export const multiAppend = async function (
19+
export const multiStreamAppend = async function (
2020
this: Client,
21-
requests: AppendStreamRequest[],
22-
baseOptions: BaseOptions
21+
requests: AppendStreamRequest[]
2322
): Promise<MultiAppendResult> {
23+
if (
24+
requests.some((request) =>
25+
request.events.some(
26+
(event) => event.metadata && event.metadata.constructor === Uint8Array
27+
)
28+
)
29+
)
30+
throw new Error(
31+
"multiStreamAppend requires all event metadata to be in JSON format."
32+
);
33+
2434
return this.execute(
2535
grpc.StreamsServiceClient,
2636
"multiStreamAppend",
2737
(client) =>
2838
new Promise<MultiAppendResult>(async (resolve, reject) => {
29-
baseOptions.requiresLeader = false;
3039
const sink = client.multiStreamAppendSession(
31-
...this.callArguments(baseOptions),
40+
...this.callArguments({
41+
requiresLeader: false,
42+
}),
3243
(error, response) => {
3344
if (error != null) {
3445
return reject(convertToCommandError(error));
@@ -136,6 +147,15 @@ export const multiAppend = async function (
136147
record.getPropertiesMap().set("$schema.data-format", dataFormat);
137148
record.getPropertiesMap().set("$schema.name", schemaName);
138149

150+
if (event.metadata) {
151+
const metadataMap = mapObjectToDynamicValueMap(
152+
event.metadata as Record<string, unknown>
153+
);
154+
for (const [key, value] of metadataMap) {
155+
record.getPropertiesMap().set(key, value);
156+
}
157+
}
158+
139159
switch (event.contentType) {
140160
case "application/json": {
141161
const data = JSON.stringify(event.data);

packages/db-client/src/types/index.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -528,9 +528,11 @@ export interface FellBehind {
528528
position?: Position;
529529
}
530530

531-
export interface AppendStreamRequest {
531+
export interface AppendStreamRequest<
532+
KnownEventType extends EventType = EventType
533+
> {
532534
streamName: string;
533-
events: EventData[];
535+
events: EventData<KnownEventType>[];
534536
expectedState: AppendStreamState;
535537
}
536538

@@ -570,9 +572,10 @@ export interface BaseMultiAppendResult {
570572
success: boolean;
571573
}
572574

573-
export type MultiAppendResult =
574-
| ({ success: true; output: AppendStreamSuccess[] } & BaseMultiAppendResult)
575-
| ({ success: false; output: AppendStreamFailure[] } & BaseMultiAppendResult);
575+
export type MultiAppendResult = {
576+
success: boolean;
577+
output: AppendStreamSuccess[] | AppendStreamFailure[];
578+
} & BaseMultiAppendResult;
576579

577580
// Other listeners that are only supported in catch-up subscriptions
578581
export interface CatchupSubscription {

packages/db-client/src/utils/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ export * from "./grpcStreamIdentifier";
77
export * from "./grpcUUID";
88
export * from "./utilityTypes";
99
export * from "./isClientCancellationError";
10+
export * from "./mapToDynamicValue";
1011
export * from "./schema";
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import dynamic from "../../generated/kurrentdb/protocols/v2/core_pb";
2+
import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb";
3+
import { NullValue } from "google-protobuf/google/protobuf/struct_pb";
4+
5+
export const mapToDynamicValue = (source: unknown): dynamic.DynamicValue => {
6+
const dynamicValue = new dynamic.DynamicValue();
7+
8+
if (source === null || source === undefined) {
9+
dynamicValue.setNullValue(NullValue.NULL_VALUE);
10+
return dynamicValue;
11+
}
12+
13+
switch (typeof source) {
14+
case "string":
15+
dynamicValue.setStringValue(source);
16+
break;
17+
18+
case "boolean":
19+
dynamicValue.setBooleanValue(source);
20+
break;
21+
22+
case "number":
23+
if (
24+
Number.isInteger(source) &&
25+
source >= -2147483648 &&
26+
source <= 2147483647
27+
) {
28+
dynamicValue.setInt32Value(source);
29+
} else if (Number.isInteger(source)) {
30+
dynamicValue.setInt64Value(source);
31+
} else {
32+
dynamicValue.setDoubleValue(source);
33+
}
34+
break;
35+
36+
case "bigint":
37+
dynamicValue.setInt64Value(Number(source));
38+
break;
39+
40+
case "object":
41+
if (source instanceof Date) {
42+
const timestamp = new Timestamp();
43+
timestamp.fromDate(source);
44+
dynamicValue.setTimestampValue(timestamp);
45+
} else if (source instanceof Uint8Array) {
46+
dynamicValue.setBytesValue(source);
47+
} else if (Buffer.isBuffer(source)) {
48+
dynamicValue.setBytesValue(Uint8Array.from(source));
49+
} else {
50+
dynamicValue.setStringValue(JSON.stringify(source));
51+
}
52+
break;
53+
54+
default:
55+
dynamicValue.setStringValue(String(source));
56+
break;
57+
}
58+
59+
return dynamicValue;
60+
};
61+
62+
export const mapObjectToDynamicValueMap = (
63+
obj: Record<string, unknown>
64+
): Map<string, dynamic.DynamicValue> => {
65+
const map = new Map<string, dynamic.DynamicValue>();
66+
67+
for (const [key, value] of Object.entries(obj)) {
68+
map.set(key, mapToDynamicValue(value));
69+
}
70+
71+
return map;
72+
};

0 commit comments

Comments
 (0)