Skip to content

Commit c723900

Browse files
authored
Refactor/338 : Rate Limiter 토큰 대기방식 개선 (#422)
* refactor : bucket 캡슐화 * refactor : 변경된 메서드 reader에 적용 * refactor : refillGreedy로 변경함에 따른 설정 변경 * refactor : reader에서 ACK 제거 * feat : RedisStreams와 상호작용하는 부분 모듈화 * refactor : ACK 시점 변경 및 RedisStreams 사용 부분 리팩토링 * feat : writer와 reader에 적용 * feat : 문제 출제 실패 시, MQ 데이터 삭제 추가 * feat : 문제 출제 실패로 인한 메일 발송 실패 상태 추가 * refactor : RateLimiter 토큰 대기방식 개선 * feat : 누락된 코드 추가
1 parent dac136a commit c723900

File tree

5 files changed

+47
-9
lines changed

5 files changed

+47
-9
lines changed

cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,14 @@
22

33
import com.example.cs25batch.adapter.RedisStreamsClient;
44
import com.example.cs25batch.sender.context.MailSenderContext;
5-
import io.github.bucket4j.Bucket;
65
import java.time.Duration;
76
import java.util.HashMap;
8-
import java.util.List;
97
import java.util.Map;
108
import lombok.RequiredArgsConstructor;
119
import lombok.extern.slf4j.Slf4j;
1210
import org.springframework.batch.item.ItemReader;
13-
import org.springframework.beans.factory.annotation.Qualifier;
1411
import org.springframework.beans.factory.annotation.Value;
15-
import org.springframework.data.redis.connection.stream.Consumer;
1612
import org.springframework.data.redis.connection.stream.MapRecord;
17-
import org.springframework.data.redis.connection.stream.ReadOffset;
18-
import org.springframework.data.redis.connection.stream.StreamOffset;
19-
import org.springframework.data.redis.connection.stream.StreamReadOptions;
20-
import org.springframework.data.redis.core.StringRedisTemplate;
2113
import org.springframework.stereotype.Component;
2214

2315
@Slf4j
@@ -35,9 +27,12 @@ public class RedisStreamReader implements ItemReader<Map<String, String>> {
3527
public Map<String, String> read() throws InterruptedException {
3628
//long start = System.currentTimeMillis();
3729

30+
/*
3831
while (!mailSenderContext.tryConsume(strategyKey, 1L)) {
3932
Thread.sleep(200); //토큰을 얻을 때까지 간격을 두고 재시도
4033
}
34+
*/
35+
mailSenderContext.acquirePermitOrWait(strategyKey);
4136

4237
MapRecord<String, Object, Object> msg = redisClient.readWithConsumerGroup(Duration.ofMillis(500));
4338
//redisTemplate.opsForStream().acknowledge(STREAM, GROUP, msg.getId());

cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
import com.example.cs25batch.batch.service.JavaMailService;
55
import io.github.bucket4j.Bandwidth;
66
import io.github.bucket4j.Bucket;
7+
import io.github.bucket4j.ConsumptionProbe;
8+
import java.util.concurrent.ThreadLocalRandom;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.concurrent.locks.LockSupport;
711
import lombok.RequiredArgsConstructor;
812
import org.springframework.stereotype.Component;
913

@@ -17,7 +21,7 @@ public class JavaMailSenderStrategy implements MailSenderStrategy{
1721
.addLimit(
1822
Bandwidth.builder()
1923
.capacity(4)
20-
.refillGreedy(4, Duration.ofMillis(1000))
24+
.refillGreedy(2, Duration.ofMillis(500))
2125
.build()
2226
)
2327
.build();
@@ -31,4 +35,18 @@ public void sendQuizMail(MailDto mailDto) {
3135
public boolean tryConsume(Long num){
3236
return bucket.tryConsume(num);
3337
}
38+
39+
@Override
40+
public void acquirePermitOrWait(){
41+
while (true) {
42+
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
43+
if (probe.isConsumed()) return;
44+
45+
long nanos = probe.getNanosToWaitForRefill();
46+
long jitter = TimeUnit.MILLISECONDS.toNanos(
47+
ThreadLocalRandom.current().nextInt(0, 50)
48+
);
49+
LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1)));
50+
}
51+
}
3452
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package com.example.cs25batch.sender;
22

33
import com.example.cs25batch.batch.dto.MailDto;
4+
import io.github.bucket4j.Bucket;
45

56
public interface MailSenderStrategy {
67
void sendQuizMail(MailDto mailDto);
78

89
boolean tryConsume(Long num);
10+
11+
void acquirePermitOrWait();
912
}

cs25-batch/src/main/java/com/example/cs25batch/sender/SesMailSenderStrategy.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
import com.example.cs25batch.batch.service.SesMailService;
55
import io.github.bucket4j.Bandwidth;
66
import io.github.bucket4j.Bucket;
7+
import io.github.bucket4j.ConsumptionProbe;
8+
import java.util.concurrent.ThreadLocalRandom;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.concurrent.locks.LockSupport;
711
import lombok.RequiredArgsConstructor;
812
import org.springframework.stereotype.Component;
913

@@ -32,4 +36,18 @@ public void sendQuizMail(MailDto mailDto) {
3236
public boolean tryConsume(Long num){
3337
return bucket.tryConsume(num);
3438
}
39+
40+
@Override
41+
public void acquirePermitOrWait(){
42+
while (true) {
43+
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
44+
if (probe.isConsumed()) return;
45+
46+
long nanos = probe.getNanosToWaitForRefill();
47+
long jitter = TimeUnit.MILLISECONDS.toNanos(
48+
ThreadLocalRandom.current().nextInt(0, 50)
49+
);
50+
LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1)));
51+
}
52+
}
3553
}

cs25-batch/src/main/java/com/example/cs25batch/sender/context/MailSenderContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,8 @@ private MailSenderStrategy getValidStrategy(String strategyKey) {
3030
return strategy;
3131
}
3232

33+
public void acquirePermitOrWait(String strategyKey) {
34+
MailSenderStrategy strategy = getValidStrategy(strategyKey);
35+
strategy.acquirePermitOrWait();
36+
}
3337
}

0 commit comments

Comments
 (0)