diff --git a/service/gateway/server/src/main/java/com/sparta/gateway/server/application/DistributedLockComponent.java b/service/gateway/server/src/main/java/com/sparta/gateway/server/application/DistributedLockComponent.java deleted file mode 100644 index f9a72f33..00000000 --- a/service/gateway/server/src/main/java/com/sparta/gateway/server/application/DistributedLockComponent.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.sparta.gateway.server.application; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.aspectj.lang.annotation.Aspect; -import org.redisson.api.RLock; -import org.redisson.api.RedissonClient; -import org.springframework.context.annotation.Bean; -import org.springframework.stereotype.Component; - -@Aspect -@Component -@RequiredArgsConstructor -@Slf4j(topic = "DistributedLockComponent") -public class DistributedLockComponent { - - private final RedissonClient redissonClient; - - public void execute( - String lockName, long waitMilliSecond, long leaseMilliSecond, Runnable logic) { - RLock lock = redissonClient.getLock(lockName); - try { - boolean isLocked = lock.tryLock(waitMilliSecond, leaseMilliSecond, TimeUnit.MILLISECONDS); - if (!isLocked) { - throw new IllegalStateException("[" + lockName + "] lock 획득 실패"); - } - logic.run(); - } catch (InterruptedException e) { - log.error(e.getMessage(), e); - throw new RuntimeException(e); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - - - @Bean - public ExecutorService customThreadPool() { - return Executors.newFixedThreadPool(10); - } - -} diff --git a/service/gateway/server/src/main/java/com/sparta/gateway/server/application/UserQueueService.java b/service/gateway/server/src/main/java/com/sparta/gateway/server/application/UserQueueService.java index 525b1e72..2bde6834 100644 --- a/service/gateway/server/src/main/java/com/sparta/gateway/server/application/UserQueueService.java +++ b/service/gateway/server/src/main/java/com/sparta/gateway/server/application/UserQueueService.java @@ -19,173 +19,96 @@ @RequiredArgsConstructor public class UserQueueService { + private static final String USER_QUEUE_WAIT_KEY = "users:queue:wait"; + private static final String USER_QUEUE_ACTIVE_KEY = "users:queue:active"; + private static final long INACTIVITY_THRESHOLD = 300; + private final ReactiveRedisTemplate reactiveRedisTemplate; - private final DistributedLockComponent lockComponent; - private final String USER_QUEUE_WAIT_KEY = "users:queue:wait"; - private final String USER_QUEUE_PROCEED_KEY = "users:queue:proceed"; - private final String USER_ACTIVE_SET_KEY = "users:active"; + @Value("${MAX_ACTIVE_USERS}") private long MAX_ACTIVE_USERS; - private final long INACTIVITY_THRESHOLD = 300; - - public Mono registerUser(String userId) { - return reactiveRedisTemplate.opsForZSet() - .rank(USER_QUEUE_PROCEED_KEY, userId) - .defaultIfEmpty(-1L) - .flatMap(rank -> rank >= 0 ? handleProceedUser(userId) : handleNewUser(userId)); - } - private Mono handleProceedUser(String userId) { - return updateUserActivityTime(userId) - .thenReturn(new RegisterUserResponse(0L)); + private long getCurrentTime() { + return Instant.now().getEpochSecond(); } - private Mono handleNewUser(String userId) { - return reactiveRedisTemplate.opsForSet().size(USER_ACTIVE_SET_KEY) - .flatMap(activeUsers -> activeUsers < MAX_ACTIVE_USERS ? addToProceedQueue(userId) - : checkAndAddToQueue(userId)); - } - - - private Mono checkAndAddToQueue(String userId) { - return reactiveRedisTemplate.opsForZSet().score(USER_QUEUE_WAIT_KEY, userId) - .defaultIfEmpty(-1.0) - .flatMap(score -> { - if (score >= 0) { - return updateWaitQueueScore(userId); - } else { - return addToWaitQueue(userId); - } - }); - } - - private Mono updateWaitQueueScore(String userId) { - double newScore = Instant.now().getEpochSecond(); - return reactiveRedisTemplate.opsForZSet().score(USER_QUEUE_WAIT_KEY, userId) - .flatMap(oldScore -> - reactiveRedisTemplate.opsForZSet().add(USER_QUEUE_WAIT_KEY, userId, newScore) - .then(reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_WAIT_KEY, userId)) - ) - .map(rank -> new RegisterUserResponse(rank + 1)) - ; - } - - public Mono addToProceedQueue(String userId) { - return Mono.create(sink -> { - lockComponent.execute(userId, 1000, 1000, () -> { - try { - addUserToQueue(userId) - .doOnSuccess(sink::success) - .doOnError(sink::error) - .subscribe(); - } catch (Exception e) { - sink.error(e); - } - }); - }); - } - - private Mono addUserToQueue(String userId) { - var unixTime = Instant.now().getEpochSecond(); - return reactiveRedisTemplate.opsForZSet() - .add(USER_QUEUE_PROCEED_KEY, userId, unixTime) - .filter(success -> success) - .flatMap(success -> { - if (success) { - return addToActiveSet(userId); - } else { - return checkAndAddToQueue(userId); - } - }); - } - - private Mono addToActiveSet(String userId) { - return reactiveRedisTemplate.opsForSet() - .add(USER_ACTIVE_SET_KEY, userId) - .map(i -> new RegisterUserResponse(0L)); - } - - - private Mono addToWaitQueue(String userId) { - var unixTime = Instant.now().getEpochSecond(); + public Mono registerUser(String userId) { return reactiveRedisTemplate.opsForZSet() - .add(USER_QUEUE_WAIT_KEY, userId, unixTime) - .filter(i -> i) - .switchIfEmpty(Mono.error(new GatewayException(GatewayErrorCode.TOO_MANY_REQUESTS))) - .flatMap(i -> reactiveRedisTemplate.opsForZSet() - .rank(USER_QUEUE_WAIT_KEY, userId)) - .map(rank -> new RegisterUserResponse(rank + 1)) - ; + .add(USER_QUEUE_WAIT_KEY, userId, getCurrentTime()) + .flatMap(success -> + reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_WAIT_KEY, userId) + ) + .map(rank -> new RegisterUserResponse(rank + 1)); } public Mono isAllowed(String userId) { return reactiveRedisTemplate.opsForZSet() - .rank(USER_QUEUE_PROCEED_KEY, userId) + .rank(USER_QUEUE_ACTIVE_KEY, userId) .defaultIfEmpty(-1L) .map(rank -> rank >= 0) - .flatMap(isAllowed -> { - if (isAllowed) { - return updateUserActivityTime(userId).thenReturn(true); - } - return Mono.just(false); - }); + .flatMap(isAllowed -> isAllowed ? + updateUserActivityTime(userId).thenReturn(true) : + Mono.just(false) + ); } public Mono getRank(String userId) { - return reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_WAIT_KEY, userId) + return reactiveRedisTemplate.opsForZSet() + .rank(USER_QUEUE_WAIT_KEY, userId) .defaultIfEmpty(-1L) .map(rank -> rank >= 0 ? rank + 1 : rank); } - @Scheduled(fixedRate = 30000) + @Scheduled(fixedRate = 10000, initialDelay = 500) public void scheduleAllowUser() { removeInactiveUsers() .then(allowUserTask()) - .subscribe( - movedUsers -> { - }, - error -> log.error(GatewayErrorCode.INTERNAL_SERVER_ERROR.getMessage(), error) - ); + .subscribe(); } private Mono removeInactiveUsers() { - long currentTime = Instant.now().getEpochSecond(); + long currentTime = getCurrentTime(); return reactiveRedisTemplate.opsForZSet() - .rangeWithScores(USER_QUEUE_PROCEED_KEY, Range.closed(0L, -1L)) + .rangeWithScores(USER_QUEUE_ACTIVE_KEY, Range.closed(0L, -1L)) .filter(userWithScore -> currentTime - userWithScore.getScore() > INACTIVITY_THRESHOLD) - .flatMap(userWithScore -> { - String userId = userWithScore.getValue(); - return reactiveRedisTemplate.opsForZSet().remove(USER_QUEUE_PROCEED_KEY, userId) - .then(reactiveRedisTemplate.opsForSet().remove(USER_ACTIVE_SET_KEY, userId)); - }) + .flatMap(userWithScore -> removeUser(userWithScore.getValue())) + .then(); + } + + private Mono removeUser(String userId) { + return reactiveRedisTemplate.opsForZSet() + .remove(USER_QUEUE_ACTIVE_KEY, userId) .then(); } private Mono allowUserTask() { - return reactiveRedisTemplate.opsForSet().size(USER_ACTIVE_SET_KEY) + return reactiveRedisTemplate.opsForZSet() + .size(USER_QUEUE_ACTIVE_KEY) .flatMap(activeUsers -> { long slotsAvailable = MAX_ACTIVE_USERS - activeUsers; - if (slotsAvailable <= 0) { - return Mono.just(0L); - } - return moveUsersToProceeds(slotsAvailable); + return slotsAvailable <= 0 ? + Mono.just(0L) : + moveUsersToActives(slotsAvailable); }); } - private Mono moveUsersToProceeds(long count) { + private Mono moveUsersToActives(long count) { return reactiveRedisTemplate.opsForZSet() .popMin(USER_QUEUE_WAIT_KEY, count) .flatMap(user -> { String userId = Objects.requireNonNull(user.getValue()); - return updateUserActivityTime(userId) - .then(reactiveRedisTemplate.opsForSet().add(USER_ACTIVE_SET_KEY, userId)); + return reactiveRedisTemplate.opsForZSet() + .add(USER_QUEUE_ACTIVE_KEY, userId, getCurrentTime()) + .filter(Boolean::booleanValue) + .switchIfEmpty(Mono.error(new GatewayException(GatewayErrorCode.TOO_MANY_REQUESTS))) + .thenReturn(1L); }) .count(); } private Mono updateUserActivityTime(String userId) { - long currentTime = Instant.now().getEpochSecond(); - return reactiveRedisTemplate.opsForZSet().add(USER_QUEUE_PROCEED_KEY, userId, currentTime); + return reactiveRedisTemplate.opsForZSet() + .add(USER_QUEUE_ACTIVE_KEY, userId, getCurrentTime()); } } diff --git a/service/gateway/server/src/main/java/com/sparta/gateway/server/infrastructure/configuration/RedissonConfig.java b/service/gateway/server/src/main/java/com/sparta/gateway/server/infrastructure/configuration/RedissonConfig.java deleted file mode 100644 index 2c4fbb40..00000000 --- a/service/gateway/server/src/main/java/com/sparta/gateway/server/infrastructure/configuration/RedissonConfig.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.sparta.gateway.server.infrastructure.configuration; - -import lombok.extern.slf4j.Slf4j; -import org.redisson.Redisson; -import org.redisson.api.RedissonClient; -import org.redisson.config.Config; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -@Slf4j -public class RedissonConfig { - - private static final String REDIS_URL_PREFIX = "redis://"; - - @Value("${spring.data.redis.host}") - private String host; - - @Value("${spring.data.redis.port}") - private int port; - - @Bean - RedissonClient redissonClient() { - Config config = new Config(); - config.useSingleServer().setAddress(REDIS_URL_PREFIX + host + ":" + port); - return Redisson.create(config); - } - -}