1- from dramatiq import set_broker
1+ from dramatiq import set_broker , set_encoder
22from dramatiq .brokers .stub import StubBroker
33from dramatiq .brokers .redis import RedisBroker
4+ from dramatiq .encoder import Encoder , DecodeError , MessageData
45from dramatiq .middleware import AsyncIO
56from opentelemetry_instrumentor_dramatiq import DramatiqInstrumentor
6-
7+ import orjson
78from .config import AppConfig
89
910
11+ class ORJSONEncoder (Encoder ):
12+ """Encodes messages as JSON. orjson is much faster than json.
13+ """
14+
15+ def encode (self , data : MessageData ) -> bytes :
16+ return orjson .dumps (data )
17+
18+ def decode (self , data : bytes ) -> MessageData :
19+ try :
20+ return orjson .loads (data )
21+ except orjson .JSONDecodeError as e :
22+ raise DecodeError ("failed to decode message %r" % (data ,), data , e ) from None
23+
24+
1025def init_dramatiq (config : AppConfig ):
1126 DramatiqInstrumentor ().instrument ()
1227 if config .ENVIRONMENT == "test" :
@@ -18,3 +33,4 @@ def init_dramatiq(config: AppConfig):
1833 raise RuntimeError ("Running a non-test environment without Redis URL set" )
1934 broker .add_middleware (AsyncIO ())
2035 set_broker (broker )
36+ set_encoder (ORJSONEncoder ())
0 commit comments