Skip to content

Commit 95680b4

Browse files
committed
Set status code on the span for failures
1 parent e39fa9d commit 95680b4

2 files changed

Lines changed: 105 additions & 5 deletions

File tree

packages/opentelemetry/src/instrumentation.ts

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,8 @@ export class Instrumentation extends InstrumentationBase {
243243
[KurrentAttributes.SERVER_ADDRESS]: hostname,
244244
[KurrentAttributes.SERVER_PORT]: port,
245245
[KurrentAttributes.DATABASE_SYSTEM]: INSTRUMENTATION_NAME,
246-
[KurrentAttributes.DATABASE_OPERATION]: operation,
246+
[KurrentAttributes.DATABASE_OPERATION]:
247+
KurrentAttributes.STREAM_APPEND,
247248
},
248249
}
249250
);
@@ -267,15 +268,42 @@ export class Instrumentation extends InstrumentationBase {
267268

268269
try {
269270
const result = await original.apply(this, [requests]);
271+
272+
const requestEndTime: TimeInput = Date.now();
273+
274+
if (result.success) {
275+
requestSpans.forEach((span) => span.end(requestEndTime));
276+
} else {
277+
const failures: kurrentdb.AppendStreamFailure[] = result.output;
278+
const failedStreamNames = new Set(
279+
failures.map((f) => f.streamName)
280+
);
281+
282+
requestSpans.forEach((span, index) => {
283+
const request = requests[index];
284+
let errorMessage = "";
285+
286+
if (failedStreamNames.has(request.streamName)) {
287+
const details = failures.find(
288+
(f) => f.streamName === request.streamName
289+
)!.details;
290+
errorMessage = details.type;
291+
}
292+
293+
span.setStatus({
294+
code: SpanStatusCode.ERROR,
295+
message: errorMessage,
296+
});
297+
span.end(requestEndTime);
298+
});
299+
}
300+
270301
return result;
271302
} catch (error) {
272303
requestSpans.forEach((span) => {
273304
Instrumentation.handleError(error, span);
274305
});
275306
throw error;
276-
} finally {
277-
const requestEndTime: TimeInput = Date.now();
278-
requestSpans.forEach((span) => span.end(requestEndTime));
279307
}
280308
};
281309
};

packages/test/src/samples/opentelemetry.ts

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import {
2929
import * as kurrentdb from "@kurrent/kurrentdb-client";
3030
import { KurrentAttributes } from "@kurrent/opentelemetry/src/attributes";
3131
import { v4 } from "uuid";
32+
import { SpanStatusCode } from "@opentelemetry/api";
3233

3334
const memoryExporter = new InMemorySpanExporter();
3435
const otlpExporter = new OTLPTraceExporter({ url: "http://localhost:4317" }); // change this to your OTLP receiver address
@@ -211,8 +212,79 @@ describe("[sample] opentelemetry", () => {
211212
[KurrentAttributes.SERVER_ADDRESS]: node.endpoints[0].address,
212213
[KurrentAttributes.SERVER_PORT]: node.endpoints[0].port.toString(),
213214
[KurrentAttributes.DATABASE_SYSTEM]: moduleName,
214-
[KurrentAttributes.DATABASE_OPERATION]: "multiStreamAppend",
215+
[KurrentAttributes.DATABASE_OPERATION]: KurrentAttributes.STREAM_APPEND,
215216
});
216217
});
218+
219+
test("does not trace if result contains failures", async () => {
220+
// Arrange
221+
const defer = new Defer();
222+
223+
const { KurrentDBClient, jsonEvent } = await import(
224+
"@kurrent/kurrentdb-client"
225+
);
226+
227+
const client = KurrentDBClient.connectionString(node.connectionString());
228+
229+
const firstOrderReq: kurrentdb.AppendStreamRequest = {
230+
streamName: `order-${v4()}`,
231+
events: [
232+
jsonEvent({
233+
type: "OrderPlaced",
234+
data: { id: v4() },
235+
}),
236+
jsonEvent({
237+
type: "PaymentProcessed",
238+
data: { id: v4() },
239+
}),
240+
],
241+
expectedState: kurrentdb.ANY,
242+
};
243+
244+
const secondOrderReq: kurrentdb.AppendStreamRequest = {
245+
streamName: `order-${v4()}`,
246+
events: [
247+
jsonEvent({
248+
type: "OrderPlaced",
249+
data: { customerId: "cust-456" },
250+
}),
251+
],
252+
expectedState: kurrentdb.STREAM_EXISTS,
253+
};
254+
255+
// Act
256+
const appendResponse = await client.multiStreamAppend([
257+
firstOrderReq,
258+
secondOrderReq,
259+
]);
260+
261+
expect(appendResponse.success).toBeFalsy();
262+
263+
// Assert
264+
const firstOrderAppendSpans = getSpans(
265+
KurrentAttributes.STREAM_APPEND,
266+
firstOrderReq.streamName
267+
);
268+
const secondOrderAppendSpans = getSpans(
269+
KurrentAttributes.STREAM_APPEND,
270+
secondOrderReq.streamName
271+
);
272+
273+
expect(firstOrderAppendSpans).toHaveLength(1);
274+
expect(secondOrderAppendSpans).toHaveLength(1);
275+
276+
expect(firstOrderAppendSpans[0].attributes).toMatchObject({
277+
[KurrentAttributes.KURRENT_DB_STREAM]: firstOrderReq.streamName,
278+
[KurrentAttributes.SERVER_ADDRESS]: node.endpoints[0].address,
279+
[KurrentAttributes.SERVER_PORT]: node.endpoints[0].port.toString(),
280+
[KurrentAttributes.DATABASE_SYSTEM]: moduleName,
281+
[KurrentAttributes.DATABASE_OPERATION]: KurrentAttributes.STREAM_APPEND,
282+
});
283+
284+
expect(secondOrderAppendSpans[0].status.code).toBe(SpanStatusCode.ERROR);
285+
expect(secondOrderAppendSpans[0].status.message).toBe(
286+
"wrong_expected_revision"
287+
);
288+
});
217289
});
218290
});

0 commit comments

Comments
 (0)