Skip to content

Commit b4577f9

Browse files
authored
Merge pull request #268 from spring-team-7/feat/chat
[feat] ์ฑ„ํŒ… ์•Œ๋ฆผ DLQ / DLX ์ ์šฉ
2 parents 4ced6c3 + b810de8 commit b4577f9

6 files changed

Lines changed: 143 additions & 18 deletions

File tree

โ€Žsrc/main/java/org/example/tablenow/domain/chat/dto/response/ChatMessageResponse.javaโ€Ž

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
import java.time.LocalDateTime;
1010

11+
import static org.example.tablenow.global.constant.TimeConstants.TIME_YYYY_MM_DD_HH_MM_SS;
12+
1113
@Getter
1214
@Builder
1315
public class ChatMessageResponse {
@@ -16,7 +18,7 @@ public class ChatMessageResponse {
1618
private final String senderName;
1719
private final String content;
1820
private final String imageUrl;
19-
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
21+
@JsonFormat(pattern = TIME_YYYY_MM_DD_HH_MM_SS)
2022
private final LocalDateTime createdAt;
2123
@JsonProperty("isRead")
2224
private final boolean read;

โ€Žsrc/main/java/org/example/tablenow/domain/chat/message/consumer/ChatConsumer.javaโ€Ž

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.example.tablenow.domain.notification.service.NotificationService;
99
import org.example.tablenow.global.exception.ErrorCode;
1010
import org.example.tablenow.global.exception.HandledException;
11+
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
1112
import org.springframework.amqp.rabbit.annotation.RabbitListener;
1213
import org.springframework.stereotype.Component;
1314

@@ -22,13 +23,14 @@ public class ChatConsumer {
2223
@RabbitListener(queues = CHAT_QUEUE)
2324
public void consume(ChatMessageResponse chatMessage) {
2425
if (chatMessage == null) {
25-
log.warn("[ChatConsumer] ์ˆ˜์‹ ํ•œ ๋ฉ”์‹œ์ง€๊ฐ€ null์ž…๋‹ˆ๋‹ค.");
26+
log.warn("[ChatConsumer] ์ˆ˜์‹ ํ•œ ๋ฉ”์‹œ์ง€๊ฐ€ null");
2627
return;
2728
}
2829

2930
Long receiverId = determineReceiver(chatMessage);
3031
if (receiverId == null) {
31-
log.warn("[ChatConsumer] receiverId๋ฅผ ๊ฒฐ์ •ํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. chatMessage: {}", chatMessage);
32+
log.warn("[ChatConsumer] receiverId๋ฅผ ๊ฒฐ์ •ํ•  ์ˆ˜ ์—†์Œ โ†’ senderId={}, ownerId={}, reservationUserId={}",
33+
chatMessage.getSenderId(), chatMessage.getOwnerId(), chatMessage.getReservationUserId());
3234
return;
3335
}
3436

@@ -43,9 +45,10 @@ public void consume(ChatMessageResponse chatMessage) {
4345

4446
log.info("[ChatConsumer] ์ฑ„ํŒ… ์•Œ๋ฆผ ์ „์†ก ์™„๋ฃŒ โ†’ receiverId={}, reservationId={}",
4547
receiverId, chatMessage.getReservationId());
46-
4748
} catch (Exception e) {
48-
log.error("[ChatConsumer] ์ฑ„ํŒ… ์•Œ๋ฆผ ์ฒ˜๋ฆฌ ์ค‘ ์˜ˆ์™ธ ๋ฐœ์ƒ", e);
49+
log.error("[ChatConsumer] ์ฑ„ํŒ… ์•Œ๋ฆผ ์ฒ˜๋ฆฌ ์ค‘ ์˜ˆ์™ธ ๋ฐœ์ƒ โ†’ receiverId={}, reservationId={}",
50+
receiverId, chatMessage.getReservationId(), e);
51+
throw new AmqpRejectAndDontRequeueException("[DLQ] ์•Œ๋ฆผ ์ „์†ก ์‹คํŒจ โ†’ DLQ๋กœ ์ด๋™", e);
4952
}
5053
}
5154

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.example.tablenow.domain.chat.message.consumer;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.example.tablenow.domain.chat.message.service.ChatRetryService;
5+
import org.springframework.amqp.core.Message;
6+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
7+
import org.springframework.stereotype.Component;
8+
9+
import static org.example.tablenow.global.constant.RabbitConstant.CHAT_DLQ;
10+
11+
@Component
12+
@RequiredArgsConstructor
13+
public class ChatDlqReprocessor {
14+
15+
private final ChatRetryService chatRetryService;
16+
17+
@RabbitListener(queues = CHAT_DLQ)
18+
public void reprocess(Message message) {
19+
chatRetryService.process(message);
20+
}
21+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package org.example.tablenow.domain.chat.message.service;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.example.tablenow.domain.chat.dto.response.ChatMessageResponse;
6+
import org.springframework.amqp.core.Message;
7+
import org.springframework.amqp.core.MessageProperties;
8+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
9+
import org.springframework.stereotype.Component;
10+
11+
import java.util.Map;
12+
13+
import static org.example.tablenow.global.constant.RabbitConstant.CHAT_EXCHANGE;
14+
import static org.example.tablenow.global.constant.RabbitConstant.CHAT_ROUTING_KEY;
15+
16+
@Slf4j
17+
@Component
18+
@RequiredArgsConstructor
19+
public class ChatRetryService {
20+
21+
private final RabbitTemplate rabbitTemplate;
22+
23+
private static final String RETRY_HEADER = "x-retry-count";
24+
private static final int MAX_RETRY_COUNT = 3;
25+
26+
public void process(Message message) {
27+
try {
28+
ChatMessageResponse chatMessage = parseMessage(message);
29+
int retryCount = extractRetryCount(message);
30+
31+
if (retryCount >= MAX_RETRY_COUNT) {
32+
log.warn("[ChatDLQ] ์žฌ์ฒ˜๋ฆฌ ํšŸ์ˆ˜ ์ดˆ๊ณผ โ†’ reservationId={}, senderId={}, retryCount={}",
33+
chatMessage.getReservationId(), chatMessage.getSenderId(),retryCount);
34+
return;
35+
}
36+
37+
resendMessage(chatMessage, retryCount + 1);
38+
39+
} catch (Exception e) {
40+
log.error("[ChatDLQ] ์žฌ์ฒ˜๋ฆฌ ์ค‘ ์˜ˆ์™ธ ๋ฐœ์ƒ", e);
41+
}
42+
}
43+
44+
private ChatMessageResponse parseMessage(Message message) {
45+
Object object = rabbitTemplate.getMessageConverter().fromMessage(message);
46+
// ์ˆ˜๋™ ํผ๋ธ”๋ฆฌ์‹œ์ผ ๊ฒฝ์šฐ
47+
if (object instanceof Map map) {
48+
return ChatMessageResponse.builder()
49+
.reservationId(((Number) map.get("reservationId")).longValue())
50+
.senderId(((Number) map.get("senderId")).longValue())
51+
.ownerId(((Number) map.get("ownerId")).longValue())
52+
.reservationUserId(((Number) map.get("reservationUserId")).longValue())
53+
.content((String) map.get("content"))
54+
.imageUrl((String) map.get("imageUrl"))
55+
.senderName((String) map.get("senderName"))
56+
.build();
57+
}
58+
59+
return (ChatMessageResponse) object;
60+
}
61+
62+
private int extractRetryCount(Message message) {
63+
return (Integer) message.getMessageProperties()
64+
.getHeaders()
65+
.getOrDefault(RETRY_HEADER, 0);
66+
}
67+
68+
private void resendMessage(ChatMessageResponse chatMessage, int nextRetryCount) {
69+
MessageProperties props = new MessageProperties();
70+
props.setHeader(RETRY_HEADER, nextRetryCount);
71+
72+
Message retryMessage = rabbitTemplate.getMessageConverter().toMessage(chatMessage, props);
73+
rabbitTemplate.send(CHAT_EXCHANGE, CHAT_ROUTING_KEY, retryMessage);
74+
75+
log.info("[DLQ] ์žฌ์ฒ˜๋ฆฌ ์‹œ๋„ ์™„๋ฃŒ โ†’ reservationId={}, senderId={}, retryCount={}",
76+
chatMessage.getReservationId(), chatMessage.getSenderId(), nextRetryCount);
77+
}
78+
}

โ€Žsrc/main/java/org/example/tablenow/global/config/RabbitConfig.javaโ€Ž

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -223,14 +223,13 @@ public Binding storeDeleteDlqBinding(Queue storeDeleteDlq, DirectExchange storeD
223223
return bind(storeDeleteDlq, storeDlx, STORE_DELETE_DLQ);
224224
}
225225

226-
private Binding bind(Queue queue, DirectExchange exchange, String routingKey) {
227-
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
228-
}
229-
230226
// ์ฑ„ํŒ… ์•Œ๋ฆผ Queue, Exchange, Binding
231227
@Bean
232228
public Queue chatQueue() {
233-
return new Queue(CHAT_QUEUE, true);
229+
return QueueBuilder.durable(CHAT_QUEUE)
230+
.withArgument("x-dead-letter-exchange", CHAT_DLX)
231+
.withArgument("x-dead-letter-routing-key", CHAT_DLQ)
232+
.build();
234233
}
235234

236235
@Bean
@@ -239,11 +238,23 @@ public DirectExchange chatExchange() {
239238
}
240239

241240
@Bean
242-
public Binding chatBinding() {
243-
return BindingBuilder
244-
.bind(chatQueue())
245-
.to(chatExchange())
246-
.with(CHAT_ROUTING_KEY);
241+
public Binding chatBinding(Queue chatQueue, DirectExchange chatExchange) {
242+
return bind(chatQueue, chatExchange, CHAT_ROUTING_KEY);
243+
}
244+
245+
@Bean
246+
public DirectExchange chatDlx() {
247+
return new DirectExchange(CHAT_DLX);
248+
}
249+
250+
@Bean
251+
public Queue chatDlq() {
252+
return QueueBuilder.durable(CHAT_DLQ).build();
253+
}
254+
255+
@Bean
256+
public Binding chatDlqBinding(Queue chatDlq, DirectExchange chatDlx) {
257+
return bind(chatDlq, chatDlx, CHAT_DLQ);
247258
}
248259

249260
// ๊ณตํ†ต ์„ค์ •
@@ -271,4 +282,8 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(Conne
271282
private Binding bindFanout(Queue queue, FanoutExchange exchange) {
272283
return BindingBuilder.bind(queue).to(exchange);
273284
}
285+
286+
private Binding bind(Queue queue, DirectExchange exchange, String routingKey) {
287+
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
288+
}
274289
}

โ€Žsrc/main/java/org/example/tablenow/global/constant/RabbitConstant.javaโ€Ž

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ public class RabbitConstant {
1212
public static final String EVENT_PREFIX = "event";
1313
public static final String EVENT_OPEN_PREFIX = EVENT_PREFIX + ".open";
1414

15+
// ์ฑ„ํŒ… ๊ด€๋ จ PREFIX
16+
public static final String CHAT_PREFIX = "chat";
17+
1518
// ๋นˆ์ž๋ฆฌ ์•Œ๋ฆผ
1619
public static final String VACANCY_EXCHANGE = VACANCY_PREFIX + ".direct";
1720
public static final String VACANCY_QUEUE = VACANCY_PREFIX + ".queue";
@@ -47,9 +50,12 @@ public class RabbitConstant {
4750
public static final String STORE_UPDATE_DLQ = "store.update.dlq";
4851
public static final String STORE_DELETE_DLQ = "store.delete.dlq";
4952

50-
public static final String CHAT_EXCHANGE = "chat.exchange";
51-
public static final String CHAT_QUEUE = "chat.queue";
52-
public static final String CHAT_ROUTING_KEY = "chat.key";
53+
// ์ฑ„ํŒ… ์•Œ๋ฆผ
54+
public static final String CHAT_EXCHANGE = CHAT_PREFIX + ".exchange";
55+
public static final String CHAT_QUEUE = CHAT_PREFIX + ".queue";
56+
public static final String CHAT_ROUTING_KEY = CHAT_PREFIX + ".key";
57+
public static final String CHAT_DLX = CHAT_PREFIX + ".dlx";
58+
public static final String CHAT_DLQ = CHAT_PREFIX + ".dlq";
5359

5460
public static final int TTL_MILLIS = 30000;
5561
}

0 commit comments

Comments
ย (0)