Skip to content

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,173 +19,96 @@
@RequiredArgsConstructor
public class UserQueueService {

private static final String USER_QUEUE_WAIT_KEY = "users:queue:wait";
private static final String USER_QUEUE_ACTIVE_KEY = "users:queue:active";
private static final long INACTIVITY_THRESHOLD = 300;

private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;
private final DistributedLockComponent lockComponent;
private final String USER_QUEUE_WAIT_KEY = "users:queue:wait";
private final String USER_QUEUE_PROCEED_KEY = "users:queue:proceed";
private final String USER_ACTIVE_SET_KEY = "users:active";

@Value("${MAX_ACTIVE_USERS}")
private long MAX_ACTIVE_USERS;
private final long INACTIVITY_THRESHOLD = 300;

public Mono<RegisterUserResponse> registerUser(String userId) {
return reactiveRedisTemplate.opsForZSet()
.rank(USER_QUEUE_PROCEED_KEY, userId)
.defaultIfEmpty(-1L)
.flatMap(rank -> rank >= 0 ? handleProceedUser(userId) : handleNewUser(userId));
}

private Mono<RegisterUserResponse> handleProceedUser(String userId) {
return updateUserActivityTime(userId)
.thenReturn(new RegisterUserResponse(0L));
private long getCurrentTime() {
return Instant.now().getEpochSecond();
}

private Mono<RegisterUserResponse> handleNewUser(String userId) {
return reactiveRedisTemplate.opsForSet().size(USER_ACTIVE_SET_KEY)
.flatMap(activeUsers -> activeUsers < MAX_ACTIVE_USERS ? addToProceedQueue(userId)
: checkAndAddToQueue(userId));
}


private Mono<RegisterUserResponse> checkAndAddToQueue(String userId) {
return reactiveRedisTemplate.opsForZSet().score(USER_QUEUE_WAIT_KEY, userId)
.defaultIfEmpty(-1.0)
.flatMap(score -> {
if (score >= 0) {
return updateWaitQueueScore(userId);
} else {
return addToWaitQueue(userId);
}
});
}

private Mono<RegisterUserResponse> updateWaitQueueScore(String userId) {
double newScore = Instant.now().getEpochSecond();
return reactiveRedisTemplate.opsForZSet().score(USER_QUEUE_WAIT_KEY, userId)
.flatMap(oldScore ->
reactiveRedisTemplate.opsForZSet().add(USER_QUEUE_WAIT_KEY, userId, newScore)
.then(reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_WAIT_KEY, userId))
)
.map(rank -> new RegisterUserResponse(rank + 1))
;
}

public Mono<RegisterUserResponse> addToProceedQueue(String userId) {
return Mono.create(sink -> {
lockComponent.execute(userId, 1000, 1000, () -> {
try {
addUserToQueue(userId)
.doOnSuccess(sink::success)
.doOnError(sink::error)
.subscribe();
} catch (Exception e) {
sink.error(e);
}
});
});
}

private Mono<RegisterUserResponse> addUserToQueue(String userId) {
var unixTime = Instant.now().getEpochSecond();
return reactiveRedisTemplate.opsForZSet()
.add(USER_QUEUE_PROCEED_KEY, userId, unixTime)
.filter(success -> success)
.flatMap(success -> {
if (success) {
return addToActiveSet(userId);
} else {
return checkAndAddToQueue(userId);
}
});
}

private Mono<RegisterUserResponse> addToActiveSet(String userId) {
return reactiveRedisTemplate.opsForSet()
.add(USER_ACTIVE_SET_KEY, userId)
.map(i -> new RegisterUserResponse(0L));
}


private Mono<RegisterUserResponse> addToWaitQueue(String userId) {
var unixTime = Instant.now().getEpochSecond();
public Mono<RegisterUserResponse> registerUser(String userId) {
return reactiveRedisTemplate.opsForZSet()
.add(USER_QUEUE_WAIT_KEY, userId, unixTime)
.filter(i -> i)
.switchIfEmpty(Mono.error(new GatewayException(GatewayErrorCode.TOO_MANY_REQUESTS)))
.flatMap(i -> reactiveRedisTemplate.opsForZSet()
.rank(USER_QUEUE_WAIT_KEY, userId))
.map(rank -> new RegisterUserResponse(rank + 1))
;
.add(USER_QUEUE_WAIT_KEY, userId, getCurrentTime())
.flatMap(success ->
reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_WAIT_KEY, userId)
)
.map(rank -> new RegisterUserResponse(rank + 1));
}

public Mono<Boolean> isAllowed(String userId) {
return reactiveRedisTemplate.opsForZSet()
.rank(USER_QUEUE_PROCEED_KEY, userId)
.rank(USER_QUEUE_ACTIVE_KEY, userId)
.defaultIfEmpty(-1L)
.map(rank -> rank >= 0)
.flatMap(isAllowed -> {
if (isAllowed) {
return updateUserActivityTime(userId).thenReturn(true);
}
return Mono.just(false);
});
.flatMap(isAllowed -> isAllowed ?
updateUserActivityTime(userId).thenReturn(true) :
Mono.just(false)
);
}

public Mono<Long> getRank(String userId) {
return reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_WAIT_KEY, userId)
return reactiveRedisTemplate.opsForZSet()
.rank(USER_QUEUE_WAIT_KEY, userId)
.defaultIfEmpty(-1L)
.map(rank -> rank >= 0 ? rank + 1 : rank);
}

@Scheduled(fixedRate = 30000)
@Scheduled(fixedRate = 10000, initialDelay = 500)
public void scheduleAllowUser() {
removeInactiveUsers()
.then(allowUserTask())
.subscribe(
movedUsers -> {
},
error -> log.error(GatewayErrorCode.INTERNAL_SERVER_ERROR.getMessage(), error)
);
.subscribe();
}

private Mono<Void> removeInactiveUsers() {
long currentTime = Instant.now().getEpochSecond();
long currentTime = getCurrentTime();
return reactiveRedisTemplate.opsForZSet()
.rangeWithScores(USER_QUEUE_PROCEED_KEY, Range.closed(0L, -1L))
.rangeWithScores(USER_QUEUE_ACTIVE_KEY, Range.closed(0L, -1L))
.filter(userWithScore -> currentTime - userWithScore.getScore() > INACTIVITY_THRESHOLD)
.flatMap(userWithScore -> {
String userId = userWithScore.getValue();
return reactiveRedisTemplate.opsForZSet().remove(USER_QUEUE_PROCEED_KEY, userId)
.then(reactiveRedisTemplate.opsForSet().remove(USER_ACTIVE_SET_KEY, userId));
})
.flatMap(userWithScore -> removeUser(userWithScore.getValue()))
.then();
}

private Mono<Void> removeUser(String userId) {
return reactiveRedisTemplate.opsForZSet()
.remove(USER_QUEUE_ACTIVE_KEY, userId)
.then();
}

private Mono<Long> allowUserTask() {
return reactiveRedisTemplate.opsForSet().size(USER_ACTIVE_SET_KEY)
return reactiveRedisTemplate.opsForZSet()
.size(USER_QUEUE_ACTIVE_KEY)
.flatMap(activeUsers -> {
long slotsAvailable = MAX_ACTIVE_USERS - activeUsers;
if (slotsAvailable <= 0) {
return Mono.just(0L);
}
return moveUsersToProceeds(slotsAvailable);
return slotsAvailable <= 0 ?
Mono.just(0L) :
moveUsersToActives(slotsAvailable);
});
}

private Mono<Long> moveUsersToProceeds(long count) {
private Mono<Long> moveUsersToActives(long count) {
return reactiveRedisTemplate.opsForZSet()
.popMin(USER_QUEUE_WAIT_KEY, count)
.flatMap(user -> {
String userId = Objects.requireNonNull(user.getValue());
return updateUserActivityTime(userId)
.then(reactiveRedisTemplate.opsForSet().add(USER_ACTIVE_SET_KEY, userId));
return reactiveRedisTemplate.opsForZSet()
.add(USER_QUEUE_ACTIVE_KEY, userId, getCurrentTime())
.filter(Boolean::booleanValue)
.switchIfEmpty(Mono.error(new GatewayException(GatewayErrorCode.TOO_MANY_REQUESTS)))
.thenReturn(1L);
})
.count();
}

private Mono<Boolean> updateUserActivityTime(String userId) {
long currentTime = Instant.now().getEpochSecond();
return reactiveRedisTemplate.opsForZSet().add(USER_QUEUE_PROCEED_KEY, userId, currentTime);
return reactiveRedisTemplate.opsForZSet()
.add(USER_QUEUE_ACTIVE_KEY, userId, getCurrentTime());
}

}

This file was deleted.