Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 36 additions & 10 deletions SpringCloudStream/words/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# drinking
# words

* abstract: source -> processor -> sink
* spring initializr:
Expand All @@ -7,29 +7,55 @@
* Spring Web: for generation end-point by web
* Lombok: for convenience

## local kafka
## Local kafka start and terminate

* docker-compose up -d
* docker-compose ps
* docker-compose down
```shell
$ docker-compose up -d
$ docker-compose ps

Name Command State Ports
-----------------------------------------------------------------------------------------------------------------------------------
words_kafka_1 /opt/bitnami/scripts/kafka ... Up 0.0.0.0:9092->9092/tcp,:::9092->9092/tcp
words_zookeeper_1 /opt/bitnami/scripts/zooke ... Up 0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp

$ docker-compose down
```

## source generation

* http://localhost:8080/swagger-ui.html
* http://localhost:5000/swagger-ui.html

## topic monitoring

* ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
* Use https://kafka.apache.org/downloads or Bitnami Docker's image has script files already.
* location: /opt/bitnami/kafka/bin
```shell
$ docker ps

CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
fa8ec08e2ee5 bitnami/kafka:2 "/opt/bitnami/script…" 11 minutes ago Up 11 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp words_kafka_1
fc0ba093b001 bitnami/zookeeper:3.7 "/opt/bitnami/script…" 11 minutes ago Up 11 minutes 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 8080/tcp words_zookeeper_1

$ docker exec -it fa8ec08e2ee5 /bin/bash
$ cd /opt/bitnami/kafka/bin
```

### List up kafka consumer groups
```shell
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
words-upper-group
words-console-out-group
```
* ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group words-upper-group
```

### Describe specific consumer groups
```shell
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group words-upper-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
words-upper-group words-source 0 125 125 0 consumer-words-upper-group-2-0966a14e-b83c-4815-be1b-1f88ef296e9c /172.18.0.1 consumer-words-upper-group-2
```
```
```shell
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group words-upper-group

Consumer group 'words-upper-group' has no active members.

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.tistory.namocom.words.error;

import com.tistory.namocom.words.model.Word;

public class DuplicationWordException extends RuntimeException {
public DuplicationWordException(Word word) {
super(String.format("Word '%s' is already exists with ID: %s", word.getValue(), word.getId()));
}
}
2 changes: 2 additions & 0 deletions SpringCloudStream/words/sink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ plugins {
dependencies {
implementation project(':common')

implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}"
implementation("org.springframework.boot:spring-boot-starter-actuator:${springBootVersion}")
implementation "org.springframework.cloud:spring-cloud-stream:${springCloudStreamVersion}"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka:${springCloudStreamVersion}"
implementation "org.apache.kafka:kafka-streams:${kafkaVersion}"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package com.tistory.namocom.words;

import com.tistory.namocom.words.error.DuplicationWordException;
import com.tistory.namocom.words.model.Word;
import com.tistory.namocom.words.sink.repository.WordRepository;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.StreamRetryTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.retry.support.RetryTemplateBuilder;

import java.util.function.Consumer;

Expand All @@ -15,8 +20,18 @@ public static void main(String[] args) {
}

@Bean
public Consumer<Word> wordToConsole() {
return System.out::println;
public Consumer<Word> wordToConsole(WordRepository wordRepository) {
return word -> {
wordRepository.add(word);
System.out.println(word);
};
}

// @StreamRetryTemplate
// public RetryTemplate myRetryTemplate() {
// final RetryTemplateBuilder builder = RetryTemplate.builder();
// builder.notRetryOn(DuplicationWordException.class);
// return builder.build();
// }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.tistory.namocom.words.sink.repository;

import com.tistory.namocom.words.error.DuplicationWordException;
import com.tistory.namocom.words.model.Word;
import org.springframework.stereotype.Repository;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Repository
public class CollectionWordRepository implements WordRepository {

private final Map<String, Word> storage = new ConcurrentHashMap<>();

@Override
public void add(Word word) {
if (storage.containsKey(word.getId())) {
throw new DuplicationWordException(word);
}
storage.put(word.getId(), word);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.tistory.namocom.words.sink.repository;

import com.tistory.namocom.words.model.Word;

public interface WordRepository {
void add(Word word);
}
21 changes: 20 additions & 1 deletion SpringCloudStream/words/sink/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
server:
port: 5001
port: 5002
spring:
cloud:
function:
Expand All @@ -10,3 +10,22 @@ spring:
binder: kafka
destination: words-upper
group: words-console-out-group
consumer:
# retry-template-name: myRetryTemplate
# max-attempts: 1 # disable retry
# default-retryable: false
retryable-exceptions:
com.tistory.namocom.words.error.DuplicationWordException: false

management:
endpoint:
health:
show-details: ALWAYS
endpoints:
web:
base-path: /
exposure:
include: health, info, metrics, bindings
health:
binders:
enabled: true
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.tistory.namocom.words.model.Word;
import com.tistory.namocom.words.source.queue.WordQueue;
import org.springframework.lang.Nullable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
Expand All @@ -18,10 +19,18 @@ public WordGenerationEndpoint(WordQueue wordQueue) {
}

@PostMapping(path = "/word")
public Word generateWord(@RequestParam String text) {
final Word word = new Word(UUID.randomUUID().toString(), text);
public Word generateWord(@RequestParam(required = false) @Nullable String id, @RequestParam String text) {
final String wordId = generateIfNotSupported(id);
final Word word = new Word(wordId, text);
wordQueue.push(word);
return word;
}

private String generateIfNotSupported(@Nullable String parameterId) {
if (parameterId == null) {
return UUID.randomUUID().toString();
}
return parameterId;
}

}