From ed154ecf91af81401c74a00f61b1a36ee5b92742 Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Sat, 25 Oct 2025 20:41:12 +0100 Subject: [PATCH] Add method for transactional publishing --- .../vertx/adapter/EventBusMessageBroker.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java index ba6c2ed..6f83631 100644 --- a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java @@ -4,6 +4,7 @@ import com.p14n.postevent.broker.AsyncExecutor; import com.p14n.postevent.broker.EventMessageBroker; import com.p14n.postevent.broker.MessageSubscriber; +import com.p14n.postevent.broker.TransactionalEvent; import com.p14n.postevent.vertx.codec.EventCodec; import com.p14n.postevent.data.Event; import io.opentelemetry.api.OpenTelemetry; @@ -140,6 +141,38 @@ public void publish(String topic, Event event) { throw new RuntimeException("Failed to publish event", e); } } + public void publish(String topic, TransactionalEvent event) { + + logger.atDebug() + .addArgument(topic) + .addArgument(event.id()) + .log("Publishing event to topic {} with id {}"); + + try { + + Publisher.publish(event.event(), event.connection(), topic); + + // Then, publish to EventBus for real-time distribution + String eventBusAddress = "events." + topic; + eventBus.publish(eventBusAddress, event); + + logger.atDebug() + .addArgument(topic) + .addArgument(event.id()) + .log("Successfully published event to topic {} with id {}"); + + } catch (Exception e) { + logger.atError() + .addArgument(topic) + .addArgument(event.id()) + .setCause(e) + .log("Failed to publish event to topic {} with id {}"); + + } + + // First, persist to database using existing Publisher + + } /** * Subscribes to events on a specific topic via the EventBus.