Skip to content

Commit e96091e

Browse files
committed
feat(core): Add SpanBuffer implementation
1 parent de1e0c2 commit e96091e

File tree

5 files changed

+434
-2
lines changed

5 files changed

+434
-2
lines changed

packages/core/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,9 @@ export type {
174174
GoogleGenAIOptions,
175175
GoogleGenAIIstrumentedMethod,
176176
} from './tracing/google-genai/types';
177+
178+
export { SpanBuffer } from './tracing/spans/spanBuffer';
179+
177180
export type { FeatureFlag } from './utils/featureFlags';
178181

179182
export {

packages/core/src/tracing/dynamicSamplingContext.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
SEMANTIC_ATTRIBUTE_SENTRY_PREVIOUS_TRACE_SAMPLE_RATE,
77
SEMANTIC_ATTRIBUTE_SENTRY_SAMPLE_RATE,
88
SEMANTIC_ATTRIBUTE_SENTRY_SOURCE,
9+
SEMANTIC_ATTRIBUTE_SENTRY_SPAN_SOURCE,
910
} from '../semanticAttributes';
1011
import type { DynamicSamplingContext } from '../types-hoist/envelope';
1112
import type { Span } from '../types-hoist/span';
@@ -119,7 +120,9 @@ export function getDynamicSamplingContextFromSpan(span: Span): Readonly<Partial<
119120
const dsc = getDynamicSamplingContextFromClient(span.spanContext().traceId, client);
120121

121122
// We don't want to have a transaction name in the DSC if the source is "url" because URLs might contain PII
122-
const source = rootSpanAttributes[SEMANTIC_ATTRIBUTE_SENTRY_SOURCE];
123+
// TODO(v11): Only read `sentry.span.source` to determine the source
124+
const source =
125+
rootSpanAttributes[SEMANTIC_ATTRIBUTE_SENTRY_SOURCE] ?? rootSpanAttributes[SEMANTIC_ATTRIBUTE_SENTRY_SPAN_SOURCE];
123126

124127
// after JSON conversion, txn.name becomes jsonSpan.description
125128
const name = rootSpanJson.description;

packages/core/src/tracing/spans/captureSpan.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import {
2626
} from '../../utils/spanUtils';
2727
import { getCapturedScopesOnSpan } from '../utils';
2828

29-
type SerializedStreamedSpanWithSegmentSpan = SerializedStreamedSpan & {
29+
export type SerializedStreamedSpanWithSegmentSpan = SerializedStreamedSpan & {
3030
_segmentSpan: Span;
3131
};
3232

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
import type { Client } from '../../client';
2+
import { DEBUG_BUILD } from '../../debug-build';
3+
import type { SerializedStreamedSpan } from '../../types-hoist/span';
4+
import { debug } from '../../utils/debug-logger';
5+
import { safeUnref } from '../../utils/timer';
6+
import { getDynamicSamplingContextFromSpan } from '../dynamicSamplingContext';
7+
import type { SerializedStreamedSpanWithSegmentSpan } from './captureSpan';
8+
import { createStreamedSpanEnvelope } from './envelope';
9+
10+
/**
11+
* We must not send more than 1000 spans in one envelope.
12+
* Otherwise the envelope is dropped by Relay.
13+
*/
14+
const MAX_SPANS_PER_ENVELOPE = 1000;
15+
16+
export interface SpanBufferOptions {
17+
/**
18+
* Max spans per trace before auto-flush
19+
* Must not exceed 1000.
20+
*
21+
* @default 1_000
22+
*/
23+
maxSpanLimit?: number;
24+
25+
/**
26+
* Flush interval in ms
27+
* Must be greater than 0.
28+
*
29+
* @default 5_000
30+
*/
31+
flushInterval?: number;
32+
}
33+
34+
/**
35+
* A buffer for serialized streamed span JSON objects that flushes them to Sentry in Span v2 envelopes.
36+
* Handles interval-based flushing, size thresholds, and graceful shutdown.
37+
* Also handles computation of the Dynamic Sampling Context (DSC) for the trace, if it wasn't yet
38+
* frozen onto the segment span.
39+
*
40+
* For this, we need the reference to the segment span instance, from
41+
* which we compute the DSC. Doing this in the buffer ensures that we compute the DSC as late as possible,
42+
* allowing span name and data updates up to this point. Worth noting here that the segment span is likely
43+
* still active and modifyable when child spans are added to the buffer.
44+
*/
45+
export class SpanBuffer {
46+
/* Bucket spans by their trace id */
47+
private _traceMap: Map<string, Set<SerializedStreamedSpanWithSegmentSpan>>;
48+
49+
private _flushIntervalId: ReturnType<typeof setInterval> | null;
50+
private _client: Client;
51+
private _maxSpanLimit: number;
52+
private _flushInterval: number;
53+
54+
public constructor(client: Client, options?: SpanBufferOptions) {
55+
this._traceMap = new Map();
56+
this._client = client;
57+
58+
const { maxSpanLimit, flushInterval } = options ?? {};
59+
60+
this._maxSpanLimit =
61+
maxSpanLimit && maxSpanLimit > 0 && maxSpanLimit <= MAX_SPANS_PER_ENVELOPE
62+
? maxSpanLimit
63+
: MAX_SPANS_PER_ENVELOPE;
64+
this._flushInterval = flushInterval && flushInterval > 0 ? flushInterval : 5_000;
65+
66+
this._flushIntervalId = null;
67+
this._debounceFlushInterval();
68+
69+
this._client.on('flush', () => {
70+
this.drain();
71+
});
72+
}
73+
74+
/**
75+
* Add a span to the buffer.
76+
*/
77+
public add(spanJSON: SerializedStreamedSpanWithSegmentSpan): void {
78+
const traceId = spanJSON.trace_id;
79+
let traceBucket = this._traceMap.get(traceId);
80+
if (traceBucket) {
81+
traceBucket.add(spanJSON);
82+
} else {
83+
traceBucket = new Set([spanJSON]);
84+
this._traceMap.set(traceId, traceBucket);
85+
}
86+
87+
if (traceBucket.size >= this._maxSpanLimit) {
88+
this.flush(traceId);
89+
this._debounceFlushInterval();
90+
}
91+
}
92+
93+
/**
94+
* Drain and flush all buffered traces.
95+
*/
96+
public drain(): void {
97+
if (!this._traceMap.size) {
98+
return;
99+
}
100+
101+
DEBUG_BUILD && debug.log(`Flushing span tree map with ${this._traceMap.size} traces`);
102+
103+
this._traceMap.forEach((_, traceId) => {
104+
this.flush(traceId);
105+
});
106+
this._debounceFlushInterval();
107+
}
108+
109+
/**
110+
* Flush spans of a specific trace.
111+
* In contrast to {@link SpanBuffer.flush}, this method does not flush all traces, but only the one with the given traceId.
112+
*/
113+
public flush(traceId: string): void {
114+
const traceBucket = this._traceMap.get(traceId);
115+
if (!traceBucket) {
116+
return;
117+
}
118+
119+
if (!traceBucket.size) {
120+
// we should never get here, given we always add a span when we create a new bucket
121+
// and delete the bucket once we flush out the trace
122+
this._traceMap.delete(traceId);
123+
return;
124+
}
125+
126+
const spans = Array.from(traceBucket);
127+
128+
const segmentSpan = spans[0]?._segmentSpan;
129+
if (!segmentSpan) {
130+
DEBUG_BUILD && debug.warn('No segment span reference found on span JSON, cannot compute DSC');
131+
this._traceMap.delete(traceId);
132+
return;
133+
}
134+
135+
const dsc = getDynamicSamplingContextFromSpan(segmentSpan);
136+
137+
const cleanedSpans: SerializedStreamedSpan[] = Array.from(traceBucket).map(spanJSON => {
138+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
139+
const { _segmentSpan, ...cleanSpanJSON } = spanJSON;
140+
return cleanSpanJSON;
141+
});
142+
143+
const envelope = createStreamedSpanEnvelope(cleanedSpans, dsc, this._client);
144+
145+
DEBUG_BUILD && debug.log(`Sending span envelope for trace ${traceId} with ${cleanedSpans.length} spans`);
146+
147+
this._client.sendEnvelope(envelope).then(null, reason => {
148+
DEBUG_BUILD && debug.error('Error while sending streamed span envelope:', reason);
149+
});
150+
151+
this._traceMap.delete(traceId);
152+
}
153+
154+
private _debounceFlushInterval(): void {
155+
if (this._flushIntervalId) {
156+
clearInterval(this._flushIntervalId);
157+
}
158+
this._flushIntervalId = safeUnref(
159+
setInterval(() => {
160+
this.drain();
161+
}, this._flushInterval),
162+
);
163+
}
164+
}

0 commit comments

Comments
 (0)