diff --git a/packages/opentelemetry/src/attributes.ts b/packages/opentelemetry/src/attributes.ts index 65ae659f..a335a80a 100644 --- a/packages/opentelemetry/src/attributes.ts +++ b/packages/opentelemetry/src/attributes.ts @@ -12,6 +12,7 @@ export const KurrentAttributes = { SERVER_PORT: `${server}.port`, STREAM_APPEND: `${streams}.append`, + STREAM_MULTI_APPEND: `${streams}.multi-append`, STREAM_SUBSCRIBE: `${streams}.subscribe`, KURRENT_DB_STREAM: `${kurrentdb}.stream`, diff --git a/packages/opentelemetry/src/instrumentation.ts b/packages/opentelemetry/src/instrumentation.ts index db8cc1ca..ad041b8a 100644 --- a/packages/opentelemetry/src/instrumentation.ts +++ b/packages/opentelemetry/src/instrumentation.ts @@ -8,6 +8,7 @@ import { Span, SpanKind, SpanStatusCode, + TimeInput, trace, TraceFlags, Tracer, @@ -24,6 +25,7 @@ import type { EventData, EventType, JSONEventType, + MultiAppendResult, ResolvedEvent, SubscribeToAllOptions, SubscribeToPersistentSubscriptionToAllOptions, @@ -38,6 +40,7 @@ import type { Subscription } from "@kurrent/kurrentdb-client/src/streams/utils/S import { INSTRUMENTATION_NAME, INSTRUMENTATION_VERSION } from "./version"; import type { AppendToStreamParams, + MultiStreamAppendParams, PersistentSubscribeParameters, SubscribeParameters, } from "./types"; @@ -67,6 +70,11 @@ export class Instrumentation extends InstrumentationBase { "appendToStream", this._patchAppendToStream() ); + this.wrap( + moduleExports.KurrentDBClient.prototype, + "multiStreamAppend", + this._patchMultiStreamAppend() + ); this.wrap( moduleExports.KurrentDBClient.prototype, "subscribeToStream", @@ -106,6 +114,10 @@ export class Instrumentation extends InstrumentationBase { this._diag.debug("un-patching"); this._unwrap(moduleExports.KurrentDBClient.prototype, "appendToStream"); + this._unwrap( + moduleExports.KurrentDBClient.prototype, + "multiStreamAppend" + ); this._unwrap( moduleExports.KurrentDBClient.prototype, "subscribeToStream" @@ -196,6 +208,118 @@ export class Instrumentation extends InstrumentationBase { }; } + private _patchMultiStreamAppend(): ( + original: Function, + operation: keyof kurrentdb.KurrentDBClient + ) => (...args: MultiStreamAppendParams) => Promise { + const instrumentation = this; + const tracer = instrumentation.tracer; + + return function multiStreamAppend( + original: Function, + operation: keyof kurrentdb.KurrentDBClient + ) { + return async function ( + this: kurrentdb.KurrentDBClient, + ...args: MultiStreamAppendParams + ): Promise { + const [requests] = [...args]; + + const uri = await this.resolveUri(); + const { hostname, port } = Instrumentation.getServerAddress(uri); + + const requestStartTime: TimeInput = Date.now(); + + const span = tracer.startSpan(KurrentAttributes.STREAM_MULTI_APPEND, { + kind: SpanKind.CLIENT, + startTime: requestStartTime, + attributes: { + [KurrentAttributes.SERVER_ADDRESS]: hostname, + [KurrentAttributes.SERVER_PORT]: port, + [KurrentAttributes.DATABASE_SYSTEM]: INSTRUMENTATION_NAME, + [KurrentAttributes.DATABASE_OPERATION]: operation, + }, + }); + + requests.forEach((request) => { + const traceId = span.spanContext().traceId; + const spanId = span.spanContext().spanId; + + request.events.forEach((event) => { + const metadata = (event.metadata = event.metadata || {}); + if (isJSONEventData(event) && typeof metadata === "object") { + event.metadata = { + ...metadata, + [TRACE_ID]: traceId, + [SPAN_ID]: spanId, + }; + } + }); + }); + + try { + const result = await original.apply(this, [requests]); + + const requestEndTime: TimeInput = Date.now(); + + if (!result.success) { + const failures: kurrentdb.AppendStreamFailure[] = result.output; + + span.setStatus({ + code: SpanStatusCode.ERROR, + }); + + failures.forEach((failure) => { + switch (failure.details.type) { + case "wrong_expected_revision": + span.addEvent("exception", { + "exception.type": "wrong_expected_revision", + "exception.revision": + failure.details.revision.toLocaleString(), + }); + break; + + case "access_denied": + span.addEvent("exception", { + "exception.type": failure.details.type, + "exception.message": failure.details.reason, + }); + break; + + case "stream_deleted": + span.addEvent("exception", { + "exception.type": failure.details.type, + }); + break; + + case "transaction_max_size_exceeded": + span.addEvent("exception", { + "exception.type": failure.details.type, + "exception.max_size": + failure.details.maxSize.toLocaleString(), + }); + break; + + case "unknown": + span.addEvent("exception", { + "exception.type": "unknown", + }); + break; + } + }); + } + + span.end(requestEndTime); + + return result; + } catch (error) { + Instrumentation.handleError(error, span); + throw error; + } + }; + }; + } + static applySubscriptionInstrumentation( spanName: string, subscription: diff --git a/packages/opentelemetry/src/types.ts b/packages/opentelemetry/src/types.ts index bff83aa6..eee21684 100644 --- a/packages/opentelemetry/src/types.ts +++ b/packages/opentelemetry/src/types.ts @@ -23,3 +23,7 @@ export type PersistentSubscribeParameters = export type AppendToStreamParams = Parameters< kdb.KurrentDBClient["appendToStream"] >; + +export type MultiStreamAppendParams = Parameters< + kdb.KurrentDBClient["multiStreamAppend"] +>; diff --git a/packages/test/package.json b/packages/test/package.json index 4aa4df2f..565e7d58 100644 --- a/packages/test/package.json +++ b/packages/test/package.json @@ -37,6 +37,7 @@ "@opentelemetry/api": "^1.9.0", "@opentelemetry/exporter-trace-otlp-grpc": "^0.51.1", "@opentelemetry/instrumentation": "^0.56.0", + "@opentelemetry/resources": "^2.0.1", "@opentelemetry/sdk-trace-node": "^1.30.0", "@opentelemetry/semantic-conventions": "^1.28.0", "@types/debug": "^4.1.12", diff --git a/packages/test/src/opentelemetry/instrumentation.test.ts b/packages/test/src/opentelemetry/instrumentation.test.ts index 83a73c24..5e4fe3cc 100644 --- a/packages/test/src/opentelemetry/instrumentation.test.ts +++ b/packages/test/src/opentelemetry/instrumentation.test.ts @@ -1,3 +1,5 @@ +/** @jest-environment ./src/utils/enableVersionCheck.ts */ + import { createTestNode, Defer, delay, jsonTestEvents } from "@test-utils"; import { NodeTracerProvider, diff --git a/packages/test/src/samples/opentelemetry.ts b/packages/test/src/samples/opentelemetry.ts index d270fde9..700fa467 100644 --- a/packages/test/src/samples/opentelemetry.ts +++ b/packages/test/src/samples/opentelemetry.ts @@ -1,4 +1,10 @@ -// region import-required-packages +/** @jest-environment ./src/utils/enableVersionCheck.ts */ + +/** + * Download and start aspire dashboard from https://aspiredashboard.com/ + * You can also use Jaeger or any other OpenTelemetry compatible dashboard. + */ + import { InMemorySpanExporter, NodeTracerProvider, @@ -8,15 +14,39 @@ import { import { registerInstrumentations } from "@opentelemetry/instrumentation"; import { KurrentDBInstrumentation } from "@kurrent/opentelemetry"; import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc"; -import {} from "@opentelemetry/sdk-trace-node"; -// endregion import-required-packages -import { createTestNode } from "@test-utils"; -import { KurrentDBClient } from "@kurrent/kurrentdb-client"; +import { resourceFromAttributes } from "@opentelemetry/resources"; +import { + ATTR_SERVICE_NAME, + ATTR_SERVICE_VERSION, +} from "@opentelemetry/semantic-conventions"; +import { + createTestNode, + Defer, + matchServerVersion, + optionalDescribe, +} from "@test-utils"; -import * as esdb from "@kurrent/kurrentdb-client"; +import * as kurrentdb from "@kurrent/kurrentdb-client"; +import { KurrentAttributes } from "@kurrent/opentelemetry/src/attributes"; +import { v4 } from "uuid"; +import { SpanStatusCode } from "@opentelemetry/api"; +import { multiStreamAppend } from "@kurrent/kurrentdb-client/src/streams/appendToStream/multiStreamAppend"; -// region register-instrumentation -const provider = new NodeTracerProvider(); +const memoryExporter = new InMemorySpanExporter(); +const otlpExporter = new OTLPTraceExporter({ url: "http://localhost:4317" }); // change this to your OTLP receiver address +const consoleExporter = new ConsoleSpanExporter(); + +const provider = new NodeTracerProvider({ + resource: resourceFromAttributes({ + [ATTR_SERVICE_NAME]: "kurrentdb", + [ATTR_SERVICE_VERSION]: "1.0.0", + }), + spanProcessors: [ + new SimpleSpanProcessor(memoryExporter), + // new SimpleSpanProcessor(consoleExporter), + new SimpleSpanProcessor(otlpExporter), + ], +}); const instrumentation = new KurrentDBInstrumentation(); @@ -24,30 +54,22 @@ registerInstrumentations({ instrumentations: [instrumentation], tracerProvider: provider, }); -// endregion register-instrumentation instrumentation.disable(); +const getSpans = (name: string) => { + return memoryExporter.getFinishedSpans().filter((span) => span.name === name); +}; + describe("[sample] opentelemetry", () => { const node = createTestNode(); - let client!: KurrentDBClient; - - // region setup-exporter - const memoryExporter = new InMemorySpanExporter(); - const otlpExporter = new OTLPTraceExporter({ url: "http://localhost:4317" }); // change this to your OTLP receiver address - const consoleExporter = new ConsoleSpanExporter(); - - provider.addSpanProcessor(new SimpleSpanProcessor(memoryExporter)); - provider.addSpanProcessor(new SimpleSpanProcessor(consoleExporter)); - provider.addSpanProcessor(new SimpleSpanProcessor(otlpExporter)); - // endregion setup-exporter + const moduleName = "@kurrent/opentelemetry"; // @ts-expect-error the moduleExports property is private. This is needed to make the test work with auto-mocking - instrumentation._modules[0].moduleExports = esdb; + instrumentation._modules[0].moduleExports = kurrentdb; beforeAll(async () => { await node.up(); - client = KurrentDBClient.connectionString(node.connectionString()); }); beforeAll(async () => { @@ -64,33 +86,174 @@ describe("[sample] opentelemetry", () => { memoryExporter.reset(); }); - test("tracing", async () => { - // region setup-client-for-tracing - const { KurrentDBClient, jsonEvent } = await import( - "@kurrent/kurrentdb-client" - ); - - const client = KurrentDBClient.connectionString(node.connectionString()); - // endregion setup-client-for-tracing - - const response = await client.appendToStream( - "some-stream", - jsonEvent({ - type: "OrderPlaced", - data: { - orderId: "1337", - orderValue: 123.45, - }, - }), - { - streamState: "any", - } - ); - - expect(response).toBeDefined(); - - const memorySpans = memoryExporter.getFinishedSpans(); - - expect(memorySpans.length).toBe(1); + optionalDescribe(matchServerVersion`>=25.0`)("multistream append", () => { + test("append then subscribe", async () => { + // Arrange + const defer = new Defer(); + + const { KurrentDBClient, jsonEvent } = await import( + "@kurrent/kurrentdb-client" + ); + + const handleError = jest.fn((error) => { + defer.reject(error); + }); + const handleClose = jest.fn(); + const handleEvent = jest.fn((event: kurrentdb.ResolvedEvent) => { + expect(event).toBeDefined(); + }); + const handleCaughtUp = jest.fn(async () => { + try { + expect(handleEvent).toHaveBeenCalledTimes(3); + await subscription.unsubscribe(); + } catch (error) { + defer.reject(error); + } + }); + const handleEnd = jest.fn(defer.resolve); + const handleConfirmation = jest.fn(); + + const client = KurrentDBClient.connectionString(node.connectionString()); + + const firstOrderReq: kurrentdb.AppendStreamRequest = { + streamName: `order-${v4()}`, + events: [ + jsonEvent({ + type: "OrderPlaced", + data: { id: v4() }, + }), + jsonEvent({ + type: "PaymentProcessed", + data: { id: v4() }, + }), + ], + expectedState: kurrentdb.ANY, + }; + + const secondOrderReq: kurrentdb.AppendStreamRequest = { + streamName: `order-${v4()}`, + events: [ + jsonEvent({ + type: "OrderPlaced", + data: { customerId: "cust-456" }, + }), + ], + expectedState: kurrentdb.ANY, + }; + + // Act + const appendResponse = await client.multiStreamAppend([ + firstOrderReq, + secondOrderReq, + ]); + + expect(appendResponse.success).toBeTruthy(); + + const subscription = client + .subscribeToAll({ + filter: kurrentdb.streamNameFilter({ + prefixes: ["order-"], + }), + }) + .on("error", handleError) + .on("data", handleEvent) + .on("close", handleClose) + .on("confirmation", handleConfirmation) + .on("caughtUp", handleCaughtUp) + .on("end", handleEnd); + + await defer.promise; + + // Assert + expect(handleError).not.toHaveBeenCalled(); + expect(handleConfirmation).toHaveBeenCalledTimes(1); + expect(handleEvent).toHaveBeenCalledTimes(3); + expect(handleCaughtUp).toHaveBeenCalled(); + + const appendSpans = getSpans(KurrentAttributes.STREAM_MULTI_APPEND); + const subscribeSpans = getSpans(KurrentAttributes.STREAM_SUBSCRIBE); + + expect(appendSpans).toHaveLength(1); + expect(subscribeSpans).toHaveLength(3); + + expect(subscribeSpans[0].parentSpanId).toBe( + appendSpans[0].spanContext().spanId + ); + expect(subscribeSpans[1].parentSpanId).toBe( + appendSpans[0].spanContext().spanId + ); + + expect(appendSpans[0].attributes).toMatchObject({ + [KurrentAttributes.SERVER_ADDRESS]: node.endpoints[0].address, + [KurrentAttributes.SERVER_PORT]: node.endpoints[0].port.toString(), + [KurrentAttributes.DATABASE_SYSTEM]: moduleName, + [KurrentAttributes.DATABASE_OPERATION]: multiStreamAppend.name, + }); + }); + + test("append with failures", async () => { + // Arrange + const defer = new Defer(); + + const { KurrentDBClient, jsonEvent } = await import( + "@kurrent/kurrentdb-client" + ); + + const client = KurrentDBClient.connectionString(node.connectionString()); + + const firstOrderReq: kurrentdb.AppendStreamRequest = { + streamName: `order-${v4()}`, + events: [ + jsonEvent({ + type: "OrderPlaced", + data: { id: v4() }, + }), + jsonEvent({ + type: "PaymentProcessed", + data: { id: v4() }, + }), + ], + expectedState: kurrentdb.ANY, + }; + + const secondOrderReq: kurrentdb.AppendStreamRequest = { + streamName: `order-${v4()}`, + events: [ + jsonEvent({ + type: "OrderPlaced", + data: { customerId: "cust-456" }, + }), + ], + expectedState: kurrentdb.STREAM_EXISTS, + }; + + // Act + const appendResponse = await client.multiStreamAppend([ + firstOrderReq, + secondOrderReq, + ]); + + expect(appendResponse.success).toBeFalsy(); + + // Assert + const appendSpans = getSpans(KurrentAttributes.STREAM_MULTI_APPEND); + + expect(appendSpans).toHaveLength(1); + + expect(appendSpans[0].attributes).toMatchObject({ + [KurrentAttributes.SERVER_ADDRESS]: node.endpoints[0].address, + [KurrentAttributes.SERVER_PORT]: node.endpoints[0].port.toString(), + [KurrentAttributes.DATABASE_SYSTEM]: moduleName, + [KurrentAttributes.DATABASE_OPERATION]: multiStreamAppend.name, + }); + + expect(appendSpans[0].status.code).toBe(SpanStatusCode.ERROR); + expect(appendSpans[0].events.length).toBe(1); + expect(appendSpans[0].events[0].name).toBe("exception"); + expect(appendSpans[0].events[0].attributes).toMatchObject({ + "exception.type": "wrong_expected_revision", + "exception.revision": "-1", + }); + }); }); }); diff --git a/packages/test/src/utils/dockerImages.ts b/packages/test/src/utils/dockerImages.ts index e08854f1..f266a366 100644 --- a/packages/test/src/utils/dockerImages.ts +++ b/packages/test/src/utils/dockerImages.ts @@ -1,7 +1,7 @@ const kdbImage = ((): string => { const image = process.env.KURRENT_IMAGE ?? - "docker.kurrent.io/eventstore/eventstoredb-ee:lts"; + "docker.cloudsmith.io/eventstore/kurrent-staging/kurrentdb:ci"; return image; })(); diff --git a/yarn.lock b/yarn.lock index 957a2750..4de7f3ca 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1394,6 +1394,17 @@ __metadata: languageName: node linkType: hard +"@opentelemetry/core@npm:2.0.1": + version: 2.0.1 + resolution: "@opentelemetry/core@npm:2.0.1" + dependencies: + "@opentelemetry/semantic-conventions": "npm:^1.29.0" + peerDependencies: + "@opentelemetry/api": ">=1.0.0 <1.10.0" + checksum: 10c0/d587b1289559757d80da98039f9f57612f84f72ec608cd665dc467c7c6c5ce3a987dfcc2c63b521c7c86ce984a2552b3ead15a0dc458de1cf6bde5cdfe4ca9d8 + languageName: node + linkType: hard + "@opentelemetry/exporter-trace-otlp-grpc@npm:^0.51.1": version: 0.51.1 resolution: "@opentelemetry/exporter-trace-otlp-grpc@npm:0.51.1" @@ -1513,6 +1524,18 @@ __metadata: languageName: node linkType: hard +"@opentelemetry/resources@npm:^2.0.1": + version: 2.0.1 + resolution: "@opentelemetry/resources@npm:2.0.1" + dependencies: + "@opentelemetry/core": "npm:2.0.1" + "@opentelemetry/semantic-conventions": "npm:^1.29.0" + peerDependencies: + "@opentelemetry/api": ">=1.3.0 <1.10.0" + checksum: 10c0/96532b7553b26607a7a892d72f6b03ad12bd542dc23c95135a8ae40362da9c883c21a4cff3d2296d9e0e9bd899a5977e325ed52d83142621a8ffe81d08d99341 + languageName: node + linkType: hard + "@opentelemetry/sdk-logs@npm:0.51.1": version: 0.51.1 resolution: "@opentelemetry/sdk-logs@npm:0.51.1" @@ -1602,6 +1625,13 @@ __metadata: languageName: node linkType: hard +"@opentelemetry/semantic-conventions@npm:^1.29.0": + version: 1.36.0 + resolution: "@opentelemetry/semantic-conventions@npm:1.36.0" + checksum: 10c0/edc8a6fe3ec4fc0c67ba3a92b86fb3dcc78fe1eb4f19838d8013c3232b9868540a034dd25cfe0afdd5eae752c5f0e9f42272ff46da144a2d5b35c644478e1c62 + languageName: node + linkType: hard + "@pkgjs/parseargs@npm:^0.11.0": version: 0.11.0 resolution: "@pkgjs/parseargs@npm:0.11.0" @@ -10365,6 +10395,7 @@ __metadata: "@opentelemetry/api": "npm:^1.9.0" "@opentelemetry/exporter-trace-otlp-grpc": "npm:^0.51.1" "@opentelemetry/instrumentation": "npm:^0.56.0" + "@opentelemetry/resources": "npm:^2.0.1" "@opentelemetry/sdk-trace-node": "npm:^1.30.0" "@opentelemetry/semantic-conventions": "npm:^1.28.0" "@types/debug": "npm:^4.1.12"