From 7fc0bef2b3a168a987319015d5c92e395feeabf6 Mon Sep 17 00:00:00 2001
From: Park JeongHyun
Date: Fri, 17 Oct 2025 18:27:12 +0900
Subject: [PATCH 01/34] feat(build): add Kafka dependencies for application and testing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add `spring-kafka` to enable Kafka-based messaging
- Add the `spring-kafka-test` dependency for testing
- Prepares the environment for extending the messaging feature and writing the related tests
---
 build.gradle.kts | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/build.gradle.kts b/build.gradle.kts
index f56f3a33..b26e195a 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -48,6 +48,7 @@ dependencies {
     implementation("org.springframework.boot:spring-boot-starter-web")
     implementation("org.springframework.boot:spring-boot-starter-validation")
     implementation("org.springframework.security:spring-security-core")
+    implementation("org.springframework.kafka:spring-kafka")
     implementation("io.jsonwebtoken:jjwt-api:0.12.6")
     implementation("io.jsonwebtoken:jjwt-impl:0.12.6")
     implementation("io.jsonwebtoken:jjwt-jackson:0.12.6")
@@ -62,6 +63,7 @@ dependencies {
     testCompileOnly("org.projectlombok:lombok")

     testImplementation("org.springframework.boot:spring-boot-starter-test")
+    testImplementation("org.springframework.kafka:spring-kafka-test")
     testRuntimeOnly("org.junit.platform:junit-platform-launcher")
     testImplementation("org.junit-pioneer:junit-pioneer:2.3.0")
     testImplementation("org.mockito:mockito-core:5.18.0")

From 23d5c2cdb9392cc7a16aa64cd6eff648fe698327 Mon Sep 17 00:00:00 2001
From: Park JeongHyun
Date: Fri, 17 Oct 2025 18:58:51 +0900
Subject: [PATCH 02/34] feat(docker): enhance docker-compose with Kafka and Zookeeper setup
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add Kafka and Zookeeper services to the Docker Compose configuration to support messaging
- Add internal and external listener settings (KAFKA_LISTENERS) to the Kafka service for flexibility
- Guarantee a stable startup order via healthchecks and service dependencies between Kafka and Zookeeper
- Improve the existing service definitions:
  - Set `restart: unless-stopped` to keep containers resilient
  - Tune the MySQL and Elasticsearch healthcheck settings
  - Remove the no-longer-needed commented-out MySQL test service, simplifying the file
---
 docker-compose.yml | 79 +++++++++++++++++++++++++++++++---------------
 1 file changed, 54 insertions(+), 25 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index 1251589a..67f3ffa2 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,7 +1,10 @@
+version: "3.8"
+
 services:
   mysql:
     image: mysql:8.0
     container_name: see-mysql
+    restart: unless-stopped
     ports:
       - "3306:3306"
     environment:
@@ -14,30 +17,15 @@ services:
       - ./docker/mysql/init:/docker-entrypoint-initdb.d
     command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
     healthcheck:
-      test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
-      timeout: 20s
+      test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-proot"]
+      interval: 10s
+      timeout: 5s
       retries: 10
-    restart: unless-stopped
-
-#  mysql-test:
-#    image: mysql:8.0
-#    container_name: see-test-mysql
-#    ports:
-#      - "3307:3306"
-#    environment:
-#      MYSQL_ROOT_PASSWORD: root
-#      MYSQL_DATABASE: see_test
-#      MYSQL_USER: see_user
-#      MYSQL_PASSWORD: see_password
-#    volumes:
-#      - mysql_test_data:/var/lib/mysql
-#      - ./docker/mysql/init:/docker-entrypoint-initdb.d
-#    command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
-#    restart: unless-stopped

   elasticsearch:
     image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
     container_name: see-elasticsearch
+    restart: unless-stopped
     environment:
       - discovery.type=single-node
       - xpack.security.enabled=false
@@ -51,23 +39,64 @@ services:
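A note on what `spring-kafka-test` from PATCH 01 buys us: it ships an embeddable broker for integration tests. A minimal sketch of that mechanism (class, topic, and test names here are illustrative, not part of these patches):

    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.test.context.EmbeddedKafka;

    // Starts an in-process broker; its address is published under the
    // spring.embedded.kafka.brokers property for the test context to use.
    @SpringBootTest
    @EmbeddedKafka(partitions = 1, topics = "post-events")
    class EmbeddedKafkaSmokeTest {

        @Test
        void contextLoadsWithEmbeddedBroker() {
            // If the context starts, the embedded broker is reachable.
        }
    }

PATCH 13 below relies on exactly this mechanism for the Kafka pipeline tests.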
interval: 10s timeout: 5s retries: 5 - restart: unless-stopped kibana: image: docker.elastic.co/kibana/kibana:8.12.0 container_name: see-kibana + restart: unless-stopped ports: - "5601:5601" environment: ELASTICSEARCH_HOSTS: http://see-elasticsearch:9200 depends_on: - - elasticsearch + elasticsearch: + condition: service_healthy + + zookeeper: + image: confluentinc/cp-zookeeper:7.6.1 + container_name: see-zookeeper restart: unless-stopped + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - "2181:2181" + healthcheck: + test: ["CMD-SHELL", "echo ruok | nc localhost 2181 | grep imok || exit 1"] + interval: 10s + timeout: 5s + retries: 10 + + kafka: + image: confluentinc/cp-kafka:7.6.1 + container_name: see-kafka + restart: unless-stopped + depends_on: + zookeeper: + condition: service_healthy + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + + # 두 가지 리스너를 노출하여 컨테이너 내부/호스트 접근 모두 지원 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://see-kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + + # 기본 동작 설정 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_LOG_RETENTION_HOURS: 168 + KAFKA_DELETE_TOPIC_ENABLE: "true" + + healthcheck: + test: [ "CMD", "bash", "-c", "kafka-topics --bootstrap-server localhost:9092 --list >/dev/null 2>&1" ] + interval: 10s + timeout: 5s + retries: 10 volumes: mysql_data: - driver: local - mysql_test_data: es_data: - driver: local - From 9b4fb249c940caa31f05adcc21336265eec1fbb6 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Fri, 17 Oct 2025 19:18:04 +0900 Subject: [PATCH 03/34] feat(post): add serialization and event publishing interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `PostUpdated`와 `PostCreated` 이벤트 클래스에 `Serializable` 인터페이스 추가하여 이벤트 직렬화 가능 - 애플리케이션 계층에서 도메인 이벤트를 게시할 수 있도록 `PostEventPublisher` 인터페이스 정의 - 이벤트 직렬화 및 게시 기능 추가로 메시징 시스템 연동 및 확장성 강화 - 코드의 유지보수성을 고려하여 인터페이스 기반 설계를 채택 --- .../see/application/post/required/PostEventPublisher.java | 7 +++++++ src/main/java/dooya/see/domain/post/event/PostCreated.java | 4 +++- src/main/java/dooya/see/domain/post/event/PostUpdated.java | 4 +++- 3 files changed, 13 insertions(+), 2 deletions(-) create mode 100644 src/main/java/dooya/see/application/post/required/PostEventPublisher.java diff --git a/src/main/java/dooya/see/application/post/required/PostEventPublisher.java b/src/main/java/dooya/see/application/post/required/PostEventPublisher.java new file mode 100644 index 00000000..9f79b592 --- /dev/null +++ b/src/main/java/dooya/see/application/post/required/PostEventPublisher.java @@ -0,0 +1,7 @@ +package dooya.see.application.post.required; + +import dooya.see.domain.shared.DomainEvent; + +public interface PostEventPublisher { + void publish(DomainEvent event); +} diff --git a/src/main/java/dooya/see/domain/post/event/PostCreated.java b/src/main/java/dooya/see/domain/post/event/PostCreated.java index 6ef91289..56a5b623 100644 --- a/src/main/java/dooya/see/domain/post/event/PostCreated.java +++ b/src/main/java/dooya/see/domain/post/event/PostCreated.java @@ -4,11 +4,13 @@ import dooya.see.domain.post.PostCategory; import dooya.see.domain.shared.DomainEvent; +import java.io.Serializable; + public record PostCreated( Long postId, Long memberId, PostCategory category, boolean publishImmediately, 
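The dual-listener layout above is worth spelling out: containers on the compose network reach the broker as see-kafka:29092 (PLAINTEXT), while processes on the host use localhost:9092 (PLAINTEXT_HOST). A throwaway smoke producer illustrating the host-side address, assuming only the compose file above (the env var name is an illustration):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class ListenerSmokeProducer {
        public static void main(String[] args) throws Exception {
            // Host processes use the PLAINTEXT_HOST listener; a container on the
            // compose network would use see-kafka:29092 instead.
            String bootstrap = System.getenv().getOrDefault("KAFKA_BOOTSTRAP", "localhost:9092");

            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrap);
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Blocks until the broker acknowledges, so misconfigured
                // advertised listeners surface here as a timeout.
                producer.send(new ProducerRecord<>("post-events", "ping")).get();
            }
        }
    }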
Post post -) implements DomainEvent { +) implements DomainEvent, Serializable { } diff --git a/src/main/java/dooya/see/domain/post/event/PostUpdated.java b/src/main/java/dooya/see/domain/post/event/PostUpdated.java index b6baeaae..f6628e12 100644 --- a/src/main/java/dooya/see/domain/post/event/PostUpdated.java +++ b/src/main/java/dooya/see/domain/post/event/PostUpdated.java @@ -3,6 +3,8 @@ import dooya.see.domain.post.Post; import dooya.see.domain.shared.DomainEvent; +import java.io.Serializable; + /** * 게시글이 수정되었을 때 발생하는 도메인 이벤트 */ @@ -14,4 +16,4 @@ public record PostUpdated( boolean categoryChanged, boolean tagsChanged, Post post -) implements DomainEvent {} +) implements DomainEvent, Serializable {} From eeab1b6d6372b8bdc310bcf9279c0aaaa1ff45c2 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Fri, 17 Oct 2025 19:22:59 +0900 Subject: [PATCH 04/34] feat(kafka): implement PostEventProducer for Kafka event publishing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PostEventProducer 클래스 추가하여 Kafka를 통해 도메인 이벤트 게시를 지원 - Spring Kafka를 사용해 POST 이벤트를 지정된 토픽(post-events)으로 발행 - 발행 성공 및 실패 로깅을 포함해 이벤트 전달 과정의 투명성 강화 - 메시징 시스템과의 통합으로 시스템 확장성 및 비동기 처리 효율성 향상 --- .../integration/kafka/PostEventProducer.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java new file mode 100644 index 00000000..d1c318b0 --- /dev/null +++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java @@ -0,0 +1,27 @@ +package dooya.see.adapter.integration.kafka; + +import dooya.see.application.post.required.PostEventPublisher; +import dooya.see.domain.shared.DomainEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class PostEventProducer implements PostEventPublisher { + private final KafkaTemplate kafkaTemplate; + + private static final String TOPIC = "post-events"; + + @Override + public void publish(DomainEvent event) { + try { + kafkaTemplate.send(TOPIC, event); + log.info("Kafka에 이벤트 발행 성공: {}", event); + } catch (Exception e) { + log.error("Kafka 이벤트 발행 실패: {}", event, e); + } + } +} From d5edabd5195110b19501a60f82536877d14b0ce6 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Fri, 17 Oct 2025 23:31:01 +0900 Subject: [PATCH 05/34] feat(kafka): add producer configuration for Kafka integration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - KafkaProducerConfig 클래스 추가하여 Kafka 프로듀서 설정 정의 - Bootstrap 서버, Key/Value 직렬화 설정 및 JsonSerializer 옵션 적용 - KafkaTemplate 빈 등록으로 프로듀서 사용 간소화 - Kafka 기반 메시징 시스템 연동 준비 작업 완료 --- .../kafka/config/KafkaProducerConfig.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java diff --git a/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java new file mode 100644 index 00000000..60635c2f --- /dev/null +++ b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java @@ -0,0 +1,32 @@ +package 
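One caveat about the producer in PATCH 04: `KafkaTemplate.send` is asynchronous (it returns a `CompletableFuture` in Spring Kafka 3.x), so the try/catch there only observes synchronous failures such as serialization errors; broker-side failures surface on the returned future. A hedged sketch of the callback-based variant, which PATCH 09 below adopts in essentially this form:

    // Sketch only: this fragment lives inside PostEventProducer.publish(...)
    // and attaches a callback instead of relying on try/catch.
    kafkaTemplate.send(TOPIC, event).whenComplete((result, ex) -> {
        if (ex != null) {
            log.error("Kafka publish failed asynchronously: {}", event, ex);
        } else {
            log.debug("Kafka publish acknowledged: offset={}",
                    result.getRecordMetadata().offset());
        }
    });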
dooya.see.adapter.integration.kafka.config; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaProducerConfig { + @Bean + public ProducerFactory producerFactory() { + Map config = new HashMap<>(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); + config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + + return new DefaultKafkaProducerFactory<>(config); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} From f750cc2521298a7c6c13e4eab1ef4ebd7e86cad6 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 02:28:08 +0900 Subject: [PATCH 06/34] feat(kafka): add consumer and configuration for event processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `PostEventConsumer` 클래스 추가로 Kafka의 post-events 토픽 이벤트 처리 기능 구현 - `KafkaConsumerConfig` 클래스 추가하여 Kafka 소비자 설정 정의: - 기본 서버, 그룹 ID, 직렬화/역직렬화 옵션 설정 - ConsumerFactory와 KafkaListenerContainerFactory 빈 등록으로 간소화된 소비자 관리 - `PostSearchIndexer`와 통합하여 Post 생성 이벤트(PostCreated)를 Elasticsearch에 인덱싱 - 미처리 가능한 이벤트 유형은 로그 경고로 처리 - Kafka 메시징의 소비 기능 추가로 이벤트 기반 비동기 처리 완성 --- .../integration/kafka/PostEventConsumer.java | 31 +++++++++++++ .../kafka/config/KafkaConsumerConfig.java | 43 +++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java create mode 100644 src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java new file mode 100644 index 00000000..74594096 --- /dev/null +++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java @@ -0,0 +1,31 @@ +package dooya.see.adapter.integration.kafka; + +import dooya.see.application.post.required.PostSearchIndexer; +import dooya.see.domain.post.event.PostCreated; +import dooya.see.domain.post.event.PostUpdated; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class PostEventConsumer { + private final PostSearchIndexer postSearchIndexer; + + @KafkaListener(topics = "post-events", groupId = "post-indexr-group") + public void consume(Object event) { + log.info("Kafka Event 수신: {}", event); + + try { + if (event instanceof PostCreated created) { + postSearchIndexer.index(created.post()); + } else { + log.warn("처리 불가능한 이벤트 타입: {}", event.getClass().getSimpleName()); + } + } catch (Exception e) { + log.error("Kafka 이벤트 처리 중 오류 발생: {}", event, e); + } + } +} diff --git 
a/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java new file mode 100644 index 00000000..c435bcc6 --- /dev/null +++ b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java @@ -0,0 +1,43 @@ +package dooya.see.adapter.integration.kafka.config; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.ConsumerFactory; + +import java.util.HashMap; +import java.util.Map; + +@EnableKafka +@Configuration +public class KafkaConsumerConfig { + @Bean + public ConsumerFactory consumerFactory() { + JsonDeserializer jsonDeserializer = new JsonDeserializer<>(); + jsonDeserializer.addTrustedPackages("*"); + + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); + config.put(ConsumerConfig.GROUP_ID_CONFIG, "post-indexr-group"); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + config.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); + config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), jsonDeserializer); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setConcurrency(1); + return factory; + } +} From f18a24208bff2dc9f1862252f3b8376a9401f9ee Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:25:28 +0900 Subject: [PATCH 07/34] feat(kafka): implement post event processing and message handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `PostEventMessage` 및 `PostEventType` 추가로 Kafka 메시지 모델 정의 - `PostEventProcessor` 구현으로 Kafka 메시지 및 도메인 이벤트 처리: - Post 생성/업데이트 이벤트 메시지로 Elasticsearch 색인 생성/갱신 - Post 삭제 이벤트 메시지로 Elasticsearch 색인 제거 - 미처리 가능한 이벤트 및 메시지 타입을 로그로 기록하여 문제 식별 용이성 확보 - Kafka 통합의 이벤트 처리 역량 확장 및 비동기 처리 완성도 개선 --- .../integration/kafka/PostEventMessage.java | 17 ++++ .../integration/kafka/PostEventProcessor.java | 99 +++++++++++++++++++ .../integration/kafka/PostEventType.java | 7 ++ 3 files changed, 123 insertions(+) create mode 100644 src/main/java/dooya/see/adapter/integration/kafka/PostEventMessage.java create mode 100644 src/main/java/dooya/see/adapter/integration/kafka/PostEventProcessor.java create mode 100644 src/main/java/dooya/see/adapter/integration/kafka/PostEventType.java diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventMessage.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventMessage.java new file mode 100644 index 00000000..decf3a25 --- /dev/null +++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventMessage.java @@ -0,0 +1,17 @@ +package dooya.see.adapter.integration.kafka; + +import 
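A security note on the consumer config in PATCH 06: `addTrustedPackages("*")` lets the JSON deserializer instantiate any type named in the message headers. Binding the deserializer to a single payload type narrows that surface, roughly like this (PATCH 08 below moves to this shape):

    // Hedged sketch: a deserializer fixed to one payload type and one package.
    JsonDeserializer<PostEventMessage> deserializer = new JsonDeserializer<>(PostEventMessage.class);
    deserializer.addTrustedPackages("dooya.see.adapter.integration.kafka");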
java.io.Serializable; + +public record PostEventMessage(PostEventType type, Long postId) implements Serializable { + static PostEventMessage created(Long postId) { + return new PostEventMessage(PostEventType.CREATED, postId); + } + + static PostEventMessage updated(Long postId) { + return new PostEventMessage(PostEventType.UPDATED, postId); + } + + static PostEventMessage deleted(Long postId) { + return new PostEventMessage(PostEventType.DELETED, postId); + } +} diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventProcessor.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProcessor.java new file mode 100644 index 00000000..bcb6be5d --- /dev/null +++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProcessor.java @@ -0,0 +1,99 @@ +package dooya.see.adapter.integration.kafka; + +import dooya.see.application.post.required.PostRepository; +import dooya.see.application.post.required.PostSearchIndexer; +import dooya.see.domain.post.Post; +import dooya.see.domain.post.event.PostCreated; +import dooya.see.domain.post.event.PostDeleted; +import dooya.see.domain.post.event.PostUpdated; +import dooya.see.domain.shared.DomainEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +class PostEventProcessor { + private final PostSearchIndexer postSearchIndexer; + private final PostRepository postRepository; + + void process(DomainEvent event) { + if (event instanceof PostCreated created) { + handlePostCreated(created); + } else if (event instanceof PostUpdated updated) { + handlePostUpdated(updated); + } else if (event instanceof PostDeleted deleted) { + handlePostDeleted(deleted); + } else { + log.debug("처리할 필요 없는 이벤트 타입: {}", event.getClass().getName()); + } + } + + void process(PostEventMessage message) { + if (message == null) { + return; + } + + try { + switch (message.type()) { + case CREATED, UPDATED -> indexLatestSnapshot(message.postId()); + case DELETED -> deleteIndex(message.postId()); + default -> log.debug("처리할 필요 없는 메시지 타입: {}", message.type()); + } + } catch (Exception ex) { + log.error("Kafka 이벤트 메시지 처리 실패: {}", message, ex); + } + } + + private void indexLatestSnapshot(Long postId) { + postRepository.findById(postId) + .ifPresentOrElse(this::safeIndex, + () -> log.warn("색인 대상 게시글을 찾을 수 없습니다: postId={}", postId)); + } + + private void deleteIndex(Long postId) { + try { + log.debug("Kafka 메시지 처리 - 게시글 삭제 색인 제거: {}", postId); + postSearchIndexer.delete(postId); + } catch (Exception e) { + log.error("게시글 삭제 색인 처리 실패: postId={}", postId, e); + } + } + + private void safeIndex(Post post) { + try { + log.debug("Kafka 메시지 처리 - 게시글 색인 갱신: {}", post.getId()); + postSearchIndexer.index(post); + } catch (Exception e) { + log.error("게시글 색인 처리 실패: postId={}", post.getId(), e); + } + } + + private void handlePostCreated(PostCreated event) { + try { + log.debug("도메인 이벤트 처리 - 게시글 생성 색인: {}", event.postId()); + postSearchIndexer.index(event.post()); + } catch (Exception e) { + log.error("게시글 생성 색인 처리 실패: {}", event, e); + } + } + + private void handlePostUpdated(PostUpdated event) { + try { + log.debug("도메인 이벤트 처리 - 게시글 업데이트 색인: {}", event.postId()); + postSearchIndexer.index(event.post()); + } catch (Exception e) { + log.error("게시글 업데이트 색인 처리 실패: {}", event, e); + } + } + + private void handlePostDeleted(PostDeleted event) { + try { + log.debug("도메인 이벤트 처리 - 게시글 삭제 색인 제거: {}", event.postId()); + 
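The design choice behind PostEventMessage deserves a sentence: the payload carries only the event type and the post id, so consumers re-read the authoritative row before indexing (see indexLatestSnapshot below), which tolerates reordering and avoids shipping whole aggregates through Kafka. Usage is just the package-private factories (same-package access assumed, since they are package-private):

    // Illustrative usage of the factory methods above.
    PostEventMessage created = PostEventMessage.created(42L);
    PostEventMessage deleted = PostEventMessage.deleted(42L);
    System.out.println(created.type() + " -> post " + created.postId());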
postSearchIndexer.delete(event.postId()); + } catch (Exception e) { + log.error("게시글 삭제 색인 처리 실패: {}", event, e); + } + } +} diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventType.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventType.java new file mode 100644 index 00000000..0bbbf353 --- /dev/null +++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventType.java @@ -0,0 +1,7 @@ +package dooya.see.adapter.integration.kafka; + +public enum PostEventType { + CREATED, + UPDATED, + DELETED +} From bda2fcef489fe3f6eb351f470a2211d6493bbc59 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:26:51 +0900 Subject: [PATCH 08/34] feat(kafka): enhance producer and consumer configurations for flexibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - KafkaProducerConfig 및 KafkaConsumerConfig에 @ConditionalOnProperty 추가로 기능 비활성화 가능 - KafkaProperties를 활용한 설정 간소화 및 대체 설정 지원 - PostEventMessage 기반 프로듀서/소비자 설정으로 메시징 체계 정교화 - JsonSerializer 설정 개선, 타입 정보 유지 및 신뢰된 패키지 설정 추가 - PostEventConsumer 리팩토링으로 PostEventProcessor와 메시지 처리 일원화 - 이벤트 타입별 처리 로직 단순화 및 테스트 용이성 확보 - 설정 및 처리 로직의 유연성 향상으로 Kafka 통합의 효율성과 확장성 증대 --- .../integration/kafka/PostEventConsumer.java | 27 +++++------- .../kafka/config/KafkaConsumerConfig.java | 42 ++++++++++++------- .../kafka/config/KafkaProducerConfig.java | 23 ++++++---- 3 files changed, 53 insertions(+), 39 deletions(-) diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java index 74594096..0f174713 100644 --- a/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java +++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java @@ -1,31 +1,24 @@ package dooya.see.adapter.integration.kafka; -import dooya.see.application.post.required.PostSearchIndexer; -import dooya.see.domain.post.event.PostCreated; -import dooya.see.domain.post.event.PostUpdated; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Slf4j @Component @RequiredArgsConstructor +@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true") public class PostEventConsumer { - private final PostSearchIndexer postSearchIndexer; + private final PostEventProcessor postEventProcessor; - @KafkaListener(topics = "post-events", groupId = "post-indexr-group") - public void consume(Object event) { - log.info("Kafka Event 수신: {}", event); - - try { - if (event instanceof PostCreated created) { - postSearchIndexer.index(created.post()); - } else { - log.warn("처리 불가능한 이벤트 타입: {}", event.getClass().getSimpleName()); - } - } catch (Exception e) { - log.error("Kafka 이벤트 처리 중 오류 발생: {}", event, e); - } + @KafkaListener( + topics = "${see.kafka.topics.post-events:post-events}", + groupId = "${spring.kafka.consumer.group-id:post-indexer-group}" + ) + public void consume(PostEventMessage message) { + log.debug("Kafka 이벤트 수신: {}", message); + postEventProcessor.process(message); } } diff --git a/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java index c435bcc6..1102007d 100644 --- a/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java +++ 
b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java @@ -1,42 +1,54 @@ package dooya.see.adapter.integration.kafka.config; +import dooya.see.adapter.integration.kafka.PostEventMessage; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; import java.util.HashMap; import java.util.Map; @EnableKafka @Configuration +@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true") public class KafkaConsumerConfig { - @Bean - public ConsumerFactory consumerFactory() { - JsonDeserializer jsonDeserializer = new JsonDeserializer<>(); - jsonDeserializer.addTrustedPackages("*"); + private final KafkaProperties kafkaProperties; + + public KafkaConsumerConfig(KafkaProperties kafkaProperties) { + this.kafkaProperties = kafkaProperties; + } - Map config = new HashMap<>(); - config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); - config.put(ConsumerConfig.GROUP_ID_CONFIG, "post-indexr-group"); + @Bean + public ConsumerFactory postEventConsumerFactory() { + Map config = new HashMap<>(kafkaProperties.buildConsumerProperties(null)); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); - config.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); - config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + config.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "post-indexer-group"); + config.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + JsonDeserializer jsonDeserializer = new JsonDeserializer<>(PostEventMessage.class); + jsonDeserializer.addTrustedPackages("dooya.see.adapter.integration.kafka"); + jsonDeserializer.setRemoveTypeHeaders(false); + jsonDeserializer.setUseTypeMapperForKey(false); return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), jsonDeserializer); } @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory postEventConsumerFactory + ) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory()); + factory.setConsumerFactory(postEventConsumerFactory); factory.setConcurrency(1); return factory; } diff --git a/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java index 60635c2f..3eb1723b 100644 --- 
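One possible hardening of the consumer factory below, not part of these patches: wrapping the JSON deserializer in Spring Kafka's ErrorHandlingDeserializer, so a malformed record fails individually instead of wedging the listener container. A sketch under that assumption:

    import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

    // Wraps the typed JSON deserializer; deserialization failures become
    // per-record errors that the container's error handler can observe.
    JsonDeserializer<PostEventMessage> json = new JsonDeserializer<>(PostEventMessage.class);
    json.addTrustedPackages("dooya.see.adapter.integration.kafka");
    ErrorHandlingDeserializer<PostEventMessage> safe = new ErrorHandlingDeserializer<>(json);

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), safe);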
a/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java +++ b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java @@ -1,7 +1,10 @@ package dooya.see.adapter.integration.kafka.config; +import dooya.see.adapter.integration.kafka.PostEventMessage; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; @@ -13,20 +16,26 @@ import java.util.Map; @Configuration +@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true") public class KafkaProducerConfig { + private final KafkaProperties kafkaProperties; + + public KafkaProducerConfig(KafkaProperties kafkaProperties) { + this.kafkaProperties = kafkaProperties; + } + @Bean - public ProducerFactory producerFactory() { - Map config = new HashMap<>(); - config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); + public ProducerFactory postEventProducerFactory() { + Map config = new HashMap<>(kafkaProperties.buildProducerProperties(null)); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true); return new DefaultKafkaProducerFactory<>(config); } @Bean - public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(producerFactory()); + public KafkaTemplate postEventKafkaTemplate(ProducerFactory postEventProducerFactory) { + return new KafkaTemplate<>(postEventProducerFactory); } } From ce6a906376d24f82f9357c1b93b2cb39245e5ad0 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:27:10 +0900 Subject: [PATCH 09/34] feat(kafka): enhance PostEventProducer for transactional messaging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Post 이벤트 발행 시 트랜잭션 동기화를 지원하여 커밋 후 이벤트를 발행 - @ConditionalOnProperty 추가로 Kafka 통합 비활성화 가능 - 이벤트 메시지로 `PostEventMessage`를 사용하며 PostCreated, PostUpdated, PostDeleted 이벤트 변환 처리 - 토픽명을 외부 설정에서 가져오도록 개선, 기본값으로 `"post-events"` 설정 - 발행 성공/실패 시 디버그 및 에러 로깅 추가로 문제 분석 용이성 향상 - Kafka 메시징의 유연성 및 안정성을 향상시켜 메시지 손실 가능성 최소화 --- .../integration/kafka/PostEventProducer.java | 69 +++++++++++++++++-- 1 file changed, 64 insertions(+), 5 deletions(-) diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java index d1c318b0..a4afb478 100644 --- a/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java +++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java @@ -1,27 +1,86 @@ package dooya.see.adapter.integration.kafka; import dooya.see.application.post.required.PostEventPublisher; +import dooya.see.domain.post.event.PostCreated; +import dooya.see.domain.post.event.PostDeleted; +import dooya.see.domain.post.event.PostUpdated; import dooya.see.domain.shared.DomainEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import 
org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; @Slf4j @Component @RequiredArgsConstructor +@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true") public class PostEventProducer implements PostEventPublisher { - private final KafkaTemplate kafkaTemplate; + private final KafkaTemplate kafkaTemplate; - private static final String TOPIC = "post-events"; + @Value("${see.kafka.topics.post-events:post-events}") + private String topic; @Override public void publish(DomainEvent event) { + PostEventMessage message = toMessage(event); + if (message == null) { + if (log.isDebugEnabled()) { + log.debug("Kafka 전송 대상이 아닌 이벤트입니다: {}", event.getClass().getName()); + } + return; + } + + if (TransactionSynchronizationManager.isSynchronizationActive()) { + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + doSend(message); + } + }); + } else { + doSend(message); + } + } + + private PostEventMessage toMessage(DomainEvent event) { + if (event instanceof PostCreated created) { + return PostEventMessage.created(created.postId()); + } + if (event instanceof PostUpdated updated) { + return PostEventMessage.updated(updated.postId()); + } + if (event instanceof PostDeleted deleted) { + return PostEventMessage.deleted(deleted.postId()); + } + return null; + } + + private void doSend(PostEventMessage message) { try { - kafkaTemplate.send(TOPIC, event); - log.info("Kafka에 이벤트 발행 성공: {}", event); + kafkaTemplate.send(topic, message) + .whenComplete((result, throwable) -> { + if (throwable != null) { + log.error("Kafka 이벤트 발행 실패: {}", message, throwable); + return; + } + if (result != null && log.isDebugEnabled()) { + log.debug("Kafka 이벤트 발행 성공: topic={}, partition={}, offset={}, payload={}", + result.getRecordMetadata().topic(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset(), + message); + } + }) + .exceptionally(throwable -> { + log.error("Kafka 이벤트 발행 실패: {}", message, throwable); + return null; + }); } catch (Exception e) { - log.error("Kafka 이벤트 발행 실패: {}", event, e); + log.error("Kafka 이벤트 발행 실패: {}", message, e); } } } From caf4d3387781072f5db9f8c57ad7274ca503b126 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:27:21 +0900 Subject: [PATCH 10/34] feat(post): replace PostSearchIndexer with PostEventPublisher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 기존 Elasticsearch 연동 `PostSearchIndexer` 대신 `PostEventPublisher`로 이벤트 처리 방식 변경 - 게시글 생성, 업데이트, 삭제 시 Post 도메인 이벤트를 외부 파이프라인으로 위임하도록 수정 - 중복 코드 제거를 위해 `publish` 메서드로 공통 로직 통합 - 도메인 이벤트 기반 아키텍처로 확장성 및 유지보수성 개선 --- .../application/post/PostEventHandler.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/main/java/dooya/see/application/post/PostEventHandler.java b/src/main/java/dooya/see/application/post/PostEventHandler.java index 11344965..2c4426eb 100644 --- a/src/main/java/dooya/see/application/post/PostEventHandler.java +++ b/src/main/java/dooya/see/application/post/PostEventHandler.java @@ -1,9 +1,10 @@ package dooya.see.application.post; -import 
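The transaction hook in PATCH 09 is worth a note: publishing in afterCommit gives at-most-once delivery; if the process dies between the database commit and the send, the event is lost. A transactional outbox would close that window, but is out of scope for this series. The deferral itself can be exercised without a database, roughly like this (assumed test code; `producer` and `event` stand for a wired PostEventProducer and any domain event):

    import org.springframework.transaction.support.TransactionSynchronization;
    import org.springframework.transaction.support.TransactionSynchronizationManager;

    // Simulates an active transaction: publish() should register a
    // synchronization rather than send immediately.
    TransactionSynchronizationManager.initSynchronization();
    try {
        producer.publish(event); // nothing hits Kafka yet
        TransactionSynchronizationManager.getSynchronizations()
                .forEach(TransactionSynchronization::afterCommit); // simulate the commit
    } finally {
        TransactionSynchronizationManager.clearSynchronization();
    }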
dooya.see.application.post.required.PostSearchIndexer; +import dooya.see.application.post.required.PostEventPublisher; import dooya.see.domain.post.event.PostCreated; import dooya.see.domain.post.event.PostDeleted; import dooya.see.domain.post.event.PostUpdated; +import dooya.see.domain.shared.DomainEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; @@ -13,23 +14,25 @@ @Component @RequiredArgsConstructor public class PostEventHandler { - private final PostSearchIndexer postSearchIndexer; + private final PostEventPublisher postEventPublisher; @EventListener public void handlePostCreated(PostCreated event) { - log.info("[ES] 게시글 생성 색인: {}", event.postId()); - postSearchIndexer.index(event.post()); + publish(event); } @EventListener public void handlePostUpdated(PostUpdated event) { - log.info("[ES] 게시글 업데이트 색인: {}", event.postId()); - postSearchIndexer.index(event.post()); + publish(event); } @EventListener public void handlePostDeleted(PostDeleted event) { - log.info("[ES] 게시글 색인 삭제: {}", event.postId()); - postSearchIndexer.delete(event.postId()); + publish(event); + } + + private void publish(DomainEvent event) { + log.debug("도메인 이벤트를 외부 파이프라인으로 위임: {}", event); + postEventPublisher.publish(event); } } From 90b87f82e0905d6b18402a043b5459a720b00b1f Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:27:30 +0900 Subject: [PATCH 11/34] feat(kafka): add DirectPostEventPublisher for handling events locally MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - see.kafka.enabled=false 시 도메인 이벤트를 Kafka 대신 직접 처리하는 DirectPostEventPublisher 추가 - @ConditionalOnProperty를 사용해 설정 기반으로 컴포넌트 활성화 제어 - Kafka 비활성화 환경에서도 안정적으로 이벤트를 처리할 수 있도록 구현 - PostEventProcessor를 활용해 도메인 이벤트를 동기적으로 처리하며 디버그 로깅 추가 --- .../kafka/DirectPostEventPublisher.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 src/main/java/dooya/see/adapter/integration/kafka/DirectPostEventPublisher.java diff --git a/src/main/java/dooya/see/adapter/integration/kafka/DirectPostEventPublisher.java b/src/main/java/dooya/see/adapter/integration/kafka/DirectPostEventPublisher.java new file mode 100644 index 00000000..dbe2f0c4 --- /dev/null +++ b/src/main/java/dooya/see/adapter/integration/kafka/DirectPostEventPublisher.java @@ -0,0 +1,22 @@ +package dooya.see.adapter.integration.kafka; + +import dooya.see.application.post.required.PostEventPublisher; +import dooya.see.domain.shared.DomainEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "false", matchIfMissing = true) +public class DirectPostEventPublisher implements PostEventPublisher { + private final PostEventProcessor postEventProcessor; + + @Override + public void publish(DomainEvent event) { + log.debug("Kafka 비활성화 상태 - 도메인 이벤트를 직접 처리합니다: {}", event); + postEventProcessor.process(event); + } +} From cd3c782138c717c774f08309ae3c37b8e3862aea Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:27:44 +0900 Subject: [PATCH 12/34] feat(config): simplify test configuration and add Kafka toggle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - application-test.yml 축소 및 불필요한 설정 제거로 단순화 - 
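How the toggle in PATCH 11 resolves, in miniature: with matchIfMissing = true the direct publisher also wins when see.kafka.enabled is absent entirely, so exactly one PostEventPublisher implementation is active per environment. An isolated illustration (the bean bodies are placeholders, not project code):

    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    class ToggleSketch {

        @Bean
        @ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true")
        String kafkaPath() {
            return "kafka"; // stands in for PostEventProducer
        }

        @Bean
        @ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "false", matchIfMissing = true)
        String directPath() {
            return "direct"; // stands in for DirectPostEventPublisher
        }
    }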
hibernate.ddl-auto를 `update`로 변경하여 테스트 환경 자동화 - Hibernate 관련 세부 설정 제거 - application.properties 파일에 `spring.profiles.active=test` 추가 - Kafka 설정 플래그(`see.kafka.enabled`) 제거로 간단한 테스트 구성 지원 - 설정 간소화와 효율성 개선으로 테스트 유지보수성 증가 --- src/test/resources/application-test.yml | 21 +++++---------------- src/test/resources/application.properties | 1 + 2 files changed, 6 insertions(+), 16 deletions(-) create mode 100644 src/test/resources/application.properties diff --git a/src/test/resources/application-test.yml b/src/test/resources/application-test.yml index c8dbdac6..d84f75f7 100644 --- a/src/test/resources/application-test.yml +++ b/src/test/resources/application-test.yml @@ -1,23 +1,12 @@ spring: - datasource: - url: jdbc:mysql://localhost:3306/see_test?serverTimezone=Asia/Seoul&useSSL=false&allowPublicKeyRetrieval=true - username: see_user - password: see_password - driver-class-name: com.mysql.cj.jdbc.Driver - jpa: hibernate: - ddl-auto: none - properties: - hibernate: - dialect: org.hibernate.dialect.MySQL8Dialect # ✅ 이 한 줄 추가! - jdbc: - batch_size: 1000 - order_updates: true - order_inserts: true - show-sql: true # SQL 출력 원하면 추가 + ddl-auto: update logging: level: dooya.see: INFO - org.hibernate.SQL: DEBUG \ No newline at end of file + +see: + kafka: + enabled: false diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties new file mode 100644 index 00000000..f841722a --- /dev/null +++ b/src/test/resources/application.properties @@ -0,0 +1 @@ +spring.profiles.active=test From 6d413b0caa1cd996b09f4d6db147bf120c7739cd Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:27:57 +0900 Subject: [PATCH 13/34] feat(test): add integration tests for post event pipelines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Kafka 활성화 및 비활성화 여부에 따라 이벤트 처리 방식 검증을 위한 통합 테스트 추가 - PostEventDirectPipelineTest: Kafka 비활성화 시 도메인 이벤트가 바로 처리되는 시나리오 테스트 - PostEventKafkaPipelineTest: Kafka 활성화 환경에서 이벤트가 적절히 색인되고 삭제되는 것을 검증 - 테스트 환경에서 재사용 가능한 RecordingPostSearchIndexer로 이벤트 처리 기록 및 검증 지원 - 도메인 이벤트 처리 방식 변화에 대한 신뢰성 및 안정성 확보 --- .../post/PostEventDirectPipelineTest.java | 113 +++++++++++++++ .../post/PostEventKafkaPipelineTest.java | 134 ++++++++++++++++++ 2 files changed, 247 insertions(+) create mode 100644 src/test/java/dooya/see/application/post/PostEventDirectPipelineTest.java create mode 100644 src/test/java/dooya/see/application/post/PostEventKafkaPipelineTest.java diff --git a/src/test/java/dooya/see/application/post/PostEventDirectPipelineTest.java b/src/test/java/dooya/see/application/post/PostEventDirectPipelineTest.java new file mode 100644 index 00000000..a0dfa9e2 --- /dev/null +++ b/src/test/java/dooya/see/application/post/PostEventDirectPipelineTest.java @@ -0,0 +1,113 @@ +package dooya.see.application.post; + +import dooya.see.application.post.provided.PostManager; +import dooya.see.application.post.required.PostSearchIndexer; +import dooya.see.domain.post.Post; +import dooya.see.domain.post.dto.PostUpdateRequest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; + +import static dooya.see.domain.post.PostFixture.createPostRequest; +import static org.assertj.core.api.Assertions.assertThat; + 
+@SpringBootTest(properties = "see.kafka.enabled=false") +class PostEventDirectPipelineTest { + + private final PostManager postManager; + private final RecordingPostSearchIndexer recordingIndexer; + + PostEventDirectPipelineTest(PostManager postManager, RecordingPostSearchIndexer recordingIndexer) { + this.postManager = postManager; + this.recordingIndexer = recordingIndexer; + } + + @BeforeEach + void setUp() { + recordingIndexer.reset(); + } + + @Test + void Kafka_비활성화_시에도_도메인_이벤트가_즉시_색인된다() { + Post created = postManager.create(createPostRequest(), authorId()); + + assertThat(recordingIndexer.indexedPosts()) + .anyMatch(post -> post.getId().equals(created.getId())); + } + + @Test + void Kafka_비활성화_시_수정_이벤트도_즉시_처리된다() { + Post created = postManager.create(createPostRequest(), authorId()); + recordingIndexer.reset(); + + PostUpdateRequest updateRequest = new PostUpdateRequest( + Optional.of("직접 변경된 제목"), + Optional.of("직접 변경된 본문"), + Optional.empty(), + Optional.empty() + ); + + Post updated = postManager.update(updateRequest, created.getId(), authorId()); + + assertThat(recordingIndexer.indexedPosts()) + .anyMatch(post -> post.getId().equals(updated.getId()) + && post.getContent().title().equals("직접 변경된 제목")); + } + + @Test + void Kafka_비활성화_시_삭제_이벤트도_즉시_처리된다() { + Post created = postManager.create(createPostRequest(), authorId()); + recordingIndexer.reset(); + + postManager.delete(created.getId(), authorId()); + + assertThat(recordingIndexer.deletedPostIds()).contains(created.getId()); + } + + private static Long authorId() { + return 1L; + } + + static class RecordingPostSearchIndexer implements PostSearchIndexer { + private final CopyOnWriteArrayList indexed = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList deleted = new CopyOnWriteArrayList<>(); + + @Override + public void index(Post post) { + indexed.add(post); + } + + @Override + public void delete(Long postId) { + deleted.add(postId); + } + + List indexedPosts() { + return List.copyOf(indexed); + } + + List deletedPostIds() { + return List.copyOf(deleted); + } + + void reset() { + indexed.clear(); + deleted.clear(); + } + } + + @org.springframework.boot.test.context.TestConfiguration + static class TestConfiguration { + @Bean + @Primary + RecordingPostSearchIndexer recordingPostSearchIndexer() { + return new RecordingPostSearchIndexer(); + } + } +} diff --git a/src/test/java/dooya/see/application/post/PostEventKafkaPipelineTest.java b/src/test/java/dooya/see/application/post/PostEventKafkaPipelineTest.java new file mode 100644 index 00000000..7ba80726 --- /dev/null +++ b/src/test/java/dooya/see/application/post/PostEventKafkaPipelineTest.java @@ -0,0 +1,134 @@ +package dooya.see.application.post; + +import dooya.see.application.post.provided.PostManager; +import dooya.see.application.post.required.PostSearchIndexer; +import dooya.see.domain.post.Post; +import dooya.see.domain.post.dto.PostCreateRequest; +import dooya.see.domain.post.dto.PostUpdateRequest; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.context.TestPropertySource; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; + +import static 
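Two mechanics in the Kafka pipeline test below are easy to miss. First, @EmbeddedKafka publishes the broker list under spring.embedded.kafka.brokers, which the @SpringBootTest property maps into spring.kafka.bootstrap-servers. Second, the listener consumes on its own thread, so assertions must poll rather than run immediately; that is what the Awaitility calls do. The polling pattern in isolation (`indexer` stands for the recording test double):

    // Poll until the asynchronous consumer has done its work, instead of
    // sleeping for a fixed amount of time.
    Awaitility.await()
            .atMost(Duration.ofSeconds(5))
            .pollInterval(Duration.ofMillis(100))
            .untilAsserted(() -> assertThat(indexer.indexedPosts()).isNotEmpty());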
dooya.see.domain.post.PostFixture.createPostRequest; +import static org.assertj.core.api.Assertions.assertThat; + +@SpringBootTest(properties = { + "see.kafka.enabled=true", + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" +}) +@EmbeddedKafka(partitions = 1, topics = "${see.kafka.topics.post-events:post-events}") +@TestPropertySource(properties = "spring.kafka.consumer.auto-offset-reset=earliest") +class PostEventKafkaPipelineTest { + + private final PostManager postManager; + private final RecordingPostSearchIndexer recordingIndexer; + + PostEventKafkaPipelineTest(PostManager postManager, RecordingPostSearchIndexer recordingIndexer) { + this.postManager = postManager; + this.recordingIndexer = recordingIndexer; + } + + @BeforeEach + void setUp() { + recordingIndexer.reset(); + } + + @Test + void 게시글_생성_이벤트는_Kafka를_통해_색인된다() { + Post created = postManager.create(createPostRequest(), authorId()); + + Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> + assertThat(recordingIndexer.indexedPosts()) + .anyMatch(post -> post.getId().equals(created.getId()))); + } + + @Test + void 게시글_수정_이벤트는_Kafka를_통해_색인이_갱신된다() { + Post created = postManager.create(createPostRequest(), authorId()); + Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> + assertThat(recordingIndexer.indexedPosts()) + .anyMatch(post -> post.getId().equals(created.getId()))); + + recordingIndexer.reset(); + + PostUpdateRequest updateRequest = new PostUpdateRequest( + Optional.of("변경된 제목"), + Optional.of("변경된 본문"), + Optional.empty(), + Optional.empty() + ); + + Post updated = postManager.update(updateRequest, created.getId(), authorId()); + + Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> + assertThat(recordingIndexer.indexedPosts()) + .anyMatch(post -> post.getId().equals(updated.getId()) + && post.getContent().title().equals("변경된 제목"))); + } + + @Test + void 게시글_삭제_이벤트는_Kafka를_통해_색인이_삭제된다() { + Post created = postManager.create(createPostRequest(), authorId()); + Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> + assertThat(recordingIndexer.indexedPosts()) + .anyMatch(post -> post.getId().equals(created.getId()))); + + recordingIndexer.reset(); + + postManager.delete(created.getId(), authorId()); + + Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> + assertThat(recordingIndexer.deletedPostIds()).contains(created.getId())); + } + + private static Long authorId() { + return 1L; + } + + static class RecordingPostSearchIndexer implements PostSearchIndexer { + private final CopyOnWriteArrayList indexed = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList deleted = new CopyOnWriteArrayList<>(); + + @Override + public void index(Post post) { + indexed.add(post); + } + + @Override + public void delete(Long postId) { + deleted.add(postId); + } + + List indexedPosts() { + return List.copyOf(indexed); + } + + List deletedPostIds() { + return List.copyOf(deleted); + } + + void reset() { + indexed.clear(); + deleted.clear(); + } + } + + @org.springframework.boot.test.context.TestConfiguration + static class TestConfiguration { + @Bean + @Primary + RecordingPostSearchIndexer recordingPostSearchIndexer() { + return new RecordingPostSearchIndexer(); + } + } +} From bbb4623e19c2a222a5e6f3a4cc4546806ebb029f Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:28:06 +0900 Subject: [PATCH 14/34] feat(test): remove PostEventHandlerTest for outdated indexing logic MIME-Version: 1.0 Content-Type: 
text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PostEventHandlerTest 제거. 테스트에서 사용된 색인 로직은 Post 도메인 이벤트와 Kafka 처리로 대체 - 기존 테스트가 새로운 이벤트 처리 및 색인 파이프라인과 더 이상 호환되지 않음 - 유지보수성 향상을 위해 불필요한 테스트 코드 삭제 --- .../post/PostEventHandlerTest.java | 28 ------------------- 1 file changed, 28 deletions(-) delete mode 100644 src/test/java/dooya/see/application/post/PostEventHandlerTest.java diff --git a/src/test/java/dooya/see/application/post/PostEventHandlerTest.java b/src/test/java/dooya/see/application/post/PostEventHandlerTest.java deleted file mode 100644 index c55ccfe5..00000000 --- a/src/test/java/dooya/see/application/post/PostEventHandlerTest.java +++ /dev/null @@ -1,28 +0,0 @@ -package dooya.see.application.post; - -import dooya.see.adapter.search.elasticsearch.repository.PostSearchElasticsearchRepository; -import dooya.see.application.post.provided.PostManager; -import dooya.see.domain.post.Post; -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; - -import java.time.Duration; - -import static dooya.see.domain.post.PostFixture.createPostRequest; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; - -@SpringBootTest -record PostEventHandlerTest( - PostManager postManager, - PostSearchElasticsearchRepository searchRepository) { - private static final Long AUTHOR_ID = 1L; - - @Test - void 게시글_생성시_자동으로_색인된다() { - Post post = postManager.create(createPostRequest(), AUTHOR_ID); - - await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> - assertThat(searchRepository.findById(String.valueOf(post.getId()))).isPresent()); - } -} \ No newline at end of file From 83116ceab67ff46cc6172a56864a5c7626c0033b Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:29:40 +0900 Subject: [PATCH 15/34] feat(ci): add Kafka and Zookeeper services to CI pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Kafka와 Zookeeper를 CI 환경에 추가하여 테스트 시 메시징 연동 검증 가능 - 서비스 의존성 대기 로직 수정으로 Elasticsearch, Kafka, Zookeeper 준비 상태 확인 - SPRING_KAFKA_BOOTSTRAP_SERVERS 환경 변수 추가로 테스트 환경 설정 개선 - CI 파이프라인 확장으로 메시징 관련 통합 테스트 신뢰성 증대 --- .github/workflows/ci.yml | 95 ++++++++++++++++++---------------------- 1 file changed, 43 insertions(+), 52 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 92dba17b..d23db2ab 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,6 +32,38 @@ jobs: --health-timeout 5s --health-retries 10 + zookeeper: + image: confluentinc/cp-zookeeper:7.6.1 + ports: + - 2181:2181 + env: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + options: >- + --health-cmd "echo ruok | nc localhost 2181 | grep imok || exit 1" + --health-interval 10s + --health-timeout 5s + --health-retries 10 + + kafka: + image: confluentinc/cp-kafka:7.6.1 + ports: + - 9092:9092 + env: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + options: >- + --health-cmd "nc -z localhost 9092" + --health-interval 10s + --health-timeout 5s + --health-retries 10 + depends_on: + - zookeeper + steps: - name: Check out repository uses: actions/checkout@v4 @@ -57,39 +89,29 @@ jobs: - name: Validate Gradle wrapper uses: gradle/wrapper-validation-action@v2 - with: - 
min-wrapper-count: 1 - allow-snapshots: false - name: Make gradlew executable run: chmod +x ./gradlew - - name: Verify Gradle Wrapper Checksum - run: | - if [ -f "gradle/wrapper/gradle-wrapper.properties" ]; then - echo "Gradle wrapper configuration verified" - cat gradle/wrapper/gradle-wrapper.properties - fi - - name: Setup Gradle uses: gradle/gradle-build-action@v2 with: gradle-build-scan-report: true cache-read-only: false - - name: Wait for Elasticsearch to be ready + - name: Wait for services to be ready run: | - echo "Waiting for Elasticsearch..." + echo "Waiting for Elasticsearch, Kafka, and Zookeeper..." for i in {1..20}; do - if curl -fsS http://localhost:9200 >/dev/null; then - echo "Elasticsearch is up!" + if curl -fsS http://localhost:9200 >/dev/null && nc -z localhost 9092 && nc -z localhost 2181; then + echo "All services are up!" exit 0 fi - echo "Retrying in 3s..." - sleep 3 - done - echo "Elasticsearch did not become ready in time." - exit 1 + echo "Retrying in 5s..." + sleep 5 + done + echo "Services did not become ready in time." + exit 1 - name: Run tests run: | @@ -102,6 +124,7 @@ jobs: -Dspring.profiles.active=test env: SPRING_ELASTICSEARCH_URIS: http://localhost:9200 + SPRING_KAFKA_BOOTSTRAP_SERVERS: localhost:9092 - name: Upload test results uses: actions/upload-artifact@v4 @@ -111,36 +134,4 @@ jobs: path: | build/reports/tests/ build/test-results/ - retention-days: 7 - - build-check: - runs-on: ubuntu-latest - if: github.event_name == 'pull_request' - - steps: - - name: Check out repository - uses: actions/checkout@v4 - with: - fetch-depth: 1 - - - name: Set up JDK 21 - uses: actions/setup-java@v4 - with: - java-version: '21' - distribution: 'temurin' - cache: gradle - - - name: Create application.yml file - run: | - mkdir -p ./src/main/resources - echo "${{ secrets.APPLICATION_YML }}" > ./src/main/resources/application.yml - - - name: Setup Gradle - uses: gradle/gradle-build-action@v2 - - - name: Compile check - run: | - ./gradlew compileJava compileTestJava \ - --parallel \ - --build-cache \ - --configuration-cache \ No newline at end of file + retention-days: 7 \ No newline at end of file From 60a3b568e566002d46779a8cdd083c495640edcf Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:35:38 +0900 Subject: [PATCH 16/34] feat(ci): improve CI pipeline step readability with emojis MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 각 CI 작업 단계에 적절한 이모지를 추가하여 가독성을 개선 - 서비스 준비 확인 로직 강화: 개별 서비스 상태(Zookeeper, Kafka, Elasticsearch) 확인으로 디버깅 용이성 향상 - 메시지 출력 개선: 서비스 준비 상태 및 재시도 상태를 직관적으로 전달 - 테스트 신뢰성과 개발 경험 개선을 위해 워크플로우 세부사항을 다듬음 --- .github/workflows/ci.yml | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d23db2ab..33759c6a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -61,59 +61,63 @@ jobs: --health-interval 10s --health-timeout 5s --health-retries 10 - depends_on: - - zookeeper steps: - - name: Check out repository + - name: 🧩 Checkout repository uses: actions/checkout@v4 with: fetch-depth: 1 - - name: Set up JDK 21 + - name: ☕️ Set up JDK 21 uses: actions/setup-java@v4 with: java-version: '21' distribution: 'temurin' cache: gradle - - name: Create application.yml file + - name: 📝 Create application.yml run: | mkdir -p ./src/main/resources echo "${{ secrets.APPLICATION_YML }}" > ./src/main/resources/application.yml - - name: Disable IPv6 + - name: 🧱 Disable IPv6 
(network stability) run: | sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1 sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1 - - name: Validate Gradle wrapper + - name: 🧩 Validate Gradle Wrapper uses: gradle/wrapper-validation-action@v2 - - name: Make gradlew executable + - name: 🧍 Make gradlew executable run: chmod +x ./gradlew - - name: Setup Gradle + - name: ⚙️ Setup Gradle uses: gradle/gradle-build-action@v2 with: gradle-build-scan-report: true cache-read-only: false - - name: Wait for services to be ready + - name: 🕐 Wait for Kafka, Zookeeper, and Elasticsearch run: | - echo "Waiting for Elasticsearch, Kafka, and Zookeeper..." - for i in {1..20}; do - if curl -fsS http://localhost:9200 >/dev/null && nc -z localhost 9092 && nc -z localhost 2181; then - echo "All services are up!" + echo "Waiting for Elasticsearch, Zookeeper, and Kafka to be ready..." + for i in {1..25}; do + es_ready=$(curl -fsS http://localhost:9200/_cluster/health > /dev/null && echo "yes" || echo "no") + zk_ready=$(echo ruok | nc localhost 2181 | grep imok > /dev/null && echo "yes" || echo "no") + kafka_ready=$(nc -z localhost 9092 && echo "yes" || echo "no") + + if [ "$es_ready" = "yes" ] && [ "$zk_ready" = "yes" ] && [ "$kafka_ready" = "yes" ]; then + echo "✅ All services are ready!" exit 0 fi - echo "Retrying in 5s..." + + echo "⏳ Waiting for services... retrying in 5s ($i/25)" sleep 5 done - echo "Services did not become ready in time." + + echo "❌ Services did not start in time" exit 1 - - name: Run tests + - name: 🧪 Run tests run: | ./gradlew clean test \ --parallel \ @@ -126,7 +130,7 @@ jobs: SPRING_ELASTICSEARCH_URIS: http://localhost:9200 SPRING_KAFKA_BOOTSTRAP_SERVERS: localhost:9092 - - name: Upload test results + - name: 📦 Upload test results uses: actions/upload-artifact@v4 if: always() with: From c8040659eca9069f72b2b728424bde826202e85a Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:41:15 +0900 Subject: [PATCH 17/34] feat(ci): add compile check workflow for PR validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 새로운 CI 워크플로우 추가로 PR 생성 시 코드 컴파일 체크 수행 - Java 21 및 Gradle 설정 포함하여 최신 환경에서의 테스트 및 빌드를 지원 - secret 기반 application.yml 생성으로 안전한 설정 관리 구현 - 병렬 및 캐시 사용 설정으로 CI 속도 최적화 - 코드 품질 및 변경 사항 신뢰성 검증 강화 --- .github/workflows/ci.yml | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 33759c6a..12742994 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -138,4 +138,35 @@ jobs: path: | build/reports/tests/ build/test-results/ + retention-days: 7 + build-check: + runs-on: ubuntu-latest + if: github.event_name == 'pull_request' + + steps: + - name: Check out repository + uses: actions/checkout@v4 + with: + fetch-depth: 1 + + - name: Set up JDK 21 + uses: actions/setup-java@v4 + with: + java-version: '21' + distribution: 'temurin' + cache: gradle + + - name: Create application.yml file + run: | + mkdir -p ./src/main/resources + echo "${{ secrets.APPLICATION_YML }}" > ./src/main/resources/application.yml + - name: Setup Gradle + uses: gradle/gradle-build-action@v2 + + - name: Compile check + run: | + ./gradlew compileJava compileTestJava \ + --parallel \ + --build-cache \ + --configuration-cache retention-days: 7 \ No newline at end of file From c4eac2623b69266bed72f96393ad3c97c70685d3 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:47:00 +0900 Subject: [PATCH 18/34] feat(ci): remove Zookeeper and migrate 
Kafka to KRaft mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CI 파이프라인에서 Zookeeper를 제거하고 Kafka를 KRaft 모드로 전환 - KRaft 기반 설정 추가(KAFKA_PROCESS_ROLES, KAFKA_NODE_ID 등)로 Zookeeper 의존성 제거 - Kafka 서비스 구성 단순화 및 트랜잭션 관련 설정으로 테스트 신뢰성 강화 - 서비스 준비 대기 로직에서 Zookeeper 제거로 불필요한 대기 시간 감소 - 워크플로우 유지보수성을 개선하며 최신 Kafka 설정 적용 --- .github/workflows/ci.yml | 55 ++++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 12742994..90075aac 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,30 +32,21 @@ jobs: --health-timeout 5s --health-retries 10 - zookeeper: - image: confluentinc/cp-zookeeper:7.6.1 - ports: - - 2181:2181 - env: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - options: >- - --health-cmd "echo ruok | nc localhost 2181 | grep imok || exit 1" - --health-interval 10s - --health-timeout 5s - --health-retries 10 - kafka: image: confluentinc/cp-kafka:7.6.1 ports: - 9092:9092 env: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_NODE_ID: 1 + KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs options: >- --health-cmd "nc -z localhost 9092" --health-interval 10s @@ -97,15 +88,14 @@ jobs: gradle-build-scan-report: true cache-read-only: false - - name: 🕐 Wait for Kafka, Zookeeper, and Elasticsearch + - name: 🕐 Wait for Kafka and Elasticsearch run: | - echo "Waiting for Elasticsearch, Zookeeper, and Kafka to be ready..." + echo "Waiting for Elasticsearch and Kafka to be ready..." for i in {1..25}; do es_ready=$(curl -fsS http://localhost:9200/_cluster/health > /dev/null && echo "yes" || echo "no") - zk_ready=$(echo ruok | nc localhost 2181 | grep imok > /dev/null && echo "yes" || echo "no") kafka_ready=$(nc -z localhost 9092 && echo "yes" || echo "no") - if [ "$es_ready" = "yes" ] && [ "$zk_ready" = "yes" ] && [ "$kafka_ready" = "yes" ]; then + if [ "$es_ready" = "yes" ] && [ "$kafka_ready" = "yes" ]; then echo "✅ All services are ready!" 
exit 0 fi @@ -139,34 +129,45 @@ jobs: build/reports/tests/ build/test-results/ retention-days: 7 + build-check: runs-on: ubuntu-latest if: github.event_name == 'pull_request' steps: - - name: Check out repository + - name: 🧩 Checkout repository uses: actions/checkout@v4 with: fetch-depth: 1 - - name: Set up JDK 21 + - name: ☕️ Set up JDK 21 uses: actions/setup-java@v4 with: java-version: '21' distribution: 'temurin' cache: gradle - - name: Create application.yml file + - name: 📝 Create application.yml run: | mkdir -p ./src/main/resources echo "${{ secrets.APPLICATION_YML }}" > ./src/main/resources/application.yml - - name: Setup Gradle + + - name: ⚙️ Setup Gradle uses: gradle/gradle-build-action@v2 - - name: Compile check + - name: 🧱 Compile check run: | ./gradlew compileJava compileTestJava \ --parallel \ --build-cache \ --configuration-cache + + - name: 📦 Upload build results + uses: actions/upload-artifact@v4 + if: always() + with: + name: build-results + path: | + build/classes/ + build/libs/ retention-days: 7 \ No newline at end of file From 57daacf0bb30832ddf138e99174499137339aafe Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:49:15 +0900 Subject: [PATCH 19/34] feat(ci): add Kafka cluster ID to CI environment configuration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CI 환경에서 Kafka 클러스터의 고유 식별자를 설정하도록 KAFKA_CLUSTER_ID 추가 - 고유 식별자를 통해 테스트 환경 간 충돌 가능성 방지 - Kafka 설정의 일관성을 유지하며 CI 안정성 개선 --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 90075aac..ab2afae5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,6 +47,7 @@ jobs: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + KAFKA_CLUSTER_ID: "ci-cluster-001" options: >- --health-cmd "nc -z localhost 9092" --health-interval 10s From 56a8378cc862b9b242504a3fc25e42fad9791a99 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:51:11 +0900 Subject: [PATCH 20/34] feat(ci): refine Kafka startup and healthcheck configurations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Kafka 시작 명령어에 조건부 `kafka-storage format` 추가로 재포맷 방지 - CLUSTER_ID로 환경 변수 이름 변경하여 설정 명확성 향상 - 헬스체크 옵션 수정: 인터벌을 15초, 타임아웃을 10초로 증가시켜 안정성 개선 - CI 워크플로우에서 Kafka 구성의 신뢰성과 유지보수성을 강화 --- .github/workflows/ci.yml | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ab2afae5..6beef836 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,11 +47,16 @@ jobs: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_DIRS: /tmp/kraft-combined-logs - KAFKA_CLUSTER_ID: "ci-cluster-001" + CLUSTER_ID: ci-cluster-001 + command: > + bash -c "if [ ! 
-f /tmp/kraft-combined-logs/meta.properties ]; then + /usr/bin/kafka-storage format --ignore-formatted --cluster-id ci-cluster-001 --config /etc/kafka/kafka.properties; + fi && + /etc/confluent/docker/run" options: >- --health-cmd "nc -z localhost 9092" - --health-interval 10s - --health-timeout 5s + --health-interval 15s + --health-timeout 10s --health-retries 10 steps: From cd7f9068e1793b885376df3aaa4717fb6e29aac1 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:52:15 +0900 Subject: [PATCH 21/34] feat(ci): update Kafka start script with entrypoint and echo log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Kafka 시작 스크립트를 `--entrypoint bash`로 수정하여 명령어 실행 방식 개선 - Kafka 시작 완료 후 'Kafka started successfully!' 메시지 추가로 진단 편의성 향상 - CI 실행 중 Kafka 구성 로직의 가독성과 유지보수성 강화 --- .github/workflows/ci.yml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6beef836..de620fd7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -48,12 +48,13 @@ jobs: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_DIRS: /tmp/kraft-combined-logs CLUSTER_ID: ci-cluster-001 - command: > - bash -c "if [ ! -f /tmp/kraft-combined-logs/meta.properties ]; then - /usr/bin/kafka-storage format --ignore-formatted --cluster-id ci-cluster-001 --config /etc/kafka/kafka.properties; - fi && - /etc/confluent/docker/run" options: >- + --entrypoint bash + -c "if [ ! -f /tmp/kraft-combined-logs/meta.properties ]; then + /usr/bin/kafka-storage format --ignore-formatted --cluster-id ci-cluster-001 --config /etc/kafka/kafka.properties; + fi && + /etc/confluent/docker/run" && + echo 'Kafka started successfully!' --health-cmd "nc -z localhost 9092" --health-interval 15s --health-timeout 10s From 9519b88d756c1c818cb2e8bd3e08e3aa7bfb70e6 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:54:29 +0900 Subject: [PATCH 22/34] feat(ci): update Kafka entrypoint for cleaner command execution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Kafka 시작 스크립트의 `--entrypoint` 옵션을 JSON 배열 형식으로 변경하여 명령어 구문 개선 - 불필요한 로깅 메시지 및 bash 옵션 제거로 워크플로우 가독성 강화 - CI 환경의 명령어 실행 구조를 단순화하고 유지보수성을 향상시킴 --- .github/workflows/ci.yml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index de620fd7..c32138ba 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,12 +49,7 @@ jobs: KAFKA_LOG_DIRS: /tmp/kraft-combined-logs CLUSTER_ID: ci-cluster-001 options: >- - --entrypoint bash - -c "if [ ! -f /tmp/kraft-combined-logs/meta.properties ]; then - /usr/bin/kafka-storage format --ignore-formatted --cluster-id ci-cluster-001 --config /etc/kafka/kafka.properties; - fi && - /etc/confluent/docker/run" && - echo 'Kafka started successfully!' + --entrypoint '["bash","-c","if [ ! 
-f /tmp/kraft-combined-logs/meta.properties ]; then /usr/bin/kafka-storage format --ignore-formatted --cluster-id ci-cluster-001 --config /etc/kafka/kafka.properties; fi && /etc/confluent/docker/run"]' --health-cmd "nc -z localhost 9092" --health-interval 15s --health-timeout 10s From 918309d56d73ceaa5524667417e3381a78d963e2 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:56:26 +0900 Subject: [PATCH 23/34] feat(ci): simplify Kafka entrypoint command in workflow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `--entrypoint` 옵션의 명령어를 단순화하여 CI 워크플로우 가독성 개선 - Kafka 시작 명령어를 다중 줄로 작성해 유지보수성 및 명령어 가독성 강화 - 불필요한 KAFKA_CLUSTER_ID 설정 제거로 환경 변수 구성 간소화 --- .github/workflows/ci.yml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c32138ba..6e5d85f8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,9 +47,12 @@ jobs: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_DIRS: /tmp/kraft-combined-logs - CLUSTER_ID: ci-cluster-001 options: >- - --entrypoint '["bash","-c","if [ ! -f /tmp/kraft-combined-logs/meta.properties ]; then /usr/bin/kafka-storage format --ignore-formatted --cluster-id ci-cluster-001 --config /etc/kafka/kafka.properties; fi && /etc/confluent/docker/run"]' + --entrypoint bash + -c "if [ ! -f /tmp/kraft-combined-logs/meta.properties ]; then + /usr/bin/kafka-storage format --ignore-formatted --cluster-id ci-cluster-001 --config /etc/kafka/kafka.properties; + fi; + /etc/confluent/docker/run" --health-cmd "nc -z localhost 9092" --health-interval 15s --health-timeout 10s From 0299228391804d2c213e24f40cb11043812df9c6 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 03:59:18 +0900 Subject: [PATCH 24/34] feat(ci): remove redundant Kafka options and add cluster ID MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - KAFKA_CLUSTER_ID 환경 변수를 다시 추가하여 CI에서 클러스터 식별자를 명확히 설정 - 불필요한 `--entrypoint bash` 및 관련 명령어 제거로 워크플로우 단순화 - Kafka 시작 옵션 및 헬스체크 구성 가독성 강화 - 설정 정리를 통해 CI 환경 구성의 유지보수성 개선 --- .github/workflows/ci.yml | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6e5d85f8..79b075f0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,6 +37,7 @@ jobs: ports: - 9092:9092 env: + KAFKA_CLUSTER_ID: ci-cluster-001 KAFKA_PROCESS_ROLES: broker,controller KAFKA_NODE_ID: 1 KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 @@ -48,11 +49,6 @@ jobs: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_DIRS: /tmp/kraft-combined-logs options: >- - --entrypoint bash - -c "if [ ! 
-f /tmp/kraft-combined-logs/meta.properties ]; then - /usr/bin/kafka-storage format --ignore-formatted --cluster-id ci-cluster-001 --config /etc/kafka/kafka.properties; - fi; - /etc/confluent/docker/run" --health-cmd "nc -z localhost 9092" --health-interval 15s --health-timeout 10s @@ -175,4 +171,4 @@ jobs: path: | build/classes/ build/libs/ - retention-days: 7 \ No newline at end of file + retention-days: 7 From c49eff37131aa4bf14f320d60a7109112ee31305 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 04:01:11 +0900 Subject: [PATCH 25/34] feat(ci): add CLUSTER_ID environment variable for Kafka setup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CLUSTER_ID 환경 변수를 Kafka 설정에 추가하여 CI에서 클러스터 식별자를 명확히 정의 - 설정 변경으로 CI 환경의 일관성 강화 및 유지보수성 개선 --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 79b075f0..e9250cad 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,6 +37,7 @@ jobs: ports: - 9092:9092 env: + CLUSTER_ID: ci-cluster-001 KAFKA_CLUSTER_ID: ci-cluster-001 KAFKA_PROCESS_ROLES: broker,controller KAFKA_NODE_ID: 1 From b261fead49fc1fb6394109f6fb11da2634c19349 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 04:03:24 +0900 Subject: [PATCH 26/34] feat(ci): update Kafka cluster ID in environment variables MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CLUSTER_ID 및 KAFKA_CLUSTER_ID 환경 변수를 고유 값으로 업데이트하여 CI 환경에서의 명확성 강화 - 구성을 최신 상태로 유지하며 Kafka 클러스터 식별의 일관성 개선 --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e9250cad..9e10b48c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,8 +37,8 @@ jobs: ports: - 9092:9092 env: - CLUSTER_ID: ci-cluster-001 - KAFKA_CLUSTER_ID: ci-cluster-001 + CLUSTER_ID: 9UbE6ogDQKqe49oBCZZmnA + KAFKA_CLUSTER_ID: 9UbE6ogDQKqe49oBCZZmnA KAFKA_PROCESS_ROLES: broker,controller KAFKA_NODE_ID: 1 KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 From 8100e0ccbdbda3687a7c96f05af0a79b0661c437 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 21:09:53 +0900 Subject: [PATCH 27/34] feat(kafka): add error handler to post event consumer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - KafkaListenerContainerFactory에 `DefaultErrorHandler` 추가로 예외 처리 강화 - Kafka 리스너가 에러 발생 시 안정적 처리를 지원하도록 설정 수정 - `PostEventConsumer`에 컨테이너 팩토리 설정 적용으로 신뢰성 향상 --- .../dooya/see/adapter/integration/kafka/PostEventConsumer.java | 3 ++- .../adapter/integration/kafka/config/KafkaConsumerConfig.java | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java index 0f174713..48aeda9b 100644 --- a/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java +++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventConsumer.java @@ -15,7 +15,8 @@ public class PostEventConsumer { @KafkaListener( topics = "${see.kafka.topics.post-events:post-events}", - groupId = "${spring.kafka.consumer.group-id:post-indexer-group}" + groupId = "${spring.kafka.consumer.group-id:post-indexer-group}", + containerFactory = "kafkaListenerContainerFactory" ) public void consume(PostEventMessage message) { log.debug("Kafka 이벤트 수신: {}", message); 
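One thing worth knowing about the `DefaultErrorHandler` added in the `KafkaConsumerConfig` diff below: constructed with no arguments it applies Spring Kafka's default back-off of nine immediate retries. If that ever proves too aggressive, the factory could be tuned along these lines — a sketch only; the back-off values, generic types, and the choice of non-retryable exception are illustrative, not part of this patch:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, PostEventMessage> kafkaListenerContainerFactory(
        ConsumerFactory<String, PostEventMessage> postEventConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, PostEventMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(postEventConsumerFactory);
    factory.setConcurrency(1);
    // Retry a failed record 3 times with a 1s pause between attempts,
    // instead of the default 9 retries with no delay.
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 3L));
    // Failures that can never succeed on retry should fail fast.
    errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);
    factory.setCommonErrorHandler(errorHandler);
    return factory;
}
```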
diff --git a/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java index 1102007d..e723ee06 100644 --- a/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java +++ b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaConsumerConfig.java @@ -11,6 +11,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.serializer.JsonDeserializer; import java.util.HashMap; @@ -50,6 +51,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaLi new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(postEventConsumerFactory); factory.setConcurrency(1); + factory.setCommonErrorHandler(new DefaultErrorHandler()); return factory; } } From b757262a708e3931dc3a8c4c89a82b7ea76759ea Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 21:16:58 +0900 Subject: [PATCH 28/34] feat(gradle): add spring-retry dependency for retry logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - spring-retry 의존성 추가로 재시도 로직 구현 지원 - 특정 실패 시 복구 및 재시도 기능 강화 - 실패 내구성을 개선하고 신뢰성을 높이기 위한 사전 작업 --- build.gradle.kts | 1 + 1 file changed, 1 insertion(+) diff --git a/build.gradle.kts b/build.gradle.kts index b26e195a..e80d36ba 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -49,6 +49,7 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-validation") implementation("org.springframework.security:spring-security-core") implementation("org.springframework.kafka:spring-kafka") + implementation("org.springframework.retry:spring-retry") implementation("io.jsonwebtoken:jjwt-api:0.12.6") implementation("io.jsonwebtoken:jjwt-impl:0.12.6") implementation("io.jsonwebtoken:jjwt-jackson:0.12.6") From 59d2de93db534cc6bf89eaebbfa642c9b22685d0 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 21:17:16 +0900 Subject: [PATCH 29/34] feat(kafka): add retry template for Kafka producer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Kafka 프로듀서에 적용할 `RetryTemplate` 빈 추가 - 최대 시도 횟수 3회와 지수 백오프(초기 지연 200ms, 최대 2초) 설정 - 재시도 로직을 통해 메시지 전송 실패 시 안정성과 신뢰성 개선 --- .../integration/kafka/config/KafkaProducerConfig.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java index 3eb1723b..febe498b 100644 --- a/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java +++ b/src/main/java/dooya/see/adapter/integration/kafka/config/KafkaProducerConfig.java @@ -11,6 +11,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.retry.support.RetryTemplate; import java.util.HashMap; import java.util.Map; @@ -38,4 +39,12 @@ public ProducerFactory postEventProducerFactory() { public KafkaTemplate postEventKafkaTemplate(ProducerFactory postEventProducerFactory) { return new KafkaTemplate<>(postEventProducerFactory); } + + @Bean + public RetryTemplate kafkaRetryTemplate() { + 
return RetryTemplate.builder() + .maxAttempts(3) + .exponentialBackoff(200, 2.0, 2_000) + .build(); + } } From 565d730aba471e32215cbc07dac7c05c2b2cbabc Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 21:17:25 +0900 Subject: [PATCH 30/34] feat(kafka): improve post event producer with retry mechanism MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `RetryTemplate`을 활용한 Kafka 이벤트 발행 재시도 로직 추가 - 메시지 키 생성 및 재시도 실행 방식 수정으로 이벤트 전송 안정성 강화 - 재시도 초과 시 마지막 에러 로깅 및 예외 처리로 문제 식별성 개선 --- .../integration/kafka/PostEventProducer.java | 60 +++++++++++-------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java index a4afb478..433f59a9 100644 --- a/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java +++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java @@ -10,16 +10,20 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.retry.support.RetryTemplate; import org.springframework.stereotype.Component; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; +import java.util.concurrent.ExecutionException; + @Slf4j @Component @RequiredArgsConstructor @ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true") public class PostEventProducer implements PostEventPublisher { private final KafkaTemplate kafkaTemplate; + private final RetryTemplate retryTemplate; @Value("${see.kafka.topics.post-events:post-events}") private String topic; @@ -34,15 +38,19 @@ public void publish(DomainEvent event) { return; } + String key = message.postId() != null ? 
message.postId().toString() : null; + + Runnable sendTask = () -> sendWithRetry(key, message); + if (TransactionSynchronizationManager.isSynchronizationActive()) { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { - doSend(message); + sendTask.run(); } }); } else { - doSend(message); + sendTask.run(); } } @@ -59,28 +67,30 @@ private PostEventMessage toMessage(DomainEvent event) { return null; } - private void doSend(PostEventMessage message) { - try { - kafkaTemplate.send(topic, message) - .whenComplete((result, throwable) -> { - if (throwable != null) { - log.error("Kafka 이벤트 발행 실패: {}", message, throwable); - return; - } - if (result != null && log.isDebugEnabled()) { - log.debug("Kafka 이벤트 발행 성공: topic={}, partition={}, offset={}, payload={}", - result.getRecordMetadata().topic(), - result.getRecordMetadata().partition(), - result.getRecordMetadata().offset(), - message); - } - }) - .exceptionally(throwable -> { - log.error("Kafka 이벤트 발행 실패: {}", message, throwable); - return null; - }); - } catch (Exception e) { - log.error("Kafka 이벤트 발행 실패: {}", message, e); - } + private void sendWithRetry(String key, PostEventMessage message) { + retryTemplate.execute(retryContext -> { + try { + var result = kafkaTemplate.send(topic, key, message).get(); + if (result != null && log.isDebugEnabled()) { + log.debug("Kafka 이벤트 발행 성공: topic={}, partition={}, offset={}, key={}, payload={}", + result.getRecordMetadata().topic(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset(), + key, + message); + } + return null; + } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Kafka 전송이 인터럽트되었습니다", interrupted); + } catch (ExecutionException executionException) { + Throwable cause = executionException.getCause() != null ? 
executionException.getCause() : executionException; + throw new IllegalStateException("Kafka 전송 실패", cause); + } + }, recoveryContext -> { + Throwable lastError = recoveryContext.getLastThrowable(); + log.error("Kafka 이벤트 발행 실패(재시도 완료): {}", message, lastError); + throw new IllegalStateException("Kafka 전송 실패(재시도 초과)", lastError); + }); } } From 2e0bcd9245b9af08be6b46ba9699e501f0f5136e Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 21:22:17 +0900 Subject: [PATCH 31/34] feat(domain): add serializable implementation to post entities MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `Tag`, `Post`, `PostContent`, and `PostMetaData`에 `Serializable` 구현하여 직렬화 가능하도록 수정 - 직렬화 지원을 위해 각 클래스에 `serialVersionUID` 필드 추가 - 엔티티 직렬화를 위한 사전 작업으로, 분산 환경에서의 데이터 전송 및 저장소 연계를 대비 --- src/main/java/dooya/see/domain/post/Post.java | 3 ++- src/main/java/dooya/see/domain/post/PostContent.java | 4 +++- src/main/java/dooya/see/domain/post/PostMetaData.java | 4 +++- src/main/java/dooya/see/domain/post/Tag.java | 4 +++- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/main/java/dooya/see/domain/post/Post.java b/src/main/java/dooya/see/domain/post/Post.java index 55ccfd38..b1ec8713 100644 --- a/src/main/java/dooya/see/domain/post/Post.java +++ b/src/main/java/dooya/see/domain/post/Post.java @@ -20,7 +20,8 @@ @Builder(access = AccessLevel.PRIVATE) @AllArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PROTECTED) -public class Post extends AbstractAggregateRoot { +public class Post extends AbstractAggregateRoot implements java.io.Serializable { + private static final long serialVersionUID = 1L; @Embedded private PostContent content; diff --git a/src/main/java/dooya/see/domain/post/PostContent.java b/src/main/java/dooya/see/domain/post/PostContent.java index e3faed89..98d7ac23 100644 --- a/src/main/java/dooya/see/domain/post/PostContent.java +++ b/src/main/java/dooya/see/domain/post/PostContent.java @@ -9,7 +9,9 @@ public record PostContent( String title, @Column(name = "body", length = 50000, nullable = false) String body -) { +) implements java.io.Serializable { + private static final long serialVersionUID = 1L; + public PostContent { validateTitle(title); validateBody(body); diff --git a/src/main/java/dooya/see/domain/post/PostMetaData.java b/src/main/java/dooya/see/domain/post/PostMetaData.java index 488cc3f4..63b7d883 100644 --- a/src/main/java/dooya/see/domain/post/PostMetaData.java +++ b/src/main/java/dooya/see/domain/post/PostMetaData.java @@ -2,6 +2,7 @@ import jakarta.persistence.Embeddable; +import java.io.Serializable; import java.time.LocalDateTime; @Embeddable @@ -11,7 +12,8 @@ public record PostMetaData( LocalDateTime modifiedAt, LocalDateTime publishedAt -) { +) implements Serializable { + private static final long serialVersionUID = 1L; public static PostMetaData create() { LocalDateTime now = LocalDateTime.now(); diff --git a/src/main/java/dooya/see/domain/post/Tag.java b/src/main/java/dooya/see/domain/post/Tag.java index 59e7e7f6..aefc0dd1 100644 --- a/src/main/java/dooya/see/domain/post/Tag.java +++ b/src/main/java/dooya/see/domain/post/Tag.java @@ -2,10 +2,12 @@ import jakarta.persistence.Embeddable; +import java.io.Serializable; import java.util.regex.Pattern; @Embeddable -public record Tag(String name) { +public record Tag(String name) implements Serializable { + private static final long serialVersionUID = 1L; private static final Pattern TAG_PATTERN = 
Pattern.compile("^[가-힣a-zA-Z0-9]+$"); public Tag { From a8c7dfca539ffb96c7b13be2169d3f2cb5177254 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 21:23:31 +0900 Subject: [PATCH 32/34] feat(kafka): add unique consumer group ID for test configuration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `spring.kafka.consumer.group-id`를 테스트 설정에 추가하여 Kafka 테스트 환경에서 유니크한 그룹 ID를 사용할 수 있도록 변경 - 테스트 병렬 실행 및 고유 컨슈머 그룹 간 충돌 방지를 위한 조치 - 랜덤 UUID 값을 환경 변수로 사용하여 매 실행마다 고유 그룹 ID 할당 --- .../dooya/see/application/post/PostEventKafkaPipelineTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/java/dooya/see/application/post/PostEventKafkaPipelineTest.java b/src/test/java/dooya/see/application/post/PostEventKafkaPipelineTest.java index 7ba80726..a1262e54 100644 --- a/src/test/java/dooya/see/application/post/PostEventKafkaPipelineTest.java +++ b/src/test/java/dooya/see/application/post/PostEventKafkaPipelineTest.java @@ -24,7 +24,8 @@ @SpringBootTest(properties = { "see.kafka.enabled=true", - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.consumer.group-id=${random.uuid}" }) @EmbeddedKafka(partitions = 1, topics = "${see.kafka.topics.post-events:post-events}") @TestPropertySource(properties = "spring.kafka.consumer.auto-offset-reset=earliest") From ce42856962028ecbaf2a7d6206b36bb636e0c346 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 21:33:25 +0900 Subject: [PATCH 33/34] feat(docs): enhance Kafka and event pipeline documentation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Kafka 통합 및 도메인 이벤트 파이프라인 관련 문서 대폭 확장 - 이벤트 플로우 다이어그램 갱신: Kafka 및 DirectPublisher를 모두 포함하도록 변경 - Post 엔티티 및 값 객체 직렬화 지원에 대한 세부 내용 추가 - 메시지 재시도 정책, Fallback 전략, Embedded Kafka와 관련된 설정 팁 포함 - 개발 및 테스트 환경을 위한 Kafka 비활성화 옵션 설명 추가 --- ...0\353\215\270 \353\254\270\354\204\234.md" | 149 ++++++++++++------ ...4\354\240\204 \353\254\270\354\204\234.md" | 16 +- README.md | 41 ++++- 3 files changed, 151 insertions(+), 55 deletions(-) diff --git "a/.docs/\353\217\204\353\251\224\354\235\270 \353\252\250\353\215\270 \353\254\270\354\204\234.md" "b/.docs/\353\217\204\353\251\224\354\235\270 \353\252\250\353\215\270 \353\254\270\354\204\234.md" index 5f36ac1e..39a0c565 100644 --- "a/.docs/\353\217\204\353\251\224\354\235\270 \353\252\250\353\215\270 \353\254\270\354\204\234.md" +++ "b/.docs/\353\217\204\353\251\224\354\235\270 \353\252\250\353\215\270 \353\254\270\354\204\234.md" @@ -244,13 +244,14 @@ stateDiagram-v2 #### PostContent 값 객체 - **제목 필수**: 빈 제목 허용 안함 -- **내용 선택**: 본문은 빈 내용 허용 +- **내용 필수**: 본문은 빈 내용 허용 안함, 최대 50,000자 제한 - **불변성**: 수정 시 새로운 인스턴스 생성 #### PostMetaData 값 객체 - **시간 추적**: 생성/수정/발행 시점 기록 - **자동 갱신**: 수정 시 modifiedAt 자동 업데이트 - **불변성**: 시점 정보는 생성 후 변경 불가 +- **직렬화 지원**: Kafka 전송을 위해 관련 엔티티/값 객체(Post, PostContent, PostMetaData, Tag)는 모두 `Serializable`을 구현 ## 댓글(Comment) 애그리거트 @@ -368,37 +369,30 @@ classDiagram ### 포스트 관련 이벤트 ```mermaid -graph TB - subgraph "Post Events" - PC[PostCreated] - PU[PostUpdated] - PPub[PostPublished] - PH[PostHidden] - PD[PostDeleted] - PV[PostViewed] - PL[PostLiked] - PUL[PostUnliked] - end - - subgraph "Comment Events" - CC[CommentCreated] - CU[CommentUpdated] - CH[CommentHidden] - CD[CommentDeleted] - end - - subgraph "Event Handlers" - PSE[PostStatsEventHandler] - end - - PC --> PSE - PV --> PSE - PL --> PSE - PUL --> PSE - CC --> PSE - CD --> 
PSE +graph LR + PostAggregate((Post Aggregate)) + Handler[PostEventHandler] + Publisher[PostEventPublisher] + Producer[PostEventProducer] + Kafka[(Kafka Broker)] + Consumer[PostEventConsumer] + Processor[PostEventProcessor] + Indexer[PostSearchIndexer → Elasticsearch] + Direct[DirectPostEventPublisher] + Stats[PostStatsEventHandler] + + PostAggregate --> Handler + Handler --> Publisher + Publisher --> Producer + Producer --> |PostEventMessage| Kafka + Kafka --> Consumer --> Processor --> Indexer + Publisher --> Direct --> Processor + Processor --> Indexer + PostAggregate --> Stats ``` +도메인 이벤트는 Post 애그리거트에서 발행되며 `PostEventHandler`가 애플리케이션 이벤트로 수신합니다. 기본적으로 Kafka를 거쳐 `PostEventProcessor`가 Elasticsearch 색인을 갱신하고, 통계 관련 이벤트는 `PostStatsEventHandler`가 별도로 처리합니다. Kafka를 사용할 수 없는 경우 `DirectPostEventPublisher`가 동일한 프로세서를 직접 호출합니다. + ### 이벤트 처리 규칙 #### 통계 업데이트 @@ -564,27 +558,88 @@ public class Post extends AbstractAggregateRoot { #### 이벤트 처리 패턴 ```java -@Async -@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) -public void handlePostViewed(PostViewed event) { - if (event.postId() == null) return; - - log.info("PostViewed 이벤트 처리: postId={}, viewerId={}", event.postId(), event.memberId()); - - try { - postStatsManager.incrementViewCount(event.postId()); - } catch (Exception e) { - log.error("조회수 증가 실패: postId={}", event.postId(), e); - // 통계 실패가 메인 로직에 영향주지 않도록 격리 +@Component +@RequiredArgsConstructor +public class PostEventHandler { + + private final PostEventPublisher postEventPublisher; + + @EventListener + public void handlePostCreated(PostCreated event) { + postEventPublisher.publish(event); + } + + @EventListener + public void handlePostUpdated(PostUpdated event) { + postEventPublisher.publish(event); + } + + @EventListener + public void handlePostDeleted(PostDeleted event) { + postEventPublisher.publish(event); + } +} + +@Component +@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "true") +@RequiredArgsConstructor +public class PostEventProducer implements PostEventPublisher { + private final KafkaTemplate kafkaTemplate; + private final RetryTemplate retryTemplate; + + @Value("${see.kafka.topics.post-events:post-events}") + private String topic; + + @Override + public void publish(DomainEvent event) { + PostEventMessage message = PostEventMessage.from(event); + String key = message.postId().toString(); + + Runnable sender = () -> retryTemplate.execute(ctx -> { + kafkaTemplate.send(topic, key, message).get(); + return null; + }, recovery -> { + throw new IllegalStateException("Kafka 전송 실패", recovery.getLastThrowable()); + }); + + if (TransactionSynchronizationManager.isSynchronizationActive()) { + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + sender.run(); + } + }); + } else { + sender.run(); + } + } +} + +@Component +@RequiredArgsConstructor +class PostEventProcessor { + private final PostRepository postRepository; + private final PostSearchIndexer indexer; + + void process(PostEventMessage message) { + switch (message.type()) { + case CREATED, UPDATED -> postRepository.findById(message.postId()) + .ifPresent(indexer::index); + case DELETED -> indexer.delete(message.postId()); + } } } ``` +Kafka를 사용할 수 없는 환경에서는 `see.kafka.enabled=false`로 설정하여 `DirectPostEventPublisher`가 즉시 `PostEventProcessor`를 호출하도록 구성했습니다. 
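A minimal sketch of that fallback adapter, consistent with the flow described above — the class body is illustrative: the `matchIfMissing` flag, the Lombok wiring, and the null-check are assumptions about the actual source, while the collaborator names follow this documentation:

```java
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(value = "see.kafka.enabled", havingValue = "false", matchIfMissing = true)
public class DirectPostEventPublisher implements PostEventPublisher {

    private final PostEventProcessor postEventProcessor;

    @Override
    public void publish(DomainEvent event) {
        PostEventMessage message = PostEventMessage.from(event);
        if (message == null) {
            return; // event type not relevant to the search index
        }
        // Same processing path as the Kafka consumer, minus the broker hop.
        postEventProcessor.process(message);
    }
}
```

Because both paths converge on `PostEventProcessor`, local development and tests exercise the same indexing logic as the Kafka-backed pipeline.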
+ #### 이벤트 처리 원칙 -- **트랜잭션 분리**: `@TransactionalEventListener(AFTER_COMMIT)`로 메인 트랜잭션과 분리 -- **비동기 처리**: `@Async`로 성능 최적화 -- **예외 격리**: try-catch로 사이드 이펙트 실패가 메인 로직에 영향 없도록 보장 -- **PostStatsManager 활용**: 인터페이스를 통한 관심사 분리 +- **트랜잭션 후처리**: Kafka 전송은 커밋 이후 `TransactionSynchronization`을 통해 수행 +- **메시지 키**: `postId`를 메시지 키로 사용해 파티션 내 순서를 보장 +- **재시도 정책**: `RetryTemplate`으로 최대 3회 재시도 후 실패를 호출자에게 전달 +- **컨슈머 에러 처리**: `DefaultErrorHandler`가 적용된 리스너 컨테이너로 재처리/로그를 관리 +- **Fallback 보장**: Kafka 비활성화 시에도 동일한 이벤트 플로우를 직접 실행 +- **테스트 격리**: Embedded Kafka 통합 테스트에서는 `${random.uuid}` 그룹 ID로 소비자를 분리 ### 5. 헥사고날 아키텍처 적용 @@ -680,4 +735,4 @@ public class JpaMemberRepository implements MemberRepository { - **Spring Data JPA 레퍼런스** - **헥사고날 아키텍처 가이드** -이 도메인 모델 문서는 See 프로젝트의 핵심 비즈니스 로직과 설계 원칙을 담고 있습니다. 지속적인 도메인 지식 발견과 함께 문서도 함께 발전시켜 나가야 합니다. \ No newline at end of file +이 도메인 모델 문서는 See 프로젝트의 핵심 비즈니스 로직과 설계 원칙을 담고 있습니다. 지속적인 도메인 지식 발견과 함께 문서도 함께 발전시켜 나가야 합니다. diff --git "a/.docs/\354\232\251\354\226\264 \354\202\254\354\240\204 \353\254\270\354\204\234.md" "b/.docs/\354\232\251\354\226\264 \354\202\254\354\240\204 \353\254\270\354\204\234.md" index 957e51af..90f0fdae 100644 --- "a/.docs/\354\232\251\354\226\264 \354\202\254\354\240\204 \353\254\270\354\204\234.md" +++ "b/.docs/\354\232\251\354\226\264 \354\202\254\354\240\204 \353\254\270\354\204\234.md" @@ -343,6 +343,20 @@ graph LR | **롤백** | Rollback | 트랜잭션을 취소하고 이전 상태로 복원 | "에러 발생 시 자동으로 롤백됩니다" | | **커밋** | Commit | 트랜잭션의 변경사항을 영구 반영 | "모든 작업이 성공하면 커밋합니다" | +### 메시징 & 이벤트 용어 + +| 한국어 | 영어 | 정의 | 사용 예시 | +|--------|------|------|-----------| +| **도메인 이벤트** | Domain Event | 애그리거트가 발행하는 상태 변화 알림 | "PostCreated 이벤트가 Kafka로 전송됩니다" | +| **PostEventPublisher** | PostEventPublisher | 도메인 이벤트를 외부 파이프라인으로 전달하는 애플리케이션 포트 | "PostEventHandler는 PostEventPublisher를 통해 메시지를 보냅니다" | +| **PostEventProducer** | PostEventProducer | KafkaTemplate을 사용해 이벤트 메시지를 브로커로 보내는 어댑터 | "재시도 정책이 적용된 PostEventProducer" | +| **PostEventMessage** | PostEventMessage | Kafka에 전송되는 직렬화 가능한 포스트 이벤트 DTO | "postId와 이벤트 타입만 포함한 경량 메시지" | +| **PostEventProcessor** | PostEventProcessor | Kafka 메시지를 읽어 최신 포스트 스냅샷을 색인 서비스로 위임하는 컴포넌트 | "메시지 타입에 따라 index/delete를 수행" | +| **DirectPostEventPublisher** | DirectPostEventPublisher | Kafka 비활성화 시 즉시 색인을 수행하는 대체 어댑터 | "로컬 개발에서는 DirectPostEventPublisher가 사용됩니다" | +| **Embedded Kafka** | Embedded Kafka | 테스트에서 경량 Kafka 브로커를 실행하는 도구 | "PostEventKafkaPipelineTest는 Embedded Kafka로 파이프라인을 검증" | +| **RetryTemplate** | RetryTemplate | 일정한 정책으로 연산을 재시도하게 해주는 Spring Retry 유틸리티 | "Kafka 전송 실패 시 RetryTemplate이 최대 3회 재시도합니다" | +| **see.kafka.enabled** | see.kafka.enabled | Kafka 사용 여부를 제어하는 애플리케이션 설정 | "테스트에서는 see.kafka.enabled=false로 즉시 처리" | + ## 💡 디자인 패턴 용어 ### 생성 패턴 @@ -402,4 +416,4 @@ graph LR --- -이 용어 사전은 See 프로젝트의 성장과 함께 지속적으로 업데이트됩니다. 새로운 용어나 수정이 필요한 내용이 있다면 언제든 팀에 공유해 주세요. \ No newline at end of file +이 용어 사전은 See 프로젝트의 성장과 함께 지속적으로 업데이트됩니다. 새로운 용어나 수정이 필요한 내용이 있다면 언제든 팀에 공유해 주세요. diff --git a/README.md b/README.md index 25221c31..4e546bf1 100644 --- a/README.md +++ b/README.md @@ -66,12 +66,21 @@ - **Application Layer**: 유스케이스 조율과 트랜잭션 관리 - Primary Port: MemberManager, PostManager, CommentManager - Secondary Port: MemberRepository, PostRepository, PostStatsRepository - - 이벤트 핸들러: PostStatsEventHandler + - 이벤트 핸들러: PostEventHandler, PostStatsEventHandler - **Adapter Layer**: 외부 시스템과의 연동 - Web API: REST 엔드포인트 - Security: JWT 인증/인가 - Persistence: JPA 구현체 + - Integration: Kafka 프로듀서/컨슈머, Elasticsearch 색인기 + +### 이벤트 파이프라인 + +1. 
**도메인 이벤트 발행**: Post, Comment 등 애그리거트가 `AbstractAggregateRoot`를 통해 이벤트를 수집합니다. +2. **애플리케이션 이벤트**: `PostEventHandler`가 스프링 애플리케이션 이벤트를 받아 `PostEventPublisher`(포트)에 위임합니다. +3. **Kafka 전송**: `PostEventProducer`는 트랜잭션 커밋 이후 Kafka로 메시지를 전송하며, `postId`를 메시지 키로 사용하고 `RetryTemplate`으로 재시도 정책을 적용합니다. +4. **컨슈머 처리**: `PostEventConsumer`는 `DefaultErrorHandler`가 적용된 리스너 컨테이너에서 메시지를 소비하고, `PostEventProcessor`가 최신 게시글을 재조회해 Elasticsearch 색인을 갱신합니다. +5. **Fallback 모드**: `see.kafka.enabled=false` 일 때는 `DirectPostEventPublisher`가 즉시 색인 작업을 수행하여 Kafka 없이도 동일한 로직을 유지합니다. ## 기술 스택 @@ -80,10 +89,13 @@ - **Spring Boot 3.5.4**: 최신 스프링 부트로 개발 생산성 향상 - **Spring Data JPA**: 데이터 접근 계층 추상화 - **Spring Security**: 인증/인가 보안 체계 +- **Spring Retry**: 외부 시스템 연동 시 재시도 정책 지원 -### Database +### Storage & Messaging - **MySQL 8.0**: 운영 환경 데이터베이스 - **H2**: 개발/테스트 환경 인메모리 데이터베이스 +- **Elasticsearch 8.x**: 검색 인덱스 저장소 +- **Apache Kafka 7.x (Confluent)**: 도메인 이벤트 브로커 - **Docker Compose**: 컨테이너 기반 개발 환경 ### Testing & Quality @@ -111,17 +123,27 @@ git clone https://github.com/your-repo/see.git cd see ``` -### 2. 데이터베이스 시작 +### 2. 인프라 기동 (선택) ```bash -docker-compose up -d mysql +# MySQL, Elasticsearch, Kafka를 한 번에 실행하려면 +docker compose up -d mysql elasticsearch kafka + +# 또는 필요한 서비스만 선택적으로 실행할 수 있습니다. ``` +Kafka를 사용하지 않는 개발 환경이라면 위 단계를 생략하고 `see.kafka.enabled=false` 프로필을 활성화하면 됩니다. + ### 3. 애플리케이션 실행 ```bash ./gradlew bootRun ``` -### 4. API 테스트 +### 4. 환경 설정 팁 +- `see.kafka.enabled=false`: Kafka 없이 즉시 색인을 수행 (기본 테스트 프로필) +- `spring.kafka.bootstrap-servers`: 로컬 Kafka 브로커 주소 (임베디드 테스트에서는 자동 주입) +- `spring.elasticsearch.uris`: Elasticsearch 연결 주소 (기본값 `http://localhost:9200`) + +### 5. API 테스트 ```bash # 회원 가입 curl -X POST http://localhost:8080/api/members \ @@ -184,7 +206,8 @@ src/ │ └── adapter/ # 어댑터 계층 │ ├── webapi/ # REST API │ ├── security/ # 보안 -│ └── persistence/ # 데이터베이스 +│ ├── persistence/ # 데이터베이스 +│ └── integration/ # Kafka, Elasticsearch 연동 └── test/ # 테스트 코드 ├── domain/ # 단위 테스트 (순수 Java) ├── application/ # 통합 테스트 @@ -197,12 +220,16 @@ src/ - **단위 테스트**: 도메인 로직을 순수 Java로 빠르게 검증 - **통합 테스트**: 포트 구현체를 모의 객체로 대체하여 애플리케이션 서비스 검증 - **인수 테스트**: 실제 어댑터를 사용하여 End-to-End 시나리오 검증 +- **Kafka 통합 테스트**: Embedded Kafka를 활용해 이벤트 파이프라인을 종단 간 검증 ### 테스트 실행 ```bash -# 전체 테스트 +# 전체 테스트 (Kafka 비활성화 프로필 기본 적용) ./gradlew test +# Kafka 파이프라인 통합 테스트만 실행 (Embedded Kafka) +./gradlew test --tests PostEventKafkaPipelineTest + # 테스트 커버리지 확인 ./gradlew jacocoTestReport open build/jacocoHtml/index.html From 6458bb4853de257426df1a61d4e55db919222393 Mon Sep 17 00:00:00 2001 From: Park JeongHyun Date: Sat, 18 Oct 2025 21:35:49 +0900 Subject: [PATCH 34/34] fix(kafka): handle unexpected exceptions in post event producer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Kafka 이벤트 발행 중 발생할 수 있는 처리되지 않은 예외를 안전하게 처리하도록 수정 - `sendWithRetry` 실행 시 예외 로그를 기록하여 이슈 원인 파악 가능 - 안정성을 높이는 리팩토링 작업으로 서비스 신뢰성 강화 - DLQ 및 모니터링 알림 연동 고려를 위한 TODO 추가 --- .../adapter/integration/kafka/PostEventProducer.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java index 433f59a9..c50a1bc4 100644 --- a/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java +++ b/src/main/java/dooya/see/adapter/integration/kafka/PostEventProducer.java @@ -40,7 +40,13 @@ public void publish(DomainEvent event) { String key = message.postId() != null ? 
message.postId().toString() : null; - Runnable sendTask = () -> sendWithRetry(key, message); + Runnable sendTask = () -> { + try { + sendWithRetry(key, message); + } catch (Exception unexpected) { + log.error("Kafka 이벤트 발행 중 처리되지 않은 예외 발생: {}", message, unexpected); + } + }; if (TransactionSynchronizationManager.isSynchronizationActive()) { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @@ -90,7 +96,8 @@ private void sendWithRetry(String key, PostEventMessage message) { }, recoveryContext -> { Throwable lastError = recoveryContext.getLastThrowable(); log.error("Kafka 이벤트 발행 실패(재시도 완료): {}", message, lastError); - throw new IllegalStateException("Kafka 전송 실패(재시도 초과)", lastError); + // TODO: DLQ 또는 모니터링 알림 연동을 고려합니다. + return null; }); } }
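The recovery callback above now logs and swallows the failure once retries are exhausted, leaving a TODO for a DLQ or monitoring hook. One possible shape for that follow-up — the `.DLT` topic suffix (borrowed from Spring Kafka's `DeadLetterPublishingRecoverer` convention) and the reuse of the existing `kafkaTemplate` and `topic` fields are assumptions, not part of this series:

```java
// Called from the RetryTemplate recovery callback once retries are exhausted,
// so the payload is preserved for replay instead of being dropped.
private void sendToDeadLetter(String key, PostEventMessage message, Throwable cause) {
    log.error("Kafka 이벤트 발행 실패(재시도 완료), DLT로 전송 시도: {}", message, cause);
    try {
        // Dead letters go to "<topic>.DLT", fire-and-forget.
        kafkaTemplate.send(topic + ".DLT", key, message);
    } catch (Exception dlqFailure) {
        // Last resort: keep the payload in the logs for manual recovery.
        log.error("DLT 전송 실패, 수동 복구 필요: {}", message, dlqFailure);
    }
}
```

Wiring this in would amount to calling `sendToDeadLetter(key, message, lastError)` just before the `return null` in the recovery callback.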