Para realizar el EOF del sistema, decidimos implementar una lógica de conteo utilizando un exchange direct CounterExchange usando el clientId en conjunto con el tipo de worker (Filter, Reducer, Joiner, etc) como routing key. La idea de este exchange es que tanto el Gateway como los workers Filter envíen mensajes (con el clientId correspondiente) indicando la cantidad de batches que fueron enviados a la siguiente etapa de procesamiento por cada batch recibido del worker anterior (en el caso del Gateway, los batches los recibirá del cliente).
Inicialmente, el Gateway debe indicarle cuántos batches recibió del cliente y, acto seguido, envió al Filter. Posteriormente, los Filter tienen que indicar cuántos de esos batches que recibieron fueron enviados a la siguiente etapa de procesamiento (el Group By en todas las tasks, menos en la 1, donde el Filter manda directamente la data al Gateway), ya que estos workers podrían filtrar esos batches y modificar la cantidad enviada. De acá en adelante, ninguno de los workers restantes va a modificar la cantidad de batches que se envían a lo largo del pipeline de procesamiento. Por lo tanto, la cantidad de batches que sale de los Filter será la cantidad que recibirá el Aggregator (o el Gateway en la Task 1). Es por esto que nos alcanza con controlar la cantidad de batches que se reciben y se envían solo en el Gateway y en el Filter.
Entonces, el Gateway comienza a enviar a la FilterQueue los batches de un cliente específico. Luego, por cada batch recibido, el Filter Worker enviará un MessageCounter a la CounterExchange usando la routing key clientId@filter, indicando desde que stage llega el mensaje (Filter), a que stage se lo envía (None para la task 1 y Group By para las task 2 a 4) y cuántos batches fueron enviados al siguiente worker a partir del batch procesado. Así, como el Controller sabe cuantos batches envió inicialmente el Gateway al Filter para ese cliente, puede llevar un conteo de cuántos mensajes debe recibir del Filter desde el CounterExchange para saber cuando el procesamiento de esa etapa fue terminado.
Una vez que la cantidad de batches enviados por el Gateway coincide con la cantidad de batches ya procesados por los workers Filter, el Controller sabe que, a partir de ese momento, la cantidad de batches no se modificará. Por lo tanto, el Controller envía un mensaje al finishExchange con la routing key aggregator, indicando la cantidad de batches que deben esperar los Aggregator antes de considerar como finalizado a ese cliente y empezar a mandarle el batch procesado al Joiner.
Esto permite que el Controller pueda llevar un conteo exacto de los batches enviados y recibidos por cada cliente en cada worker stage, pudiendo identificar el momento exacto en el que la cantidad de batches para un cliente no se modificará. Esto nos permite tener un seguimiento determinístico del flujo de mensajes por cliente, pudiendo así gestionar adecuadamente el EOF del sistema y notificarle al Aggregator (mediante el finishExchange) la cantidad de mensajes que debe esperar para ese cliente y para poder comenzar a mandarle su data procesada hacia el Joiner y posteriormente hacia el Gateway.
Finalmente, una vez que el Gateway termina de recibir toda la data procesada del Joiner, envía otro mensaje al Controller indicándole que el procesamiento para ese cliente finalizó, y que puede notificarle a todos los workers para que borren toda la data que tenían almacenada para ese cliente.