From 3e1c4a22591556db58c53f7d79a2d8d393821989 Mon Sep 17 00:00:00 2001 From: namo Date: Tue, 12 Oct 2021 12:26:18 +0900 Subject: [PATCH 1/2] docs: revise README.md - add how to use Kafka CLI by docker image --- SpringCloudStream/words/README.md | 46 ++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/SpringCloudStream/words/README.md b/SpringCloudStream/words/README.md index 6b53130..d8921b0 100644 --- a/SpringCloudStream/words/README.md +++ b/SpringCloudStream/words/README.md @@ -1,4 +1,4 @@ -# drinking +# words * abstract: source -> processor -> sink * spring initializr: @@ -7,29 +7,55 @@ * Spring Web: for generation end-point by web * Lombok: for convenience -## local kafka +## Local kafka start and terminate -* docker-compose up -d -* docker-compose ps -* docker-compose down +```shell +$ docker-compose up -d +$ docker-compose ps + + Name Command State Ports +----------------------------------------------------------------------------------------------------------------------------------- +words_kafka_1 /opt/bitnami/scripts/kafka ... Up 0.0.0.0:9092->9092/tcp,:::9092->9092/tcp +words_zookeeper_1 /opt/bitnami/scripts/zooke ... Up 0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp + +$ docker-compose down +``` ## source generation -* http://localhost:8080/swagger-ui.html +* http://localhost:5000/swagger-ui.html ## topic monitoring -* ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list +* Use https://kafka.apache.org/downloads or Bitnami Docker's image has script files already. + * location: /opt/bitnami/kafka/bin +```shell +$ docker ps + +CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES +fa8ec08e2ee5 bitnami/kafka:2 "/opt/bitnami/script…" 11 minutes ago Up 11 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp words_kafka_1 +fc0ba093b001 bitnami/zookeeper:3.7 "/opt/bitnami/script…" 11 minutes ago Up 11 minutes 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 8080/tcp words_zookeeper_1 + +$ docker exec -it fa8ec08e2ee5 /bin/bash +$ cd /opt/bitnami/kafka/bin ``` + +### List up kafka consumer groups +```shell +$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list words-upper-group words-console-out-group ``` -* ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group words-upper-group -``` + +### Describe specific consumer groups +```shell +$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group words-upper-group GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID words-upper-group words-source 0 125 125 0 consumer-words-upper-group-2-0966a14e-b83c-4815-be1b-1f88ef296e9c /172.18.0.1 consumer-words-upper-group-2 ``` -``` +```shell +$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group words-upper-group + Consumer group 'words-upper-group' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID From 2b9cf45c50036981b9da143df06908adea520f22 Mon Sep 17 00:00:00 2001 From: namo Date: Tue, 12 Oct 2021 14:48:07 +0900 Subject: [PATCH 2/2] feat(sink): test for exception handling - add actuator - sink to repository for duplication exception --- .../words/error/DuplicationWordException.java | 9 ++++++++ SpringCloudStream/words/sink/build.gradle | 2 ++ .../namocom/words/SinkApplication.java | 19 +++++++++++++-- .../repository/CollectionWordRepository.java | 23 +++++++++++++++++++ .../words/sink/repository/WordRepository.java | 7 ++++++ .../sink/src/main/resources/application.yaml | 21 ++++++++++++++++- .../endpoint/WordGenerationEndpoint.java | 13 +++++++++-- 7 files changed, 89 insertions(+), 5 deletions(-) create mode 100644 SpringCloudStream/words/common/src/main/java/com/tistory/namocom/words/error/DuplicationWordException.java create mode 100644 SpringCloudStream/words/sink/src/main/java/com/tistory/namocom/words/sink/repository/CollectionWordRepository.java create mode 100644 SpringCloudStream/words/sink/src/main/java/com/tistory/namocom/words/sink/repository/WordRepository.java diff --git a/SpringCloudStream/words/common/src/main/java/com/tistory/namocom/words/error/DuplicationWordException.java b/SpringCloudStream/words/common/src/main/java/com/tistory/namocom/words/error/DuplicationWordException.java new file mode 100644 index 0000000..c0e6a56 --- /dev/null +++ b/SpringCloudStream/words/common/src/main/java/com/tistory/namocom/words/error/DuplicationWordException.java @@ -0,0 +1,9 @@ +package com.tistory.namocom.words.error; + +import com.tistory.namocom.words.model.Word; + +public class DuplicationWordException extends RuntimeException { + public DuplicationWordException(Word word) { + super(String.format("Word '%s' is already exists with ID: %s", word.getValue(), word.getId())); + } +} diff --git a/SpringCloudStream/words/sink/build.gradle b/SpringCloudStream/words/sink/build.gradle index 024f4b5..e34dea6 100644 --- a/SpringCloudStream/words/sink/build.gradle +++ b/SpringCloudStream/words/sink/build.gradle @@ -7,6 +7,8 @@ plugins { dependencies { implementation project(':common') + implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}" + implementation("org.springframework.boot:spring-boot-starter-actuator:${springBootVersion}") implementation "org.springframework.cloud:spring-cloud-stream:${springCloudStreamVersion}" implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka:${springCloudStreamVersion}" implementation "org.apache.kafka:kafka-streams:${kafkaVersion}" diff --git a/SpringCloudStream/words/sink/src/main/java/com/tistory/namocom/words/SinkApplication.java b/SpringCloudStream/words/sink/src/main/java/com/tistory/namocom/words/SinkApplication.java index b2236b7..7f66620 100644 --- a/SpringCloudStream/words/sink/src/main/java/com/tistory/namocom/words/SinkApplication.java +++ b/SpringCloudStream/words/sink/src/main/java/com/tistory/namocom/words/SinkApplication.java @@ -1,9 +1,14 @@ package com.tistory.namocom.words; +import com.tistory.namocom.words.error.DuplicationWordException; import com.tistory.namocom.words.model.Word; +import com.tistory.namocom.words.sink.repository.WordRepository; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.StreamRetryTemplate; import org.springframework.context.annotation.Bean; +import org.springframework.retry.support.RetryTemplate; +import org.springframework.retry.support.RetryTemplateBuilder; import java.util.function.Consumer; @@ -15,8 +20,18 @@ public static void main(String[] args) { } @Bean - public Consumer wordToConsole() { - return System.out::println; + public Consumer wordToConsole(WordRepository wordRepository) { + return word -> { + wordRepository.add(word); + System.out.println(word); + }; } +// @StreamRetryTemplate +// public RetryTemplate myRetryTemplate() { +// final RetryTemplateBuilder builder = RetryTemplate.builder(); +// builder.notRetryOn(DuplicationWordException.class); +// return builder.build(); +// } + } diff --git a/SpringCloudStream/words/sink/src/main/java/com/tistory/namocom/words/sink/repository/CollectionWordRepository.java b/SpringCloudStream/words/sink/src/main/java/com/tistory/namocom/words/sink/repository/CollectionWordRepository.java new file mode 100644 index 0000000..170ecc5 --- /dev/null +++ b/SpringCloudStream/words/sink/src/main/java/com/tistory/namocom/words/sink/repository/CollectionWordRepository.java @@ -0,0 +1,23 @@ +package com.tistory.namocom.words.sink.repository; + +import com.tistory.namocom.words.error.DuplicationWordException; +import com.tistory.namocom.words.model.Word; +import org.springframework.stereotype.Repository; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Repository +public class CollectionWordRepository implements WordRepository { + + private final Map storage = new ConcurrentHashMap<>(); + + @Override + public void add(Word word) { + if (storage.containsKey(word.getId())) { + throw new DuplicationWordException(word); + } + storage.put(word.getId(), word); + } + +} diff --git a/SpringCloudStream/words/sink/src/main/java/com/tistory/namocom/words/sink/repository/WordRepository.java b/SpringCloudStream/words/sink/src/main/java/com/tistory/namocom/words/sink/repository/WordRepository.java new file mode 100644 index 0000000..c0d4a28 --- /dev/null +++ b/SpringCloudStream/words/sink/src/main/java/com/tistory/namocom/words/sink/repository/WordRepository.java @@ -0,0 +1,7 @@ +package com.tistory.namocom.words.sink.repository; + +import com.tistory.namocom.words.model.Word; + +public interface WordRepository { + void add(Word word); +} diff --git a/SpringCloudStream/words/sink/src/main/resources/application.yaml b/SpringCloudStream/words/sink/src/main/resources/application.yaml index 4f2e691..b5718b3 100644 --- a/SpringCloudStream/words/sink/src/main/resources/application.yaml +++ b/SpringCloudStream/words/sink/src/main/resources/application.yaml @@ -1,5 +1,5 @@ server: - port: 5001 + port: 5002 spring: cloud: function: @@ -10,3 +10,22 @@ spring: binder: kafka destination: words-upper group: words-console-out-group + consumer: +# retry-template-name: myRetryTemplate +# max-attempts: 1 # disable retry +# default-retryable: false + retryable-exceptions: + com.tistory.namocom.words.error.DuplicationWordException: false + +management: + endpoint: + health: + show-details: ALWAYS + endpoints: + web: + base-path: / + exposure: + include: health, info, metrics, bindings + health: + binders: + enabled: true diff --git a/SpringCloudStream/words/source/src/main/java/com/tistory/namocom/words/source/endpoint/WordGenerationEndpoint.java b/SpringCloudStream/words/source/src/main/java/com/tistory/namocom/words/source/endpoint/WordGenerationEndpoint.java index e5f5a9f..a4d8dd9 100644 --- a/SpringCloudStream/words/source/src/main/java/com/tistory/namocom/words/source/endpoint/WordGenerationEndpoint.java +++ b/SpringCloudStream/words/source/src/main/java/com/tistory/namocom/words/source/endpoint/WordGenerationEndpoint.java @@ -2,6 +2,7 @@ import com.tistory.namocom.words.model.Word; import com.tistory.namocom.words.source.queue.WordQueue; +import org.springframework.lang.Nullable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @@ -18,10 +19,18 @@ public WordGenerationEndpoint(WordQueue wordQueue) { } @PostMapping(path = "/word") - public Word generateWord(@RequestParam String text) { - final Word word = new Word(UUID.randomUUID().toString(), text); + public Word generateWord(@RequestParam(required = false) @Nullable String id, @RequestParam String text) { + final String wordId = generateIfNotSupported(id); + final Word word = new Word(wordId, text); wordQueue.push(word); return word; } + private String generateIfNotSupported(@Nullable String parameterId) { + if (parameterId == null) { + return UUID.randomUUID().toString(); + } + return parameterId; + } + }