Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -3172,6 +3172,12 @@
default=10 * 1024 * 1024,
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
# When enabled, oversized segments are split into chunks instead of being dropped.
# Each chunk becomes a separate producer message flagged skip_enrichment=True
# (see FlushedSegment.to_messages in src/sentry/spans/buffer.py).
register(
    "spans.buffer.chunk-oversized-segments",
    default=False,
    flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
# TTL for keys in Redis. This is a downside protection in case of bugs.
register(
"spans.buffer.redis-ttl",
Expand Down
8 changes: 7 additions & 1 deletion src/sentry/spans/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ event types are limited in terms of frequency.
the tree.

- As we extract the subsegments and reassemble them, if the segment is too big
we drop it entirely and record an `invalid` outcome.
we either drop it or chunk it, depending on the
`spans.buffer.chunk-oversized-segments` option:
- **Default (disabled)**: The segment is dropped entirely and an `invalid`
  outcome is recorded.
- **Enabled**: The segment is kept and split into multiple Kafka messages,
  each staying within the `max-segment-bytes` limit, and every chunk is
  produced with the flag `skip_enrichment=True`.

### Flushing segments

Expand Down
47 changes: 45 additions & 2 deletions src/sentry/spans/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,11 @@ def effective_parent_id(self):
return self.segment_id or self.parent_span_id or self.span_id


# Raw span payload as a JSON-compatible dict.
type SpanPayload = dict[str, Any]


class OutputSpan(NamedTuple):
    """A single span ready for output, wrapping its raw payload dict."""

    payload: SpanPayload


class FlushedSegment(NamedTuple):
Expand All @@ -157,6 +160,45 @@ class FlushedSegment(NamedTuple):
ingested_count: int = 0 # Ingested count at flush time, used for conditional data cleanup
payload_keys: list[PayloadKey] = [] # For cleanup

def to_messages(self) -> list[dict[str, Any]]:
    """
    Assemble the producer messages for this flushed segment.

    With chunk-oversized-segments disabled, or when the serialized spans fit
    into max-segment-bytes, a single plain ``{"spans": [...]}`` message is
    returned. Otherwise the spans are greedily packed into chunks whose
    per-span serialized sizes sum to at most max-segment-bytes, and every
    chunk is emitted with ``skip_enrichment=True``.

    NOTE: sizes are summed per span, so the final serialized message (list
    brackets, commas, outer keys) can slightly exceed the limit.
    """
    should_chunk = options.get("spans.buffer.chunk-oversized-segments")
    byte_limit = options.get("spans.buffer.max-segment-bytes")

    payloads: list[SpanPayload] = [output_span.payload for output_span in self.spans]
    if not should_chunk:
        return [{"spans": payloads}]

    payload_sizes = [len(orjson.dumps(payload)) for payload in payloads]
    if sum(payload_sizes) <= byte_limit:
        return [{"spans": payloads}]

    # Greedy first-fit packing in original span order. A span larger than the
    # limit by itself still occupies its own chunk rather than being dropped.
    chunks: list[list[SpanPayload]] = [[]]
    chunk_bytes = 0
    for payload, payload_size in zip(payloads, payload_sizes):
        if chunks[-1] and chunk_bytes + payload_size > byte_limit:
            chunks.append([])
            chunk_bytes = 0
        chunks[-1].append(payload)
        chunk_bytes += payload_size

    result = [{"spans": chunk, "skip_enrichment": True} for chunk in chunks if chunk]

    # Only count segments that were actually split into more than one message.
    if len(result) > 1:
        metrics.incr("spans.buffer.oversized_segments_chunked_messages", len(result))

    return result


class SpansBuffer:
def __init__(self, assigned_shards: list[int], slice_id: int | None = None):
Expand Down Expand Up @@ -710,6 +752,7 @@ def _load_segment_data(
cursors[payload_key] = 0
payload_keys_map[key] = segment_payload_keys

chunk_oversized_segments = options.get("spans.buffer.chunk-oversized-segments")
dropped_segments: set[SegmentKey] = set()

def _add_spans(key: SegmentKey, raw_data: bytes) -> bool:
Expand All @@ -724,7 +767,7 @@ def _add_spans(key: SegmentKey, raw_data: bytes) -> bool:
decompress_latency_ms += (time.monotonic() - decompress_start) * 1000

sizes[key] = sizes.get(key, 0) + sum(len(span) for span in decompressed)
if sizes[key] > max_segment_bytes:
if sizes[key] > max_segment_bytes and not chunk_oversized_segments:
metrics.incr("spans.buffer.flush_segments.segment_size_exceeded")
logger.warning("Skipping too large segment, byte size %s", sizes[key])
payloads.pop(key, None)
Expand Down
20 changes: 12 additions & 8 deletions src/sentry/spans/consumers/process/flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,14 +333,18 @@ def produce(project_id: int, payload: KafkaPayload, dropped: int) -> None:
if not flushed_segment.spans:
continue

spans = [span.payload for span in flushed_segment.spans]
kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
metrics.timing(
"spans.buffer.segment_size_bytes",
len(kafka_payload.value),
tags={"shard": shard_tag},
)
produce(flushed_segment.project_id, kafka_payload, len(spans))
for message in flushed_segment.to_messages():
kafka_payload = KafkaPayload(None, orjson.dumps(message), [])
metrics.timing(
"spans.buffer.segment_size_bytes",
len(kafka_payload.value),
tags={"shard": shard_tag},
)
produce(
flushed_segment.project_id,
kafka_payload,
len(message["spans"]),
)

with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
for project_id, future, dropped in producer_futures:
Expand Down
197 changes: 197 additions & 0 deletions tests/sentry/spans/test_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"spans.buffer.debug-traces": [],
"spans.buffer.evalsha-cumulative-logger-enabled": True,
"spans.process-segments.schema-validation": 1.0,
"spans.buffer.chunk-oversized-segments": False,
}


Expand Down Expand Up @@ -967,6 +968,202 @@ def test_dropped_spans_emit_outcomes(
assert ingested_bytes_timing_calls[0].args[1] == expected_bytes


@mock.patch("sentry.spans.buffer.Project")
def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> None:
    """When chunk-oversized-segments is enabled, oversized segments are kept instead of dropped."""
    # Project is patched so the flush path does not hit the database; only the
    # id/organization_id attributes are read from it.
    mock_project = mock.Mock()
    mock_project.id = 1
    mock_project.organization_id = 100
    mock_project_model.objects.get_from_cache.return_value = mock_project

    # Two child spans arrive first, before their root segment span is seen.
    batch1 = [
        Span(
            payload=_payload("c" * 16),
            trace_id="a" * 32,
            span_id="c" * 16,
            parent_span_id="b" * 16,
            segment_id=None,
            project_id=1,
        ),
        Span(
            payload=_payload("b" * 16),
            trace_id="a" * 32,
            span_id="b" * 16,
            parent_span_id="a" * 16,
            segment_id=None,
            project_id=1,
        ),
    ]
    # The second batch adds two more children plus the root span ("a" * 16),
    # completing the segment.
    batch2 = [
        Span(
            payload=_payload("d" * 16),
            trace_id="a" * 32,
            span_id="d" * 16,
            parent_span_id="a" * 16,
            segment_id=None,
            project_id=1,
        ),
        Span(
            payload=_payload("e" * 16),
            trace_id="a" * 32,
            span_id="e" * 16,
            parent_span_id="a" * 16,
            segment_id=None,
            project_id=1,
        ),
        Span(
            payload=_payload("a" * 16),
            trace_id="a" * 32,
            span_id="a" * 16,
            parent_span_id=None,
            project_id=1,
            segment_id=None,
            is_segment_span=True,
        ),
    ]

    # max-segment-bytes is set very low so the segment counts as oversized;
    # with chunking enabled it must survive the flush instead of being dropped.
    with override_options(
        {"spans.buffer.max-segment-bytes": 100, "spans.buffer.chunk-oversized-segments": True}
    ):
        buffer.process_spans(batch1, now=0)
        buffer.process_spans(batch2, now=0)
        rv = buffer.flush_segments(now=11)

    # All five spans are present in the flushed segment.
    segment = rv[_segment_id(1, "a" * 32, "a" * 16)]
    assert len(segment.spans) == 5
    _normalize_output(rv)
    assert rv == {
        _segment_id(1, "a" * 32, "a" * 16): FlushedSegment(
            queue_key=mock.ANY,
            score=mock.ANY,
            ingested_count=mock.ANY,
            payload_keys=mock.ANY,
            project_id=1,
            spans=[
                _output_segment(b"a" * 16, b"a" * 16, True),
                _output_segment(b"b" * 16, b"a" * 16, False),
                _output_segment(b"c" * 16, b"a" * 16, False),
                _output_segment(b"d" * 16, b"a" * 16, False),
                _output_segment(b"e" * 16, b"a" * 16, False),
            ],
        )
    }


def test_to_messages_under_limit(buffer: SpansBuffer) -> None:
    """A segment whose spans fit in max-segment-bytes is emitted as one plain message."""
    payloads = [{"span_id": "a"}, {"span_id": "b"}]
    segment = FlushedSegment(
        queue_key=b"test",
        spans=[OutputSpan(payload=payload) for payload in payloads],
        project_id=1,
    )

    opts = {
        **DEFAULT_OPTIONS,
        "spans.buffer.chunk-oversized-segments": True,
        "spans.buffer.max-segment-bytes": 10000,
    }
    with override_options(opts):
        produced = segment.to_messages()

    # Exactly one message, no skip_enrichment flag added.
    assert produced == [{"spans": payloads}]

def test_to_messages_splits_oversized(buffer: SpansBuffer) -> None:
    """An oversized segment is split into size-bounded chunks flagged skip_enrichment."""
    # Five spans of the same segment; only the first one is the root span.
    payloads = [
        {
            "span_id": letter * 16,
            "is_segment": letter == "a",
            "attributes": {"sentry.segment.id": {"type": "string", "value": "a" * 16}},
        }
        for letter in "abcde"
    ]
    segment = FlushedSegment(
        queue_key=b"test",
        spans=[OutputSpan(payload=payload) for payload in payloads],
        project_id=1,
    )

    opts = {
        **DEFAULT_OPTIONS,
        "spans.buffer.chunk-oversized-segments": True,
        "spans.buffer.max-segment-bytes": 500,
    }
    with override_options(opts):
        produced = segment.to_messages()

    # Split into two chunks of 3 and 2 spans, preserving the original order.
    assert [len(message["spans"]) for message in produced] == [3, 2]
    assert [span for message in produced for span in message["spans"]] == payloads

    # Every chunk is flagged so enrichment is skipped downstream.
    assert all(message["skip_enrichment"] is True for message in produced)

    # Every chunk except possibly the last must respect the byte limit.
    for message in produced[:-1]:
        assert sum(len(orjson.dumps(span)) for span in message["spans"]) <= 500


def test_to_messages_single_large_span(buffer: SpansBuffer) -> None:
    """A single span larger than max_bytes still gets its own message."""
    lone_span = OutputSpan(payload={"span_id": "a" * 16})
    segment = FlushedSegment(queue_key=b"test", spans=[lone_span], project_id=1)

    opts = {
        **DEFAULT_OPTIONS,
        "spans.buffer.chunk-oversized-segments": True,
        "spans.buffer.max-segment-bytes": 10,
    }
    with override_options(opts):
        produced = segment.to_messages()

    # The span is not dropped; it is emitted alone with the chunking flag set.
    assert len(produced) == 1
    assert produced[0]["skip_enrichment"] is True


def test_to_messages_no_chunking_when_option_disabled(buffer: SpansBuffer) -> None:
    """When chunk-oversized-segments is disabled, always returns a single message."""
    segment = FlushedSegment(
        queue_key=b"test",
        spans=[OutputSpan(payload={"span_id": "a" * 16})],
        project_id=1,
    )

    opts = {
        **DEFAULT_OPTIONS,
        "spans.buffer.chunk-oversized-segments": False,
        "spans.buffer.max-segment-bytes": 10,
    }
    with override_options(opts):
        produced = segment.to_messages()

    # Even though the span exceeds the limit, no chunking happens when the
    # option is off, and the message carries no skip_enrichment flag.
    assert len(produced) == 1
    assert "skip_enrichment" not in produced[0]


def test_kafka_slice_id(buffer: SpansBuffer) -> None:
with override_options(DEFAULT_OPTIONS):
buffer = SpansBuffer(assigned_shards=list(range(1)), slice_id=2)
Expand Down
Loading