Skip to content

Commit 16c1ae3

Browse files
author
ben
committed
Add api to send messages via mysql
1 parent 20e9107 commit 16c1ae3

File tree

2 files changed

+368
-0
lines changed

2 files changed

+368
-0
lines changed
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package com.bencodez.simpleapi.servercomm.mysql;
2+
3+
import java.sql.Connection;
4+
import java.sql.PreparedStatement;
5+
import java.sql.ResultSet;
6+
import java.sql.SQLException;
7+
import java.sql.Statement;
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.function.Consumer;
11+
12+
import javax.sql.DataSource;
13+
14+
/**
15+
* Backend-side messenger: listens on "backend-channel-<serverId>" and sends to
16+
* proxy. Also reuses dedicated publisher connection to conserve pool. Ensures
17+
* table exists before connecting.
18+
*/
19+
public class BackendMessenger {
20+
private final String myServerId;
21+
private final String CHANNEL;
22+
private final DataSource ds;
23+
private Connection lockConn;
24+
private Connection workConn;
25+
private Connection pubConn;
26+
private volatile boolean running = true;
27+
private long lastSeenId = 0;
28+
29+
private final Consumer<BackendMessage> onMessage;
30+
31+
private String tableName;
32+
33+
public BackendMessenger(String tableName, DataSource dataSource, String serverId,
34+
Consumer<BackendMessage> onMessage) throws SQLException {
35+
this.ds = dataSource;
36+
this.myServerId = serverId;
37+
this.CHANNEL = "backend-channel-" + serverId;
38+
this.onMessage = onMessage;
39+
this.tableName = tableName;
40+
ensureSchema(tableName);
41+
initConnections();
42+
startListener();
43+
}
44+
45+
private void ensureSchema(String tableName) throws SQLException {
46+
String ddl = "CREATE TABLE IF NOT EXISTS " + tableName + "_message_queue ("
47+
+ "id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, " + "source VARCHAR(36) NOT NULL, "
48+
+ "destination VARCHAR(36) NOT NULL, " + "created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
49+
+ "payload LONGTEXT NOT NULL, " + "PRIMARY KEY (id), " + "INDEX idx_dest_id (destination, id)"
50+
+ ") ENGINE=InnoDB;";
51+
try (Connection conn = ds.getConnection(); Statement stmt = conn.createStatement()) {
52+
stmt.execute(ddl);
53+
}
54+
}
55+
56+
private void initConnections() throws SQLException {
57+
this.lockConn = ds.getConnection();
58+
this.workConn = ds.getConnection();
59+
this.pubConn = ds.getConnection();
60+
if (!acquireLock(lockConn, CHANNEL, 10)) {
61+
throw new IllegalStateException("Could not acquire " + CHANNEL + " lock on startup");
62+
}
63+
}
64+
65+
private void startListener() {
66+
Thread t = new Thread(() -> {
67+
while (running) {
68+
try {
69+
if (acquireLock(lockConn, CHANNEL, 300)) {
70+
fetchBatch().forEach(onMessage);
71+
}
72+
} catch (SQLException e) {
73+
e.printStackTrace();
74+
reconnectOnError();
75+
}
76+
}
77+
}, "BackendMessenger-Listener-" + myServerId);
78+
t.setDaemon(true);
79+
t.start();
80+
}
81+
82+
private void deleteMessageById(long id) throws SQLException {
83+
String delSql = "DELETE FROM " + tableName + "_message_queue WHERE id = ?";
84+
try (PreparedStatement del = workConn.prepareStatement(delSql)) {
85+
del.setLong(1, id);
86+
del.executeUpdate();
87+
}
88+
}
89+
90+
private List<BackendMessage> fetchBatch() throws SQLException {
91+
String sql = "SELECT id, source, payload FROM " + tableName
92+
+ "_message_queue WHERE destination = ? AND id > ? ORDER BY id";
93+
List<BackendMessage> results = new ArrayList<>();
94+
try (PreparedStatement ps = workConn.prepareStatement(sql)) {
95+
ps.setString(1, myServerId);
96+
ps.setLong(2, lastSeenId);
97+
try (ResultSet rs = ps.executeQuery()) {
98+
while (rs.next()) {
99+
long id = rs.getLong("id");
100+
String from = rs.getString("source");
101+
String payload = rs.getString("payload");
102+
results.add(new BackendMessage(id, from, payload));
103+
lastSeenId = id;
104+
// Delete the message after processing
105+
deleteMessageById(id);
106+
}
107+
}
108+
}
109+
return results;
110+
}
111+
112+
public synchronized void sendToProxy(String payload) throws SQLException {
113+
String insertSql = "INSERT INTO " + tableName
114+
+ "_message_queue (source, destination, payload) VALUES (?, 'proxy', ?)";
115+
try (PreparedStatement ins = pubConn.prepareStatement(insertSql)) {
116+
ins.setString(1, myServerId);
117+
ins.setString(2, payload);
118+
ins.executeUpdate();
119+
}
120+
try (PreparedStatement rel = pubConn.prepareStatement("SELECT RELEASE_LOCK(?)")) {
121+
rel.setString(1, "proxy-channel");
122+
rel.executeQuery();
123+
}
124+
}
125+
126+
private boolean acquireLock(Connection conn, String name, int timeout) throws SQLException {
127+
try (PreparedStatement ps = conn.prepareStatement("SELECT GET_LOCK(?, ?)")) {
128+
ps.setString(1, name);
129+
ps.setInt(2, timeout);
130+
try (ResultSet rs = ps.executeQuery()) {
131+
return rs.next() && rs.getInt(1) == 1;
132+
}
133+
}
134+
}
135+
136+
private void reconnectOnError() {
137+
try {
138+
if (lockConn != null)
139+
lockConn.close();
140+
if (workConn != null)
141+
workConn.close();
142+
if (pubConn != null)
143+
pubConn.close();
144+
initConnections();
145+
} catch (SQLException e) {
146+
e.printStackTrace();
147+
}
148+
}
149+
150+
public void shutdown() {
151+
running = false;
152+
closeQuiet(lockConn);
153+
closeQuiet(workConn);
154+
closeQuiet(pubConn);
155+
}
156+
157+
private void closeQuiet(Connection c) {
158+
if (c != null)
159+
try {
160+
c.close();
161+
} catch (SQLException ignored) {
162+
}
163+
}
164+
165+
public static class BackendMessage {
166+
public final long id;
167+
public final String fromServerId;
168+
public final String payload;
169+
170+
public BackendMessage(long id, String fromServerId, String payload) {
171+
this.id = id;
172+
this.fromServerId = fromServerId;
173+
this.payload = payload;
174+
}
175+
}
176+
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package com.bencodez.simpleapi.servercomm.mysql;
2+
3+
import java.sql.Connection;
4+
import java.sql.PreparedStatement;
5+
import java.sql.ResultSet;
6+
import java.sql.SQLException;
7+
import java.sql.Statement;
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.function.Consumer;
11+
12+
import javax.sql.DataSource;
13+
14+
/**
15+
* Proxy-side messenger: listens on "proxy-channel" and forwards to servers.
16+
* Reuses dedicated lock, worker, and publisher connections to avoid exhausting
17+
* the pool. Ensures the required table exists on initialization.
18+
*/
19+
public class ProxyMessenger {
20+
private static final String PROXY_CHANNEL = "proxy-channel";
21+
private final DataSource ds;
22+
private Connection lockConn;
23+
private Connection workConn;
24+
private Connection pubConn;
25+
private volatile boolean running = true;
26+
private long lastSeenId = 0;
27+
28+
private final Consumer<ProxyMessage> onMessage;
29+
private String tableName;
30+
31+
public ProxyMessenger(String tableName, DataSource dataSource, Consumer<ProxyMessage> onMessage)
32+
throws SQLException {
33+
this.ds = dataSource;
34+
this.onMessage = onMessage;
35+
this.tableName = tableName;
36+
ensureSchema(tableName);
37+
initConnections();
38+
startListener();
39+
}
40+
41+
private void ensureSchema(String tableName) throws SQLException {
42+
String ddl = "CREATE TABLE IF NOT EXISTS " + tableName + "_message_queue ("
43+
+ "id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, " + "source VARCHAR(36) NOT NULL, "
44+
+ "destination VARCHAR(36) NOT NULL, " + "created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
45+
+ "payload LONGTEXT NOT NULL, " + "PRIMARY KEY (id), " + "INDEX idx_dest_id (destination, id)"
46+
+ ") ENGINE=InnoDB;";
47+
try (Connection conn = ds.getConnection(); Statement stmt = conn.createStatement()) {
48+
stmt.execute(ddl);
49+
}
50+
}
51+
52+
private void initConnections() throws SQLException {
53+
// Open three long-lived connections
54+
this.lockConn = ds.getConnection();
55+
this.workConn = ds.getConnection();
56+
this.pubConn = ds.getConnection();
57+
// Prime the lock to enter wait-loop
58+
if (!acquireLock(lockConn, PROXY_CHANNEL, 10)) {
59+
throw new IllegalStateException("Could not acquire proxy-channel lock on startup");
60+
}
61+
}
62+
63+
private void startListener() {
64+
Thread t = new Thread(() -> {
65+
while (running) {
66+
try {
67+
if (acquireLock(lockConn, PROXY_CHANNEL, 300)) {
68+
List<ProxyMessage> batch = fetchBatch();
69+
batch.forEach(onMessage);
70+
}
71+
} catch (SQLException e) {
72+
e.printStackTrace();
73+
reconnectOnError();
74+
}
75+
}
76+
}, "ProxyMessenger-Listener");
77+
t.setDaemon(true);
78+
t.start();
79+
}
80+
81+
private void deleteMessageById(long id) throws SQLException {
82+
String delSql = "DELETE FROM " + tableName + "_message_queue WHERE id = ?";
83+
try (PreparedStatement del = workConn.prepareStatement(delSql)) {
84+
del.setLong(1, id);
85+
del.executeUpdate();
86+
}
87+
}
88+
89+
private List<ProxyMessage> fetchBatch() throws SQLException {
90+
String sql = "SELECT id, source, payload FROM " + tableName + "_message_queue "
91+
+ "WHERE destination='proxy' AND id > ? ORDER BY id";
92+
try (PreparedStatement ps = workConn.prepareStatement(sql)) {
93+
ps.setLong(1, lastSeenId);
94+
try (ResultSet rs = ps.executeQuery()) {
95+
List<ProxyMessage> results = new ArrayList<>();
96+
while (rs.next()) {
97+
long id = rs.getLong("id");
98+
String source = rs.getString("source");
99+
String payload = rs.getString("payload");
100+
results.add(new ProxyMessage(id, source, payload));
101+
lastSeenId = id;
102+
// Delete the message after processing
103+
deleteMessageById(id);
104+
}
105+
return results;
106+
}
107+
}
108+
}
109+
110+
/**
111+
* Sends a message from the proxy to a specific backend server.
112+
*/
113+
public synchronized void sendToBackend(String targetServerId, String payload) throws SQLException {
114+
String insertSql = "INSERT INTO " + tableName
115+
+ "_message_queue (source, destination, payload) VALUES ('proxy', ?, ?)";
116+
try (PreparedStatement ins = pubConn.prepareStatement(insertSql)) {
117+
ins.setString(1, targetServerId);
118+
ins.setString(2, payload);
119+
ins.executeUpdate();
120+
}
121+
String channel = "backend-channel-" + targetServerId;
122+
try (PreparedStatement rel = pubConn.prepareStatement("SELECT RELEASE_LOCK(?)")) {
123+
rel.setString(1, channel);
124+
rel.executeQuery();
125+
}
126+
}
127+
128+
/**
129+
* Called by each backend instance to send a message to this proxy.
130+
*/
131+
public synchronized void sendToProxy(String fromServerId, String payload) throws SQLException {
132+
String insertSql = "INSERT INTO " + tableName
133+
+ "_message_queue (source, destination, payload) VALUES (?, 'proxy', ?)";
134+
try (PreparedStatement ins = pubConn.prepareStatement(insertSql)) {
135+
ins.setString(1, fromServerId);
136+
ins.setString(2, payload);
137+
ins.executeUpdate();
138+
}
139+
try (PreparedStatement rel = pubConn.prepareStatement("SELECT RELEASE_LOCK(?)")) {
140+
rel.setString(1, PROXY_CHANNEL);
141+
rel.executeQuery();
142+
}
143+
}
144+
145+
private boolean acquireLock(Connection conn, String name, int timeout) throws SQLException {
146+
try (PreparedStatement ps = conn.prepareStatement("SELECT GET_LOCK(?, ?)")) {
147+
ps.setString(1, name);
148+
ps.setInt(2, timeout);
149+
try (ResultSet rs = ps.executeQuery()) {
150+
return rs.next() && rs.getInt(1) == 1;
151+
}
152+
}
153+
}
154+
155+
private void reconnectOnError() {
156+
try {
157+
closeQuiet(lockConn);
158+
closeQuiet(workConn);
159+
closeQuiet(pubConn);
160+
initConnections();
161+
} catch (SQLException e) {
162+
e.printStackTrace();
163+
}
164+
}
165+
166+
public void shutdown() {
167+
running = false;
168+
closeQuiet(lockConn);
169+
closeQuiet(workConn);
170+
closeQuiet(pubConn);
171+
}
172+
173+
private void closeQuiet(Connection c) {
174+
if (c != null)
175+
try {
176+
c.close();
177+
} catch (SQLException ignored) {
178+
}
179+
}
180+
181+
public static class ProxyMessage {
182+
public final long id;
183+
public final String sourceServerId;
184+
public final String payload;
185+
186+
public ProxyMessage(long id, String sourceServerId, String payload) {
187+
this.id = id;
188+
this.sourceServerId = sourceServerId;
189+
this.payload = payload;
190+
}
191+
}
192+
}

0 commit comments

Comments
 (0)