44from arroyo .backends .kafka import KafkaPayload
55from arroyo .types import BrokerValue , Message , Partition
66from arroyo .types import Topic as ArroyoTopic
7+ from sentry_kafka_schemas .codecs import Codec
8+ from sentry_kafka_schemas .schema_types .snuba_spans_v1 import SpanEvent
79
8- from sentry .conf .types .kafka_definition import Topic
10+ from sentry .conf .types .kafka_definition import Topic , get_topic_codec
911from sentry .spans .consumers .process_segments .factory import DetectPerformanceIssuesStrategyFactory
1012from sentry .testutils .helpers .options import override_options
1113from sentry .utils import json
1214from sentry .utils .kafka_config import get_topic_definition
1315from tests .sentry .spans .consumers .process .test_factory import build_mock_span
1416
17+ SNUBA_SPANS_CODEC : Codec [SpanEvent ] = get_topic_codec (Topic .SNUBA_SPANS )
18+
1519
1620def build_mock_message (data , topic = None ):
1721 message = mock .Mock ()
@@ -26,58 +30,66 @@ def build_mock_message(data, topic=None):
2630 "standalone-spans.process-segments-consumer.enable" : True ,
2731 }
2832)
29- @mock .patch ("sentry.spans.consumers.process_segments.factory.process_segment" )
33+ @mock .patch (
34+ "sentry.spans.consumers.process_segments.factory.process_segment" , side_effect = lambda x : x
35+ )
3036def test_segment_deserialized_correctly (mock_process_segment ):
3137 topic = ArroyoTopic (get_topic_definition (Topic .BUFFERED_SEGMENTS )["real_topic_name" ])
3238 partition_1 = Partition (topic , 0 )
3339 partition_2 = Partition (topic , 1 )
3440 mock_commit = mock .Mock ()
35- strategy = DetectPerformanceIssuesStrategyFactory (
41+ factory = DetectPerformanceIssuesStrategyFactory (
3642 num_processes = 2 ,
3743 input_block_size = 1 ,
3844 max_batch_size = 2 ,
3945 max_batch_time = 1 ,
4046 output_block_size = 1 ,
41- ).create_with_partitions (
42- commit = mock_commit ,
43- partitions = {},
4447 )
4548
46- span_data = build_mock_span (project_id = 1 , is_segment = True )
47- segment_data = {"spans" : [span_data ]}
48- message = build_mock_message (segment_data , topic )
49+ with mock .patch .object (factory , "producer" , new = mock .Mock ()) as mock_producer :
50+ strategy = factory .create_with_partitions (
51+ commit = mock_commit ,
52+ partitions = {},
53+ )
54+
55+ span_data = build_mock_span (project_id = 1 , is_segment = True )
56+ segment_data = {"spans" : [span_data ]}
57+ message = build_mock_message (segment_data , topic )
4958
50- strategy .submit (
51- Message (
52- BrokerValue (
53- KafkaPayload (b"key" , message .value ().encode ("utf-8" ), []),
54- partition_1 ,
55- 1 ,
56- datetime .now (),
59+ strategy .submit (
60+ Message (
61+ BrokerValue (
62+ KafkaPayload (b"key" , message .value ().encode ("utf-8" ), []),
63+ partition_1 ,
64+ 1 ,
65+ datetime .now (),
66+ )
5767 )
5868 )
59- )
6069
61- strategy .submit (
62- Message (
63- BrokerValue (
64- KafkaPayload (b"key" , message .value ().encode ("utf-8" ), []),
65- partition_2 ,
66- 1 ,
67- datetime .now (),
70+ strategy .submit (
71+ Message (
72+ BrokerValue (
73+ KafkaPayload (b"key" , message .value ().encode ("utf-8" ), []),
74+ partition_2 ,
75+ 1 ,
76+ datetime .now (),
77+ )
6878 )
6979 )
70- )
7180
72- calls = [
73- mock .call ({partition_1 : 2 }),
74- mock .call ({partition_2 : 2 }),
75- ]
81+ calls = [
82+ mock .call ({partition_1 : 2 }),
83+ mock .call ({partition_2 : 2 }),
84+ ]
7685
77- mock_commit .assert_has_calls (calls = calls , any_order = True )
86+ strategy .poll ()
87+ strategy .join (1 )
88+ strategy .terminate ()
7889
79- strategy .poll ()
80- strategy .join (1 )
81- strategy .terminate ()
90+ mock_commit .assert_has_calls (calls = calls , any_order = True )
91+ assert mock_process_segment .call_args .args [0 ] == segment_data ["spans" ]
8292
83- assert mock_process_segment .call_args .args [0 ] == segment_data ["spans" ]
93+ assert mock_producer .produce .call_count == 2
94+ assert mock_producer .produce .call_args .args [0 ] == ArroyoTopic ("snuba-spans" )
95+ SNUBA_SPANS_CODEC .decode (mock_producer .produce .call_args .args [1 ].value )
0 commit comments