diff --git a/Multi-cloud/DATA/photo1.txt b/Multi-cloud/DATA/photo1.txt new file mode 100644 index 0000000..047e2e7 --- /dev/null +++ b/Multi-cloud/DATA/photo1.txt @@ -0,0 +1 @@ +apple aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa diff --git a/Multi-cloud/DATA/photo10.txt b/Multi-cloud/DATA/photo10.txt new file mode 100644 index 0000000..2a03a3a --- /dev/null +++ b/Multi-cloud/DATA/photo10.txt @@ -0,0 +1,2 @@ +duck aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + \ No newline at end of file diff --git a/Multi-cloud/DATA/photo11.txt b/Multi-cloud/DATA/photo11.txt new file mode 100644 index 0000000..18f65df --- /dev/null +++ b/Multi-cloud/DATA/photo11.txt @@ -0,0 +1 @@ +dove aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa diff --git a/Multi-cloud/DATA/photo12.txt b/Multi-cloud/DATA/photo12.txt new file mode 100644 index 0000000..013db28 --- /dev/null +++ b/Multi-cloud/DATA/photo12.txt @@ -0,0 +1 @@ +mango aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa diff --git a/Multi-cloud/DATA/photo2.txt b/Multi-cloud/DATA/photo2.txt new file mode 100644 index 0000000..7ce8469 --- /dev/null +++ b/Multi-cloud/DATA/photo2.txt @@ -0,0 +1 @@ +zebra aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa diff --git a/Multi-cloud/DATA/photo3.txt b/Multi-cloud/DATA/photo3.txt new file mode 100644 index 0000000..1c86d11 --- /dev/null +++ b/Multi-cloud/DATA/photo3.txt @@ -0,0 +1 @@ +parrot aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa diff --git a/Multi-cloud/DATA/photo4.txt b/Multi-cloud/DATA/photo4.txt new file mode 100644 index 0000000..434ce9f --- /dev/null +++ b/Multi-cloud/DATA/photo4.txt @@ -0,0 +1 @@ +orange aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa diff --git a/Multi-cloud/DATA/photo5.txt b/Multi-cloud/DATA/photo5.txt new file mode 100644 index 0000000..8fda6d2 --- /dev/null +++ b/Multi-cloud/DATA/photo5.txt @@ -0,0 +1 @@ +pear 
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa diff --git a/Multi-cloud/DATA/photo6.txt b/Multi-cloud/DATA/photo6.txt new file mode 100644 index 0000000..38b8140 --- /dev/null +++ b/Multi-cloud/DATA/photo6.txt @@ -0,0 +1 @@ +lion aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa diff --git a/Multi-cloud/DATA/photo7.txt b/Multi-cloud/DATA/photo7.txt new file mode 100644 index 0000000..455646c --- /dev/null +++ b/Multi-cloud/DATA/photo7.txt @@ -0,0 +1 @@ +cat aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa diff --git a/Multi-cloud/DATA/photo8.txt b/Multi-cloud/DATA/photo8.txt new file mode 100644 index 0000000..079b9a8 --- /dev/null +++ b/Multi-cloud/DATA/photo8.txt @@ -0,0 +1 @@ +dog aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa diff --git a/Multi-cloud/DATA/photo9.txt b/Multi-cloud/DATA/photo9.txt new file mode 100644 index 0000000..fce431a --- /dev/null +++ b/Multi-cloud/DATA/photo9.txt @@ -0,0 +1 @@ +banana aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \ No newline at end of file diff --git a/Multi-cloud/Multi-CloudStreaming.md b/Multi-cloud/Multi-CloudStreaming.md new file mode 100644 index 0000000..999bbe6 --- /dev/null +++ b/Multi-cloud/Multi-CloudStreaming.md @@ -0,0 +1,176 @@ +# Multi-Cloud Streaming + +This Project contains demos that shows how to use SODA Foundations multi-cloud project for streaming usecases. The Apache Kafka project is used as the Stream Processing platform for these use demos. + +Two cases are identified for the purpose of demonstrating multi-cloud features for Streaming. We can connect the Stream Processor in two ways to multi-cloud as described below. + +## Case 1: Connecting Stream Processor after Multi-Cloud + +In this case Stream Processor get its input from Multi-cloud, where this inputs are objects stored in cloud and/or on-prem. These objects can be previously stored Stream Processor output, or any objects that are stored and managed by multi-cloud. 
+ +This demo shows how to get data/files from the cloud using multi-cloud, and do stream processing like analysis & categorization and generate processed output. In this simplified usecase, the Stream processor downloads a list of photo files, which are labelled with names of animals, birds and fruits. Stream processor will identify these from labels and categorize them to Animals, Birds, Fruits and Unknown and output the count of these categories. + + ### Steps to setup this demo: + - Deploy multi-cloud v0.10.0 with Dashboard and upload input data + - Configure and Start Kafka streaming + - Start Kafka console client for output checking + - Start Kafka demo application that process on input stream data + - Start SODA multi-cloud services + - Start GO application that uses multi-cloud client to download data for input to stream processor + + ### Detailed steps + - Deploy SODA Multi-Cloud and Dashboard Project & upload input data +Refer document https://github.com/sodafoundation/opensds/wiki/OpenSDS-Cluster-Installation-through-Ansible0 + + ```bash + git clone https://github.com/sodafoundation/opensds-installer.git + cd opensds-installer + git checkout v0.10.0 + + # change "opensds-installer/ansible/group_vars/common.yml + host_ip: 192.168.20.128 + deploy_project: gelato + + # Deploy multi-cloud and keystone and Dashboard + cd ansible + chmod +x ./install_ansible.sh && ./install_ansible.sh + ansible-playbook site.yml -i local.hosts + ``` + + Use Dashboard GUI to create an AKSK, add backend, create bucket (`bkt001`) and upload input files for demo + + - Start Kafka, Zookeeper, and create topics + ```bash + git clone https://github.com/sodafoundation/demos + cd ./demos/mc-demo/streams-processor + + ./setup.sh # Download and Start Kafka, Create kafka topics + cd ../kafka_2.12-2.4.1/ + bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic multicloud-input + bin/kafka-topics.sh --create --bootstrap-server 
localhost:9092 --replication-factor 1 --partitions 1 --topic multicloud-input2 + bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic multicloud-output + bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic multicloud-output2 + ``` + + - Using SODA Dashboard upload input data files in folder ./mc-demo/DATA + ```bash + # DATA folder contains 12 files with photo*.txt to be uploaded + ``` + + - Build and start JAVA application that does the Stream processing + ```bash + cd ./mc-demo/streams-processor + mvn clean package + mvn exec:java -Dexec.mainClass=myapps.DemoMultiCloudInput + ``` + + - Start Kafka console client for checking the generated output + ```bash + # Open a new terminal + cd ./mc-demo/kafka_2.12-2.4.1 + bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic multicloud-output + + ``` + + - Start GO app with multi-cloud client + ```bash + # Untar multi-cloud branch with Client support + # Open Another terminal, and + + mkdir -p /src/github.com/opensds + tar xzvf multi-cloud.tar.gz -C /src/github.com/opensds + cd /src/github.com/opensds/multi-cloud + make all + + #Get AKSK generated from OpenSDS Dashboard UI + export OS_ACCESS_KEY=5gWcxQ9HfzJ1xkvmQMz6 + + # Export environment variables + export MULTI_CLOUD_IP=192.168.20.158 + export MICRO_SERVER_ADDRESS=:8089 + export OS_AUTH_AUTHSTRATEGY=keystone + + export OPENSDS_ENDPOINT=http://192.168.20.158:50040 + export OPENSDS_AUTH_STRATEGY=keystone + export OS_AUTH_URL=http://192.168.20.158/identity + export OS_USERNAME=admin + export OS_PASSWORD=opensds@123 + export OS_TENANT_NAME=admin + export OS_PROJECT_NAME=admin + export OS_USER_DOMIN_ID=default + + export KAFKA_URL=localhost:9092 + export KAFKA_TOPIC=streams-wordcount-processor-output + export KAFKA_GROUP_ID=TestGroupID + + # Run mc-demo project + cd /path/to/mc-demo + go run input_demo.go gelato_client.go + + ``` + + + - Check the 
Java APP output and output topic + ```bash + # Check JAVA app output & output topic + ``` + + ### Case 1 Demo Video + [Demo1.mp4](https://drive.google.com/open?id=1J1pNPLuyxi9oIj9YzzD-WRkJfdBrwbMh) + + +## Case 2: Connecting Stream processing in front of Multi-Cloud +In this case we convert stream output from the Stream processor and create an object that multicloud can work on. We use Apache Kafka as Stream Processing platform. + +The scenario for this kind of use case is when we need to store a report generated by a streaming framework into multiple cloud back ends for later consumption. We use multi-cloud services of Data Mover or Migration to store these objects in the Multi-cloud backend. + +This demo shows a stream processing application which counts words on the console input, and generates a word count table. A go application will use this table to generate a file and store the file object to a cloud storage and apply different policies on the uploaded file using multi-cloud project. + +### Steps to setup this demo: + - Deploy multi-cloud v0.10.0 with Dashboard + - Configure and Start Kafka streaming + - Start Kafka console producer for input generation + - Start Kafka demo application that process on input stream data + - Start SODA multi-cloud services + - Start GO application that uses multi-cloud client to upload output of stream processor + + ### Detailed steps + - Deploy multi-cloud with Dashboard + ```bash + # Similar to Demo1, but no need to upload input files. 
+ ``` + - Configure and Start kafka, and create topics streaming + ```bash + # Similar to Demo1 + ``` + - Start Kafka console producer for input capturing for processing + ```bash + #Start in a new terminal window + bin/kafka-console-producer.sh --broker-list localhost:9092 --topic multicloud-input2 + + >this is test + >another test + ``` + + - Start Kafka demo application that process on input stream data + ```bash + cd ./mc-demo/streams-processor + mvn clean package + mvn exec:java -Dexec.mainClass=myapps.DemoMultiCloudOutput + ``` + + - Start GO application that uses multi-cloud client + ```bash + cd ./mc-demo + go run output_demo.go gelato_client.go + + ``` + - Verify generated output file from Dashboard for the count output + ```bash + # Dashboard will contain a file out.txt, with output + ``` + + ### Case 2 Demo Video + [Demo2.mp4](https://drive.google.com/open?id=1l01C20C4eQypNw-rKeN8CEk68GusII1P) + + diff --git a/Multi-cloud/README.md b/Multi-cloud/README.md new file mode 100644 index 0000000..51c1d13 --- /dev/null +++ b/Multi-cloud/README.md @@ -0,0 +1,13 @@ +# Demos + +This is a demo of SODA Foundations multi-cloud using streaming + + + - DATA: Input data files for the Case 1 demo. This needs to be uploaded to Cloud backend using SODA Foundation Dashboard. + - multi-cloud-client: Multi-Cloud Client with dependent project. 
This client is used to interact with deployed Multi-Cloud instance + - streams-processor: JAVA application that uses Kafka Stream processing engine + - input_demo.go: Go application that implement Case 1 demo + - output_demo.go: Go application that implement Case 2 demo + - Demo1.mp4: Video explaining the Case 1 demo + - Demo2.mp4: Video explaining the Case 2 demo + - Multi-CloudStreaming.md: Document explaining Multi-cloud Streaming usecase and demos diff --git a/Multi-cloud/gelato_client.go b/Multi-cloud/gelato_client.go new file mode 100644 index 0000000..ba1b4ea --- /dev/null +++ b/Multi-cloud/gelato_client.go @@ -0,0 +1,83 @@ +package main + +import ( + "fmt" + "os" + + "github.com/opensds/multi-cloud/client" +) + +const ( + fname = "out.txt" +) + +var ( + c *client.Client +) + +// exitErrorf prints the formatted message to stderr and terminates with exit status 1. +func exitErrorf(msg string, args ...interface{}) { + fmt.Fprintf(os.Stderr, msg+"\n", args...) + os.Exit(1) +} + +// GelatoUpload uploads filename to bucket through the shared multi-cloud client, +// printing any error and then the response. +func GelatoUpload(bucket, filename string) { + // + resp, err := c.UploadObject(bucket, filename, filename) + if err != nil { + fmt.Println("Error Upload", err) + } + + fmt.Println("Response:", resp) +} + +// GelatoDownload downloads filename from bucket through the shared multi-cloud client, +// printing any error. +func GelatoDownload(bucket, filename string) { + // + err := c.DownloadObject(bucket, filename) + if err != nil { + fmt.Println("Error Download", err) + } +} + +// init builds the package-level multi-cloud client from environment variables. +func init () { + + fmt.Println ("Initialize multicloud") + + // Environment variables to be exported + + // export MULTI_CLOUD_IP=192.168.20.158 + // export MICRO_SERVER_ADDRESS=:8089 + // export OS_AUTH_AUTHSTRATEGY=keystone + // export OS_ACCESS_KEY=ZNRJARg7wkfm9wxzuIeD + // export OPENSDS_ENDPOINT=http://192.168.20.158:50040 + // export OPENSDS_AUTH_STRATEGY=keystone + // export OS_AUTH_URL=http://192.168.20.158/identity + // export OS_USERNAME=admin + // export OS_PASSWORD=opensds@123 + // export OS_TENANT_NAME=admin + // export OS_PROJECT_NAME=admin + // export OS_USER_DOMIN_ID=default + + ip, ok := os.LookupEnv("MULTI_CLOUD_IP") + if !ok { + // fmt.Errorf only builds an error value; report the problem and stop instead + exitErrorf("ERROR: You must provide the ip by setting " + + "the 
environment variable MULTI_CLOUD_IP") + } + + cfg := &client.Config{ + Endpoint: "http://" + ip + os.Getenv(client.MicroServerAddress), + } + authStrategy := os.Getenv(client.OsAuthAuthstrategy) + + switch authStrategy { + case client.Keystone: + cfg.AuthOptions = client.LoadKeystoneAuthOptions() + case client.Noauth: + cfg.AuthOptions = client.NewNoauthOptions("adminTenantId") + default: + cfg.AuthOptions = client.NewNoauthOptions("tenantId") + } + + c = client.NewClient(cfg) + + // c.ListBackends() +} diff --git a/Multi-cloud/input_demo.go b/Multi-cloud/input_demo.go new file mode 100644 index 0000000..fa34723 --- /dev/null +++ b/Multi-cloud/input_demo.go @@ -0,0 +1,116 @@ +package main + +import ( + "context" + "os" + "fmt" + "strings" + + kafka "github.com/segmentio/kafka-go" +) + +const ( + bucket = "bkt001" + KAFKA_URL = "localhost:9092" + KAFKA_TOPIC = "multicloud-input" + KAFKA_GROUP_ID = "TestGroupID" +) + +var inps = [12] string { + // Input DATA with labels + + // photo1.txt apple + // photo2.txt zebra + // photo3.txt parrot + // photo4.txt orange + // photo5.txt pear + // photo6.txt lion + // photo7.txt cat + // photo8.txt dog + // photo9.txt banana + // photo10.txt duck + // photo11.txt dove + // photo12.txt mango + + "photo1.txt", + "photo2.txt", + "photo3.txt", + "photo4.txt", + "photo5.txt", + "photo6.txt", + "photo7.txt", + "photo8.txt", + "photo9.txt", + "photo10.txt", + "photo11.txt", + "photo12.txt", + +} + +func getKafkaWriter(kafkaURL, topic, groupID string) *kafka.Writer { + brokers := strings.Split(kafkaURL, ",") + return kafka.NewWriter(kafka.WriterConfig{ + Brokers: brokers, + Topic: topic, + Balancer: &kafka.LeastBytes{}, + }) +} + +func upload() { + + for _, fname := range inps { + fmt.Println("Uploading [", bucket, "] [", fname, "]") + GelatoUpload(bucket, fname) + } + +} + +func processFile(filename string) []byte { + f, err := os.OpenFile(filename, os.O_RDONLY, 0644) + if err != nil { + fmt.Println("Error Opening input file", 
filename, err) + return nil + } + + defer f.Close() + + buf := make([]byte, 16) + n, err := f.Read(buf) + if err != nil { + fmt.Println("Error Reading input file", filename, err) + return nil + } + + // Return only the bytes actually read, not the zero-filled remainder of the buffer + return buf[:n] +} + +// main downloads every file listed in inps, takes the first whitespace-separated +// token of its first bytes as the label, and publishes it to Kafka keyed by file name. +func main() { + kafkaURL := KAFKA_URL + topic := KAFKA_TOPIC + groupID := KAFKA_GROUP_ID + + writer := getKafkaWriter(kafkaURL, topic, groupID) + defer writer.Close() + + + for _, fname := range inps { + fmt.Println("Downloading [" + bucket + "] [" + fname + "]") + GelatoDownload(bucket, fname) + + value := string (processFile(fname)) + values := strings.Fields(value) + if len(values) == 0 { + // Empty or unreadable file: there is no label to publish + fmt.Println("No label found in", fname) + continue + } + value = values[0] + + fmt.Println("filename =", fname, "Value =", value) + err := writer.WriteMessages(context.Background(), + kafka.Message{ + Key: []byte(fname), + Value: []byte(value), + }, + ) + if err != nil { + fmt.Println("Error writing message for", fname, err) + } + } +} diff --git a/Multi-cloud/multi-cloud-client/multi-cloud.tar.gz b/Multi-cloud/multi-cloud-client/multi-cloud.tar.gz new file mode 100644 index 0000000..13e7d58 Binary files /dev/null and b/Multi-cloud/multi-cloud-client/multi-cloud.tar.gz differ diff --git a/Multi-cloud/multi-cloud-feature.tar.xz b/Multi-cloud/multi-cloud-feature.tar.xz new file mode 100644 index 0000000..05e48a2 Binary files /dev/null and b/Multi-cloud/multi-cloud-feature.tar.xz differ diff --git a/Multi-cloud/output_demo.go b/Multi-cloud/output_demo.go new file mode 100644 index 0000000..0be55df --- /dev/null +++ b/Multi-cloud/output_demo.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + "log" + "os" + "fmt" + "strings" + + kafka "github.com/segmentio/kafka-go" +) + +const ( + filename = "out.txt" + bucket = "bkt001" + KAFKA_URL = "localhost:9092" + KAFKA_TOPIC = "multicloud-output2" + KAFKA_GROUP_ID = "TestGroupID" +) + + +func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader { + brokers := strings.Split(kafkaURL, ",") + return kafka.NewReader(kafka.ReaderConfig{ + Brokers: brokers, + // GroupID: groupID, + Topic: topic, + Partition: 0, + 
MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + }) +} + +// main consumes key/value records from Kafka, keeps the latest value per key in a table, +// rewrites the output file from that table after every record and uploads the file. +func main() { + + kafkaURL := KAFKA_URL + topic := KAFKA_TOPIC + groupID := KAFKA_GROUP_ID + + reader := getKafkaReader(kafkaURL, topic, groupID) + + defer reader.Close() + + tbl := map[string]string{} + + f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + fmt.Println("Error Opening output file", err) + return + } + + defer f.Close() + + fmt.Println("start consuming stream ...") + + for { + m, err := reader.ReadMessage(context.Background()) + if err != nil { + log.Println("Error reading stream:", err) + os.Exit(1) + } + + fmt.Printf("message at topic:%v partition:%v offset:%v %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) + tbl[string(m.Key)] = string(m.Value) + + err = os.Truncate(filename, 0) + if err != nil { + fmt.Println("Error Truncate ->", err) + } + + // Truncate does not move the write offset of f; rewind so the table starts at byte 0 + if _, err = f.Seek(0, 0); err != nil { + fmt.Println("Error Seek ->", err) + } + + for key, value := range tbl { + fmt.Println(key, ":", value) + fmt.Fprintf(f, "%s : %s\n", string(key), string(value)) + } + + // Upload once per record, after the whole table has been written + GelatoUpload(bucket, filename) + } +} diff --git a/Multi-cloud/streams-processor/pom.xml b/Multi-cloud/streams-processor/pom.xml new file mode 100644 index 0000000..267a046 --- /dev/null +++ b/Multi-cloud/streams-processor/pom.xml @@ -0,0 +1,150 @@ + + + + 4.0.0 + + streams.examples + streams.examples + 0.1 + jar + + Kafka Streams Quickstart :: Java + + + UTF-8 + 2.4.0 + 1.7.7 + 1.2.17 + + + + + apache.snapshots + Apache Development Snapshot Repository + https://repository.apache.org/content/repositories/snapshots/ + + false + + + true + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.8 + 1.8 + + + + + + + + maven-compiler-plugin + + 1.8 + 1.8 + jdt + + + + org.eclipse.tycho + tycho-compiler-jdt + 0.21.0 + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.maven.plugins + maven-assembly-plugin + [2.4,) + + single + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + [3.1,) + + testCompile + 
compile + + + + + + + + + + + + + + + + + + org.apache.kafka + kafka-streams + ${kafka.version} + + + + org.slf4j + slf4j-jdk14 + 1.7.28 + compile + + + + org.apache.httpcomponents + httpclient + 4.5.2 + + + + diff --git a/Multi-cloud/streams-processor/setup.sh b/Multi-cloud/streams-processor/setup.sh new file mode 100755 index 0000000..7535f0d --- /dev/null +++ b/Multi-cloud/streams-processor/setup.sh @@ -0,0 +1,146 @@ +#!/bin/bash + +# set -x + + +INPUT_TOPIC="multicloud-input" +OUTPUT_TOPIC="multicloud-output" +INPUT2_TOPIC="multicloud-input2" +OUTPUT2_TOPIC="multicloud-output2" + +KAFKA_VER="kafka_2.12-2.4.1" + +TOP_DIR=$(cd $(dirname "$0") && cd .. && pwd) +KAFKA_FOLDER=$TOP_DIR/$KAFKA_VER + +echo "$TOP_DIR" +echo "$KAFKA_FOLDER" + +# usage function +function usage() +{ + cat << HEREDOC + + Usage: setup.sh [ options ] + + optional arguments: + -h, --help show this help message and exit + -c Clean and purge current installation + -d Download kafka + -s Start kafka + +HEREDOC +} + + +function download_kafka() +{ + echo "Download Kafka" + cd $TOP_DIR + wget https://downloads.apache.org/kafka/2.4.1/kafka_2.12-2.4.1.tgz + tar -xzf kafka_2.12-2.4.1.tgz +} + +function start_kafka() +{ + cd $KAFKA_FOLDER + + # Update Kafka config for delete topic support, if needed + grep -qxF 'delete.topic.enable=true' config/server.properties || echo 'delete.topic.enable=true' >> config/server.properties + + echo "Start Zookeeper" + bin/zookeeper-server-start.sh config/zookeeper.properties >zookeeper.log 2>&1 & + sleep 2 + + + + echo "Start Kafka" + bin/kafka-server-start.sh config/server.properties >kafka.log 2>&1 & + sleep 2 + + # echo "Create Kafka topics" + # bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic $INPUT_TOPIC + # bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic $OUTPUT_TOPIC + # bin/kafka-topics.sh --create --bootstrap-server localhost:9092 
--replication-factor 1 --partitions 1 --topic $INPUT2_TOPIC + # bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic $OUTPUT2_TOPIC + + # echo "Kafka console" + # bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic $INPUT_TOPIC + # bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $OUTPUT2_TOPIC + + echo "List Topics" + bin/kafka-topics.sh --list --bootstrap-server localhost:9092 +} + +function clean() +{ + cd $KAFKA_FOLDER + + # echo "Remove topics of Kafka" + # bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic $INPUT_TOPIC + # bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic $OUTPUT_TOPIC + # bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic $INPUT2_TOPIC + # bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic $OUTPUT2_TOPIC + + echo "Stopping Kafka" + bin/kafka-server-stop.sh + + echo "Stopping Zookeeper" + bin/zookeeper-server-stop.sh +} + +# Build and run the stream processor (currently not invoked by the default flow) +function start_app() +{ + # Maven must run next to pom.xml (the streams-processor folder), not inside the Kafka folder + cd "$TOP_DIR/streams-processor" || exit 1 + + echo "Start Stream Processor" + mvn clean package + mvn exec:java -Dexec.mainClass=myapps.DemoMultiCloudInput + # mvn exec:java -Dexec.mainClass=myapps.DemoMultiCloudOutput +} + +OPTIND=1 # Reset in case getopts has been used previously in the shell. + +while getopts ":h?cds" opt; do + case "$opt" in + h|\?) 
+ usage + exit 0 + ;; + :) echo "option -$OPTARG requires an argumnet" + usage + ;; + c) clean="1" + ;; + d) download="1" + ;; + s) startkafka="1" + ;; + + esac +done + +shift $((OPTIND-1)) + +[ "${1:-}" = "--" ] && shift + +if [ "$clean" = "1" ]; then + clean + exit +fi + +if [ "$download" = "1" ]; then + download_kafka + exit +fi + +if [ "$startkafka" = "1" ]; then + start_kafka + exit +fi + +# --- Start --- + +download_kafka +start_kafka +# start_app diff --git a/Multi-cloud/streams-processor/src/main/java/myapps/DemoMultiCloudInput.java b/Multi-cloud/streams-processor/src/main/java/myapps/DemoMultiCloudInput.java new file mode 100644 index 0000000..c7bace2 --- /dev/null +++ b/Multi-cloud/streams-processor/src/main/java/myapps/DemoMultiCloudInput.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package myapps; + +import java.time.Duration; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; + +import java.util.Locale; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +/** + * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program + * that computes a simple word occurrence histogram from an input text. + *

+ * Note: This is simplified code that only works correctly for single partition input topics. + * Check out {@code WordCountDemo} from the Apache Kafka Streams examples for a generic example. + *

+ * In this example, the input stream reads from a topic named "multicloud-input", where the values of messages + * are labels such as "apple" or "zebra"; and the category counts are written to topic "multicloud-output" where each record + * is an updated count of a single category (Fruits, Birds, Animals or Unknown). + *

+ * Before running this example you must create the input topic and the output topic (e.g. via + * {@code bin/kafka-topics.sh --create ...}), and write some data to the input topic (e.g. via + * {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data arriving in the output topic. + */ +public final class DemoMultiCloudInput { + + static class MyProcessorSupplier implements ProcessorSupplier<String, String> { + + @Override + public Processor<String, String> get() { + return new Processor<String, String>() { + private ProcessorContext context; + private KeyValueStore<String, Integer> kvStore; + + @Override + @SuppressWarnings("unchecked") + public void init(final ProcessorContext context) { + this.context = context; + // Once per second of stream time, print and forward every category count + this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> { + try (final KeyValueIterator<String, Integer> iter = kvStore.all()) { + System.out.println("----------- " + timestamp + " ----------- "); + + while (iter.hasNext()) { + final KeyValue<String, Integer> entry = iter.next(); + + System.out.println("[" + entry.key + ", " + entry.value + "]"); + + context.forward(entry.key, entry.value.toString()); + } + } + }); + this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts"); + } + + @Override + public void process(final String dummy, final String line) { + final String[] words = line.toLowerCase(Locale.getDefault()).split(" "); + + for (final String word : words) { + String thing; + + // Classify the lower-cased word, not the raw line, so case and extra words do not matter + switch (word) { + case "apple": + case "orange": + case "pear": + case "banana": + case "mango": + thing = "Fruits"; + break; + case "parrot": + case "duck": + case "dove": + thing = "Birds"; + break; + case "zebra": + case "cat": + case "dog": + case "lion": + thing = "Animals"; + break; + default: + thing = "Unknown"; + } + + final Integer oldValue = this.kvStore.get(thing); + + if (oldValue == null) { + this.kvStore.put(thing, 1); + } else { + this.kvStore.put(thing, oldValue + 1); + } + } + + context.commit(); + } + + @Override + public void close() {} + }; + } + } + + public static void main(final String[] args) 
{ + final Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "multicloud-input-demo"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + final Topology builder = new Topology(); + + builder.addSource("Source", "multicloud-input"); + + builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); + builder.addStateStore(Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore("Counts"), + Serdes.String(), + Serdes.Integer()), + "Process"); + + builder.addSink("Sink", "multicloud-output", "Process"); + + final KafkaStreams streams = new KafkaStreams(builder, props); + final CountDownLatch latch = new CountDownLatch(1); + + // attach shutdown handler to catch control-c + Runtime.getRuntime().addShutdownHook(new Thread("multicloud-input-shutdown-hook") { + @Override + public void run() { + streams.close(); + latch.countDown(); + } + }); + + try { + streams.start(); + latch.await(); + } catch (final Throwable e) { + System.exit(1); + } + System.out.println("In main"); + System.exit(0); + } +} diff --git a/Multi-cloud/streams-processor/src/main/java/myapps/DemoMultiCloudOutput.java b/Multi-cloud/streams-processor/src/main/java/myapps/DemoMultiCloudOutput.java new file mode 100644 index 0000000..b2c3a42 --- /dev/null +++ b/Multi-cloud/streams-processor/src/main/java/myapps/DemoMultiCloudOutput.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package myapps; + +import java.time.Duration; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; + +import java.util.Locale; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +/** + * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program + * that computes a simple word occurrence histogram from an input text. + *

+ * Note: This is simplified code that only works correctly for single partition input topics. + * Check out {@code WordCountDemo} from the Apache Kafka Streams examples for a generic example. + *

+ * In this example, the input stream reads from a topic named "multicloud-input2", where the values of messages + * represent lines of text; and the histogram output is written to topic "multicloud-output2" where each record + * is an updated count of a single word. + *

+ * Before running this example you must create the input topic and the output topic (e.g. via + * {@code bin/kafka-topics.sh --create ...}), and write some data to the input topic (e.g. via + * {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data arriving in the output topic. + */ +public final class DemoMultiCloudOutput { + + static class MyProcessorSupplier implements ProcessorSupplier<String, String> { + + @Override + public Processor<String, String> get() { + return new Processor<String, String>() { + private ProcessorContext context; + private KeyValueStore<String, Integer> kvStore; + + @Override + @SuppressWarnings("unchecked") + public void init(final ProcessorContext context) { + this.context = context; + // Once per second of stream time, print and forward every word count + this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> { + try (final KeyValueIterator<String, Integer> iter = kvStore.all()) { + System.out.println("----------- " + timestamp + " ----------- "); + + while (iter.hasNext()) { + final KeyValue<String, Integer> entry = iter.next(); + + System.out.println("[" + entry.key + ", " + entry.value + "]"); + + context.forward(entry.key, entry.value.toString()); + } + } + }); + this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts"); + } + + @Override + public void process(final String dummy, final String line) { + final String[] words = line.toLowerCase(Locale.getDefault()).split(" "); + + for (final String word : words) { + final Integer oldValue = this.kvStore.get(word); + + if (oldValue == null) { + this.kvStore.put(word, 1); + } else { + this.kvStore.put(word, oldValue + 1); + } + } + + context.commit(); + } + + @Override + public void close() {} + }; + } + } + + public static void main(final String[] args) { + final Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "multicloud-output-demo"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + 
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + final Topology builder = new Topology(); + + builder.addSource("Source", "multicloud-input2"); + + builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); + builder.addStateStore(Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore("Counts"), + Serdes.String(), + Serdes.Integer()), + "Process"); + + builder.addSink("Sink", "multicloud-output2", "Process"); + + final KafkaStreams streams = new KafkaStreams(builder, props); + final CountDownLatch latch = new CountDownLatch(1); + + // attach shutdown handler to catch control-c + Runtime.getRuntime().addShutdownHook(new Thread("multicloud-output-demo-shutdown-hook") { + @Override + public void run() { + streams.close(); + latch.countDown(); + } + }); + + try { + streams.start(); + latch.await(); + } catch (final Throwable e) { + System.exit(1); + } + System.exit(0); + } +} diff --git a/Multi-cloud/streams-processor/src/main/resources/log4j.properties b/Multi-cloud/streams-processor/src/main/resources/log4j.properties new file mode 100644 index 0000000..b620f1b --- /dev/null +++ b/Multi-cloud/streams-processor/src/main/resources/log4j.properties @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file