SSE Events SpringBoot
SSE (Server-Sent Events) is a mechanism that lets the server push events to an event listener on the client side.
Usually we send a request to the server and wait for a response. Here the flow is reversed: the client receives a message from the server when an event occurs. In our project that event is the reception of a RabbitMQ message.
For this purpose we use Spring's ApplicationEvent mechanism: when we read a RabbitMQ message, we publish an event that the controller listens for.
Let's look at the message reception implementation for the CustomerMsg bean:
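import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

// RabbitListener, CustomerMsg and EventsPublisher are classes of this project.
@Component
public class ListenerCustomer implements RabbitListener<CustomerMsg> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ListenerCustomer.class);
    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Autowired
    private EventsPublisher publisher;

    @Override
    public void receiveMessage(CustomerMsg message) {
        try {
            // Log the message as pretty-printed JSON, then hand it to the events engine.
            String json = MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(message);
            LOGGER.debug("Received message:\n{}", json);
            publisher.publishCustomer(message);
        } catch (JsonProcessingException e) {
            LOGGER.error("Error: ", e);
        }
    }
}
EventsPublisher creates an ApplicationEvent and publishes it. We obtain the publisher via @Autowired and call it whenever a RabbitMQ message is received. The receiveMessage method is configured to listen for customer messages.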
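The decoupling that EventsPublisher gives us can be illustrated without Spring. The following is a minimal plain-Java sketch of the publish/listen idea, not Spring's implementation: SimpleEventBus, CustomerMsgSketch and CustomerEventSketch are hypothetical names invented for this illustration (only the custName property comes from the project), and the real project relies on Spring's ApplicationEvent engine instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for the project's CustomerMsg bean.
final class CustomerMsgSketch {
    private final String custName;
    CustomerMsgSketch(String custName) { this.custName = custName; }
    String getCustName() { return custName; }
}

// Hypothetical stand-in for the event that wraps a received message.
final class CustomerEventSketch {
    private final CustomerMsgSketch msg;
    CustomerEventSketch(CustomerMsgSketch msg) { this.msg = msg; }
    CustomerMsgSketch getMsg() { return msg; }
}

// Plain-Java analogy of an events engine: publishers and listeners only share the bus.
final class SimpleEventBus {
    private static final class Registration<T> {
        private final Class<T> type;
        private final Consumer<? super T> handler;

        Registration(Class<T> type, Consumer<? super T> handler) {
            this.type = type;
            this.handler = handler;
        }

        // Deliver only events of the registered type; ignore everything else.
        void deliverIfMatches(Object event) {
            if (type.isInstance(event)) {
                handler.accept(type.cast(event));
            }
        }
    }

    private final List<Registration<?>> registrations = new CopyOnWriteArrayList<>();

    <T> void listen(Class<T> type, Consumer<? super T> handler) {
        registrations.add(new Registration<>(type, handler));
    }

    void publish(Object event) {
        for (Registration<?> registration : registrations) {
            registration.deliverIfMatches(event);
        }
    }
}

final class EventBusDemo {
    public static void main(String[] args) {
        SimpleEventBus bus = new SimpleEventBus();
        List<String> received = new ArrayList<>();
        // The listener side: interested in customer events only.
        bus.listen(CustomerEventSketch.class, evt -> received.add(evt.getMsg().getCustName()));
        // The publisher side: called when a queue message arrives.
        bus.publish(new CustomerEventSketch(new CustomerMsgSketch("Alice")));
        bus.publish("an unrelated event"); // not delivered to the customer listener
        System.out.println(received);
    }
}
```

The point of the design is that the message listener never references the controller: it only publishes, and whoever registered for that event type reacts.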
Now let's look at the controller:
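import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Controller
public class StreamCtrl {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamCtrl.class);
    // Thread-safe list: request threads and event listeners may touch it concurrently.
    private final List<SseEmitter> lsEmitters = new CopyOnWriteArrayList<>();

    @RequestMapping("/stream.action")
    public SseEmitter stream() {
        // One emitter per connected client; forget it when the connection ends.
        SseEmitter emitter = new SseEmitter();
        lsEmitters.add(emitter);
        emitter.onCompletion(() -> lsEmitters.remove(emitter));
        emitter.onTimeout(() -> lsEmitters.remove(emitter));
        return emitter;
    }

    @EventListener({CustomerEvent.class})
    public void handleCustomerEvt(CustomerEvent evt) {
        LOGGER.info("EVENT RECEIVED: {}", evt.getMsg().getCustName());
        List<SseEmitter> deadEmitters = new ArrayList<>();
        this.lsEmitters.forEach(emitter -> {
            try {
                emitter.send(evt.getMsg());
            } catch (Exception e) {
                // Sending failed, so the client is most likely gone.
                LOGGER.error("Error ", e);
                deadEmitters.add(emitter);
            }
        });
        this.lsEmitters.removeAll(deadEmitters);
    }
}
To listen for ApplicationEvents we use the @EventListener annotation. If the annotation declares no event classes, the event type is taken from the method parameter. We only want to listen for one type of event, so we name the event class we want to receive (CustomerEvent).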
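Requests to '/stream.action' and event listeners may run on different threads, so the emitter list can be modified while it is being iterated. Below is a minimal plain-Java sketch of the same register/broadcast/cleanup flow on a thread-safe list. EventSink and SinkRegistry are hypothetical names standing in for SseEmitter and the controller's list; they are not part of Spring or of the project.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for SseEmitter: anything that can receive a payload.
interface EventSink {
    void send(Object payload) throws IOException;
}

// Hypothetical registry mirroring the controller's emitter list.
final class SinkRegistry {
    // CopyOnWriteArrayList iterates over a snapshot, so adding or removing
    // sinks from another thread does not break a running broadcast.
    private final List<EventSink> sinks = new CopyOnWriteArrayList<>();

    void register(EventSink sink) {
        sinks.add(sink);
    }

    void unregister(EventSink sink) {
        sinks.remove(sink);
    }

    int size() {
        return sinks.size();
    }

    // Sends the payload to every sink and returns how many sends succeeded.
    // Sinks that throw are collected and removed after the loop.
    int broadcast(Object payload) {
        List<EventSink> dead = new ArrayList<>();
        int delivered = 0;
        for (EventSink sink : sinks) {
            try {
                sink.send(payload);
                delivered++;
            } catch (Exception e) {
                dead.add(sink);
            }
        }
        sinks.removeAll(dead);
        return delivered;
    }
}

final class SinkRegistryDemo {
    public static void main(String[] args) {
        SinkRegistry registry = new SinkRegistry();
        registry.register(payload -> System.out.println("client 1 got: " + payload));
        registry.register(payload -> { throw new IOException("client 2 disconnected"); });
        int delivered = registry.broadcast("hello");
        System.out.println("delivered=" + delivered + ", remaining=" + registry.size());
    }
}
```

A copy-on-write list suits this case because broadcasts (reads) are expected to be far more frequent than connects and disconnects (writes); with many short-lived clients another structure may be a better fit.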
To receive SSE events, the client side has to declare an EventSource object and configure it:
const eventSource = new EventSource('/stream.action');
eventSource.onmessage = e => {
    const msg = JSON.parse(e.data);
    // do something...
};
eventSource.onopen = e => console.log('open');
eventSource.onerror = e => {
    // readyState lives on the EventSource object, not on the error event.
    if (eventSource.readyState === EventSource.CLOSED) {
        console.log('close');
    }
    else {
        console.log(e);
    }
};
EventSource opens a connection to '/stream.action' on the server and waits for events. There is no polling loop; the handlers are invoked reactively as events arrive.
See the EventSource documentation on MDN.
The connection is opened as soon as this object is created. On the server side, the '/stream.action' controller method then creates a new SseEmitter and adds it to the list.
When the @EventListener method receives the event published for a RabbitMQ message, it sends the object via SseEmitter.send(), which serializes the bean to JSON. The client then receives that JSON in its 'onmessage' handler.
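To make the exchange less opaque, here is a small sketch of how one event looks on the wire in the text/event-stream format that EventSource reads: an optional "event:" line, one "data:" line per line of payload, and a blank line that ends the event. SseFrames is a hypothetical helper written for this illustration, and the JSON payload is invented; the framework does this framing for us.

```java
// Hypothetical helper that frames a payload as one text/event-stream event.
final class SseFrames {
    private SseFrames() {
    }

    // eventName may be null or empty. Unnamed events reach the client's
    // onmessage handler; named events need addEventListener(eventName, ...).
    static String format(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null && !eventName.isEmpty()) {
            sb.append("event: ").append(eventName).append('\n');
        }
        // A payload containing line breaks is sent as several data lines;
        // the client joins them again with '\n'.
        for (String line : data.split("\r\n|\r|\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // The blank line marks the end of the event.
        sb.append('\n');
        return sb.toString();
    }
}

final class SseFramesDemo {
    public static void main(String[] args) {
        // Invented payload; the real one is the serialized customer bean.
        String json = "{\"custName\":\"Alice\"}";
        System.out.print(SseFrames.format(null, json));
    }
}
```

On the client, e.data in 'onmessage' holds exactly the text after "data: ", which is why JSON.parse(e.data) yields the original object.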
I used AngularJS 1.6 on the client side and created a small application that shows all the customer messages in a table.
With just these steps we can easily write to and read from RabbitMQ.
The complete example has 2 queues to show how to configure multiple listeners and multiple writers.