Skip to content

Commit 683c4cc

Browse files
committed
Set status code on the span for failures
1 parent e39fa9d commit 683c4cc

File tree

2 files changed

+102
-3
lines changed

2 files changed

+102
-3
lines changed

packages/opentelemetry/src/instrumentation.ts

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,15 +267,42 @@ export class Instrumentation extends InstrumentationBase {
267267

268268
try {
269269
const result = await original.apply(this, [requests]);
270+
271+
const requestEndTime: TimeInput = Date.now();
272+
273+
if (result.success) {
274+
requestSpans.forEach((span) => span.end(requestEndTime));
275+
} else {
276+
const failures: kurrentdb.AppendStreamFailure[] = result.output;
277+
const failedStreamNames = new Set(
278+
failures.map((f) => f.streamName)
279+
);
280+
281+
requestSpans.forEach((span, index) => {
282+
const request = requests[index];
283+
let errorMessage = "This request did not go through successfully";
284+
285+
if (failedStreamNames.has(request.streamName)) {
286+
const details = failures.find(
287+
(f) => f.streamName === request.streamName
288+
)!.details;
289+
errorMessage = details.type;
290+
}
291+
292+
span.setStatus({
293+
code: SpanStatusCode.ERROR,
294+
message: errorMessage,
295+
});
296+
span.end(requestEndTime);
297+
});
298+
}
299+
270300
return result;
271301
} catch (error) {
272302
requestSpans.forEach((span) => {
273303
Instrumentation.handleError(error, span);
274304
});
275305
throw error;
276-
} finally {
277-
const requestEndTime: TimeInput = Date.now();
278-
requestSpans.forEach((span) => span.end(requestEndTime));
279306
}
280307
};
281308
};

packages/test/src/samples/opentelemetry.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import {
2929
import * as kurrentdb from "@kurrent/kurrentdb-client";
3030
import { KurrentAttributes } from "@kurrent/opentelemetry/src/attributes";
3131
import { v4 } from "uuid";
32+
import { SpanStatusCode } from "@opentelemetry/api";
3233

3334
const memoryExporter = new InMemorySpanExporter();
3435
const otlpExporter = new OTLPTraceExporter({ url: "http://localhost:4317" }); // change this to your OTLP receiver address
@@ -214,5 +215,76 @@ describe("[sample] opentelemetry", () => {
214215
[KurrentAttributes.DATABASE_OPERATION]: "multiStreamAppend",
215216
});
216217
});
218+
219+
test("does not trace if result contains failures", async () => {
220+
// Arrange
221+
const defer = new Defer();
222+
223+
const { KurrentDBClient, jsonEvent } = await import(
224+
"@kurrent/kurrentdb-client"
225+
);
226+
227+
const client = KurrentDBClient.connectionString(node.connectionString());
228+
229+
const firstOrderReq: kurrentdb.AppendStreamRequest = {
230+
streamName: `order-${v4()}`,
231+
events: [
232+
jsonEvent({
233+
type: "OrderPlaced",
234+
data: { id: v4() },
235+
}),
236+
jsonEvent({
237+
type: "PaymentProcessed",
238+
data: { id: v4() },
239+
}),
240+
],
241+
expectedState: kurrentdb.ANY,
242+
};
243+
244+
const secondOrderReq: kurrentdb.AppendStreamRequest = {
245+
streamName: `order-${v4()}`,
246+
events: [
247+
jsonEvent({
248+
type: "OrderPlaced",
249+
data: { customerId: "cust-456" },
250+
}),
251+
],
252+
expectedState: kurrentdb.STREAM_EXISTS,
253+
};
254+
255+
// Act
256+
const appendResponse = await client.multiStreamAppend([
257+
firstOrderReq,
258+
secondOrderReq,
259+
]);
260+
261+
expect(appendResponse.success).toBeFalsy();
262+
263+
// Assert
264+
const firstOrderAppendSpans = getSpans(
265+
KurrentAttributes.STREAM_APPEND,
266+
firstOrderReq.streamName
267+
);
268+
const secondOrderAppendSpans = getSpans(
269+
KurrentAttributes.STREAM_APPEND,
270+
secondOrderReq.streamName
271+
);
272+
273+
expect(firstOrderAppendSpans).toHaveLength(1);
274+
expect(secondOrderAppendSpans).toHaveLength(1);
275+
276+
expect(firstOrderAppendSpans[0].attributes).toMatchObject({
277+
[KurrentAttributes.KURRENT_DB_STREAM]: firstOrderReq.streamName,
278+
[KurrentAttributes.SERVER_ADDRESS]: node.endpoints[0].address,
279+
[KurrentAttributes.SERVER_PORT]: node.endpoints[0].port.toString(),
280+
[KurrentAttributes.DATABASE_SYSTEM]: moduleName,
281+
[KurrentAttributes.DATABASE_OPERATION]: "multiStreamAppend",
282+
});
283+
284+
expect(secondOrderAppendSpans[0].status.code).toBe(SpanStatusCode.ERROR);
285+
expect(secondOrderAppendSpans[0].status.message).toBe(
286+
"wrong_expected_revision"
287+
);
288+
});
217289
});
218290
});

0 commit comments

Comments
 (0)