diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py
index 12942114b0c17e..b86e20a5959091 100644
--- a/src/sentry/options/defaults.py
+++ b/src/sentry/options/defaults.py
@@ -3172,6 +3172,12 @@
     default=10 * 1024 * 1024,
     flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
 )
+# When enabled, oversized segments are split into chunks instead of being dropped.
+register(
+    "spans.buffer.chunk-oversized-segments",
+    default=False,
+    flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
+)
 # TTL for keys in Redis. This is a downside protection in case of bugs.
 register(
     "spans.buffer.redis-ttl",
diff --git a/src/sentry/spans/README.md b/src/sentry/spans/README.md
index e8d0b271a785d9..6955d8a8a502ce 100644
--- a/src/sentry/spans/README.md
+++ b/src/sentry/spans/README.md
@@ -84,7 +84,13 @@ event types are limited in terms of frequency.
   the tree.
 
 - As we extract the subsegments and reassemble them, if the segment is too big
-  we drop it entirely and record an `invalid` outcome.
+  we either drop it or split it into chunks, depending on the
+  `spans.buffer.chunk-oversized-segments` option:
+  - **Default (disabled)**: The segment is dropped entirely and an `invalid`
+    outcome is recorded.
+  - **Enabled**: The segment is kept and split into multiple Kafka messages,
+    each within `max-segment-bytes` where possible (a single span above the
+    limit still becomes one oversized message), all flagged `skip_enrichment=True`.
 
 ### Flushing segments
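For reference, the two message shapes described in the README look like this. This is a sketch with placeholder span payloads; only the `spans` key and the `skip_enrichment` flag come from this change:

```python
# Sketch of the two producer message shapes (placeholder span payloads).
span_a = {"span_id": "a" * 16}
span_b = {"span_id": "b" * 16}
span_c = {"span_id": "c" * 16}

# Option disabled, or segment within max-segment-bytes: one message, no flag.
single_message = {"spans": [span_a, span_b, span_c]}

# Option enabled and segment oversized: several messages, each flagged so that
# downstream processing can tell it is looking at a partial segment.
chunked_messages = [
    {"spans": [span_a, span_b], "skip_enrichment": True},
    {"spans": [span_c], "skip_enrichment": True},
]
```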
diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py
index a00d7517a62b9e..59cec919869165 100644
--- a/src/sentry/spans/buffer.py
+++ b/src/sentry/spans/buffer.py
@@ -143,8 +143,11 @@ def effective_parent_id(self):
         return self.segment_id or self.parent_span_id or self.span_id
 
 
+type SpanPayload = dict[str, Any]
+
+
 class OutputSpan(NamedTuple):
-    payload: dict[str, Any]
+    payload: SpanPayload
 
 
 class FlushedSegment(NamedTuple):
@@ -157,6 +160,49 @@ class FlushedSegment(NamedTuple):
     ingested_count: int = 0  # Ingested count at flush time, used for conditional data cleanup
     payload_keys: list[PayloadKey] = []  # For cleanup
 
+    def to_messages(self) -> list[dict[str, Any]]:
+        """
+        Build producer messages for this segment.
+
+        If chunk-oversized-segments is enabled and the segment exceeds
+        max_segment_bytes, the segment is split into multiple messages, each
+        within max_segment_bytes where possible and all flagged
+        skip_enrichment=True. Otherwise, a single message is returned.
+        """
+        chunk_oversized_segments = options.get("spans.buffer.chunk-oversized-segments")
+        max_segment_bytes = options.get("spans.buffer.max-segment-bytes")
+
+        spans: list[SpanPayload] = [span.payload for span in self.spans]
+        if not chunk_oversized_segments:
+            return [{"spans": spans}]
+
+        sizes = [len(orjson.dumps(s)) for s in spans]
+        if sum(sizes) <= max_segment_bytes:
+            return [{"spans": spans}]
+
+        messages: list[dict[str, Any]] = []
+        current: list[SpanPayload] = []
+        current_size = 0
+
+        # Greedily pack spans into chunks, starting a new chunk once adding
+        # the next span would exceed the limit. A single span larger than the
+        # limit still gets its own (oversized) chunk.
+        for span, size in zip(spans, sizes):
+            if current and current_size + size > max_segment_bytes:
+                messages.append({"spans": current, "skip_enrichment": True})
+                current = []
+                current_size = 0
+            current.append(span)
+            current_size += size
+
+        if current:
+            messages.append({"spans": current, "skip_enrichment": True})
+
+        if len(messages) > 1:
+            metrics.incr("spans.buffer.oversized_segments_chunked_messages", len(messages))
+
+        return messages
+
 
 class SpansBuffer:
     def __init__(self, assigned_shards: list[int], slice_id: int | None = None):
@@ -710,6 +756,7 @@ def _load_segment_data(
             cursors[payload_key] = 0
             payload_keys_map[key] = segment_payload_keys
 
+        chunk_oversized_segments = options.get("spans.buffer.chunk-oversized-segments")
        dropped_segments: set[SegmentKey] = set()
 
         def _add_spans(key: SegmentKey, raw_data: bytes) -> bool:
@@ -724,7 +771,7 @@ def _add_spans(key: SegmentKey, raw_data: bytes) -> bool:
             decompress_latency_ms += (time.monotonic() - decompress_start) * 1000
 
             sizes[key] = sizes.get(key, 0) + sum(len(span) for span in decompressed)
-            if sizes[key] > max_segment_bytes:
+            if sizes[key] > max_segment_bytes and not chunk_oversized_segments:
                 metrics.incr("spans.buffer.flush_segments.segment_size_exceeded")
                 logger.warning("Skipping too large segment, byte size %s", sizes[key])
                 payloads.pop(key, None)
diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py
index 78d219db430917..bacafd04e183fc 100644
--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -333,14 +333,18 @@ def produce(project_id: int, payload: KafkaPayload, dropped: int) -> None:
             if not flushed_segment.spans:
                 continue
 
-            spans = [span.payload for span in flushed_segment.spans]
-            kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
-            metrics.timing(
-                "spans.buffer.segment_size_bytes",
-                len(kafka_payload.value),
-                tags={"shard": shard_tag},
-            )
-            produce(flushed_segment.project_id, kafka_payload, len(spans))
+            for message in flushed_segment.to_messages():
+                kafka_payload = KafkaPayload(None, orjson.dumps(message), [])
+                metrics.timing(
+                    "spans.buffer.segment_size_bytes",
+                    len(kafka_payload.value),
+                    tags={"shard": shard_tag},
+                )
+                produce(
+                    flushed_segment.project_id,
+                    kafka_payload,
+                    len(message["spans"]),
+                )
 
         with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
             for project_id, future, dropped in producer_futures:
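The greedy packing in `to_messages` is small enough to restate in isolation. The sketch below is a hypothetical standalone version; the `pack_spans` name and plain-function shape are ours, while the real method reads its limit from options and wraps each chunk in a producer message:

```python
import orjson


def pack_spans(spans: list[dict], max_bytes: int) -> list[list[dict]]:
    """Greedy chunking, as in FlushedSegment.to_messages (illustrative only).

    A new chunk starts whenever adding the next span would push the current
    chunk over max_bytes; a single span larger than max_bytes still lands in
    its own (oversized) chunk rather than being dropped.
    """
    chunks: list[list[dict]] = []
    current: list[dict] = []
    current_size = 0
    for span in spans:
        size = len(orjson.dumps(span))
        if current and current_size + size > max_bytes:
            chunks.append(current)
            current = []
            current_size = 0
        current.append(span)
        current_size += size
    if current:
        chunks.append(current)
    return chunks


# Three 30-byte spans with a 70-byte limit pack into chunks of 2 and 1.
spans = [{"span_id": c * 16} for c in "abc"]
assert [len(chunk) for chunk in pack_spans(spans, max_bytes=70)] == [2, 1]
```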
diff --git a/tests/sentry/spans/test_buffer.py b/tests/sentry/spans/test_buffer.py
index 84b06f7ac22ab8..7e5f62209dc8b1 100644
--- a/tests/sentry/spans/test_buffer.py
+++ b/tests/sentry/spans/test_buffer.py
@@ -39,6 +39,7 @@
     "spans.buffer.debug-traces": [],
     "spans.buffer.evalsha-cumulative-logger-enabled": True,
     "spans.process-segments.schema-validation": 1.0,
+    "spans.buffer.chunk-oversized-segments": False,
 }
 
 
@@ -967,6 +968,202 @@ def test_dropped_spans_emit_outcomes(
     assert ingested_bytes_timing_calls[0].args[1] == expected_bytes
 
 
+@mock.patch("sentry.spans.buffer.Project")
+def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> None:
+    """When chunk-oversized-segments is enabled, oversized segments are kept instead of dropped."""
+    mock_project = mock.Mock()
+    mock_project.id = 1
+    mock_project.organization_id = 100
+    mock_project_model.objects.get_from_cache.return_value = mock_project
+
+    batch1 = [
+        Span(
+            payload=_payload("c" * 16),
+            trace_id="a" * 32,
+            span_id="c" * 16,
+            parent_span_id="b" * 16,
+            segment_id=None,
+            project_id=1,
+        ),
+        Span(
+            payload=_payload("b" * 16),
+            trace_id="a" * 32,
+            span_id="b" * 16,
+            parent_span_id="a" * 16,
+            segment_id=None,
+            project_id=1,
+        ),
+    ]
+    batch2 = [
+        Span(
+            payload=_payload("d" * 16),
+            trace_id="a" * 32,
+            span_id="d" * 16,
+            parent_span_id="a" * 16,
+            segment_id=None,
+            project_id=1,
+        ),
+        Span(
+            payload=_payload("e" * 16),
+            trace_id="a" * 32,
+            span_id="e" * 16,
+            parent_span_id="a" * 16,
+            segment_id=None,
+            project_id=1,
+        ),
+        Span(
+            payload=_payload("a" * 16),
+            trace_id="a" * 32,
+            span_id="a" * 16,
+            parent_span_id=None,
+            project_id=1,
+            segment_id=None,
+            is_segment_span=True,
+        ),
+    ]
+
+    with override_options(
+        {"spans.buffer.max-segment-bytes": 100, "spans.buffer.chunk-oversized-segments": True}
+    ):
+        buffer.process_spans(batch1, now=0)
+        buffer.process_spans(batch2, now=0)
+        rv = buffer.flush_segments(now=11)
+
+    segment = rv[_segment_id(1, "a" * 32, "a" * 16)]
+    assert len(segment.spans) == 5
+    _normalize_output(rv)
+    assert rv == {
+        _segment_id(1, "a" * 32, "a" * 16): FlushedSegment(
+            queue_key=mock.ANY,
+            score=mock.ANY,
+            ingested_count=mock.ANY,
+            payload_keys=mock.ANY,
+            project_id=1,
+            spans=[
+                _output_segment(b"a" * 16, b"a" * 16, True),
+                _output_segment(b"b" * 16, b"a" * 16, False),
+                _output_segment(b"c" * 16, b"a" * 16, False),
+                _output_segment(b"d" * 16, b"a" * 16, False),
+                _output_segment(b"e" * 16, b"a" * 16, False),
+            ],
+        )
+    }
+
+
+def test_to_messages_under_limit(buffer: SpansBuffer) -> None:
+    spans = [{"span_id": "a"}, {"span_id": "b"}]
+    segment = FlushedSegment(
+        queue_key=b"test",
+        spans=[OutputSpan(payload=s) for s in spans],
+        project_id=1,
+    )
+    with override_options(
+        {
+            **DEFAULT_OPTIONS,
+            "spans.buffer.chunk-oversized-segments": True,
+            "spans.buffer.max-segment-bytes": 10000,
+        }
+    ):
+        messages = segment.to_messages()
+    assert len(messages) == 1
+    assert messages[0] == {"spans": spans}
+    assert "skip_enrichment" not in messages[0]
+
+
+def test_to_messages_splits_oversized(buffer: SpansBuffer) -> None:
+    spans = [
+        {
+            "span_id": "a" * 16,
+            "is_segment": True,
+            "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}},
+        },
+        {
+            "span_id": "b" * 16,
+            "is_segment": False,
+            "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}},
+        },
+        {
+            "span_id": "c" * 16,
+            "is_segment": False,
+            "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}},
+        },
+        {
+            "span_id": "d" * 16,
+            "is_segment": False,
+            "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}},
+        },
+        {
+            "span_id": "e" * 16,
+            "is_segment": False,
+            "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}},
+        },
+    ]
+    segment = FlushedSegment(
+        queue_key=b"test",
+        spans=[OutputSpan(payload=s) for s in spans],
+        project_id=1,
+    )
+    with override_options(
+        {
+            **DEFAULT_OPTIONS,
+            "spans.buffer.chunk-oversized-segments": True,
+            "spans.buffer.max-segment-bytes": 500,
+        }
+    ):
+        messages = segment.to_messages()
+
+    assert len(messages) == 2
+    assert [len(m["spans"]) for m in messages] == [3, 2]
+
+    all_spans = [span for m in messages for span in m["spans"]]
+    assert all_spans == spans
+
+    for message in messages:
+        assert message["skip_enrichment"] is True
+
+    for message in messages[:-1]:
+        chunk_size = sum(len(orjson.dumps(s)) for s in message["spans"])
+        assert chunk_size <= 500
+
+
+def test_to_messages_single_large_span(buffer: SpansBuffer) -> None:
+    """A single span larger than max-segment-bytes still gets its own message."""
+    segment = FlushedSegment(
+        queue_key=b"test",
+        spans=[OutputSpan(payload={"span_id": "a" * 16})],
+        project_id=1,
+    )
+    with override_options(
+        {
+            **DEFAULT_OPTIONS,
+            "spans.buffer.chunk-oversized-segments": True,
+            "spans.buffer.max-segment-bytes": 10,
+        }
+    ):
+        messages = segment.to_messages()
+    assert len(messages) == 1
+    assert messages[0]["skip_enrichment"] is True
+
+
+def test_to_messages_no_chunking_when_option_disabled(buffer: SpansBuffer) -> None:
+    """When chunk-oversized-segments is disabled, always returns a single message."""
+    segment = FlushedSegment(
+        queue_key=b"test",
+        spans=[OutputSpan(payload={"span_id": "a" * 16})],
+        project_id=1,
+    )
+    with override_options(
+        {
+            **DEFAULT_OPTIONS,
+            "spans.buffer.chunk-oversized-segments": False,
+            "spans.buffer.max-segment-bytes": 10,
+        }
+    ):
+        messages = segment.to_messages()
+    assert len(messages) == 1
+    assert "skip_enrichment" not in messages[0]
+
+
 def test_kafka_slice_id(buffer: SpansBuffer) -> None:
     with override_options(DEFAULT_OPTIONS):
         buffer = SpansBuffer(assigned_shards=list(range(1)), slice_id=2)
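To try the behavior locally, the option can be flipped at runtime. This is a sketch assuming a development shell; in production the value would normally be managed through the options automator, per `FLAG_AUTOMATOR_MODIFIABLE`:

```python
from sentry import options

# Enable chunking of oversized segments (the registered default is False,
# i.e. oversized segments keep being dropped until this is turned on).
options.set("spans.buffer.chunk-oversized-segments", True)
assert options.get("spans.buffer.chunk-oversized-segments") is True
```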