From f346e1a5be3e4ec444aa7d07043c6b3f65582c34 Mon Sep 17 00:00:00 2001 From: Suxxxxhyun Date: Thu, 20 Feb 2025 14:32:00 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=ED=8C=8C=ED=8B=B0=EC=85=98=EC=9D=84=20?= =?UTF-8?q?=EC=B0=BE=EB=8F=84=EB=A1=9D=20=ED=95=98=EA=B8=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../eventhub/config/KafkaProducerConfig.java | 8 +++++-- .../org/eventhub/config/PartitionFinder.java | 23 +++++++++++++++++++ .../KafkaCycleInfoEventPublisher.java | 4 ++++ 3 files changed, 33 insertions(+), 2 deletions(-) create mode 100644 monicar-event-hub/src/main/java/org/eventhub/config/PartitionFinder.java diff --git a/monicar-event-hub/src/main/java/org/eventhub/config/KafkaProducerConfig.java b/monicar-event-hub/src/main/java/org/eventhub/config/KafkaProducerConfig.java index 6f57f4bf..1ad1d08c 100644 --- a/monicar-event-hub/src/main/java/org/eventhub/config/KafkaProducerConfig.java +++ b/monicar-event-hub/src/main/java/org/eventhub/config/KafkaProducerConfig.java @@ -4,8 +4,7 @@ import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; - -import org.eventhub.infrastructure.messaging.TypeIdInterceptor; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -47,4 +46,9 @@ public ProducerFactory producerFactory(KafkaProperties kafkaProp public KafkaTemplate kafkaTemplate(KafkaProperties kafkaProperties) { return new KafkaTemplate<>(producerFactory(kafkaProperties)); } + + @Bean + public PartitionFinder finder(@Qualifier("producerFactory") ProducerFactory producerFactory) { + return new PartitionFinder(producerFactory); + } } diff --git a/monicar-event-hub/src/main/java/org/eventhub/config/PartitionFinder.java b/monicar-event-hub/src/main/java/org/eventhub/config/PartitionFinder.java new file mode 100644 index 00000000..3e3149cd --- /dev/null +++ b/monicar-event-hub/src/main/java/org/eventhub/config/PartitionFinder.java @@ -0,0 +1,23 @@ +package org.eventhub.config; + +import org.apache.kafka.clients.producer.Producer; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class PartitionFinder { + + private final ProducerFactory producerFactory; + + public String[] partitions(String topic) { + try (Producer producer = producerFactory.createProducer()) { + return producer.partitionsFor(topic).stream() + .map(pi -> "" + pi.partition()) + .toArray(String[]::new); + } + } + +} diff --git a/monicar-event-hub/src/main/java/org/eventhub/infrastructure/messaging/KafkaCycleInfoEventPublisher.java b/monicar-event-hub/src/main/java/org/eventhub/infrastructure/messaging/KafkaCycleInfoEventPublisher.java index 74c1a741..1b58bd94 100644 --- a/monicar-event-hub/src/main/java/org/eventhub/infrastructure/messaging/KafkaCycleInfoEventPublisher.java +++ b/monicar-event-hub/src/main/java/org/eventhub/infrastructure/messaging/KafkaCycleInfoEventPublisher.java @@ -3,6 +3,8 @@ import org.eventhub.application.port.CycleInfoEventPublisher; import org.eventhub.domain.CycleInfoList; import org.eventhub.infrastructure.messaging.command.CycleInfoListCommand; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.TopicPartition; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; @@ -19,6 +21,8 @@ public class KafkaCycleInfoEventPublisher implements CycleInfoEventPublisher { @Override @Async + @KafkaListener(topicPartitions = @TopicPartition(topic = "cycleInfo-json-topic", + partitions = "#{@finder.partitions('cycleInfo-json-topic')}")) public void publishEvent(CycleInfoList cycleInfoList) { CycleInfoListCommand message = CycleInfoListCommand.from(cycleInfoList);