Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/main/java/kr/allcll/backend/config/ExternalPreInvoker.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kr.allcll.backend.config;

import kr.allcll.backend.client.ExternalService;
import kr.allcll.backend.support.sse.SseEmitterStorage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
Expand All @@ -12,6 +13,7 @@
public class ExternalPreInvoker {

private final ExternalService externalService;
private final SseEmitterStorage sseEmitterStorage;

@Scheduled(fixedDelay = 1000 * 10)
void sendPinnedSubjectsToExternal() {
Expand All @@ -23,4 +25,14 @@ void sendPinnedSubjectsToExternal() {
}
log.info("[ExternalPreInvoker] 핀 과목 전달 완료");
}

@Scheduled(fixedDelay = 1000 * 60)
void cleanupExpiredSseActiveTimes() {
try {
sseEmitterStorage.cleanupExpiredActiveTimes();
log.debug("[ExternalPreInvoker] SSE 활성 시각 정리 완료");
} catch (Exception e) {
log.error("[ExternalPreInvoker] SSE 활성 시각 정리 중 오류 발생", e);
}
}
}
39 changes: 39 additions & 0 deletions src/main/java/kr/allcll/backend/support/sse/SseConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package kr.allcll.backend.support.sse;

import java.time.LocalDateTime;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

final class SseConnection {

private SseEmitter emitter;
private final LocalDateTime lastActiveAt;

SseConnection(SseEmitter emitter, LocalDateTime lastActiveAt) {
this.emitter = emitter;
this.lastActiveAt = lastActiveAt;
}

static SseConnection of(SseEmitter emitter) {
return new SseConnection(emitter, LocalDateTime.now());
}

SseEmitter getEmitter() {
return emitter;
}

boolean isConnected() {
return emitter != null;
}

boolean isExpired(LocalDateTime threshold) {
return lastActiveAt.isBefore(threshold);
}

boolean shouldCleanup(LocalDateTime threshold) {
return isExpired(threshold) && !isConnected();
}

void disconnect() {
this.emitter = null;
}
}
96 changes: 73 additions & 23 deletions src/main/java/kr/allcll/backend/support/sse/SseEmitterStorage.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package kr.allcll.backend.support.sse;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -12,51 +15,98 @@
@Component
public class SseEmitterStorage {

/*
ExecutorService로 변경이 필요할 수 있습니다.
*/
private final Map<String, SseEmitter> emitters;
private static final Duration GRACE_PERIOD = Duration.ofSeconds(30);

private final Map<String, SseConnection> connections;

/*
동시성 문제로 자료구조 변경이 필요할 수 있습니다.
*/
public SseEmitterStorage() {
this.emitters = new ConcurrentHashMap<>();
this.connections = new ConcurrentHashMap<>();
}

public void add(String token, SseEmitter sseEmitter) {
emitters.put(token, sseEmitter);
log.info("[SSE] 새로운 연결이 추가되었습니다. 현재 연결 수: {}", emitters.size());
SseConnection connection = SseConnection.of(sseEmitter);
connections.put(token, connection);
log.info("[SSE] 새로운 연결이 추가되었습니다. 현재 연결 수: {}", getActiveConnectionCount());

sseEmitter.onTimeout(() -> {
emitters.remove(token);
log.debug("[SSE] 연결이 타임아웃으로 종료되었습니다. 현재 연결 수: {}", emitters.size());
});
sseEmitter.onError(e -> {
emitters.remove(token);
});
sseEmitter.onCompletion(() -> {
emitters.remove(token);
disconnectToken(token, sseEmitter);
log.debug("[SSE] 연결이 타임아웃으로 종료되었습니다. 현재 연결 수: {}", getActiveConnectionCount());
});
sseEmitter.onError(e -> disconnectToken(token, sseEmitter));
sseEmitter.onCompletion(() -> disconnectToken(token, sseEmitter));
}

private void disconnectToken(String token, SseEmitter sseEmitter) {
SseConnection connection = connections.get(token);
if (connection != null && connection.getEmitter() == sseEmitter) {
connection.disconnect();
connections.remove(token);
}
}

/*
실제 객체를 전달하지 않으면 이미 완료된 SseEmitter에 대해 send() 호출이 발생한다.
Collections.unmodifiableList(), Stream.toList()를 사용하면 랩핑한 객체를 반환하기에 위 문제가 발생한다.
*/
public List<SseEmitter> getEmitters() {
return emitters.values().stream().toList();
return connections.values().stream()
.map(SseConnection::getEmitter)
.filter(Objects::nonNull)
.toList();
}

public Optional<SseEmitter> getEmitter(String token) {
SseEmitter emitter = emitters.get(token);
return Optional.ofNullable(emitter);
SseConnection connection = connections.get(token);
if (connection == null) {
return Optional.empty();
}
return Optional.ofNullable(connection.getEmitter());
}

public List<String> getUserTokens() {
return emitters.keySet().stream().toList();
return getUserTokensWithinGracePeriod();
}

/**
* Grace Period 내에 활성화된 적이 있는 사용자 토큰 목록을 반환합니다.
* 현재 연결된 사용자와 최근 GRACE_PERIOD 이내에 연결이 끊긴 사용자를 포함합니다.
*/
private List<String> getUserTokensWithinGracePeriod() {
LocalDateTime gracePeriodThreshold = LocalDateTime.now().minus(GRACE_PERIOD);

return connections.entrySet().stream()
.filter(entry -> {
SseConnection sseConnection = entry.getValue();

if (sseConnection.isConnected()) {
return true;
}

return !sseConnection.isExpired(gracePeriodThreshold);
})
.map(Map.Entry::getKey)
.toList();
}

/**
* Grace Period를 초과한 오래된 연결 엔트리를 정리합니다.
* 주기적으로 호출되어 메모리 누수를 방지합니다.
*/
public void cleanupExpiredActiveTimes() {
LocalDateTime cleanupThreshold = LocalDateTime.now().minus(GRACE_PERIOD);

connections.entrySet().removeIf(entry -> {
boolean remove = entry.getValue().shouldCleanup(cleanupThreshold);
if (remove) {
log.debug("[SSE] Grace Period 초과로 토큰 정리: {}", entry.getKey());
}
return remove;
});
}

public int getActiveConnectionCount() {
return emitters.size();
return (int) connections.values().stream()
.filter(SseConnection::isConnected)
.count();
}
}
Loading
Loading