@@ -14,7 +14,7 @@ is received.
1414This implies that it has to operate according to these steps:
1515
16161. Identify the highest level known span for a subsegment.
17- 2. Merge subsegments when a common parent is found.
17+ 2. Update the member-keys index and counters when a common parent is found.
18183. Update the redirect set to reflect the current state of the tree.
1919
2020
@@ -26,14 +26,11 @@ ARGS:
2626- parent_span_id -- str -- The parent span id of the root of the subsegment.
2727- has_root_span -- "true" or "false" -- Whether the subsegment contains the root of the segment.
2828- set_timeout -- int
29- - max_segment_bytes -- int -- The maximum number of bytes the segment can contain.
3029- byte_count -- int -- The total number of bytes in the subsegment.
31- - write_distributed_payloads -- "true" or "false" -- When true, maintain member-keys tracking sets for distributed payload keys.
32- - write_merged_payloads -- "true" or "false" -- When false, skip set merges and set keys expire cmds.
3330- *span_id -- str[] -- The span ids in the subsegment.
3431
3532RETURNS:
36- - set_key -- str -- The Redis key of the segment this subsegment was merged into.
33+ - set_key -- str -- The key of the segment, used to look up the member-keys index and identify the segment in the queue.
3734- has_root_span -- bool -- Whether this segment contains a root span.
3835- latency_ms -- number -- Milliseconds elapsed during script execution.
3936- latency_table -- table -- Per-step latency measurements.
@@ -47,11 +44,8 @@ local num_spans = ARGV[1]
4744local parent_span_id = ARGV [2 ]
4845local has_root_span = ARGV [3 ] == " true"
4946local set_timeout = tonumber (ARGV [4 ])
50- local max_segment_bytes = tonumber (ARGV [5 ])
51- local byte_count = tonumber (ARGV [6 ])
52- local write_distributed_payloads = ARGV [7 ] == " true"
53- local write_merged_payloads = ARGV [8 ] == " true"
54- local NUM_ARGS = 8
47+ local byte_count = tonumber (ARGV [5 ])
48+ local NUM_ARGS = 5
5549
5650local function get_time_ms ()
5751 local time = redis .call (" TIME" )
@@ -82,11 +76,7 @@ local latency_table = {}
8276local metrics_table = {}
8377table.insert (metrics_table , {" redirect_table_size" , redis .call (" hlen" , main_redirect_key )})
8478table.insert (metrics_table , {" redirect_depth" , redirect_depth })
85- local redirect_end_time_ms = get_time_ms ()
86- table.insert (latency_table , {" redirect_step_latency_ms" , redirect_end_time_ms - start_time_ms })
87-
8879local set_key = string.format (" span-buf:s:{%s}:%s" , project_and_trace , set_span_id )
89- local parent_key = string.format (" span-buf:s:{%s}:%s" , project_and_trace , parent_span_id )
9080
9181-- Reset the set expiry as we saw a new subsegment for this set
9282local has_root_span_key = string.format (" span-buf:hrs:%s" , set_key )
@@ -96,201 +86,99 @@ if has_root_span then
9686end
9787
9888local hset_args = {}
99- local sunionstore_args = {}
100-
101- -- Merge the subsegment into the segment we are assembling.
102- -- Merging the spans (`sunionstore_args`) is needed to compose the payloads in
103- -- the same segment for them to be flushed later.
104- -- Updating the redirect set instead is needed when we receive higher level spans
105- -- for a tree we are assembling as the segment root each span points at in the
106- -- redirect set changes when a new root is found.
107- if write_merged_payloads and set_span_id ~= parent_span_id and redis .call (" scard" , parent_key ) > 0 then
108- table.insert (sunionstore_args , parent_key )
109- end
11089
11190for i = NUM_ARGS + 1 , NUM_ARGS + num_spans do
11291 local span_id = ARGV [i ]
113- local is_root_span = span_id == parent_span_id
11492
11593 table.insert (hset_args , span_id )
11694 table.insert (hset_args , set_span_id )
117-
118- if not is_root_span and write_merged_payloads then
119- local span_key = string.format (" span-buf:s:{%s}:%s" , project_and_trace , span_id )
120- table.insert (sunionstore_args , span_key )
121- end
12295end
12396
12497redis .call (" hset" , main_redirect_key , unpack (hset_args ))
12598redis .call (" expire" , main_redirect_key , set_timeout )
12699
127- local sunionstore_args_end_time_ms = get_time_ms ()
128- table.insert (latency_table , {" sunionstore_args_step_latency_ms " , sunionstore_args_end_time_ms - redirect_end_time_ms })
100+ local redirect_end_time_ms = get_time_ms ()
101+ table.insert (latency_table , {" redirect_step_latency_ms " , redirect_end_time_ms - start_time_ms })
129102
130- -- Merge spans into the parent span set.
131- -- Used outside the if statement
132- local arg_cleanup_end_time_ms = sunionstore_args_end_time_ms
133103-- Maintain member-keys (span-buf:mk) tracking sets so the flusher
134- -- knows which distributed keys to fetch. This runs in both write-only and
135- -- full distributed mode.
136- if write_distributed_payloads then
137- local member_keys_key = string.format (" span-buf:mk:{%s}:%s" , project_and_trace , set_span_id )
138- redis .call (" sadd" , member_keys_key , parent_span_id )
139-
140- -- Merge child tracking sets from span_ids that were previously segment roots.
141- for i = NUM_ARGS + 1 , NUM_ARGS + num_spans do
142- local span_id = ARGV [i ]
143- if span_id ~= parent_span_id then
144- local child_mk_key = string.format (" span-buf:mk:{%s}:%s" , project_and_trace , span_id )
145- local child_members = redis .call (" smembers" , child_mk_key )
146- if # child_members > 0 then
147- redis .call (" sadd" , member_keys_key , unpack (child_members ))
148- redis .call (" del" , child_mk_key )
149- end
150- end
151- end
104+ -- knows which payload keys to fetch.
105+ local member_keys_key = string.format (" span-buf:mk:{%s}:%s" , project_and_trace , set_span_id )
106+ redis .call (" sadd" , member_keys_key , parent_span_id )
152107
153- -- Merge parent's tracking set if parent_span_id redirected to a different root.
154- if set_span_id ~= parent_span_id then
155- local parent_mk_key = string.format (" span-buf:mk:{%s}:%s" , project_and_trace , parent_span_id )
156- local parent_members = redis .call (" smembers" , parent_mk_key )
157- if # parent_members > 0 then
158- redis .call (" sadd" , member_keys_key , unpack (parent_members ))
159- redis .call (" del" , parent_mk_key )
108+ -- Merge child tracking sets from span_ids that were previously segment roots.
109+ for i = NUM_ARGS + 1 , NUM_ARGS + num_spans do
110+ local span_id = ARGV [i ]
111+ if span_id ~= parent_span_id then
112+ local child_mk_key = string.format (" span-buf:mk:{%s}:%s" , project_and_trace , span_id )
113+ local child_members = redis .call (" smembers" , child_mk_key )
114+ if # child_members > 0 then
115+ redis .call (" sadd" , member_keys_key , unpack (child_members ))
116+ redis .call (" del" , child_mk_key )
160117 end
161118 end
162-
163- redis .call (" expire" , member_keys_key , set_timeout )
164- arg_cleanup_end_time_ms = get_time_ms ()
165- table.insert (latency_table , {" distributed_tracking_step_latency_ms" , arg_cleanup_end_time_ms - sunionstore_args_end_time_ms })
166119end
167120
168- -- When write_merged_payloads is false, merged set merges are skipped but we
169- -- still need to merge ic/ibc counters from child keys into the segment root.
170- if not write_merged_payloads then
171- local ingested_count_key = string.format (" span-buf:ic:%s" , set_key )
172- local ingested_byte_count_key = string.format (" span-buf:ibc:%s" , set_key )
173- for i = NUM_ARGS + 1 , NUM_ARGS + num_spans do
174- local span_id = ARGV [i ]
175- if span_id ~= parent_span_id then
176- local child_merged = string.format (" span-buf:s:{%s}:%s" , project_and_trace , span_id )
177- local child_ic_key = string.format (" span-buf:ic:%s" , child_merged )
178- local child_ibc_key = string.format (" span-buf:ibc:%s" , child_merged )
179- local child_count = redis .call (" get" , child_ic_key )
180- local child_byte_count = redis .call (" get" , child_ibc_key )
181- if child_count then
182- redis .call (" incrby" , ingested_count_key , child_count )
183- redis .call (" del" , child_ic_key )
184- end
185- if child_byte_count then
186- redis .call (" incrby" , ingested_byte_count_key , child_byte_count )
187- redis .call (" del" , child_ibc_key )
188- end
189- end
121+ -- Merge parent's tracking set if parent_span_id redirected to a different root.
122+ if set_span_id ~= parent_span_id then
123+ local parent_mk_key = string.format (" span-buf:mk:{%s}:%s" , project_and_trace , parent_span_id )
124+ local parent_members = redis .call (" smembers" , parent_mk_key )
125+ if # parent_members > 0 then
126+ redis .call (" sadd" , member_keys_key , unpack (parent_members ))
127+ redis .call (" del" , parent_mk_key )
190128 end
191- if set_span_id ~= parent_span_id then
192- local parent_merged = string.format (" span-buf:s:{%s}:%s" , project_and_trace , parent_span_id )
193- local parent_ic_key = string.format (" span-buf:ic:%s" , parent_merged )
194- local parent_ibc_key = string.format (" span-buf:ibc:%s" , parent_merged )
195- local parent_count = redis .call (" get" , parent_ic_key )
196- local parent_byte_count = redis .call (" get" , parent_ibc_key )
197- if parent_count then
198- redis .call (" incrby" , ingested_count_key , parent_count )
199- redis .call (" del" , parent_ic_key )
129+ end
130+
131+ redis .call (" expire" , member_keys_key , set_timeout )
132+ local merge_payload_keys_end_time_ms = get_time_ms ()
133+ table.insert (latency_table , {" merge_payload_keys_step_latency_ms" , merge_payload_keys_end_time_ms - redirect_end_time_ms })
134+
135+ -- Merge ic/ibc counters from child keys into the segment root.
136+ local ingested_count_key = string.format (" span-buf:ic:%s" , set_key )
137+ local ingested_byte_count_key = string.format (" span-buf:ibc:%s" , set_key )
138+ for i = NUM_ARGS + 1 , NUM_ARGS + num_spans do
139+ local span_id = ARGV [i ]
140+ if span_id ~= parent_span_id then
141+ local child_merged = string.format (" span-buf:s:{%s}:%s" , project_and_trace , span_id )
142+ local child_ic_key = string.format (" span-buf:ic:%s" , child_merged )
143+ local child_ibc_key = string.format (" span-buf:ibc:%s" , child_merged )
144+ local child_count = redis .call (" get" , child_ic_key )
145+ local child_byte_count = redis .call (" get" , child_ibc_key )
146+ if child_count then
147+ redis .call (" incrby" , ingested_count_key , child_count )
148+ redis .call (" del" , child_ic_key )
200149 end
201- if parent_byte_count then
202- redis .call (" incrby" , ingested_byte_count_key , parent_byte_count )
203- redis .call (" del" , parent_ibc_key )
150+ if child_byte_count then
151+ redis .call (" incrby" , ingested_byte_count_key , child_byte_count )
152+ redis .call (" del" , child_ibc_key )
204153 end
205154 end
206- arg_cleanup_end_time_ms = get_time_ms ()
207- table.insert (latency_table , {" distributed_ibc_merge_step_latency_ms" , arg_cleanup_end_time_ms - sunionstore_args_end_time_ms })
208-
209- elseif # sunionstore_args > 0 then
210- local ingested_byte_count_key = string.format (" span-buf:ibc:%s" , set_key )
211- local dest_bytes = tonumber (redis .call (" get" , ingested_byte_count_key ) or 0 )
212-
213- local already_oversized = dest_bytes > max_segment_bytes
214- table.insert (metrics_table , {" parent_span_set_already_oversized" , already_oversized and 1 or 0 })
215-
216- local start_output_size = redis .call (" scard" , set_key )
217- local scard_end_time_ms = get_time_ms ()
218- table.insert (latency_table , {" scard_step_latency_ms" , scard_end_time_ms - sunionstore_args_end_time_ms })
219-
220- local output_size
221- if already_oversized then
222- -- Dest already exceeds max_segment_bytes, skip merge entirely.
223- output_size = start_output_size
224- else
225- -- Read each source set and SADD its members into dest.
226- -- We use SMEMBERS+SADD instead of SUNIONSTORE because SUNIONSTORE
227- -- re-serialises the entire destination set on every call, making it
228- -- O(destination + sources). SMEMBERS+SADD is O(sources) only, which
229- -- is significantly cheaper when the destination set is large.
230- local all_members = {}
231- for i = 1 , # sunionstore_args do
232- local members = redis .call (" smembers" , sunionstore_args [i ])
233- for j = 1 , # members do
234- all_members [# all_members + 1 ] = members [j ]
235- end
236- end
237- table.insert (metrics_table , {" zero_copy_dest_source_members" , # all_members })
238- -- Batch SADD in chunks to avoid exceeding Lua's unpack() stack limit.
239- local BATCH = 7000
240- for i = 1 , # all_members , BATCH do
241- local last = math.min (i + BATCH - 1 , # all_members )
242- redis .call (" sadd" , set_key , unpack (all_members , i , last ))
243- end
244- output_size = redis .call (" scard" , set_key )
155+ end
156+ if set_span_id ~= parent_span_id then
157+ local parent_merged = string.format (" span-buf:s:{%s}:%s" , project_and_trace , parent_span_id )
158+ local parent_ic_key = string.format (" span-buf:ic:%s" , parent_merged )
159+ local parent_ibc_key = string.format (" span-buf:ibc:%s" , parent_merged )
160+ local parent_count = redis .call (" get" , parent_ic_key )
161+ local parent_byte_count = redis .call (" get" , parent_ibc_key )
162+ if parent_count then
163+ redis .call (" incrby" , ingested_count_key , parent_count )
164+ redis .call (" del" , parent_ic_key )
245165 end
246- local sunionstore_end_time_ms = get_time_ms ()
247- table.insert (latency_table , {" sunionstore_step_latency_ms" , sunionstore_end_time_ms - scard_end_time_ms })
248-
249- redis .call (" unlink" , unpack (sunionstore_args ))
250- local unlink_end_time_ms = get_time_ms ()
251- table.insert (latency_table , {" unlink_step_latency_ms" , unlink_end_time_ms - sunionstore_end_time_ms })
252-
253- table.insert (metrics_table , {" parent_span_set_before_size" , start_output_size })
254- table.insert (metrics_table , {" parent_span_set_after_size" , output_size })
255- table.insert (metrics_table , {" elements_added" , output_size - start_output_size })
256-
257- -- Merge ingested count keys for merged spans
258- local ingested_count_key = string.format (" span-buf:ic:%s" , set_key )
259- for i = 1 , # sunionstore_args do
260- local merged_key = sunionstore_args [i ]
261- local merged_ic_key = string.format (" span-buf:ic:%s" , merged_key )
262- local merged_ibc_key = string.format (" span-buf:ibc:%s" , merged_key )
263- local merged_count = redis .call (" get" , merged_ic_key )
264- local merged_byte_count = redis .call (" get" , merged_ibc_key )
265- if merged_count then
266- redis .call (" incrby" , ingested_count_key , merged_count )
267- end
268- if merged_byte_count then
269- redis .call (" incrby" , ingested_byte_count_key , merged_byte_count )
270- end
271- redis .call (" del" , merged_ic_key )
272- redis .call (" del" , merged_ibc_key )
166+ if parent_byte_count then
167+ redis .call (" incrby" , ingested_byte_count_key , parent_byte_count )
168+ redis .call (" del" , parent_ibc_key )
273169 end
274-
275- arg_cleanup_end_time_ms = get_time_ms ()
276- table.insert (latency_table , {" arg_cleanup_step_latency_ms" , arg_cleanup_end_time_ms - unlink_end_time_ms })
277170end
278-
171+ local counter_merge_end_time_ms = get_time_ms ()
172+ table.insert (latency_table , {" counter_merge_step_latency_ms" , counter_merge_end_time_ms - merge_payload_keys_end_time_ms })
279173
280174-- Track total number of spans ingested for this segment
281- local ingested_count_key = string.format (" span-buf:ic:%s" , set_key )
282- local ingested_byte_count_key = string.format (" span-buf:ibc:%s" , set_key )
283175redis .call (" incrby" , ingested_count_key , num_spans )
284176redis .call (" incrby" , ingested_byte_count_key , byte_count )
285177redis .call (" expire" , ingested_count_key , set_timeout )
286178redis .call (" expire" , ingested_byte_count_key , set_timeout )
287179
288- if write_merged_payloads then
289- redis .call (" expire" , set_key , set_timeout )
290- end
291-
292180local ingested_count_end_time_ms = get_time_ms ()
293- local ingested_count_step_latency_ms = ingested_count_end_time_ms - arg_cleanup_end_time_ms
181+ local ingested_count_step_latency_ms = ingested_count_end_time_ms - counter_merge_end_time_ms
294182table.insert (latency_table , {" ingested_count_step_latency_ms" , ingested_count_step_latency_ms })
295183
296184-- Capture end time and calculate latency in milliseconds
0 commit comments