Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
mavenCentralUsername=Kln1O3JI
mavenCentralPassword=RcbimI2WYchCROzFmBMERuas8n5wf3ZeW0TY0nJ/IWce

signing.keyId=0x872438F8
signing.password=enjoy8thesilence
#signing.secretKeyRingFile=/Users/dean.chapman/.gnupg/trustdb.gpg

#gpg --keyserver 'hkp://keyserver.ubuntu.com:11371' --send-keys 708DD1C3FC4137A2669B64E1D5348B50872438F8

# -----BEGIN PGP PRIVATE KEY BLOCK-----

# lIYEaA1U9xYJKwYBBAHaRw8BAQdAugFkVpzb99CpAFitVidMOxdf+YQtdn0Vuo6v
# GjASd8r+BwMCFIXC0hp7wwr7ZA4WB9AOBQ19DPgwudWhyXQLMnU90JEr0b3+B5/G
# tnZ5H7Sv0HyBLKMdd741J1rV/l89blVRIOYjpRkkV1ehzCVHNrcbo7QcRGVhbiBD
# aGFwbWFuIDxkZWFuQHAxNG4uY29tPoiZBBMWCgBBFiEEcI3Rw/xBN6Jmm2Th1TSL
# UIckOPgFAmgNVPcCGwMFCQWjmoAFCwkIBwICIgIGFQoJCAsCBBYCAwECHgcCF4AA
# CgkQ1TSLUIckOPgnfAD9Ed4wOn+itwIQREbjhw4dWvBtEs7opM15CASO048/+nUA
# /0ZbzarkL6kA77+6I5rr0MGb4DI+Vq5L28uIpqf/iHwAnIsEaA1U9xIKKwYBBAGX
# VQEFAQEHQBb0/CcvgKQ+k8pL/l4tcZ74MKQ9hu7rdmRC85l1r4QOAwEIB/4HAwJ/
# QhTOJDDBQ/vDzLxprGLao7v1+egxs7SwF2qMZYxdt0UOI3Rf4+V88XYDxc/U+qxN
# Bp77yZCQmgbEALgHrjxj5PX+j57wZJusATleT04NiH4EGBYKACYWIQRwjdHD/EE3
# omabZOHVNItQhyQ4+AUCaA1U9wIbDAUJBaOagAAKCRDVNItQhyQ4+Gz+AQCI2Tsn
# fputBoCbvdhTW1dZvtOrYh388LMlB0oAyERWPQD/SCeqKp/ra+39ESJ7HAAqpAp3
# dprt5bZ1tkPmaU0F5go=
# =6Ynw
# -----END PGP PRIVATE KEY BLOCK-----
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
public enum SystemEvent implements Traceable {

CatchupRequired,
UnprocessedCheckRequired;
UnprocessedCheckRequired,
FetchLatest;

public String topic;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,8 @@ public void subscribe(MessageSubscriber<SystemEvent> subscriber) {
subscribe("system", subscriber);
}

public void triggerFetchLatest(String topic) {
publish(SystemEvent.FetchLatest.withTopic(topic));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,36 @@ public List<Event> fetchEvents(long startAfter, long end, int maxResults, String
throw new RuntimeException("Failed to fetch events", e);
}
}

@Override
public long getLatestMessageId(String topic) {
if (topic == null || topic.trim().isEmpty()) {
throw new IllegalArgumentException("Topic name cannot be null or empty");
}

String sql = String.format("SELECT MAX(idn) FROM postevent.%s", topic);

try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {

try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
long latestId = rs.getLong(1);
if (rs.wasNull()) {
logger.atInfo().log("No messages found for topic {}", topic);
return 0;
}
logger.atInfo().log("Latest message ID for topic {} is {}", topic, latestId);
return latestId;
} else {
logger.atInfo().log("No messages found for topic {}", topic);
return 0;
}
}

} catch (SQLException e) {
logger.atError().setCause(e).log("Error getting latest message ID for topic {}", topic);
throw new RuntimeException("Failed to get latest message ID", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,12 @@ public interface CatchupServerInterface {
* @return A list of events within the specified range
*/
List<Event> fetchEvents(long startAfter, long end, int maxResults, String topic);

/**
* Gets the latest (highest) message ID for a given topic.
*
* @param topic The name of the topic
* @return The latest message ID, or 0 if no messages exist for the topic
*/
long getLatestMessageId(String topic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,64 @@ private void updateHwm(Connection connection, String topicName, long currentHwm,
}
}

/**
* Fetches the latest message from the server if it's greater than the current HWM
* and inserts it into the messages table.
*
* @param topicName The name of the topic
* @return The number of events processed (0 or 1)
*/
public int fetchLatest(String topicName) {
try (Connection conn = datasource.getConnection()) {
conn.setAutoCommit(false);

long currentHwm = getCurrentHwm(conn, topicName);

// Get the latest message ID from the server
long latestId = catchupServer.getLatestMessageId(topicName);

LOGGER.info(String.format("Current HWM %d, latest server message ID %d for topic %s",
currentHwm, latestId, topicName));

if (latestId <= currentHwm) {
LOGGER.info("No new messages to fetch for topic: " + topicName);
return 0;
}

// Fetch the latest message from the server
List<Event> events = catchupServer.fetchEvents(latestId - 1, latestId, 1, topicName);

if (events.isEmpty()) {
LOGGER.info("Latest message not found for topic: " + topicName);
return 0;
}

// Write the latest event to the messages table
int processedCount = writeEventsToMessagesTable(conn, events);

LOGGER.info(String.format("Processed %d latest event for topic %s with ID %d",
processedCount, topicName, latestId));

conn.commit();

// Trigger a catchup to fill any gaps
if (processedCount > 0) {
systemEventBroker.publish(SystemEvent.CatchupRequired.withTopic(topicName));
}

return processedCount;
} catch (SQLException e) {
LOGGER.error("Failed to fetch latest message", e);
return 0;
}
}

@Override
public void onMessage(SystemEvent message) {
if (Objects.requireNonNull(message) == SystemEvent.CatchupRequired) {
oneAtATime(() -> catchup(message.topic), () -> onMessage(message));
} else if (message == SystemEvent.FetchLatest) {
oneAtATime(() -> fetchLatest(message.topic), () -> onMessage(message));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,29 @@ public List<Event> fetchEvents(long startAfter, long end, int maxResults, String
return events;
}

@Override
public long getLatestMessageId(String topic) {
logger.atInfo()
.addArgument(topic)
.log("Getting latest message ID for topic {}");

GetLatestMessageIdRequest request = GetLatestMessageIdRequest.newBuilder()
.setTopic(topic)
.build();

GetLatestMessageIdResponse response;
try {
response = blockingStub.getLatestMessageId(request);
} catch (StatusRuntimeException e) {
logger.atWarn().setCause(e).log("RPC failed: {}", e.getStatus());
throw new RuntimeException("Failed to get latest message ID via gRPC", e);
}

long latestId = response.getLatestId();
logger.info("Latest message ID for topic {} is {}", topic, latestId);
return latestId;
}

private Event convertFromGrpcEvent(com.p14n.postevent.catchup.grpc.Event grpcEvent, String topic) {
OffsetDateTime time = null;
if (!grpcEvent.getTime().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,23 @@ public void fetchEvents(FetchEventsRequest request, StreamObserver<FetchEventsRe
}
}

@Override
public void getLatestMessageId(GetLatestMessageIdRequest request, StreamObserver<GetLatestMessageIdResponse> responseObserver) {
try {
long latestId = catchupServer.getLatestMessageId(request.getTopic());

GetLatestMessageIdResponse response = GetLatestMessageIdResponse.newBuilder()
.setLatestId(latestId)
.build();

responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Error getting latest message ID", e);
responseObserver.onError(e);
}
}

private com.p14n.postevent.catchup.grpc.Event convertToGrpcEvent(Event event) {
return com.p14n.postevent.catchup.grpc.Event.newBuilder()
.setId(event.id())
Expand Down
13 changes: 13 additions & 0 deletions library/src/main/proto/catchup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ package catchup;
service CatchupService {
// Fetch events from a specific point
rpc FetchEvents (FetchEventsRequest) returns (FetchEventsResponse) {}

// Get the latest message ID for a topic
rpc GetLatestMessageId (GetLatestMessageIdRequest) returns (GetLatestMessageIdResponse) {}
}

// The request message containing fetch parameters
Expand All @@ -25,6 +28,16 @@ message FetchEventsResponse {
repeated Event events = 1;
}

// The request message for getting latest message ID
message GetLatestMessageIdRequest {
string topic = 1;
}

// The response message containing latest message ID
message GetLatestMessageIdResponse {
int64 latest_id = 1;
}

// Event message
message Event {
string id = 1;
Expand Down
77 changes: 77 additions & 0 deletions library/src/test/java/com/p14n/postevent/CatchupServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,72 @@ public void testHasSequenceGapWithGap() throws Exception {
}
}

@Test
public void testFetchLatestWithNewerMessage() throws Exception {
try (Connection connection = pg.getPostgresDatabase().getConnection()) {
connection.setAutoCommit(false);

// Publish some initial events to establish a baseline
log.debug("Publishing initial 5 events");
for (int i = 1; i <= 5; i++) {
Event event = createTestEvent(i);
Publisher.publish(event, connection, TEST_TOPIC);
}

// Copy first 3 events to messages table and set HWM to 3
copyEventsToMessages(connection, 1);
initializeHwm(connection, TEST_TOPIC, 3);

long initialHwm = getCurrentHwm(connection, TEST_TOPIC);
log.debug("Initial HWM: {}", initialHwm);

// Fetch latest should get the newest message (ID 5)
int processedCount = catchupService.fetchLatest(TEST_TOPIC);

// Should have processed 1 event (the latest one)
assertEquals(1, processedCount, "Should have processed 1 latest event");

// Verify that the latest message was added to the messages table
int messagesCount = countMessagesInTable(connection);
assertEquals(4, messagesCount, "Should have 4 messages in messages table (3 initial + 1 latest)");

// Verify that the latest message ID is 5
long latestIdInMessages = getMaxIdnFromMessagesTable(connection);
assertEquals(5, latestIdInMessages, "Latest message ID should be 5");
}
}

@Test
public void testFetchLatestWithNoNewerMessage() throws Exception {
try (Connection connection = pg.getPostgresDatabase().getConnection()) {
connection.setAutoCommit(false);

// Publish some events
log.debug("Publishing 3 events");
for (int i = 1; i <= 3; i++) {
Event event = createTestEvent(i);
Publisher.publish(event, connection, TEST_TOPIC);
}

// Copy all events to messages table and set HWM to the latest
copyEventsToMessages(connection, 1);
initializeHwm(connection, TEST_TOPIC, 3);

long initialHwm = getCurrentHwm(connection, TEST_TOPIC);
log.debug("Initial HWM: {}", initialHwm);

// Fetch latest should find no newer messages
int processedCount = catchupService.fetchLatest(TEST_TOPIC);

// Should have processed 0 events
assertEquals(0, processedCount, "Should have processed 0 events when no newer messages exist");

// Verify messages count remains the same
int messagesCount = countMessagesInTable(connection);
assertEquals(3, messagesCount, "Should still have 3 messages in messages table");
}
}

/**
* Helper method to initialize HWM for a subscriber
*/
Expand Down Expand Up @@ -300,4 +366,15 @@ private int countMessagesInTable(Connection connection) throws Exception {
}
}

private long getMaxIdnFromMessagesTable(Connection connection) throws Exception {
String sql = "SELECT MAX(idn) FROM postevent.messages";
try (PreparedStatement stmt = connection.prepareStatement(sql);
ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
return rs.getLong(1);
}
return 0;
}
}

}