From 3d9be6b14c9d6625d5470fdf727299d046f5523d Mon Sep 17 00:00:00 2001
From: Dean Chapman
Date: Tue, 23 Sep 2025 21:08:44 +0100
Subject: [PATCH 1/4] Remove snapshot
---
core/build.gradle | 2 +-
debezium/build.gradle | 2 +-
grpc/build.gradle | 2 +-
vertx/build.gradle | 2 +-
4 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/core/build.gradle b/core/build.gradle
index ed92d21..f8ebc25 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -9,7 +9,7 @@ compileJava {
}
group = 'com.p14n'
-version = '1.2.0-SNAPSHOT'
+version = '1.2.0'
repositories {
mavenCentral()
diff --git a/debezium/build.gradle b/debezium/build.gradle
index 0c0d708..ace4429 100644
--- a/debezium/build.gradle
+++ b/debezium/build.gradle
@@ -9,7 +9,7 @@ compileJava {
}
group = 'com.p14n'
-version = '1.2.0-SNAPSHOT'
+version = '1.2.0'
repositories {
mavenCentral()
diff --git a/grpc/build.gradle b/grpc/build.gradle
index c5fa77f..dd099c3 100644
--- a/grpc/build.gradle
+++ b/grpc/build.gradle
@@ -15,7 +15,7 @@ compileJava {
}
group = 'com.p14n'
-version = '1.2.0-SNAPSHOT'
+version = '1.2.0'
repositories {
mavenCentral()
diff --git a/vertx/build.gradle b/vertx/build.gradle
index 88b247a..27b07ed 100644
--- a/vertx/build.gradle
+++ b/vertx/build.gradle
@@ -9,7 +9,7 @@ compileJava {
}
group = 'com.p14n'
-version = '1.2.0-SNAPSHOT'
+version = '1.2.0'
repositories {
mavenCentral()
From a0b7010a0b59fa590e803ba077d46622d76ff2ee Mon Sep 17 00:00:00 2001
From: Dean Chapman
Date: Tue, 23 Sep 2025 21:20:45 +0100
Subject: [PATCH 2/4] Correct buildfile
---
vertx/build.gradle | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/vertx/build.gradle b/vertx/build.gradle
index 27b07ed..b7a6b10 100644
--- a/vertx/build.gradle
+++ b/vertx/build.gradle
@@ -1,6 +1,11 @@
+import com.vanniktech.maven.publish.SonatypeHost
+import com.vanniktech.maven.publish.JavaLibrary
+import com.vanniktech.maven.publish.JavadocJar
+
plugins {
id 'java'
id 'com.adarshr.test-logger' version '4.0.0'
+ id "com.vanniktech.maven.publish" version "0.31.0"
}
compileJava {
From 56e84440fb8a921aeb9c45b9850913cd8c97068b Mon Sep 17 00:00:00 2001
From: Dean Chapman
Date: Tue, 23 Sep 2025 21:39:31 +0100
Subject: [PATCH 3/4] Correct buildfile
---
vertx/build.gradle | 13 ++-----------
1 file changed, 2 insertions(+), 11 deletions(-)
diff --git a/vertx/build.gradle b/vertx/build.gradle
index b7a6b10..88d0777 100644
--- a/vertx/build.gradle
+++ b/vertx/build.gradle
@@ -1,11 +1,8 @@
-import com.vanniktech.maven.publish.SonatypeHost
-import com.vanniktech.maven.publish.JavaLibrary
-import com.vanniktech.maven.publish.JavadocJar
plugins {
id 'java'
id 'com.adarshr.test-logger' version '4.0.0'
- id "com.vanniktech.maven.publish" version "0.31.0"
+ id "com.vanniktech.maven.publish" version "0.34.0"
}
compileJava {
@@ -85,12 +82,6 @@ tasks.withType(Jar) {
}
mavenPublishing {
-
- configure(new JavaLibrary(new JavadocJar.Javadoc(), true))
-
- publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL, true)
-
- signAllPublications()
coordinates("com.p14n", "postevent-vertx", version)
@@ -118,4 +109,4 @@ mavenPublishing {
url = 'https://github.com/p14n/postevent'
}
}
-}
\ No newline at end of file
+}
From 883821ee748e739d3e92eaa8bce9f89423a96230 Mon Sep 17 00:00:00 2001
From: Dean Chapman
Date: Tue, 23 Sep 2025 22:01:10 +0100
Subject: [PATCH 4/4] Javadoc and buildfile fixes
---
vertx/build.gradle | 15 +++++--
.../postevent/vertx/VertxConsumerServer.java | 41 +++++++++++++++----
.../vertx/VertxPersistentConsumer.java | 33 +++++++++++++--
.../vertx/adapter/EventBusCatchupService.java | 18 ++++----
.../vertx/adapter/EventBusMessageBroker.java | 9 ++--
.../postevent/vertx/codec/EventCodec.java | 19 ++++++---
6 files changed, 103 insertions(+), 32 deletions(-)
diff --git a/vertx/build.gradle b/vertx/build.gradle
index 88d0777..63e7b92 100644
--- a/vertx/build.gradle
+++ b/vertx/build.gradle
@@ -1,8 +1,12 @@
+import com.vanniktech.maven.publish.SonatypeHost
+import com.vanniktech.maven.publish.JavaLibrary
+import com.vanniktech.maven.publish.JavadocJar
+
plugins {
id 'java'
id 'com.adarshr.test-logger' version '4.0.0'
- id "com.vanniktech.maven.publish" version "0.34.0"
+ id "com.vanniktech.maven.publish" version "0.31.0"
}
compileJava {
@@ -83,11 +87,16 @@ tasks.withType(Jar) {
mavenPublishing {
+ configure(new JavaLibrary(new JavadocJar.Javadoc(), true))
+
+ publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL, true)
+ signAllPublications()
+
coordinates("com.p14n", "postevent-vertx", version)
-
+
pom {
name = "Postevent Vert.x"
- description = 'A reliable event publishing and consumption system using PostgreSQL and vert.x'
+ description = 'A reliable event publishing and consumption system using PostgreSQL and Vert.x EventBus'
inceptionYear = "2025"
url = "https://github.com/p14n/postevent/"
licenses {
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java b/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java
index 7740aac..53a64e5 100644
--- a/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java
@@ -15,28 +15,53 @@
import java.util.List;
import java.util.Set;
+/**
+ * Vert.x-based consumer server that provides event consumption capabilities
+ * using the EventBus for communication and coordination.
+ *
+ *
+ * This server sets up the necessary infrastructure for event processing
+ * including database setup, message brokers, and catchup services.
+ *
+ */
public class VertxConsumerServer implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(VertxConsumerServer.class);
- private DataSource ds;
- //private ConfigData cfg;
+ private final DataSource ds;
private List closeables;
- private AsyncExecutor asyncExecutor;
+ private final AsyncExecutor asyncExecutor;
OpenTelemetry ot;
+ /**
+ * Creates a new VertxConsumerServer.
+ *
+ * @param ds The DataSource for database operations
+ * @param asyncExecutor The async executor for handling operations
+ * @param ot The OpenTelemetry instance for observability
+ */
public VertxConsumerServer(DataSource ds, AsyncExecutor asyncExecutor, OpenTelemetry ot) {
this.ds = ds;
this.asyncExecutor = asyncExecutor;
this.ot = ot;
}
- public void start(EventBus eb, EventBusMessageBroker mb, Set topics) throws IOException, InterruptedException {
+ /**
+ * Starts the consumer server with the specified configuration.
+ *
+ * @param eb The Vert.x EventBus to use for communication
+ * @param mb The EventBus message broker for event handling
+ * @param topics The set of topics to handle
+ * @throws IOException If database setup fails
+ * @throws InterruptedException If the operation is interrupted
+ */
+ public void start(EventBus eb, EventBusMessageBroker mb, Set topics)
+ throws IOException, InterruptedException {
logger.atInfo().log("Starting consumer server");
var db = new DatabaseSetup(ds);
db.setupServer(topics);
var catchupServer = new CatchupServer(ds);
- var catchupService = new EventBusCatchupService(catchupServer,eb,topics,this.asyncExecutor);
+ var catchupService = new EventBusCatchupService(catchupServer, eb, topics, this.asyncExecutor);
closeables = List.of(catchupService, mb, asyncExecutor);
System.out.println("🌐 Vert.x EventBus server started");
@@ -45,11 +70,11 @@ public void start(EventBus eb, EventBusMessageBroker mb, Set topics) thr
@Override
public void close() {
- if(closeables != null){
- for(var c : closeables){
+ if (closeables != null) {
+ for (var c : closeables) {
try {
c.close();
- } catch (Exception e){
+ } catch (Exception e) {
}
}
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java b/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java
index c228766..03e949f 100644
--- a/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java
@@ -20,6 +20,16 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
+/**
+ * Vert.x-based persistent consumer that provides transactional event processing
+ * capabilities using the EventBus for coordination.
+ *
+ *
+ * This consumer handles persistent event processing with transactional
+ * guarantees
+ * and integrates with the Vert.x EventBus for distributed coordination.
+ *
+ */
public class VertxPersistentConsumer implements AutoCloseable, MessageBroker {
private static final Logger logger = LoggerFactory.getLogger(VertxPersistentConsumer.class);
@@ -31,13 +41,28 @@ public class VertxPersistentConsumer implements AutoCloseable, MessageBroker topics, DataSource ds,EventBus eb, EventBusMessageBroker mb) {
+ /**
+ * Starts the persistent consumer with the specified configuration.
+ *
+ * @param topics The set of topics to handle
+ * @param ds The DataSource for database operations
+ * @param eb The Vert.x EventBus for communication
+ * @param mb The EventBus message broker for event handling
+ */
+ public void start(Set topics, DataSource ds, EventBus eb, EventBusMessageBroker mb) {
logger.atInfo().log("Starting consumer client");
if (tb != null) {
@@ -54,7 +79,7 @@ public void start(Set topics, DataSource ds,EventBus eb, EventBusMessage
var catchupClient = new EventBusCatchupClient(eb);
for (var topic : topics) {
- mb.subscribeToEventBus(topic,pb);
+ mb.subscribeToEventBus(topic, pb);
}
seb.subscribe(new CatchupService(ds, catchupClient, seb));
seb.subscribe(new UnprocessedSubmitter(seb, ds, new UnprocessedEventFinder(), tb, batchSize));
@@ -72,7 +97,6 @@ public void start(Set topics, DataSource ds,EventBus eb, EventBusMessage
logger.atInfo().log("Consumer client started successfully");
-
} catch (Exception e) {
logger.atError()
.setCause(e)
@@ -92,7 +116,8 @@ public void close() {
for (AutoCloseable c : closeables) {
try {
- if(c != null) c.close();
+ if (c != null)
+ c.close();
} catch (Exception e) {
logger.atWarn()
.setCause(e)
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java
index aa06861..79a7220 100644
--- a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java
@@ -34,7 +34,6 @@
* {@code catchup.getLatestMessageId} - Get the latest message ID for a
* topic
*
- *
*
*
* Example usage:
@@ -53,7 +52,10 @@
public class EventBusCatchupService implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(EventBusCatchupService.class);
+ /** EventBus address prefix for fetch events requests. */
public static final String FETCH_EVENTS_ADDRESS = "catchup.fetch_events.";
+
+ /** EventBus address prefix for get latest message ID requests. */
public static final String GET_LATEST_MESSAGE_ID_ADDRESS = "catchup.get_latest.";
private final CatchupServerInterface catchupServer;
@@ -68,11 +70,13 @@ public class EventBusCatchupService implements AutoCloseable {
*
* @param catchupServer The underlying catchup server implementation
* @param eventBus The Vert.x EventBus to use for messaging
+ * @param topics The set of topics to handle catchup requests for
+ * @param executor The async executor for handling requests
*/
public EventBusCatchupService(CatchupServerInterface catchupServer,
- EventBus eventBus,
- Set topics,
- AsyncExecutor executor) {
+ EventBus eventBus,
+ Set topics,
+ AsyncExecutor executor) {
this.catchupServer = catchupServer;
this.eventBus = eventBus;
this.topics = topics;
@@ -85,7 +89,7 @@ public EventBusCatchupService(CatchupServerInterface catchupServer,
* requests.
*/
public void start() {
- if(fetchEventsConsumers == null) {
+ if (fetchEventsConsumers == null) {
logger.atInfo().log("Starting EventBusCatchupService");
@@ -168,7 +172,7 @@ private void handleFetchEvents(Message message) {
executor.submit(() -> {
- try{
+ try {
List events = catchupServer.fetchEvents(fromId, toId, limit, topic);
// Serialize events to JSON and reply
@@ -180,7 +184,7 @@ private void handleFetchEvents(Message message) {
.addArgument(topic)
.log("Successfully fetched {} events for topic {}", events.size(), topic);
- } catch (Exception e){
+ } catch (Exception e) {
logger.atError()
.setCause(e)
.log("Error handling fetchEvents request");
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java
index f96e068..ba6c2ed 100644
--- a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java
@@ -31,7 +31,6 @@
* Events are then published to the Vert.x EventBus for real-time
* distribution
*
- *
*
*
* Subscribers receive events from the EventBus, providing low-latency
@@ -187,8 +186,8 @@ public void subscribeToEventBus(String topic, MessageSubscriber subscribe
});
// Store consumer for potential cleanup
- consumers.compute(topic, (k,l) -> {
- if(l == null){
+ consumers.compute(topic, (k, l) -> {
+ if (l == null) {
l = new ArrayList<>();
}
l.add(consumer);
@@ -209,7 +208,7 @@ public void subscribeToEventBus(String topic, MessageSubscriber subscribe
public void unsubscribe(String topic) {
List> consumerList = consumers.remove(topic);
if (consumerList != null) {
- for(var consumer: consumerList){
+ for (var consumer : consumerList) {
consumer.unregister();
}
logger.atInfo()
@@ -226,7 +225,7 @@ public void close() {
logger.atInfo().log("Closing EventBusMessageBroker");
// Unregister all consumers
- consumers.values().forEach( l -> {
+ consumers.values().forEach(l -> {
l.forEach(MessageConsumer::unregister);
});
consumers.clear();
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java b/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java
index 10271d9..c7e57fa 100644
--- a/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java
@@ -14,7 +14,8 @@
*
*
* The codec uses JSON serialization for simplicity and debugging ease.
- * Events are encoded as JSON strings with a length prefix for efficient parsing.
+ * Events are encoded as JSON strings with a length prefix for efficient
+ * parsing.
*
*
*
@@ -24,6 +25,14 @@
*/
public class EventCodec implements MessageCodec {
+ /**
+ * Creates a new EventCodec for serializing Event objects on the Vert.x
+ * EventBus.
+ */
+ public EventCodec() {
+ // Default constructor
+ }
+
/**
* Encodes an Event object to the wire format.
* The event is serialized to JSON and prefixed with its length.
@@ -35,7 +44,7 @@ public class EventCodec implements MessageCodec {
public void encodeToWire(Buffer buffer, Event event) {
String json = Json.encode(event);
byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
-
+
// Write length prefix followed by JSON bytes
buffer.appendInt(jsonBytes.length);
buffer.appendBytes(jsonBytes);
@@ -53,11 +62,11 @@ public void encodeToWire(Buffer buffer, Event event) {
public Event decodeFromWire(int pos, Buffer buffer) {
// Read length prefix
int length = buffer.getInt(pos);
-
+
// Read JSON bytes and convert to string
byte[] jsonBytes = buffer.getBytes(pos + 4, pos + 4 + length);
- String json = new String(jsonBytes,StandardCharsets.UTF_8);
-
+ String json = new String(jsonBytes, StandardCharsets.UTF_8);
+
// Deserialize from JSON
return Json.decodeValue(json, Event.class);
}