Skip to content

Commit 78bd357

Browse files
authored
ref(spans): remove chunk-oversized-segments option (#112606)
Remove the chunking option from the business logic so that chunking up a segment before flushing becomes the default behaviour.
1 parent eb2c7fe commit 78bd357

File tree

3 files changed

+19
-193
lines changed

3 files changed

+19
-193
lines changed

src/sentry/spans/README.md

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,9 @@ event types are limited in terms of frequency.
8383
breaks the structure of the trace as the missing spans may be anywhere in
8484
the tree.
8585

86-
- As we extract the subsegments and reassemble them, if the segment is too big
87-
we either drop it or chunk it depending on the
88-
`spans.buffer.chunk-oversized-segments` option:
89-
- **Default (disabled)**: The segment is dropped entirely and an `invalid`
90-
outcome is recorded.
91-
- **Enabled**: The segment is kept and split into multiple Kafka messages,
92-
each within `max-segment-bytes`, and every chunk is sent with the flag
93-
`skip_enrichment=True`.
86+
- As we extract the subsegments and reassemble them, if the segment size exceeds
87+
the `max-segment-bytes` limit, we chunk it into multiple Kafka messages, each within
88+
the above size limit, and every chunk is sent with the flag `skip_enrichment=True`.
9489

9590
### Flushing segments
9691

src/sentry/spans/buffer.py

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -197,16 +197,12 @@ def to_messages(self) -> list[dict[str, Any]]:
197197
"""
198198
Build producer messages for this segment.
199199
200-
If chunk-oversized-segments is enabled and the segment exceeds
201-
max_segment_bytes, the segment is split into multiple messages with
202-
skip_enrichment=True. Otherwise, returns a single message.
200+
If the segment size exceeds `spans.buffer.max_segment_bytes`, the segment is split
201+
into multiple messages with skip_enrichment=True. Otherwise, returns a single message.
203202
"""
204-
chunk_oversized_segments = options.get("spans.buffer.chunk-oversized-segments")
205203
max_segment_bytes = options.get("spans.buffer.max-segment-bytes")
206204

207205
spans: list[SpanPayload] = [span.payload for span in self.spans]
208-
if not chunk_oversized_segments:
209-
return [{"spans": spans}]
210206

211207
sizes = [len(orjson.dumps(s)) for s in spans]
212208
if sum(sizes) <= max_segment_bytes:
@@ -228,7 +224,11 @@ def to_messages(self) -> list[dict[str, Any]]:
228224
messages.append({"spans": current, "skip_enrichment": True})
229225

230226
if len(messages) > 1:
231-
metrics.incr("spans.buffer.oversized_segments_chunked_messages", len(messages))
227+
metrics.timing(
228+
"spans.buffer.oversized_segments_chunked",
229+
len(messages),
230+
)
231+
metrics.timing("spans.buffer.oversized_segments_size", sum(sizes))
232232

233233
return messages
234234

@@ -764,11 +764,9 @@ def _load_segment_data(
764764
"""
765765

766766
page_size = options.get("spans.buffer.segment-page-size")
767-
max_segment_bytes = options.get("spans.buffer.max-segment-bytes")
768767

769768
payloads: dict[SegmentKey, list[bytes]] = {key: [] for key in segment_keys}
770769
payload_keys_map: dict[SegmentKey, list[PayloadKey]] = {key: [] for key in segment_keys}
771-
sizes: dict[SegmentKey, int] = {key: 0 for key in segment_keys}
772770
self._last_decompress_latency_ms = 0
773771
decompress_latency_ms = 0.0
774772

@@ -795,31 +793,18 @@ def _load_segment_data(
795793
cursors[payload_key] = 0
796794
payload_keys_map[key] = segment_payload_keys
797795

798-
chunk_oversized_segments = options.get("spans.buffer.chunk-oversized-segments")
799796
dropped_segments: set[SegmentKey] = set()
800797

801-
def _add_spans(key: SegmentKey, raw_data: bytes) -> bool:
798+
def _add_spans(key: SegmentKey, raw_data: bytes):
802799
"""
803-
Decompress and add spans to the segment. Returns False if the
804-
segment exceeded max_segment_bytes and was dropped.
800+
Decompress and add spans to the segment.
805801
"""
806802
nonlocal decompress_latency_ms
807803

808804
decompress_start = time.monotonic()
809805
decompressed = self._decompress_batch(raw_data)
810806
decompress_latency_ms += (time.monotonic() - decompress_start) * 1000
811-
812-
sizes[key] = sizes.get(key, 0) + sum(len(span) for span in decompressed)
813-
if sizes[key] > max_segment_bytes and not chunk_oversized_segments:
814-
metrics.incr("spans.buffer.flush_segments.segment_size_exceeded")
815-
logger.warning("Skipping too large segment, byte size %s", sizes[key])
816-
payloads.pop(key, None)
817-
sizes.pop(key, None)
818-
dropped_segments.add(key)
819-
return False
820-
821807
payloads[key].extend(decompressed)
822-
return True
823808

824809
while cursors:
825810
with self.client.pipeline(transaction=False) as p:
@@ -836,15 +821,11 @@ def _add_spans(key: SegmentKey, raw_data: bytes) -> bool:
836821
cursors.pop(key, None)
837822
continue
838823

839-
size_exceeded = False
840824
for scan_value in scan_values:
841825
if segment_key in payloads:
842-
if not _add_spans(segment_key, scan_value):
843-
size_exceeded = True
826+
_add_spans(segment_key, scan_value)
844827

845-
if size_exceeded:
846-
cursors.pop(key, None)
847-
elif cursor == 0:
828+
if cursor == 0:
848829
del cursors[key]
849830
else:
850831
cursors[key] = cursor

tests/sentry/spans/test_buffer.py

Lines changed: 5 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
"spans.buffer.evalsha-cumulative-logger-enabled": True,
4141
"spans.buffer.enforce-segment-size": False,
4242
"spans.process-segments.schema-validation": 1.0,
43-
"spans.buffer.chunk-oversized-segments": False,
4443
}
4544

4645

@@ -838,9 +837,10 @@ def test_max_segment_spans_limit(mock_project_model, buffer: SpansBuffer) -> Non
838837
buffer.process_spans(batch2, now=0)
839838
rv = buffer.flush_segments(now=11)
840839

841-
# The entire segment should be dropped because it exceeds max_segment_bytes.
840+
# The segment is kept even though it exceeds max_segment_bytes,
841+
# because oversized segments are chunked at the message level.
842842
segment = rv[_segment_id(1, "a" * 32, "a" * 16)]
843-
assert segment.spans == []
843+
assert len(segment.spans) == 5
844844

845845

846846
@mock.patch("sentry.spans.buffer.Project")
@@ -960,135 +960,9 @@ def test_max_segment_bytes_under_limit_merges_normally(
960960
assert span_ids == {"b" * 16, "c" * 16}
961961

962962

963-
@mock.patch("sentry.spans.buffer.Project")
964-
@mock.patch("sentry.spans.buffer.track_outcome")
965-
@mock.patch("sentry.spans.buffer.metrics.timing")
966-
def test_dropped_spans_emit_outcomes(
967-
mock_metrics, mock_track_outcome, mock_project_model, buffer: SpansBuffer
968-
) -> None:
969-
"""Test that outcomes are emitted when Redis drops spans due to size limit."""
970-
from sentry.constants import DataCategory
971-
from sentry.utils.outcomes import Outcome
972-
973-
# Mock the project lookup
974-
mock_project = mock.Mock()
975-
mock_project.id = 1
976-
mock_project.organization_id = 100
977-
mock_project_model.objects.get_from_cache.return_value = mock_project
978-
979-
payload_a = _payload("a" * 16)
980-
payload_b = _payload("b" * 16)
981-
payload_c = _payload("c" * 16)
982-
payload_d = _payload("d" * 16)
983-
payload_e = _payload("e" * 16)
984-
payload_f = _payload("f" * 16)
985-
986-
# Create a segment with many spans that will exceed the Redis memory limit
987-
batch1 = [
988-
Span(
989-
payload=payload_b,
990-
trace_id="a" * 32,
991-
span_id="b" * 16,
992-
parent_span_id="a" * 16,
993-
segment_id=None,
994-
project_id=1,
995-
),
996-
Span(
997-
payload=payload_c,
998-
trace_id="a" * 32,
999-
span_id="c" * 16,
1000-
parent_span_id="a" * 16,
1001-
segment_id=None,
1002-
project_id=1,
1003-
),
1004-
Span(
1005-
payload=payload_d,
1006-
trace_id="a" * 32,
1007-
span_id="d" * 16,
1008-
parent_span_id="a" * 16,
1009-
segment_id=None,
1010-
project_id=1,
1011-
),
1012-
]
1013-
batch2 = [
1014-
Span(
1015-
payload=payload_e,
1016-
trace_id="a" * 32,
1017-
span_id="e" * 16,
1018-
parent_span_id="a" * 16,
1019-
segment_id=None,
1020-
project_id=1,
1021-
),
1022-
Span(
1023-
payload=payload_f,
1024-
trace_id="a" * 32,
1025-
span_id="f" * 16,
1026-
parent_span_id="a" * 16,
1027-
segment_id=None,
1028-
project_id=1,
1029-
),
1030-
Span(
1031-
payload=payload_a,
1032-
trace_id="a" * 32,
1033-
span_id="a" * 16,
1034-
parent_span_id=None,
1035-
project_id=1,
1036-
segment_id=None,
1037-
is_segment_span=True,
1038-
),
1039-
]
1040-
1041-
expected_bytes = sum(
1042-
len(p) for p in [payload_a, payload_b, payload_c, payload_d, payload_e, payload_f]
1043-
)
1044-
1045-
# Set a very small max-segment-bytes to force Redis to drop spans
1046-
with override_options({"spans.buffer.max-segment-bytes": 100}):
1047-
buffer.process_spans(batch1, now=0)
1048-
buffer.process_spans(batch2, now=0)
1049-
buffer.flush_segments(now=11)
1050-
1051-
# Verify that track_outcome was called
1052-
assert mock_track_outcome.called, "track_outcome should be called when spans are dropped"
1053-
1054-
# Find the call with INVALID outcome
1055-
outcome_calls = [
1056-
call
1057-
for call in mock_track_outcome.call_args_list
1058-
if call.kwargs.get("outcome") == Outcome.INVALID
1059-
]
1060-
assert len(outcome_calls) > 0, "Should have at least one INVALID outcome"
1061-
1062-
# Verify the outcome details
1063-
outcome_call = outcome_calls[0]
1064-
assert outcome_call.kwargs["org_id"] == 100
1065-
assert outcome_call.kwargs["project_id"] == 1
1066-
assert outcome_call.kwargs["outcome"] == Outcome.INVALID
1067-
assert outcome_call.kwargs["reason"] == "segment_too_large"
1068-
assert outcome_call.kwargs["category"] == DataCategory.SPAN_INDEXED
1069-
assert outcome_call.kwargs["quantity"] > 0, "Should have dropped at least some spans"
1070-
1071-
# Verify ingested span count and byte count metrics were emitted
1072-
ingested_spans_timing_calls = [
1073-
call
1074-
for call in mock_metrics.call_args_list
1075-
if call.args and call.args[0] == "spans.buffer.flush_segments.ingested_spans_per_segment"
1076-
]
1077-
assert len(ingested_spans_timing_calls) == 1, "Should emit ingested_spans_per_segment metric"
1078-
assert ingested_spans_timing_calls[0].args[1] == 6, "Should have ingested 6 spans"
1079-
1080-
ingested_bytes_timing_calls = [
1081-
call
1082-
for call in mock_metrics.call_args_list
1083-
if call.args and call.args[0] == "spans.buffer.flush_segments.ingested_bytes_per_segment"
1084-
]
1085-
assert len(ingested_bytes_timing_calls) == 1, "Should emit ingested_bytes_per_segment metric"
1086-
assert ingested_bytes_timing_calls[0].args[1] == expected_bytes
1087-
1088-
1089963
@mock.patch("sentry.spans.buffer.Project")
1090964
def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> None:
1091-
"""When chunk-oversized-segments is enabled, oversized segments are kept instead of dropped."""
965+
"""Test that oversized segments are kept instead of dropped."""
1092966
mock_project = mock.Mock()
1093967
mock_project.id = 1
1094968
mock_project.organization_id = 100
@@ -1140,9 +1014,7 @@ def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> No
11401014
),
11411015
]
11421016

1143-
with override_options(
1144-
{"spans.buffer.max-segment-bytes": 100, "spans.buffer.chunk-oversized-segments": True}
1145-
):
1017+
with override_options({"spans.buffer.max-segment-bytes": 100}):
11461018
buffer.process_spans(batch1, now=0)
11471019
buffer.process_spans(batch2, now=0)
11481020
rv = buffer.flush_segments(now=11)
@@ -1178,7 +1050,6 @@ def test_to_messages_under_limit(buffer: SpansBuffer) -> None:
11781050
with override_options(
11791051
{
11801052
**DEFAULT_OPTIONS,
1181-
"spans.buffer.chunk-oversized-segments": True,
11821053
"spans.buffer.max-segment-bytes": 10000,
11831054
}
11841055
):
@@ -1224,7 +1095,6 @@ def test_to_messages_splits_oversized(buffer: SpansBuffer) -> None:
12241095
with override_options(
12251096
{
12261097
**DEFAULT_OPTIONS,
1227-
"spans.buffer.chunk-oversized-segments": True,
12281098
"spans.buffer.max-segment-bytes": 500,
12291099
}
12301100
):
@@ -1254,7 +1124,6 @@ def test_to_messages_single_large_span(buffer: SpansBuffer) -> None:
12541124
with override_options(
12551125
{
12561126
**DEFAULT_OPTIONS,
1257-
"spans.buffer.chunk-oversized-segments": True,
12581127
"spans.buffer.max-segment-bytes": 10,
12591128
}
12601129
):
@@ -1263,25 +1132,6 @@ def test_to_messages_single_large_span(buffer: SpansBuffer) -> None:
12631132
assert messages[0]["skip_enrichment"] is True
12641133

12651134

1266-
def test_to_messages_no_chunking_when_option_disabled(buffer: SpansBuffer) -> None:
1267-
"""When chunk-oversized-segments is disabled, always returns a single message."""
1268-
segment = FlushedSegment(
1269-
queue_key=b"test",
1270-
spans=[OutputSpan(payload={"span_id": "a" * 16})],
1271-
project_id=1,
1272-
)
1273-
with override_options(
1274-
{
1275-
**DEFAULT_OPTIONS,
1276-
"spans.buffer.chunk-oversized-segments": False,
1277-
"spans.buffer.max-segment-bytes": 10,
1278-
}
1279-
):
1280-
messages = segment.to_messages()
1281-
assert len(messages) == 1
1282-
assert "skip_enrichment" not in messages[0]
1283-
1284-
12851135
def test_kafka_slice_id(buffer: SpansBuffer) -> None:
12861136
with override_options(DEFAULT_OPTIONS):
12871137
buffer = SpansBuffer(assigned_shards=list(range(1)), slice_id=2)

0 commit comments

Comments
 (0)