3232
3333Now what does that look like in Redis? For each incoming span, we:
3434
35- 1. Store the span payload in a payload key:
36- "span-buf:s:{project_id:trace_id:span_id}:span_id". Each subsegment
37- gets its own key, distributed across Redis cluster nodes.
35+ 1. Store the span payload in a payload key. Each subsegment gets its own key,
36+ distributed across Redis cluster nodes.
37+ a. When segment size enforcement is disabled, the key uses the parent_span_id to
38+ determine where to write span payloads.
39+ Key: `span-buf:s:{project_id:trace_id:parent_span_id}:parent_span_id`
40+ b. When segment size enforcement is enabled, the key uses a unique salt per
41+ subsegment. This allows us to skip merging the subsegment into the parent segment
42+ and not lose any data, since the subsegment will become its own separate segment
43+ and be flushed out independently.
44+ Key: `span-buf:s:{project_id:trace_id:salt}:salt`
38452. The Lua script (add-buffer.lua) receives the span IDs and:
3946 a. Follows redirects from parent_span_id (hashmap at
4047 "span-buf:ssr:{project_id:trace_id}") to find the segment root.
4148 b. Updates the redirect table so future spans can find the segment root.
4249 c. Merges member-keys indexes and counters (ingested count, byte count)
4350 from span IDs that were previously separate segment roots into the
4451 current segment root.
52+ d. If segment size enforcement is enabled and the segment exceeds
53+ max_segment_bytes, detaches the subsegment into its own segment
54+ keyed by the salt.
45553. To a "global queue", we write the segment key, sorted by timeout.
4656
4757Eventually, a flushing cronjob looks at that global queue and removes all timed
5868or using spillover topics, especially when their new partition count is lower
5969than the original topic.
6070
71+ Segment size enforcement:
72+
73+ Segments can grow unboundedly as spans arrive. To prevent oversized segments from
74+ consuming excessive memory during flush, the buffer enforces a maximum byte limit
75+ per segment (controlled by `spans.buffer.max-segment-bytes` and gated behind
76+ `spans.buffer.enforce-segment-size`).
77+
78+ Each subsegment is assigned a unique salt (UUID). The Lua script tracks cumulative
79+ ingested bytes per segment via `span-buf:ibc` keys. If adding a subsegment would
80+ push the segment over the byte limit, the script detaches it into a new segment
81+ keyed by the salt instead of merging it into the parent. The detached segment is
82+ independently tracked and flushed.
83+
84+ During flush, segments that exceed `max-segment-bytes` are chunked into multiple
85+ Kafka messages to stay within downstream size limits.
86+
6187Glossary for types of keys:
6288
6389 * span-buf:s:{project_id:trace_id:span_id}:span_id -- payload keys containing span payloads, distributed across cluster nodes.
76102import logging
77103import math
78104import time
105+ import uuid
79106from collections .abc import Generator , MutableMapping , Sequence
80107from typing import Any , NamedTuple , cast
81108
@@ -146,6 +173,12 @@ def effective_parent_id(self):
146173type SpanPayload = dict [str , Any ]
147174
148175
class Subsegment(NamedTuple):
    """A chunk of spans sharing one parent, written to Redis as a unit."""

    # (project_id, trace_id) identifying the trace this chunk belongs to.
    project_and_trace: tuple[str, str]
    # Unique per-chunk salt (a uuid4 hex string). When segment size
    # enforcement is enabled, the salt replaces parent_span_id as the
    # payload-key suffix so an oversized chunk can be detached into its
    # own independently-flushed segment.
    salt: str
    # The spans contained in this chunk.
    subsegment: list[Span]
181+
149182class OutputSpan (NamedTuple ):
150183 payload : SpanPayload
151184
@@ -254,6 +287,8 @@ def process_spans(self, spans: Sequence[Span], now: int):
254287 timeout = options .get ("spans.buffer.timeout" )
255288 root_timeout = options .get ("spans.buffer.root-timeout" )
256289 max_spans_per_evalsha = options .get ("spans.buffer.max-spans-per-evalsha" )
290+ max_segment_bytes = options .get ("spans.buffer.max-segment-bytes" )
291+ enforce_segment_size = options .get ("spans.buffer.enforce-segment-size" )
257292 result_meta = []
258293 is_root_span_count = 0
259294
@@ -263,25 +298,27 @@ def process_spans(self, spans: Sequence[Span], now: int):
263298
264299 # Split large subsegments into chunks to avoid Lua unpack() limits.
265300 # Chunks share the same parent_span_id but are processed separately.
266- tree_items : list [tuple [ tuple [ str , str ], list [ Span ]] ] = []
301+ tree_items : list [Subsegment ] = []
267302 for key , subsegment in trees .items ():
268303 if max_spans_per_evalsha > 0 and len (subsegment ) > max_spans_per_evalsha :
269304 for chunk in itertools .batched (subsegment , max_spans_per_evalsha ):
270- tree_items .append ((key , list (chunk )))
305+ tree_items .append (Subsegment (key , uuid . uuid4 (). hex , list (chunk )))
271306 else :
272- tree_items .append ((key , subsegment ))
307+ tree_items .append (Subsegment (key , uuid . uuid4 (). hex , subsegment ))
273308
274- tree_batches : Sequence [Sequence [tuple [ tuple [ str , str ], list [ Span ]] ]]
309+ tree_batches : Sequence [Sequence [Subsegment ]]
275310 if pipeline_batch_size > 0 :
276311 tree_batches = list (itertools .batched (tree_items , pipeline_batch_size ))
277312 else :
278313 tree_batches = [tree_items ]
279314
280315 for batch in tree_batches :
281316 with self .client .pipeline (transaction = False ) as p :
282- for (project_and_trace , parent_span_id ), subsegment in batch :
317+ for (project_and_trace , parent_span_id ), salt , subsegment in batch :
283318 set_members = self ._prepare_payloads (subsegment )
284319 payload_key = self ._get_payload_key (project_and_trace , parent_span_id )
320+ if enforce_segment_size :
321+ payload_key = self ._get_payload_key (project_and_trace , salt )
285322 p .sadd (payload_key , * set_members )
286323 p .expire (payload_key , redis_ttl )
287324
@@ -296,7 +333,7 @@ def process_spans(self, spans: Sequence[Span], now: int):
296333 results : list [Any ] = []
297334 for batch in tree_batches :
298335 with self .client .pipeline (transaction = False ) as p :
299- for (project_and_trace , parent_span_id ), subsegment in batch :
336+ for (project_and_trace , parent_span_id ), salt , subsegment in batch :
300337 byte_count = sum (len (span .payload ) for span in subsegment )
301338
302339 try :
@@ -323,6 +360,8 @@ def process_spans(self, spans: Sequence[Span], now: int):
323360 is_segment_span ,
324361 redis_ttl ,
325362 byte_count ,
363+ max_segment_bytes ,
364+ salt if enforce_segment_size else "" ,
326365 * span_ids ,
327366 )
328367
@@ -331,7 +370,7 @@ def process_spans(self, spans: Sequence[Span], now: int):
331370 # All spans in a subsegment share the same trace_id,
332371 # so they all came from the same Kafka partition.
333372 partition = subsegment [0 ].partition
334- result_meta .append ((project_and_trace , parent_span_id , partition ))
373+ result_meta .append ((project_and_trace , parent_span_id , partition , salt ))
335374
336375 results .extend (p .execute ())
337376
@@ -349,7 +388,9 @@ def process_spans(self, spans: Sequence[Span], now: int):
349388
350389 assert len (result_meta ) == len (results )
351390
352- for (project_and_trace , parent_span_id , partition ), result in zip (result_meta , results ):
391+ for (project_and_trace , parent_span_id , partition , salt ), result in zip (
392+ result_meta , results
393+ ):
353394 (
354395 segment_key ,
355396 has_root_span ,
@@ -402,9 +443,11 @@ def process_spans(self, spans: Sequence[Span], now: int):
402443
403444 subsegment_spans = trees [project_and_trace , parent_span_id ]
404445 delete_set = queue_deletes .setdefault (queue_key , set ())
405- delete_set .update (
406- self ._get_span_key (project_and_trace , span .span_id ) for span in subsegment_spans
407- )
446+ if not segment_key .endswith (salt .encode ("ascii" )):
447+ delete_set .update (
448+ self ._get_span_key (project_and_trace , span .span_id )
449+ for span in subsegment_spans
450+ )
408451 delete_set .discard (segment_key )
409452
410453 for result in results :
0 commit comments