From 921e2c4c556bc2cff8a47f3efc1c1881e4f609f7 Mon Sep 17 00:00:00 2001 From: Yogesh Srihari Date: Sat, 11 Mar 2017 09:20:18 -0800 Subject: [PATCH 1/7] First ugly version --- python/flask-producer.py | 209 +++++++++++++++++++++++++++++++++------ 1 file changed, 181 insertions(+), 28 deletions(-) diff --git a/python/flask-producer.py b/python/flask-producer.py index 36c7ceb..630725b 100644 --- a/python/flask-producer.py +++ b/python/flask-producer.py @@ -1,5 +1,17 @@ + +import flask from flask import Flask, Response +from flask import Flask, jsonify +from jinja2 import Template +import math + +from bokeh.plotting import figure +from bokeh.models import AjaxDataSource +from bokeh.embed import components +from bokeh.resources import INLINE +from bokeh.util.string import encode_utf8 + # Creating a flask app app = Flask(__name__) @@ -11,6 +23,14 @@ from kafka import KafkaProducer +def stream_template(template_name, **context): + app.update_template_context(context) + t = app.jinja_env.get_template(template_name) + rv = t.stream(context) + rv.enable_buffering(5) + return rv + + def load_csv(filename): with open(filename) as f: csv_reader = csv.reader(f, delimiter=',') @@ -28,6 +48,84 @@ def read_input_file(input_file): dataset = load_csv(input_file) return dataset +count = 0 +train_Y_1 = [] +train_Y_2 = [] + +x, y = 0.0, 0.0 + +script ='' +div ='' + +def generate_next_row() : + dataset = read_input_file("./data_files/winequality-red-scaled.csv") + + for line in dataset: + + one_line = [] + for col in line: + one_line.append(col) + + one_line = ','.join(str(i) for i in one_line) + all_values = one_line.split(',') + yield float(all_values[len(all_values) - 1]) + + +next_data_row = generate_next_row() + + +@app.route("/data", methods=['POST']) +def send_data_to_sgd(): + global count + global train_Y_1 + global train_Y_2 + global script + global div + + # producer = KafkaProducer(bootstrap_servers='localhost:9092') + # producer.send(topic_name, str.encode(data)) + # producer.flush() + + from bokeh.plotting import figure, show, output_file + global x + + # if len(train_Y_1) > 1: + # x = train_Y_1[len(train_Y_1)-1] + + x = count + y = random.randint(1,10) + count = count +1 + # y = x + random.randrange(0, 1) + + print("Print X and Y : ") + print((x, y)) + + + return flask.jsonify(x=[x], y=[y]) + + # train_Y_1.append(float(all_values[len(all_values)-1])) + # train_Y_2.append(float(all_values[len(all_values)-1])) + + # print("--------Debug Train Ys -----") + # print(train_Y_1) + # print(len(train_Y_1)) + # print(train_Y_2) + # print(len(train_Y_2)) + # + # count= count + 1 + # + # colors = ['red' for x in range(len(train_Y_1))] + # + # p = figure(title="Flask Stack") + # p.xaxis.axis_label = 'Actual Value' + # p.yaxis.axis_label = 'Predicted Value' + # + # p.circle(train_Y_1, train_Y_2, + # color=colors, fill_alpha=0.2, size=10) + # + # from bokeh.embed import components + # script, div = components(p) + @app.route('/') def plotly(): @@ -48,47 +146,102 @@ def plotly(): one_line.append(col) one_line = ','.join(str(i) for i in one_line) - r = random.randint(1, 10) # random sleep time between [1,10] seconds - time.sleep(r) - producer.send("teststreamai1", str.encode(one_line)) + + # r = random.randint(1, 10) # random sleep time between [1,10] seconds + # time.sleep(r) + # send_data_to_sgd("AboPunk", one_line) + all_values = one_line.split(',') + train_Y_1.append(float(all_values[len(all_values)-1])) + producer = KafkaProducer(bootstrap_servers='localhost:9092') + producer.send('AboPunk', str.encode(one_line)) producer.flush() - from bokeh.plotting import figure, show, output_file - from bokeh.sampledata.iris import flowers + global train_Y_1 + global train_Y_2 + global script + global div + + return Response(stream_template('log_lines.html', rows=len(train_Y_1))) + + +# @app.route("/data", methods=['POST']) +# def get_x(): +# global x, y +# x = x + 0.1 +# y = math.sin(x) +# return flask.jsonify(x=[x], y=[y]) + +template = Template(''' + + + + Streaming Example + {{ js_resources }} + {{ css_resources }} + + + {{ plot_div }} + {{ plot_script }} + + +''') + + +@app.route('/second_shot') +def second_shot(): + + streaming = True + source = AjaxDataSource(data_url="http://localhost:5000/data", + polling_interval=3000, mode='append') + + dataset = read_input_file("./data_files/winequality-red-scaled.csv") + - import pandas as pd + producer = KafkaProducer(bootstrap_servers='localhost:9092') - df = pd.read_csv(data_file_winequality, sep=',') - train = df[:1000] - test = df[1000:] + # for line in dataset: + # + # one_line = [] + # for col in line: + # one_line.append(col) + # + # one_line = ','.join(str(i) for i in one_line) + # + # # r = random.randint(1, 10) # random sleep time between [1,10] seconds + # # time.sleep(r) + # global train_Y_1 + # global train_Y_2 + # # send_data_to_sgd("AboPunk", one_line) + # all_values = one_line.split(',') + # train_Y_1.append(float(all_values[len(all_values) - 1])) + # + # # r = random.randint(1, 10) + # # time.sleep(1) + # # producer = KafkaProducer(bootstrap_servers='localhost:9092') + # # producer.send('AboPunk', str.encode(one_line)) + # # producer.flush() - train_X, train_Y = train.drop(['quality'], axis=1), train['quality'] - test_X, test_Y = test.drop(['quality'], axis=1), test['quality'] - print(train_Y) + source.data = dict(x=[], y=[]) - train_Y = [(float(x) + random.uniform(0, 1)) for x in train_Y] + fig = figure(title="Streaming Example") + fig.line('x', 'y', source=source) - train_Y_1 = train_Y[:500] - train_Y_2 = train_Y[500:] + js_resources = INLINE.render_js() + css_resources = INLINE.render_css() - colors = ['red' for x in range(1000)] + script, div = components(fig, INLINE) - p = figure(title="Flask Stack") - p.xaxis.axis_label = 'Actual Value' - p.yaxis.axis_label = 'Predicted Value' + html = template.render( + plot_script=script, + plot_div=div, + js_resources=js_resources, + css_resources=css_resources + ) - p.circle(train_Y_1, train_Y_2, - color=colors, fill_alpha=0.2, size=10) + return encode_utf8(html) - try: - output_file("log_lines.html", title="iris.py example") - except Exception as e: - print(e.args) - show(p) - return Response(open('log_lines.html').read(), - mimetype="text/html") From e95807f4de104624a975bc2dcf67d82b0e62bfd7 Mon Sep 17 00:00:00 2001 From: Yogesh Srihari Date: Sun, 12 Mar 2017 21:21:52 -0700 Subject: [PATCH 2/7] Flask producer changes to handle bokeh server streaming plots and separation kafa producer --- python/flask-producer.py | 212 ++++++++++++--------------------------- 1 file changed, 63 insertions(+), 149 deletions(-) diff --git a/python/flask-producer.py b/python/flask-producer.py index 630725b..90994ee 100644 --- a/python/flask-producer.py +++ b/python/flask-producer.py @@ -1,10 +1,12 @@ +import csv +import random +import time import flask from flask import Flask, Response - from flask import Flask, jsonify from jinja2 import Template -import math + from bokeh.plotting import figure from bokeh.models import AjaxDataSource @@ -15,15 +17,17 @@ # Creating a flask app app = Flask(__name__) -import csv - -import random -import time -import os from kafka import KafkaProducer def stream_template(template_name, **context): + """Utility function to test the flask + streaming capability + + :param template_name: name of the template + :param context: context streaming . + :return: + """ app.update_template_context(context) t = app.jinja_env.get_template(template_name) rv = t.stream(context) @@ -54,10 +58,16 @@ def read_input_file(input_file): x, y = 0.0, 0.0 -script ='' -div ='' +script = '' +div = '' -def generate_next_row() : + +def generate_next_row(): + """Utility for asynchronous request to + api for sending the data to kafka + + Yields the one row from wine file at the time . + """ dataset = read_input_file("./data_files/winequality-red-scaled.csv") for line in dataset: @@ -67,111 +77,43 @@ def generate_next_row() : one_line.append(col) one_line = ','.join(str(i) for i in one_line) - all_values = one_line.split(',') - yield float(all_values[len(all_values) - 1]) + r = random.randint(1, 10) # random sleep time between [1,10] seconds + time.sleep(r) -next_data_row = generate_next_row() - - -@app.route("/data", methods=['POST']) -def send_data_to_sgd(): - global count - global train_Y_1 - global train_Y_2 - global script - global div + producer = KafkaProducer(bootstrap_servers='localhost:9092') - # producer = KafkaProducer(bootstrap_servers='localhost:9092') - # producer.send(topic_name, str.encode(data)) - # producer.flush() + producer.send('StreamAI', str.encode(one_line)) + producer.flush() - from bokeh.plotting import figure, show, output_file - global x + yield one_line.join("Send this data to moron abo \n") - # if len(train_Y_1) > 1: - # x = train_Y_1[len(train_Y_1)-1] +@app.route("/data", methods=['POST']) +def send_data_to_bokeh_plot(): + """ This sends to data to bokeh plot currently . + Usually there is a internal api called from an + exposed endpoint . + """ + global count x = count - y = random.randint(1,10) - count = count +1 - # y = x + random.randrange(0, 1) - - print("Print X and Y : ") - print((x, y)) - + y = random.randint(1, 10) + count += 1 return flask.jsonify(x=[x], y=[y]) - # train_Y_1.append(float(all_values[len(all_values)-1])) - # train_Y_2.append(float(all_values[len(all_values)-1])) - - # print("--------Debug Train Ys -----") - # print(train_Y_1) - # print(len(train_Y_1)) - # print(train_Y_2) - # print(len(train_Y_2)) - # - # count= count + 1 - # - # colors = ['red' for x in range(len(train_Y_1))] - # - # p = figure(title="Flask Stack") - # p.xaxis.axis_label = 'Actual Value' - # p.yaxis.axis_label = 'Predicted Value' - # - # p.circle(train_Y_1, train_Y_2, - # color=colors, fill_alpha=0.2, size=10) - # - # from bokeh.embed import components - # script, div = components(p) - @app.route('/') -def plotly(): - """This reads the from winequality-red-scaled.csv for now . - - Returns actual html as a response red from the file - """ - - data_file_winequality = os.path.join(os.environ['STREAMAI_HOME'], 'python/data_files/winequality-red-scaled.csv') - dataset = read_input_file(data_file_winequality) - - producer = KafkaProducer(bootstrap_servers='localhost:9092') - - for line in dataset: - - one_line = [] - for col in line: - one_line.append(col) - - one_line = ','.join(str(i) for i in one_line) +def send_data_to_kafka_consumer(): + """This reads the from winequality-red-scaled.csv for now . + """ + return Response(generate_next_row(),mimetype="text/html") - # r = random.randint(1, 10) # random sleep time between [1,10] seconds - # time.sleep(r) - # send_data_to_sgd("AboPunk", one_line) - all_values = one_line.split(',') - train_Y_1.append(float(all_values[len(all_values)-1])) - producer = KafkaProducer(bootstrap_servers='localhost:9092') - producer.send('AboPunk', str.encode(one_line)) - producer.flush() - - global train_Y_1 - global train_Y_2 - global script - global div - - return Response(stream_template('log_lines.html', rows=len(train_Y_1))) - - -# @app.route("/data", methods=['POST']) -# def get_x(): -# global x, y -# x = x + 0.1 -# y = math.sin(x) -# return flask.jsonify(x=[x], y=[y]) +# Simple html / javascript template that could will be +# replaced by bokek with embeded plot javascript +# and css . template = Template(''' @@ -188,60 +130,32 @@ def plotly(): ''') -@app.route('/second_shot') -def second_shot(): - - streaming = True - source = AjaxDataSource(data_url="http://localhost:5000/data", - polling_interval=3000, mode='append') +@app.route('/plot_demo') +def bokeh_demo(): + """End point demoing the real time injection of the data + to bokeh plots - dataset = read_input_file("./data_files/winequality-red-scaled.csv") - - - producer = KafkaProducer(bootstrap_servers='localhost:9092') - - - # for line in dataset: - # - # one_line = [] - # for col in line: - # one_line.append(col) - # - # one_line = ','.join(str(i) for i in one_line) - # - # # r = random.randint(1, 10) # random sleep time between [1,10] seconds - # # time.sleep(r) - # global train_Y_1 - # global train_Y_2 - # # send_data_to_sgd("AboPunk", one_line) - # all_values = one_line.split(',') - # train_Y_1.append(float(all_values[len(all_values) - 1])) - # - # # r = random.randint(1, 10) - # # time.sleep(1) - # # producer = KafkaProducer(bootstrap_servers='localhost:9092') - # # producer.send('AboPunk', str.encode(one_line)) - # # producer.flush() - - - source.data = dict(x=[], y=[]) - - fig = figure(title="Streaming Example") - fig.line('x', 'y', source=source) + Returns streaming response from the bokeh server . + """ - js_resources = INLINE.render_js() - css_resources = INLINE.render_css() + source = AjaxDataSource(data_url="http://localhost:5000/data", + polling_interval=3000, mode='append') - script, div = components(fig, INLINE) + source.data = dict(x=[], y=[]) - html = template.render( - plot_script=script, - plot_div=div, - js_resources=js_resources, - css_resources=css_resources - ) + fig = figure(title="Streaming Example") + fig.line('x', 'y', source=source) - return encode_utf8(html) + js_resources = INLINE.render_js() + css_resources = INLINE.render_css() + script, div = components(fig, INLINE) + html = template.render( + plot_script=script, + plot_div=div, + js_resources=js_resources, + css_resources=css_resources + ) + return encode_utf8(html) From 38d4a94810bb4c598d54343789633febf0cdec29 Mon Sep 17 00:00:00 2001 From: Yogesh Srihari Date: Sun, 12 Mar 2017 22:08:56 -0700 Subject: [PATCH 3/7] Reduced Changes to docker , it will not automatically start the spark app --- Dockerfile | 40 +++++++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/Dockerfile b/Dockerfile index cc000e6..c9713d0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,7 +19,24 @@ RUN apt-get update ENV \ SCALA_MAJOR_VERSION=2.11 \ SBT_VERSION=0.13.8 \ - KAFKA_CLIENT_VERSION=0.10.1.0 + KAFKA_CLIENT_VERSION=0.10.1.0 \ + DEV_INSTALL_HOME=~ \ + STREAMAI_HOME=$DEV_INSTALL_HOME/streamai \ + CONFIG_HOME=$STREAMAI_HOME/config \ + SCRIPTS_HOME=$STREAMAI_HOME/binsc \ + LOGS_HOME=$STREAMAI_HOME/logs \ + JAVA_HOME=/usr/lib/jvm/java-8-oracle \ + JRE_HOME=$JAVA_HOME/jre \ + PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH \ + PATH=$SCRIPTS_HOME/service:$SCRIPTS_HOME/util:$PATH \ + KAFKA_HOME=$DEV_INSTALL_HOME/kafka_2.11-0.10.1.0 \ + PATH=$KAFKA_HOME/bin:$PATH \ + ZOOKEEPER_HOME=$KAFKA_HOME \ + PATH=$ZOOKEEPER_HOME/bin:$PATH \ + FLASK_APP=$STREAMAI_HOME/python/flask-producer.py \ + LC_ALL=C.UTF-8 \ + LANG=C.UTF-8 + RUN \ apt-get update \ @@ -37,7 +54,7 @@ RUN \ && apt-get install -y apache2 \ && apt-get install -y libssl-dev \ && apt-get install -y python3.5 \ - && apt-get install -y python3-pip + && apt-get install -y python3-pip RUN \ # Maven for custom builds @@ -50,23 +67,32 @@ RUN \ && pip3 install kafka-python==1.3.2 \ && pip3 install bokeh==0.12.4 +# Create a directory to +RUN mkdir ~/.ssh +COPY ./config/ssh_config .ssh/config + RUN \ -# Sbt +# git clone during image build and setting enviroment variables + cd ~ \ + && git clone https://yogeshgo05@github.com/abgoswam/streamai + +RUN \ + #Sbt cd ~ \ && wget https://dl.bintray.com/sbt/native-packages/sbt/${SBT_VERSION}/sbt-${SBT_VERSION}.tgz \ && tar xvzf sbt-${SBT_VERSION}.tgz \ && rm sbt-${SBT_VERSION}.tgz \ && ln -s /root/sbt/bin/sbt /usr/local/bin \ -# Sbt Clean - This seems weird, but it triggers the full Sbt install which involves a lot of external downloads - && sbt clean clean-files - + #Sbt Clean - This seems weird, but it triggers the full Sbt install which involves a lot of external downloads + && sbt clean clean-files RUN \ -# Apache Kafka + #Apache Kafka cd ~ \ && wget http://apache.claz.org/kafka/${KAFKA_CLIENT_VERSION}/kafka_${SCALA_MAJOR_VERSION}-${KAFKA_CLIENT_VERSION}.tgz \ && tar -xvzf kafka_${SCALA_MAJOR_VERSION}-${KAFKA_CLIENT_VERSION}.tgz \ && rm kafka_${SCALA_MAJOR_VERSION}-${KAFKA_CLIENT_VERSION}.tgz + From 5777ebbbe0cb6ac3fd59b0fda58c6004a949c82b Mon Sep 17 00:00:00 2001 From: Yogesh Srihari Date: Sun, 12 Mar 2017 22:38:39 -0700 Subject: [PATCH 4/7] autopep8 --- python/flask-producer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/flask-producer.py b/python/flask-producer.py index 90994ee..39d0543 100644 --- a/python/flask-producer.py +++ b/python/flask-producer.py @@ -105,10 +105,10 @@ def send_data_to_bokeh_plot(): @app.route('/') def send_data_to_kafka_consumer(): - """This reads the from winequality-red-scaled.csv for now . + """This reads the from winequality-red-scaled.csv for now . - """ - return Response(generate_next_row(),mimetype="text/html") + """ + return Response(generate_next_row(), mimetype="text/html") # Simple html / javascript template that could will be From 0f7da550da90ed1e23e29b2dc6054c8875e247d0 Mon Sep 17 00:00:00 2001 From: Yogesh Srihari Date: Mon, 13 Mar 2017 00:15:34 -0700 Subject: [PATCH 5/7] Data format as the spark streaming app --- python/flask-producer.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/flask-producer.py b/python/flask-producer.py index 39d0543..c9ec18a 100644 --- a/python/flask-producer.py +++ b/python/flask-producer.py @@ -81,9 +81,14 @@ def generate_next_row(): r = random.randint(1, 10) # random sleep time between [1,10] seconds time.sleep(r) + all_values = one_line.split(',') + quality = all_values[len(all_values)-1] + + send_value = "("+quality+",[" + one_line + "])" + producer = KafkaProducer(bootstrap_servers='localhost:9092') - producer.send('StreamAI', str.encode(one_line)) + producer.send('StreamAI', str.encode(send_value)) producer.flush() yield one_line.join("Send this data to moron abo \n") From b703b37d1d02a775ded9562e3ffc281b7325d700 Mon Sep 17 00:00:00 2001 From: Yogesh Srihari Date: Mon, 13 Mar 2017 00:16:50 -0700 Subject: [PATCH 6/7] topic name changed --- python/flask-producer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/flask-producer.py b/python/flask-producer.py index c9ec18a..73786e6 100644 --- a/python/flask-producer.py +++ b/python/flask-producer.py @@ -88,7 +88,7 @@ def generate_next_row(): producer = KafkaProducer(bootstrap_servers='localhost:9092') - producer.send('StreamAI', str.encode(send_value)) + producer.send('teststreamai1', str.encode(send_value)) producer.flush() yield one_line.join("Send this data to moron abo \n") From 8acba3874f81a7ca78f216bccee83149a246d260 Mon Sep 17 00:00:00 2001 From: Yogesh Srihari Date: Mon, 13 Mar 2017 00:33:29 -0700 Subject: [PATCH 7/7] Change to format --- python/flask-producer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/flask-producer.py b/python/flask-producer.py index 73786e6..c19aa2c 100644 --- a/python/flask-producer.py +++ b/python/flask-producer.py @@ -76,13 +76,13 @@ def generate_next_row(): for col in line: one_line.append(col) - one_line = ','.join(str(i) for i in one_line) + quality = one_line[-1] + one_line = ','.join(str(i) for i in one_line[:-1]) r = random.randint(1, 10) # random sleep time between [1,10] seconds time.sleep(r) all_values = one_line.split(',') - quality = all_values[len(all_values)-1] send_value = "("+quality+",[" + one_line + "])"