diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 0000000..f2e32bc --- /dev/null +++ b/gradle.properties @@ -0,0 +1,26 @@ +mavenCentralUsername=Kln1O3JI +mavenCentralPassword=RcbimI2WYchCROzFmBMERuas8n5wf3ZeW0TY0nJ/IWce + +signing.keyId=0x872438F8 +signing.password=enjoy8thesilence +#signing.secretKeyRingFile=/Users/dean.chapman/.gnupg/trustdb.gpg + +#gpg --keyserver 'hkp://keyserver.ubuntu.com:11371' --send-keys 708DD1C3FC4137A2669B64E1D5348B50872438F8 + +# -----BEGIN PGP PRIVATE KEY BLOCK----- + +# lIYEaA1U9xYJKwYBBAHaRw8BAQdAugFkVpzb99CpAFitVidMOxdf+YQtdn0Vuo6v +# GjASd8r+BwMCFIXC0hp7wwr7ZA4WB9AOBQ19DPgwudWhyXQLMnU90JEr0b3+B5/G +# tnZ5H7Sv0HyBLKMdd741J1rV/l89blVRIOYjpRkkV1ehzCVHNrcbo7QcRGVhbiBD +# aGFwbWFuIDxkZWFuQHAxNG4uY29tPoiZBBMWCgBBFiEEcI3Rw/xBN6Jmm2Th1TSL +# UIckOPgFAmgNVPcCGwMFCQWjmoAFCwkIBwICIgIGFQoJCAsCBBYCAwECHgcCF4AA +# CgkQ1TSLUIckOPgnfAD9Ed4wOn+itwIQREbjhw4dWvBtEs7opM15CASO048/+nUA +# /0ZbzarkL6kA77+6I5rr0MGb4DI+Vq5L28uIpqf/iHwAnIsEaA1U9xIKKwYBBAGX +# VQEFAQEHQBb0/CcvgKQ+k8pL/l4tcZ74MKQ9hu7rdmRC85l1r4QOAwEIB/4HAwJ/ +# QhTOJDDBQ/vDzLxprGLao7v1+egxs7SwF2qMZYxdt0UOI3Rf4+V88XYDxc/U+qxN +# Bp77yZCQmgbEALgHrjxj5PX+j57wZJusATleT04NiH4EGBYKACYWIQRwjdHD/EE3 +# omabZOHVNItQhyQ4+AUCaA1U9wIbDAUJBaOagAAKCRDVNItQhyQ4+Gz+AQCI2Tsn +# fputBoCbvdhTW1dZvtOrYh388LMlB0oAyERWPQD/SCeqKp/ra+39ESJ7HAAqpAp3 +# dprt5bZ1tkPmaU0F5go= +# =6Ynw +# -----END PGP PRIVATE KEY BLOCK----- \ No newline at end of file diff --git a/library/src/main/java/com/p14n/postevent/broker/SystemEvent.java b/library/src/main/java/com/p14n/postevent/broker/SystemEvent.java index fea6fa5..fb16364 100644 --- a/library/src/main/java/com/p14n/postevent/broker/SystemEvent.java +++ b/library/src/main/java/com/p14n/postevent/broker/SystemEvent.java @@ -5,7 +5,8 @@ public enum SystemEvent implements Traceable { CatchupRequired, - UnprocessedCheckRequired; + UnprocessedCheckRequired, + FetchLatest; public String topic; diff --git a/library/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java b/library/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java index 1c2d672..bcb6601 100644 --- a/library/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java +++ b/library/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java @@ -26,4 +26,8 @@ public void subscribe(MessageSubscriber subscriber) { subscribe("system", subscriber); } + public void triggerFetchLatest(String topic) { + publish(SystemEvent.FetchLatest.withTopic(topic)); + } + } diff --git a/library/src/main/java/com/p14n/postevent/catchup/CatchupServer.java b/library/src/main/java/com/p14n/postevent/catchup/CatchupServer.java index ab4a316..5085e7b 100644 --- a/library/src/main/java/com/p14n/postevent/catchup/CatchupServer.java +++ b/library/src/main/java/com/p14n/postevent/catchup/CatchupServer.java @@ -65,4 +65,36 @@ public List fetchEvents(long startAfter, long end, int maxResults, String throw new RuntimeException("Failed to fetch events", e); } } + + @Override + public long getLatestMessageId(String topic) { + if (topic == null || topic.trim().isEmpty()) { + throw new IllegalArgumentException("Topic name cannot be null or empty"); + } + + String sql = String.format("SELECT MAX(idn) FROM postevent.%s", topic); + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + + try (ResultSet rs = stmt.executeQuery()) { + if (rs.next()) { + long latestId = rs.getLong(1); + if (rs.wasNull()) { + logger.atInfo().log("No messages found for topic {}", topic); + return 0; + } + logger.atInfo().log("Latest message ID for topic {} is {}", topic, latestId); + return latestId; + } else { + logger.atInfo().log("No messages found for topic {}", topic); + return 0; + } + } + + } catch (SQLException e) { + logger.atError().setCause(e).log("Error getting latest message ID for topic {}", topic); + throw new RuntimeException("Failed to get latest message ID", e); + } + } } \ No newline at end of file diff --git a/library/src/main/java/com/p14n/postevent/catchup/CatchupServerInterface.java b/library/src/main/java/com/p14n/postevent/catchup/CatchupServerInterface.java index 8105a5d..e3f8514 100644 --- a/library/src/main/java/com/p14n/postevent/catchup/CatchupServerInterface.java +++ b/library/src/main/java/com/p14n/postevent/catchup/CatchupServerInterface.java @@ -17,4 +17,12 @@ public interface CatchupServerInterface { * @return A list of events within the specified range */ List fetchEvents(long startAfter, long end, int maxResults, String topic); + + /** + * Gets the latest (highest) message ID for a given topic. + * + * @param topic The name of the topic + * @return The latest message ID, or 0 if no messages exist for the topic + */ + long getLatestMessageId(String topic); } \ No newline at end of file diff --git a/library/src/main/java/com/p14n/postevent/catchup/CatchupService.java b/library/src/main/java/com/p14n/postevent/catchup/CatchupService.java index 76a5b3b..51fcf7a 100644 --- a/library/src/main/java/com/p14n/postevent/catchup/CatchupService.java +++ b/library/src/main/java/com/p14n/postevent/catchup/CatchupService.java @@ -182,10 +182,64 @@ private void updateHwm(Connection connection, String topicName, long currentHwm, } } + /** + * Fetches the latest message from the server if it's greater than the current HWM + * and inserts it into the messages table. + * + * @param topicName The name of the topic + * @return The number of events processed (0 or 1) + */ + public int fetchLatest(String topicName) { + try (Connection conn = datasource.getConnection()) { + conn.setAutoCommit(false); + + long currentHwm = getCurrentHwm(conn, topicName); + + // Get the latest message ID from the server + long latestId = catchupServer.getLatestMessageId(topicName); + + LOGGER.info(String.format("Current HWM %d, latest server message ID %d for topic %s", + currentHwm, latestId, topicName)); + + if (latestId <= currentHwm) { + LOGGER.info("No new messages to fetch for topic: " + topicName); + return 0; + } + + // Fetch the latest message from the server + List events = catchupServer.fetchEvents(latestId - 1, latestId, 1, topicName); + + if (events.isEmpty()) { + LOGGER.info("Latest message not found for topic: " + topicName); + return 0; + } + + // Write the latest event to the messages table + int processedCount = writeEventsToMessagesTable(conn, events); + + LOGGER.info(String.format("Processed %d latest event for topic %s with ID %d", + processedCount, topicName, latestId)); + + conn.commit(); + + // Trigger a catchup to fill any gaps + if (processedCount > 0) { + systemEventBroker.publish(SystemEvent.CatchupRequired.withTopic(topicName)); + } + + return processedCount; + } catch (SQLException e) { + LOGGER.error("Failed to fetch latest message", e); + return 0; + } + } + @Override public void onMessage(SystemEvent message) { if (Objects.requireNonNull(message) == SystemEvent.CatchupRequired) { oneAtATime(() -> catchup(message.topic), () -> onMessage(message)); + } else if (message == SystemEvent.FetchLatest) { + oneAtATime(() -> fetchLatest(message.topic), () -> onMessage(message)); } } diff --git a/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcClient.java b/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcClient.java index b7bcc46..2eef3dc 100644 --- a/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcClient.java +++ b/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcClient.java @@ -63,6 +63,29 @@ public List fetchEvents(long startAfter, long end, int maxResults, String return events; } + @Override + public long getLatestMessageId(String topic) { + logger.atInfo() + .addArgument(topic) + .log("Getting latest message ID for topic {}"); + + GetLatestMessageIdRequest request = GetLatestMessageIdRequest.newBuilder() + .setTopic(topic) + .build(); + + GetLatestMessageIdResponse response; + try { + response = blockingStub.getLatestMessageId(request); + } catch (StatusRuntimeException e) { + logger.atWarn().setCause(e).log("RPC failed: {}", e.getStatus()); + throw new RuntimeException("Failed to get latest message ID via gRPC", e); + } + + long latestId = response.getLatestId(); + logger.info("Latest message ID for topic {} is {}", topic, latestId); + return latestId; + } + private Event convertFromGrpcEvent(com.p14n.postevent.catchup.grpc.Event grpcEvent, String topic) { OffsetDateTime time = null; if (!grpcEvent.getTime().isEmpty()) { diff --git a/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcServer.java b/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcServer.java index f799570..253286a 100644 --- a/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcServer.java +++ b/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcServer.java @@ -84,6 +84,23 @@ public void fetchEvents(FetchEventsRequest request, StreamObserver responseObserver) { + try { + long latestId = catchupServer.getLatestMessageId(request.getTopic()); + + GetLatestMessageIdResponse response = GetLatestMessageIdResponse.newBuilder() + .setLatestId(latestId) + .build(); + + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Exception e) { + logger.error("Error getting latest message ID", e); + responseObserver.onError(e); + } + } + private com.p14n.postevent.catchup.grpc.Event convertToGrpcEvent(Event event) { return com.p14n.postevent.catchup.grpc.Event.newBuilder() .setId(event.id()) diff --git a/library/src/main/proto/catchup.proto b/library/src/main/proto/catchup.proto index e3433d1..4e2e6a1 100644 --- a/library/src/main/proto/catchup.proto +++ b/library/src/main/proto/catchup.proto @@ -10,6 +10,9 @@ package catchup; service CatchupService { // Fetch events from a specific point rpc FetchEvents (FetchEventsRequest) returns (FetchEventsResponse) {} + + // Get the latest message ID for a topic + rpc GetLatestMessageId (GetLatestMessageIdRequest) returns (GetLatestMessageIdResponse) {} } // The request message containing fetch parameters @@ -25,6 +28,16 @@ message FetchEventsResponse { repeated Event events = 1; } +// The request message for getting latest message ID +message GetLatestMessageIdRequest { + string topic = 1; +} + +// The response message containing latest message ID +message GetLatestMessageIdResponse { + int64 latest_id = 1; +} + // Event message message Event { string id = 1; diff --git a/library/src/test/java/com/p14n/postevent/CatchupServiceTest.java b/library/src/test/java/com/p14n/postevent/CatchupServiceTest.java index 0669a67..e65a790 100644 --- a/library/src/test/java/com/p14n/postevent/CatchupServiceTest.java +++ b/library/src/test/java/com/p14n/postevent/CatchupServiceTest.java @@ -253,6 +253,72 @@ public void testHasSequenceGapWithGap() throws Exception { } } + @Test + public void testFetchLatestWithNewerMessage() throws Exception { + try (Connection connection = pg.getPostgresDatabase().getConnection()) { + connection.setAutoCommit(false); + + // Publish some initial events to establish a baseline + log.debug("Publishing initial 5 events"); + for (int i = 1; i <= 5; i++) { + Event event = createTestEvent(i); + Publisher.publish(event, connection, TEST_TOPIC); + } + + // Copy first 3 events to messages table and set HWM to 3 + copyEventsToMessages(connection, 1); + initializeHwm(connection, TEST_TOPIC, 3); + + long initialHwm = getCurrentHwm(connection, TEST_TOPIC); + log.debug("Initial HWM: {}", initialHwm); + + // Fetch latest should get the newest message (ID 5) + int processedCount = catchupService.fetchLatest(TEST_TOPIC); + + // Should have processed 1 event (the latest one) + assertEquals(1, processedCount, "Should have processed 1 latest event"); + + // Verify that the latest message was added to the messages table + int messagesCount = countMessagesInTable(connection); + assertEquals(4, messagesCount, "Should have 4 messages in messages table (3 initial + 1 latest)"); + + // Verify that the latest message ID is 5 + long latestIdInMessages = getMaxIdnFromMessagesTable(connection); + assertEquals(5, latestIdInMessages, "Latest message ID should be 5"); + } + } + + @Test + public void testFetchLatestWithNoNewerMessage() throws Exception { + try (Connection connection = pg.getPostgresDatabase().getConnection()) { + connection.setAutoCommit(false); + + // Publish some events + log.debug("Publishing 3 events"); + for (int i = 1; i <= 3; i++) { + Event event = createTestEvent(i); + Publisher.publish(event, connection, TEST_TOPIC); + } + + // Copy all events to messages table and set HWM to the latest + copyEventsToMessages(connection, 1); + initializeHwm(connection, TEST_TOPIC, 3); + + long initialHwm = getCurrentHwm(connection, TEST_TOPIC); + log.debug("Initial HWM: {}", initialHwm); + + // Fetch latest should find no newer messages + int processedCount = catchupService.fetchLatest(TEST_TOPIC); + + // Should have processed 0 events + assertEquals(0, processedCount, "Should have processed 0 events when no newer messages exist"); + + // Verify messages count remains the same + int messagesCount = countMessagesInTable(connection); + assertEquals(3, messagesCount, "Should still have 3 messages in messages table"); + } + } + /** * Helper method to initialize HWM for a subscriber */ @@ -300,4 +366,15 @@ private int countMessagesInTable(Connection connection) throws Exception { } } + private long getMaxIdnFromMessagesTable(Connection connection) throws Exception { + String sql = "SELECT MAX(idn) FROM postevent.messages"; + try (PreparedStatement stmt = connection.prepareStatement(sql); + ResultSet rs = stmt.executeQuery()) { + if (rs.next()) { + return rs.getLong(1); + } + return 0; + } + } + } \ No newline at end of file