Skip to content

Commit a7740d3

Browse files
authored
Merge pull request #38 from dPwls0125/feature/compensation
[Feature/compensation] Kafka 보상 이벤트 처리 병렬화 및 컨슈머 성능 최적화
2 parents e3ce2b7 + b201dcb commit a7740d3

21 files changed

Lines changed: 1021 additions & 117 deletions

File tree

AccountService/Dockerfile

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,4 @@
1-
FROM gradle:8.4-jdk21 AS build
2-
WORKDIR /workspace
3-
COPY . .
4-
RUN ./gradlew :AccountService:bootWar -x test --no-daemon
5-
6-
FROM eclipse-temurin:21-jre
7-
WORKDIR /app
8-
COPY --from=build /workspace/AccountService/build/libs/*.war app.war
9-
EXPOSE 8081
10-
ENTRYPOINT ["java","-jar","app.war"]
1+
FROM eclipse-temurin:21-jdk
2+
ARG WAR_FILE=build/libs/*.war
3+
COPY ${WAR_FILE} app.war
4+
ENTRYPOINT ["java","-jar","/app.war"]

CartService/Dockerfile

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,4 @@
1-
FROM gradle:8.4-jdk21 AS build
2-
WORKDIR /workspace
3-
COPY . .
4-
RUN ./gradlew :CartService:bootWar -x test --no-daemon
5-
6-
FROM eclipse-temurin:21-jre
7-
WORKDIR /app
8-
COPY --from=build /workspace/CartService/build/libs/*.war app.war
9-
EXPOSE 8082
10-
ENTRYPOINT ["java","-jar","app.war"]
1+
FROM eclipse-temurin:17-jdk
2+
ARG WAR_FILE=build/libs/*.war
3+
COPY ${WAR_FILE} app.war
4+
ENTRYPOINT ["java","-jar","/app.war"]

CatalogService/Dockerfile

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,4 @@
1-
FROM gradle:8.4-jdk21 AS build
2-
WORKDIR /workspace
3-
COPY . .
4-
RUN ./gradlew :CatalogService:bootWar -x test --no-daemon
5-
6-
FROM eclipse-temurin:21-jre
7-
WORKDIR /app
8-
COPY --from=build /workspace/CatalogService/build/libs/*.war app.war
9-
EXPOSE 8083
10-
ENTRYPOINT ["java","-jar","app.war"]
1+
FROM eclipse-temurin:17-jdk
2+
ARG WAR_FILE=build/libs/*.war
3+
COPY ${WAR_FILE} app.war
4+
ENTRYPOINT ["java","-jar","/app.war"]

CatalogService/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ archivesBaseName = 'jpetstore-catalog'
44

55
dependencies {
66
implementation project(':CommonLibrary')
7-
implementation 'org.projectlombok:lombok:1.18.30'
7+
compileOnly 'org.projectlombok:lombok:1.18.30'
88
annotationProcessor 'org.projectlombok:lombok:1.18.30'
99
implementation 'org.springframework.kafka:spring-kafka:3.3.7'
1010
implementation 'net.devh:grpc-server-spring-boot-starter:3.1.0.RELEASE'
11+
testImplementation 'org.springframework.kafka:spring-kafka-test:3.3.7'
1112
}
1213

CatalogService/src/main/java/org/mybatis/jpetstore/catalog/controller/CatalogController.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import java.util.HashMap;
1616
import java.util.List;
1717

18-
1918
@Controller
2019
@RequiredArgsConstructor
2120
public class CatalogController {

CatalogService/src/main/java/org/mybatis/jpetstore/catalog/kafka/KafkaConsumerConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ public class KafkaConsumerConfig {
2222
@Value("${kafka.bootstrap-servers}")
2323
private String bootstrapServers;
2424

25+
@Value("${kafka.consumer.concurrency:1}")
26+
private int concurrency;
27+
2528
@Bean
2629
public ConsumerFactory<String, Object> consumerFactory() {
2730
Map<String, Object> config = new HashMap<>();
@@ -37,6 +40,7 @@ public ConsumerFactory<String, Object> consumerFactory() {
3740
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
3841
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
3942
factory.setConsumerFactory(consumerFactory());
43+
factory.setConcurrency(concurrency);
4044
return factory;
4145
}
4246
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package org.mybatis.jpetstore.catalog.service;
2+
3+
import org.junit.jupiter.api.DisplayName;
4+
import org.junit.jupiter.api.Test;
5+
import org.mybatis.jpetstore.catalog.CatalogServiceApplication;
6+
import org.springframework.beans.factory.annotation.Autowired;
7+
import org.springframework.boot.test.context.SpringBootTest;
8+
import org.springframework.kafka.core.KafkaTemplate;
9+
import org.springframework.kafka.test.context.EmbeddedKafka;
10+
import org.springframework.test.context.ActiveProfiles;
11+
import org.springframework.boot.test.mock.mockito.SpyBean;
12+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
13+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
14+
import org.springframework.kafka.listener.MessageListenerContainer;
15+
import org.springframework.kafka.test.utils.ContainerTestUtils;
16+
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
import static org.mockito.ArgumentMatchers.any;
24+
import static org.mockito.Mockito.doAnswer;
25+
26+
@SpringBootTest(classes = CatalogServiceApplication.class, properties = {
27+
"eureka.client.enabled=false",
28+
"spring.cloud.discovery.enabled=false",
29+
"spring.cloud.compatibility-verifier.enabled=false",
30+
"spring.sql.init.mode=always",
31+
"spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration,org.mybatis.jpetstore.common.config.CommonAutoConfiguration",
32+
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
33+
"spring.kafka.consumer.properties.spring.json.trusted.packages=*",
34+
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
35+
"kafka.bootstrap-servers=${spring.kafka.bootstrap-servers}",
36+
"kafka.consumer.concurrency=3", // 3개의 스레드 활성화 (메인 설정 활용)
37+
"grpc.server.port=0"
38+
})
39+
@EmbeddedKafka(partitions = 3, topics = {"product_compensation"}, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
40+
@ActiveProfiles("test")
41+
public class KafkaConcurrencyTest {
42+
43+
@Autowired
44+
private KafkaTemplate<String, Object> kafkaTemplate;
45+
46+
@SpyBean
47+
private CatalogService catalogService;
48+
49+
@Autowired
50+
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
51+
52+
@Autowired
53+
private EmbeddedKafkaBroker embeddedKafkaBroker;
54+
55+
@Test
56+
@DisplayName("성능 입증 테스트: Concurrency=3 일 때 30개의 메시지를 병렬로 처리하는지 확인")
57+
void measurePerformanceWithMultipleConsumers() throws InterruptedException {
58+
// 파티션 할당 대기
59+
for (MessageListenerContainer container : kafkaListenerEndpointRegistry.getListenerContainers()) {
60+
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
61+
}
62+
63+
int messageCount = 30;
64+
CountDownLatch latch = new CountDownLatch(messageCount);
65+
AtomicInteger processedCount = new AtomicInteger(0);
66+
67+
doAnswer(invocation -> {
68+
Thread.sleep(100); // 100ms 지연, consumer가 만약 하나라면 해당 100ms만큼 blocking되는 구조
69+
processedCount.incrementAndGet();
70+
System.out.println("[CONSUMER] Thread: " + Thread.currentThread().getName() + " | Total: " + processedCount.get());
71+
latch.countDown();
72+
return null;
73+
}).when(catalogService).rollbackInventory(any());
74+
75+
long startTime = System.currentTimeMillis();
76+
77+
for (int i = 0; i < messageCount; i++) {
78+
Map<String, Object> data = new HashMap<>();
79+
data.put("EST-" + (i % 5), 1);
80+
// 키를 다르게 주어 3개 파티션에 골고루 분산
81+
kafkaTemplate.send("product_compensation", "key-" + i, data);
82+
}
83+
84+
boolean completed = latch.await(20, TimeUnit.SECONDS);
85+
long endTime = System.currentTimeMillis();
86+
long totalTime = endTime - startTime;
87+
88+
System.out.println("==========================================");
89+
System.out.println("Message Count: " + messageCount);
90+
System.out.println("Total Processing Time: " + totalTime + " ms");
91+
System.out.println("==========================================");
92+
93+
assert(totalTime < 2500) : "Parallel processing is NOT working. Expected < 2500ms, but got " + totalTime + "ms";
94+
}
95+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package org.mybatis.jpetstore.catalog.service;
2+
3+
import org.junit.jupiter.api.DisplayName;
4+
import org.junit.jupiter.api.Test;
5+
import org.mybatis.jpetstore.catalog.CatalogServiceApplication;
6+
import org.mybatis.jpetstore.catalog.controller.CatalogController;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.boot.test.context.SpringBootTest;
9+
import org.springframework.boot.test.context.TestConfiguration;
10+
import org.springframework.context.annotation.Bean;
11+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
12+
import org.springframework.kafka.core.KafkaTemplate;
13+
import org.springframework.kafka.core.ProducerFactory;
14+
import org.springframework.kafka.support.serializer.JsonSerializer;
15+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
16+
import org.springframework.kafka.test.context.EmbeddedKafka;
17+
import org.springframework.kafka.test.utils.ContainerTestUtils;
18+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
19+
import org.springframework.kafka.listener.MessageListenerContainer;
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.springframework.test.context.ActiveProfiles;
22+
import org.springframework.boot.test.mock.mockito.SpyBean;
23+
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
27+
import static org.junit.jupiter.api.Assertions.assertEquals;
28+
import static org.junit.jupiter.api.Assertions.assertNotNull;
29+
import static org.mockito.ArgumentMatchers.any;
30+
import static org.mockito.Mockito.timeout;
31+
import static org.mockito.Mockito.verify;
32+
33+
/**
34+
* Saga 패턴의 보상 트랜잭션 수신 측(Catalog Service) 통합 테스트입니다.
35+
*/
36+
@SpringBootTest(classes = CatalogServiceApplication.class, properties = {
37+
"eureka.client.enabled=false",
38+
"spring.cloud.discovery.enabled=false",
39+
"spring.cloud.compatibility-verifier.enabled=false",
40+
"spring.sql.init.mode=always",
41+
"spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration,org.mybatis.jpetstore.common.config.CommonAutoConfiguration",
42+
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
43+
"spring.kafka.consumer.properties.spring.json.trusted.packages=*",
44+
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
45+
"kafka.bootstrap-servers=${spring.kafka.bootstrap-servers}",
46+
"grpc.server.port=0"
47+
})
48+
@EmbeddedKafka(partitions = 1, topics = {"product_compensation"}, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
49+
@ActiveProfiles("test")
50+
public class RollbackIntegrationTest {
51+
52+
@Autowired
53+
private KafkaTemplate<String, Object> kafkaTemplate;
54+
55+
@SpyBean
56+
private CatalogService catalogService;
57+
58+
@Autowired
59+
private CatalogController catalogController;
60+
61+
@Autowired
62+
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
63+
64+
@Autowired
65+
private EmbeddedKafkaBroker embeddedKafkaBroker;
66+
67+
@BeforeEach
68+
public void setUp() {
69+
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
70+
ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafkaBroker.getPartitionsPerTopic());
71+
}
72+
}
73+
74+
@Test
75+
@DisplayName("보상 트랜잭션 통합 테스트: Catalog 서비스는 Kafka 보상 이벤트를 받으면 실제 DB 재고를 복구해야 한다")
76+
void verifyCatalogRollbackByKafkaEvent() {
77+
// Given: 보상 이벤트 데이터 (아이템 EST-1, 수량 10개 복구)
78+
Map<String, Object> data = new HashMap<>();
79+
data.put("EST-1", 10);
80+
81+
// 테스트 시작 전 EST-1 의 현재 재고 확인 (데이터로드 SQL 에 의해 10000개로 초기화됨)
82+
Integer initialQuantity = catalogService.getItemQuantity("EST-1");
83+
assertNotNull(initialQuantity);
84+
System.out.println("=== [TEST] Initial DB Quantity for EST-1: " + initialQuantity + " ===");
85+
86+
// When: 실제 Kafka 토픽으로 메시지 전송
87+
System.out.println("=== [TEST] SENDING COMPENSATION EVENT TO EMBEDDED KAFKA ===");
88+
kafkaTemplate.send("product_compensation", data);
89+
kafkaTemplate.flush();
90+
91+
// Then: CatalogController의 @KafkaListener가 동작하여 catalogService.rollbackInventory를 호출하는지 대기
92+
verify(catalogService, timeout(15000).times(1)).rollbackInventory(any());
93+
94+
// Kafka 메시지 처리로 인해 트랜잭션이 커밋되는 시간을 약간 대기 (필요시)
95+
try {
96+
Thread.sleep(500);
97+
} catch (InterruptedException e) {
98+
e.printStackTrace();
99+
}
100+
101+
// DB 재고가 실제로 복구되었는지 확인
102+
Integer updatedQuantity = catalogService.getItemQuantity("EST-1");
103+
System.out.println("=== [TEST] Updated DB Quantity for EST-1: " + updatedQuantity + " ===");
104+
105+
assertEquals(initialQuantity + 10, updatedQuantity);
106+
107+
System.out.println("=== [TEST] VERIFIED: Catalog Service DB Rollback Triggered and Executed by REAL Kafka Event! ===");
108+
}
109+
}

CatalogService/src/test/java/org/mybatis/jpetstore/controller/CatalogControllerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import org.mybatis.jpetstore.catalog.controller.CatalogController;
55
import org.springframework.beans.factory.annotation.Autowired;
66
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
7-
import org.springframework.test.context.bean.override.mockito.MockitoBean;
7+
import org.springframework.boot.test.mock.mockito.MockBean;
88
import org.springframework.test.web.servlet.MockMvc;
99
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
1010
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
@@ -23,7 +23,7 @@ class CatalogControllerTest {
2323

2424
@Autowired
2525
private MockMvc mockMvc;
26-
@MockitoBean
26+
@MockBean
2727
private CatalogService catalogService;
2828

2929
// 메인 페이지 요청 시 정상적으로 뷰를 반환한다

DiscoveryServer/Dockerfile

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,4 @@
1-
FROM gradle:8.4-jdk21 AS build
2-
WORKDIR /workspace
3-
COPY . .
4-
RUN ./gradlew :DiscoveryServer:bootJar -x test --no-daemon
5-
6-
FROM eclipse-temurin:21-jre
7-
WORKDIR /app
8-
COPY --from=build /workspace/DiscoveryServer/build/libs/*.jar app.jar
9-
EXPOSE 8761
10-
ENTRYPOINT ["java","-jar","app.jar"]
1+
FROM eclipse-temurin:17-jdk
2+
ARG WAR_FILE=build/libs/*.war
3+
COPY ${WAR_FILE} app.war
4+
ENTRYPOINT ["java","-jar","/app.war"]

0 commit comments

Comments
 (0)