Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ compileJava {
}

group = 'com.p14n'
version = '1.2.0-SNAPSHOT'
version = '1.2.0'

repositories {
mavenCentral()
Expand Down
2 changes: 1 addition & 1 deletion debezium/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ compileJava {
}

group = 'com.p14n'
version = '1.2.0-SNAPSHOT'
version = '1.2.0'

repositories {
mavenCentral()
Expand Down
2 changes: 1 addition & 1 deletion grpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ compileJava {
}

group = 'com.p14n'
version = '1.2.0-SNAPSHOT'
version = '1.2.0'

repositories {
mavenCentral()
Expand Down
17 changes: 11 additions & 6 deletions vertx/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@

import com.vanniktech.maven.publish.SonatypeHost
import com.vanniktech.maven.publish.JavaLibrary
import com.vanniktech.maven.publish.JavadocJar

plugins {
id 'java'
id 'com.adarshr.test-logger' version '4.0.0'
id "com.vanniktech.maven.publish" version "0.31.0"
}

compileJava {
Expand All @@ -9,7 +15,7 @@ compileJava {
}

group = 'com.p14n'
version = '1.2.0-SNAPSHOT'
version = '1.2.0'

repositories {
mavenCentral()
Expand Down Expand Up @@ -80,18 +86,17 @@ tasks.withType(Jar) {
}

mavenPublishing {

configure(new JavaLibrary(new JavadocJar.Javadoc(), true))

publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL, true)

signAllPublications()

coordinates("com.p14n", "postevent-vertx", version)

pom {
name = "Postevent Vert.x"
description = 'A reliable event publishing and consumption system using PostgreSQL and vert.x'
description = 'A reliable event publishing and consumption system using PostgreSQL and Vert.x EventBus'
inceptionYear = "2025"
url = "https://github.com/p14n/postevent/"
licenses {
Expand All @@ -113,4 +118,4 @@ mavenPublishing {
url = 'https://github.com/p14n/postevent'
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,53 @@
import java.util.List;
import java.util.Set;

/**
* Vert.x-based consumer server that provides event consumption capabilities
* using the EventBus for communication and coordination.
*
* <p>
* This server sets up the necessary infrastructure for event processing
* including database setup, message brokers, and catchup services.
* </p>
*/
public class VertxConsumerServer implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(VertxConsumerServer.class);

private DataSource ds;
//private ConfigData cfg;
private final DataSource ds;
private List<AutoCloseable> closeables;
private AsyncExecutor asyncExecutor;
private final AsyncExecutor asyncExecutor;
OpenTelemetry ot;

/**
* Creates a new VertxConsumerServer.
*
* @param ds The DataSource for database operations
* @param asyncExecutor The async executor for handling operations
* @param ot The OpenTelemetry instance for observability
*/
public VertxConsumerServer(DataSource ds, AsyncExecutor asyncExecutor, OpenTelemetry ot) {
this.ds = ds;
this.asyncExecutor = asyncExecutor;
this.ot = ot;
}

public void start(EventBus eb, EventBusMessageBroker mb, Set<String> topics) throws IOException, InterruptedException {
/**
* Starts the consumer server with the specified configuration.
*
* @param eb The Vert.x EventBus to use for communication
* @param mb The EventBus message broker for event handling
* @param topics The set of topics to handle
* @throws IOException If database setup fails
* @throws InterruptedException If the operation is interrupted
*/
public void start(EventBus eb, EventBusMessageBroker mb, Set<String> topics)
throws IOException, InterruptedException {
logger.atInfo().log("Starting consumer server");

var db = new DatabaseSetup(ds);
db.setupServer(topics);
var catchupServer = new CatchupServer(ds);
var catchupService = new EventBusCatchupService(catchupServer,eb,topics,this.asyncExecutor);
var catchupService = new EventBusCatchupService(catchupServer, eb, topics, this.asyncExecutor);

closeables = List.of(catchupService, mb, asyncExecutor);
System.out.println("🌐 Vert.x EventBus server started");
Expand All @@ -45,11 +70,11 @@ public void start(EventBus eb, EventBusMessageBroker mb, Set<String> topics) thr

@Override
public void close() {
if(closeables != null){
for(var c : closeables){
if (closeables != null) {
for (var c : closeables) {
try {
c.close();
} catch (Exception e){
} catch (Exception e) {

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
* Vert.x-based persistent consumer that provides transactional event processing
* capabilities using the EventBus for coordination.
*
* <p>
* This consumer handles persistent event processing with transactional
* guarantees
* and integrates with the Vert.x EventBus for distributed coordination.
* </p>
*/
public class VertxPersistentConsumer implements AutoCloseable, MessageBroker<TransactionalEvent, TransactionalEvent> {

private static final Logger logger = LoggerFactory.getLogger(VertxPersistentConsumer.class);
Expand All @@ -31,13 +41,28 @@ public class VertxPersistentConsumer implements AutoCloseable, MessageBroker<Tra
OpenTelemetry ot;
private final int batchSize;

/**
* Creates a new VertxPersistentConsumer.
*
* @param ot The OpenTelemetry instance for observability
* @param asyncExecutor The async executor for handling operations
* @param batchSize The batch size for processing events
*/
public VertxPersistentConsumer(OpenTelemetry ot, AsyncExecutor asyncExecutor, int batchSize) {
this.asyncExecutor = asyncExecutor;
this.ot = ot;
this.batchSize = batchSize;
}

public void start(Set<String> topics, DataSource ds,EventBus eb, EventBusMessageBroker mb) {
/**
* Starts the persistent consumer with the specified configuration.
*
* @param topics The set of topics to handle
* @param ds The DataSource for database operations
* @param eb The Vert.x EventBus for communication
* @param mb The EventBus message broker for event handling
*/
public void start(Set<String> topics, DataSource ds, EventBus eb, EventBusMessageBroker mb) {
logger.atInfo().log("Starting consumer client");

if (tb != null) {
Expand All @@ -54,7 +79,7 @@ public void start(Set<String> topics, DataSource ds,EventBus eb, EventBusMessage
var catchupClient = new EventBusCatchupClient(eb);

for (var topic : topics) {
mb.subscribeToEventBus(topic,pb);
mb.subscribeToEventBus(topic, pb);
}
seb.subscribe(new CatchupService(ds, catchupClient, seb));
seb.subscribe(new UnprocessedSubmitter(seb, ds, new UnprocessedEventFinder(), tb, batchSize));
Expand All @@ -72,7 +97,6 @@ public void start(Set<String> topics, DataSource ds,EventBus eb, EventBusMessage

logger.atInfo().log("Consumer client started successfully");


} catch (Exception e) {
logger.atError()
.setCause(e)
Expand All @@ -92,7 +116,8 @@ public void close() {

for (AutoCloseable c : closeables) {
try {
if(c != null) c.close();
if (c != null)
c.close();
} catch (Exception e) {
logger.atWarn()
.setCause(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
* <li>{@code catchup.getLatestMessageId} - Get the latest message ID for a
* topic</li>
* </ul>
* </p>
*
* <p>
* Example usage:
Expand All @@ -53,7 +52,10 @@
public class EventBusCatchupService implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(EventBusCatchupService.class);

/** EventBus address prefix for fetch events requests. */
public static final String FETCH_EVENTS_ADDRESS = "catchup.fetch_events.";

/** EventBus address prefix for get latest message ID requests. */
public static final String GET_LATEST_MESSAGE_ID_ADDRESS = "catchup.get_latest.";

private final CatchupServerInterface catchupServer;
Expand All @@ -68,11 +70,13 @@ public class EventBusCatchupService implements AutoCloseable {
*
* @param catchupServer The underlying catchup server implementation
* @param eventBus The Vert.x EventBus to use for messaging
* @param topics The set of topics to handle catchup requests for
* @param executor The async executor for handling requests
*/
public EventBusCatchupService(CatchupServerInterface catchupServer,
EventBus eventBus,
Set<String> topics,
AsyncExecutor executor) {
EventBus eventBus,
Set<String> topics,
AsyncExecutor executor) {
this.catchupServer = catchupServer;
this.eventBus = eventBus;
this.topics = topics;
Expand All @@ -85,7 +89,7 @@ public EventBusCatchupService(CatchupServerInterface catchupServer,
* requests.
*/
public void start() {
if(fetchEventsConsumers == null) {
if (fetchEventsConsumers == null) {

logger.atInfo().log("Starting EventBusCatchupService");

Expand Down Expand Up @@ -168,7 +172,7 @@ private void handleFetchEvents(Message<JsonObject> message) {

executor.submit(() -> {

try{
try {
List<Event> events = catchupServer.fetchEvents(fromId, toId, limit, topic);

// Serialize events to JSON and reply
Expand All @@ -180,7 +184,7 @@ private void handleFetchEvents(Message<JsonObject> message) {
.addArgument(topic)
.log("Successfully fetched {} events for topic {}", events.size(), topic);

} catch (Exception e){
} catch (Exception e) {
logger.atError()
.setCause(e)
.log("Error handling fetchEvents request");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
* <li>Events are then published to the Vert.x EventBus for real-time
* distribution</li>
* </ol>
* </p>
*
* <p>
* Subscribers receive events from the EventBus, providing low-latency
Expand Down Expand Up @@ -187,8 +186,8 @@ public void subscribeToEventBus(String topic, MessageSubscriber<Event> subscribe
});

// Store consumer for potential cleanup
consumers.compute(topic, (k,l) -> {
if(l == null){
consumers.compute(topic, (k, l) -> {
if (l == null) {
l = new ArrayList<>();
}
l.add(consumer);
Expand All @@ -209,7 +208,7 @@ public void subscribeToEventBus(String topic, MessageSubscriber<Event> subscribe
public void unsubscribe(String topic) {
List<MessageConsumer<Event>> consumerList = consumers.remove(topic);
if (consumerList != null) {
for(var consumer: consumerList){
for (var consumer : consumerList) {
consumer.unregister();
}
logger.atInfo()
Expand All @@ -226,7 +225,7 @@ public void close() {
logger.atInfo().log("Closing EventBusMessageBroker");

// Unregister all consumers
consumers.values().forEach( l -> {
consumers.values().forEach(l -> {
l.forEach(MessageConsumer::unregister);
});
consumers.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
*
* <p>
* The codec uses JSON serialization for simplicity and debugging ease.
* Events are encoded as JSON strings with a length prefix for efficient parsing.
* Events are encoded as JSON strings with a length prefix for efficient
* parsing.
* </p>
*
* <p>
Expand All @@ -24,6 +25,14 @@
*/
public class EventCodec implements MessageCodec<Event, Event> {

/**
* Creates a new EventCodec for serializing Event objects on the Vert.x
* EventBus.
*/
public EventCodec() {
// Default constructor
}

/**
* Encodes an Event object to the wire format.
* The event is serialized to JSON and prefixed with its length.
Expand All @@ -35,7 +44,7 @@ public class EventCodec implements MessageCodec<Event, Event> {
public void encodeToWire(Buffer buffer, Event event) {
String json = Json.encode(event);
byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);

// Write length prefix followed by JSON bytes
buffer.appendInt(jsonBytes.length);
buffer.appendBytes(jsonBytes);
Expand All @@ -53,11 +62,11 @@ public void encodeToWire(Buffer buffer, Event event) {
public Event decodeFromWire(int pos, Buffer buffer) {
// Read length prefix
int length = buffer.getInt(pos);

// Read JSON bytes and convert to string
byte[] jsonBytes = buffer.getBytes(pos + 4, pos + 4 + length);
String json = new String(jsonBytes,StandardCharsets.UTF_8);
String json = new String(jsonBytes, StandardCharsets.UTF_8);

// Deserialize from JSON
return Json.decodeValue(json, Event.class);
}
Expand Down
Loading