Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ compileJava {
}

group = 'com.p14n'
version = '1.3.3-SNAPSHOT'
version = '1.3.4-SNAPSHOT'

repositories {
mavenCentral()
Expand Down
16 changes: 11 additions & 5 deletions core/src/main/java/com/p14n/postevent/Publisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;

import static com.p14n.postevent.db.SQL.setEventOnStatement;

Expand Down Expand Up @@ -66,21 +67,26 @@ private Publisher() {
* @throws IllegalArgumentException if the topic is null, empty, or contains
* invalid characters
*/
public static void publish(Event event, Connection connection, String topic) throws SQLException {
public static Long publish(Event event, Connection connection, String topic) throws SQLException {
if (topic == null || topic.trim().isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Topic validation logic is inconsistent between overloads.

Unify topic validation logic across both overloads to ensure consistent behavior.

Suggested implementation:

    public static Long publish(Event event, Connection connection, String topic) throws SQLException {
        validateTopic(topic);

        String sql = String.format("INSERT INTO postevent.%s (%s) VALUES (%s) RETURNING idn",
                topic, SQL.CORE_COLS, SQL.CORE_PH);

        try (PreparedStatement stmt = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
            setEventOnStatement(stmt, event);
    private static void validateTopic(String topic) {
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("Topic name cannot be null or empty");
        }
        if (!topic.matches("^[a-z_]+$")) {
            throw new IllegalArgumentException("Topic name must contain only lowercase letters and underscores");
        }
    }

You must also update any other overloads of publish in this file to use validateTopic(topic); instead of their own topic validation logic.

throw new IllegalArgumentException("Topic name cannot be null or empty");
}
if (!topic.matches("^[a-z_]+$")) {
throw new IllegalArgumentException("Topic name must contain only lowercase letters and underscores");
}

String sql = String.format("INSERT INTO postevent.%s (%s) VALUES (%s)",
String sql = String.format("INSERT INTO postevent.%s (%s) VALUES (%s) RETURNING idn",
topic, SQL.CORE_COLS, SQL.CORE_PH);

try (PreparedStatement stmt = connection.prepareStatement(sql)) {
try (PreparedStatement stmt = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
setEventOnStatement(stmt, event);
stmt.executeUpdate();
var rs = stmt.getGeneratedKeys();
if(rs.next()){
return (Long)rs.getObject(1);
}
}
return null;
}

/**
Expand All @@ -94,13 +100,13 @@ public static void publish(Event event, Connection connection, String topic) thr
* @throws IllegalArgumentException if the topic is null, empty, or contains
* invalid characters
*/
public static void publish(Event event, DataSource ds, String topic) throws SQLException {
public static Long publish(Event event, DataSource ds, String topic) throws SQLException {
if (topic == null || topic.trim().isEmpty() || !topic.matches("[a-z_]+")) {
throw new IllegalArgumentException("Invalid topic name: must be non-null, non-empty, and only contain lowercase letters and underscores.");
}

try (Connection c = ds.getConnection()) {
publish(event, c, topic);
return publish(event, c, topic);
}
}
}
3 changes: 3 additions & 0 deletions core/src/main/java/com/p14n/postevent/data/Event.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,7 @@ public static Event create(String id, String source, String type, String datacon

return new Event(id, source, type, datacontenttype, dataschema, subject, data, time, idn, topic, traceparent);
}
public Event withIdn(Long idn){
return new Event(id, source, type, datacontenttype, dataschema, subject, data, time, idn, topic, traceparent);
}
}
2 changes: 1 addition & 1 deletion debezium/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ compileJava {
}

group = 'com.p14n'
version = '1.3.3-SNAPSHOT'
version = '1.3.4-SNAPSHOT'

repositories {
mavenCentral()
Expand Down
4 changes: 2 additions & 2 deletions grpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ compileJava {
}

group = 'com.p14n'
version = '1.3.3-SNAPSHOT'
version = '1.3.4-SNAPSHOT'

repositories {
mavenCentral()
Expand All @@ -32,7 +32,7 @@ dependencies {
implementation 'io.grpc:grpc-stub:1.53.0'
implementation 'io.grpc:grpc-api:1.53.0'

implementation 'javax.annotation:javax.annotation-api:1.3.3-SNAPSHOT'
implementation 'javax.annotation:javax.annotation-api:1.3.2'

// For code generation
implementation 'com.google.protobuf:protobuf-java:3.21.7'
Expand Down
2 changes: 1 addition & 1 deletion vertx/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ compileJava {
}

group = 'com.p14n'
version = '1.3.3-SNAPSHOT'
version = '1.3.4-SNAPSHOT'


repositories {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,12 @@ public void publish(String topic, Event event) {

executor.submit(() -> {
try {
Publisher.publish(event, dataSource, topic);
Long idn = Publisher.publish(event, dataSource, topic);

// Then, publish to EventBus for real-time distribution
String eventBusAddress = "events." + topic;
eventBus.publish(eventBusAddress, event);

eventBus.publish(eventBusAddress, event.withIdn(idn));

logger.atDebug()
.addArgument(topic)
Expand Down Expand Up @@ -150,11 +151,11 @@ public void publish(String topic, TransactionalEvent event) {

try {

Publisher.publish(event.event(), event.connection(), topic);
Long idn = Publisher.publish(event.event(), event.connection(), topic);

// Then, publish to EventBus for real-time distribution
String eventBusAddress = "events." + topic;
eventBus.publish(eventBusAddress, event.event());
eventBus.publish(eventBusAddress, event.event().withIdn(idn));

logger.atDebug()
.addArgument(topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static void start(DataSource ds) throws IOException, InterruptedException
"text",
null,
"test",
"hello".getBytes(), Instant.now(),1L ,"order",null));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): No test added to verify that the generated idn is returned and propagated.

Please update the test to assert that the idn is generated, returned by publish, and included in the event received by the consumer.

Suggested implementation:

                    "text",
                    null,
                    "test",
                    "hello".getBytes(), Instant.now(),null ,"order",null));

            // Capture the returned idn from publish
            String publishedIdn = client.publish(
                "text",
                null,
                "test",
                "hello".getBytes(), Instant.now(),null ,"order",null
            );

            // Assert that the idn is generated
            assertNotNull(publishedIdn, "Published idn should not be null");

            // Use an AtomicReference to capture the received idn in the consumer
            AtomicReference<String> receivedIdn = new AtomicReference<>();

            client.subscribe("order", message -> {
                System.out.println("Got message");
                // Extract idn from the received message/event
                String eventIdn = message.getIdn(); // Adjust this if your message object uses a different getter
                receivedIdn.set(eventIdn);
                // Assert that the received idn matches the published idn
                assertEquals(publishedIdn, eventIdn, "Received idn should match published idn");
                    "text",
                    null,
                    "test",
                    "hello".getBytes(), Instant.now(),null ,"order",null));

            latch.await(10, TimeUnit.SECONDS);

            // Final assertion after latch to ensure propagation
            assertEquals(publishedIdn, receivedIdn.get(), "idn should be propagated to consumer");
  • Ensure that client.publish returns the idn. If it does not, update its implementation accordingly.
  • Ensure that the message object in the consumer has a getIdn() method or equivalent to extract the idn.
  • Make sure assertNotNull and assertEquals are statically imported from your test framework (e.g., JUnit).
  • Add AtomicReference<String> receivedIdn = new AtomicReference<>(); at the appropriate scope if not already present.

"hello".getBytes(), Instant.now(),null ,"order",null));

client.subscribe("order", message -> {
System.out.println("Got message");
Expand All @@ -56,7 +56,7 @@ public static void start(DataSource ds) throws IOException, InterruptedException
"text",
null,
"test",
"hello".getBytes(), Instant.now(),2L ,"order",null));
"hello".getBytes(), Instant.now(),null ,"order",null));

latch.await(10, TimeUnit.SECONDS);

Expand Down
Loading