diff --git a/src/main/java/org/folio/service/event/DomainEventPublisher.java b/src/main/java/org/folio/service/event/DomainEventPublisher.java index 2af900a6..b0641fb8 100644 --- a/src/main/java/org/folio/service/event/DomainEventPublisher.java +++ b/src/main/java/org/folio/service/event/DomainEventPublisher.java @@ -51,18 +51,32 @@ public Future publish(K key, DomainEvent event, Map oka .build(); log.info("publish:: kafkaRecord = [{}]", producerRecord); - KafkaProducer producer = getOrCreateProducer(); - log.info("publish:: Producer created, sending the record..."); - - return producer.send(producerRecord) - .onSuccess(r -> log.info("publish:: Succeeded sending domain event with key [{}], " + - "kafka record [{}]", key, producerRecord)) - .mapEmpty() - .onFailure(cause -> { - log.error("publish:: Unable to send domain event with key [{}], kafka record [{}]", - key, producerRecord, cause); - failureHandler.handle(cause, producerRecord); - }); + KafkaProducer producer = null; + try { + producer = getOrCreateProducer(); + log.info("publish:: Producer created, sending the record..."); + + producer.send(producerRecord) + .onSuccess(r -> log.info("publish:: Succeeded sending domain event with key [{}], " + + "kafka record [{}]", key, producerRecord)) + .onFailure(cause -> { + log.error("publish:: Unable to send domain event with key [{}], kafka record [{}]", + key, producerRecord, cause); + failureHandler.handle(cause, producerRecord); + }) + .eventually(producer::flush) + .eventually(producer::close); + } catch (Exception e) { + log.error("publish:: Failed to initiate send for domain event with key [{}], kafka record [{}]", + key, producerRecord, e); + if (producer != null) { + log.info("publish:: Producer is not null, trying to close. Event key: {}.", key); + producer.close(); + } + failureHandler.handle(e, producerRecord); + } + + return Future.succeededFuture(); } private KafkaProducer getOrCreateProducer() {