Skip to content

Commit 78ad637

Browse files
feat: add appendRecords operation with consistency checks (#460)
Co-authored-by: William Chong <william-chong@outlook.com>
1 parent efd197f commit 78ad637

File tree

19 files changed

+3730
-52
lines changed

19 files changed

+3730
-52
lines changed

docs/api/appending-events.md

Lines changed: 168 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ const event = jsonEvent({
2929
});
3030

3131
await client.appendToStream("orders", event, {
32-
streamState NO_STREAM,
32+
streamState: NO_STREAM,
3333
});
3434
```
3535

@@ -136,49 +136,49 @@ This check can be used to implement optimistic concurrency. When retrieving a
136136
stream from KurrentDB, note the current version number. When you save it back,
137137
you can determine if somebody else has modified the record in the meantime.
138138

139-
```ts {6,9,26-28,41-43}
140-
const events = client.readStream("order-stream", {
139+
```ts
140+
const events = client.readStream("order-12345", {
141141
fromRevision: START,
142142
direction: FORWARDS,
143143
});
144144

145+
// Get the current revision to use for optimistic concurrency
145146
let revision: AppendStreamState = NO_STREAM;
146147

147148
for await (const { event } of events) {
148149
revision = event?.revision ?? revision;
149150
}
150151

151-
const orderPlacedEvent = jsonEvent({
152-
id: uuid(),
153-
type: "OrderPlaced",
154-
data: {
155-
orderId: "order-456",
156-
customerId: "customer-789",
157-
totalAmount: 149.99,
158-
items: [
159-
{ productId: "prod-123", quantity: 2, price: 49.99 },
160-
{ productId: "prod-456", quantity: 1, price: 49.99 }
161-
]
162-
},
163-
});
164-
165-
await client.appendToStream("order-stream", orderPlacedEvent, {
166-
streamState revision,
167-
});
168-
152+
// Two concurrent operations trying to update the same order
169153
const paymentProcessedEvent = jsonEvent({
170154
id: uuid(),
171155
type: "PaymentProcessed",
172156
data: {
173-
orderId: "order-456",
157+
orderId: "order-12345",
174158
paymentId: "payment-789",
175159
amount: 149.99,
176160
paymentMethod: "credit_card"
177161
},
178162
});
179163

180-
await client.appendToStream("order-stream", paymentProcessedEvent, {
181-
streamState revision,
164+
const orderCancelledEvent = jsonEvent({
165+
id: uuid(),
166+
type: "OrderCancelled",
167+
data: {
168+
orderId: "order-12345",
169+
reason: "customer-request",
170+
comment: "Customer changed mind"
171+
},
172+
});
173+
174+
// Process payment (succeeds)
175+
await client.appendToStream("order-12345", paymentProcessedEvent, {
176+
streamState: revision,
177+
});
178+
179+
// Cancel order (fails due to concurrency conflict)
180+
await client.appendToStream("order-12345", orderCancelledEvent, {
181+
streamState: revision,
182182
});
183183
```
184184

@@ -197,13 +197,16 @@ await client.appendToStream("some-stream", event, {
197197
});
198198
```
199199

200-
## Append to multiple streams
200+
## Atomic appends
201201

202-
::: note
203-
This feature is only available in KurrentDB 25.1 and later.
204-
:::
202+
KurrentDB provides two operations for appending events to one or more streams in a single atomic transaction: `appendRecords` and `multiStreamAppend`. Both guarantee that either all writes succeed or the entire operation fails, but they differ in how records are organized, ordered, and validated.
205203

206-
You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails.
204+
| | `appendRecords` | `multiStreamAppend` |
205+
|---|---|---|
206+
| **Available since** | KurrentDB 26.1 | KurrentDB 25.1 |
207+
| **Record ordering** | Interleaved. Records from different streams can be mixed, and their exact order is preserved in the global log. | Grouped. All records for a stream are sent together; ordering across streams is not guaranteed. |
208+
| **Consistency checks** | Decoupled. Can validate the state of any stream, including streams not being written to. | Coupled. Expected state is specified per stream being written to. |
209+
| **Protocol** | Unary RPC. All records and checks sent in a single request. | Client-streaming RPC. Records are streamed per stream. |
207210

208211
::: warning
209212
Metadata must be a valid JSON object, using string keys and string values only.
@@ -212,13 +215,135 @@ KurrentDB's metadata handling. This restriction will be lifted in the next major
212215
release.
213216
:::
214217

218+
### appendRecords
219+
220+
::: note
221+
This feature is only available in KurrentDB 26.1 and later.
222+
:::
223+
224+
`appendRecords` appends events to one or more streams atomically. Each record specifies which stream it targets, and the exact order of records is preserved in the global log across all streams.
225+
226+
#### Single stream
227+
228+
The simplest usage appends events to a single stream:
229+
230+
```ts
231+
import { jsonEvent, STREAM_STATE, NO_STREAM } from "@kurrent/kurrentdb-client";
232+
import { v4 as uuid } from "uuid";
233+
234+
const records = [
235+
{
236+
streamName: "order-123",
237+
record: jsonEvent({
238+
id: uuid(),
239+
type: "OrderPlaced",
240+
data: { orderId: "123", amount: 99.99 },
241+
}),
242+
},
243+
{
244+
streamName: "order-123",
245+
record: jsonEvent({
246+
id: uuid(),
247+
type: "OrderShipped",
248+
data: { orderId: "123" },
249+
}),
250+
},
251+
];
252+
253+
await client.appendRecords(records);
254+
```
255+
256+
You can also pass consistency checks for optimistic concurrency:
257+
258+
```ts
259+
await client.appendRecords(records, [
260+
{ type: STREAM_STATE, streamName: "order-123", expectedState: NO_STREAM },
261+
]);
262+
```
263+
264+
#### Multiple streams
265+
266+
Records can target different streams and be interleaved freely. The global log preserves the exact order you specify:
267+
268+
```ts
269+
const records = [
270+
{
271+
streamName: "order-stream",
272+
record: jsonEvent({
273+
id: uuid(),
274+
type: "OrderCreated",
275+
data: { orderId: "123" },
276+
}),
277+
},
278+
{
279+
streamName: "inventory-stream",
280+
record: jsonEvent({
281+
id: uuid(),
282+
type: "ItemReserved",
283+
data: { itemId: "abc", quantity: 2 },
284+
}),
285+
},
286+
{
287+
streamName: "order-stream",
288+
record: jsonEvent({
289+
id: uuid(),
290+
type: "OrderConfirmed",
291+
data: { orderId: "123" },
292+
}),
293+
},
294+
];
295+
296+
await client.appendRecords(records);
297+
```
298+
299+
#### Consistency checks
300+
301+
Consistency checks let you validate the state of any stream, including streams you are not writing to, before the append is committed. All checks are evaluated atomically: if any check fails, the entire operation is rejected and an `AppendConsistencyViolationError` is thrown with details about every failing check and the actual state observed.
302+
303+
```ts
304+
import { STREAM_STATE, STREAM_EXISTS } from "@kurrent/kurrentdb-client";
305+
306+
const records = [
307+
{
308+
streamName: "order-stream",
309+
record: jsonEvent({
310+
id: uuid(),
311+
type: "OrderConfirmed",
312+
data: { orderId: "123" },
313+
}),
314+
},
315+
];
316+
317+
const checks = [
318+
// ensure the inventory stream exists before confirming the order,
319+
// even though we are not writing to it
320+
{
321+
type: STREAM_STATE,
322+
streamName: "inventory-stream",
323+
expectedState: STREAM_EXISTS,
324+
},
325+
];
326+
327+
await client.appendRecords(records, checks);
328+
```
329+
330+
This decoupling of checks from writes enables [Dynamic Consistency Boundary](https://www.eventstore.com/blog/dynamic-consistency-boundary) patterns, where a business decision depends on the state of multiple streams but the resulting event is written to only one of them.
331+
332+
### multiStreamAppend
333+
334+
::: note
335+
This feature is only available in KurrentDB 25.1 and later.
336+
:::
337+
338+
`multiStreamAppend` appends events to one or more streams atomically. Records are grouped per stream using `AppendStreamRequest`, where each request specifies a stream name, an expected state, and the events for that stream.
339+
215340
```ts
216341
import { jsonEvent } from "@kurrent/kurrentdb-client";
217342
import { v4 as uuid } from "uuid";
218343

219344
const metadata = {
220345
source: "OrderProcessingSystem",
221-
version: "1.0"
346+
version: "1.0",
222347
};
223348

224349
const requests = [
@@ -229,30 +354,26 @@ const requests = [
229354
jsonEvent({
230355
id: uuid(),
231356
type: "OrderCreated",
232-
data: {
233-
orderId: "12345",
234-
amount: 99.99
235-
},
236-
metadata
237-
})
238-
]
357+
data: { orderId: "12345", amount: 99.99 },
358+
metadata,
359+
}),
360+
],
239361
},
240362
{
241-
streamName: "inventory-stream-1",
363+
streamName: "inventory-stream-1",
242364
expectedState: "any",
243365
events: [
244366
jsonEvent({
245367
id: uuid(),
246368
type: "ItemReserved",
247-
data: {
248-
itemId: "ABC123",
249-
quantity: 2
250-
},
251-
metadata
252-
})
253-
]
254-
}
369+
data: { itemId: "ABC123", quantity: 2 },
370+
metadata,
371+
}),
372+
],
373+
},
255374
];
256375

257376
await client.multiStreamAppend(requests);
258-
```
377+
```
378+
379+
Each stream can only appear once in the request. The expected state is validated per stream before the transaction is committed.

packages/db-client/generated/kurrentdb/protocols/v2/streams/errors_pb.d.ts

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,90 @@ export namespace StreamAlreadyInAppendSessionErrorDetails {
185185
}
186186
}
187187

188+
export class AppendConsistencyViolationErrorDetails extends jspb.Message {
189+
clearViolationsList(): void;
190+
getViolationsList(): Array<ConsistencyViolation>;
191+
setViolationsList(value: Array<ConsistencyViolation>): AppendConsistencyViolationErrorDetails;
192+
addViolations(value?: ConsistencyViolation, index?: number): ConsistencyViolation;
193+
194+
serializeBinary(): Uint8Array;
195+
toObject(includeInstance?: boolean): AppendConsistencyViolationErrorDetails.AsObject;
196+
static toObject(includeInstance: boolean, msg: AppendConsistencyViolationErrorDetails): AppendConsistencyViolationErrorDetails.AsObject;
197+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
198+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
199+
static serializeBinaryToWriter(message: AppendConsistencyViolationErrorDetails, writer: jspb.BinaryWriter): void;
200+
static deserializeBinary(bytes: Uint8Array): AppendConsistencyViolationErrorDetails;
201+
static deserializeBinaryFromReader(message: AppendConsistencyViolationErrorDetails, reader: jspb.BinaryReader): AppendConsistencyViolationErrorDetails;
202+
}
203+
204+
export namespace AppendConsistencyViolationErrorDetails {
205+
export type AsObject = {
206+
violationsList: Array<ConsistencyViolation.AsObject>,
207+
}
208+
}
209+
210+
export class ConsistencyViolation extends jspb.Message {
211+
getCheckIndex(): number;
212+
setCheckIndex(value: number): ConsistencyViolation;
213+
214+
hasStreamState(): boolean;
215+
clearStreamState(): void;
216+
getStreamState(): ConsistencyViolation.StreamStateViolation | undefined;
217+
setStreamState(value?: ConsistencyViolation.StreamStateViolation): ConsistencyViolation;
218+
219+
getTypeCase(): ConsistencyViolation.TypeCase;
220+
221+
serializeBinary(): Uint8Array;
222+
toObject(includeInstance?: boolean): ConsistencyViolation.AsObject;
223+
static toObject(includeInstance: boolean, msg: ConsistencyViolation): ConsistencyViolation.AsObject;
224+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
225+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
226+
static serializeBinaryToWriter(message: ConsistencyViolation, writer: jspb.BinaryWriter): void;
227+
static deserializeBinary(bytes: Uint8Array): ConsistencyViolation;
228+
static deserializeBinaryFromReader(message: ConsistencyViolation, reader: jspb.BinaryReader): ConsistencyViolation;
229+
}
230+
231+
export namespace ConsistencyViolation {
232+
export type AsObject = {
233+
checkIndex: number,
234+
streamState?: ConsistencyViolation.StreamStateViolation.AsObject,
235+
}
236+
237+
238+
export class StreamStateViolation extends jspb.Message {
239+
getStream(): string;
240+
setStream(value: string): StreamStateViolation;
241+
getExpectedState(): string;
242+
setExpectedState(value: string): StreamStateViolation;
243+
getActualState(): string;
244+
setActualState(value: string): StreamStateViolation;
245+
246+
serializeBinary(): Uint8Array;
247+
toObject(includeInstance?: boolean): StreamStateViolation.AsObject;
248+
static toObject(includeInstance: boolean, msg: StreamStateViolation): StreamStateViolation.AsObject;
249+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
250+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
251+
static serializeBinaryToWriter(message: StreamStateViolation, writer: jspb.BinaryWriter): void;
252+
static deserializeBinary(bytes: Uint8Array): StreamStateViolation;
253+
static deserializeBinaryFromReader(message: StreamStateViolation, reader: jspb.BinaryReader): StreamStateViolation;
254+
}
255+
256+
export namespace StreamStateViolation {
257+
export type AsObject = {
258+
stream: string,
259+
expectedState: string,
260+
actualState: string,
261+
}
262+
}
263+
264+
265+
export enum TypeCase {
266+
TYPE_NOT_SET = 0,
267+
STREAM_STATE = 2,
268+
}
269+
270+
}
271+
188272
export enum StreamsError {
189273
STREAMS_ERROR_UNSPECIFIED = 0,
190274
STREAMS_ERROR_STREAM_NOT_FOUND = 1,
@@ -196,4 +280,5 @@ export enum StreamsError {
196280
STREAMS_ERROR_APPEND_TRANSACTION_SIZE_EXCEEDED = 7,
197281
STREAMS_ERROR_STREAM_ALREADY_IN_APPEND_SESSION = 8,
198282
STREAMS_ERROR_APPEND_SESSION_NO_REQUESTS = 9,
283+
STREAMS_ERROR_APPEND_CONSISTENCY_VIOLATION = 14,
199284
}

0 commit comments

Comments
 (0)