feat(spans): Flush oversized segments in chunks #111820
Conversation
Cursor Bugbot has reviewed your changes and found 1 potential issue.
We also drop spans during ingestion in the Redis Lua script; those spans are still dropped. Instead, we should stop merging sets and flush them out individually. This makes it unnecessary to do the same in the flusher, and preserves the invariant that there is only one root span per flushed chunk. Right now it's possible that multiple unrelated spans (i.e. distinct trees) are flushed in a single chunk, and it's not clear to me how the segments consumer handles that (or that it's supposed to).
A few questions on the requirements we are working towards:
fpacifici left a comment:
Please update the README as this is changing the flush policy.
```python
if flush_oversized_segments:
    chunks = _chunk_segment(spans)
else:
    chunks = [spans]

for chunk in chunks:
    kafka_payload = KafkaPayload(None, orjson.dumps({"spans": chunk}), [])
    metrics.timing(
        "spans.buffer.segment_size_bytes",
        len(kafka_payload.value),
        tags={"shard": shard_tag},
    )
    produce(flushed_segment.project_id, kafka_payload, len(chunk))
```
I don't think this logic belongs here.
The design of this system is such that buffer.py contains all the business logic to manage segments and flushing.
This is the consumer code, which takes care of running the business logic inside a Kafka consumer. If (unlikely) we started flushing into ObjectStore tomorrow, we would touch this code, but the business logic would not change.
Deciding to chunk the segment is a business logic concern not a kafka consumer concern. This logic should go into the buffer.py file.
Is there any specific reason you added it here?
Yeah, it makes sense to have the business logic exclusively in buffer.py. I added it in the flusher since it seemed straightforward to me at the time: we want to chunk the spans before producing, so let's chunk the spans where we produce them to Kafka.
I've pushed a fix for this by adding a to_messages method to the FlushedSegment dataclass.
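The shape of such a method might look like the following minimal sketch. The class name `FlushedSegment` and the `to_messages` method come from the comment above, but the fields and the greedy chunking strategy are assumptions for illustration, not the actual buffer.py implementation:

```python
from dataclasses import dataclass


# Hypothetical sketch: field names and the chunking strategy are
# assumptions based on the discussion, not the real implementation.
@dataclass
class FlushedSegment:
    project_id: int
    spans: list[bytes]  # serialized span payloads

    def to_messages(self, max_bytes: int) -> list[list[bytes]]:
        """Greedily split this segment's spans into chunks, each kept
        under max_bytes, so the flusher can produce one Kafka message
        per chunk without any business logic of its own."""
        chunks: list[list[bytes]] = []
        current: list[bytes] = []
        size = 0
        for span in self.spans:
            # Start a new chunk if adding this span would exceed the limit.
            if current and size + len(span) > max_bytes:
                chunks.append(current)
                current, size = [], 0
            current.append(span)
            size += len(span)
        if current:
            chunks.append(current)
        return chunks
```

With this shape, the flusher only iterates `segment.to_messages(max_bytes)` and produces each chunk, keeping the chunking decision in buffer.py.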
Sorry, what's "those" referring to? In general the product understanding is in the proposal on Notion. The limits listed there are fine with product. Any other limitations would have to be discussed.
Once a segment reaches its max size, we'd like spans in that segment to start skipping enrichment entirely, going straight to snuba-items/EAP. (Instead of the current behaviour, where they're dropped.) Depending on how we structure this that might need changes to segment consumer, just so it knows to emit certain spans as trace items immediately.
Yes. Their segment ID will already be present, so it won't be broken down into different segments, but the enrichment will be incorrect. Product accepts that (it's in the doc too). If we really wanted to protect ourselves against that, we could query EAP to see if spans in that segment have already been ingested, but that has its own scalability concerns.
…ue flag (#112024) Going off of product requirements from #111820 (comment), we want to have a way to signal the process-segments consumer to skip enrichment once a segment hits the size limit. When this happens, the process-spans consumer would produce the segment in chunks to the buffered-segments topic, where each message contains one chunk along with the flag `skip_enrichment=True`.
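A per-chunk message as described above might be built like this minimal sketch; the function name and the exact payload schema of the buffered-segments topic are assumptions, with only the `skip_enrichment=True` flag taken from the discussion (stdlib `json` stands in for `orjson` here):

```python
import json


# Hypothetical helper: the payload schema is an assumption based on
# the PR description, not a documented topic contract.
def build_chunk_message(chunk: list[dict], oversized: bool) -> bytes:
    payload: dict = {"spans": chunk}
    if oversized:
        # Signal the process-segments consumer to skip enrichment and
        # forward these spans straight to snuba-items/EAP.
        payload["skip_enrichment"] = True
    return json.dumps(payload).encode()
```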
Refs STREAM-826

- Adds an option `spans.buffer.flush-oversized-segments` to allow flushing entire segments that exceed `spans.buffer.max-segments-bytes` bytes.
- Adds the `_chunk_segment()` function in the flusher that splits span payloads into chunks, each chunk under `max-segment-bytes`.

When the option is enabled, the flusher produces one Kafka message per chunk instead of one per segment.