From d2fca2eb28b63fba7307219982e02db7041ea15d Mon Sep 17 00:00:00 2001 From: alexanderkurash Date: Thu, 26 Feb 2026 16:54:56 +0200 Subject: [PATCH 1/2] CIRCSTORE-639 Flush and close Kafka producer --- src/main/java/org/folio/service/event/DomainEventPublisher.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/org/folio/service/event/DomainEventPublisher.java b/src/main/java/org/folio/service/event/DomainEventPublisher.java index 2af900a6..60317c4e 100644 --- a/src/main/java/org/folio/service/event/DomainEventPublisher.java +++ b/src/main/java/org/folio/service/event/DomainEventPublisher.java @@ -58,6 +58,8 @@ public Future publish(K key, DomainEvent event, Map oka .onSuccess(r -> log.info("publish:: Succeeded sending domain event with key [{}], " + "kafka record [{}]", key, producerRecord)) .mapEmpty() + .eventually(producer::flush) + .eventually(producer::close) .onFailure(cause -> { log.error("publish:: Unable to send domain event with key [{}], kafka record [{}]", key, producerRecord, cause); From e9f7b2008e9589e500300a66d1cd1fee8970d689 Mon Sep 17 00:00:00 2001 From: alexanderkurash Date: Fri, 27 Feb 2026 15:29:51 +0200 Subject: [PATCH 2/2] CIRCSTORE-639 Make publishing async --- .../service/event/DomainEventPublisher.java | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/folio/service/event/DomainEventPublisher.java b/src/main/java/org/folio/service/event/DomainEventPublisher.java index 60317c4e..b0641fb8 100644 --- a/src/main/java/org/folio/service/event/DomainEventPublisher.java +++ b/src/main/java/org/folio/service/event/DomainEventPublisher.java @@ -51,20 +51,32 @@ public Future publish(K key, DomainEvent event, Map oka .build(); log.info("publish:: kafkaRecord = [{}]", producerRecord); - KafkaProducer producer = getOrCreateProducer(); - log.info("publish:: Producer created, sending the record..."); - - return producer.send(producerRecord) - .onSuccess(r -> log.info("publish:: Succeeded sending domain event with key [{}], " + - "kafka record [{}]", key, producerRecord)) - .mapEmpty() - .eventually(producer::flush) - .eventually(producer::close) - .onFailure(cause -> { - log.error("publish:: Unable to send domain event with key [{}], kafka record [{}]", - key, producerRecord, cause); - failureHandler.handle(cause, producerRecord); - }); + KafkaProducer producer = null; + try { + producer = getOrCreateProducer(); + log.info("publish:: Producer created, sending the record..."); + + producer.send(producerRecord) + .onSuccess(r -> log.info("publish:: Succeeded sending domain event with key [{}], " + + "kafka record [{}]", key, producerRecord)) + .onFailure(cause -> { + log.error("publish:: Unable to send domain event with key [{}], kafka record [{}]", + key, producerRecord, cause); + failureHandler.handle(cause, producerRecord); + }) + .eventually(producer::flush) + .eventually(producer::close); + } catch (Exception e) { + log.error("publish:: Failed to initiate send for domain event with key [{}], kafka record [{}]", + key, producerRecord, e); + if (producer != null) { + log.info("publish:: Producer is not null, trying to close. Event key: {}.", key); + producer.close(); + } + failureHandler.handle(e, producerRecord); + } + + return Future.succeededFuture(); } private KafkaProducer getOrCreateProducer() {