Skip to content

Commit 20e9107

Browse files
committed
Adjust redis handler to handle multiple listeners
1 parent 50ef854 commit 20e9107

File tree

1 file changed

+36
-13
lines changed

1 file changed

+36
-13
lines changed

SimpleAPI/src/main/java/com/bencodez/simpleapi/servercomm/redis/RedisHandler.java

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package com.bencodez.simpleapi.servercomm.redis;
22

3+
import java.util.Map;
4+
import java.util.concurrent.ConcurrentHashMap;
5+
36
import redis.clients.jedis.Jedis;
47
import redis.clients.jedis.JedisPool;
58
import redis.clients.jedis.JedisPoolConfig;
69

710
public abstract class RedisHandler {
811
private final JedisPool publishPool;
912
private final JedisPool subscribePool;
13+
14+
private final Map<RedisListener, Thread> listenerThreads = new ConcurrentHashMap<>();
1015

1116
public RedisHandler(String host, int port, String username, String password) {
1217
int timeout = 2000; // Set a reasonable timeout
@@ -23,31 +28,49 @@ public RedisHandler(String host, int port, String username, String password) {
2328
}
2429

2530
public void close() {
26-
try {
27-
if (subscribeThread != null && subscribeThread.isAlive()) {
28-
subscribeThread.interrupt(); // Signal the thread to stop
31+
debug("Shutting down RedisHandler");
32+
33+
// Unsubscribe all listeners and stop threads
34+
for (Map.Entry<RedisListener, Thread> entry : listenerThreads.entrySet()) {
35+
RedisListener listener = entry.getKey();
36+
Thread thread = entry.getValue();
37+
38+
try {
39+
debug("Unsubscribing Redis listener on channel: " + listener.getChannel());
40+
listener.unsubscribe(); // Gracefully signal jedis.subscribe() to exit
41+
} catch (Exception e) {
42+
debug("Failed to unsubscribe listener: " + e.getMessage());
43+
}
44+
45+
try {
46+
if (thread != null && thread.isAlive()) {
47+
thread.join(2000); // Give up to 2 seconds for it to shut down
48+
}
49+
} catch (InterruptedException e) {
50+
debug("Interrupted while waiting for Redis listener thread to finish: " + e.getMessage());
2951
}
30-
} catch (Exception e) {
31-
debug("Failed to interrupt Redis subscribe thread: " + e.getMessage());
3252
}
33-
34-
53+
54+
listenerThreads.clear();
55+
56+
// Close Redis pools
3557
publishPool.close();
3658
subscribePool.close();
3759
}
3860

39-
private Thread subscribeThread;
40-
4161
public void loadListener(RedisListener listener) {
42-
subscribeThread = new Thread(() -> {
62+
Thread thread = new Thread(() -> {
4363
try (Jedis jedis = subscribePool.getResource()) {
64+
debug("Starting Redis subscription for channel: " + listener.getChannel());
4465
jedis.subscribe(listener, listener.getChannel());
4566
} catch (Exception e) {
46-
debug("Redis subscribe error: " + e.getMessage());
67+
debug("Redis subscribe error on channel " + listener.getChannel() + ": " + e.getMessage());
4768
}
48-
}, "RedisSubscribeThread");
69+
}, "RedisSubscribeThread-" + listener.getChannel());
4970

50-
subscribeThread.start();
71+
thread.setDaemon(true);
72+
listenerThreads.put(listener, thread);
73+
thread.start();
5174
}
5275

5376
public abstract void debug(String message);

0 commit comments

Comments
 (0)