Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions CourseWork/Chart/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
plugins {
id 'java'
id 'org.springframework.boot' version '2.3.1.RELEASE'
}

group 'ru.otus'
version '1.0'
sourceCompatibility = JavaVersion.VERSION_14
targetCompatibility = JavaVersion.VERSION_14

repositories {
mavenCentral()
}

dependencies {
compile project(':CourseWork:MessageSystem')
// testCompile group: 'junit', name: 'junit', version: '4.12'
testImplementation("org.junit.jupiter:junit-jupiter-engine")
testImplementation("org.junit.jupiter:junit-jupiter-params")

implementation 'org.springframework.boot:spring-boot-starter'
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.6.4'

implementation('org.springframework.boot:spring-boot-starter-web')
implementation('org.springframework.boot:spring-boot-starter-websocket')

compile group: 'org.webjars', name: 'sockjs-client', version: '1.1.2'
compile group: 'org.webjars', name: 'stomp-websocket', version: '2.3.3-1'
compile group: 'org.webjars', name: 'bootstrap', version: '4.5.3'
compile group: 'org.webjars', name: 'webjars-locator', version: '0.40'

implementation("ch.qos.logback:logback-classic")
implementation("com.google.code.gson:gson:${Versions.gson}")
}
11 changes: 11 additions & 0 deletions CourseWork/Chart/src/main/java/ru/otus/chart/ChartMain.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ru.otus.chart;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ChartMain {
public static void main(String[] args) {
SpringApplication.run(ChartMain.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package ru.otus.chart.configurations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import ru.otus.chart.core.model.ModuleOperationsData;
import ru.otus.chart.core.utils.MessageDeserializer;
import ru.otus.messagesystem.message.Message;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

@Component
@PropertySource("classpath:/kafka.properties")
public class ChartKafkaConsumerConfig {

private static final Logger logger = LoggerFactory.getLogger(ChartKafkaConsumerConfig.class);
private final String kafkaServer;
private final String kafkaTopic;
private final String kafkaGroupId;
private final SimpMessagingTemplate template;

private final HashMap<String, ModuleOperationsData> modularStatistics = new HashMap<>();

public ChartKafkaConsumerConfig(
@Value("${kafka.server}") String kafkaServer,
@Value("${kafka.topic}") String kafkaTopic,
@Value("${kafka.chart.group.id}") String kafkaGroupId,
@Autowired SimpMessagingTemplate template
) {
this.kafkaServer = kafkaServer;
this.kafkaTopic = kafkaTopic;
this.kafkaGroupId = kafkaGroupId;
this.template = template;
}

public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // "latest"
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return props;
}

public ConsumerFactory<String, Message> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

public ConcurrentMessageListenerContainer ChartKafkaListener(
Consumer<Object> consumer
) {
ContainerProperties containerProperties = new ContainerProperties(this.kafkaTopic);
containerProperties.setMessageListener(
(MessageListener<String, Object>) record -> {
Object msg = record.value();
consumer.accept(msg);
});

ConcurrentMessageListenerContainer container =
new ConcurrentMessageListenerContainer<>(
consumerFactory(),
containerProperties
);

return container;
}

void processOperation(Message msg) {
ModuleOperationsData module;
if(modularStatistics.containsKey(msg.getFrom())){
module = modularStatistics.get(msg.getFrom());
} else {
module = new ModuleOperationsData(msg.getFrom());
modularStatistics.put(msg.getFrom(), module);
}
Map<String,Integer> operations = module.getChartData();
String key = msg.getType().getName();
if(operations.containsKey(key)) {
operations.replace(key,operations.get(key) + 1);
} else {
operations.put(key,1);
}
}

@Bean("ChartListener")
public ConcurrentMessageListenerContainer getListener() {
Consumer<Object> messageConsumer = msg -> {
if(msg instanceof Message) {
processOperation((Message)msg);
Collection<ModuleOperationsData> collection = modularStatistics.values();
ModuleOperationsData[] moduleOperationsDataArr = collection.toArray(new ModuleOperationsData[collection.size()]);
this.template.convertAndSend("/topic/chartdata", moduleOperationsDataArr);
}
};

ConcurrentMessageListenerContainer listener = ChartKafkaListener(messageConsumer);
return listener;
}

public ModuleOperationsData[] getModularStatistics() {
// Map<String, Integer> mapFront1 = Stream.of(new Object[][] {
// { "FilteredUsersData", 8 },
// { "SaveUser", 2 }
// }).collect(Collectors.toMap(data -> (String) data[0], data -> (Integer) data[1]));
// modularStatistics.put("Frontend1", new ModuleOperationsData("Frontend1", mapFront1));
Collection<ModuleOperationsData> collection = modularStatistics.values();
ModuleOperationsData[] moduleOperationsDataArr = collection.toArray(new ModuleOperationsData[collection.size()]);
return moduleOperationsDataArr;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package ru.otus.chart.configurations;

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Component;

// Примечание: вызывается после инициализации всех бинов
@Component
public class ListenerStarter implements ApplicationListener<ContextRefreshedEvent> {

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
ApplicationContext ctx = event.getApplicationContext();
ConcurrentMessageListenerContainer chartListener = (ConcurrentMessageListenerContainer)ctx.getBean("ChartListener");
chartListener.start();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package ru.otus.chart.configurations;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/chartdata");
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/gs-chart-websocket").withSockJS();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ru.otus.chart.controllers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
import ru.otus.chart.configurations.ChartKafkaConsumerConfig;
import ru.otus.chart.core.model.ModuleOperationsData;

@Controller
public class ChartController {
private static final Logger logger = LoggerFactory.getLogger(ChartController.class);
private final SimpMessagingTemplate template;
private final ChartKafkaConsumerConfig chartKafkaConsumerConfig;

public ChartController(SimpMessagingTemplate template, ChartKafkaConsumerConfig chartKafkaConsumerConfig) {
this.template = template;
this.chartKafkaConsumerConfig = chartKafkaConsumerConfig;
}

@MessageMapping("/askchartdata")
public void getChartData() {
logger.info("askchartdata()");
ModuleOperationsData[] statistics = chartKafkaConsumerConfig.getModularStatistics();
this.template.convertAndSend("/topic/chartdata", statistics);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package ru.otus.chart.core.model;

import java.util.HashMap;
import java.util.Map;

public class ModuleOperationsData {
private String moduleName;
Map<String,Integer> chartData;

public ModuleOperationsData(String moduleName) {
this.moduleName = moduleName;
this.chartData = new HashMap<>();
}

public ModuleOperationsData(String moduleName, Map<String, Integer> chartData) {
this.moduleName = moduleName;
this.chartData = chartData;
}

public String getModuleName() {
return moduleName;
}

public void setModuleName(String moduleName) {
this.moduleName = moduleName;
}

public Map<String, Integer> getChartData() {
return chartData;
}

public void setChartData(Map<String, Integer> chartData) {
this.chartData = chartData;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ru.otus.chart.core.utils;

import org.apache.kafka.common.serialization.Deserializer;
import ru.otus.chart.core.utils.exceptions.SerializerError;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;

public class MessageDeserializer implements Deserializer {

@Override
public void configure(Map configs, boolean isKey) {
}

@Override
public void close() {
}

@Override
public Object deserialize(String topic, byte[] data) {
try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
ObjectInputStream is = new ObjectInputStream(bis)) {
return is.readObject();
} catch (Exception e) {
throw new SerializerError("DeSerialization error", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ru.otus.chart.core.utils.exceptions;

public class SerializerError extends RuntimeException {

public SerializerError(String message, Throwable cause) {
super(message, cause);
}
}
3 changes: 3 additions & 0 deletions CourseWork/Chart/src/main/resources/kafka.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kafka.server=localhost:9092
kafka.topic=servicebus
kafka.chart.group.id=chart.group.id
7 changes: 7 additions & 0 deletions CourseWork/Chart/src/main/resources/static/Chart.min.js

Large diffs are not rendered by default.

Loading