diff --git a/Dockerfile b/Dockerfile index cc000e6..c9713d0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,7 +19,24 @@ RUN apt-get update ENV \ SCALA_MAJOR_VERSION=2.11 \ SBT_VERSION=0.13.8 \ - KAFKA_CLIENT_VERSION=0.10.1.0 + KAFKA_CLIENT_VERSION=0.10.1.0 \ + DEV_INSTALL_HOME=~ \ + STREAMAI_HOME=$DEV_INSTALL_HOME/streamai \ + CONFIG_HOME=$STREAMAI_HOME/config \ + SCRIPTS_HOME=$STREAMAI_HOME/binsc \ + LOGS_HOME=$STREAMAI_HOME/logs \ + JAVA_HOME=/usr/lib/jvm/java-8-oracle \ + JRE_HOME=$JAVA_HOME/jre \ + PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH \ + PATH=$SCRIPTS_HOME/service:$SCRIPTS_HOME/util:$PATH \ + KAFKA_HOME=$DEV_INSTALL_HOME/kafka_2.11-0.10.1.0 \ + PATH=$KAFKA_HOME/bin:$PATH \ + ZOOKEEPER_HOME=$KAFKA_HOME \ + PATH=$ZOOKEEPER_HOME/bin:$PATH \ + FLASK_APP=$STREAMAI_HOME/python/flask-producer.py \ + LC_ALL=C.UTF-8 \ + LANG=C.UTF-8 + RUN \ apt-get update \ @@ -37,7 +54,7 @@ RUN \ && apt-get install -y apache2 \ && apt-get install -y libssl-dev \ && apt-get install -y python3.5 \ - && apt-get install -y python3-pip + && apt-get install -y python3-pip RUN \ # Maven for custom builds @@ -50,23 +67,32 @@ RUN \ && pip3 install kafka-python==1.3.2 \ && pip3 install bokeh==0.12.4 +# Create a directory to +RUN mkdir ~/.ssh +COPY ./config/ssh_config .ssh/config + RUN \ -# Sbt +# git clone during image build and setting enviroment variables + cd ~ \ + && git clone https://yogeshgo05@github.com/abgoswam/streamai + +RUN \ + #Sbt cd ~ \ && wget https://dl.bintray.com/sbt/native-packages/sbt/${SBT_VERSION}/sbt-${SBT_VERSION}.tgz \ && tar xvzf sbt-${SBT_VERSION}.tgz \ && rm sbt-${SBT_VERSION}.tgz \ && ln -s /root/sbt/bin/sbt /usr/local/bin \ -# Sbt Clean - This seems weird, but it triggers the full Sbt install which involves a lot of external downloads - && sbt clean clean-files - + #Sbt Clean - This seems weird, but it triggers the full Sbt install which involves a lot of external downloads + && sbt clean clean-files RUN \ -# Apache Kafka + #Apache Kafka cd ~ \ && wget http://apache.claz.org/kafka/${KAFKA_CLIENT_VERSION}/kafka_${SCALA_MAJOR_VERSION}-${KAFKA_CLIENT_VERSION}.tgz \ && tar -xvzf kafka_${SCALA_MAJOR_VERSION}-${KAFKA_CLIENT_VERSION}.tgz \ && rm kafka_${SCALA_MAJOR_VERSION}-${KAFKA_CLIENT_VERSION}.tgz + diff --git a/python/flask-producer.py b/python/flask-producer.py index 36c7ceb..c19aa2c 100644 --- a/python/flask-producer.py +++ b/python/flask-producer.py @@ -1,16 +1,40 @@ +import csv +import random +import time + +import flask from flask import Flask, Response +from flask import Flask, jsonify +from jinja2 import Template + + +from bokeh.plotting import figure +from bokeh.models import AjaxDataSource +from bokeh.embed import components +from bokeh.resources import INLINE +from bokeh.util.string import encode_utf8 # Creating a flask app app = Flask(__name__) -import csv - -import random -import time -import os from kafka import KafkaProducer +def stream_template(template_name, **context): + """Utility function to test the flask + streaming capability + + :param template_name: name of the template + :param context: context streaming . + :return: + """ + app.update_template_context(context) + t = app.jinja_env.get_template(template_name) + rv = t.stream(context) + rv.enable_buffering(5) + return rv + + def load_csv(filename): with open(filename) as f: csv_reader = csv.reader(f, delimiter=',') @@ -28,18 +52,23 @@ def read_input_file(input_file): dataset = load_csv(input_file) return dataset +count = 0 +train_Y_1 = [] +train_Y_2 = [] -@app.route('/') -def plotly(): - """This reads the from winequality-red-scaled.csv for now . +x, y = 0.0, 0.0 - Returns actual html as a response red from the file - """ +script = '' +div = '' - data_file_winequality = os.path.join(os.environ['STREAMAI_HOME'], 'python/data_files/winequality-red-scaled.csv') - dataset = read_input_file(data_file_winequality) - producer = KafkaProducer(bootstrap_servers='localhost:9092') +def generate_next_row(): + """Utility for asynchronous request to + api for sending the data to kafka + + Yields the one row from wine file at the time . + """ + dataset = read_input_file("./data_files/winequality-red-scaled.csv") for line in dataset: @@ -47,48 +76,91 @@ def plotly(): for col in line: one_line.append(col) - one_line = ','.join(str(i) for i in one_line) + quality = one_line[-1] + one_line = ','.join(str(i) for i in one_line[:-1]) + r = random.randint(1, 10) # random sleep time between [1,10] seconds time.sleep(r) - producer.send("teststreamai1", str.encode(one_line)) + all_values = one_line.split(',') + + send_value = "("+quality+",[" + one_line + "])" + + producer = KafkaProducer(bootstrap_servers='localhost:9092') + + producer.send('teststreamai1', str.encode(send_value)) producer.flush() - from bokeh.plotting import figure, show, output_file - from bokeh.sampledata.iris import flowers + yield one_line.join("Send this data to moron abo \n") - import pandas as pd - df = pd.read_csv(data_file_winequality, sep=',') +@app.route("/data", methods=['POST']) +def send_data_to_bokeh_plot(): + """ This sends to data to bokeh plot currently . + Usually there is a internal api called from an + exposed endpoint . + """ + global count + x = count + y = random.randint(1, 10) + count += 1 - train = df[:1000] - test = df[1000:] + return flask.jsonify(x=[x], y=[y]) - train_X, train_Y = train.drop(['quality'], axis=1), train['quality'] - test_X, test_Y = test.drop(['quality'], axis=1), test['quality'] - print(train_Y) +@app.route('/') +def send_data_to_kafka_consumer(): + """This reads the from winequality-red-scaled.csv for now . - train_Y = [(float(x) + random.uniform(0, 1)) for x in train_Y] + """ + return Response(generate_next_row(), mimetype="text/html") + + +# Simple html / javascript template that could will be +# replaced by bokek with embeded plot javascript +# and css . +template = Template(''' + + + + Streaming Example + {{ js_resources }} + {{ css_resources }} + + + {{ plot_div }} + {{ plot_script }} + + +''') + + +@app.route('/plot_demo') +def bokeh_demo(): + """End point demoing the real time injection of the data + to bokeh plots + + Returns streaming response from the bokeh server . + """ - train_Y_1 = train_Y[:500] - train_Y_2 = train_Y[500:] + source = AjaxDataSource(data_url="http://localhost:5000/data", + polling_interval=3000, mode='append') - colors = ['red' for x in range(1000)] + source.data = dict(x=[], y=[]) - p = figure(title="Flask Stack") - p.xaxis.axis_label = 'Actual Value' - p.yaxis.axis_label = 'Predicted Value' + fig = figure(title="Streaming Example") + fig.line('x', 'y', source=source) - p.circle(train_Y_1, train_Y_2, - color=colors, fill_alpha=0.2, size=10) + js_resources = INLINE.render_js() + css_resources = INLINE.render_css() - try: - output_file("log_lines.html", title="iris.py example") - except Exception as e: - print(e.args) + script, div = components(fig, INLINE) - show(p) + html = template.render( + plot_script=script, + plot_div=div, + js_resources=js_resources, + css_resources=css_resources + ) - return Response(open('log_lines.html').read(), - mimetype="text/html") + return encode_utf8(html)