Skip to content

Commit e6b2d22

Browse files
committed
ref(spans): Cleanup distributed payload flags and add docs
1 parent 4cf2eaf commit e6b2d22

File tree

7 files changed

+192
-474
lines changed

7 files changed

+192
-474
lines changed

src/sentry/options/defaults.py

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3308,27 +3308,6 @@
33083308
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
33093309
)
33103310

3311-
# Write payload sets to per-span distributed keys AND merged keys.
3312-
# Flusher reads merged keys as before.
3313-
register(
3314-
"spans.buffer.write-distributed-payloads",
3315-
default=False,
3316-
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
3317-
)
3318-
# Switch flusher to read from distributed keys instead of merged.
3319-
register(
3320-
"spans.buffer.read-distributed-payloads",
3321-
default=False,
3322-
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
3323-
)
3324-
# Set to False to stop writing merged keys and skip set merges.
3325-
# Disable after read-distributed-payloads is stable. Rollback: re-enable
3326-
# this flag to resume merged writes before reverting read-distributed-payloads.
3327-
register(
3328-
"spans.buffer.write-merged-payloads",
3329-
default=True,
3330-
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
3331-
)
33323311
# List of trace_ids to enable debug logging for. Empty = debug off.
33333312
# When set, logs detailed metrics about zunionstore set sizes, key existence, and trace structure.
33343313
register(

src/sentry/scripts/spans/add-buffer.lua

Lines changed: 64 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ is received.
1414
This implies that it has to operate according to these steps:
1515
1616
1. Identify the highest level known span for a subsegment.
17-
2. Merge subsegments when a common parent is found.
17+
2. Update the member-keys index and counters when a common parent is found.
1818
3. Update the redirect set to reflect the current state of the tree.
1919
2020
@@ -26,14 +26,11 @@ ARGS:
2626
- parent_span_id -- str -- The parent span id of the root of the subsegment.
2727
- has_root_span -- "true" or "false" -- Whether the subsegment contains the root of the segment.
2828
- set_timeout -- int
29-
- max_segment_bytes -- int -- The maximum number of bytes the segment can contain.
3029
- byte_count -- int -- The total number of bytes in the subsegment.
31-
- write_distributed_payloads -- "true" or "false" -- When true, maintain member-keys tracking sets for distributed payload keys.
32-
- write_merged_payloads -- "true" or "false" -- When false, skip set merges and set keys expire cmds.
3330
- *span_id -- str[] -- The span ids in the subsegment.
3431
3532
RETURNS:
36-
- set_key -- str -- The Redis key of the segment this subsegment was merged into.
33+
- set_key -- str -- The key of the segment, used to look up the member-keys index and to identify the segment in the queue.
3734
- has_root_span -- bool -- Whether this segment contains a root span.
3835
- latency_ms -- number -- Milliseconds elapsed during script execution.
3936
- latency_table -- table -- Per-step latency measurements.
@@ -47,11 +44,8 @@ local num_spans = ARGV[1]
4744
local parent_span_id = ARGV[2]
4845
local has_root_span = ARGV[3] == "true"
4946
local set_timeout = tonumber(ARGV[4])
50-
local max_segment_bytes = tonumber(ARGV[5])
51-
local byte_count = tonumber(ARGV[6])
52-
local write_distributed_payloads = ARGV[7] == "true"
53-
local write_merged_payloads = ARGV[8] == "true"
54-
local NUM_ARGS = 8
47+
local byte_count = tonumber(ARGV[5])
48+
local NUM_ARGS = 5
5549

5650
local function get_time_ms()
5751
local time = redis.call("TIME")
@@ -82,11 +76,7 @@ local latency_table = {}
8276
local metrics_table = {}
8377
table.insert(metrics_table, {"redirect_table_size", redis.call("hlen", main_redirect_key)})
8478
table.insert(metrics_table, {"redirect_depth", redirect_depth})
85-
local redirect_end_time_ms = get_time_ms()
86-
table.insert(latency_table, {"redirect_step_latency_ms", redirect_end_time_ms - start_time_ms})
87-
8879
local set_key = string.format("span-buf:s:{%s}:%s", project_and_trace, set_span_id)
89-
local parent_key = string.format("span-buf:s:{%s}:%s", project_and_trace, parent_span_id)
9080

9181
-- Reset the set expiry as we saw a new subsegment for this set
9282
local has_root_span_key = string.format("span-buf:hrs:%s", set_key)
@@ -96,201 +86,99 @@ if has_root_span then
9686
end
9787

9888
local hset_args = {}
99-
local sunionstore_args = {}
100-
101-
-- Merge the subsegment into the segment we are assembling.
102-
-- Merging the spans (`sunionstore_args`) is needed to compose the payloads in
103-
-- the same segment for them to be flushed later.
104-
-- Updating the redirect set instead is needed when we receive higher level spans
105-
-- for a tree we are assembling as the segment root each span points at in the
106-
-- redirect set changes when a new root is found.
107-
if write_merged_payloads and set_span_id ~= parent_span_id and redis.call("scard", parent_key) > 0 then
108-
table.insert(sunionstore_args, parent_key)
109-
end
11089

11190
for i = NUM_ARGS + 1, NUM_ARGS + num_spans do
11291
local span_id = ARGV[i]
113-
local is_root_span = span_id == parent_span_id
11492

11593
table.insert(hset_args, span_id)
11694
table.insert(hset_args, set_span_id)
117-
118-
if not is_root_span and write_merged_payloads then
119-
local span_key = string.format("span-buf:s:{%s}:%s", project_and_trace, span_id)
120-
table.insert(sunionstore_args, span_key)
121-
end
12295
end
12396

12497
redis.call("hset", main_redirect_key, unpack(hset_args))
12598
redis.call("expire", main_redirect_key, set_timeout)
12699

127-
local sunionstore_args_end_time_ms = get_time_ms()
128-
table.insert(latency_table, {"sunionstore_args_step_latency_ms", sunionstore_args_end_time_ms - redirect_end_time_ms})
100+
local redirect_end_time_ms = get_time_ms()
101+
table.insert(latency_table, {"redirect_step_latency_ms", redirect_end_time_ms - start_time_ms})
129102

130-
-- Merge spans into the parent span set.
131-
-- Used outside the if statement
132-
local arg_cleanup_end_time_ms = sunionstore_args_end_time_ms
133103
-- Maintain member-keys (span-buf:mk) tracking sets so the flusher
134-
-- knows which distributed keys to fetch. This runs in both write-only and
135-
-- full distributed mode.
136-
if write_distributed_payloads then
137-
local member_keys_key = string.format("span-buf:mk:{%s}:%s", project_and_trace, set_span_id)
138-
redis.call("sadd", member_keys_key, parent_span_id)
139-
140-
-- Merge child tracking sets from span_ids that were previously segment roots.
141-
for i = NUM_ARGS + 1, NUM_ARGS + num_spans do
142-
local span_id = ARGV[i]
143-
if span_id ~= parent_span_id then
144-
local child_mk_key = string.format("span-buf:mk:{%s}:%s", project_and_trace, span_id)
145-
local child_members = redis.call("smembers", child_mk_key)
146-
if #child_members > 0 then
147-
redis.call("sadd", member_keys_key, unpack(child_members))
148-
redis.call("del", child_mk_key)
149-
end
150-
end
151-
end
104+
-- knows which payload keys to fetch.
105+
local member_keys_key = string.format("span-buf:mk:{%s}:%s", project_and_trace, set_span_id)
106+
redis.call("sadd", member_keys_key, parent_span_id)
152107

153-
-- Merge parent's tracking set if parent_span_id redirected to a different root.
154-
if set_span_id ~= parent_span_id then
155-
local parent_mk_key = string.format("span-buf:mk:{%s}:%s", project_and_trace, parent_span_id)
156-
local parent_members = redis.call("smembers", parent_mk_key)
157-
if #parent_members > 0 then
158-
redis.call("sadd", member_keys_key, unpack(parent_members))
159-
redis.call("del", parent_mk_key)
108+
-- Merge child tracking sets from span_ids that were previously segment roots.
109+
for i = NUM_ARGS + 1, NUM_ARGS + num_spans do
110+
local span_id = ARGV[i]
111+
if span_id ~= parent_span_id then
112+
local child_mk_key = string.format("span-buf:mk:{%s}:%s", project_and_trace, span_id)
113+
local child_members = redis.call("smembers", child_mk_key)
114+
if #child_members > 0 then
115+
redis.call("sadd", member_keys_key, unpack(child_members))
116+
redis.call("del", child_mk_key)
160117
end
161118
end
162-
163-
redis.call("expire", member_keys_key, set_timeout)
164-
arg_cleanup_end_time_ms = get_time_ms()
165-
table.insert(latency_table, {"distributed_tracking_step_latency_ms", arg_cleanup_end_time_ms - sunionstore_args_end_time_ms})
166119
end
167120

168-
-- When write_merged_payloads is false, merged set merges are skipped but we
169-
-- still need to merge ic/ibc counters from child keys into the segment root.
170-
if not write_merged_payloads then
171-
local ingested_count_key = string.format("span-buf:ic:%s", set_key)
172-
local ingested_byte_count_key = string.format("span-buf:ibc:%s", set_key)
173-
for i = NUM_ARGS + 1, NUM_ARGS + num_spans do
174-
local span_id = ARGV[i]
175-
if span_id ~= parent_span_id then
176-
local child_merged = string.format("span-buf:s:{%s}:%s", project_and_trace, span_id)
177-
local child_ic_key = string.format("span-buf:ic:%s", child_merged)
178-
local child_ibc_key = string.format("span-buf:ibc:%s", child_merged)
179-
local child_count = redis.call("get", child_ic_key)
180-
local child_byte_count = redis.call("get", child_ibc_key)
181-
if child_count then
182-
redis.call("incrby", ingested_count_key, child_count)
183-
redis.call("del", child_ic_key)
184-
end
185-
if child_byte_count then
186-
redis.call("incrby", ingested_byte_count_key, child_byte_count)
187-
redis.call("del", child_ibc_key)
188-
end
189-
end
121+
-- Merge parent's tracking set if parent_span_id redirected to a different root.
122+
if set_span_id ~= parent_span_id then
123+
local parent_mk_key = string.format("span-buf:mk:{%s}:%s", project_and_trace, parent_span_id)
124+
local parent_members = redis.call("smembers", parent_mk_key)
125+
if #parent_members > 0 then
126+
redis.call("sadd", member_keys_key, unpack(parent_members))
127+
redis.call("del", parent_mk_key)
190128
end
191-
if set_span_id ~= parent_span_id then
192-
local parent_merged = string.format("span-buf:s:{%s}:%s", project_and_trace, parent_span_id)
193-
local parent_ic_key = string.format("span-buf:ic:%s", parent_merged)
194-
local parent_ibc_key = string.format("span-buf:ibc:%s", parent_merged)
195-
local parent_count = redis.call("get", parent_ic_key)
196-
local parent_byte_count = redis.call("get", parent_ibc_key)
197-
if parent_count then
198-
redis.call("incrby", ingested_count_key, parent_count)
199-
redis.call("del", parent_ic_key)
129+
end
130+
131+
redis.call("expire", member_keys_key, set_timeout)
132+
local merge_payload_keys_end_time_ms = get_time_ms()
133+
table.insert(latency_table, {"merge_payload_keys_step_latency_ms", merge_payload_keys_end_time_ms - redirect_end_time_ms})
134+
135+
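-- Merge the ingested-count (ic) and ingested-byte-count (ibc) counters from child keys (and from the parent key, when it redirects to a different root) into the segment root.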
136+
local ingested_count_key = string.format("span-buf:ic:%s", set_key)
137+
local ingested_byte_count_key = string.format("span-buf:ibc:%s", set_key)
138+
for i = NUM_ARGS + 1, NUM_ARGS + num_spans do
139+
local span_id = ARGV[i]
140+
if span_id ~= parent_span_id then
141+
local child_merged = string.format("span-buf:s:{%s}:%s", project_and_trace, span_id)
142+
local child_ic_key = string.format("span-buf:ic:%s", child_merged)
143+
local child_ibc_key = string.format("span-buf:ibc:%s", child_merged)
144+
local child_count = redis.call("get", child_ic_key)
145+
local child_byte_count = redis.call("get", child_ibc_key)
146+
if child_count then
147+
redis.call("incrby", ingested_count_key, child_count)
148+
redis.call("del", child_ic_key)
200149
end
201-
if parent_byte_count then
202-
redis.call("incrby", ingested_byte_count_key, parent_byte_count)
203-
redis.call("del", parent_ibc_key)
150+
if child_byte_count then
151+
redis.call("incrby", ingested_byte_count_key, child_byte_count)
152+
redis.call("del", child_ibc_key)
204153
end
205154
end
206-
arg_cleanup_end_time_ms = get_time_ms()
207-
table.insert(latency_table, {"distributed_ibc_merge_step_latency_ms", arg_cleanup_end_time_ms - sunionstore_args_end_time_ms})
208-
209-
elseif #sunionstore_args > 0 then
210-
local ingested_byte_count_key = string.format("span-buf:ibc:%s", set_key)
211-
local dest_bytes = tonumber(redis.call("get", ingested_byte_count_key) or 0)
212-
213-
local already_oversized = dest_bytes > max_segment_bytes
214-
table.insert(metrics_table, {"parent_span_set_already_oversized", already_oversized and 1 or 0})
215-
216-
local start_output_size = redis.call("scard", set_key)
217-
local scard_end_time_ms = get_time_ms()
218-
table.insert(latency_table, {"scard_step_latency_ms", scard_end_time_ms - sunionstore_args_end_time_ms})
219-
220-
local output_size
221-
if already_oversized then
222-
-- Dest already exceeds max_segment_bytes, skip merge entirely.
223-
output_size = start_output_size
224-
else
225-
-- Read each source set and SADD its members into dest.
226-
-- We use SMEMBERS+SADD instead of SUNIONSTORE because SUNIONSTORE
227-
-- re-serialises the entire destination set on every call, making it
228-
-- O(destination + sources). SMEMBERS+SADD is O(sources) only, which
229-
-- is significantly cheaper when the destination set is large.
230-
local all_members = {}
231-
for i = 1, #sunionstore_args do
232-
local members = redis.call("smembers", sunionstore_args[i])
233-
for j = 1, #members do
234-
all_members[#all_members + 1] = members[j]
235-
end
236-
end
237-
table.insert(metrics_table, {"zero_copy_dest_source_members", #all_members})
238-
-- Batch SADD in chunks to avoid exceeding Lua's unpack() stack limit.
239-
local BATCH = 7000
240-
for i = 1, #all_members, BATCH do
241-
local last = math.min(i + BATCH - 1, #all_members)
242-
redis.call("sadd", set_key, unpack(all_members, i, last))
243-
end
244-
output_size = redis.call("scard", set_key)
155+
end
156+
if set_span_id ~= parent_span_id then
157+
local parent_merged = string.format("span-buf:s:{%s}:%s", project_and_trace, parent_span_id)
158+
local parent_ic_key = string.format("span-buf:ic:%s", parent_merged)
159+
local parent_ibc_key = string.format("span-buf:ibc:%s", parent_merged)
160+
local parent_count = redis.call("get", parent_ic_key)
161+
local parent_byte_count = redis.call("get", parent_ibc_key)
162+
if parent_count then
163+
redis.call("incrby", ingested_count_key, parent_count)
164+
redis.call("del", parent_ic_key)
245165
end
246-
local sunionstore_end_time_ms = get_time_ms()
247-
table.insert(latency_table, {"sunionstore_step_latency_ms", sunionstore_end_time_ms - scard_end_time_ms})
248-
249-
redis.call("unlink", unpack(sunionstore_args))
250-
local unlink_end_time_ms = get_time_ms()
251-
table.insert(latency_table, {"unlink_step_latency_ms", unlink_end_time_ms - sunionstore_end_time_ms})
252-
253-
table.insert(metrics_table, {"parent_span_set_before_size", start_output_size})
254-
table.insert(metrics_table, {"parent_span_set_after_size", output_size})
255-
table.insert(metrics_table, {"elements_added", output_size - start_output_size})
256-
257-
-- Merge ingested count keys for merged spans
258-
local ingested_count_key = string.format("span-buf:ic:%s", set_key)
259-
for i = 1, #sunionstore_args do
260-
local merged_key = sunionstore_args[i]
261-
local merged_ic_key = string.format("span-buf:ic:%s", merged_key)
262-
local merged_ibc_key = string.format("span-buf:ibc:%s", merged_key)
263-
local merged_count = redis.call("get", merged_ic_key)
264-
local merged_byte_count = redis.call("get", merged_ibc_key)
265-
if merged_count then
266-
redis.call("incrby", ingested_count_key, merged_count)
267-
end
268-
if merged_byte_count then
269-
redis.call("incrby", ingested_byte_count_key, merged_byte_count)
270-
end
271-
redis.call("del", merged_ic_key)
272-
redis.call("del", merged_ibc_key)
166+
if parent_byte_count then
167+
redis.call("incrby", ingested_byte_count_key, parent_byte_count)
168+
redis.call("del", parent_ibc_key)
273169
end
274-
275-
arg_cleanup_end_time_ms = get_time_ms()
276-
table.insert(latency_table, {"arg_cleanup_step_latency_ms", arg_cleanup_end_time_ms - unlink_end_time_ms})
277170
end
278-
171+
local counter_merge_end_time_ms = get_time_ms()
172+
table.insert(latency_table, {"counter_merge_step_latency_ms", counter_merge_end_time_ms - merge_payload_keys_end_time_ms})
279173

280174
-- Track total number of spans ingested for this segment
281-
local ingested_count_key = string.format("span-buf:ic:%s", set_key)
282-
local ingested_byte_count_key = string.format("span-buf:ibc:%s", set_key)
283175
redis.call("incrby", ingested_count_key, num_spans)
284176
redis.call("incrby", ingested_byte_count_key, byte_count)
285177
redis.call("expire", ingested_count_key, set_timeout)
286178
redis.call("expire", ingested_byte_count_key, set_timeout)
287179

288-
if write_merged_payloads then
289-
redis.call("expire", set_key, set_timeout)
290-
end
291-
292180
local ingested_count_end_time_ms = get_time_ms()
293-
local ingested_count_step_latency_ms = ingested_count_end_time_ms - arg_cleanup_end_time_ms
181+
local ingested_count_step_latency_ms = ingested_count_end_time_ms - counter_merge_end_time_ms
294182
table.insert(latency_table, {"ingested_count_step_latency_ms", ingested_count_step_latency_ms})
295183

296184
-- Capture end time and calculate latency in milliseconds

src/sentry/scripts/spans/done-flush-segment-data.lua

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
-- Conditionally delete segment data only if the ingested count hasn't changed.
1+
-- Conditionally delete segment metadata only if the ingested count hasn't changed.
22
-- This is atomic with add-buffer.lua on the same {project_id:trace_id} slot,
33
-- preventing data loss when process_spans adds new spans between flush and cleanup.
44
--
@@ -17,7 +17,6 @@ if ic and tonumber(ic) == expected_ic then
1717
redis.call("DEL", "span-buf:hrs:" .. segment_key)
1818
redis.call("DEL", ic_key)
1919
redis.call("DEL", "span-buf:ibc:" .. segment_key)
20-
redis.call("UNLINK", segment_key)
2120
return 1
2221
end
2322

0 commit comments

Comments
 (0)