Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import java.time.LocalDateTime;

import static org.example.tablenow.global.constant.TimeConstants.TIME_YYYY_MM_DD_HH_MM_SS;

@Getter
@Builder
public class ChatMessageResponse {
Expand All @@ -16,7 +18,7 @@ public class ChatMessageResponse {
private final String senderName;
private final String content;
private final String imageUrl;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = TIME_YYYY_MM_DD_HH_MM_SS)
private final LocalDateTime createdAt;
@JsonProperty("isRead")
private final boolean read;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.example.tablenow.domain.notification.service.NotificationService;
import org.example.tablenow.global.exception.ErrorCode;
import org.example.tablenow.global.exception.HandledException;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

Expand All @@ -22,13 +23,14 @@ public class ChatConsumer {
@RabbitListener(queues = CHAT_QUEUE)
public void consume(ChatMessageResponse chatMessage) {
if (chatMessage == null) {
log.warn("[ChatConsumer] 수신한 메시지가 null입니다.");
log.warn("[ChatConsumer] 수신한 메시지가 null");
return;
}

Long receiverId = determineReceiver(chatMessage);
if (receiverId == null) {
log.warn("[ChatConsumer] receiverId를 결정할 수 없습니다. chatMessage: {}", chatMessage);
log.warn("[ChatConsumer] receiverId를 결정할 수 없음 → senderId={}, ownerId={}, reservationUserId={}",
chatMessage.getSenderId(), chatMessage.getOwnerId(), chatMessage.getReservationUserId());
return;
}

Expand All @@ -43,9 +45,10 @@ public void consume(ChatMessageResponse chatMessage) {

log.info("[ChatConsumer] 채팅 알림 전송 완료 → receiverId={}, reservationId={}",
receiverId, chatMessage.getReservationId());

} catch (Exception e) {
log.error("[ChatConsumer] 채팅 알림 처리 중 예외 발생", e);
log.error("[ChatConsumer] 채팅 알림 처리 중 예외 발생 → receiverId={}, reservationId={}",
receiverId, chatMessage.getReservationId(), e);
throw new AmqpRejectAndDontRequeueException("[DLQ] 알림 전송 실패 → DLQ로 이동", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.example.tablenow.domain.chat.message.consumer;

import lombok.RequiredArgsConstructor;
import org.example.tablenow.domain.chat.message.service.ChatRetryService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import static org.example.tablenow.global.constant.RabbitConstant.CHAT_DLQ;

@Component
@RequiredArgsConstructor
public class ChatDlqReprocessor {

private final ChatRetryService chatRetryService;

@RabbitListener(queues = CHAT_DLQ)
public void reprocess(Message message) {
chatRetryService.process(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.example.tablenow.domain.chat.message.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.tablenow.domain.chat.dto.response.ChatMessageResponse;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Map;

import static org.example.tablenow.global.constant.RabbitConstant.CHAT_EXCHANGE;
import static org.example.tablenow.global.constant.RabbitConstant.CHAT_ROUTING_KEY;

@Slf4j
@Component
@RequiredArgsConstructor
public class ChatRetryService {

private final RabbitTemplate rabbitTemplate;

private static final String RETRY_HEADER = "x-retry-count";
private static final int MAX_RETRY_COUNT = 3;

public void process(Message message) {
try {
ChatMessageResponse chatMessage = parseMessage(message);
int retryCount = extractRetryCount(message);

if (retryCount >= MAX_RETRY_COUNT) {
log.warn("[ChatDLQ] 재처리 횟수 초과 → reservationId={}, senderId={}, retryCount={}",
chatMessage.getReservationId(), chatMessage.getSenderId(),retryCount);
return;
}

resendMessage(chatMessage, retryCount + 1);

} catch (Exception e) {
log.error("[ChatDLQ] 재처리 중 예외 발생", e);
}
}

private ChatMessageResponse parseMessage(Message message) {
Object object = rabbitTemplate.getMessageConverter().fromMessage(message);
// 수동 퍼블리시일 경우
if (object instanceof Map map) {
return ChatMessageResponse.builder()
.reservationId(((Number) map.get("reservationId")).longValue())
.senderId(((Number) map.get("senderId")).longValue())
.ownerId(((Number) map.get("ownerId")).longValue())
.reservationUserId(((Number) map.get("reservationUserId")).longValue())
.content((String) map.get("content"))
.imageUrl((String) map.get("imageUrl"))
.senderName((String) map.get("senderName"))
.build();
}

return (ChatMessageResponse) object;
}

private int extractRetryCount(Message message) {
return (Integer) message.getMessageProperties()
.getHeaders()
.getOrDefault(RETRY_HEADER, 0);
}

private void resendMessage(ChatMessageResponse chatMessage, int nextRetryCount) {
MessageProperties props = new MessageProperties();
props.setHeader(RETRY_HEADER, nextRetryCount);

Message retryMessage = rabbitTemplate.getMessageConverter().toMessage(chatMessage, props);
rabbitTemplate.send(CHAT_EXCHANGE, CHAT_ROUTING_KEY, retryMessage);

log.info("[DLQ] 재처리 시도 완료 → reservationId={}, senderId={}, retryCount={}",
chatMessage.getReservationId(), chatMessage.getSenderId(), nextRetryCount);
}
}
35 changes: 25 additions & 10 deletions src/main/java/org/example/tablenow/global/config/RabbitConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,13 @@ public Binding storeDeleteDlqBinding(Queue storeDeleteDlq, DirectExchange storeD
return bind(storeDeleteDlq, storeDlx, STORE_DELETE_DLQ);
}

private Binding bind(Queue queue, DirectExchange exchange, String routingKey) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}

// 채팅 알림 Queue, Exchange, Binding
@Bean
public Queue chatQueue() {
return new Queue(CHAT_QUEUE, true);
return QueueBuilder.durable(CHAT_QUEUE)
.withArgument("x-dead-letter-exchange", CHAT_DLX)
.withArgument("x-dead-letter-routing-key", CHAT_DLQ)
.build();
}

@Bean
Expand All @@ -239,11 +238,23 @@ public DirectExchange chatExchange() {
}

@Bean
public Binding chatBinding() {
return BindingBuilder
.bind(chatQueue())
.to(chatExchange())
.with(CHAT_ROUTING_KEY);
public Binding chatBinding(Queue chatQueue, DirectExchange chatExchange) {
return bind(chatQueue, chatExchange, CHAT_ROUTING_KEY);
}

@Bean
public DirectExchange chatDlx() {
return new DirectExchange(CHAT_DLX);
}

@Bean
public Queue chatDlq() {
return QueueBuilder.durable(CHAT_DLQ).build();
}

@Bean
public Binding chatDlqBinding(Queue chatDlq, DirectExchange chatDlx) {
return bind(chatDlq, chatDlx, CHAT_DLQ);
}

// 공통 설정
Expand Down Expand Up @@ -271,4 +282,8 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(Conne
private Binding bindFanout(Queue queue, FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}

private Binding bind(Queue queue, DirectExchange exchange, String routingKey) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ public class RabbitConstant {
public static final String EVENT_PREFIX = "event";
public static final String EVENT_OPEN_PREFIX = EVENT_PREFIX + ".open";

// 채팅 관련 PREFIX
public static final String CHAT_PREFIX = "chat";

// 빈자리 알림
public static final String VACANCY_EXCHANGE = VACANCY_PREFIX + ".direct";
public static final String VACANCY_QUEUE = VACANCY_PREFIX + ".queue";
Expand Down Expand Up @@ -47,9 +50,12 @@ public class RabbitConstant {
public static final String STORE_UPDATE_DLQ = "store.update.dlq";
public static final String STORE_DELETE_DLQ = "store.delete.dlq";

public static final String CHAT_EXCHANGE = "chat.exchange";
public static final String CHAT_QUEUE = "chat.queue";
public static final String CHAT_ROUTING_KEY = "chat.key";
// 채팅 알림
public static final String CHAT_EXCHANGE = CHAT_PREFIX + ".exchange";
public static final String CHAT_QUEUE = CHAT_PREFIX + ".queue";
public static final String CHAT_ROUTING_KEY = CHAT_PREFIX + ".key";
public static final String CHAT_DLX = CHAT_PREFIX + ".dlx";
public static final String CHAT_DLQ = CHAT_PREFIX + ".dlq";

public static final int TTL_MILLIS = 30000;
}