Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions packages/db-client/src/streams/appendToStream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { UnsupportedError } from "../../utils";

import { append } from "./append";
import { batchAppend } from "./batchAppend";
import { multiAppend } from "./multi";
import { multiStreamAppend } from "./multiStreamAppend";

export interface AppendToStreamOptions extends BaseOptions {
/**
Expand Down Expand Up @@ -44,9 +44,8 @@ declare module "../../Client" {
options?: AppendToStreamOptions
): Promise<AppendResult>;

multiAppend(
requests: AppendStreamRequest[],
options?: AppendToStreamOptions
multiStreamAppend<KnownEventType extends EventType = EventType>(
requests: AppendStreamRequest<KnownEventType>[]
): Promise<MultiAppendResult>;
}
}
Expand Down Expand Up @@ -83,13 +82,12 @@ Client.prototype.appendToStream = async function <
});
};

Client.prototype.multiAppend = async function (
Client.prototype.multiStreamAppend = async function (
this: Client,
requests: AppendStreamRequest[],
baseOptions: BaseOptions = {}
requests: AppendStreamRequest[]
): Promise<MultiAppendResult> {
if (!(await this.supports(StreamsServiceService.multiStreamAppendSession))) {
throw new UnsupportedError("multiStreamAppend", "25.10");
}
return multiAppend.call(this, requests, baseOptions);
return multiStreamAppend.call(this, requests);
};
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import {
AppendStreamRequest,
MultiAppendResult,
AppendStreamSuccess,
AppendStreamFailure,
UnknownErrorDetails,
BaseOptions,
AppendStreamRequest,
} from "../../types";
import type { Client } from "../../Client";
import grpc from "../../../generated/kurrentdb/protocols/v2/streams/streams_grpc_pb";
Expand All @@ -14,21 +13,33 @@ import {
backpressuredWrite,
convertToCommandError,
convertToSchemaDataFormat,
mapObjectToDynamicValueMap,
} from "../../utils";

export const multiAppend = async function (
export const multiStreamAppend = async function (
this: Client,
requests: AppendStreamRequest[],
baseOptions: BaseOptions
requests: AppendStreamRequest[]
): Promise<MultiAppendResult> {
if (
requests.some((request) =>
request.events.some(
(event) => event.metadata && event.metadata.constructor === Uint8Array
)
)
)
throw new Error(
"multiStreamAppend requires all event metadata to be in JSON format."
);

return this.execute(
grpc.StreamsServiceClient,
"multiStreamAppend",
(client) =>
new Promise<MultiAppendResult>(async (resolve, reject) => {
baseOptions.requiresLeader = false;
const sink = client.multiStreamAppendSession(
...this.callArguments(baseOptions),
...this.callArguments({
requiresLeader: false,
}),
(error, response) => {
if (error != null) {
return reject(convertToCommandError(error));
Expand Down Expand Up @@ -136,6 +147,15 @@ export const multiAppend = async function (
record.getPropertiesMap().set("$schema.data-format", dataFormat);
record.getPropertiesMap().set("$schema.name", schemaName);

if (event.metadata) {
const metadataMap = mapObjectToDynamicValueMap(
event.metadata as Record<string, unknown>
);
for (const [key, value] of metadataMap) {
record.getPropertiesMap().set(key, value);
}
}

switch (event.contentType) {
case "application/json": {
const data = JSON.stringify(event.data);
Expand Down
13 changes: 8 additions & 5 deletions packages/db-client/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,11 @@ export interface FellBehind {
position?: Position;
}

export interface AppendStreamRequest {
export interface AppendStreamRequest<
KnownEventType extends EventType = EventType
> {
streamName: string;
events: EventData[];
events: EventData<KnownEventType>[];
expectedState: AppendStreamState;
}

Expand Down Expand Up @@ -570,9 +572,10 @@ export interface BaseMultiAppendResult {
success: boolean;
}

export type MultiAppendResult =
| ({ success: true; output: AppendStreamSuccess[] } & BaseMultiAppendResult)
| ({ success: false; output: AppendStreamFailure[] } & BaseMultiAppendResult);
export type MultiAppendResult = {
success: boolean;
output: AppendStreamSuccess[] | AppendStreamFailure[];
} & BaseMultiAppendResult;

// Other listeners that are only supported in catch-up subscriptions
export interface CatchupSubscription {
Expand Down
1 change: 1 addition & 0 deletions packages/db-client/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ export * from "./grpcStreamIdentifier";
export * from "./grpcUUID";
export * from "./utilityTypes";
export * from "./isClientCancellationError";
export * from "./mapToDynamicValue";
export * from "./schema";
72 changes: 72 additions & 0 deletions packages/db-client/src/utils/mapToDynamicValue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import dynamic from "../../generated/kurrentdb/protocols/v2/core_pb";
import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb";
import { NullValue } from "google-protobuf/google/protobuf/struct_pb";

export const mapToDynamicValue = (source: unknown): dynamic.DynamicValue => {
const dynamicValue = new dynamic.DynamicValue();

if (source === null || source === undefined) {
dynamicValue.setNullValue(NullValue.NULL_VALUE);
return dynamicValue;
}

switch (typeof source) {
case "string":
dynamicValue.setStringValue(source);
break;

case "boolean":
dynamicValue.setBooleanValue(source);
break;

case "number":
if (
Number.isInteger(source) &&
source >= -2147483648 &&
source <= 2147483647
) {
dynamicValue.setInt32Value(source);
} else if (Number.isInteger(source)) {
dynamicValue.setInt64Value(source);
} else {
dynamicValue.setDoubleValue(source);
}
break;

case "bigint":
dynamicValue.setInt64Value(Number(source));
break;

case "object":
if (source instanceof Date) {
const timestamp = new Timestamp();
timestamp.fromDate(source);
dynamicValue.setTimestampValue(timestamp);
} else if (source instanceof Uint8Array) {
dynamicValue.setBytesValue(source);
} else if (Buffer.isBuffer(source)) {
dynamicValue.setBytesValue(Uint8Array.from(source));
} else {
dynamicValue.setStringValue(JSON.stringify(source));
}
break;

default:
dynamicValue.setStringValue(String(source));
break;
}

return dynamicValue;
};

export const mapObjectToDynamicValueMap = (
obj: Record<string, unknown>
): Map<string, dynamic.DynamicValue> => {
const map = new Map<string, dynamic.DynamicValue>();

for (const [key, value] of Object.entries(obj)) {
map.set(key, mapToDynamicValue(value));
}

return map;
};
Loading