Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion packages/interceptors-opentelemetry/src/worker/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as otel from '@opentelemetry/api';
import { createTraceState } from '@opentelemetry/api';
import type { Resource } from '@opentelemetry/resources';
import type { ReadableSpan, SpanExporter } from '@opentelemetry/sdk-trace-base';
import type { Context as ActivityContext } from '@temporalio/activity';
Expand Down Expand Up @@ -133,7 +134,15 @@ export function makeWorkflowExporter(
* Deserialize a serialized span created by the Workflow isolate
*/
function extractReadableSpan(serializable: SerializableSpan, resource: Resource): ReadableSpan {
const { spanContext, ...rest } = serializable;
const {
spanContext: { traceState, ...restSpanContext },
...rest
} = serializable;
const spanContext: otel.SpanContext = {
// Reconstruct the TraceState from the serialized string.
traceState: traceState ? createTraceState(traceState) : undefined,
...restSpanContext,
};
return {
spanContext() {
return spanContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@ import * as tracing from '@opentelemetry/sdk-trace-base';
import { InstrumentationLibrary } from '@opentelemetry/core'; // eslint-disable deprecation/deprecation
import type { Sink, Sinks } from '@temporalio/workflow';

/**
* Serializable version of SpanContext where traceState is converted to a string.
*/
export type SerializableSpanContext = Omit<otel.SpanContext, 'traceState'> & { traceState?: string };

/**
* Serializable version of the opentelemetry Span for cross isolate copying
*/
export interface SerializableSpan {
readonly name: string;
readonly kind: otel.SpanKind;
readonly spanContext: otel.SpanContext;
readonly spanContext: SerializableSpanContext;
readonly parentSpanId?: string;
readonly startTime: otel.HrTime;
readonly endTime: otel.HrTime;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as tracing from '@opentelemetry/sdk-trace-base';
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
import { OpenTelemetrySinks, SerializableSpan } from './definitions';
import { OpenTelemetrySinks, SerializableSpan, SerializableSpanContext } from './definitions';
import { proxySinks } from './workflow-imports';

export class SpanExporter implements tracing.SpanExporter {
Expand All @@ -19,10 +19,18 @@ export class SpanExporter implements tracing.SpanExporter {
}

public makeSerializable(span: tracing.ReadableSpan): SerializableSpan {
const { traceState, ...restSpanContext } = span.spanContext();
// Serialize traceState to a string because TraceState objects lose their
// prototype methods when crossing the V8 isolate boundary.
// See: https://github.com/temporalio/sdk-typescript/issues/1738
const serializableSpanContext: SerializableSpanContext = {
traceState: traceState?.serialize(),
...restSpanContext,
};
return {
name: span.name,
kind: span.kind,
spanContext: span.spanContext(),
spanContext: serializableSpanContext,
parentSpanId: span.parentSpanId,
startTime: span.startTime,
endTime: span.endTime,
Expand Down
116 changes: 115 additions & 1 deletion packages/test/src/test-otel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
*/
import * as http from 'http';
import * as http2 from 'http2';
import { SpanStatusCode } from '@opentelemetry/api';
import * as otelApi from '@opentelemetry/api';
import { SpanStatusCode, createTraceState } from '@opentelemetry/api';
import { ExportResultCode } from '@opentelemetry/core';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
import * as opentelemetry from '@opentelemetry/sdk-node';
Expand Down Expand Up @@ -587,6 +588,119 @@ if (RUN_INTEGRATION_TESTS) {
t.is(updateResult, true);
t.is(workflowResult, true);
});

// Regression test for https://github.com/temporalio/sdk-typescript/issues/1738
test.serial('traceState properly crosses V8 isolate boundary', async (t) => {
const exportErrors: Error[] = [];
// Collect spans exported from workflow (those that crossed the isolate boundary)
const workflowExportedSpans: opentelemetry.tracing.ReadableSpan[] = [];

await withFakeGrpcServer(async (port) => {
const staticResource = new opentelemetry.resources.Resource({
[SEMRESATTRS_SERVICE_NAME]: 'test-tracestate-issue-1738',
});

const traceExporter = new OTLPTraceExporter({ url: `http://127.0.0.1:${port}` });

// Wrap the exporter to catch errors
const wrappedExporter: opentelemetry.tracing.SpanExporter = {
export(spans, resultCallback) {
traceExporter.export(spans, (result) => {
if (result.code === ExportResultCode.FAILED && result.error) {
exportErrors.push(result.error);
}
resultCallback(result);
});
},
async shutdown() {
await traceExporter.shutdown();
},
};

// Create a separate exporter for workflow spans that captures them for inspection
const workflowSpanExporter: opentelemetry.tracing.SpanExporter = {
export(spans, resultCallback) {
// Capture spans for later inspection
workflowExportedSpans.push(...spans);
// Also send to the real exporter to test serialization
wrappedExporter.export(spans, resultCallback);
},
async shutdown() {
await wrappedExporter.shutdown();
},
};

const otel = new opentelemetry.NodeSDK({
resource: staticResource,
traceExporter: wrappedExporter,
});
otel.start();

const sinks: InjectedSinks<OpenTelemetrySinks> = {
exporter: makeWorkflowExporter(workflowSpanExporter, staticResource),
};

const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
activities,
taskQueue: 'test-otel-tracestate',
interceptors: {
workflowModules: [require.resolve('./workflows/otel-interceptors')],
},
sinks,
});

const client = new WorkflowClient({
interceptors: [new OpenTelemetryWorkflowClientInterceptor()],
});

// Create a parent span with traceState and run the workflow within that context
const tracer = otelApi.trace.getTracer('test-tracestate');

await worker.runUntil(async () => {
// Create a span with traceState by starting a span and then creating a new context with traceState
const parentSpan = tracer.startSpan('parent-with-tracestate');
const parentContext = otelApi.trace.setSpan(otelApi.context.active(), parentSpan);

// Get the span context and create a new one with traceState
const originalSpanContext = parentSpan.spanContext();
const traceState = createTraceState('vendor1=value1,vendor2=value2');
const spanContextWithTraceState = {
...originalSpanContext,
traceState,
};

// Create a new context with the modified span context
const contextWithTraceState = otelApi.trace.setSpanContext(parentContext, spanContextWithTraceState);

// Execute the workflow within this context so the traceState is propagated
await otelApi.context.with(contextWithTraceState, async () => {
await client.execute(workflows.successString, {
taskQueue: 'test-otel-tracestate',
workflowId: uuid4(),
});
});

parentSpan.end();
});

await otel.shutdown();
});

t.deepEqual(exportErrors, [], 'should have no errors exporting spans');

const traceStates = workflowExportedSpans.map((span) => span.spanContext().traceState).filter(Boolean);
t.assert(traceStates.length > 0, 'Should have spans with traceState');

// Verify the traceState was properly reconstructed with working methods
for (const traceState of traceStates) {
// Verify serialize() method works and returns expected value
const serialized = traceState!.serialize();
t.is(serialized, 'vendor1=value1,vendor2=value2');
t.is(traceState!.get('vendor1'), 'value1');
t.is(traceState!.get('vendor2'), 'value2');
}
});
}

test('Can replay otel history from 1.11.3', async (t) => {
Expand Down
Loading