diff --git a/README.adoc b/README.adoc
index 39dc73a..747d892 100644
--- a/README.adoc
+++ b/README.adoc
@@ -48,10 +48,6 @@ This access is not required for the workshop labs. You can skip this section.
SSH access is only required if you need to troubleshoot issues or want to poke around your clusters. The procedure to connect via SSH depends on the type of computer you're using:
-==== SSH into the cluster from the Web UI
-
-From the registration link, you can click on the link at the right side to connect to the cluster from a web based SSH client with the credential centos/supersecret1.
-
==== SSH into the cluster from Linux/Macos
From the registration link, download the PEM key required to access to your cluster with SSH. Run the following command:
@@ -87,6 +83,8 @@ Pick your lab and let's get started!
* link:streaming.adoc[From Edge to Streams Processing]
* link:spark_analytics.adoc[Spark and Fast Analytics with Kudu]
+* link:streaming_lite.adoc[From MQTT to Streams Processing]
+* link:streaming_flink.adoc[Streams Processing with Flink]
* link:datascience.adoc[CDSW Experiments and Models]
== Resources
diff --git a/images/FLINk_running_jobs.png b/images/FLINk_running_jobs.png
new file mode 100644
index 0000000..860f801
Binary files /dev/null and b/images/FLINk_running_jobs.png differ
diff --git a/images/Kafka_topic_simulation_sum.png b/images/Kafka_topic_simulation_sum.png
new file mode 100644
index 0000000..2b4cfbf
Binary files /dev/null and b/images/Kafka_topic_simulation_sum.png differ
diff --git a/images/SMM_topic_simulation_sum.png b/images/SMM_topic_simulation_sum.png
new file mode 100644
index 0000000..a3d7475
Binary files /dev/null and b/images/SMM_topic_simulation_sum.png differ
diff --git a/images/YARN_Application.png b/images/YARN_Application.png
new file mode 100644
index 0000000..1dbd947
Binary files /dev/null and b/images/YARN_Application.png differ
diff --git a/images/YARN_Application_details.png b/images/YARN_Application_details.png
new file mode 100644
index 0000000..541faed
Binary files /dev/null and b/images/YARN_Application_details.png differ
diff --git a/images/add_ConsumeMQTT_lite.png b/images/add_ConsumeMQTT_lite.png
new file mode 100644
index 0000000..267b6a6
Binary files /dev/null and b/images/add_ConsumeMQTT_lite.png differ
diff --git a/images/add_updateattribute_lite.png b/images/add_updateattribute_lite.png
new file mode 100644
index 0000000..ea3957f
Binary files /dev/null and b/images/add_updateattribute_lite.png differ
diff --git a/images/additional_controller_services_lite.png b/images/additional_controller_services_lite.png
new file mode 100644
index 0000000..522cf3c
Binary files /dev/null and b/images/additional_controller_services_lite.png differ
diff --git a/images/create_pgroup_lite.png b/images/create_pgroup_lite.png
new file mode 100644
index 0000000..43e42a2
Binary files /dev/null and b/images/create_pgroup_lite.png differ
diff --git a/images/flink_ssl_lite.png b/images/flink_ssl_lite.png
new file mode 100644
index 0000000..fae1d0c
Binary files /dev/null and b/images/flink_ssl_lite.png differ
diff --git a/images/from_kafka_to_kudu_flow_lite.png b/images/from_kafka_to_kudu_flow_lite.png
new file mode 100644
index 0000000..01a1e43
Binary files /dev/null and b/images/from_kafka_to_kudu_flow_lite.png differ
diff --git a/images/iot_streamingFlinkDataflowCount.png b/images/iot_streamingFlinkDataflowCount.png
new file mode 100644
index 0000000..c184a79
Binary files /dev/null and b/images/iot_streamingFlinkDataflowCount.png differ
diff --git a/images/iot_streamingFlinkDataflowFilter.png b/images/iot_streamingFlinkDataflowFilter.png
new file mode 100644
index 0000000..1000cb5
Binary files /dev/null and b/images/iot_streamingFlinkDataflowFilter.png differ
diff --git a/images/kafka_success_lite.png b/images/kafka_success_lite.png
new file mode 100644
index 0000000..89a3a21
Binary files /dev/null and b/images/kafka_success_lite.png differ
diff --git a/images/kudu_success_lite.png b/images/kudu_success_lite.png
new file mode 100644
index 0000000..d5bbfbd
Binary files /dev/null and b/images/kudu_success_lite.png differ
diff --git a/images/publishKafka_flow_lite.png b/images/publishKafka_flow_lite.png
new file mode 100644
index 0000000..0c76ccf
Binary files /dev/null and b/images/publishKafka_flow_lite.png differ
diff --git a/images/table_select_lite.png b/images/table_select_lite.png
new file mode 100644
index 0000000..b8d5ca6
Binary files /dev/null and b/images/table_select_lite.png differ
diff --git a/sample_app_flink/pom.xml b/sample_app_flink/pom.xml
new file mode 100644
index 0000000..301ab0b
--- /dev/null
+++ b/sample_app_flink/pom.xml
@@ -0,0 +1,159 @@
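+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.github.zBrainiac.edge2ailab</groupId>
+    <artifactId>edge2ailab</artifactId>
+    <version>0.2.1</version>
+    <properties>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <scala.version>2.12</scala.version>
+        <flink.version>1.9.0</flink.version>
+        <kafka.version>2.3.0</kafka.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>2.11.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.26</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>com.101tec</groupId>
+            <artifactId>zkclient</artifactId>
+            <version>0.11</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-scala_${scala.version}</artifactId>
+            <version>2.9.9</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.8</version>
+        </dependency>
+
+    </dependencies>
+
+
+
+    <build>
+        <plugins>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <shadedArtifactAttached>true</shadedArtifactAttached>
+                            <shadedClassifierName>jar-with-dependencies</shadedClassifierName>
+                            <artifactSet>
+                                <includes>
+                                    <include>*:*</include>
+                                </includes>
+                            </artifactSet>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>reference.conf</resource>
+                                </transformer>
+
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>FlinkConsumer.iotConsumer</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+
+</project>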
\ No newline at end of file
diff --git a/sample_app_flink/src/main/java/FlinkConsumer/iotConsumer.java b/sample_app_flink/src/main/java/FlinkConsumer/iotConsumer.java
new file mode 100644
index 0000000..b32d38a
--- /dev/null
+++ b/sample_app_flink/src/main/java/FlinkConsumer/iotConsumer.java
@@ -0,0 +1,115 @@
+package FlinkConsumer;
+
+import commons.Commons;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import java.util.Properties;
+
+/**
+ * @author Marcel Daeppen
+ * @version 2019/09/16 15:08
+ */
+@Slf4j
+public class iotConsumer {
+ public static void main(String[] args) throws Exception {
+
+ // set up the streaming execution environment
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ Properties properties = new Properties();
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, Commons.GROUP_ID_CONFIG);
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Commons.EXAMPLE_KAFKA_SERVER);
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+
+ // get iot stream from kafka - topic "iot"
+ DataStream iotStream = env.addSource(
+ new FlinkKafkaConsumer<>("iot", new SimpleStringSchema(), properties));
+
+ iotStream.print("input message: ");
+
+ // split and sum on 'sensor_id'
+ DataStream> aggStream = iotStream
+ .flatMap(new trxJSONDeserializer())
+ .keyBy(1)
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
+ .sum(4) ;
+
+ // write the aggregated data stream to a Kafka sink
+ aggStream.addSink(new FlinkKafkaProducer<>(
+ Commons.EXAMPLE_KAFKA_SERVER,
+ "simulation_sum",
+ new serializeSum2String()));
+
+ // execute program
+ JobExecutionResult result = env.execute("myFistFlinkApp");
+ JobID jobId = result.getJobID();
+ System.out.println("jobId=" + jobId);
+ }
+
+ public static class trxJSONDeserializer implements FlatMapFunction> {
+ private transient ObjectMapper jsonParser;
+
+ @Override
+ public void flatMap(String value, Collector> out) throws Exception {
+ if (jsonParser == null) {
+ jsonParser = new ObjectMapper();
+ }
+ JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
+
+ // get sensor_ts, sensor_id, sensor_0 AND sensor_1 from JSONObject
+ Long sensor_ts = jsonNode.get("sensor_ts").asLong();
+ Integer sensor_id = jsonNode.get("sensor_id").asInt();
+ Integer sensor_0 = jsonNode.get("sensor_0").asInt();
+ Integer sensor_1 = jsonNode.get("sensor_1").asInt();
+ out.collect(new Tuple5<>(sensor_ts, sensor_id, sensor_0, sensor_1, 1));
+
+ }
+
+ }
+
+
+ private static class serializeSum2String implements KeyedSerializationSchema> {
+ @Override
+ public byte[] serializeKey(Tuple5 element) {
+ return (null);
+ }
+ @Override
+ public byte[] serializeValue(Tuple5 value) {
+
+ String str = "{"
+ + "\"type\"" + ":" +"\"Sum over 10 sec window\""
+ + "," + "\"sensor_ts_start\"" + ":" + value.getField(0).toString()
+ + "," + "\"sensor_id\"" + ":" + value.getField(1).toString()
+ + "," + "\"sensor_0\"" + ":" + value.getField(2).toString()
+ + "," + "\"window_count\"" + ":" + value.getField(4).toString() + "}";
+ return str.getBytes();
+ }
+ @Override
+ public String getTargetTopic(Tuple5 tuple5) {
+ // use always the default topic
+ return null;
+ }
+ }
+
+
+}
\ No newline at end of file
diff --git a/sample_app_flink/src/main/java/FlinkConsumer/iotConsumerFilter.java b/sample_app_flink/src/main/java/FlinkConsumer/iotConsumerFilter.java
new file mode 100644
index 0000000..d69807d
--- /dev/null
+++ b/sample_app_flink/src/main/java/FlinkConsumer/iotConsumerFilter.java
@@ -0,0 +1,123 @@
+package FlinkConsumer;
+
+
+import commons.Commons;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import java.util.Properties;
+
+/**
+ * @author Marcel Daeppen
+ * @version 2020/02/16 11:08
+ */
+@Slf4j
+public class iotConsumerFilter {
+ public static void main(String[] args) throws Exception {
+
+ // set up the streaming execution environment
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ Properties properties = new Properties();
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, Commons.GROUP_ID_CONFIG);
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Commons.EXAMPLE_KAFKA_SERVER);
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+
+ // get iot stream from kafka - topic "iot"
+ DataStream iotStream = env.addSource(
+ new FlinkKafkaConsumer<>("iot", new SimpleStringSchema(), properties));
+
+ iotStream.print("input message: ");
+
+ // split on 'sensor_id' & filter on sensor_0
+ DataStream> aggStream = iotStream
+ .flatMap(new trxJSONDeserializer())
+ .keyBy(1) // sensor_id
+ .sum(4)
+ .filter(new FilterFunction>() {
+ @Override
+ public boolean filter(Tuple5 value) throws Exception {
+ return value.f2 >= 50 ;
+ }
+ });
+
+ // write the aggregated data stream to a Kafka sink
+ aggStream.addSink(new FlinkKafkaProducer<>(
+ Commons.EXAMPLE_KAFKA_SERVER,
+ "iot_filter",
+ new serializeSum2String()));
+
+ // execute program
+ JobExecutionResult result = env.execute("myFlinkAppFilter");
+ JobID jobId = result.getJobID();
+ System.out.println("jobId=" + jobId);
+ }
+
+ public static class trxJSONDeserializer implements FlatMapFunction> {
+ private transient ObjectMapper jsonParser;
+
+ @Override
+ public void flatMap(String value, Collector> out) throws Exception {
+ if (jsonParser == null) {
+ jsonParser = new ObjectMapper();
+ }
+ JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
+
+ // get sensor_ts, sensor_id, sensor_0 AND sensor_1 from JSONObject
+ Long sensor_ts = jsonNode.get("sensor_ts").asLong();
+ Integer sensor_id = jsonNode.get("sensor_id").asInt();
+ Integer sensor_0 = jsonNode.get("sensor_0").asInt();
+ Integer sensor_1 = jsonNode.get("sensor_1").asInt();
+ out.collect(new Tuple5<>(sensor_ts, sensor_id, sensor_0, sensor_1, 1));
+
+ }
+
+ }
+
+
+ private static class serializeSum2String implements KeyedSerializationSchema> {
+ @Override
+ public byte[] serializeKey(Tuple5 element) {
+ return (null);
+ }
+
+ @Override
+ public byte[] serializeValue(Tuple5 value) {
+
+ String str = "{"
+ + "\"type\"" + ":" + "\"alert sensor_0 over 50\""
+ + "," + "\"sensor_ts_start\"" + ":" + value.getField(0).toString()
+ + "," + "\"sensor_id\"" + ":" + value.getField(1).toString()
+ + "," + "\"sensor_0\"" + ":" + value.getField(2).toString() + "}";
+ return str.getBytes();
+ }
+
+ @Override
+ public String getTargetTopic(Tuple5 tuple5) {
+ // use always the default topic
+ return null;
+ }
+ }
+
+
+}
\ No newline at end of file
diff --git a/sample_app_flink/src/main/java/META-INF/MANIFEST.MF b/sample_app_flink/src/main/java/META-INF/MANIFEST.MF
new file mode 100644
index 0000000..d64b8a4
--- /dev/null
+++ b/sample_app_flink/src/main/java/META-INF/MANIFEST.MF
@@ -0,0 +1,3 @@
+Manifest-Version: 1.0
+Main-Class: FlinkConsumer.iotConsumer
+
diff --git a/sample_app_flink/src/main/java/commons/Commons.java b/sample_app_flink/src/main/java/commons/Commons.java
new file mode 100644
index 0000000..f85b988
--- /dev/null
+++ b/sample_app_flink/src/main/java/commons/Commons.java
@@ -0,0 +1,11 @@
+package commons;
+
+public class Commons {
+ public final static String EXAMPLE_KAFKA_TOPIC = System.getenv("EXAMPLE_KAFKA_TOPIC") != null ?
+ System.getenv("EXAMPLE_KAFKA_TOPIC") : "iot";
+ public final static String EXAMPLE_KAFKA_SERVER = System.getenv("EXAMPLE_KAFKA_SERVER") != null ?
+ System.getenv("EXAMPLE_KAFKA_SERVER") : "edge2ai-1.dim.local:9092";
+ // System.getenv("EXAMPLE_KAFKA_SERVER") : "localhost:9092";
+ public final static String GROUP_ID_CONFIG = System.getenv("GROUP_ID_CONFIG") != null ?
+ System.getenv("GROUP_ID_CONFIG") : "iot-sensor-consumer-flink";
+}
\ No newline at end of file
diff --git a/sample_app_flink/src/test/java/producer/sensorSimulator.java b/sample_app_flink/src/test/java/producer/sensorSimulator.java
new file mode 100644
index 0000000..110a10d
--- /dev/null
+++ b/sample_app_flink/src/test/java/producer/sensorSimulator.java
@@ -0,0 +1,98 @@
+package producer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import commons.Commons;
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.time.Instant;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+
+
+public class sensorSimulator {
+ private static final String BOOTSTRAP_SERVERS = Commons.EXAMPLE_KAFKA_SERVER;
+ private static final String TOPIC = Commons.EXAMPLE_KAFKA_TOPIC;
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ private static final Random random = new Random();
+
+ public static long sleeptime;
+
+
+ public static void main(String[] args) throws Exception {
+
+
+ if( args.length > 0 ) {
+ setsleeptime(Long.parseLong(args[0]));
+ System.out.println("sleeptime (ms): " + sleeptime);
+ } else {
+ System.out.println("no sleeptime defined - use default");
+ setsleeptime(1000);
+ System.out.println("default sleeptime (ms): " + sleeptime);
+ }
+
+ Producer producer = createProducer();
+ try {
+ for (int i = 0; i < 1000000; i++) {
+ publishMessage(producer);
+ Thread.sleep(sleeptime);
+ }
+ } finally {
+ producer.close();
+ }
+ }
+
+ private static Producer createProducer() {
+ Properties config = new Properties();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+ config.put(ProducerConfig.CLIENT_ID_CONFIG, "md");
+ config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ return new KafkaProducer<>(config);
+ }
+
+ private static void publishMessage(Producer producer) throws Exception {
+ String key = UUID.randomUUID().toString();
+
+ ObjectNode messageJsonObject = JsonOnject();
+ byte[] valueJson = objectMapper.writeValueAsBytes(messageJsonObject);
+
+ ProducerRecord record = new ProducerRecord<>(TOPIC, key, valueJson);
+
+ RecordMetadata md = producer.send(record).get();
+ System.out.println("Published " + md.topic() + "/" + md.partition() + "/" + md.offset()
+ + " (key=" + key + ") : " + messageJsonObject);
+ }
+
+ // build random json object
+ private static ObjectNode JsonOnject() {
+
+ int i= random.nextInt(5);
+
+ ObjectNode report = objectMapper.createObjectNode();
+ report.put("sensor_ts", Instant.now().toEpochMilli());
+ report.put("sensor_id", (random.nextInt(11)));
+ report.put("sensor_0", (random.nextInt(99)));
+ report.put("sensor_1", (random.nextInt(99)));
+ report.put("sensor_2", (random.nextInt(99)));
+ report.put("sensor_3", (random.nextInt(99)));
+ report.put("sensor_4", (random.nextInt(99)));
+ report.put("sensor_5", (random.nextInt(99)));
+ report.put("sensor_6", (random.nextInt(99)));
+ report.put("sensor_7", (random.nextInt(99)));
+ report.put("sensor_8", (random.nextInt(99)));
+ report.put("sensor_9", (random.nextInt(99)));
+ report.put("sensor_10", (random.nextInt(99)));
+ report.put("sensor_11", (random.nextInt(99)));
+
+ return report;
+ }
+
+ public static void setsleeptime(long sleeptime) {
+ sensorSimulator.sleeptime = sleeptime;
+ }
+
+}
diff --git a/streaming_flink.adoc b/streaming_flink.adoc
new file mode 100644
index 0000000..3698fe7
--- /dev/null
+++ b/streaming_flink.adoc
@@ -0,0 +1,144 @@
+= Streams Processing with Apache Flink
+
+This workshop builds on top of the "edge2AI Workshop" (a prerequisite) and adds enhanced stream-processing features.
+
+== Labs summary
+
+* *Lab 1* - Count by sensor_id
+* *Lab 2* - Filtering on sensor_0 value
+
+
+[[lab_1, Lab 1]]
+== Lab 1 - Apache Flink - Count by sensor_id
+
+. Let's use the #iot stream from the sensors from the previous lab
++
+.. *Dataflow:*
++
+image::images/iot_streamingFlinkDataflowCount.png[width=800]
++
+.. Open two SSH connections to your environment
++
+image::images/flink_ssl_lite.png[width=800]
++
+Let's have a look at the code:
++
+Data source - read the `iot` topic from Kafka
++
+[source,java]
+----
+// get iot stream from kafka - topic "iot"
+ DataStream iotStream = env.addSource(
+ new FlinkKafkaConsumer<>("iot", new SimpleStringSchema(), properties));
+----
++
+Transformation - count the messages per `sensor_id` in 10-second windows
++
+[source,java]
+----
+// split and sum on 'sensor_id'
+ DataStream> aggStream = iotStream
+ .flatMap(new trxJSONDeserializer())
+ .keyBy(1)
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
+ .sum(4) ;
+----
++
+Data sink - write the result to the Kafka topic `simulation_sum`
++
+[source,java]
+----
+// write the aggregated data stream to a Kafka sink
+ aggStream.addSink(new FlinkKafkaProducer<>(
+ Commons.EXAMPLE_KAFKA_SERVER,
+ "simulation_sum",
+ new serializeSum2String()));
+----
++
+. Let's run the application
++
+Use the first SSH connection to run the Flink application:
++
+[source,shell]
+----
+$ cd /opt/cloudera/parcels/FLINK
+$ sudo wget https://github.com/zBrainiac/edge2ailab/releases/download/0.2.1/edge2ailab-0.2.1-jar-with-dependencies.jar -P /opt/cloudera/parcels/FLINK/lib/flink/examples/streaming
+$ ./bin/flink run -m yarn-cluster -c FlinkConsumer.iotConsumer -ynm myFirstFlinkApp lib/flink/examples/streaming/edge2ailab-0.2.1-jar-with-dependencies.jar
+----
++
+. Let's see how the application works
++
+.. Use the second SSH connection to see the result
++
+[source,shell]
+----
+$ cd /opt/cloudera/parcels/CDH
+$ ./bin/kafka-console-consumer --bootstrap-server edge2ai-1.dim.local:9092 --topic simulation_sum
+----
++
+SSH connection
++
+image::images/Kafka_topic_simulation_sum.png[width=800]
++
+.. SMM view:
++
+image::images/SMM_topic_simulation_sum.png[width=800]
++
+.. YARN & FLINK UI view:
++
+Go to Cloudera Manager > YARN > Applications
++
+image::images/YARN_Application.png[width=800]
++
+See the details of the running Flink job and use the link shown there to open the Flink UI.
++
+image::images/YARN_Application_details.png[width=800]
++
+The Flink UI provides more details and monitoring of the job
++
+image::images/FLINk_running_jobs.png[width=800]
+
+
+[[lab_2, Lab 2]]
+== Lab 2 - Filtering on sensor_0 value
+. Let’s use the #iot stream from the sensors from the previous lab
++
+.. *Dataflow:*
++
+image::images/iot_streamingFlinkDataflowFilter.png[width=800]
++
+Transformation - filter on the `sensor_0` value
++
+[source,java]
+----
+// split on 'sensor_id' & filter on sensor_0
+ DataStream> aggStream = iotStream
+ .flatMap(new trxJSONDeserializer())
+ .keyBy(1) // sensor_id
+ .sum(4)
+ .filter(new FilterFunction>() {
+ @Override
+ public boolean filter(Tuple5 value) throws Exception {
+ return value.f2 >= 50 ; // sensor_0
+ }
+ });
+----
++
+. Let's run the application
++
+Use a new SSH connection to run the Flink application:
++
+[source,shell]
+----
+$ cd /opt/cloudera/parcels/FLINK
+$ ./bin/flink run -m yarn-cluster -c FlinkConsumer.iotConsumerFilter -ynm myFlinkAppFilter lib/flink/examples/streaming/edge2ailab-0.2.1-jar-with-dependencies.jar
+----
++
+. Let's see how the application works
++
+.. Use the second SSH connection to see the result
++
+[source,shell]
+----
+$ cd /opt/cloudera/parcels/CDH
+$ ./bin/kafka-console-consumer --bootstrap-server edge2ai-1.dim.local:9092 --topic iot_filter
+----
\ No newline at end of file
diff --git a/streaming_lite.adoc b/streaming_lite.adoc
new file mode 100644
index 0000000..2f9a7c7
--- /dev/null
+++ b/streaming_lite.adoc
@@ -0,0 +1,469 @@
+= From Edge to Streams Processing
+
+In this workshop you'll implement a data pipeline, using NiFi to ingest data from an IoT device into Kafka and then consume data from Kafka and write it to Kudu tables.
+
+== Labs summary
+
+* *Lab 1* - On Apache NiFi, run a simulator to send IoT sensor data to the MQTT broker.
+* *Lab 2* - On Schema Registry, register the schema describing the data generated by the IoT sensors.
+* *Lab 3* - On the NiFi cluster, prepare the data and send it to the *Kafka* cluster.
+* *Lab 4* - On the *Streams Messaging Manager (SMM)* Web UI, monitor the Kafka cluster and confirm data is being ingested correctly.
+* *Lab 5* - Use NiFi to consume each record from *Kafka* and save results to *Kudu*.
+* *Lab 6* - Check the data on Kudu.
+* *Lab 7* - Apache Flink - your friend for streaming use cases
+
+[[lab_1, Lab 1]]
+== Lab 1 - Apache NiFi: setup machine sensors simulator
+
+In this lab you will run a simple Python script that simulates IoT sensor data from some hypothetical machines, and send the data to a MQTT broker (link:https://mosquitto.org/[mosquitto]). The gateway host is connected to many different types of sensors, but they generally all share the same transport protocol, "MQTT".
+
+. Go to Apache NiFi and add a Processor (ExecuteProcess) to the canvas.
++
+image::images/simulate1.png[width=800]
+
+. Right-click the processor, select *Configure* (or, alternatively, just double-click the processor). On the *PROPERTIES* tab, set the properties shown below to run our Python simulate script.
++
+[source]
+----
+Command: python
+Command Arguments: /opt/demo/simulate.py
+----
++
+image::images/simulate2.png[width=500]
+
+. In the *SCHEDULING* tab, set to *Run Schedule: 1 sec*
++
+Alternatively, you could set that to other time intervals, such as 30 sec or 1 min.
++
+image::images/runSimulator1or30.png[width=500]
+
+. In the *SETTINGS* tab, check the "*success*" relationship in the *AUTOMATICALLY TERMINATED RELATIONSHIPS* section. Click *Apply*.
++
+image::images/nifiTerminateRelationships.png[width=600]
+
+. You can then right-click to *Start* this simulator runner.
++
+image::images/nifiDemoStart.png[width=400]
+
+. Right-click and select *Stop* after a few seconds and look at the *provenance*. You'll see that it has run a number of times and produced results.
++
+image::images/NiFiViewDataProvenance.png[width=400]
++
+image::images/NiFiDataProvenance.png[width=800]
+
+
+
+[[lab_2, Lab 2]]
+== Lab 2 - Registering our schema in Schema Registry
+
+The data produced by the temperature sensors is described by the schema in file `link:https://raw.githubusercontent.com/asdaraujo/edge2ai-workshop/master/sensor.avsc[sensor.avsc]`. In this lab we will register this schema in Schema Registry so that our flows in NiFi can refer to the schema using a unified service. This will also allow us to evolve the schema in the future, if needed, keeping older versions under version control, so that existing flows and flowfiles will continue to work.
+
+. Go to the following URL, which contains the schema definition we'll use for this lab. Select all contents of the page and copy it.
++
+`link:https://raw.githubusercontent.com/asdaraujo/edge2ai-workshop/master/sensor.avsc[https://raw.githubusercontent.com/asdaraujo/edge2ai-workshop/master/sensor.avsc, window="_blank"]`
+
+. In the Schema Registry Web UI, click the `+` sign to register a new schema.
+
+. Click on a blank area in the *Schema Text* field and paste the contents you copied.
+
+. Complete the schema creation by filling the following properties:
++
+[source]
+----
+Name: SensorReading
+Description: Schema for the data generated by the IoT sensors
+Type: Avro schema provider
+Schema Group: Kafka
+Compatibility: Backward
+Evolve: checked
+----
++
+image::images/register_schema.png[width=800]
+
+. Save the schema
+
+[[lab_3, Lab 3]]
+== Lab 3 - Configuring the NiFi flow and pushing data to Kafka
+
+In this lab, you will create a NiFi flow to receive the data from mqtt gateways and push it to **Kafka**.
+
+=== Creating a Process Group
+
+Before we start building our flow, let's create a Process Group to help organize the flows in the NiFi canvas and also to enable flow version control.
+
+. Open the NiFi Web UI, create a new Process Group and name it something like *Process Sensor Data*.
++
+image::images/create_pgroup_lite.png[width=800]
+
+. We want to be able to version control the flows we will add to the Process Group. In order to do that, we first need to connect NiFi to the *NiFi Registry*. On the NiFi global menu, click on "Controller Settings", navigate to the "Registry Clients" tab and add a Registry client with the following URL:
++
+----
+Name: NiFi Registry
+URL: http://edge2ai-1.dim.local:18080
+----
++
+image::images/global_controller_settings.png[width=800]
++
+image::images/add_registry_client.png[width=800]
+
+. On the *NiFi Registry* Web UI, add another bucket for storing the Sensor flow we're about to build. Call it `SensorFlows`:
++
+image::images/sensor_flows_bucket.png[width=800]
+
+. Back on the *NiFi* Web UI, to enable version control for the Process Group, right-click on it and select *Version > Start version control* and enter the details below. Once you complete, a image:images/version_control_tick.png[width=20] will appear on the Process Group, indicating that version control is now enabled for it.
++
+[source]
+----
+Registry: NiFi Registry
+Bucket: SensorFlows
+Flow Name: SensorProcessGroup
+----
+
+. Let's also enable processors in this Process Group to use schemas stored in Schema Registry. Right-click on the Process Group, select *Configure* and navigate to the *Controller Services* tab. Click the *`+`* icon and add a *HortonworksSchemaRegistry* service. After the service is added, click on the service's _cog_ icon (image:images/cog_icon.png[width=20]), go to the *Properties* tab and configure it with the following *Schema Registry URL* and click *Apply*.
++
+[source]
+----
+URL: http://edge2ai-1.dim.local:7788/api/v1
+----
++
+image::images/added_hwx_sr_service.png[width=800]
+
+. Click on the _lightning bolt_ icon (image:images/enable_icon.png[width=20]) to *enable* the *HortonworksSchemaRegistry* Controller Service.
+
+. Still on the *Controller Services* screen, let's add two additional services to handle the reading and writing of JSON records. Click on the image:images/plus_button.png[width=25] button and add the following two services:
+** *`JsonTreeReader`*, with the following properties:
++
+[source]
+----
+Schema Access Strategy: Use 'Schema Name' Property
+Schema Registry: HortonworksSchemaRegistry
+Schema Name: ${schema.name} -> already set by default!
+----
+
+** *`JsonRecordSetWriter`*, with the following properties:
++
+[source]
+----
+Schema Write Strategy: HWX Schema Reference Attributes
+Schema Access Strategy: Inherit Record Schema
+Schema Registry: HortonworksSchemaRegistry
+----
+
+. Enable the *JsonTreeReader* and the *JsonRecordSetWriter* Controller Services you just created, by clicking on their respective _lightning bolt_ icons (image:images/enable_icon.png[width=20]).
++
+image::images/controller_services.png[width=800]
+
+=== Creating the flow
+
+. Double-click on the newly created process group to expand it.
+
+. Inside the process group, add a new _ConsumeMQTT_ processor.
++
+image::images/add_ConsumeMQTT_lite.png[width=800]
+
++
+*PROPERTIES* tab:
++
+[source]
+----
+Broker URI: tcp://edge2ai-1.dim.local:1883
+Client ID: sensor-iot
+Topic Filter: iot/#
+Max Queue Size: 60
+----
+
+. We need to tell NiFi which schema should be used to read and write the Sensor data. For this we'll use an _UpdateAttribute_ processor to add an attribute to the FlowFile indicating the schema name.
++
+Add an _UpdateAttribute_ processor by dragging the processor icon to the canvas:
++
+image::images/add_updateattribute_lite.png[width=800]
+
+. Double-click the _UpdateAttribute_ processor and configure it as follows:
+.. In the _SETTINGS_ tab:
++
+[source]
+----
+Name: Set Schema Name
+----
+.. In the _PROPERTIES_ tab:
+** Click on the image:images/plus_button.png[width=25] button and add the following property:
++
+[source]
+----
+Property Name: schema.name
+Property Value: SensorReading
+----
+.. Click *Apply*
+
+. Connect the *ConsumeMQTT* processor to the *Set Schema Name* processor.
+
+. Add a _PublishKafkaRecord_2.0_ processor and configure it as follows:
++
+*SETTINGS* tab:
++
+[source]
+----
+Name: Publish to Kafka topic: iot
+----
++
+*PROPERTIES* tab:
++
+[source]
+----
+Kafka Brokers: edge2ai-1.dim.local:9092
+Topic Name: iot
+Record Reader: JsonTreeReader
+Record Writer: JsonRecordSetWriter
+Use Transactions: false
+Attributes to Send as Headers (Regex): schema.*
+----
++
+NOTE: Make sure you use the PublishKafkaRecord_2.0 processor and *not* the PublishKafka_2.0 one
+
+. While still in the _PROPERTIES_ tab of the _PublishKafkaRecord_2.0_ processor, click on the image:images/plus_button.png[width=25] button and add the following property:
++
+[source]
+----
+Property Name: client.id
+Property Value: nifi-sensor-data
+----
++
+Later, this will help us clearly identify who is producing data into the Kafka topic.
+
+. Connect the *Set Schema Name* processor to the *Publish to Kafka topic: iot* processor.
+
+. Add a new _Funnel_ to the canvas and connect the PublishKafkaRecord processor to it. When the "Create connection" dialog appears, select "*failure*" and click *Add*.
++
+image::images/add_kafka_failure_connection.png[width=600]
+
+. Double-click on the *Publish to Kafka topic: iot* processor, go to the *SETTINGS* tab, check the "*success*" relationship in the *AUTOMATICALLY TERMINATED RELATIONSHIPS* section. Click *Apply*.
++
+image::images/terminate_publishkafka_relationship.png[width=600]
+
+. Start all three processors. Your canvas should now look like the one below:
++
+image::images/publishKafka_flow_lite.png[width=800]
+
+
+. Refresh the screen (`Ctrl+R` on Linux/Windows; `Cmd+R` on Mac) and you should see that the records were processed by the _PublishKafkaRecord_ processor and there should be no records queued on the "failure" output queue.
++
+image::images/kafka_success_lite.png[width=800]
++
+At this point, the messages are already in the Kafka topic. You can add more processors as needed to process, split, duplicate or re-route your FlowFiles to all other destinations and processors.
+
+. To complete this Lab, let's commit and version the work we've just done. Go back to the NiFi root canvas by clicking on the "NiFi Flow" breadcrumb. Right-click on the *Process Sensor Data* Process Group and select *Version > Commit local changes*. Enter a descriptive comment and save.
+
+[[lab_4, Lab 4]]
+== Lab 4 - Use SMM to confirm that the data is flowing correctly
+
+Now that our NiFi flow is pushing data to Kafka, it would be good to have a confirmation that everything is running as expected. In this lab you will use Streams Messaging Manager (SMM) to check and monitor Kafka.
+
+. Start the *NiFi ExecuteProcess* simulator again and confirm you can see the messages queued in NiFi. Leave it running.
+
+. Go to the Streams Messaging Manager (SMM) Web UI and familiarize yourself with the options there. Notice the filters (blue boxes) at the top of the screen.
++
+image::images/smm.png[width=800]
+
+. Click on the *Producers* filter and select only the *`nifi-sensor-data`* producer. This will hide all the irrelevant topics and show only the ones that producer is writing to.
+
+. If you filter by *Topic* instead and select the `iot` topic, you'll be able to see all the *producers* and *consumers* that are writing to and reading from it, respectively. Since we haven't implemented any consumers yet, the consumer list should be empty.
+
+. Click on the topic to explore its details. You can see more details, metrics and the break down per partition. Click on one of the partitions and you'll see additional information and which producers and consumers interact with that partition.
++
+image::images/producers.png[width=800]
+
+. Click on the *EXPLORE* link to visualize the data in a particular partition. Confirm that there's data in the Kafka topic and it looks like the JSON produced by the sensor simulator.
++
+image::images/explore_partition.png[width=800]
+
+. Check the data from the partition. You'll notice something odd. These are readings from temperature sensors and we don't expect any of the sensors to measure temperatures greater than 150 degrees in the conditions in which they are used. It seems, though, that `sensor_0` and `sensor_1` are intermittently producing noise and some of their measurements have very high values.
++
+image::images/troubled_sensors.png[width=800]
+
+. Stop the *NiFi ExecuteProcess* simulator again.
+
+. In the next Lab we'll eliminate these problematic measurements to avoid problems later in our data flow.
+
+[[lab_5, Lab 5]]
+== Lab 5 - Use NiFi to consume each record from *Kafka* and save results to *Kudu*.
+
+In this lab, you will use NiFi to consume the Kafka messages containing the IoT data we ingested in the previous lab.
+
+=== Add new Controller Services
+
+When the sensor data was sent to Kafka using the _PublishKafkaRecord_ processor, we chose to attach the schema information in the header of Kafka messages. Now, instead of hard-coding which schema we should use to read the message, we can leverage that metadata to dynamically load the correct schema for each message.
+
+To do this, though, we need to configure a different _JsonTreeReader_ that will use the schema properties in the header, instead of the `${schema.name}` attribute, as we did before.
+
+
+. If you're not in the *Process Sensor Data* process group, double-click on it to expand it. On the *Operate* panel (left-hand side), click on the _cog_ icon (image:images/cog_icon.png[width=25]) to access the *Process Sensor Data* process group's configuration page.
++
+image::images/operate_panel_cog.png[width=300]
+
+. Click on the _plus_ button (image:images/plus_button.png[width=25]), add a new *JsonTreeReader*, configure it as shown below and click *Apply* when you're done:
++
+On the *SETTINGS* tab:
++
+[source]
+----
+Name: JsonTreeReader - With schema identifier
+----
++
+On the *PROPERTIES* tab:
++
+[source]
+----
+Schema Access Strategy: HWX Schema Reference Attributes
+Schema Registry: HortonworksSchemaRegistry
+----
+
+. Click on the _lightning bolt_ icon (image:images/enable_icon.png[width=20]) to *enable* the *JsonTreeReader - With schema identifier* controller service.
+
+
++
+image::images/additional_controller_services_lite.png[width=800]
+
+. Close the *Process Sensor Data Configuration* page.
+
+=== Create the flow
+
+We'll now create the flow to read the sensor data from Kafka and write the results to Kudu. At the end of this section your flow should look like the one below:
+
+image::images/from_kafka_to_kudu_flow_lite.png[width=800]
+
+==== ConsumeKafkaRecord_2_0 processor
+
+. We'll add a new flow to the same canvas we were using before (inside the *Process Sensor Data* Process Group). Click on an empty area of the canvas and drag it to the side to give you more space to add new processors.
+
+. Add a *ConsumeKafkaRecord_2_0* processor to the canvas and configure it as shown below:
++
+*SETTINGS* tab:
++
+[source]
+----
+Name: Consume Kafka iot messages
+----
++
+*PROPERTIES* tab:
++
+[source]
+----
+Kafka Brokers: edge2ai-1.dim.local:9092
+Topic Name(s): iot
+Topic Name Format: names
+Record Reader: JsonTreeReader - With schema identifier
+Record Writer: JsonRecordSetWriter
+Honor Transactions: false
+Group ID: iot-sensor-consumer
+Offset Reset: latest
+Headers to Add as Attributes (Regex): schema.*
+----
+
+. Reuse the existing _Funnel_ on the canvas and connect the *Consume Kafka iot messages* processor to it. When prompted, check the *parse.failure* relationship for this connection:
++
+image::images/parse_failure_relationship.png[width=500]
+
+==== PutKudu processor
+
+. Add a *PutKudu* processor to the canvas and configure it as shown below:
++
+*SETTINGS* tab:
++
+[source]
+----
+Name: Write to Kudu
+----
++
+*PROPERTIES* tab:
++
+[source]
+----
+Kudu Masters: edge2ai-1.dim.local:7051
+Table Name: impala::default.sensors
+Record Reader: JsonTreeReader - With schema identifier
+----
+
+. Connect the *Consume Kafka iot messages* processor to the *Write to Kudu* one. When prompted, check the *success* relationship for this connection.
+
+. Connect the *Write to Kudu* processor to the same _Funnel_ you created above. When prompted, check the *failure* relationship for this connection.
+
+. Double-click on the *Write to Kudu* processor, go to the *SETTINGS* tab, check the "*success*" relationship in the *AUTOMATICALLY TERMINATED RELATIONSHIPS* section. Click *Apply*.
+
+==== Create the Kudu table
+
+NOTE: If you already created this table in a previous workshop, please skip the table creation here.
+
+. Go to the Hue Web UI and login. The first user to login to a Hue installation is automatically created and granted admin privileges in Hue.
+
+. The Hue UI should open with the Impala Query Editor by default. If it doesn't, you can always find it by clicking on *Query button > Editor > Impala*:
++
+image::images/impala_editor.png[width=800]
+
+. First, create the Kudu table. Log in to Hue and, in the Impala Query Editor, run this statement:
++
+[source,sql]
+----
+CREATE TABLE sensors
+(
+ sensor_id INT,
+ sensor_ts TIMESTAMP,
+ sensor_0 DOUBLE,
+ sensor_1 DOUBLE,
+ sensor_2 DOUBLE,
+ sensor_3 DOUBLE,
+ sensor_4 DOUBLE,
+ sensor_5 DOUBLE,
+ sensor_6 DOUBLE,
+ sensor_7 DOUBLE,
+ sensor_8 DOUBLE,
+ sensor_9 DOUBLE,
+ sensor_10 DOUBLE,
+ sensor_11 DOUBLE,
+ is_healthy INT,
+ PRIMARY KEY (sensor_ID, sensor_ts)
+)
+PARTITION BY HASH PARTITIONS 16
+STORED AS KUDU
+TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
+----
++
+image::images/create_table.png[width=800]
+
+==== Running the flow
+
+We're ready now to run and test our flow. Follow the steps below:
+
+. Start all the processors in your flow.
+
+. Refresh your NiFi page and you should see messages passing through your flow. The failure queues should have no records queued up.
++
+image::images/kudu_success_lite.png[width=800]
+
+[[lab_6, Lab 6]]
+== Lab 6 - Check the data on Kudu
+
+In this lab, you will run some SQL queries using the Impala engine and verify that the Kudu table is being updated as expected.
+
+. Log in to Hue and run the following queries in the Impala Query Editor:
++
+[source,sql]
+----
+SELECT count(*)
+FROM sensors;
+----
++
+[source,sql]
+----
+SELECT *
+FROM sensors
+ORDER by sensor_ts DESC
+LIMIT 100;
+----
++
+. Run the queries a few times and verify that the number of sensor readings is increasing as the data is ingested into the Kudu table. This allows you to build real-time reports for fast action.
++
+image::images/table_select_lite.png[width=800]
+
+[[lab_7, Lab 7]]
+== Lab 7 - Apache Flink - your friend for streaming use cases
+* Click here for the next Lab: link:streaming_flink.adoc[Streams Processing with Flink]