Skip to content

Commit 92d169f

Browse files
authored
Merge pull request #124 from sb-05-mopl/feature/modify-WatchingSession-Batch-Refresh#117
[WatchingSession] 레디스 TTL 배치로 갱신, 레디스나 웹소켓 서버 비정상 종료시 대응 추가
2 parents f4555fd + ea54c76 commit 92d169f

10 files changed

Lines changed: 409 additions & 129 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
11
package com.mopl.moplwebsocketsse.domain.watch.interceptor;
22

3-
import java.util.List;
4-
53
import org.springframework.messaging.Message;
64
import org.springframework.messaging.MessageChannel;
75
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
86
import org.springframework.messaging.simp.SimpMessageType;
97
import org.springframework.messaging.support.ChannelInterceptor;
108
import org.springframework.stereotype.Component;
119

12-
import com.mopl.moplwebsocketsse.domain.watch.registry.SessionMapping;
1310
import com.mopl.moplwebsocketsse.domain.watch.registry.WatchingSessionRegistry;
14-
import com.mopl.moplwebsocketsse.domain.watch.repository.WatchingSessionRepository;
1511

1612
import lombok.RequiredArgsConstructor;
1713
import lombok.extern.slf4j.Slf4j;
@@ -22,41 +18,25 @@
2218
public class WatchingSessionHeartbeatInterceptor implements ChannelInterceptor {
2319

2420
private final WatchingSessionRegistry registry;
25-
private final WatchingSessionRepository repository;
2621

2722
@Override
2823
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
29-
if (!sent)
24+
if (!sent) {
3025
return;
26+
}
3127

3228
SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
33-
if (type != SimpMessageType.HEARTBEAT)
29+
if (type != SimpMessageType.HEARTBEAT) {
3430
return;
31+
}
3532

3633
String wsSessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
37-
if (wsSessionId == null)
38-
return;
39-
40-
List<SessionMapping> mappings = registry.getAllByWsSessionId(wsSessionId);
41-
42-
if (mappings.isEmpty()) {
43-
log.warn("[WatchingSessionHeartbeatInterceptor] No mappings found! wsId={}", wsSessionId);
34+
if (wsSessionId == null) {
4435
return;
4536
}
4637

47-
for (SessionMapping mapping : mappings) {
48-
try {
49-
boolean alive = repository.refreshSessionTtl(mapping.watchingSessionId(), mapping.userId());
50-
51-
if (!alive) {
52-
log.warn("[WatchingSessionHeartbeatInterceptor] Session not alive, removing. wsId={}, subId={}, watchingId={}",
53-
wsSessionId, mapping.subscriptionId(), mapping.watchingSessionId());
54-
registry.removeBySubscriptionId(mapping.webSocketSessionId(), mapping.subscriptionId());
55-
}
56-
} catch (Exception e) {
57-
log.warn("[WatchingSessionHeartbeatInterceptor] Redis 세션 갱신 실패. wsId={}, subId={}",
58-
wsSessionId, mapping.subscriptionId(), e);
59-
}
60-
}
38+
registry.updateLastActiveTimeByWsSessionId(wsSessionId);
39+
40+
log.trace("[Heartbeat] Updated lastActiveTime. wsId={}", wsSessionId);
6141
}
6242
}

mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/listener/WatchingSessionEventListener.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.mopl.moplwebsocketsse.domain.watch.event.WatchingSessionStartedEvent;
1919
import com.mopl.moplwebsocketsse.domain.watch.registry.SessionMapping;
2020
import com.mopl.moplwebsocketsse.domain.watch.registry.WatchingSessionRegistry;
21+
import com.mopl.moplwebsocketsse.domain.watch.registry.WebSocketSessionHolder;
2122
import com.mopl.moplwebsocketsse.domain.watch.repository.WatchingSessionRepository;
2223
import com.mopl.moplwebsocketsse.domain.watch.service.WatchingSessionService;
2324

@@ -34,6 +35,7 @@ public class WatchingSessionEventListener {
3435
private final WatchingSessionService wsService;
3536
private final WatchingSessionEventPublisher wsPublisher;
3637
private final SimpMessagingTemplate messagingTemplate;
38+
private final WebSocketSessionHolder webSocketSessionHolder;
3739

3840
@EventListener
3941
public void handleSessionSubscribe(SessionSubscribeEvent event) {
@@ -70,7 +72,7 @@ public void handleSessionSubscribe(SessionSubscribeEvent event) {
7072
wsRegistry.register(wsSessionId, subscriptionId, watchingSession.getId(), userId, contentId);
7173

7274
log.debug(
73-
"[WatchingSessionEventListener] SUBSCRIBE After registry. wsId={}, subId={}, watchingId={}, userId={}, contentId={}",
75+
"[WatchingSessionEventListener] SUBSCRIBE. wsId={}, subId={}, watchingId={}, userId={}, contentId={}",
7476
wsSessionId, subscriptionId, watchingSession.getId(), userId, contentId
7577
);
7678

@@ -93,8 +95,8 @@ public void handleSessionSubscribe(SessionSubscribeEvent event) {
9395
wsPublisher.publish(startedEvent);
9496
}
9597
} catch (Exception e) {
96-
log.error("[WatchingSessionEventListener] Failed to broadcast JOIN. sessionId={}", watchingSession.getId(),
97-
e);
98+
log.error("[WatchingSessionEventListener] Failed to broadcast JOIN. sessionId={}",
99+
watchingSession.getId(), e);
98100
}
99101
}
100102

@@ -117,20 +119,20 @@ public void handleSessionUnsubscribe(SessionUnsubscribeEvent event) {
117119

118120
try {
119121
WatchingSessionChange message = wsService.createLeaveMessage(
120-
mapping.watchingSessionId(),
121-
mapping.contentId()
122+
mapping.getWatchingSessionId(),
123+
mapping.getContentId()
122124
);
123-
broadcastToWatchers(mapping.contentId(), message);
125+
broadcastToWatchers(mapping.getContentId(), message);
124126
} catch (Exception e) {
125127
log.error("[WatchingSessionEventListener] Failed to broadcast LEAVE. sessionId={}",
126-
mapping.watchingSessionId(), e);
128+
mapping.getWatchingSessionId(), e);
127129
}
128130

129-
wsRepository.delete(mapping.watchingSessionId(), mapping.contentId(), mapping.userId());
131+
wsRepository.delete(mapping.getWatchingSessionId(), mapping.getContentId(), mapping.getUserId());
130132

131133
log.debug(
132134
"[WatchingSessionEventListener] UNSUBSCRIBE. wsId={}, subId={}, watchingId={}, userId={}, contentId={}",
133-
wsSessionId, subscriptionId, mapping.watchingSessionId(), mapping.userId(), mapping.contentId()
135+
wsSessionId, subscriptionId, mapping.getWatchingSessionId(), mapping.getUserId(), mapping.getContentId()
134136
);
135137
}
136138

@@ -144,6 +146,7 @@ public void handleSessionDisconnect(SessionDisconnectEvent event) {
144146
}
145147

146148
List<SessionMapping> mappings = wsRegistry.removeAllByWsSessionId(wsSessionId);
149+
webSocketSessionHolder.remove(wsSessionId);
147150

148151
if (mappings.isEmpty()) {
149152
log.debug("[WatchingSessionEventListener] No mappings found on DISCONNECT. wsId={}", wsSessionId);
@@ -153,21 +156,21 @@ public void handleSessionDisconnect(SessionDisconnectEvent event) {
153156
for (SessionMapping mapping : mappings) {
154157
try {
155158
WatchingSessionChange message = wsService.createLeaveMessage(
156-
mapping.watchingSessionId(),
157-
mapping.contentId()
159+
mapping.getWatchingSessionId(),
160+
mapping.getContentId()
158161
);
159-
broadcastToWatchers(mapping.contentId(), message);
162+
broadcastToWatchers(mapping.getContentId(), message);
160163
} catch (Exception e) {
161164
log.error("[WatchingSessionEventListener] Failed to broadcast LEAVE. sessionId={}",
162-
mapping.watchingSessionId(), e);
165+
mapping.getWatchingSessionId(), e);
163166
}
164167

165-
wsRepository.delete(mapping.watchingSessionId(), mapping.contentId(), mapping.userId());
168+
wsRepository.delete(mapping.getWatchingSessionId(), mapping.getContentId(), mapping.getUserId());
166169

167170
log.debug(
168171
"[WatchingSessionEventListener] DISCONNECT. wsId={}, subId={}, watchingId={}, userId={}, contentId={}",
169-
wsSessionId, mapping.subscriptionId(), mapping.watchingSessionId(),
170-
mapping.userId(), mapping.contentId()
172+
wsSessionId, mapping.getSubscriptionId(), mapping.getWatchingSessionId(),
173+
mapping.getUserId(), mapping.getContentId()
171174
);
172175
}
173176
}
Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,49 @@
11
package com.mopl.moplwebsocketsse.domain.watch.registry;
22

3+
import java.time.Instant;
34
import java.util.UUID;
45

6+
import org.springframework.web.socket.WebSocketSession;
7+
8+
import lombok.Getter;
9+
510
/**
611
* 세션 매핑 정보
7-
*
8-
* @param webSocketSessionId WebSocket STOMP Session ID
9-
* @param watchingSessionId WatchingSession ID
10-
* @param userId 사용자 ID
11-
* @param contentId 콘텐츠 ID
1212
*/
13-
public record SessionMapping(
14-
String webSocketSessionId,
15-
String subscriptionId,
16-
UUID watchingSessionId,
17-
UUID userId,
18-
UUID contentId
19-
) {}
13+
@Getter
14+
public class SessionMapping {
15+
16+
private final String webSocketSessionId;
17+
private final String subscriptionId;
18+
private final UUID watchingSessionId;
19+
private final UUID userId;
20+
private final UUID contentId;
21+
22+
private volatile Instant lastActiveTime;
23+
24+
/**
25+
* @param webSocketSessionId 웹소켓 세션 아이디
26+
* @param subscriptionId 구독 아이디
27+
* @param watchingSessionId 시청 세션 UUID
28+
* @param userId 사용자 UUID
29+
* @param contentId 콘텐츠 UUID
30+
*/
31+
public SessionMapping(
32+
String webSocketSessionId,
33+
String subscriptionId,
34+
UUID watchingSessionId,
35+
UUID userId,
36+
UUID contentId
37+
) {
38+
this.webSocketSessionId = webSocketSessionId;
39+
this.subscriptionId = subscriptionId;
40+
this.watchingSessionId = watchingSessionId;
41+
this.userId = userId;
42+
this.contentId = contentId;
43+
this.lastActiveTime = Instant.now();
44+
}
45+
46+
public void updateLastActiveTime() {
47+
this.lastActiveTime = Instant.now();
48+
}
49+
}
Lines changed: 29 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,23 @@
11
package com.mopl.moplwebsocketsse.domain.watch.registry;
22

33
import java.util.ArrayList;
4+
import java.util.Collection;
45
import java.util.Collections;
56
import java.util.List;
67
import java.util.Map;
78
import java.util.Set;
89
import java.util.UUID;
910
import java.util.concurrent.ConcurrentHashMap;
1011

11-
import org.springframework.scheduling.annotation.Scheduled;
1212
import org.springframework.stereotype.Component;
13+
import org.springframework.web.socket.WebSocketSession;
1314

14-
import com.mopl.moplwebsocketsse.domain.watch.repository.WatchingSessionRepository;
15-
16-
import lombok.RequiredArgsConstructor;
1715
import lombok.extern.slf4j.Slf4j;
1816

1917
@Slf4j
2018
@Component
21-
@RequiredArgsConstructor
2219
public class WatchingSessionRegistry {
2320

24-
private final WatchingSessionRepository repository;
25-
2621
private final Map<String, SessionMapping> subscriptionMappings = new ConcurrentHashMap<>();
2722
private final Map<String, Set<String>> wsToSubscriptions = new ConcurrentHashMap<>();
2823

@@ -32,6 +27,7 @@ private String makeKey(String wsSessionId, String subscriptionId) {
3227

3328
public void register(String wsSessionId, String subscriptionId,
3429
UUID watchingSessionId, UUID userId, UUID contentId) {
30+
3531
SessionMapping mapping = new SessionMapping(
3632
wsSessionId, subscriptionId, watchingSessionId, userId, contentId
3733
);
@@ -41,21 +37,39 @@ public void register(String wsSessionId, String subscriptionId,
4137
wsToSubscriptions.computeIfAbsent(wsSessionId, k -> ConcurrentHashMap.newKeySet())
4238
.add(key);
4339

44-
log.debug("[WatchingSessionRegistry] Registered. wsId={}, subId={}, key={}, watchingId={}",
45-
wsSessionId, subscriptionId, key, watchingSessionId);
40+
log.debug("[WatchingSessionRegistry] Registered. wsId={}, subId={}, watchingId={}",
41+
wsSessionId, subscriptionId, watchingSessionId);
42+
}
43+
44+
public void updateLastActiveTimeByWsSessionId(String wsSessionId) {
45+
Set<String> keys = wsToSubscriptions.get(wsSessionId);
46+
if (keys == null || keys.isEmpty()) {
47+
return;
48+
}
49+
50+
for (String key : keys) {
51+
SessionMapping mapping = subscriptionMappings.get(key);
52+
if (mapping != null) {
53+
mapping.updateLastActiveTime();
54+
}
55+
}
56+
}
57+
58+
public Collection<SessionMapping> getAllMappings() {
59+
return Collections.unmodifiableCollection(subscriptionMappings.values());
4660
}
4761

4862
public SessionMapping removeBySubscriptionId(String wsSessionId, String subscriptionId) {
4963
String key = makeKey(wsSessionId, subscriptionId);
5064
SessionMapping mapping = subscriptionMappings.remove(key);
5165

5266
if (mapping != null) {
53-
Set<String> subs = wsToSubscriptions.get(mapping.webSocketSessionId());
67+
Set<String> subs = wsToSubscriptions.get(mapping.getWebSocketSessionId());
5468
if (subs != null) {
5569
subs.remove(key);
5670
}
57-
log.debug("[WatchingSessionRegistry] Removed by key. key={}, watchingId={}",
58-
key, mapping.watchingSessionId());
71+
log.debug("[WatchingSessionRegistry] Removed. key={}, watchingId={}",
72+
key, mapping.getWatchingSessionId());
5973
}
6074

6175
return mapping;
@@ -76,7 +90,7 @@ public List<SessionMapping> removeAllByWsSessionId(String wsSessionId) {
7690
}
7791
}
7892

79-
log.debug("[WatchingSessionRegistry] Removed all by wsId. wsId={}, count={}", wsSessionId, mappings.size());
93+
log.debug("[WatchingSessionRegistry] Removed all. wsId={}, count={}", wsSessionId, mappings.size());
8094
return mappings;
8195
}
8296

@@ -102,40 +116,7 @@ public List<SessionMapping> getAllByWsSessionId(String wsSessionId) {
102116
return mappings;
103117
}
104118

105-
@Scheduled(fixedRate = 300000)
106-
public void cleanupRegistry() {
107-
if (subscriptionMappings.isEmpty())
108-
return;
109-
110-
log.debug("[WatchingSessionRegistry] Cleanup started. size={}", subscriptionMappings.size());
111-
112-
List<Map.Entry<String, SessionMapping>> entries =
113-
new ArrayList<>(subscriptionMappings.entrySet());
114-
115-
List<UUID> sessionIds = entries.stream()
116-
.map(e -> e.getValue().watchingSessionId())
117-
.toList();
118-
119-
List<Boolean> existsList = repository.existsSessions(sessionIds);
120-
121-
int removedCount = 0;
122-
for (int i = 0; i < entries.size(); i++) {
123-
boolean exists = (i < existsList.size()) && Boolean.TRUE.equals(existsList.get(i));
124-
if (!exists) {
125-
String key = entries.get(i).getKey();
126-
SessionMapping mapping = subscriptionMappings.remove(key);
127-
if (mapping != null) {
128-
Set<String> subs = wsToSubscriptions.get(mapping.webSocketSessionId());
129-
if (subs != null) {
130-
subs.remove(key);
131-
}
132-
removedCount++;
133-
}
134-
}
135-
}
136-
137-
if (removedCount > 0) {
138-
log.info("[WatchingSessionRegistry] Cleanup completed. removed={}", removedCount);
139-
}
119+
public int size() {
120+
return subscriptionMappings.size();
140121
}
141122
}

0 commit comments

Comments
 (0)