diff --git a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/interceptor/WatchingSessionHeartbeatInterceptor.java b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/interceptor/WatchingSessionHeartbeatInterceptor.java index a60d7448..38f97f8d 100644 --- a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/interceptor/WatchingSessionHeartbeatInterceptor.java +++ b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/interceptor/WatchingSessionHeartbeatInterceptor.java @@ -1,7 +1,5 @@ package com.mopl.moplwebsocketsse.domain.watch.interceptor; -import java.util.List; - import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; @@ -9,9 +7,7 @@ import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.stereotype.Component; -import com.mopl.moplwebsocketsse.domain.watch.registry.SessionMapping; import com.mopl.moplwebsocketsse.domain.watch.registry.WatchingSessionRegistry; -import com.mopl.moplwebsocketsse.domain.watch.repository.WatchingSessionRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -22,41 +18,25 @@ public class WatchingSessionHeartbeatInterceptor implements ChannelInterceptor { private final WatchingSessionRegistry registry; - private final WatchingSessionRepository repository; @Override public void postSend(Message message, MessageChannel channel, boolean sent) { - if (!sent) + if (!sent) { return; + } SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(message.getHeaders()); - if (type != SimpMessageType.HEARTBEAT) + if (type != SimpMessageType.HEARTBEAT) { return; + } String wsSessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders()); - if (wsSessionId == null) - return; - - List mappings = registry.getAllByWsSessionId(wsSessionId); - - if (mappings.isEmpty()) { - log.warn("[WatchingSessionHeartbeatInterceptor] No mappings found! wsId={}", wsSessionId); + if (wsSessionId == null) { return; } - for (SessionMapping mapping : mappings) { - try { - boolean alive = repository.refreshSessionTtl(mapping.watchingSessionId(), mapping.userId()); - - if (!alive) { - log.warn("[WatchingSessionHeartbeatInterceptor] Session not alive, removing. wsId={}, subId={}, watchingId={}", - wsSessionId, mapping.subscriptionId(), mapping.watchingSessionId()); - registry.removeBySubscriptionId(mapping.webSocketSessionId(), mapping.subscriptionId()); - } - } catch (Exception e) { - log.warn("[WatchingSessionHeartbeatInterceptor] Redis 세션 갱신 실패. wsId={}, subId={}", - wsSessionId, mapping.subscriptionId(), e); - } - } + registry.updateLastActiveTimeByWsSessionId(wsSessionId); + + log.trace("[Heartbeat] Updated lastActiveTime. wsId={}", wsSessionId); } } \ No newline at end of file diff --git a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/listener/WatchingSessionEventListener.java b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/listener/WatchingSessionEventListener.java index e93dc8e1..93c64a9e 100644 --- a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/listener/WatchingSessionEventListener.java +++ b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/listener/WatchingSessionEventListener.java @@ -18,6 +18,7 @@ import com.mopl.moplwebsocketsse.domain.watch.event.WatchingSessionStartedEvent; import com.mopl.moplwebsocketsse.domain.watch.registry.SessionMapping; import com.mopl.moplwebsocketsse.domain.watch.registry.WatchingSessionRegistry; +import com.mopl.moplwebsocketsse.domain.watch.registry.WebSocketSessionHolder; import com.mopl.moplwebsocketsse.domain.watch.repository.WatchingSessionRepository; import com.mopl.moplwebsocketsse.domain.watch.service.WatchingSessionService; @@ -34,6 +35,7 @@ public class WatchingSessionEventListener { private final WatchingSessionService wsService; private final WatchingSessionEventPublisher wsPublisher; private final SimpMessagingTemplate messagingTemplate; + private final WebSocketSessionHolder webSocketSessionHolder; @EventListener public void handleSessionSubscribe(SessionSubscribeEvent event) { @@ -70,7 +72,7 @@ public void handleSessionSubscribe(SessionSubscribeEvent event) { wsRegistry.register(wsSessionId, subscriptionId, watchingSession.getId(), userId, contentId); log.debug( - "[WatchingSessionEventListener] SUBSCRIBE After registry. wsId={}, subId={}, watchingId={}, userId={}, contentId={}", + "[WatchingSessionEventListener] SUBSCRIBE. wsId={}, subId={}, watchingId={}, userId={}, contentId={}", wsSessionId, subscriptionId, watchingSession.getId(), userId, contentId ); @@ -93,8 +95,8 @@ public void handleSessionSubscribe(SessionSubscribeEvent event) { wsPublisher.publish(startedEvent); } } catch (Exception e) { - log.error("[WatchingSessionEventListener] Failed to broadcast JOIN. sessionId={}", watchingSession.getId(), - e); + log.error("[WatchingSessionEventListener] Failed to broadcast JOIN. sessionId={}", + watchingSession.getId(), e); } } @@ -117,20 +119,20 @@ public void handleSessionUnsubscribe(SessionUnsubscribeEvent event) { try { WatchingSessionChange message = wsService.createLeaveMessage( - mapping.watchingSessionId(), - mapping.contentId() + mapping.getWatchingSessionId(), + mapping.getContentId() ); - broadcastToWatchers(mapping.contentId(), message); + broadcastToWatchers(mapping.getContentId(), message); } catch (Exception e) { log.error("[WatchingSessionEventListener] Failed to broadcast LEAVE. sessionId={}", - mapping.watchingSessionId(), e); + mapping.getWatchingSessionId(), e); } - wsRepository.delete(mapping.watchingSessionId(), mapping.contentId(), mapping.userId()); + wsRepository.delete(mapping.getWatchingSessionId(), mapping.getContentId(), mapping.getUserId()); log.debug( "[WatchingSessionEventListener] UNSUBSCRIBE. wsId={}, subId={}, watchingId={}, userId={}, contentId={}", - wsSessionId, subscriptionId, mapping.watchingSessionId(), mapping.userId(), mapping.contentId() + wsSessionId, subscriptionId, mapping.getWatchingSessionId(), mapping.getUserId(), mapping.getContentId() ); } @@ -144,6 +146,7 @@ public void handleSessionDisconnect(SessionDisconnectEvent event) { } List mappings = wsRegistry.removeAllByWsSessionId(wsSessionId); + webSocketSessionHolder.remove(wsSessionId); if (mappings.isEmpty()) { log.debug("[WatchingSessionEventListener] No mappings found on DISCONNECT. wsId={}", wsSessionId); @@ -153,21 +156,21 @@ public void handleSessionDisconnect(SessionDisconnectEvent event) { for (SessionMapping mapping : mappings) { try { WatchingSessionChange message = wsService.createLeaveMessage( - mapping.watchingSessionId(), - mapping.contentId() + mapping.getWatchingSessionId(), + mapping.getContentId() ); - broadcastToWatchers(mapping.contentId(), message); + broadcastToWatchers(mapping.getContentId(), message); } catch (Exception e) { log.error("[WatchingSessionEventListener] Failed to broadcast LEAVE. sessionId={}", - mapping.watchingSessionId(), e); + mapping.getWatchingSessionId(), e); } - wsRepository.delete(mapping.watchingSessionId(), mapping.contentId(), mapping.userId()); + wsRepository.delete(mapping.getWatchingSessionId(), mapping.getContentId(), mapping.getUserId()); log.debug( "[WatchingSessionEventListener] DISCONNECT. wsId={}, subId={}, watchingId={}, userId={}, contentId={}", - wsSessionId, mapping.subscriptionId(), mapping.watchingSessionId(), - mapping.userId(), mapping.contentId() + wsSessionId, mapping.getSubscriptionId(), mapping.getWatchingSessionId(), + mapping.getUserId(), mapping.getContentId() ); } } diff --git a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/SessionMapping.java b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/SessionMapping.java index 7c34cd7e..36d6ce4c 100644 --- a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/SessionMapping.java +++ b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/SessionMapping.java @@ -1,19 +1,49 @@ package com.mopl.moplwebsocketsse.domain.watch.registry; +import java.time.Instant; import java.util.UUID; +import org.springframework.web.socket.WebSocketSession; + +import lombok.Getter; + /** * 세션 매핑 정보 - * - * @param webSocketSessionId WebSocket STOMP Session ID - * @param watchingSessionId WatchingSession ID - * @param userId 사용자 ID - * @param contentId 콘텐츠 ID */ -public record SessionMapping( - String webSocketSessionId, - String subscriptionId, - UUID watchingSessionId, - UUID userId, - UUID contentId -) {} \ No newline at end of file +@Getter +public class SessionMapping { + + private final String webSocketSessionId; + private final String subscriptionId; + private final UUID watchingSessionId; + private final UUID userId; + private final UUID contentId; + + private volatile Instant lastActiveTime; + + /** + * @param webSocketSessionId 웹소켓 세션 아이디 + * @param subscriptionId 구독 아이디 + * @param watchingSessionId 시청 세션 UUID + * @param userId 사용자 UUID + * @param contentId 콘텐츠 UUID + */ + public SessionMapping( + String webSocketSessionId, + String subscriptionId, + UUID watchingSessionId, + UUID userId, + UUID contentId + ) { + this.webSocketSessionId = webSocketSessionId; + this.subscriptionId = subscriptionId; + this.watchingSessionId = watchingSessionId; + this.userId = userId; + this.contentId = contentId; + this.lastActiveTime = Instant.now(); + } + + public void updateLastActiveTime() { + this.lastActiveTime = Instant.now(); + } +} \ No newline at end of file diff --git a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/WatchingSessionRegistry.java b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/WatchingSessionRegistry.java index 489b06f9..e4606514 100644 --- a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/WatchingSessionRegistry.java +++ b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/WatchingSessionRegistry.java @@ -1,6 +1,7 @@ package com.mopl.moplwebsocketsse.domain.watch.registry; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -8,21 +9,15 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.springframework.web.socket.WebSocketSession; -import com.mopl.moplwebsocketsse.domain.watch.repository.WatchingSessionRepository; - -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @Slf4j @Component -@RequiredArgsConstructor public class WatchingSessionRegistry { - private final WatchingSessionRepository repository; - private final Map subscriptionMappings = new ConcurrentHashMap<>(); private final Map> wsToSubscriptions = new ConcurrentHashMap<>(); @@ -32,6 +27,7 @@ private String makeKey(String wsSessionId, String subscriptionId) { public void register(String wsSessionId, String subscriptionId, UUID watchingSessionId, UUID userId, UUID contentId) { + SessionMapping mapping = new SessionMapping( wsSessionId, subscriptionId, watchingSessionId, userId, contentId ); @@ -41,8 +37,26 @@ public void register(String wsSessionId, String subscriptionId, wsToSubscriptions.computeIfAbsent(wsSessionId, k -> ConcurrentHashMap.newKeySet()) .add(key); - log.debug("[WatchingSessionRegistry] Registered. wsId={}, subId={}, key={}, watchingId={}", - wsSessionId, subscriptionId, key, watchingSessionId); + log.debug("[WatchingSessionRegistry] Registered. wsId={}, subId={}, watchingId={}", + wsSessionId, subscriptionId, watchingSessionId); + } + + public void updateLastActiveTimeByWsSessionId(String wsSessionId) { + Set keys = wsToSubscriptions.get(wsSessionId); + if (keys == null || keys.isEmpty()) { + return; + } + + for (String key : keys) { + SessionMapping mapping = subscriptionMappings.get(key); + if (mapping != null) { + mapping.updateLastActiveTime(); + } + } + } + + public Collection getAllMappings() { + return Collections.unmodifiableCollection(subscriptionMappings.values()); } public SessionMapping removeBySubscriptionId(String wsSessionId, String subscriptionId) { @@ -50,12 +64,12 @@ public SessionMapping removeBySubscriptionId(String wsSessionId, String subscrip SessionMapping mapping = subscriptionMappings.remove(key); if (mapping != null) { - Set subs = wsToSubscriptions.get(mapping.webSocketSessionId()); + Set subs = wsToSubscriptions.get(mapping.getWebSocketSessionId()); if (subs != null) { subs.remove(key); } - log.debug("[WatchingSessionRegistry] Removed by key. key={}, watchingId={}", - key, mapping.watchingSessionId()); + log.debug("[WatchingSessionRegistry] Removed. key={}, watchingId={}", + key, mapping.getWatchingSessionId()); } return mapping; @@ -76,7 +90,7 @@ public List removeAllByWsSessionId(String wsSessionId) { } } - log.debug("[WatchingSessionRegistry] Removed all by wsId. wsId={}, count={}", wsSessionId, mappings.size()); + log.debug("[WatchingSessionRegistry] Removed all. wsId={}, count={}", wsSessionId, mappings.size()); return mappings; } @@ -102,40 +116,7 @@ public List getAllByWsSessionId(String wsSessionId) { return mappings; } - @Scheduled(fixedRate = 300000) - public void cleanupRegistry() { - if (subscriptionMappings.isEmpty()) - return; - - log.debug("[WatchingSessionRegistry] Cleanup started. size={}", subscriptionMappings.size()); - - List> entries = - new ArrayList<>(subscriptionMappings.entrySet()); - - List sessionIds = entries.stream() - .map(e -> e.getValue().watchingSessionId()) - .toList(); - - List existsList = repository.existsSessions(sessionIds); - - int removedCount = 0; - for (int i = 0; i < entries.size(); i++) { - boolean exists = (i < existsList.size()) && Boolean.TRUE.equals(existsList.get(i)); - if (!exists) { - String key = entries.get(i).getKey(); - SessionMapping mapping = subscriptionMappings.remove(key); - if (mapping != null) { - Set subs = wsToSubscriptions.get(mapping.webSocketSessionId()); - if (subs != null) { - subs.remove(key); - } - removedCount++; - } - } - } - - if (removedCount > 0) { - log.info("[WatchingSessionRegistry] Cleanup completed. removed={}", removedCount); - } + public int size() { + return subscriptionMappings.size(); } } \ No newline at end of file diff --git a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/WebSocketSessionDecorator.java b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/WebSocketSessionDecorator.java new file mode 100644 index 00000000..5e402e92 --- /dev/null +++ b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/WebSocketSessionDecorator.java @@ -0,0 +1,34 @@ +package com.mopl.moplwebsocketsse.domain.watch.registry; + +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.WebSocketHandlerDecorator; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class WebSocketSessionDecorator extends WebSocketHandlerDecorator { + + private final WebSocketSessionHolder holder; + + public WebSocketSessionDecorator(WebSocketHandler delegate, WebSocketSessionHolder holder) { + super(delegate); + this.holder = holder; + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + holder.add(session.getId(), session); + log.debug("[WebSocketSessionDecorator] Connection established. sessionId={}", session.getId()); + super.afterConnectionEstablished(session); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { + holder.remove(session.getId()); + log.debug("[WebSocketSessionDecorator] Connection closed. sessionId={}, status={}", + session.getId(), closeStatus); + super.afterConnectionClosed(session, closeStatus); + } +} \ No newline at end of file diff --git a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/WebSocketSessionHolder.java b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/WebSocketSessionHolder.java new file mode 100644 index 00000000..93d3c9ec --- /dev/null +++ b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/registry/WebSocketSessionHolder.java @@ -0,0 +1,36 @@ +package com.mopl.moplwebsocketsse.domain.watch.registry; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.springframework.stereotype.Component; +import org.springframework.web.socket.WebSocketSession; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +public class WebSocketSessionHolder { + + private final Map sessions = new ConcurrentHashMap<>(); + + public void add(String sessionId, WebSocketSession session) { + sessions.put(sessionId, session); + log.debug("[WebSocketSessionHolder] Added. sessionId={}", sessionId); + } + + public WebSocketSession get(String sessionId) { + return sessions.get(sessionId); + } + + public void remove(String sessionId) { + WebSocketSession removed = sessions.remove(sessionId); + if (removed != null) { + log.debug("[WebSocketSessionHolder] Removed. sessionId={}", sessionId); + } + } + + public int size() { + return sessions.size(); + } +} \ No newline at end of file diff --git a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/repository/WatchingSessionRepository.java b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/repository/WatchingSessionRepository.java index 08138072..54f39ff6 100644 --- a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/repository/WatchingSessionRepository.java +++ b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/repository/WatchingSessionRepository.java @@ -11,7 +11,9 @@ import java.util.concurrent.TimeUnit; import org.springframework.dao.DataAccessException; +import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.RedisOperations; +import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.SessionCallback; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.ZSetOperations; @@ -354,23 +356,7 @@ public long countWatchersByContentId(UUID contentId) { return count != null ? count : 0L; } - public boolean refreshSessionTtl(UUID sessionId, UUID watcherId) { - Boolean sessionAlive = stringRedisTemplate.expire( - SESSION_PREFIX + sessionId, - SESSION_TTL_SECONDS, - TimeUnit.SECONDS - ); - - stringRedisTemplate.expire( - USER_WATCHING_PREFIX + watcherId, - SESSION_TTL_SECONDS, - TimeUnit.SECONDS - ); - - return Boolean.TRUE.equals(sessionAlive); - } - - public List existsSessions(List sessionIds) { + public List batchRefreshSessionTtl(List sessionIds, List watcherIds) { if (sessionIds == null || sessionIds.isEmpty()) { return Collections.emptyList(); } @@ -379,18 +365,88 @@ public List existsSessions(List sessionIds) { new SessionCallback() { @Override public Object execute(RedisOperations operations) { - for (UUID sessionId : sessionIds) { - operations.hasKey(SESSION_PREFIX + sessionId); + for (int i = 0; i < sessionIds.size(); i++) { + UUID sessionId = sessionIds.get(i); + UUID watcherId = watcherIds.get(i); + + operations.expire(SESSION_PREFIX + sessionId, SESSION_TTL_SECONDS, TimeUnit.SECONDS); + operations.expire(USER_WATCHING_PREFIX + watcherId, SESSION_TTL_SECONDS, TimeUnit.SECONDS); } return null; } } ); - List existsList = new ArrayList<>(results.size()); - for (Object r : results) { - existsList.add(Boolean.TRUE.equals(r)); + List sessionResults = new ArrayList<>(); + for (int i = 0; i < results.size(); i += 2) { + sessionResults.add(Boolean.TRUE.equals(results.get(i))); } - return existsList; + + return sessionResults; + } + + public int cleanupGhostEntries() { + int totalRemoved = 0; + + ScanOptions scanKeysOptions = ScanOptions.scanOptions() + .match(CONTENT_WATCHERS_PREFIX + "*") + .count(100) + .build(); + + try (Cursor keysCursor = stringRedisTemplate.scan(scanKeysOptions)) { + while (keysCursor.hasNext()) { + String contentKey = keysCursor.next(); + + ScanOptions memberScanOptions = ScanOptions.scanOptions() + .count(100) + .build(); + + try (Cursor> membersCursor = + stringRedisTemplate.opsForZSet().scan(contentKey, memberScanOptions)) { + + List batch = new ArrayList<>(); + + while (membersCursor.hasNext()) { + ZSetOperations.TypedTuple tuple = membersCursor.next(); + if (tuple.getValue() != null) { + batch.add(tuple.getValue()); + } + + if (batch.size() >= 100 || !membersCursor.hasNext()) { + if (batch.isEmpty()) continue; + + List currentBatch = new ArrayList<>(batch); + batch.clear(); + + List existsResults = stringRedisTemplate.executePipelined( + new SessionCallback() { + @Override + public Object execute(RedisOperations operations) { + for (String sessionId : currentBatch) { + operations.hasKey(SESSION_PREFIX + sessionId); + } + return null; + } + } + ); + + List ghosts = new ArrayList<>(); + for (int i = 0; i < currentBatch.size(); i++) { + if (!Boolean.TRUE.equals(existsResults.get(i))) { + ghosts.add(currentBatch.get(i)); + } + } + + if (!ghosts.isEmpty()) { + stringRedisTemplate.opsForZSet().remove(contentKey, ghosts.toArray()); + totalRemoved += ghosts.size(); + } + } + } + } + } + } + + return totalRemoved; } } \ No newline at end of file diff --git a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/scheduler/WatchingSessionBatchRefresher.java b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/scheduler/WatchingSessionBatchRefresher.java new file mode 100644 index 00000000..a7e18c36 --- /dev/null +++ b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/scheduler/WatchingSessionBatchRefresher.java @@ -0,0 +1,118 @@ +package com.mopl.moplwebsocketsse.domain.watch.scheduler; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketSession; + +import com.mopl.moplwebsocketsse.domain.watch.registry.SessionMapping; +import com.mopl.moplwebsocketsse.domain.watch.registry.WatchingSessionRegistry; +import com.mopl.moplwebsocketsse.domain.watch.registry.WebSocketSessionHolder; +import com.mopl.moplwebsocketsse.domain.watch.repository.WatchingSessionRepository; +import com.mopl.moplwebsocketsse.security.registry.JwtRegistry; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class WatchingSessionBatchRefresher { + + private static final CloseStatus SESSION_EXPIRED = new CloseStatus(4001, "Session expired"); + + private final WatchingSessionRegistry registry; + private final WatchingSessionRepository repository; + private final JwtRegistry jwtRegistry; + private final WebSocketSessionHolder webSocketSessionHolder; + + @Scheduled(fixedRate = 30_000) + public void refreshActiveSessions() { + Collection mappings = registry.getAllMappings(); + + if (mappings.isEmpty()) { + return; + } + + log.debug("[BatchRefresher] Starting batch refresh. sessionCount={}", mappings.size()); + + List validMappings = new ArrayList<>(); + List invalidMappings = new ArrayList<>(); + + for (SessionMapping mapping : mappings) { + try { + boolean hasJwt = jwtRegistry.hasActiveJwtInformationByUserId(mapping.getUserId()); + if (hasJwt) { + validMappings.add(mapping); + } else { + invalidMappings.add(mapping); + } + } catch (Exception e) { + log.warn("[BatchRefresher] JWT check failed, treating as invalid. userId={}", + mapping.getUserId(), e); + invalidMappings.add(mapping); + } + } + + if (!validMappings.isEmpty()) { + List sessionIds = validMappings.stream() + .map(SessionMapping::getWatchingSessionId) + .toList(); + List watcherIds = validMappings.stream() + .map(SessionMapping::getUserId) + .toList(); + + try { + repository.batchRefreshSessionTtl(sessionIds, watcherIds); + log.debug("[BatchRefresher] Refreshed TTL for {} sessions", validMappings.size()); + } catch (Exception e) { + log.error("[BatchRefresher] Failed to refresh TTL", e); + } + } + + if (!invalidMappings.isEmpty()) { + log.info("[BatchRefresher] Cleaning up {} invalid sessions", invalidMappings.size()); + + for (SessionMapping mapping : invalidMappings) { + closeWebSocketSession(mapping); + + registry.removeBySubscriptionId( + mapping.getWebSocketSessionId(), + mapping.getSubscriptionId() + ); + + log.info("[BatchRefresher] Removed invalid session. wsId={}, userId={}, watchingId={}", + mapping.getWebSocketSessionId(), mapping.getUserId(), mapping.getWatchingSessionId()); + } + } + + log.debug("[BatchRefresher] Batch refresh completed. valid={}, invalid={}", + validMappings.size(), invalidMappings.size()); + } + + private void closeWebSocketSession(SessionMapping mapping) { + WebSocketSession session = webSocketSessionHolder.get(mapping.getWebSocketSessionId()); + + if (session == null) { + return; + } + + if (!session.isOpen()) { + return; + } + + try { + session.close(SESSION_EXPIRED); + log.debug("[BatchRefresher] Closed WebSocket session. wsId={}", mapping.getWebSocketSessionId()); + } catch (IOException e) { + log.warn("[BatchRefresher] Failed to close WebSocket session. wsId={}", + mapping.getWebSocketSessionId(), e); + } + } +} \ No newline at end of file diff --git a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/scheduler/WatchingSessionGhostCleaner.java b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/scheduler/WatchingSessionGhostCleaner.java new file mode 100644 index 00000000..5d3548ab --- /dev/null +++ b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/domain/watch/scheduler/WatchingSessionGhostCleaner.java @@ -0,0 +1,31 @@ +package com.mopl.moplwebsocketsse.domain.watch.scheduler; + +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import com.mopl.moplwebsocketsse.domain.watch.repository.WatchingSessionRepository; + +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class WatchingSessionGhostCleaner { + + private final WatchingSessionRepository repository; + + @PostConstruct + @Scheduled(cron = "0 0 2 * * *") + public void cleanup() { + try { + int removed = repository.cleanupGhostEntries(); + if (removed > 0) { + log.info("[GhostCleaner] Removed {} ghost entries", removed); + } + } catch (Exception e) { + log.error("[GhostCleaner] Failed to cleanup ghost entries", e); + } + } +} \ No newline at end of file diff --git a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/global/config/WebSocketConfig.java b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/global/config/WebSocketConfig.java index 8203e273..6702b711 100644 --- a/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/global/config/WebSocketConfig.java +++ b/mopl-websocket-sse/src/main/java/com/mopl/moplwebsocketsse/global/config/WebSocketConfig.java @@ -7,8 +7,11 @@ import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration; import com.mopl.moplwebsocketsse.domain.watch.interceptor.WatchingSessionHeartbeatInterceptor; +import com.mopl.moplwebsocketsse.domain.watch.registry.WebSocketSessionDecorator; +import com.mopl.moplwebsocketsse.domain.watch.registry.WebSocketSessionHolder; import com.mopl.moplwebsocketsse.security.jwt.JwtChannelInterceptor; import lombok.RequiredArgsConstructor; @@ -21,6 +24,7 @@ public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { private final JwtChannelInterceptor jwtChannelInterceptor; private final ThreadPoolTaskScheduler wsHeartbeatScheduler; private final WatchingSessionHeartbeatInterceptor watchingSessionHeartbeatInterceptor; + private final WebSocketSessionHolder webSocketSessionHolder; @Override public void registerStompEndpoints(StompEndpointRegistry registry) { @@ -33,16 +37,23 @@ public void registerStompEndpoints(StompEndpointRegistry registry) { @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/sub") - .setHeartbeatValue(new long[]{10000L,10000L}) + .setHeartbeatValue(new long[] {10000L, 10000L}) .setTaskScheduler(wsHeartbeatScheduler); registry.setApplicationDestinationPrefixes("/pub"); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { - registration.interceptors( + registration.interceptors( jwtChannelInterceptor, watchingSessionHeartbeatInterceptor ); } + + @Override + public void configureWebSocketTransport(WebSocketTransportRegistration registration) { + registration.addDecoratorFactory( + handler -> new WebSocketSessionDecorator(handler, webSocketSessionHolder) + ); + } } \ No newline at end of file