@@ -55,11 +55,12 @@ def _process_message(message: Message[KafkaPayload]):
5555
5656def explode_segment (message : tuple [list [dict [str , Any ]], Mapping [Partition , int ]]):
5757 spans , committable = message
58- for span in spans :
58+ last = len (spans ) - 1
59+ for i , span in enumerate (spans ):
5960 if span is not None :
6061 yield Value (
6162 payload = KafkaPayload (key = None , value = orjson .dumps (span ), headers = []),
62- committable = committable ,
63+ committable = committable if i == last else None ,
6364 timestamp = None ,
6465 )
6566
@@ -96,9 +97,9 @@ def create_with_partitions(
9697 next_step = CommitOffsets (commit ),
9798 )
9899
99- # WORKAROUND: Since https://github.com/getsentry/arroyo/pull/371, Unfold
100- # no longer passes through the commit and there is no way to access it
101- # from the generator function.
100+ # XXX: Remove after https://github.com/getsentry/arroyo/pull/427: Unfold
101+ # does not pass through the commit and there is no way to access it from
102+ # the generator function.
102103 zip_commit = RunTask (
103104 function = lambda m : (m .payload , m .committable ),
104105 next_step = Unfold (generator = explode_segment , next_step = produce_step ),
0 commit comments