Skip to content

Commit 320e805

Browse files
adinauerclaude
andcommitted
fix(spring-jakarta): Fork root scopes and skip when OTel is active in Kafka consumer interceptor
Use Sentry.forkedRootScopes() instead of scopes.forkedScopes() so each Kafka message starts with a clean scope from root, matching the pattern used by SentryWebFilter for reactive request boundaries. Add isIgnored() check using SpanUtils.isIgnored() on the trace origin so the interceptor no-ops when OTel is active and the origin is in the ignored span origins list, consistent with SentryTracingFilter. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 2501e57 commit 320e805

File tree

2 files changed

+54
-51
lines changed

2 files changed

+54
-51
lines changed

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
import io.sentry.IScopes;
55
import io.sentry.ISentryLifecycleToken;
66
import io.sentry.ITransaction;
7+
import io.sentry.Sentry;
78
import io.sentry.SentryTraceHeader;
89
import io.sentry.SpanDataConvention;
910
import io.sentry.SpanStatus;
1011
import io.sentry.TransactionContext;
1112
import io.sentry.TransactionOptions;
13+
import io.sentry.util.SpanUtils;
1214
import java.nio.charset.StandardCharsets;
1315
import java.util.Collections;
1416
import java.util.List;
@@ -48,13 +50,13 @@ public SentryKafkaRecordInterceptor(
4850
@Override
4951
public @Nullable ConsumerRecord<K, V> intercept(
5052
final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
51-
if (!scopes.getOptions().isEnableQueueTracing()) {
53+
if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) {
5254
return delegateIntercept(record, consumer);
5355
}
5456

5557
finishStaleContext();
5658

57-
final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor");
59+
final @NotNull IScopes forkedScopes = Sentry.forkedRootScopes("SentryKafkaRecordInterceptor");
5860
final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent();
5961

6062
final @Nullable TransactionContext transactionContext = continueTrace(forkedScopes, record);
@@ -105,6 +107,10 @@ public void clearThreadState(final @NotNull Consumer<?, ?> consumer) {
105107
finishStaleContext();
106108
}
107109

110+
private boolean isIgnored() {
111+
return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), TRACE_ORIGIN);
112+
}
113+
108114
private @Nullable ConsumerRecord<K, V> delegateIntercept(
109115
final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
110116
if (delegate != null) {

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 46 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import kotlin.test.assertEquals
1717
import org.apache.kafka.clients.consumer.Consumer
1818
import org.apache.kafka.clients.consumer.ConsumerRecord
1919
import org.apache.kafka.common.header.internals.RecordHeaders
20+
import org.mockito.Mockito
2021
import org.mockito.kotlin.any
2122
import org.mockito.kotlin.mock
2223
import org.mockito.kotlin.never
@@ -27,6 +28,7 @@ import org.springframework.kafka.listener.RecordInterceptor
2728
class SentryKafkaRecordInterceptorTest {
2829

2930
private lateinit var scopes: IScopes
31+
private lateinit var forkedScopes: IScopes
3032
private lateinit var options: SentryOptions
3133
private lateinit var consumer: Consumer<String, String>
3234
private lateinit var lifecycleToken: ISentryLifecycleToken
@@ -46,10 +48,9 @@ class SentryKafkaRecordInterceptorTest {
4648
whenever(scopes.options).thenReturn(options)
4749
whenever(scopes.isEnabled).thenReturn(true)
4850

49-
val forkedScopes = mock<IScopes>()
51+
forkedScopes = mock()
5052
whenever(forkedScopes.options).thenReturn(options)
5153
whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken)
52-
whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes)
5354

5455
val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes)
5556
whenever(forkedScopes.startTransaction(any<TransactionContext>(), any())).thenReturn(tx)
@@ -60,6 +61,13 @@ class SentryKafkaRecordInterceptorTest {
6061
Sentry.close()
6162
}
6263

64+
private fun <T> withMockSentry(closure: () -> T): T =
65+
Mockito.mockStatic(Sentry::class.java).use {
66+
it.`when`<Any> { Sentry.forkedRootScopes(any()) }.thenReturn(forkedScopes)
67+
it.`when`<Any> { Sentry.getCurrentScopes() }.thenReturn(scopes)
68+
closure.invoke()
69+
}
70+
6371
private fun createRecord(
6472
topic: String = "my-topic",
6573
headers: RecordHeaders = RecordHeaders(),
@@ -93,49 +101,33 @@ class SentryKafkaRecordInterceptorTest {
93101
}
94102

95103
@Test
96-
fun `intercept creates forked scopes`() {
104+
fun `intercept forks root scopes`() {
97105
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
98106
val record = createRecord()
99107

100-
interceptor.intercept(record, consumer)
108+
withMockSentry { interceptor.intercept(record, consumer) }
101109

102-
verify(scopes).forkedScopes("SentryKafkaRecordInterceptor")
110+
verify(forkedScopes).makeCurrent()
103111
}
104112

105113
@Test
106114
fun `intercept continues trace from headers`() {
107-
val forkedScopes = mock<IScopes>()
108-
whenever(forkedScopes.options).thenReturn(options)
109-
whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken)
110-
whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes)
111-
112-
val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes)
113-
whenever(forkedScopes.startTransaction(any<TransactionContext>(), any())).thenReturn(tx)
114-
115115
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
116116
val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1"
117117
val record = createRecordWithHeaders(sentryTrace = sentryTraceValue)
118118

119-
interceptor.intercept(record, consumer)
119+
withMockSentry { interceptor.intercept(record, consumer) }
120120

121121
verify(forkedScopes)
122122
.continueTrace(org.mockito.kotlin.eq(sentryTraceValue), org.mockito.kotlin.isNull())
123123
}
124124

125125
@Test
126126
fun `intercept calls continueTrace with null when no headers`() {
127-
val forkedScopes = mock<IScopes>()
128-
whenever(forkedScopes.options).thenReturn(options)
129-
whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken)
130-
whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes)
131-
132-
val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes)
133-
whenever(forkedScopes.startTransaction(any<TransactionContext>(), any())).thenReturn(tx)
134-
135127
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
136128
val record = createRecord()
137129

138-
interceptor.intercept(record, consumer)
130+
withMockSentry { interceptor.intercept(record, consumer) }
139131

140132
verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull())
141133
}
@@ -148,7 +140,19 @@ class SentryKafkaRecordInterceptorTest {
148140

149141
val result = interceptor.intercept(record, consumer)
150142

151-
verify(scopes, never()).forkedScopes(any())
143+
verify(forkedScopes, never()).makeCurrent()
144+
assertEquals(record, result)
145+
}
146+
147+
@Test
148+
fun `does not create span when origin is ignored`() {
149+
options.setIgnoredSpanOrigins(listOf(SentryKafkaRecordInterceptor.TRACE_ORIGIN))
150+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
151+
val record = createRecord()
152+
153+
val result = interceptor.intercept(record, consumer)
154+
155+
verify(forkedScopes, never()).makeCurrent()
152156
assertEquals(record, result)
153157
}
154158

@@ -159,7 +163,7 @@ class SentryKafkaRecordInterceptorTest {
159163
whenever(delegate.intercept(record, consumer)).thenReturn(record)
160164

161165
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
162-
interceptor.intercept(record, consumer)
166+
withMockSentry { interceptor.intercept(record, consumer) }
163167

164168
verify(delegate).intercept(record, consumer)
165169
}
@@ -170,8 +174,7 @@ class SentryKafkaRecordInterceptorTest {
170174
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
171175
val record = createRecord()
172176

173-
// intercept first to set up context
174-
interceptor.intercept(record, consumer)
177+
withMockSentry { interceptor.intercept(record, consumer) }
175178
interceptor.success(record, consumer)
176179

177180
verify(delegate).success(record, consumer)
@@ -184,7 +187,7 @@ class SentryKafkaRecordInterceptorTest {
184187
val record = createRecord()
185188
val exception = RuntimeException("processing failed")
186189

187-
interceptor.intercept(record, consumer)
190+
withMockSentry { interceptor.intercept(record, consumer) }
188191
interceptor.failure(record, exception, consumer)
189192

190193
verify(delegate).failure(record, exception, consumer)
@@ -214,13 +217,10 @@ class SentryKafkaRecordInterceptorTest {
214217
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
215218
val record = createRecord()
216219

217-
// intercept sets up context in ThreadLocal
218-
interceptor.intercept(record, consumer)
220+
withMockSentry { interceptor.intercept(record, consumer) }
219221

220-
// clearThreadState should clean up without success/failure being called
221222
interceptor.clearThreadState(consumer)
222223

223-
// lifecycle token should have been closed
224224
verify(lifecycleToken).close()
225225
}
226226

@@ -242,28 +242,25 @@ class SentryKafkaRecordInterceptorTest {
242242
whenever(forkedScopes2.startTransaction(any<TransactionContext>(), any())).thenReturn(tx2)
243243

244244
var callCount = 0
245-
whenever(scopes.forkedScopes(any())).thenAnswer {
246-
callCount++
247-
if (callCount == 1) {
248-
val forkedScopes1 = mock<IScopes>()
249-
whenever(forkedScopes1.options).thenReturn(options)
250-
whenever(forkedScopes1.makeCurrent()).thenReturn(lifecycleToken)
251-
val tx1 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes1)
252-
whenever(forkedScopes1.startTransaction(any<TransactionContext>(), any())).thenReturn(tx1)
253-
forkedScopes1
254-
} else {
255-
forkedScopes2
256-
}
257-
}
258245

259246
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
260247
val record = createRecord()
261248

262-
// First intercept sets up context
263-
interceptor.intercept(record, consumer)
249+
Mockito.mockStatic(Sentry::class.java).use { mockSentry ->
250+
mockSentry.`when`<Any> { Sentry.getCurrentScopes() }.thenReturn(scopes)
251+
mockSentry
252+
.`when`<Any> { Sentry.forkedRootScopes(any()) }
253+
.thenAnswer {
254+
callCount++
255+
if (callCount == 1) forkedScopes else forkedScopes2
256+
}
257+
258+
// First intercept sets up context
259+
interceptor.intercept(record, consumer)
264260

265-
// Second intercept without success/failure — should clean up stale context first
266-
interceptor.intercept(record, consumer)
261+
// Second intercept without success/failure — should clean up stale context first
262+
interceptor.intercept(record, consumer)
263+
}
267264

268265
// First lifecycle token should have been closed by the defensive cleanup
269266
verify(lifecycleToken).close()

0 commit comments

Comments
 (0)