-
Notifications
You must be signed in to change notification settings - Fork 2
Realtime Push Notification Strategy
in order to update different parts of the app on any new events affected by db updates I've used actor design patterns and TCP based pubsub pattern like so:
both publisher/subscriber actor can be a server or client or both be in a server or a client each of which publishes/subscribes to new event data to/from a pubsub channel and basically they're the very first strucutre of realtime streaming over a TCP based source like tokio tcp, redis, actix websocket and tonic gRPC that can be done by iterating constantly over the opned socket stream using
while let Some(...)syntax inside atokio::spawn()to accept inomcing data using the subscriber or send data using the publisher in form of utf8 bytes which can be packed into adata: Protobuf,payload: Multipart,stream: Payload.
a publisher can be local based sender like an
mpscchannel or an actor publisher (actor worker threadpool usesmpscjobq channel andtokio::spawn()behind the scence), a TCP based sender like the tokio tcp stream writer, tonic gRPC stream writer service or a redis based sender to publish new event data to a channel.
this actor will be called whenever a new event data want to be broadcasted to other parts of the app, in these situations we'll publish new event data to a redis pubsub channel constantly until a subscriber receives it. I'm using actix-redis crate to do so.
a subscriber streamer can be local based receiver like an
mpscchannel or an actor subscriber (actor worker threadpool usesmpscjobq channel andtokio::spawn()behind the scence), a TCP based receiver like the tokio tcp listener, tonic gRPC stream listener service, a websocket based receiver likeStreamHandlertrait which will be used to receive and stream overstream: Payloaddata coming from a websocket client or a redis based receiver to subscribe to a channel to receive new event data in form ofString.
this actor must be started in a place where the HTTP server is being started and passed as a safe shared data to the HTTP app_data() method so we can extract it and send message to it to receive new event data as a notification. basically inside the started() method of this actor an interval will be executed using ctx.run_interval(SUBSCRIPTION_INTERVAL_DURATION, |actor, ctx|{}) in which an async method inside of it is being executed periodically inside a tokio::spawn() threadpool, the codes inside that async method must be the logic of constant subscription of redis pubsub channel related to new event data using while let Some(...) syntax, once we receive the data we can update db records, store it in redis as key/value, update the state of the actor itself or notify other actors with newly event data using actix-broker crate which allows actors to talk to each other using a topic based local pubsub pattern. I've used redis to store the newly event data as a pair of key/value then later inside the get notifications HTTP or websocket api I'll read the value using a unique identifier like user_id and respond the caller with that.
basically in order to share data between threads and different parts of the app first the data must be of type
Arc<Mutex<Data>> + Send + Sync + 'staticto avoid deadlocks and race conditions and secondly we should use a jobq likempscto share it between threads safely, this is what actor workers based listeners like actix and stream handlers like tcp, redis, gRPC and websocket are doing behind the scene.
there must be either a websocket route or HTTP api which can be invoked to send message to the subscriber actor to receive new event data related to the passed in client id (user_id) or retrieve the new event data value from redis by passing the user_id as the key, in the case of HTTP api, the api must be invoked periodically from the client to receive new event data constantly but since websockets are full-duplex connections there is no need to invoke the route as long as the connection is open. I've used an HTTP api to receive notifications since it uses less traffics and server loads!
note that in the case of websocket logic server actor must send the received new event data to the related ws session actor.