From b4a38187b8bc7705b378f5b82f0e7c09cc8ce93f Mon Sep 17 00:00:00 2001 From: Tony Le Date: Mon, 30 Mar 2026 16:20:02 -0400 Subject: [PATCH 1/4] feat(spans): Flush oversized segments in chunks --- src/sentry/options/defaults.py | 6 ++ src/sentry/spans/buffer.py | 12 +-- src/sentry/spans/consumers/process/flusher.py | 60 ++++++++++++-- .../spans/consumers/process/test_flusher.py | 61 +++++++++++++- tests/sentry/spans/test_buffer.py | 83 +++++++++++++++++++ 5 files changed, 209 insertions(+), 13 deletions(-) diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 45fe2de8d841e3..e62b44f4a7a7a1 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3228,6 +3228,12 @@ default=10 * 1024 * 1024, flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE, ) +# When enabled, oversized segments are split into chunks instead of being dropped. +register( + "spans.buffer.flush-oversized-segments", + default=False, + flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE, +) # TTL for keys in Redis. This is a downside protection in case of bugs. register( "spans.buffer.redis-ttl", diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index a00d7517a62b9e..952d99fd595658 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -710,6 +710,7 @@ def _load_segment_data( cursors[payload_key] = 0 payload_keys_map[key] = segment_payload_keys + flush_oversized_segments = options.get("spans.buffer.flush-oversized-segments") dropped_segments: set[SegmentKey] = set() def _add_spans(key: SegmentKey, raw_data: bytes) -> bool: @@ -726,11 +727,12 @@ def _add_spans(key: SegmentKey, raw_data: bytes) -> bool: sizes[key] = sizes.get(key, 0) + sum(len(span) for span in decompressed) if sizes[key] > max_segment_bytes: metrics.incr("spans.buffer.flush_segments.segment_size_exceeded") - logger.warning("Skipping too large segment, byte size %s", sizes[key]) - payloads.pop(key, None) - sizes.pop(key, None) - dropped_segments.add(key) - return False + if not flush_oversized_segments: + logger.warning("Skipping too large segment, byte size %s", sizes[key]) + payloads.pop(key, None) + sizes.pop(key, None) + dropped_segments.add(key) + return False payloads[key].extend(decompressed) return True diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 78d219db430917..2fdfc1ca56b233 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -6,6 +6,7 @@ from collections.abc import Callable, Mapping from concurrent.futures import Future from functools import partial +from typing import Any import orjson import sentry_sdk @@ -35,6 +36,42 @@ type ProduceToPipe = Callable[[int, KafkaPayload, int], None] +type SpanPayload = dict[str, Any] + + +def _chunk_segment(span_payloads: list[SpanPayload]) -> list[list[SpanPayload]]: + """ + Split a segment into chunks of spans payloads that fit under max_segment_bytes. + If all spans of the segment fit in one chunk, returns a single-element list. + """ + max_segment_bytes = options.get("spans.buffer.max-segment-bytes") + + sizes = [len(orjson.dumps(s)) for s in span_payloads] + total_size = sum(sizes) + if total_size <= max_segment_bytes: + return [span_payloads] + + chunks: list[list[SpanPayload]] = [] + current_chunk: list[SpanPayload] = [] + current_size = 0 + + for payload, payload_size in zip(span_payloads, sizes): + if current_chunk and current_size + payload_size > max_segment_bytes: + chunks.append(current_chunk) + current_chunk = [] + current_size = 0 + current_chunk.append(payload) + current_size += payload_size + + if current_chunk: + chunks.append(current_chunk) + + if len(chunks) > 1: + metrics.incr("spans.buffer.flusher.oversized_segments_chunked") + + return chunks + + class MultiProducer: """ Manages multiple Kafka producers for load balancing across brokers/topics. @@ -268,6 +305,8 @@ def main( healthy_since, produce_to_pipe: ProduceToPipe | None, ) -> None: + flush_oversized_segments = options.get("spans.buffer.flush-oversized-segments") + logger.info("Flusher process main started for shards %s", shards) shard_tag = ",".join(map(str, shards)) @@ -334,13 +373,20 @@ def produce(project_id: int, payload: KafkaPayload, dropped: int) -> None: continue spans = [span.payload for span in flushed_segment.spans] - kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), []) - metrics.timing( - "spans.buffer.segment_size_bytes", - len(kafka_payload.value), - tags={"shard": shard_tag}, - ) - produce(flushed_segment.project_id, kafka_payload, len(spans)) + + if flush_oversized_segments: + chunks = _chunk_segment(spans) + else: + chunks = [spans] + + for chunk in chunks: + kafka_payload = KafkaPayload(None, orjson.dumps({"spans": chunk}), []) + metrics.timing( + "spans.buffer.segment_size_bytes", + len(kafka_payload.value), + tags={"shard": shard_tag}, + ) + produce(flushed_segment.project_id, kafka_payload, len(chunk)) with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}): for project_id, future, dropped in producer_futures: diff --git a/tests/sentry/spans/consumers/process/test_flusher.py b/tests/sentry/spans/consumers/process/test_flusher.py index 3ee2f8428bebe0..587c2f700890d7 100644 --- a/tests/sentry/spans/consumers/process/test_flusher.py +++ b/tests/sentry/spans/consumers/process/test_flusher.py @@ -10,7 +10,7 @@ from sentry.conf.types.kafka_definition import Topic from sentry.spans.buffer import Span, SpansBuffer -from sentry.spans.consumers.process.flusher import MultiProducer, SpanFlusher +from sentry.spans.consumers.process.flusher import MultiProducer, SpanFlusher, _chunk_segment from sentry.testutils.helpers.options import override_options from tests.sentry.spans.test_buffer import DEFAULT_OPTIONS @@ -216,3 +216,62 @@ def hang_main( next_step=Noop(), produce_to_pipe=lambda project_id, payload, dropped: None, ) + + +def test_chunk_segment_under_limit() -> None: + spans = [{"span_id": "a"}, {"span_id": "b"}] + with override_options({"spans.buffer.max-segment-bytes": 10000}): + chunks = _chunk_segment(spans) + assert chunks == [spans] + + +def test_chunk_segment_splits_oversized() -> None: + # Mirror the segment payload shape used in buffer tests. + spans = [ + { + "span_id": "a" * 16, + "is_segment": True, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + { + "span_id": "b" * 16, + "is_segment": False, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + { + "span_id": "c" * 16, + "is_segment": False, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + { + "span_id": "d" * 16, + "is_segment": False, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + { + "span_id": "e" * 16, + "is_segment": False, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + ] + + with override_options({"spans.buffer.max-segment-bytes": 500}): + chunks = _chunk_segment(spans) + + assert len(chunks) == 2 + assert [len(chunk) for chunk in chunks] == [3, 2] + + all_spans = [span for chunk in chunks for span in chunk] + assert all_spans == spans + + for chunk in chunks[:-1]: + chunk_size = sum(len(orjson.dumps(s)) for s in chunk) + assert chunk_size <= 500 + + +def test_chunk_segment_single_large_span() -> None: + """A single span larger than max_bytes still gets its own chunk.""" + spans = [{"span_id": "a" * 16}] + with override_options({"spans.buffer.max-segment-bytes": 10}): + chunks = _chunk_segment(spans) + assert chunks == [spans] diff --git a/tests/sentry/spans/test_buffer.py b/tests/sentry/spans/test_buffer.py index 84b06f7ac22ab8..f3bc7c2ce84deb 100644 --- a/tests/sentry/spans/test_buffer.py +++ b/tests/sentry/spans/test_buffer.py @@ -39,6 +39,7 @@ "spans.buffer.debug-traces": [], "spans.buffer.evalsha-cumulative-logger-enabled": True, "spans.process-segments.schema-validation": 1.0, + "spans.buffer.flush-oversized-segments": False, } @@ -967,6 +968,88 @@ def test_dropped_spans_emit_outcomes( assert ingested_bytes_timing_calls[0].args[1] == expected_bytes +@mock.patch("sentry.spans.buffer.Project") +def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> None: + """When flush-oversized-segments is enabled, oversized segments are kept instead of dropped.""" + mock_project = mock.Mock() + mock_project.id = 1 + mock_project.organization_id = 100 + mock_project_model.objects.get_from_cache.return_value = mock_project + + batch1 = [ + Span( + payload=_payload("c" * 16), + trace_id="a" * 32, + span_id="c" * 16, + parent_span_id="b" * 16, + segment_id=None, + project_id=1, + ), + Span( + payload=_payload("b" * 16), + trace_id="a" * 32, + span_id="b" * 16, + parent_span_id="a" * 16, + segment_id=None, + project_id=1, + ), + ] + batch2 = [ + Span( + payload=_payload("d" * 16), + trace_id="a" * 32, + span_id="d" * 16, + parent_span_id="a" * 16, + segment_id=None, + project_id=1, + ), + Span( + payload=_payload("e" * 16), + trace_id="a" * 32, + span_id="e" * 16, + parent_span_id="a" * 16, + segment_id=None, + project_id=1, + ), + Span( + payload=_payload("a" * 16), + trace_id="a" * 32, + span_id="a" * 16, + parent_span_id=None, + project_id=1, + segment_id=None, + is_segment_span=True, + ), + ] + + with override_options( + {"spans.buffer.max-segment-bytes": 100, "spans.buffer.flush-oversized-segments": True} + ): + buffer.process_spans(batch1, now=0) + buffer.process_spans(batch2, now=0) + rv = buffer.flush_segments(now=11) + + segment = rv[_segment_id(1, "a" * 32, "a" * 16)] + assert len(segment.spans) == 5 + _normalize_output(rv) + assert rv == { + _segment_id(1, "a" * 32, "a" * 16): FlushedSegment( + queue_key=mock.ANY, + score=mock.ANY, + ingested_count=mock.ANY, + payload_keys=mock.ANY, + project_id=1, + spans=[ + _output_segment(b"a" * 16, b"a" * 16, True), + _output_segment(b"b" * 16, b"a" * 16, False), + _output_segment(b"c" * 16, b"a" * 16, False), + _output_segment(b"d" * 16, b"a" * 16, False), + _output_segment(b"e" * 16, b"a" * 16, False), + ], + ) + } + + def test_kafka_slice_id(buffer: SpansBuffer) -> None: with override_options(DEFAULT_OPTIONS): buffer = SpansBuffer(assigned_shards=list(range(1)), slice_id=2) From 6d7fbc0c8cf5dec26c61bf435914ae78135548cd Mon Sep 17 00:00:00 2001 From: Tony Le Date: Mon, 30 Mar 2026 16:40:30 -0400 Subject: [PATCH 2/4] bugbot fix --- src/sentry/spans/buffer.py | 13 ++++++------- src/sentry/spans/consumers/process/flusher.py | 3 +-- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index 952d99fd595658..57aa1da0a72b60 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -725,14 +725,13 @@ def _add_spans(key: SegmentKey, raw_data: bytes) -> bool: decompress_latency_ms += (time.monotonic() - decompress_start) * 1000 sizes[key] = sizes.get(key, 0) + sum(len(span) for span in decompressed) - if sizes[key] > max_segment_bytes: + if sizes[key] > max_segment_bytes and not flush_oversized_segments: metrics.incr("spans.buffer.flush_segments.segment_size_exceeded") - if not flush_oversized_segments: - logger.warning("Skipping too large segment, byte size %s", sizes[key]) - payloads.pop(key, None) - sizes.pop(key, None) - dropped_segments.add(key) - return False + logger.warning("Skipping too large segment, byte size %s", sizes[key]) + payloads.pop(key, None) + sizes.pop(key, None) + dropped_segments.add(key) + return False payloads[key].extend(decompressed) return True diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 2fdfc1ca56b233..479c7c5d721aa7 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -305,8 +305,6 @@ def main( healthy_since, produce_to_pipe: ProduceToPipe | None, ) -> None: - flush_oversized_segments = options.get("spans.buffer.flush-oversized-segments") - logger.info("Flusher process main started for shards %s", shards) shard_tag = ",".join(map(str, shards)) @@ -342,6 +340,7 @@ def produce(project_id: int, payload: KafkaPayload, dropped: int) -> None: first_iteration = True while not stopped.value: + flush_oversized_segments = options.get("spans.buffer.flush-oversized-segments") system_now = int(time.time()) now = system_now + current_drift.value flushed_segments = buffer.flush_segments(now=now) From 9e288ef0eeaf98ae100821e43812741b7e382f2c Mon Sep 17 00:00:00 2001 From: Tony Le Date: Wed, 1 Apr 2026 14:08:17 -0400 Subject: [PATCH 3/4] move chunking logic to buffer file --- src/sentry/options/defaults.py | 2 +- src/sentry/spans/buffer.py | 48 ++++++- src/sentry/spans/consumers/process/flusher.py | 55 +------- .../spans/consumers/process/test_flusher.py | 61 +-------- tests/sentry/spans/test_buffer.py | 120 +++++++++++++++++- 5 files changed, 171 insertions(+), 115 deletions(-) diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index e62b44f4a7a7a1..768c0738a0b643 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3230,7 +3230,7 @@ ) # When enabled, oversized segments are split into chunks instead of being dropped. register( - "spans.buffer.flush-oversized-segments", + "spans.buffer.chunk-oversized-segments", default=False, flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE, ) diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index 57aa1da0a72b60..59cec919869165 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -143,8 +143,11 @@ def effective_parent_id(self): return self.segment_id or self.parent_span_id or self.span_id +type SpanPayload = dict[str, Any] + + class OutputSpan(NamedTuple): - payload: dict[str, Any] + payload: SpanPayload class FlushedSegment(NamedTuple): @@ -157,6 +160,45 @@ class FlushedSegment(NamedTuple): ingested_count: int = 0 # Ingested count at flush time, used for conditional data cleanup payload_keys: list[PayloadKey] = [] # For cleanup + def to_messages(self) -> list[dict[str, Any]]: + """ + Build producer messages for this segment. + + If chunk-oversized-segments is enabled and the segment exceeds + max_segment_bytes, the segment is split into multiple messages with + skip_enrichment=True. Otherwise, returns a single message. + """ + chunk_oversized_segments = options.get("spans.buffer.chunk-oversized-segments") + max_segment_bytes = options.get("spans.buffer.max-segment-bytes") + + spans: list[SpanPayload] = [span.payload for span in self.spans] + if not chunk_oversized_segments: + return [{"spans": spans}] + + sizes = [len(orjson.dumps(s)) for s in spans] + if sum(sizes) <= max_segment_bytes: + return [{"spans": spans}] + + messages: list[dict[str, Any]] = [] + current: list[SpanPayload] = [] + current_size = 0 + + for span, size in zip(spans, sizes): + if current and current_size + size > max_segment_bytes: + messages.append({"spans": current, "skip_enrichment": True}) + current = [] + current_size = 0 + current.append(span) + current_size += size + + if current: + messages.append({"spans": current, "skip_enrichment": True}) + + if len(messages) > 1: + metrics.incr("spans.buffer.oversized_segments_chunked_messages", len(messages)) + + return messages + class SpansBuffer: def __init__(self, assigned_shards: list[int], slice_id: int | None = None): @@ -710,7 +752,7 @@ def _load_segment_data( cursors[payload_key] = 0 payload_keys_map[key] = segment_payload_keys - flush_oversized_segments = options.get("spans.buffer.flush-oversized-segments") + chunk_oversized_segments = options.get("spans.buffer.chunk-oversized-segments") dropped_segments: set[SegmentKey] = set() def _add_spans(key: SegmentKey, raw_data: bytes) -> bool: @@ -725,7 +767,7 @@ def _add_spans(key: SegmentKey, raw_data: bytes) -> bool: decompress_latency_ms += (time.monotonic() - decompress_start) * 1000 sizes[key] = sizes.get(key, 0) + sum(len(span) for span in decompressed) - if sizes[key] > max_segment_bytes and not flush_oversized_segments: + if sizes[key] > max_segment_bytes and not chunk_oversized_segments: metrics.incr("spans.buffer.flush_segments.segment_size_exceeded") logger.warning("Skipping too large segment, byte size %s", sizes[key]) payloads.pop(key, None) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 479c7c5d721aa7..bacafd04e183fc 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -6,7 +6,6 @@ from collections.abc import Callable, Mapping from concurrent.futures import Future from functools import partial -from typing import Any import orjson import sentry_sdk @@ -36,42 +35,6 @@ type ProduceToPipe = Callable[[int, KafkaPayload, int], None] -type SpanPayload = dict[str, Any] - - -def _chunk_segment(span_payloads: list[SpanPayload]) -> list[list[SpanPayload]]: - """ - Split a segment into chunks of spans payloads that fit under max_segment_bytes. - If all spans of the segment fit in one chunk, returns a single-element list. - """ - max_segment_bytes = options.get("spans.buffer.max-segment-bytes") - - sizes = [len(orjson.dumps(s)) for s in span_payloads] - total_size = sum(sizes) - if total_size <= max_segment_bytes: - return [span_payloads] - - chunks: list[list[SpanPayload]] = [] - current_chunk: list[SpanPayload] = [] - current_size = 0 - - for payload, payload_size in zip(span_payloads, sizes): - if current_chunk and current_size + payload_size > max_segment_bytes: - chunks.append(current_chunk) - current_chunk = [] - current_size = 0 - current_chunk.append(payload) - current_size += payload_size - - if current_chunk: - chunks.append(current_chunk) - - if len(chunks) > 1: - metrics.incr("spans.buffer.flusher.oversized_segments_chunked") - - return chunks - - class MultiProducer: """ Manages multiple Kafka producers for load balancing across brokers/topics. @@ -340,7 +303,6 @@ def produce(project_id: int, payload: KafkaPayload, dropped: int) -> None: first_iteration = True while not stopped.value: - flush_oversized_segments = options.get("spans.buffer.flush-oversized-segments") system_now = int(time.time()) now = system_now + current_drift.value flushed_segments = buffer.flush_segments(now=now) @@ -371,21 +333,18 @@ def produce(project_id: int, payload: KafkaPayload, dropped: int) -> None: if not flushed_segment.spans: continue - spans = [span.payload for span in flushed_segment.spans] - - if flush_oversized_segments: - chunks = _chunk_segment(spans) - else: - chunks = [spans] - - for chunk in chunks: - kafka_payload = KafkaPayload(None, orjson.dumps({"spans": chunk}), []) + for message in flushed_segment.to_messages(): + kafka_payload = KafkaPayload(None, orjson.dumps(message), []) metrics.timing( "spans.buffer.segment_size_bytes", len(kafka_payload.value), tags={"shard": shard_tag}, ) - produce(flushed_segment.project_id, kafka_payload, len(chunk)) + produce( + flushed_segment.project_id, + kafka_payload, + len(message["spans"]), + ) with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}): for project_id, future, dropped in producer_futures: diff --git a/tests/sentry/spans/consumers/process/test_flusher.py b/tests/sentry/spans/consumers/process/test_flusher.py index 587c2f700890d7..3ee2f8428bebe0 100644 --- a/tests/sentry/spans/consumers/process/test_flusher.py +++ b/tests/sentry/spans/consumers/process/test_flusher.py @@ -10,7 +10,7 @@ from sentry.conf.types.kafka_definition import Topic from sentry.spans.buffer import Span, SpansBuffer -from sentry.spans.consumers.process.flusher import MultiProducer, SpanFlusher, _chunk_segment +from sentry.spans.consumers.process.flusher import MultiProducer, SpanFlusher from sentry.testutils.helpers.options import override_options from tests.sentry.spans.test_buffer import DEFAULT_OPTIONS @@ -216,62 +216,3 @@ def hang_main( next_step=Noop(), produce_to_pipe=lambda project_id, payload, dropped: None, ) - - -def test_chunk_segment_under_limit() -> None: - spans = [{"span_id": "a"}, {"span_id": "b"}] - with override_options({"spans.buffer.max-segment-bytes": 10000}): - chunks = _chunk_segment(spans) - assert chunks == [spans] - - -def test_chunk_segment_splits_oversized() -> None: - # Mirror the segment payload shape used in buffer tests. - spans = [ - { - "span_id": "a" * 16, - "is_segment": True, - "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, - }, - { - "span_id": "b" * 16, - "is_segment": False, - "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, - }, - { - "span_id": "c" * 16, - "is_segment": False, - "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, - }, - { - "span_id": "d" * 16, - "is_segment": False, - "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, - }, - { - "span_id": "e" * 16, - "is_segment": False, - "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, - }, - ] - - with override_options({"spans.buffer.max-segment-bytes": 500}): - chunks = _chunk_segment(spans) - - assert len(chunks) == 2 - assert [len(chunk) for chunk in chunks] == [3, 2] - - all_spans = [span for chunk in chunks for span in chunk] - assert all_spans == spans - - for chunk in chunks[:-1]: - chunk_size = sum(len(orjson.dumps(s)) for s in chunk) - assert chunk_size <= 500 - - -def test_chunk_segment_single_large_span() -> None: - """A single span larger than max_bytes still gets its own chunk.""" - spans = [{"span_id": "a" * 16}] - with override_options({"spans.buffer.max-segment-bytes": 10}): - chunks = _chunk_segment(spans) - assert chunks == [spans] diff --git a/tests/sentry/spans/test_buffer.py b/tests/sentry/spans/test_buffer.py index f3bc7c2ce84deb..7e5f62209dc8b1 100644 --- a/tests/sentry/spans/test_buffer.py +++ b/tests/sentry/spans/test_buffer.py @@ -39,7 +39,7 @@ "spans.buffer.debug-traces": [], "spans.buffer.evalsha-cumulative-logger-enabled": True, "spans.process-segments.schema-validation": 1.0, - "spans.buffer.flush-oversized-segments": False, + "spans.buffer.chunk-oversized-segments": False, } @@ -970,7 +970,7 @@ def test_dropped_spans_emit_outcomes( @mock.patch("sentry.spans.buffer.Project") def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> None: - """When flush-oversized-segments is enabled, oversized segments are kept instead of dropped.""" + """When chunk-oversized-segments is enabled, oversized segments are kept instead of dropped.""" mock_project = mock.Mock() mock_project.id = 1 mock_project.organization_id = 100 @@ -1023,7 +1023,7 @@ def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> No ] with override_options( - {"spans.buffer.max-segment-bytes": 100, "spans.buffer.flush-oversized-segments": True} + {"spans.buffer.max-segment-bytes": 100, "spans.buffer.chunk-oversized-segments": True} ): buffer.process_spans(batch1, now=0) buffer.process_spans(batch2, now=0) @@ -1050,6 +1050,120 @@ def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> No } +def test_to_messages_under_limit(buffer: SpansBuffer) -> None: + spans = [{"span_id": "a"}, {"span_id": "b"}] + segment = FlushedSegment( + queue_key=b"test", + spans=[OutputSpan(payload=s) for s in spans], + project_id=1, + ) + with override_options( + { + **DEFAULT_OPTIONS, + "spans.buffer.chunk-oversized-segments": True, + "spans.buffer.max-segment-bytes": 10000, + } + ): + messages = segment.to_messages() + assert len(messages) == 1 + assert messages[0] == {"spans": spans} + assert "skip_enrichment" not in messages[0] + + +def test_to_messages_splits_oversized(buffer: SpansBuffer) -> None: + spans = [ + { + "span_id": "a" * 16, + "is_segment": True, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + { + "span_id": "b" * 16, + "is_segment": False, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + { + "span_id": "c" * 16, + "is_segment": False, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + { + "span_id": "d" * 16, + "is_segment": False, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + { + "span_id": "e" * 16, + "is_segment": False, + "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}}, + }, + ] + segment = FlushedSegment( + queue_key=b"test", + spans=[OutputSpan(payload=s) for s in spans], + project_id=1, + ) + with override_options( + { + **DEFAULT_OPTIONS, + "spans.buffer.chunk-oversized-segments": True, + "spans.buffer.max-segment-bytes": 500, + } + ): + messages = segment.to_messages() + + assert len(messages) == 2 + assert [len(m["spans"]) for m in messages] == [3, 2] + + all_spans = [span for m in messages for span in m["spans"]] + assert all_spans == spans + + for message in messages: + assert message["skip_enrichment"] is True + + for message in messages[:-1]: + chunk_size = sum(len(orjson.dumps(s)) for s in message["spans"]) + assert chunk_size <= 500 + + +def test_to_messages_single_large_span(buffer: SpansBuffer) -> None: + """A single span larger than max_bytes still gets its own message.""" + segment = FlushedSegment( + queue_key=b"test", + spans=[OutputSpan(payload={"span_id": "a" * 16})], + project_id=1, + ) + with override_options( + { + **DEFAULT_OPTIONS, + "spans.buffer.chunk-oversized-segments": True, + "spans.buffer.max-segment-bytes": 10, + } + ): + messages = segment.to_messages() + assert len(messages) == 1 + assert messages[0]["skip_enrichment"] is True + + +def test_to_messages_no_chunking_when_option_disabled(buffer: SpansBuffer) -> None: + """When chunk-oversized-segments is disabled, always returns a single message.""" + segment = FlushedSegment( + queue_key=b"test", + spans=[OutputSpan(payload={"span_id": "a" * 16})], + project_id=1, + ) + with override_options( + { + **DEFAULT_OPTIONS, + "spans.buffer.chunk-oversized-segments": False, + "spans.buffer.max-segment-bytes": 10, + } + ): + messages = segment.to_messages() + assert len(messages) == 1 + assert "skip_enrichment" not in messages[0] + + def test_kafka_slice_id(buffer: SpansBuffer) -> None: with override_options(DEFAULT_OPTIONS): buffer = SpansBuffer(assigned_shards=list(range(1)), slice_id=2) From 03306e310676c685b821289849dea26bbdea2a59 Mon Sep 17 00:00:00 2001 From: Tony Le Date: Tue, 7 Apr 2026 13:35:24 -0400 Subject: [PATCH 4/4] update README --- src/sentry/spans/README.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/sentry/spans/README.md b/src/sentry/spans/README.md index e8d0b271a785d9..6955d8a8a502ce 100644 --- a/src/sentry/spans/README.md +++ b/src/sentry/spans/README.md @@ -84,7 +84,13 @@ event types are limited in terms of frequency. the tree. - As we extract the subsegments and reassemble them, if the segment is too big - we drop it entirely and record an `invalid` outcome. + we either drop it or chunk it depending on the + `spans.buffer.chunk-oversized-segments` option: + - **Default (disabled)**: The segment is dropped entirely and an `invalid` + outcome is recorded. + - **Enabled**: The segment is kept and split into multiple Kafka messages, + each within `max-segment-bytes`, and every chunk is sent with the flag + `skip_enrichment=True`. ### Flushing segments