Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions src/sentry/spans/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,9 @@ event types are limited in terms of frequency.
breaks the structure of the trace as the missing spans may be anywhere in
the tree.

- As we extract the subsegments and reassemble them, if the segment is too big
we either drop it or chunk it depending on the
`spans.buffer.chunk-oversized-segments` option:
- **Default (disabled)**: The segment is dropped entirely and an `invalid`
outcome is recorded.
- **Enabled**: The segment is kept and split into multiple Kafka messages,
each within `max-segment-bytes`, and every chunk is sent with the flag
`skip_enrichment=True`.
- As we extract the subsegments and reassemble them, if the segment size exceeds
the `spans.buffer.max-segment-bytes` limit, we split it into multiple Kafka messages,
each within that limit. Every chunk is sent with the flag `skip_enrichment=True`.

### Flushing segments

Expand Down
41 changes: 11 additions & 30 deletions src/sentry/spans/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,12 @@ def to_messages(self) -> list[dict[str, Any]]:
"""
Build producer messages for this segment.

If chunk-oversized-segments is enabled and the segment exceeds
max_segment_bytes, the segment is split into multiple messages with
skip_enrichment=True. Otherwise, returns a single message.
If the segment size exceeds `spans.buffer.max-segment-bytes`, the segment is split
into multiple messages with skip_enrichment=True. Otherwise, returns a single message.
"""
chunk_oversized_segments = options.get("spans.buffer.chunk-oversized-segments")
max_segment_bytes = options.get("spans.buffer.max-segment-bytes")

spans: list[SpanPayload] = [span.payload for span in self.spans]
if not chunk_oversized_segments:
return [{"spans": spans}]

sizes = [len(orjson.dumps(s)) for s in spans]
if sum(sizes) <= max_segment_bytes:
Expand All @@ -228,7 +224,11 @@ def to_messages(self) -> list[dict[str, Any]]:
messages.append({"spans": current, "skip_enrichment": True})

if len(messages) > 1:
metrics.incr("spans.buffer.oversized_segments_chunked_messages", len(messages))
metrics.timing(
"spans.buffer.oversized_segments_chunked",
len(messages),
)
metrics.timing("spans.buffer.oversized_segments_size", sum(sizes))

return messages

Expand Down Expand Up @@ -764,11 +764,9 @@ def _load_segment_data(
"""

page_size = options.get("spans.buffer.segment-page-size")
max_segment_bytes = options.get("spans.buffer.max-segment-bytes")

payloads: dict[SegmentKey, list[bytes]] = {key: [] for key in segment_keys}
payload_keys_map: dict[SegmentKey, list[PayloadKey]] = {key: [] for key in segment_keys}
sizes: dict[SegmentKey, int] = {key: 0 for key in segment_keys}
self._last_decompress_latency_ms = 0
decompress_latency_ms = 0.0

Expand All @@ -795,31 +793,18 @@ def _load_segment_data(
cursors[payload_key] = 0
payload_keys_map[key] = segment_payload_keys

chunk_oversized_segments = options.get("spans.buffer.chunk-oversized-segments")
Comment thread
lvthanh03 marked this conversation as resolved.
dropped_segments: set[SegmentKey] = set()

def _add_spans(key: SegmentKey, raw_data: bytes) -> bool:
def _add_spans(key: SegmentKey, raw_data: bytes):
"""
Decompress and add spans to the segment. Returns False if the
segment exceeded max_segment_bytes and was dropped.
Decompress and add spans to the segment.
"""
nonlocal decompress_latency_ms

decompress_start = time.monotonic()
decompressed = self._decompress_batch(raw_data)
decompress_latency_ms += (time.monotonic() - decompress_start) * 1000

sizes[key] = sizes.get(key, 0) + sum(len(span) for span in decompressed)
if sizes[key] > max_segment_bytes and not chunk_oversized_segments:
metrics.incr("spans.buffer.flush_segments.segment_size_exceeded")
logger.warning("Skipping too large segment, byte size %s", sizes[key])
payloads.pop(key, None)
sizes.pop(key, None)
dropped_segments.add(key)
return False

payloads[key].extend(decompressed)
return True

Comment thread
lvthanh03 marked this conversation as resolved.
while cursors:
with self.client.pipeline(transaction=False) as p:
Expand All @@ -836,15 +821,11 @@ def _add_spans(key: SegmentKey, raw_data: bytes) -> bool:
cursors.pop(key, None)
continue

size_exceeded = False
for scan_value in scan_values:
if segment_key in payloads:
if not _add_spans(segment_key, scan_value):
size_exceeded = True
_add_spans(segment_key, scan_value)

if size_exceeded:
cursors.pop(key, None)
elif cursor == 0:
if cursor == 0:
del cursors[key]
else:
cursors[key] = cursor
Expand Down
160 changes: 5 additions & 155 deletions tests/sentry/spans/test_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
"spans.buffer.evalsha-cumulative-logger-enabled": True,
"spans.buffer.enforce-segment-size": False,
"spans.process-segments.schema-validation": 1.0,
"spans.buffer.chunk-oversized-segments": False,
}


Expand Down Expand Up @@ -838,9 +837,10 @@ def test_max_segment_spans_limit(mock_project_model, buffer: SpansBuffer) -> Non
buffer.process_spans(batch2, now=0)
rv = buffer.flush_segments(now=11)

# The entire segment should be dropped because it exceeds max_segment_bytes.
# The segment is kept even though it exceeds max_segment_bytes,
# because oversized segments are chunked at the message level.
segment = rv[_segment_id(1, "a" * 32, "a" * 16)]
assert segment.spans == []
assert len(segment.spans) == 5


@mock.patch("sentry.spans.buffer.Project")
Expand Down Expand Up @@ -960,135 +960,9 @@ def test_max_segment_bytes_under_limit_merges_normally(
assert span_ids == {"b" * 16, "c" * 16}


@mock.patch("sentry.spans.buffer.Project")
@mock.patch("sentry.spans.buffer.track_outcome")
@mock.patch("sentry.spans.buffer.metrics.timing")
def test_dropped_spans_emit_outcomes(
mock_metrics, mock_track_outcome, mock_project_model, buffer: SpansBuffer
) -> None:
"""Test that outcomes are emitted when Redis drops spans due to size limit."""
from sentry.constants import DataCategory
from sentry.utils.outcomes import Outcome

# Mock the project lookup
mock_project = mock.Mock()
mock_project.id = 1
mock_project.organization_id = 100
mock_project_model.objects.get_from_cache.return_value = mock_project

payload_a = _payload("a" * 16)
payload_b = _payload("b" * 16)
payload_c = _payload("c" * 16)
payload_d = _payload("d" * 16)
payload_e = _payload("e" * 16)
payload_f = _payload("f" * 16)

# Create a segment with many spans that will exceed the Redis memory limit
batch1 = [
Span(
payload=payload_b,
trace_id="a" * 32,
span_id="b" * 16,
parent_span_id="a" * 16,
segment_id=None,
project_id=1,
),
Span(
payload=payload_c,
trace_id="a" * 32,
span_id="c" * 16,
parent_span_id="a" * 16,
segment_id=None,
project_id=1,
),
Span(
payload=payload_d,
trace_id="a" * 32,
span_id="d" * 16,
parent_span_id="a" * 16,
segment_id=None,
project_id=1,
),
]
batch2 = [
Span(
payload=payload_e,
trace_id="a" * 32,
span_id="e" * 16,
parent_span_id="a" * 16,
segment_id=None,
project_id=1,
),
Span(
payload=payload_f,
trace_id="a" * 32,
span_id="f" * 16,
parent_span_id="a" * 16,
segment_id=None,
project_id=1,
),
Span(
payload=payload_a,
trace_id="a" * 32,
span_id="a" * 16,
parent_span_id=None,
project_id=1,
segment_id=None,
is_segment_span=True,
),
]

expected_bytes = sum(
len(p) for p in [payload_a, payload_b, payload_c, payload_d, payload_e, payload_f]
)

# Set a very small max-segment-bytes to force Redis to drop spans
with override_options({"spans.buffer.max-segment-bytes": 100}):
buffer.process_spans(batch1, now=0)
buffer.process_spans(batch2, now=0)
buffer.flush_segments(now=11)

# Verify that track_outcome was called
assert mock_track_outcome.called, "track_outcome should be called when spans are dropped"

# Find the call with INVALID outcome
outcome_calls = [
call
for call in mock_track_outcome.call_args_list
if call.kwargs.get("outcome") == Outcome.INVALID
]
assert len(outcome_calls) > 0, "Should have at least one INVALID outcome"

# Verify the outcome details
outcome_call = outcome_calls[0]
assert outcome_call.kwargs["org_id"] == 100
assert outcome_call.kwargs["project_id"] == 1
assert outcome_call.kwargs["outcome"] == Outcome.INVALID
assert outcome_call.kwargs["reason"] == "segment_too_large"
assert outcome_call.kwargs["category"] == DataCategory.SPAN_INDEXED
assert outcome_call.kwargs["quantity"] > 0, "Should have dropped at least some spans"

# Verify ingested span count and byte count metrics were emitted
ingested_spans_timing_calls = [
call
for call in mock_metrics.call_args_list
if call.args and call.args[0] == "spans.buffer.flush_segments.ingested_spans_per_segment"
]
assert len(ingested_spans_timing_calls) == 1, "Should emit ingested_spans_per_segment metric"
assert ingested_spans_timing_calls[0].args[1] == 6, "Should have ingested 6 spans"

ingested_bytes_timing_calls = [
call
for call in mock_metrics.call_args_list
if call.args and call.args[0] == "spans.buffer.flush_segments.ingested_bytes_per_segment"
]
assert len(ingested_bytes_timing_calls) == 1, "Should emit ingested_bytes_per_segment metric"
assert ingested_bytes_timing_calls[0].args[1] == expected_bytes


@mock.patch("sentry.spans.buffer.Project")
def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> None:
"""When chunk-oversized-segments is enabled, oversized segments are kept instead of dropped."""
"""Test that oversized segments are kept instead of dropped."""
mock_project = mock.Mock()
mock_project.id = 1
mock_project.organization_id = 100
Expand Down Expand Up @@ -1140,9 +1014,7 @@ def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> No
),
]

with override_options(
{"spans.buffer.max-segment-bytes": 100, "spans.buffer.chunk-oversized-segments": True}
):
with override_options({"spans.buffer.max-segment-bytes": 100}):
buffer.process_spans(batch1, now=0)
buffer.process_spans(batch2, now=0)
rv = buffer.flush_segments(now=11)
Expand Down Expand Up @@ -1178,7 +1050,6 @@ def test_to_messages_under_limit(buffer: SpansBuffer) -> None:
with override_options(
{
**DEFAULT_OPTIONS,
"spans.buffer.chunk-oversized-segments": True,
"spans.buffer.max-segment-bytes": 10000,
}
):
Expand Down Expand Up @@ -1224,7 +1095,6 @@ def test_to_messages_splits_oversized(buffer: SpansBuffer) -> None:
with override_options(
{
**DEFAULT_OPTIONS,
"spans.buffer.chunk-oversized-segments": True,
"spans.buffer.max-segment-bytes": 500,
}
):
Expand Down Expand Up @@ -1254,7 +1124,6 @@ def test_to_messages_single_large_span(buffer: SpansBuffer) -> None:
with override_options(
{
**DEFAULT_OPTIONS,
"spans.buffer.chunk-oversized-segments": True,
"spans.buffer.max-segment-bytes": 10,
}
):
Expand All @@ -1263,25 +1132,6 @@ def test_to_messages_single_large_span(buffer: SpansBuffer) -> None:
assert messages[0]["skip_enrichment"] is True


def test_to_messages_no_chunking_when_option_disabled(buffer: SpansBuffer) -> None:
"""When chunk-oversized-segments is disabled, always returns a single message."""
segment = FlushedSegment(
queue_key=b"test",
spans=[OutputSpan(payload={"span_id": "a" * 16})],
project_id=1,
)
with override_options(
{
**DEFAULT_OPTIONS,
"spans.buffer.chunk-oversized-segments": False,
"spans.buffer.max-segment-bytes": 10,
}
):
messages = segment.to_messages()
assert len(messages) == 1
assert "skip_enrichment" not in messages[0]


def test_kafka_slice_id(buffer: SpansBuffer) -> None:
with override_options(DEFAULT_OPTIONS):
buffer = SpansBuffer(assigned_shards=list(range(1)), slice_id=2)
Expand Down
Loading