Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ This access is not required for the workshop labs. You can skip this section.

SSH access is only required if you need to troubleshoot issues or want to poke around your clusters. The procedure to connect via SSH depends on the type of computer you're using:

==== SSH into the cluster from the Web UI

From the registration link, you can click on the link at the right side to connect to the cluster from a web based SSH client with the credential centos/supersecret1.

==== SSH into the cluster from Linux/Macos

From the registration link, download the PEM key required to access to your cluster with SSH. Run the following command:
Expand Down Expand Up @@ -87,6 +83,8 @@ Pick your lab and let's get started!

* link:streaming.adoc[From Edge to Streams Processing]
* link:spark_analytics.adoc[Spark and Fast Analytics with Kudu]
* link:streaming_lite.adoc[From mqtt to Streams Processing]
* link:streaming_flink.adoc[Streams Processing with Flink]
* link:datascience.adoc[CDSW Experiments and Models]

== Resources
Expand Down
Binary file added images/FLINk_running_jobs.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/Kafka_topic_simulation_sum.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/SMM_topic_simulation_sum.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/YARN_Application.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/YARN_Application_details.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/add_ConsumeMQTT_lite.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/add_updateattribute_lite.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/additional_controller_services_lite.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/create_pgroup_lite.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/flink_ssl_lite.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/from_kafka_to_kudu_flow_lite.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/iot_streamingFlinkDataflowCount.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/iot_streamingFlinkDataflowFilter.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/kafka_success_lite.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/kudu_success_lite.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/publishKafka_flow_lite.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/table_select_lite.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
159 changes: 159 additions & 0 deletions sample_app_flink/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.github.zBrainiac.edge2ailab</groupId>
<artifactId>edge2ailab</artifactId>
<version>0.2.1</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.12</scala.version>
<flink.version>1.9.0</flink.version>
<kafka.version>2.3.0</kafka.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>

<dependencies>
<!-- Logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.26</version>
</dependency>

<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<!-- Zookeeper -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>

<!--JSON-->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.version}</artifactId>
<version>2.9.9</version>
</dependency>

<!--Flink-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>



<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>




</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>jar-with-dependencies</shadedClassifierName>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>FlinkConsumer.iotConsumer</Main-Class>
</manifestEntries>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
115 changes: 115 additions & 0 deletions sample_app_flink/src/main/java/FlinkConsumer/iotConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package FlinkConsumer;

import commons.Commons;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
* @author Marcel Daeppen
* @version 2019/09/16 15:08
*/
@Slf4j
public class iotConsumer {
public static void main(String[] args) throws Exception {

// set up the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties properties = new Properties();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, Commons.GROUP_ID_CONFIG);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Commons.EXAMPLE_KAFKA_SERVER);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

// get iot stream from kafka - topic "iot"
DataStream<String> iotStream = env.addSource(
new FlinkKafkaConsumer<>("iot", new SimpleStringSchema(), properties));

iotStream.print("input message: ");

// split and sum on 'sensor_id'
DataStream<Tuple5<Long, Integer, Integer, Integer, Integer>> aggStream = iotStream
.flatMap(new trxJSONDeserializer())
.keyBy(1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.sum(4) ;

// write the aggregated data stream to a Kafka sink
aggStream.addSink(new FlinkKafkaProducer<>(
Commons.EXAMPLE_KAFKA_SERVER,
"simulation_sum",
new serializeSum2String()));

// execute program
JobExecutionResult result = env.execute("myFistFlinkApp");
JobID jobId = result.getJobID();
System.out.println("jobId=" + jobId);
}

public static class trxJSONDeserializer implements FlatMapFunction<String, Tuple5<Long, Integer, Integer, Integer, Integer>> {
private transient ObjectMapper jsonParser;

@Override
public void flatMap(String value, Collector<Tuple5<Long, Integer, Integer, Integer, Integer>> out) throws Exception {
if (jsonParser == null) {
jsonParser = new ObjectMapper();
}
JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);

// get sensor_ts, sensor_id, sensor_0 AND sensor_1 from JSONObject
Long sensor_ts = jsonNode.get("sensor_ts").asLong();
Integer sensor_id = jsonNode.get("sensor_id").asInt();
Integer sensor_0 = jsonNode.get("sensor_0").asInt();
Integer sensor_1 = jsonNode.get("sensor_1").asInt();
out.collect(new Tuple5<>(sensor_ts, sensor_id, sensor_0, sensor_1, 1));

}

}


private static class serializeSum2String implements KeyedSerializationSchema<Tuple5<Long, Integer, Integer, Integer, Integer>> {
@Override
public byte[] serializeKey(Tuple5 element) {
return (null);
}
@Override
public byte[] serializeValue(Tuple5 value) {

String str = "{"
+ "\"type\"" + ":" +"\"Sum over 10 sec window\""
+ "," + "\"sensor_ts_start\"" + ":" + value.getField(0).toString()
+ "," + "\"sensor_id\"" + ":" + value.getField(1).toString()
+ "," + "\"sensor_0\"" + ":" + value.getField(2).toString()
+ "," + "\"window_count\"" + ":" + value.getField(4).toString() + "}";
return str.getBytes();
}
@Override
public String getTargetTopic(Tuple5 tuple5) {
// use always the default topic
return null;
}
}


}
Loading