diff --git a/monicar-event-hub/src/main/java/org/eventhub/config/KafkaProducerConfig.java b/monicar-event-hub/src/main/java/org/eventhub/config/KafkaProducerConfig.java index 6f57f4bf..1ad1d08c 100644 --- a/monicar-event-hub/src/main/java/org/eventhub/config/KafkaProducerConfig.java +++ b/monicar-event-hub/src/main/java/org/eventhub/config/KafkaProducerConfig.java @@ -4,8 +4,7 @@ import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; - -import org.eventhub.infrastructure.messaging.TypeIdInterceptor; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -47,4 +46,9 @@ public ProducerFactory producerFactory(KafkaProperties kafkaProp public KafkaTemplate kafkaTemplate(KafkaProperties kafkaProperties) { return new KafkaTemplate<>(producerFactory(kafkaProperties)); } + + @Bean + public PartitionFinder finder(@Qualifier("producerFactory") ProducerFactory producerFactory) { + return new PartitionFinder(producerFactory); + } } diff --git a/monicar-event-hub/src/main/java/org/eventhub/config/PartitionFinder.java b/monicar-event-hub/src/main/java/org/eventhub/config/PartitionFinder.java new file mode 100644 index 00000000..3e3149cd --- /dev/null +++ b/monicar-event-hub/src/main/java/org/eventhub/config/PartitionFinder.java @@ -0,0 +1,23 @@ +package org.eventhub.config; + +import org.apache.kafka.clients.producer.Producer; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; + +@Component +@RequiredArgsConstructor +public class PartitionFinder { + + private final ProducerFactory producerFactory; + + public String[] partitions(String topic) { + try (Producer producer = producerFactory.createProducer()) { + return producer.partitionsFor(topic).stream() + .map(pi -> "" + pi.partition()) + .toArray(String[]::new); + } + } + +} diff --git a/monicar-event-hub/src/main/java/org/eventhub/infrastructure/messaging/KafkaCycleInfoEventPublisher.java b/monicar-event-hub/src/main/java/org/eventhub/infrastructure/messaging/KafkaCycleInfoEventPublisher.java index 74c1a741..1b58bd94 100644 --- a/monicar-event-hub/src/main/java/org/eventhub/infrastructure/messaging/KafkaCycleInfoEventPublisher.java +++ b/monicar-event-hub/src/main/java/org/eventhub/infrastructure/messaging/KafkaCycleInfoEventPublisher.java @@ -3,6 +3,8 @@ import org.eventhub.application.port.CycleInfoEventPublisher; import org.eventhub.domain.CycleInfoList; import org.eventhub.infrastructure.messaging.command.CycleInfoListCommand; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.TopicPartition; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; @@ -19,6 +21,8 @@ public class KafkaCycleInfoEventPublisher implements CycleInfoEventPublisher { @Override @Async + @KafkaListener(topicPartitions = @TopicPartition(topic = "cycleInfo-json-topic", + partitions = "#{@finder.partitions('cycleInfo-json-topic')}")) public void publishEvent(CycleInfoList cycleInfoList) { CycleInfoListCommand message = CycleInfoListCommand.from(cycleInfoList);