diff --git a/.gitignore b/.gitignore index bdad4a9..dd27ab8 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,6 @@ /app-logs /.DS_Store /lib/__pycache__ -/src/config/__pycache__ \ No newline at end of file +/src/config/__pycache__ + +checkpoint/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..2860899 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,52 @@ +FROM python:3.11-bullseye as spark-base +ARG SPARK_VERSION=3.4.3 + +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + sudo \ + curl \ + vim \ + unzip \ + rsync \ + openjdk-11-jdk \ + build-essential \ + software-properties-common \ + ssh && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"} +ENV HADOOP_HOME=${HADOOP_HOME:-"/opt/hadoop"} + +RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} +WORKDIR ${SPARK_HOME} + +RUN curl https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \ + && tar xvzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \ + && rm -rf spark-${SPARK_VERSION}-bin-hadoop3.tgz + +FROM spark-base as pyspark-base + +COPY requirements.txt . +RUN pip3 install -r requirements.txt + +FROM pyspark-base as pyspark + +ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}" +ENV SPARK_MASTER="spark://spark-master:7077" +ENV SPARK_MASTER_HOST spark-master +ENV SPARK_MASTER_PORT 7077 +ENV PYSPARK_PYTHON python3 + +# Copy the default configurations into $SPARK_HOME/conf +COPY spark-defaults.conf "$SPARK_HOME/conf" +COPY log4j2.properties "$SPARK_HOME/conf" + +RUN chmod u+x /opt/spark/sbin/* && \ + chmod u+x /opt/spark/bin/* + +ENV PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH + +# Copy appropriate entrypoint script +COPY entrypoint.sh . 
+ENTRYPOINT ["./entrypoint.sh"] \ No newline at end of file diff --git a/README.md b/README.md index c26e86c..3752108 100644 --- a/README.md +++ b/README.md @@ -1,32 +1,109 @@ -# Java Standard Edition Set up Instructions +# Exercise Vanilla Spark +This project is a simple spark project to demonstrate the spark capabilities. This project contains below spark jobs -The Java version that is used for the examples demonstrated on the Spark Shell is Java 8. Please download and install the correct java 8 version most suitable for your machine processor (Intel, Apple Silicon etc.) from [here](https://www.oracle.com/in/java/technologies/javase/javase8u211-later-archive-downloads.html). If you already have a java version installed, install Java 8 from the link provided and manage the version using [jenv](https://www.jenv.be) (You might have to install brew and jenv as mentioned in the link). Once done, make sure that your java version is showing version `1.8.x` if you execute `java -version` command from your terminal. +The examples in `src/` can be run locally using [docker container](#running-locally-using-docker) or on your +[local machine using PyCharm](#setting-up-pyspark-locally). The examples are written in Python and can be run using +PySpark. +# Running locally using docker +The docker build use below tech-stack: +- Spark 3.4.3 +- Python 3.11 +- OpenJDK 11 -# Python Set up Instructions -We use Python version 3.9 for this set up. Please install it from this [link](https://formulae.brew.sh/formula/python@3.9). You might be having other versions of Python installed on your machine already which should be fine. In the next section, we have provided an environment variable which should help your setup point to Python 3.9. +1. Setup docker on your local machine using colima or docker desktop. +2. Run below command to run using docker + ```shell + docker-compose build + docker-compose up -d + ``` -# Apache Spark Set up Instructions +3. 
Above command will setup below services + - Spark Master - This is a spark master service, where all the spark jobs will be submitted + - Spark Worker - This will setup spark executor nodes, which will connect with spark master + - Spark History Server [Optional] - This will setup spark history server, keep track of spark jobs + - Streaming App [Optional] - This will setup a small streaming app required for some exercises -**Note 1:** Before you proceed with the Apache Spark local installation here, please note that the exercises in House 9 of data-derp don't need this set up. They are instead done on Pycharm Spark installation of the next section of this readMe. So if you are facing any challenges in doing this Spark local set up, we request you to proceed towards the Pycharm spark installation in the next section and complete the exercises of House 9. You can come back to this set up later on. In case if you have come here after going through Vanilla Spark videos, and you would like to practice examples on the spark-shell (and you are facing challenges in this local spark set up), we request you to get in touch with your tour guide to help you out. + If Spark History Server is not required, you can comment out the service in `docker-compose.yml` file or can run below command + ```shell + docker-compose up -d spark-worker + ``` + Since worker is dependent on master, it will start the master service as well. -The Apache Spark version that I used for the examples demonstrated use version 3.0.2. You can set it up on your local machine using the following steps +4. You can run streaming exercises using below command [Required for streaming exercises only] + ```shell + docker-compose up -d streaming-app + docker-compose exec streaming-app sh + >> nc -lk 9999 + >> this is a test string + ``` -1. 
Please download the file named `spark-3.0.2-bin-hadoop2.7.tgz` (It should be ~ 200MB) from this [link](https://archive.apache.org/dist/spark/spark-3.0.2/) at a preferred location on your machine -2. Extract / Un tar it to get a folder by the name "spark-3.0.2-bin-hadoop2.7".`tar –xvzf spark-3.0.2-bin-hadoop2.7-hive1.2.tgz` -3. Set up the location of the folder extracted in step 2 as your `SPARK_HOME` in your `.bash_profile` or `.zshrc` file `export SPARK_HOME="/spark-3.0.2-bin-hadoop2.7"` + docker compose exec will open a shell in the streaming app container. You can run the `nc -lk 9999` command to start the + streaming server. You can then push the data to the server by typing the data in the terminal. The data will be processed + by the streaming app and you can see the output in the terminal. + +5. In new terminal tab, you can now run the spark job using below command + ```shell + docker-compose exec spark-master spark-submit ./src/ + ``` +6. Once the services are up and running, you can access the spark master UI using below URL + ```shell + http://localhost:9090 + ``` +7. You can access the spark history server using below URL + ```shell + http://localhost:18080 + ``` + +8. Clean up the docker containers using below command + ```shell + docker-compose down -v + ``` + +# Setting up PySpark locally + +## Install JDK 11 +The Java version that is used for the examples demonstrated on the Spark Shell is Java 11. Please download and install +the correct java 11 version most suitable for your machine processor (Intel, Apple Silicon etc.). If you already have +a java version installed, install Java 11 from the link provided and manage the version using [jenv](https://www.jenv.be) +(You might have to install brew and jenv as mentioned in the link). Once done, make sure that your java version is showing version +`11.x.x` if you execute `java -version` command from your terminal. + +## Setup Python 3.11 +We use Python version 3.11 for this set up. 
Please install it from this [link](https://formulae.brew.sh/formula/python@3.11). You might +be having other versions of Python installed on your machine already which should be fine. In the next section, we have provided +an environment variable which should help your setup point to Python 3.11. + +# Setup Apache Spark + +**Note 1:** Before you proceed with the Apache Spark local installation here, please note that the exercises in House 9 of +data-derp don't need this set up. They are instead done on Pycharm Spark installation of the next section of this readMe. So +if you are facing any challenges in doing this Spark local set up, we request you to proceed towards the Pycharm spark installation +in the next section and complete the exercises of House 9. You can come back to this set up later on. In case if you have come +here after going through Vanilla Spark videos, and you would like to practice examples on the spark-shell (and you are facing +challenges in this local spark set up), we request you to get in touch with your tour guide to help you out. + +The Apache Spark version that I used for the examples demonstrated use version 3.4.3. You can set it up on your local +machine using the following steps + +1. Please download the file named `spark-3.4.3-bin-hadoop3.tgz` (It should be ~ 200MB) from this [link](https://spark.apache.org/downloads.html) +at a preferred location on your machine +2. Extract/Untar it to get a folder by the name "spark-3.4.3-bin-hadoop3".`tar -xvzf spark-3.4.3-bin-hadoop3.tgz` +3. Set up the location of the folder extracted in step 2 as your `SPARK_HOME` in your `.bash_profile` or `.zshrc` file + ```shell + export SPARK_HOME="/spark-3.4.3-bin-hadoop3" + ``` 4. Add the `bin` folder of SPARK_HOME to the path. `export PATH="$JAVA_HOME/bin:$SPARK_HOME/bin:$PATH"`) 5. You should be good to go now. Echo SPARK_HOME `echo $SPARK_HOME` from your terminal. You should be able to get the path to your spark installation location. 
-6. Open a new terminal and type `$SPARK_HOME/bin/spark-shell`. The spark shell should start with Spark version 3.0.2. ![Spark Shell .png](./assets/Spark%20Shell.png) -7. Before proceeding, set up `export PYSPARK_PYTHON=python3.9` for your pyspark to point to Python 3.9. -8. The same can be done for pyspark `$SPARK_HOME/bin/pyspark`. Please note that in order for pyspark to work, you +6. Open a new terminal and type `$SPARK_HOME/bin/spark-shell`. The spark shell should start with Spark version 3.4.3. ![Spark Shell .png](./assets/Spark%20Shell.png) +7. Before proceeding, set up `export PYSPARK_PYTHON=python3.11` for your pyspark to point to Python 3.11. +8. The same can be done for pyspark `$SPARK_HOME/bin/pyspark`. Please note that in order for pyspark to work, you need to have python installed on your machines as mentioned in the "Python Set up instructions above" above. ![PySpark Shell.png](./assets/PySpark%20Shell.png) # Repo set up in Pycharm -1. Please ensure that Python 3.9 is installed from the **Python Set up Instructions** above. +1. Please ensure that Python 3.11 is installed from the **Python Set up Instructions** above. 2. Please install PyCharm community edition from this [link](https://www.jetbrains.com/pycharm/download/#section=mac). -2. Clone this repo on the location of your choice on your machine. -3. Open the Repo in Pycharm. +3. Clone this repo on the location of your choice on your machine and import the Repo in Pycharm. 4. Go to your Pycharm **Settings**, select your project and select the **Python Interpreter** option. ![Pycharm ReadMe Step - 4.png](./assets/Pycharm%20ReadMe%20Step%20-%204.png) 5. Make sure that you select **No Interpreter** from the `Python Interpreter` **dropdown** and click **Add Interpreter** and select **Add Local Interpreter**. ![Pycharm ReadMe Step - 5.png](./assets/Pycharm%20ReadMe%20Step%20-%205.png) 6. A dialog box will Pop up. Select the option of **Virtualenv Environment**. 
![Pycharm ReadMe Step - 6.png](./assets/Pycharm%20ReadMe%20Step%20-%206.png) @@ -35,5 +112,4 @@ The Apache Spark version that I used for the examples demonstrated use version 3 9. Now if your Python Interpreter is created and selected as per the instructions above, you should get a message like `Package requirements 'pyspark...' etc. are not installed`. Click on the **install requirement** link to install the plug-ins required for this repo. These plug-ins are listed down in the `requirements.txt` of this repo. ![Pycharm ReadMe Step - 9.png](./assets/Pycharm%20ReadMe%20Step%20-%209.png) 10. You are all set now to run your first program of this repo. Open source file `00 - File Reads.py` from the SRC folder of this repo and run it. It should give you the desired output of the dataframe as shown below. ![Pycharm ReadMe Step - 10.png](./assets/Pycharm%20ReadMe%20Step%20-%2010.png) -**PS**: Please note that you don't need a separate Apache Spark installation on your local machine to run the examples of this repo from Pycharm. The PySpark plug-in you have configured above should be sufficient. - +**PS**: Please note that you don't need a separate Apache Spark installation on your local machine to run the examples of this repo from Pycharm. The PySpark plug-in you have configured above should be sufficient. \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..5029f57 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,62 @@ +services: + spark-master: + container_name: spark-master + build: + context: . 
+ image: spark-vanilla + entrypoint: [ './entrypoint.sh', 'master' ] + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:8080" ] + interval: 5s + timeout: 3s + retries: 3 + volumes: + - ./src:/opt/spark/src + - ./data:/opt/spark/data + - ./lib:/opt/spark/lib + - spark-logs:/opt/spark/spark-events + environment: + - STREAMING_APP_HOST=streaming-app + - SPARK_NO_DAEMONIZE=true + ports: + - '4040:4040' + - '9090:8080' + - '7077:7077' + + spark-worker: + image: spark-vanilla + container_name: spark-worker + entrypoint: [ './entrypoint.sh', 'worker' ] + depends_on: + - spark-master + environment: + - STREAMING_APP_HOST=streaming-app + - SPARK_NO_DAEMONIZE=true + volumes: + - ./src:/opt/spark/src + - ./data:/opt/spark/data + - ./lib:/opt/spark/lib + - spark-logs:/opt/spark/spark-events + ports: + - '8081:8081' + + spark-history-server: + container_name: spark-history + image: spark-vanilla + entrypoint: [ './entrypoint.sh', 'history' ] + depends_on: + - spark-master + environment: + - SPARK_NO_DAEMONIZE=true + volumes: + - spark-logs:/opt/spark/spark-events + ports: + - '18080:18080' + + streaming-app: + container_name: streaming-app + image: subfuzion/netcat + entrypoint: [ 'tail', '-f', '/dev/null'] + +volumes: + spark-logs: \ No newline at end of file diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100755 index 0000000..0163548 --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +SPARK_WORKLOAD=$1 + +echo "SPARK_WORKLOAD: $SPARK_WORKLOAD" + +if [ "$SPARK_WORKLOAD" == "master" ]; +then + start-master.sh -p 7077 + +elif [ "$SPARK_WORKLOAD" == "worker" ]; +then + WORKER_PORT=${2:-8081} + echo "$WORKER_PORT" + + start-worker.sh spark://spark-master:7077 --webui-port $WORKER_PORT +elif [ "$SPARK_WORKLOAD" == "history" ] +then + start-history-server.sh +elif [ "$SPARK_WORKLOAD" == "connect" ] +then + start-connect-server.sh --driver-memory 512M --executor-memory 500M --executor-cores 1 --packages org.apache.spark:spark-connect_2.12:3.4.0 
+fi \ No newline at end of file diff --git a/log4j.properties b/log4j.properties deleted file mode 100644 index 87238fd..0000000 --- a/log4j.properties +++ /dev/null @@ -1,40 +0,0 @@ -# Set everything to be logged to the console -log4j.rootCategory=WARN, console - -# define console appender -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.out -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n - -#application log -log4j.logger.guru.learningjournal.spark.examples=INFO, console, file -log4j.additivity.guru.learningjournal.spark.examples=false - -#define rolling file appender -log4j.appender.file=org.apache.log4j.RollingFileAppender -log4j.appender.file.File=${spark.yarn.app.container.log.dir}/${logfile.name}.log -#log4j.appender.file.File=app-logs/hello-spark.log -#define following in Java System -# -Dlog4j.configuration=file:log4j.properties -# -Dlogfile.name=hello-spark -# -Dspark.yarn.app.container.log.dir=app-logs -log4j.appender.file.ImmediateFlush=true -log4j.appender.file.Append=false -log4j.appender.file.MaxFileSize=500MB -log4j.appender.file.MaxBackupIndex=2 -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n - - -# Recommendations from Spark template -log4j.logger.org.apache.spark.repl.Main=WARN -log4j.logger.org.spark_project.jetty=WARN -log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR -log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO -log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO -log4j.logger.org.apache.parquet=ERROR -log4j.logger.parquet=ERROR -log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL -log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR - diff --git a/log4j2.properties b/log4j2.properties new file mode 100644 index 
0000000..e30071c --- /dev/null +++ b/log4j2.properties @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the console +rootLogger.level = info +rootLogger.appenderRef.stdout.ref = console + +# In the pattern layout configuration below, we specify an explicit `%ex` conversion +# pattern for logging Throwables. If this was omitted, then (by default) Log4J would +# implicitly add an `%xEx` conversion pattern which logs stacktraces with additional +# class packaging information. That extra information can sometimes add a substantial +# performance overhead, so we disable it in our default logging config. +# For more information, see SPARK-39361. +appender.console.type = Console +appender.console.name = console +appender.console.target = SYSTEM_ERR +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex + +# Set the default spark-shell/spark-sql log level to WARN. When running the +# spark-shell/spark-sql, the log level for these classes is used to overwrite +# the root logger's log level, so that the user can have different defaults +# for the shell and regular Spark apps. 
+logger.repl.name = org.apache.spark.repl.Main +logger.repl.level = warn \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 274e77c..beb3896 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -pyspark~=3.0.2 +pyspark~=3.4.3 pip~=21.3.1 wheel~=0.37.1 setuptools~=60.2.0 \ No newline at end of file diff --git a/spark-defaults.conf b/spark-defaults.conf new file mode 100644 index 0000000..4bb9e28 --- /dev/null +++ b/spark-defaults.conf @@ -0,0 +1,11 @@ +spark.master=spark://spark-master:7077 + +# If running locally, use the following instead +# spark.master=local[3] + +# https://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact +spark.eventLog.enabled=true +spark.eventLog.dir=/opt/spark/spark-events +spark.history.fs.logDirectory=/opt/spark/spark-events + +spark.sql.shuffle.partitions=2 \ No newline at end of file diff --git a/spark.conf b/spark.conf deleted file mode 100644 index bd96259..0000000 --- a/spark.conf +++ /dev/null @@ -1,3 +0,0 @@ -spark.app.name = HelloSpark -spark.master = local[3] -spark.sql.shuffle.partitions = 2 \ No newline at end of file diff --git a/src/00 - File Reads.py b/src/00 - File Reads.py index 64c365e..2fa8bc5 100644 --- a/src/00 - File Reads.py +++ b/src/00 - File Reads.py @@ -1,13 +1,9 @@ -from pyspark.sql import * # from lib.logger import Log4j from config.definitions import DATA_DIR +from config.spark import init_spark if __name__ == "__main__": - # conf = get_spark_app_config() - - spark = SparkSession.builder \ - .appName("Spark File Reads") \ - .getOrCreate() + spark = init_spark("File Read") # logger = Log4j(spark) diff --git a/src/01 - Shuffle Partitions.py b/src/01 - Shuffle Partitions.py index aab7da7..cf8d70c 100644 --- a/src/01 - Shuffle Partitions.py +++ b/src/01 - Shuffle Partitions.py @@ -1,6 +1,8 @@ from pyspark.sql import * from pyspark.sql.types import * + +from config.spark import init_spark from lib.logger import Log4j # from lib.utils import * from 
pyspark.sql.functions import year, month, dayofmonth @@ -10,9 +12,7 @@ from config.definitions import DATA_DIR, OUTPUT_DIR if __name__ == "__main__": - spark = SparkSession.builder \ - .appName("PySpark Shuffle Example") \ - .getOrCreate() + spark = init_spark("PySpark Shuffle Partitions") logger = Log4j(spark) diff --git a/src/02 - StreamingSocketRead.py b/src/02 - StreamingSocketRead.py index fd53eec..e708725 100644 --- a/src/02 - StreamingSocketRead.py +++ b/src/02 - StreamingSocketRead.py @@ -1,29 +1,19 @@ -from pyspark.sql import SparkSession +from config.spark import init_spark, get_socket_stream -spark = SparkSession \ - .builder \ - .appName("StructuredNetworkWordCount") \ - .getOrCreate() +if __name__ == "__main__": + spark = init_spark("PySpark Streaming Socket Read") -spark.sparkContext.setLogLevel("ERROR") + # Create DataFrame representing the stream of input lines from connection to localhost:9999 + # This acts like the source of data -# Create DataFrame representing the stream of input lines from connection to localhost:9999 -# This acts like the source of data -linesDF = spark \ - .readStream \ - .format("socket") \ - .option("host", "localhost") \ - .option("port", 9999) \ - .load() + lines_df = get_socket_stream(spark) + # linesDF.show -# linesDF.show + # This is like writing the data to the sink, console in this case + query = lines_df \ + .writeStream \ + .outputMode("append") \ + .format("console") \ + .start() - -# This is like writing the data to the sink, console in this case -query = linesDF \ - .writeStream \ - .outputMode("append") \ - .format("console") \ - .start() - -query.awaitTermination() + query.awaitTermination() diff --git a/src/03 - StreamingSocketTrigger.py b/src/03 - StreamingSocketTrigger.py index 7bcef4a..b981cfd 100644 --- a/src/03 - StreamingSocketTrigger.py +++ b/src/03 - StreamingSocketTrigger.py @@ -1,28 +1,18 @@ -from pyspark.sql import SparkSession +from config.spark import init_spark, get_socket_stream -spark = 
SparkSession \ - .builder \ - .appName("TriggerExample") \ - .getOrCreate() +if __name__ == "__main__": + spark = init_spark("PySpark Streaming Socket Read") -spark.sparkContext.setLogLevel("ERROR") + # Create DataFrame representing the stream of input lines from connection to localhost:9999 + # This acts like the source of data + linesDF = get_socket_stream(spark) -# Create DataFrame representing the stream of input lines from connection to localhost:9999 -# This acts like the source of data -linesDF = spark \ - .readStream \ - .format("socket") \ - .option("host", "localhost") \ - .option("port", 9999) \ - .load() + # This is like writing the data to the sink, console in this case + query = linesDF \ + .writeStream \ + .outputMode("append") \ + .format("console") \ + .trigger(processingTime='15 seconds') \ + .start() - -# This is like writing the data to the sink, console in this case -query = linesDF \ - .writeStream \ - .outputMode("append") \ - .format("console") \ - .trigger(processingTime='15 seconds') \ - .start() - -query.awaitTermination() + query.awaitTermination() diff --git a/src/04 - StreamingSocketWordCount.py b/src/04 - StreamingSocketWordCount.py index 612c7d2..4f9fb58 100644 --- a/src/04 - StreamingSocketWordCount.py +++ b/src/04 - StreamingSocketWordCount.py @@ -1,49 +1,41 @@ -from pyspark.sql import SparkSession from pyspark.sql.functions import explode from pyspark.sql.functions import split -spark = SparkSession \ - .builder \ - .appName("StructuredNetworkWordCount") \ - .getOrCreate() +from config.spark import init_spark, get_socket_stream -spark.sparkContext.setLogLevel("ERROR") +if __name__ == "__main__": + spark = init_spark("StructuredNetworkWordCount") -# Create DataFrame representing the stream of input lines from connection to localhost:9999 -# This acts like the source of data -lines = spark \ - .readStream \ - .format("socket") \ - .option("host", "localhost") \ - .option("port", 9999) \ - .load() + # Create DataFrame representing 
the stream of input lines from connection to localhost:9999 + # This acts like the source of data + lines = get_socket_stream(spark) -# Split the lines into words -words = lines.select( - explode( - split(lines.value, " ") - ).alias("word") -) + # Split the lines into words + words = lines.select( + explode( + split(lines.value, " ") + ).alias("word") + ) -print(words.printSchema()) + print(words.printSchema()) -# Generate running word count -wordCounts = words.groupBy("word").count() + # Generate running word count + wordCounts = words.groupBy("word").count() -print(wordCounts.printSchema()) + print(wordCounts.printSchema()) -# Keep the partitions as 3 -spark.conf.set("spark.sql.shuffle.partitions", "3") + # Keep the partitions as 3 + spark.conf.set("spark.sql.shuffle.partitions", "3") -# print('Before Setting -> ' + spark.conf.get("spark.sql.adaptive.enabled")) -# spark.conf.set("spark.sql.adaptive.enabled", False) -# print('Before Setting -> ' + spark.conf.get("spark.sql.adaptive.enabled")) + # print('Before Setting -> ' + spark.conf.get("spark.sql.adaptive.enabled")) + # spark.conf.set("spark.sql.adaptive.enabled", False) + # print('Before Setting -> ' + spark.conf.get("spark.sql.adaptive.enabled")) -# Start running the query that prints the running counts to the console -query = wordCounts \ - .writeStream \ - .outputMode("complete") \ - .format("console") \ - .start() + # Start running the query that prints the running counts to the console + query = wordCounts \ + .writeStream \ + .outputMode("complete") \ + .format("console") \ + .start() -query.awaitTermination() + query.awaitTermination() diff --git a/src/05 - Repartition.py b/src/05 - Repartition.py index 86c8e54..77b6703 100644 --- a/src/05 - Repartition.py +++ b/src/05 - Repartition.py @@ -1,20 +1,13 @@ -from pyspark.sql import * +from pyspark.sql import SparkSession -from pyspark.sql.types import * +from config.definitions import DATA_DIR, OUTPUT_DIR +from config.spark import init_spark from 
lib.logger import Log4j -from pyspark.sql.functions import year, month, dayofmonth -from pyspark.sql import SparkSession -from datetime import date, timedelta -from pyspark.sql.types import IntegerType, DateType, StringType, StructType, StructField -from config.definitions import DATA_DIR,OUTPUT_DIR if __name__ == "__main__": - spark = SparkSession.builder \ - .appName("PySpark Repartition Example") \ - .getOrCreate() + spark = init_spark("Repartition Example") logger = Log4j(spark) - logger.info("STARTED - Repartition Example ") # Paths diff --git a/src/06 - Partition By.py b/src/06 - Partition By.py index 67ee049..6df411e 100644 --- a/src/06 - Partition By.py +++ b/src/06 - Partition By.py @@ -1,18 +1,11 @@ -from pyspark.sql import * -import pyspark.sql.functions as F -from pyspark.sql.types import * -from lib.logger import Log4j from config.definitions import DATA_DIR, OUTPUT_DIR +from config.spark import init_spark +from lib.logger import Log4j if __name__ == "__main__": - spark = SparkSession.builder \ - .appName("Partitions App ->") \ - .getOrCreate() - - conf = spark.conf + spark = init_spark("PartitionBy Example") logger = Log4j(spark) - logger.info("STARTED Partition By") # Paths diff --git a/src/07 - Buckets By.py b/src/07 - Buckets By.py index eef8bf7..67a0bc4 100644 --- a/src/07 - Buckets By.py +++ b/src/07 - Buckets By.py @@ -1,12 +1,9 @@ -from lib.logger import Log4j -from pyspark.sql import SparkSession - from config.definitions import DATA_DIR +from config.spark import init_spark +from lib.logger import Log4j if __name__ == "__main__": - spark = SparkSession.builder \ - .appName("Spark Buckets") \ - .getOrCreate() + spark = init_spark("BucketsBy Example") conf = spark.conf # conf.set("spark.sql.shuffle.partitions", 2) diff --git a/src/08 - Long Spark Program.py b/src/08 - Long Spark Program.py index 7b462ef..9ed5a92 100644 --- a/src/08 - Long Spark Program.py +++ b/src/08 - Long Spark Program.py @@ -1,11 +1,9 @@ -from pyspark.sql import * +from 
config.spark import init_spark from lib.logger import Log4j if __name__ == "__main__": - spark = SparkSession.builder \ - .appName("Hello Spark") \ - .getOrCreate() + spark = init_spark("Long Spark Program") logger = Log4j(spark) diff --git a/src/10 - StreamingWindow.py b/src/10 - StreamingWindow.py index 6ad668a..8751859 100644 --- a/src/10 - StreamingWindow.py +++ b/src/10 - StreamingWindow.py @@ -1,41 +1,33 @@ -from pyspark.sql import SparkSession +from pyspark.sql.functions import current_timestamp from pyspark.sql.functions import explode from pyspark.sql.functions import split -from pyspark.sql.functions import current_timestamp from pyspark.sql.functions import window +from config.spark import init_spark, get_socket_stream -spark = SparkSession \ - .builder \ - .appName("StructuredNetworkWordCount") \ - .getOrCreate() - -spark.sparkContext.setLogLevel("ERROR") +if __name__ == "__main__": + spark = init_spark("StructuredNetworkWordCountWindowed") -socketStreamDF = spark.readStream \ - .format("socket") \ - .option("host", "localhost") \ - .option("port", 9999) \ - .load() + socketStreamDF = get_socket_stream(spark) -currentTimeDF = socketStreamDF.withColumn("processingTime", current_timestamp()) + currentTimeDF = socketStreamDF.withColumn("processingTime", current_timestamp()) -# Split the lines into words -wordsDF = currentTimeDF.select( - explode( - split(currentTimeDF.value, " ") - ).alias("word"), currentTimeDF.processingTime.alias("Time")) + # Split the lines into words + wordsDF = currentTimeDF.select( + explode( + split(currentTimeDF.value, " ") + ).alias("word"), currentTimeDF.processingTime.alias("Time")) -windowedWords = wordsDF\ - .groupBy(window(wordsDF.Time, "1 minute"), wordsDF.word)\ - .count().orderBy("window") + windowedWords = wordsDF\ + .groupBy(window(wordsDF.Time, "1 minute"), wordsDF.word)\ + .count().orderBy("window") -# Start running the query that prints the running counts to the console -query = windowedWords \ - .writeStream \ - 
.outputMode("complete") \ - .format("console") \ - .option("truncate", "false") \ - .start() + # Start running the query that prints the running counts to the console + query = windowedWords \ + .writeStream \ + .outputMode("complete") \ + .format("console") \ + .option("truncate", "false") \ + .start() -query.awaitTermination() + query.awaitTermination() diff --git a/src/11 - StreamingTumblingWindow.py b/src/11 - StreamingTumblingWindow.py index 1f6c300..68acd18 100644 --- a/src/11 - StreamingTumblingWindow.py +++ b/src/11 - StreamingTumblingWindow.py @@ -2,39 +2,33 @@ from pyspark.sql.functions import * from pyspark.sql.types import * -spark = SparkSession \ - .builder \ - .appName("Tumbling Window Wordcount") \ - .getOrCreate() - -spark.sparkContext.setLogLevel("ERROR") - -socketStreamDF = spark.readStream \ - .format("socket") \ - .option("host", "localhost") \ - .option("port", 9999) \ - .load() - -stocksDF = socketStreamDF.withColumn("value", split("value", ","))\ - .withColumn("EventTime", to_timestamp(col("value")[0], "yyyy-MM-dd HH:mm:ss")) \ - .withColumn("symbol", col("value")[1]).withColumn("price", col("value")[2].cast(DoubleType())) - -stocksDF.printSchema() - -# Group the data by window and word and compute the count of each group -windowedWords = stocksDF\ - .groupBy(window("EventTime", "1 minute"), stocksDF.symbol)\ - .agg(sum("price").alias("totalPrice")) - -windowedWords.printSchema() - -# This is like writing the data to the sink, console in this case -query = windowedWords \ - .writeStream \ - .outputMode("complete") \ - .format("console") \ - .option('truncate', 'false') \ - .start() +from config.spark import init_spark, get_socket_stream + +if __name__ == "__main__": + spark = init_spark("Tumbling Window") + + socketStreamDF = get_socket_stream(spark) + + stocksDF = socketStreamDF.withColumn("value", split("value", ","))\ + .withColumn("EventTime", to_timestamp(col("value")[0], "yyyy-MM-dd HH:mm:ss")) \ + .withColumn("symbol", 
col("value")[1]).withColumn("price", col("value")[2].cast(DoubleType())) + + stocksDF.printSchema() + + # Group the data by window and word and compute the count of each group + windowedWords = stocksDF\ + .groupBy(window("EventTime", "1 minute"), stocksDF.symbol)\ + .agg(sum("price").alias("totalPrice")) + + windowedWords.printSchema() + + # This is like writing the data to the sink, console in this case + query = windowedWords \ + .writeStream \ + .outputMode("complete") \ + .format("console") \ + .option('truncate', 'false') \ + .start() query.awaitTermination() diff --git a/src/12 - StreamingTumblingWindowWithWatermark.py b/src/12 - StreamingTumblingWindowWithWatermark.py index 39d8dde..e9cf362 100644 --- a/src/12 - StreamingTumblingWindowWithWatermark.py +++ b/src/12 - StreamingTumblingWindowWithWatermark.py @@ -1,47 +1,40 @@ -from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.sql.types import * -spark = SparkSession \ - .builder \ - .appName("Tumbling Window Wordcount") \ - .getOrCreate() +from config.spark import init_spark, get_socket_stream -spark.sparkContext.setLogLevel("ERROR") +if __name__ == "__main__": + spark = init_spark("Tumbling Window With Watermark") -socketStreamDF = spark.readStream \ - .format("socket") \ - .option("host", "localhost") \ - .option("port", 9999) \ - .load() + socketStreamDF = get_socket_stream(spark) -stocksDF = socketStreamDF.withColumn("value", split("value", ","))\ - .withColumn("EventTime", to_timestamp(col("value")[0], "yyyy-MM-dd HH:mm:ss")) \ - .withColumn("symbol", col("value")[1]).withColumn("price", col("value")[2].cast(DoubleType())) + stocksDF = socketStreamDF.withColumn("value", split("value", ","))\ + .withColumn("EventTime", to_timestamp(col("value")[0], "yyyy-MM-dd HH:mm:ss")) \ + .withColumn("symbol", col("value")[1]).withColumn("price", col("value")[2].cast(DoubleType())) -stocksDF.printSchema() + stocksDF.printSchema() -# Group the data by window and word and compute the 
count of each group -windowedWords = stocksDF\ - .withWatermark("EventTime", "2 minute") \ - .groupBy(window("EventTime", "1 minute"), stocksDF.symbol)\ - .agg(max("price").alias("totalPrice")) + # Group the data by window and word and compute the count of each group + windowedWords = stocksDF\ + .withWatermark("EventTime", "2 minute") \ + .groupBy(window("EventTime", "1 minute"), stocksDF.symbol)\ + .agg(max("price").alias("totalPrice")) -windowedWords.printSchema() + windowedWords.printSchema() -# outputDF = windowedWords.select("window.start", "window.end", "Symbol", "TotalPrice") + # outputDF = windowedWords.select("window.start", "window.end", "Symbol", "TotalPrice") -# outputDF.printSchema() + # outputDF.printSchema() -# This is like writing the data to the sink, console in this case -query = windowedWords \ - .writeStream \ - .outputMode("update") \ - .format("console") \ - .option('truncate', 'false') \ - .start() + # This is like writing the data to the sink, console in this case + query = windowedWords \ + .writeStream \ + .outputMode("update") \ + .format("console") \ + .option('truncate', 'false') \ + .start() -query.awaitTermination() + query.awaitTermination() diff --git a/src/13 - StreamingSlidingWindowWithWatermark.py b/src/13 - StreamingSlidingWindowWithWatermark.py index 094ff84..36e5b9f 100644 --- a/src/13 - StreamingSlidingWindowWithWatermark.py +++ b/src/13 - StreamingSlidingWindowWithWatermark.py @@ -1,47 +1,40 @@ -from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.sql.types import * -spark = SparkSession \ - .builder \ - .appName("Tumbling Window Wordcount") \ - .getOrCreate() +from config.spark import init_spark, get_socket_stream -spark.sparkContext.setLogLevel("ERROR") +if __name__ == "__main__": + spark = init_spark("Sliding Window With Watermark") -socketStreamDF = spark.readStream \ - .format("socket") \ - .option("host", "localhost") \ - .option("port", 9999) \ - .load() + socketStreamDF = 
get_socket_stream(spark) -stocksDF = socketStreamDF.withColumn("value", split("value", ","))\ - .withColumn("EventTime", to_timestamp(col("value")[0], "yyyy-MM-dd HH:mm:ss")) \ - .withColumn("symbol", col("value")[1]).withColumn("price", col("value")[2].cast(DoubleType())) + stocksDF = socketStreamDF.withColumn("value", split("value", ","))\ + .withColumn("EventTime", to_timestamp(col("value")[0], "yyyy-MM-dd HH:mm:ss")) \ + .withColumn("symbol", col("value")[1]).withColumn("price", col("value")[2].cast(DoubleType())) -stocksDF.printSchema() + stocksDF.printSchema() -# Group the data by window and word and compute the count of each group -windowedWords = stocksDF\ - .withWatermark("EventTime", "2 minute") \ - .groupBy(window("EventTime", "1 minute", "30 seconds"), stocksDF.symbol)\ - .agg(max("price").alias("MaximumPrice")) + # Group the data by window and word and compute the count of each group + windowedWords = stocksDF\ + .withWatermark("EventTime", "2 minute") \ + .groupBy(window("EventTime", "1 minute", "30 seconds"), stocksDF.symbol)\ + .agg(max("price").alias("MaximumPrice")) -windowedWords.printSchema() + windowedWords.printSchema() -# outputDF = windowedWords.select("window.start", "window.end", "Symbol", "TotalPrice") + # outputDF = windowedWords.select("window.start", "window.end", "Symbol", "TotalPrice") -# outputDF.printSchema() + # outputDF.printSchema() -# This is like writing the data to the sink, console in this case -query = windowedWords \ - .writeStream \ - .outputMode("update") \ - .format("console") \ - .option('truncate', 'false') \ - .start() + # This is like writing the data to the sink, console in this case + query = windowedWords \ + .writeStream \ + .outputMode("update") \ + .format("console") \ + .option('truncate', 'false') \ + .start() -query.awaitTermination() + query.awaitTermination() diff --git a/src/config/spark.py b/src/config/spark.py new file mode 100644 index 0000000..575ece6 --- /dev/null +++ b/src/config/spark.py @@ -0,0 
+1,23 @@ +from os import environ + +from pyspark.sql import SparkSession + + +def init_spark(app_name: str) -> SparkSession: + spark = SparkSession.builder \ + .appName(app_name) \ + .getOrCreate() + spark.sparkContext.setLogLevel("ERROR") + + return spark + + +def get_socket_stream(spark: SparkSession): + streaming_app = environ.get('STREAMING_APP_HOST', "localhost") + print(f"Streaming App: {streaming_app}") + + return spark.readStream \ + .format("socket") \ + .option("host", streaming_app) \ + .option("port", 9999) \ + .load() \ No newline at end of file diff --git a/src/log4j.properties b/src/log4j.properties deleted file mode 100644 index 87238fd..0000000 --- a/src/log4j.properties +++ /dev/null @@ -1,40 +0,0 @@ -# Set everything to be logged to the console -log4j.rootCategory=WARN, console - -# define console appender -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.out -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n - -#application log -log4j.logger.guru.learningjournal.spark.examples=INFO, console, file -log4j.additivity.guru.learningjournal.spark.examples=false - -#define rolling file appender -log4j.appender.file=org.apache.log4j.RollingFileAppender -log4j.appender.file.File=${spark.yarn.app.container.log.dir}/${logfile.name}.log -#log4j.appender.file.File=app-logs/hello-spark.log -#define following in Java System -# -Dlog4j.configuration=file:log4j.properties -# -Dlogfile.name=hello-spark -# -Dspark.yarn.app.container.log.dir=app-logs -log4j.appender.file.ImmediateFlush=true -log4j.appender.file.Append=false -log4j.appender.file.MaxFileSize=500MB -log4j.appender.file.MaxBackupIndex=2 -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n - - -# Recommendations from Spark template -log4j.logger.org.apache.spark.repl.Main=WARN 
-log4j.logger.org.spark_project.jetty=WARN -log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR -log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO -log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO -log4j.logger.org.apache.parquet=ERROR -log4j.logger.parquet=ERROR -log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL -log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR -