diff --git a/Dockerfile b/Dockerfile
index 76cecc5..110287e 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,7 +1,5 @@
-FROM adoptopenjdk/openjdk11:jdk-11.0.2.9-slim
+FROM adoptopenjdk/openjdk11:jre-11.0.8_10-alpine
WORKDIR /opt
-ENV PORT 8080
-EXPOSE 8080
COPY kafka.client.truststore.jks /opt/kafka.client.truststore.jks
COPY target/*.jar /opt/app.jar
-ENTRYPOINT exec java $JAVA_OPTS -jar app.jar
\ No newline at end of file
+ENTRYPOINT exec java $JAVA_OPTS -jar app.jar --spring.profiles.active=aws --spring.config.location=/tmp/
\ No newline at end of file
diff --git a/docker-compose.yaml b/docker-compose.yaml
deleted file mode 100644
index de568b3..0000000
--- a/docker-compose.yaml
+++ /dev/null
@@ -1,42 +0,0 @@
-version: "3.8"
-services:
- kafka:
- image: lensesio/fast-data-dev
- restart: unless-stopped
- environment:
- - ADV_HOST=240.12.0.2
- - RUNNING_SAMPLEDATA=0
- - RUNTESTS=0
- - CONNECT_PORT=0
- networks:
- kafka:
- ipv4_address: 240.12.0.2
-
- elasticsearch:
- image: elasticsearch:7.7.1
- network_mode: "service:kafka"
- restart: unless-stopped
- environment:
- - discovery.type=single-node
-
- grafana:
- image: grafana/grafana
- network_mode: "service:kafka"
- restart: unless-stopped
-
- prometheus:
- image: prom/prometheus
- network_mode: "service:kafka"
- restart: unless-stopped
- command: --config.file=/etc/prometheus/prometheus.yml
- volumes:
- - ${PWD}/prometheus.yml:/etc/prometheus/prometheus.yml
-
-networks:
- public:
- external: true
- kafka:
- driver: bridge
- ipam:
- config:
- - subnet: 240.12.0.0/24
\ No newline at end of file
diff --git a/load.yaml b/load.yaml
new file mode 100644
index 0000000..a01e901
--- /dev/null
+++ b/load.yaml
@@ -0,0 +1,54 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+ generateName: load-
+ labels:
+ app: load
+ namespace: load
+spec:
+ parallelism: 1
+ template:
+ spec:
+ serviceAccountName: statement-demo
+ containers:
+ - name: load
+ image: 837108680928.dkr.ecr.us-east-1.amazonaws.com/load:latest
+ imagePullPolicy: Always
+ volumeMounts:
+ - mountPath: /tmp/application-aws.properties
+ name: config
+ subPath: application-aws.properties
+ resources:
+ limits:
+ memory: "1Gi"
+ cpu: 1
+ requests:
+ memory: "1Gi"
+ cpu: 1
+ env:
+ - name: KAFKA_BOOTSTRAP
+ valueFrom:
+ configMapKeyRef:
+ name: kafka
+ key: bootstrap
+ - name: TOTAL_CONTAS
+ value: "4800000"
 #value: "38709660"
+ - name: DIAS_POR_CONTA
+ value: "213"
+ - name: ULTIMO_DIA
+ value: "2020-05-31"
+ - name: ID_PREVISIVEL
+ value: "true"
+ - name: ID_INICIAL
+ value: "0"
+ - name: POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
+ volumes:
+ - configMap:
+ name: application.properties
+ name: config
+ restartPolicy: Never
+ backoffLimit: 0
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 797d757..a3ff34f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,13 +15,14 @@
Demo project for Spring Boot
- 1.8
+ 11
+ 1.11.887
org.springframework.boot
- spring-boot-starter-actuator
+ spring-boot-starter
@@ -49,40 +50,66 @@
1.0.2
- org.springframework.boot
- spring-boot-starter-test
- test
-
-
- org.junit.vintage
- junit-vintage-engine
-
-
+ io.micrometer
+ micrometer-core
+ 1.5.5
- org.springframework.kafka
- spring-kafka-test
- test
+ com.amazonaws
+ aws-java-sdk-bom
+ ${aws.sdk.version}
+ pom
+
+
+ com.amazonaws
+ aws-java-sdk-sts
+ ${aws.sdk.version}
+
+
+ com.amazonaws
+ aws-java-sdk-cloudwatch
+ ${aws.sdk.version}
+
+
+ org.springframework.cloud
+ spring-cloud-starter-aws
+ 2.2.4.RELEASE
+
+
+ io.micrometer
+ micrometer-registry-cloudwatch
+ 1.5.5
-
- junit
- junit
- test
-
- junit
- junit
- test
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+ 2.11.3
-
-
- org.springframework.boot
- spring-boot-maven-plugin
-
-
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+ **/application*.properties
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+ repackage
+
+
+
+
+
diff --git a/prometheus.yml b/prometheus.yml
deleted file mode 100644
index f19aa03..0000000
--- a/prometheus.yml
+++ /dev/null
@@ -1,26 +0,0 @@
-# my global config
-global:
- scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
- evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
- # scrape_timeout is set to the global default (10s).
-
-# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
-rule_files:
- # - "first_rules.yml"
- # - "second_rules.yml"
-
-# A scrape configuration containing exactly one endpoint to scrape:
-# Here it's Prometheus itself.
-scrape_configs:
- # The job name is added as a label `job=` to any timeseries scraped from this config.
- - job_name: 'prometheus'
- # metrics_path defaults to '/metrics'
- # scheme defaults to 'http'.
- static_configs:
- - targets: ['127.0.0.1:9090']
-
- - job_name: 'spring-actuator'
- metrics_path: '/actuator/prometheus'
- scrape_interval: 5s
- static_configs:
- - targets: ['192.168.100.123:8080']
diff --git a/src/main/java/br/com/exemplo/dataingestion/LoadApplication.java b/src/main/java/br/com/exemplo/dataingestion/LoadApplication.java
index feafe0e..4075b18 100644
--- a/src/main/java/br/com/exemplo/dataingestion/LoadApplication.java
+++ b/src/main/java/br/com/exemplo/dataingestion/LoadApplication.java
@@ -1,12 +1,17 @@
package br.com.exemplo.dataingestion;
-import br.com.exemplo.dataingestion.adapters.controllers.servers.GenerateLoadController;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
+import java.time.LocalDate;
+
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.format.annotation.DateTimeFormat;
+import org.springframework.format.annotation.DateTimeFormat.ISO;
+
+import br.com.exemplo.dataingestion.adapters.controllers.servers.GenerateLoadController;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
@SpringBootApplication
@RequiredArgsConstructor
@Slf4j
@@ -18,6 +23,16 @@ public class LoadApplication implements CommandLineRunner {
@Value("${dias.total:90}")
private int dias;
+ @DateTimeFormat(iso = ISO.DATE)
+ @Value("${dias.ultimo}")
+ private LocalDate termino;
+
+ @Value("${contas.id.previsiviel:false}")
+ private boolean idPrevisivel;
+
+ @Value("${contas.id.inicial:0}")
+ private int idInicial;
+
private final GenerateLoadController generateLoadController;
public static void main(String[] args) {
@@ -26,8 +41,8 @@ public static void main(String[] args) {
@Override
public void run(String... args) throws Exception {
- log.info("Iniciando a produção de {} com {} contas e com {} dias retroativos",contas,dias);
- generateLoadController.geraEvento(contas,dias);
+ log.info("Iniciando a produção de {} contas com {} dias retroativos à partir de {}", contas, dias, termino);
+ generateLoadController.geraEvento(contas, dias, termino, idPrevisivel, idInicial);
log.info("Liberando comando da aplicação");
}
}
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java
index 273191f..9796605 100644
--- a/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java
+++ b/src/main/java/br/com/exemplo/dataingestion/adapters/beans/KafkaConfig.java
@@ -1,24 +1,21 @@
package br.com.exemplo.dataingestion.adapters.beans;
-import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity;
@Configuration
public class KafkaConfig {
@Bean
@Scope(value = "prototype")
public KafkaTemplate kafkaTemplate1(ProducerFactory producerFactory){
- Map map = new HashMap<>();
- map.put(ProducerConfig.CLIENT_ID_CONFIG,UUID.randomUUID().toString());
- return new KafkaTemplate(producerFactory,false,map);
+ // Map map = new HashMap<>();
+ // map.put(ProducerConfig.CLIENT_ID_CONFIG,UUID.randomUUID().toString());
+ KafkaTemplate template = new KafkaTemplate (producerFactory,false);
+ return template;
}
}
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java b/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java
index 5afb699..c6cf51b 100644
--- a/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java
+++ b/src/main/java/br/com/exemplo/dataingestion/adapters/controllers/servers/GenerateLoadController.java
@@ -1,17 +1,6 @@
package br.com.exemplo.dataingestion.adapters.controllers.servers;
-import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity;
-import br.com.exemplo.dataingestion.domain.producer.ProducerService;
-import io.micrometer.core.instrument.MeterRegistry;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.ApplicationContext;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
+import java.time.LocalDate;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -19,6 +8,19 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.PostConstruct;
+
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+
+import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity;
+import br.com.exemplo.dataingestion.domain.producer.ProducerService;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
@Component
@RequiredArgsConstructor
@Slf4j
@@ -28,7 +30,6 @@ public class GenerateLoadController {
private ExecutorService executorService;
private final List producerServiceList;
- private final MeterRegistry simpleMeterRegistry;
@Value("${processamento.threads.producao:10}")
private int numeroThreadsProducao;
@@ -37,7 +38,7 @@ public class GenerateLoadController {
public void constroiProducer()
{
this.executorService = Executors.newFixedThreadPool(numeroThreadsProducao);
- log.debug("Inicializando produtores");
+ log.debug("Inicializando {} produtores", numeroThreadsProducao);
for(int i=0;i {
log.info("Inicializando thread {} com {} registros",Thread.currentThread().getId(),numeroItensThread.get());
- createConta(0,numeroItensThread.get(),producerService,qtdDias);
+ createConta(idInicial, numeroItensThread.get(),producerService, qtdDias, termino, idPrevisivel);
log.info("Finalizando Thread thread {} ",Thread.currentThread().getId(),numeroItensThread.get());
});
}
@@ -70,7 +71,7 @@ public void geraEvento(int qtdConta, int qtdDias)
}
executorService.execute(() -> {
log.info("Inicializando thread {} com {} registros",Thread.currentThread().getId(),numeroItensThread.get());
- createConta(inicial,numeroItensThread.get(),producerService,qtdDias);
+ createConta(inicial, numeroItensThread.get(),producerService, qtdDias, termino, idPrevisivel);
log.info("Finalizando Thread thread {} ",Thread.currentThread().getId(),numeroItensThread.get());
});
}
@@ -83,11 +84,21 @@ private UUID getIdConta(int numeroConta)
{
return UUID.nameUUIDFromBytes(StringUtils.leftPad(String.valueOf(numeroConta),12,'0').getBytes());
}
- private void createConta(int inicial, int quantidadeContas, ProducerService producerService,int qtdDias)
+ private void createConta(int inicial, int quantidadeContas, ProducerService producerService, int qtdDias, LocalDate termino, boolean idPrevisivel)
{
for (int j = inicial;j<(inicial+quantidadeContas);j++)
{
- producerService.produce(LoadEntity.builder().idConta(getIdConta(j)).quantidadeDias(qtdDias).build());
+ producerService.produce(
+ LoadEntity
+ .builder()
+ .idConta(
+ idPrevisivel?
+ getIdConta(j):
+ UUID.randomUUID()
+ )
+ .quantidadeDias(qtdDias)
+ .dataFim(termino)
+ .build());
}
}
-}
+}
\ No newline at end of file
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java
index 563e906..29e2900 100644
--- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java
+++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/entities/LoadEntity.java
@@ -1,9 +1,15 @@
package br.com.exemplo.dataingestion.adapters.events.entities;
-import lombok.*;
-
+import java.time.LocalDate;
import java.util.UUID;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+
@Getter
@Setter
@NoArgsConstructor
@@ -13,4 +19,5 @@
public class LoadEntity {
private UUID idConta;
private int quantidadeDias;
+ private LocalDate dataFim;
}
diff --git a/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java b/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java
index 518d45b..5b971f0 100644
--- a/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java
+++ b/src/main/java/br/com/exemplo/dataingestion/adapters/events/producers/ProducerServiceImpl.java
@@ -1,17 +1,19 @@
package br.com.exemplo.dataingestion.adapters.events.producers;
-import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity;
-import br.com.exemplo.dataingestion.domain.producer.ProducerService;
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.Timer;
-import lombok.RequiredArgsConstructor;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
-@RequiredArgsConstructor
+import br.com.exemplo.dataingestion.adapters.events.entities.LoadEntity;
+import br.com.exemplo.dataingestion.domain.producer.ProducerService;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
@Component
@Scope(value = "prototype")
public class ProducerServiceImpl implements ProducerService {
@@ -19,14 +21,18 @@ public class ProducerServiceImpl implements ProducerService {
@Value("${data.ingestion.producer.topic}")
private String producerTopic;
- private final KafkaTemplate kafkaTemplate;
- private final MeterRegistry simpleMeterRegistry;
+ AtomicInteger records = new AtomicInteger(0);
+
+ @Autowired
+ private KafkaTemplate kafkaTemplate;
+
@Override
public void produce(LoadEntity loadEntity) {
- simpleMeterRegistry.counter("kafka.contador","type","producao","thread",String.valueOf(Thread.currentThread().getId())).increment();
- Timer.Sample sample = Timer.start(simpleMeterRegistry);
- ProducerRecord producerRecord = new ProducerRecord(producerTopic, loadEntity);
- sample.stop(simpleMeterRegistry.timer("kafka.time","type","producao","thread",String.valueOf(Thread.currentThread().getId())));
+ ProducerRecord producerRecord = new ProducerRecord(producerTopic, loadEntity);
kafkaTemplate.send(producerRecord);
+ log.info(
+ "Records so far: {}",
+ records.incrementAndGet()
+ );
}
}
diff --git a/src/main/resources/application.properties b/src/main/resources/application-aws.properties
similarity index 59%
rename from src/main/resources/application.properties
rename to src/main/resources/application-aws.properties
index 0da025d..0eace9e 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application-aws.properties
@@ -1,17 +1,22 @@
-server.port=8080
spring.kafka.producer.bootstrap-servers=${KAFKA_BOOTSTRAP}
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.ssl.trust-store-location=file:kafka.client.truststore.jks
spring.kafka.producer.ssl.protocol=SSL
spring.kafka.properties.security.protocol=SSL
-data.ingestion.producer.topic=${KAFKA_TOPIC}
-management.endpoints.web.exposure.include=info,health,prometheus,metrics
-logging.dir=logs
-logging.filename=application.log
+data.ingestion.producer.topic=load
+
+spring.jackson.serialization.write-dates-as-timestamps=true
+logging.level.com.amazonaws.util.EC2MetadataUtils=error
+
processamento.threads.geracao.massa=10
processamento.threads.producao=10
-contas.total=${CONTAS}
-dias.total=${DIAS}
\ No newline at end of file
+contas.total=${TOTAL_CONTAS}
+dias.total=${DIAS_POR_CONTA}
+dias.ultimo=${ULTIMO_DIA}
+contas.id.previsiviel=${ID_PREVISIVEL}
+contas.id.inicial=${ID_INICIAL}
+
+cloud.aws.stack.auto=false
\ No newline at end of file
diff --git a/src/main/resources/application-local.properties b/src/main/resources/application-local.properties
new file mode 100644
index 0000000..432fbf1
--- /dev/null
+++ b/src/main/resources/application-local.properties
@@ -0,0 +1,21 @@
+spring.kafka.producer.bootstrap-servers=240.12.0.2:9092
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
+data.ingestion.producer.topic=load
+
+spring.jackson.serialization.write-dates-as-timestamps=true
+logging.level.com.amazonaws.util.EC2MetadataUtils=error
+
+processamento.threads.geracao.massa=10
+processamento.threads.producao=10
+
+contas.total=9
+dias.total=1
+dias.ultimo=2020-10-30
+contas.id.previsiviel=true
+contas.id.inicial=1
+
+cloud.aws.stack.auto=false
+cloud.aws.credentials.profileName=default
+cloud.aws.region.auto=false
+cloud.aws.region.static=us-east-1
\ No newline at end of file
diff --git a/src/main/resources/application-remote.properties b/src/main/resources/application-remote.properties
new file mode 100644
index 0000000..cfba7e8
--- /dev/null
+++ b/src/main/resources/application-remote.properties
@@ -0,0 +1,25 @@
+spring.kafka.producer.bootstrap-servers=b-7.statement-kafka.99d6fi.c10.kafka.us-east-1.amazonaws.com:9094,b-9.statement-kafka.99d6fi.c10.kafka.us-east-1.amazonaws.com:9094,b-8.statement-kafka.99d6fi.c10.kafka.us-east-1.amazonaws.com:9094
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
+spring.kafka.producer.ssl.trust-store-location=file:kafka.client.truststore.jks
+spring.kafka.producer.ssl.protocol=SSL
+spring.kafka.properties.security.protocol=SSL
+
+data.ingestion.producer.topic=load
+
+spring.jackson.serialization.write-dates-as-timestamps=true
+logging.level.com.amazonaws.util.EC2MetadataUtils=error
+
+processamento.threads.geracao.massa=10
+processamento.threads.producao=10
+
+contas.total=5000
+dias.total=365
+dias.ultimo=2020-10-30
+contas.id.previsiviel=true
+contas.id.inicial=0
+
+cloud.aws.stack.auto=false
+cloud.aws.credentials.profileName=default
+cloud.aws.region.auto=false
+cloud.aws.region.static=us-east-1
\ No newline at end of file