diff --git a/core/build.gradle b/core/build.gradle index 46a9e08..4df188a 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -9,7 +9,7 @@ compileJava { } group = 'com.p14n' -version = '1.3.3-SNAPSHOT' +version = '1.3.4-SNAPSHOT' repositories { mavenCentral() diff --git a/core/src/main/java/com/p14n/postevent/Publisher.java b/core/src/main/java/com/p14n/postevent/Publisher.java index d00acd0..a886c6e 100644 --- a/core/src/main/java/com/p14n/postevent/Publisher.java +++ b/core/src/main/java/com/p14n/postevent/Publisher.java @@ -7,6 +7,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Statement; import static com.p14n.postevent.db.SQL.setEventOnStatement; @@ -66,7 +67,7 @@ private Publisher() { * @throws IllegalArgumentException if the topic is null, empty, or contains * invalid characters */ - public static void publish(Event event, Connection connection, String topic) throws SQLException { + public static Long publish(Event event, Connection connection, String topic) throws SQLException { if (topic == null || topic.trim().isEmpty()) { throw new IllegalArgumentException("Topic name cannot be null or empty"); } @@ -74,13 +75,18 @@ public static void publish(Event event, Connection connection, String topic) thr throw new IllegalArgumentException("Topic name must contain only lowercase letters and underscores"); } - String sql = String.format("INSERT INTO postevent.%s (%s) VALUES (%s)", + String sql = String.format("INSERT INTO postevent.%s (%s) VALUES (%s) RETURNING idn", topic, SQL.CORE_COLS, SQL.CORE_PH); - try (PreparedStatement stmt = connection.prepareStatement(sql)) { + try (PreparedStatement stmt = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { setEventOnStatement(stmt, event); stmt.executeUpdate(); + var rs = stmt.getGeneratedKeys(); + if(rs.next()){ + return (Long)rs.getObject(1); + } } + return null; } /** @@ -94,13 +100,13 @@ public static void publish(Event event, Connection connection, String topic) thr * @throws IllegalArgumentException if the topic is null, empty, or contains * invalid characters */ - public static void publish(Event event, DataSource ds, String topic) throws SQLException { + public static Long publish(Event event, DataSource ds, String topic) throws SQLException { if (topic == null || topic.trim().isEmpty() || !topic.matches("[a-z_]+")) { throw new IllegalArgumentException("Invalid topic name: must be non-null, non-empty, and only contain lowercase letters and underscores."); } try (Connection c = ds.getConnection()) { - publish(event, c, topic); + return publish(event, c, topic); } } } diff --git a/core/src/main/java/com/p14n/postevent/data/Event.java b/core/src/main/java/com/p14n/postevent/data/Event.java index 89c00e8..8191032 100644 --- a/core/src/main/java/com/p14n/postevent/data/Event.java +++ b/core/src/main/java/com/p14n/postevent/data/Event.java @@ -111,4 +111,7 @@ public static Event create(String id, String source, String type, String datacon return new Event(id, source, type, datacontenttype, dataschema, subject, data, time, idn, topic, traceparent); } + public Event withIdn(Long idn){ + return new Event(id, source, type, datacontenttype, dataschema, subject, data, time, idn, topic, traceparent); + } } diff --git a/debezium/build.gradle b/debezium/build.gradle index ec747fd..b7a0353 100644 --- a/debezium/build.gradle +++ b/debezium/build.gradle @@ -9,7 +9,7 @@ compileJava { } group = 'com.p14n' -version = '1.3.3-SNAPSHOT' +version = '1.3.4-SNAPSHOT' repositories { mavenCentral() diff --git a/grpc/build.gradle b/grpc/build.gradle index 2a0aaef..c50b7ee 100644 --- a/grpc/build.gradle +++ b/grpc/build.gradle @@ -15,7 +15,7 @@ compileJava { } group = 'com.p14n' -version = '1.3.3-SNAPSHOT' +version = '1.3.4-SNAPSHOT' repositories { mavenCentral() @@ -32,7 +32,7 @@ dependencies { implementation 'io.grpc:grpc-stub:1.53.0' implementation 'io.grpc:grpc-api:1.53.0' - implementation 'javax.annotation:javax.annotation-api:1.3.3-SNAPSHOT' + implementation 'javax.annotation:javax.annotation-api:1.3.2' // For code generation implementation 'com.google.protobuf:protobuf-java:3.21.7' diff --git a/vertx/build.gradle b/vertx/build.gradle index 8ab2f20..4c14730 100644 --- a/vertx/build.gradle +++ b/vertx/build.gradle @@ -15,7 +15,7 @@ compileJava { } group = 'com.p14n' -version = '1.3.3-SNAPSHOT' +version = '1.3.4-SNAPSHOT' repositories { diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java index d2bb5b9..66b8ab5 100644 --- a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java @@ -108,11 +108,12 @@ public void publish(String topic, Event event) { executor.submit(() -> { try { - Publisher.publish(event, dataSource, topic); + Long idn = Publisher.publish(event, dataSource, topic); // Then, publish to EventBus for real-time distribution String eventBusAddress = "events." + topic; - eventBus.publish(eventBusAddress, event); + + eventBus.publish(eventBusAddress, event.withIdn(idn)); logger.atDebug() .addArgument(topic) @@ -150,11 +151,11 @@ public void publish(String topic, TransactionalEvent event) { try { - Publisher.publish(event.event(), event.connection(), topic); + Long idn = Publisher.publish(event.event(), event.connection(), topic); // Then, publish to EventBus for real-time distribution String eventBusAddress = "events." + topic; - eventBus.publish(eventBusAddress, event.event()); + eventBus.publish(eventBusAddress, event.event().withIdn(idn)); logger.atDebug() .addArgument(topic) diff --git a/vertx/src/test/java/com/p14n/postevent/vertx/example/VertxConsumerExample.java b/vertx/src/test/java/com/p14n/postevent/vertx/example/VertxConsumerExample.java index 830754b..18f7af9 100644 --- a/vertx/src/test/java/com/p14n/postevent/vertx/example/VertxConsumerExample.java +++ b/vertx/src/test/java/com/p14n/postevent/vertx/example/VertxConsumerExample.java @@ -43,7 +43,7 @@ public static void start(DataSource ds) throws IOException, InterruptedException "text", null, "test", - "hello".getBytes(), Instant.now(),1L ,"order",null)); + "hello".getBytes(), Instant.now(),null ,"order",null)); client.subscribe("order", message -> { System.out.println("Got message"); @@ -56,7 +56,7 @@ public static void start(DataSource ds) throws IOException, InterruptedException "text", null, "test", - "hello".getBytes(), Instant.now(),2L ,"order",null)); + "hello".getBytes(), Instant.now(),null ,"order",null)); latch.await(10, TimeUnit.SECONDS);