Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package com.mopl.moplwebsocketsse.domain.watch.interceptor;

import java.util.List;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;

import com.mopl.moplwebsocketsse.domain.watch.registry.SessionMapping;
import com.mopl.moplwebsocketsse.domain.watch.registry.WatchingSessionRegistry;
import com.mopl.moplwebsocketsse.domain.watch.repository.WatchingSessionRepository;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -22,41 +18,25 @@
public class WatchingSessionHeartbeatInterceptor implements ChannelInterceptor {

private final WatchingSessionRegistry registry;
private final WatchingSessionRepository repository;

@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
if (!sent)
if (!sent) {
return;
}

SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
if (type != SimpMessageType.HEARTBEAT)
if (type != SimpMessageType.HEARTBEAT) {
return;
}

String wsSessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
if (wsSessionId == null)
return;

List<SessionMapping> mappings = registry.getAllByWsSessionId(wsSessionId);

if (mappings.isEmpty()) {
log.warn("[WatchingSessionHeartbeatInterceptor] No mappings found! wsId={}", wsSessionId);
if (wsSessionId == null) {
return;
}

for (SessionMapping mapping : mappings) {
try {
boolean alive = repository.refreshSessionTtl(mapping.watchingSessionId(), mapping.userId());

if (!alive) {
log.warn("[WatchingSessionHeartbeatInterceptor] Session not alive, removing. wsId={}, subId={}, watchingId={}",
wsSessionId, mapping.subscriptionId(), mapping.watchingSessionId());
registry.removeBySubscriptionId(mapping.webSocketSessionId(), mapping.subscriptionId());
}
} catch (Exception e) {
log.warn("[WatchingSessionHeartbeatInterceptor] Redis 세션 갱신 실패. wsId={}, subId={}",
wsSessionId, mapping.subscriptionId(), e);
}
}
registry.updateLastActiveTimeByWsSessionId(wsSessionId);

log.trace("[Heartbeat] Updated lastActiveTime. wsId={}", wsSessionId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.mopl.moplwebsocketsse.domain.watch.event.WatchingSessionStartedEvent;
import com.mopl.moplwebsocketsse.domain.watch.registry.SessionMapping;
import com.mopl.moplwebsocketsse.domain.watch.registry.WatchingSessionRegistry;
import com.mopl.moplwebsocketsse.domain.watch.registry.WebSocketSessionHolder;
import com.mopl.moplwebsocketsse.domain.watch.repository.WatchingSessionRepository;
import com.mopl.moplwebsocketsse.domain.watch.service.WatchingSessionService;

Expand All @@ -34,6 +35,7 @@ public class WatchingSessionEventListener {
private final WatchingSessionService wsService;
private final WatchingSessionEventPublisher wsPublisher;
private final SimpMessagingTemplate messagingTemplate;
private final WebSocketSessionHolder webSocketSessionHolder;

@EventListener
public void handleSessionSubscribe(SessionSubscribeEvent event) {
Expand Down Expand Up @@ -70,7 +72,7 @@ public void handleSessionSubscribe(SessionSubscribeEvent event) {
wsRegistry.register(wsSessionId, subscriptionId, watchingSession.getId(), userId, contentId);

log.debug(
"[WatchingSessionEventListener] SUBSCRIBE After registry. wsId={}, subId={}, watchingId={}, userId={}, contentId={}",
"[WatchingSessionEventListener] SUBSCRIBE. wsId={}, subId={}, watchingId={}, userId={}, contentId={}",
wsSessionId, subscriptionId, watchingSession.getId(), userId, contentId
);

Expand All @@ -93,8 +95,8 @@ public void handleSessionSubscribe(SessionSubscribeEvent event) {
wsPublisher.publish(startedEvent);
}
} catch (Exception e) {
log.error("[WatchingSessionEventListener] Failed to broadcast JOIN. sessionId={}", watchingSession.getId(),
e);
log.error("[WatchingSessionEventListener] Failed to broadcast JOIN. sessionId={}",
watchingSession.getId(), e);
}
}

Expand All @@ -117,20 +119,20 @@ public void handleSessionUnsubscribe(SessionUnsubscribeEvent event) {

try {
WatchingSessionChange message = wsService.createLeaveMessage(
mapping.watchingSessionId(),
mapping.contentId()
mapping.getWatchingSessionId(),
mapping.getContentId()
);
broadcastToWatchers(mapping.contentId(), message);
broadcastToWatchers(mapping.getContentId(), message);
} catch (Exception e) {
log.error("[WatchingSessionEventListener] Failed to broadcast LEAVE. sessionId={}",
mapping.watchingSessionId(), e);
mapping.getWatchingSessionId(), e);
}

wsRepository.delete(mapping.watchingSessionId(), mapping.contentId(), mapping.userId());
wsRepository.delete(mapping.getWatchingSessionId(), mapping.getContentId(), mapping.getUserId());

log.debug(
"[WatchingSessionEventListener] UNSUBSCRIBE. wsId={}, subId={}, watchingId={}, userId={}, contentId={}",
wsSessionId, subscriptionId, mapping.watchingSessionId(), mapping.userId(), mapping.contentId()
wsSessionId, subscriptionId, mapping.getWatchingSessionId(), mapping.getUserId(), mapping.getContentId()
);
}

Expand All @@ -144,6 +146,7 @@ public void handleSessionDisconnect(SessionDisconnectEvent event) {
}

List<SessionMapping> mappings = wsRegistry.removeAllByWsSessionId(wsSessionId);
webSocketSessionHolder.remove(wsSessionId);

if (mappings.isEmpty()) {
log.debug("[WatchingSessionEventListener] No mappings found on DISCONNECT. wsId={}", wsSessionId);
Expand All @@ -153,21 +156,21 @@ public void handleSessionDisconnect(SessionDisconnectEvent event) {
for (SessionMapping mapping : mappings) {
try {
WatchingSessionChange message = wsService.createLeaveMessage(
mapping.watchingSessionId(),
mapping.contentId()
mapping.getWatchingSessionId(),
mapping.getContentId()
);
broadcastToWatchers(mapping.contentId(), message);
broadcastToWatchers(mapping.getContentId(), message);
} catch (Exception e) {
log.error("[WatchingSessionEventListener] Failed to broadcast LEAVE. sessionId={}",
mapping.watchingSessionId(), e);
mapping.getWatchingSessionId(), e);
}

wsRepository.delete(mapping.watchingSessionId(), mapping.contentId(), mapping.userId());
wsRepository.delete(mapping.getWatchingSessionId(), mapping.getContentId(), mapping.getUserId());

log.debug(
"[WatchingSessionEventListener] DISCONNECT. wsId={}, subId={}, watchingId={}, userId={}, contentId={}",
wsSessionId, mapping.subscriptionId(), mapping.watchingSessionId(),
mapping.userId(), mapping.contentId()
wsSessionId, mapping.getSubscriptionId(), mapping.getWatchingSessionId(),
mapping.getUserId(), mapping.getContentId()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,49 @@
package com.mopl.moplwebsocketsse.domain.watch.registry;

import java.time.Instant;
import java.util.UUID;

import org.springframework.web.socket.WebSocketSession;

import lombok.Getter;

/**
* 세션 매핑 정보
*
* @param webSocketSessionId WebSocket STOMP Session ID
* @param watchingSessionId WatchingSession ID
* @param userId 사용자 ID
* @param contentId 콘텐츠 ID
*/
public record SessionMapping(
String webSocketSessionId,
String subscriptionId,
UUID watchingSessionId,
UUID userId,
UUID contentId
) {}
@Getter
public class SessionMapping {

private final String webSocketSessionId;
private final String subscriptionId;
private final UUID watchingSessionId;
private final UUID userId;
private final UUID contentId;

private volatile Instant lastActiveTime;

/**
* @param webSocketSessionId 웹소켓 세션 아이디
* @param subscriptionId 구독 아이디
* @param watchingSessionId 시청 세션 UUID
* @param userId 사용자 UUID
* @param contentId 콘텐츠 UUID
*/
public SessionMapping(
String webSocketSessionId,
String subscriptionId,
UUID watchingSessionId,
UUID userId,
UUID contentId
) {
this.webSocketSessionId = webSocketSessionId;
this.subscriptionId = subscriptionId;
this.watchingSessionId = watchingSessionId;
this.userId = userId;
this.contentId = contentId;
this.lastActiveTime = Instant.now();
}

public void updateLastActiveTime() {
this.lastActiveTime = Instant.now();
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
package com.mopl.moplwebsocketsse.domain.watch.registry;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;

import com.mopl.moplwebsocketsse.domain.watch.repository.WatchingSessionRepository;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@RequiredArgsConstructor
public class WatchingSessionRegistry {

private final WatchingSessionRepository repository;

private final Map<String, SessionMapping> subscriptionMappings = new ConcurrentHashMap<>();
private final Map<String, Set<String>> wsToSubscriptions = new ConcurrentHashMap<>();

Expand All @@ -32,6 +27,7 @@ private String makeKey(String wsSessionId, String subscriptionId) {

public void register(String wsSessionId, String subscriptionId,
UUID watchingSessionId, UUID userId, UUID contentId) {

SessionMapping mapping = new SessionMapping(
wsSessionId, subscriptionId, watchingSessionId, userId, contentId
);
Expand All @@ -41,21 +37,39 @@ public void register(String wsSessionId, String subscriptionId,
wsToSubscriptions.computeIfAbsent(wsSessionId, k -> ConcurrentHashMap.newKeySet())
.add(key);

log.debug("[WatchingSessionRegistry] Registered. wsId={}, subId={}, key={}, watchingId={}",
wsSessionId, subscriptionId, key, watchingSessionId);
log.debug("[WatchingSessionRegistry] Registered. wsId={}, subId={}, watchingId={}",
wsSessionId, subscriptionId, watchingSessionId);
}

public void updateLastActiveTimeByWsSessionId(String wsSessionId) {
Set<String> keys = wsToSubscriptions.get(wsSessionId);
if (keys == null || keys.isEmpty()) {
return;
}

for (String key : keys) {
SessionMapping mapping = subscriptionMappings.get(key);
if (mapping != null) {
mapping.updateLastActiveTime();
}
}
}

public Collection<SessionMapping> getAllMappings() {
return Collections.unmodifiableCollection(subscriptionMappings.values());
}

public SessionMapping removeBySubscriptionId(String wsSessionId, String subscriptionId) {
String key = makeKey(wsSessionId, subscriptionId);
SessionMapping mapping = subscriptionMappings.remove(key);

if (mapping != null) {
Set<String> subs = wsToSubscriptions.get(mapping.webSocketSessionId());
Set<String> subs = wsToSubscriptions.get(mapping.getWebSocketSessionId());
if (subs != null) {
subs.remove(key);
}
log.debug("[WatchingSessionRegistry] Removed by key. key={}, watchingId={}",
key, mapping.watchingSessionId());
log.debug("[WatchingSessionRegistry] Removed. key={}, watchingId={}",
key, mapping.getWatchingSessionId());
}

return mapping;
Expand All @@ -76,7 +90,7 @@ public List<SessionMapping> removeAllByWsSessionId(String wsSessionId) {
}
}

log.debug("[WatchingSessionRegistry] Removed all by wsId. wsId={}, count={}", wsSessionId, mappings.size());
log.debug("[WatchingSessionRegistry] Removed all. wsId={}, count={}", wsSessionId, mappings.size());
return mappings;
}

Expand All @@ -102,40 +116,7 @@ public List<SessionMapping> getAllByWsSessionId(String wsSessionId) {
return mappings;
}

@Scheduled(fixedRate = 300000)
public void cleanupRegistry() {
if (subscriptionMappings.isEmpty())
return;

log.debug("[WatchingSessionRegistry] Cleanup started. size={}", subscriptionMappings.size());

List<Map.Entry<String, SessionMapping>> entries =
new ArrayList<>(subscriptionMappings.entrySet());

List<UUID> sessionIds = entries.stream()
.map(e -> e.getValue().watchingSessionId())
.toList();

List<Boolean> existsList = repository.existsSessions(sessionIds);

int removedCount = 0;
for (int i = 0; i < entries.size(); i++) {
boolean exists = (i < existsList.size()) && Boolean.TRUE.equals(existsList.get(i));
if (!exists) {
String key = entries.get(i).getKey();
SessionMapping mapping = subscriptionMappings.remove(key);
if (mapping != null) {
Set<String> subs = wsToSubscriptions.get(mapping.webSocketSessionId());
if (subs != null) {
subs.remove(key);
}
removedCount++;
}
}
}

if (removedCount > 0) {
log.info("[WatchingSessionRegistry] Cleanup completed. removed={}", removedCount);
}
public int size() {
return subscriptionMappings.size();
}
}
Loading