diff --git a/core/build.gradle b/core/build.gradle index ed92d21..f8ebc25 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -9,7 +9,7 @@ compileJava { } group = 'com.p14n' -version = '1.2.0-SNAPSHOT' +version = '1.2.0' repositories { mavenCentral() diff --git a/debezium/build.gradle b/debezium/build.gradle index 0c0d708..ace4429 100644 --- a/debezium/build.gradle +++ b/debezium/build.gradle @@ -9,7 +9,7 @@ compileJava { } group = 'com.p14n' -version = '1.2.0-SNAPSHOT' +version = '1.2.0' repositories { mavenCentral() diff --git a/grpc/build.gradle b/grpc/build.gradle index c5fa77f..dd099c3 100644 --- a/grpc/build.gradle +++ b/grpc/build.gradle @@ -15,7 +15,7 @@ compileJava { } group = 'com.p14n' -version = '1.2.0-SNAPSHOT' +version = '1.2.0' repositories { mavenCentral() diff --git a/vertx/build.gradle b/vertx/build.gradle index 88b247a..63e7b92 100644 --- a/vertx/build.gradle +++ b/vertx/build.gradle @@ -1,6 +1,12 @@ + +import com.vanniktech.maven.publish.SonatypeHost +import com.vanniktech.maven.publish.JavaLibrary +import com.vanniktech.maven.publish.JavadocJar + plugins { id 'java' id 'com.adarshr.test-logger' version '4.0.0' + id "com.vanniktech.maven.publish" version "0.31.0" } compileJava { @@ -9,7 +15,7 @@ compileJava { } group = 'com.p14n' -version = '1.2.0-SNAPSHOT' +version = '1.2.0' repositories { mavenCentral() @@ -80,18 +86,17 @@ tasks.withType(Jar) { } mavenPublishing { - + configure(new JavaLibrary(new JavadocJar.Javadoc(), true)) publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL, true) - signAllPublications() coordinates("com.p14n", "postevent-vertx", version) - + pom { name = "Postevent Vert.x" - description = 'A reliable event publishing and consumption system using PostgreSQL and vert.x' + description = 'A reliable event publishing and consumption system using PostgreSQL and Vert.x EventBus' inceptionYear = "2025" url = "https://github.com/p14n/postevent/" licenses { @@ -113,4 +118,4 @@ mavenPublishing { url = 'https://github.com/p14n/postevent' } } -} \ No newline at end of file +} diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java b/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java index 7740aac..53a64e5 100644 --- a/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java @@ -15,28 +15,53 @@ import java.util.List; import java.util.Set; +/** + * Vert.x-based consumer server that provides event consumption capabilities + * using the EventBus for communication and coordination. + * + *

+ * This server sets up the necessary infrastructure for event processing + * including database setup, message brokers, and catchup services. + *

+ */ public class VertxConsumerServer implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(VertxConsumerServer.class); - private DataSource ds; - //private ConfigData cfg; + private final DataSource ds; private List closeables; - private AsyncExecutor asyncExecutor; + private final AsyncExecutor asyncExecutor; OpenTelemetry ot; + /** + * Creates a new VertxConsumerServer. + * + * @param ds The DataSource for database operations + * @param asyncExecutor The async executor for handling operations + * @param ot The OpenTelemetry instance for observability + */ public VertxConsumerServer(DataSource ds, AsyncExecutor asyncExecutor, OpenTelemetry ot) { this.ds = ds; this.asyncExecutor = asyncExecutor; this.ot = ot; } - public void start(EventBus eb, EventBusMessageBroker mb, Set topics) throws IOException, InterruptedException { + /** + * Starts the consumer server with the specified configuration. + * + * @param eb The Vert.x EventBus to use for communication + * @param mb The EventBus message broker for event handling + * @param topics The set of topics to handle + * @throws IOException If database setup fails + * @throws InterruptedException If the operation is interrupted + */ + public void start(EventBus eb, EventBusMessageBroker mb, Set topics) + throws IOException, InterruptedException { logger.atInfo().log("Starting consumer server"); var db = new DatabaseSetup(ds); db.setupServer(topics); var catchupServer = new CatchupServer(ds); - var catchupService = new EventBusCatchupService(catchupServer,eb,topics,this.asyncExecutor); + var catchupService = new EventBusCatchupService(catchupServer, eb, topics, this.asyncExecutor); closeables = List.of(catchupService, mb, asyncExecutor); System.out.println("🌐 Vert.x EventBus server started"); @@ -45,11 +70,11 @@ public void start(EventBus eb, EventBusMessageBroker mb, Set topics) thr @Override public void close() { - if(closeables != null){ - for(var c : closeables){ + if (closeables != null) { + for (var c : closeables) { try { c.close(); - } catch (Exception e){ + } catch (Exception e) { } } diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java b/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java index c228766..03e949f 100644 --- a/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java @@ -20,6 +20,16 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +/** + * Vert.x-based persistent consumer that provides transactional event processing + * capabilities using the EventBus for coordination. + * + *

+ * This consumer handles persistent event processing with transactional + * guarantees + * and integrates with the Vert.x EventBus for distributed coordination. + *

+ */ public class VertxPersistentConsumer implements AutoCloseable, MessageBroker { private static final Logger logger = LoggerFactory.getLogger(VertxPersistentConsumer.class); @@ -31,13 +41,28 @@ public class VertxPersistentConsumer implements AutoCloseable, MessageBroker topics, DataSource ds,EventBus eb, EventBusMessageBroker mb) { + /** + * Starts the persistent consumer with the specified configuration. + * + * @param topics The set of topics to handle + * @param ds The DataSource for database operations + * @param eb The Vert.x EventBus for communication + * @param mb The EventBus message broker for event handling + */ + public void start(Set topics, DataSource ds, EventBus eb, EventBusMessageBroker mb) { logger.atInfo().log("Starting consumer client"); if (tb != null) { @@ -54,7 +79,7 @@ public void start(Set topics, DataSource ds,EventBus eb, EventBusMessage var catchupClient = new EventBusCatchupClient(eb); for (var topic : topics) { - mb.subscribeToEventBus(topic,pb); + mb.subscribeToEventBus(topic, pb); } seb.subscribe(new CatchupService(ds, catchupClient, seb)); seb.subscribe(new UnprocessedSubmitter(seb, ds, new UnprocessedEventFinder(), tb, batchSize)); @@ -72,7 +97,6 @@ public void start(Set topics, DataSource ds,EventBus eb, EventBusMessage logger.atInfo().log("Consumer client started successfully"); - } catch (Exception e) { logger.atError() .setCause(e) @@ -92,7 +116,8 @@ public void close() { for (AutoCloseable c : closeables) { try { - if(c != null) c.close(); + if (c != null) + c.close(); } catch (Exception e) { logger.atWarn() .setCause(e) diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java index aa06861..79a7220 100644 --- a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java @@ -34,7 +34,6 @@ *
  • {@code catchup.getLatestMessageId} - Get the latest message ID for a * topic
  • * - *

    * *

    * Example usage: @@ -53,7 +52,10 @@ public class EventBusCatchupService implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(EventBusCatchupService.class); + /** EventBus address prefix for fetch events requests. */ public static final String FETCH_EVENTS_ADDRESS = "catchup.fetch_events."; + + /** EventBus address prefix for get latest message ID requests. */ public static final String GET_LATEST_MESSAGE_ID_ADDRESS = "catchup.get_latest."; private final CatchupServerInterface catchupServer; @@ -68,11 +70,13 @@ public class EventBusCatchupService implements AutoCloseable { * * @param catchupServer The underlying catchup server implementation * @param eventBus The Vert.x EventBus to use for messaging + * @param topics The set of topics to handle catchup requests for + * @param executor The async executor for handling requests */ public EventBusCatchupService(CatchupServerInterface catchupServer, - EventBus eventBus, - Set topics, - AsyncExecutor executor) { + EventBus eventBus, + Set topics, + AsyncExecutor executor) { this.catchupServer = catchupServer; this.eventBus = eventBus; this.topics = topics; @@ -85,7 +89,7 @@ public EventBusCatchupService(CatchupServerInterface catchupServer, * requests. */ public void start() { - if(fetchEventsConsumers == null) { + if (fetchEventsConsumers == null) { logger.atInfo().log("Starting EventBusCatchupService"); @@ -168,7 +172,7 @@ private void handleFetchEvents(Message message) { executor.submit(() -> { - try{ + try { List events = catchupServer.fetchEvents(fromId, toId, limit, topic); // Serialize events to JSON and reply @@ -180,7 +184,7 @@ private void handleFetchEvents(Message message) { .addArgument(topic) .log("Successfully fetched {} events for topic {}", events.size(), topic); - } catch (Exception e){ + } catch (Exception e) { logger.atError() .setCause(e) .log("Error handling fetchEvents request"); diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java index f96e068..ba6c2ed 100644 --- a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java @@ -31,7 +31,6 @@ *

  • Events are then published to the Vert.x EventBus for real-time * distribution
  • * - *

    * *

    * Subscribers receive events from the EventBus, providing low-latency @@ -187,8 +186,8 @@ public void subscribeToEventBus(String topic, MessageSubscriber subscribe }); // Store consumer for potential cleanup - consumers.compute(topic, (k,l) -> { - if(l == null){ + consumers.compute(topic, (k, l) -> { + if (l == null) { l = new ArrayList<>(); } l.add(consumer); @@ -209,7 +208,7 @@ public void subscribeToEventBus(String topic, MessageSubscriber subscribe public void unsubscribe(String topic) { List> consumerList = consumers.remove(topic); if (consumerList != null) { - for(var consumer: consumerList){ + for (var consumer : consumerList) { consumer.unregister(); } logger.atInfo() @@ -226,7 +225,7 @@ public void close() { logger.atInfo().log("Closing EventBusMessageBroker"); // Unregister all consumers - consumers.values().forEach( l -> { + consumers.values().forEach(l -> { l.forEach(MessageConsumer::unregister); }); consumers.clear(); diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java b/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java index 10271d9..c7e57fa 100644 --- a/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java @@ -14,7 +14,8 @@ * *

    * The codec uses JSON serialization for simplicity and debugging ease. - * Events are encoded as JSON strings with a length prefix for efficient parsing. + * Events are encoded as JSON strings with a length prefix for efficient + * parsing. *

    * *

    @@ -24,6 +25,14 @@ */ public class EventCodec implements MessageCodec { + /** + * Creates a new EventCodec for serializing Event objects on the Vert.x + * EventBus. + */ + public EventCodec() { + // Default constructor + } + /** * Encodes an Event object to the wire format. * The event is serialized to JSON and prefixed with its length. @@ -35,7 +44,7 @@ public class EventCodec implements MessageCodec { public void encodeToWire(Buffer buffer, Event event) { String json = Json.encode(event); byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8); - + // Write length prefix followed by JSON bytes buffer.appendInt(jsonBytes.length); buffer.appendBytes(jsonBytes); @@ -53,11 +62,11 @@ public void encodeToWire(Buffer buffer, Event event) { public Event decodeFromWire(int pos, Buffer buffer) { // Read length prefix int length = buffer.getInt(pos); - + // Read JSON bytes and convert to string byte[] jsonBytes = buffer.getBytes(pos + 4, pos + 4 + length); - String json = new String(jsonBytes,StandardCharsets.UTF_8); - + String json = new String(jsonBytes, StandardCharsets.UTF_8); + // Deserialize from JSON return Json.decodeValue(json, Event.class); }