Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.example.cs25batch.adapter;

import jakarta.annotation.Nullable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.StringRedisTemplate;

@RequiredArgsConstructor
public class RedisStreamsClient {

private final StringRedisTemplate redisTemplate;
private final String stream;
private final String group;
private final String consumer;

@Nullable
public MapRecord<String, Object, Object> readWithConsumerGroup(Duration blockTimeout) {

StreamReadOptions options = StreamReadOptions.empty().count(1).block(blockTimeout);

List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream().read(
Consumer.from(group, consumer),
options,
StreamOffset.create(stream, ReadOffset.lastConsumed())
);

return (records == null || records.isEmpty()) ? null : records.get(0);
}

public void ack(String recordId) {
redisTemplate.opsForStream().acknowledge(stream, group, RecordId.of(recordId));
}

public void del(String recordId) {
redisTemplate.opsForStream().delete(stream, RecordId.of(recordId));
}

public void ackAndDel(String recordId) {
RecordId id = RecordId.of(recordId);
redisTemplate.opsForStream().acknowledge(stream, group, id);
redisTemplate.opsForStream().delete(stream, id);
}

public void addDlq(String dlqStream, Map<String, String> message){
redisTemplate.opsForStream().add(dlqStream, message);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.example.cs25batch.aop;

import com.example.cs25batch.adapter.RedisStreamsClient;
import com.example.cs25batch.batch.dto.MailDto;
import com.example.cs25entity.domain.mail.entity.MailLog;
import com.example.cs25entity.domain.mail.enums.MailStatus;
Expand All @@ -26,7 +27,7 @@
public class MailLogAspect {

private final MailLogRepository mailLogRepository;
private final StringRedisTemplate redisTemplate;
private final RedisStreamsClient redisClient;

@Around("execution(* com.example.cs25batch.sender.context.MailSenderContext.send(..))")
public Object logMailSend(ProceedingJoinPoint joinPoint) throws Throwable {
Expand Down Expand Up @@ -66,7 +67,7 @@ public Object logMailSend(ProceedingJoinPoint joinPoint) throws Throwable {
"subscriptionId", subscription.getId().toString(),
"quizId", quiz.getId().toString()
);
redisTemplate.opsForStream().add("quiz-email-retry-stream", retryMessage);
redisClient.addDlq("quiz-email-retry-stream", retryMessage);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.example.cs25batch.batch.component.processor;

import com.example.cs25batch.adapter.RedisStreamsClient;
import com.example.cs25batch.batch.dto.MailDto;
import com.example.cs25batch.batch.service.TodayQuizService;
import com.example.cs25entity.domain.quiz.entity.Quiz;
import com.example.cs25entity.domain.quiz.exception.QuizException;
import com.example.cs25entity.domain.subscription.entity.Subscription;
import com.example.cs25entity.domain.subscription.repository.SubscriptionRepository;
import java.util.Map;
Expand All @@ -18,6 +20,7 @@ public class MailConsumerAsyncProcessor implements ItemProcessor<Map<String, Str

private final SubscriptionRepository subscriptionRepository;
private final TodayQuizService todayQuizService;
private final RedisStreamsClient redisClient;

@Override
public MailDto process(Map<String, String> message) throws Exception {
Expand All @@ -37,14 +40,19 @@ public MailDto process(Map<String, String> message) throws Exception {
}

//Quiz 출제
Quiz quiz = todayQuizService.getTodayQuizBySubscription(subscription);
try {
Quiz quiz = todayQuizService.getTodayQuizBySubscription(subscription);
return MailDto.builder()
.subscription(subscription)
.quiz(quiz)
.recordId(recordId)
.build();
} catch(QuizException e){
//문제 출제 실패로 인한 예외 발생 시, 기존 Queue에 있는 데이터 삭제
redisClient.ackAndDel(recordId);
return null;
}
//long quizEnd = System.currentTimeMillis();
//log.info("[5. 문제 출제] QuizId : {} {}ms", quiz.getId(), quizEnd - quizStart);

return MailDto.builder()
.subscription(subscription)
.quiz(quiz)
.recordId(recordId)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.example.cs25batch.batch.component.reader;

import com.example.cs25batch.adapter.RedisStreamsClient;
import com.example.cs25batch.sender.context.MailSenderContext;
import io.github.bucket4j.Bucket;
import java.time.Duration;
Expand All @@ -24,37 +25,24 @@
@Component("redisConsumeReader")
public class RedisStreamReader implements ItemReader<Map<String, String>> {

private static final String STREAM = "quiz-email-stream";
private static final String GROUP = "mail-consumer-group";
private static final String CONSUMER = "mail-worker";

@Value("${mail.strategy:javaBatchMailSender}")
private String strategyKey;

private final StringRedisTemplate redisTemplate;
private final RedisStreamsClient redisClient;
private final MailSenderContext mailSenderContext;

@Override
public Map<String, String> read() throws InterruptedException {
//long start = System.currentTimeMillis();
Bucket bucket = mailSenderContext.getBucket(strategyKey);

while (!bucket.tryConsume(1)) {
while (!mailSenderContext.tryConsume(strategyKey, 1L)) {
Thread.sleep(200); //토큰을 얻을 때까지 간격을 두고 재시도
}

List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream().read(
Consumer.from(GROUP, CONSUMER),
StreamReadOptions.empty().count(1).block(Duration.ofMillis(500)),
StreamOffset.create(STREAM, ReadOffset.lastConsumed())
);

if (records == null || records.isEmpty()) {
return null;
}
MapRecord<String, Object, Object> msg = redisClient.readWithConsumerGroup(Duration.ofMillis(500));
//redisTemplate.opsForStream().acknowledge(STREAM, GROUP, msg.getId());

MapRecord<String, Object, Object> msg = records.get(0);
redisTemplate.opsForStream().acknowledge(STREAM, GROUP, msg.getId());
if(msg == null || msg.getValue().isEmpty()) return null;

Map<String, String> data = new HashMap<>();
Object subscriptionId = msg.getValue().get("subscriptionId");
Expand All @@ -65,7 +53,6 @@ public Map<String, String> read() throws InterruptedException {

//long end = System.currentTimeMillis();
//log.info("[3. Queue에서 꺼내기] {}ms", end - start);

return data;
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package com.example.cs25batch.batch.component.writer;

import com.example.cs25batch.adapter.RedisStreamsClient;
import com.example.cs25batch.batch.dto.MailDto;
import com.example.cs25batch.sender.context.MailSenderContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Slf4j
Expand All @@ -17,7 +16,7 @@
public class MailWriter implements ItemWriter<MailDto> {

private final MailSenderContext mailSenderContext;
private final StringRedisTemplate redisTemplate;
private final RedisStreamsClient streamsClient;

@Value("${mail.strategy:javaBatchMailSender}")
private String strategyKey;
Expand All @@ -34,17 +33,8 @@ public void write(Chunk<? extends MailDto> items) throws Exception {
// 에러 로깅 또는 알림 처리
System.err.println("메일 발송 실패: " + e.getMessage());
} finally {
deleteStreamRecord(mail.getRecordId());
streamsClient.ackAndDel(mail.getRecordId());
}
}
}

private void deleteStreamRecord(String recordIdStr){
try {
RecordId recordId = RecordId.of(recordIdStr);
redisTemplate.opsForStream().delete("quiz-email-stream", recordId);
} catch (Exception e) {
log.warn("Redis 스트림 레코드 삭제 실패: recordId = {}, error = {}", recordIdStr, e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.example.cs25batch.config;

import com.example.cs25batch.adapter.RedisStreamsClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;

@Configuration
public class RedisStreamsConfig {
@Bean
public RedisStreamsClient quizEmailStreamsClient(StringRedisTemplate redisTemplate) {
return new RedisStreamsClient(
redisTemplate,
"quiz-email-stream", // stream 이름
"mail-consumer-group", // group
"mail-worker" // consumer
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class JavaMailSenderStrategy implements MailSenderStrategy{
.addLimit(
Bandwidth.builder()
.capacity(4)
.refillGreedy(2, Duration.ofMillis(500))
.refillGreedy(4, Duration.ofMillis(1000))
.build()
)
.build();
Expand All @@ -28,7 +28,7 @@ public void sendQuizMail(MailDto mailDto) {
}

@Override
public Bucket getBucket() {
return bucket;
public boolean tryConsume(Long num){
return bucket.tryConsume(num);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package com.example.cs25batch.sender;

import com.example.cs25batch.batch.dto.MailDto;
import io.github.bucket4j.Bucket;

public interface MailSenderStrategy {
void sendQuizMail(MailDto mailDto);

Bucket getBucket();
boolean tryConsume(Long num);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class SesMailSenderStrategy implements MailSenderStrategy{
.addLimit(
Bandwidth.builder()
.capacity(14)
.refillGreedy(7, Duration.ofMillis(500))
.refillGreedy(14, Duration.ofMillis(1000))
.build()
)
.build();
Expand All @@ -29,7 +29,7 @@ public void sendQuizMail(MailDto mailDto) {
}

@Override
public Bucket getBucket() {
return bucket;
public boolean tryConsume(Long num){
return bucket.tryConsume(num);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.example.cs25batch.sender.MailSenderStrategy;
import java.util.Map;

import io.github.bucket4j.Bucket;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

Expand All @@ -18,9 +17,9 @@ public void send(MailDto dto, String strategyKey) {
strategy.sendQuizMail(dto);
}

public Bucket getBucket(String strategyKey) {
public boolean tryConsume(String strategyKey, Long num) {
MailSenderStrategy strategy = getValidStrategy(strategyKey);
return strategy.getBucket();
return strategy.tryConsume(num);
}

private MailSenderStrategy getValidStrategy(String strategyKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

public enum MailStatus {
SENT,
FAILED
FAILED,
QUIZ_FAILED
}