From c4c03f8a7763a02b7f329fa5b125fd53e50702b6 Mon Sep 17 00:00:00 2001
From: Pluseon7
Date: Tue, 29 May 2018 15:04:09 +0800
Subject: [PATCH 1/2] pyspark rdd first commit

---
 examples-python/01SparkRDD/README.md          | 59 +++++++++++++++++++
 examples-python/01SparkRDD/spark-hw01-map.py  |  8 +++
 .../01SparkRDD/spark-hw02-flatmap.py          |  6 ++
 .../01SparkRDD/spark-hw03-filter.py           |  7 +++
 .../01SparkRDD/spark-hw04-mapToPair.py        |  8 +++
 examples-python/README.md                     | 21 +++++++
 6 files changed, 109 insertions(+)
 create mode 100644 examples-python/01SparkRDD/README.md
 create mode 100644 examples-python/01SparkRDD/spark-hw01-map.py
 create mode 100644 examples-python/01SparkRDD/spark-hw02-flatmap.py
 create mode 100644 examples-python/01SparkRDD/spark-hw03-filter.py
 create mode 100644 examples-python/01SparkRDD/spark-hw04-mapToPair.py

diff --git a/examples-python/01SparkRDD/README.md b/examples-python/01SparkRDD/README.md
new file mode 100644
index 0000000..46f1f2d
--- /dev/null
+++ b/examples-python/01SparkRDD/README.md
@@ -0,0 +1,59 @@
+### Test data
+```
+a,123,456,789,11344,2142,123
+b,1234,124,1234,123,123
+c,123,4123,5435,1231,5345
+d,123,456,789,113,2142,143
+e,123,446,789,14,2142,113
+f,123,446,789,14,2142,1113,323
+```
+
+### Map exercise:
+* spark-hw01-map.py
+Extract every letter from the test data (the first field of each line). The result is as follows:
+```
+a
+b
+c
+d
+e
+f
+```
+
+### FlatMap exercise:
+* spark-hw02-flatmap.py
+Split every line of the test data on "," and list every field.
+The result is as follows:
+```
+a
+123
+456
+789
+11344
+2142
+123
+…
+```
+
+### filter exercise:
+* spark-hw03-filter.py
+Find every field of the test data that contains 123 or 456. The result is as follows:
+```
+123
+456
+123
+1234
+1234
+…
+```
+
+### mapToPair exercise:
+* spark-hw04-mapToPair.py
+Turn every field of the test data into a (str, 1) pair. The result is as follows:
+```
+('a', 1)
+('123', 1)
+('456', 1)
+('789', 1)
+…
+```
\ No newline at end of file
diff --git a/examples-python/01SparkRDD/spark-hw01-map.py b/examples-python/01SparkRDD/spark-hw01-map.py
new file mode 100644
index 0000000..7ce4899
--- /dev/null
+++ b/examples-python/01SparkRDD/spark-hw01-map.py
@@ -0,0 +1,8 @@
+from pyspark import SparkContext
+
+sc = SparkContext("local", "Simple App")
+textFile = sc.textFile("hdfs:/spark/hw/test.txt")
+
+# Keep the first comma-separated field of each line; collect() returns the results to the driver.
+letters = textFile.map(lambda s: s.split(',')[0]).collect()
+print('\n'.join(letters))
diff --git a/examples-python/01SparkRDD/spark-hw02-flatmap.py b/examples-python/01SparkRDD/spark-hw02-flatmap.py
new file mode 100644
index 0000000..247af20
--- /dev/null
+++ b/examples-python/01SparkRDD/spark-hw02-flatmap.py
@@ -0,0 +1,6 @@
+from pyspark import SparkContext
+
+sc = SparkContext("local", "Simple App")
+textFile = sc.textFile("hdfs:/spark/hw/test.txt")
+# flatMap flattens the comma-separated fields of every line into one RDD of tokens.
+print('\n'.join(textFile.flatMap(lambda s: s.split(',')).collect()))
diff --git a/examples-python/01SparkRDD/spark-hw03-filter.py b/examples-python/01SparkRDD/spark-hw03-filter.py
new file mode 100644
index 0000000..09b9d86
--- /dev/null
+++ b/examples-python/01SparkRDD/spark-hw03-filter.py
@@ -0,0 +1,7 @@
+from pyspark import SparkContext
+
+sc = SparkContext("local", "Simple App")
+textFile = sc.textFile("hdfs:/spark/hw/test.txt")
+# Keep only the tokens that contain the substring "123" or "456".
+matches = textFile.flatMap(lambda s: s.split(',')).filter(lambda w: "123" in w or "456" in w).collect()
+print('\n'.join(matches))
diff --git a/examples-python/01SparkRDD/spark-hw04-mapToPair.py b/examples-python/01SparkRDD/spark-hw04-mapToPair.py
new file mode 100644
index 0000000..058af94
--- /dev/null
+++ b/examples-python/01SparkRDD/spark-hw04-mapToPair.py
@@ -0,0 +1,8 @@
+from pyspark import SparkContext
+
+sc = SparkContext("local", "Simple App")
+textFile = sc.textFile("hdfs:/spark/hw/test.txt")
+# Pair every token with the count 1, the usual shape to feed into reduceByKey.
+pairs = textFile.flatMap(lambda s: s.split(',')) \
+    .map(lambda w: (w, 1)).collect()
+print('\n'.join(str(p) for p in pairs))
diff --git a/examples-python/README.md b/examples-python/README.md
index 583287c..7320492 100644
--- a/examples-python/README.md
+++ b/examples-python/README.md
@@ -1,2 +1,23 @@
 # Spark examples for Python
 This section covers building Spark applications for big-data processing in Python.
+
+
+
+
+### Upload the file to HDFS
+The Spark API examples operate on the following test data, which can be created with vim or nano:
+```txt
+$ vim test.txt
+# test data
+a,123,456,789,11344,2142,123
+b,1234,124,1234,123,123
+c,123,4123,5435,1231,5345
+d,123,456,789,113,2142,143
+e,123,446,789,14,2142,113
+f,123,446,789,14,2142,1113,323
+```
+Once created, upload it to HDFS or OpenStack Swift. The HDFS case looks like this:
+```sh
+$ hadoop fs -mkdir -p /spark/hw
+$ hadoop fs -put test.txt /spark/hw
+```
\ No newline at end of file
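The four exercises in this patch can be checked end to end without a cluster. The sketch below chains the same transformations in one script; it assumes a local copy of `test.txt` in the working directory rather than the HDFS path the homework scripts use:

```python
from pyspark import SparkContext

sc = SparkContext("local", "rdd-basics-sketch")
lines = sc.textFile("test.txt")  # assumption: local test.txt, not hdfs:/spark/hw/test.txt

print(lines.map(lambda s: s.split(',')[0]).collect())   # map: the first field of each line
print(lines.flatMap(lambda s: s.split(',')).collect())  # flatMap: every field as its own element

tokens = lines.flatMap(lambda s: s.split(','))
print(tokens.filter(lambda w: "123" in w or "456" in w).collect())  # filter: substring match
print(tokens.map(lambda w: (w, 1)).collect())           # the (str, 1) pairs of the mapToPair exercise

sc.stop()
```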
From 49cc305c4decc50cf696c2ba23a82b55aedc6be4 Mon Sep 17 00:00:00 2001
From: Pluseon7
Date: Wed, 30 May 2018 13:49:22 +0800
Subject: [PATCH 2/2] add reduce and reduceByKey

---
 examples-python/01SparkRDD/README.md            | 27 ++++++++++++-------
 .../01SparkRDD/spark-hw04-reduce.py             |  9 +++++++
 ...mapToPair.py => spark-hw05-reduceByKey.py}   |  7 +++--
 3 files changed, 32 insertions(+), 11 deletions(-)
 create mode 100644 examples-python/01SparkRDD/spark-hw04-reduce.py
 rename examples-python/01SparkRDD/{spark-hw04-mapToPair.py => spark-hw05-reduceByKey.py} (65%)

diff --git a/examples-python/01SparkRDD/README.md b/examples-python/01SparkRDD/README.md
index 46f1f2d..4166288 100644
--- a/examples-python/01SparkRDD/README.md
+++ b/examples-python/01SparkRDD/README.md
@@ -1,8 +1,8 @@
-### Test data
+### Test data contents
 ```
 a,123,456,789,11344,2142,123
 b,1234,124,1234,123,123
 c,123,4123,5435,1231,5345
 d,123,456,789,113,2142,143
 e,123,446,789,14,2142,113
 f,123,446,789,14,2142,1113,323
@@ -47,13 +47,22 @@ Find every field of the test data that contains 123 or 456.
 …
 ```
 
-### mapToPair exercise:
-* spark-hw04-mapToPair.py
-Turn every field of the test data into a (str, 1) pair. The result is as follows:
-```
-('a', 1)
-('123', 1)
-('456', 1)
-('789', 1)
+### reduce exercise:
+* spark-hw04-reduce.py
+Extract every letter from the test data and use reduce to append them into one string. The result is as follows:
+```
+abcdef
+```
+
+### reduceByKey exercise:
+* spark-hw05-reduceByKey.py
+Split the test data on "," and use reduceByKey to count how many times each field appears. The result is as follows:
+```
+('d', 1)
+('1113', 1)
+('1231', 1)
+('e', 1)
+('14', 2)
+('113', 2)
 …
 ```
\ No newline at end of file
diff --git a/examples-python/01SparkRDD/spark-hw04-reduce.py b/examples-python/01SparkRDD/spark-hw04-reduce.py
new file mode 100644
index 0000000..855d43d
--- /dev/null
+++ b/examples-python/01SparkRDD/spark-hw04-reduce.py
@@ -0,0 +1,9 @@
+from pyspark import SparkContext
+
+sc = SparkContext("local", "Simple App")
+textFile = sc.textFile("hdfs:/spark/hw/test.txt")
+# Take the first field of every line, then concatenate the letters with reduce.
+letters = textFile.map(lambda s: s.split(',')[0]) \
+    .reduce(lambda a, b: a + b)
+print(letters)
+
diff --git a/examples-python/01SparkRDD/spark-hw04-mapToPair.py b/examples-python/01SparkRDD/spark-hw05-reduceByKey.py
similarity index 65%
rename from examples-python/01SparkRDD/spark-hw04-mapToPair.py
rename to examples-python/01SparkRDD/spark-hw05-reduceByKey.py
index 058af94..fe2f54f 100644
--- a/examples-python/01SparkRDD/spark-hw04-mapToPair.py
+++ b/examples-python/01SparkRDD/spark-hw05-reduceByKey.py
@@ -1,8 +1,11 @@
+from operator import add
 from pyspark import SparkContext
 
 sc = SparkContext("local", "Simple App")
 textFile = sc.textFile("hdfs:/spark/hw/test.txt")
 # Pair every token with the count 1, the usual shape to feed into reduceByKey.
 pairs = textFile.flatMap(lambda s: s.split(',')) \
-    .map(lambda w: (w, 1)).collect()
-print('\n'.join(str(p) for p in pairs))
+    .map(lambda w: (w, 1)) \
+    .reduceByKey(add).collect()
+# reduceByKey merged the 1s per key, so each token appears once with its total count.
+print('\n'.join(str(p) for p in pairs))
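`reduce` and `reduceByKey` aggregate at different levels: `reduce` folds the whole RDD into a single value that is returned to the driver, while `reduceByKey` merges the values of each key on the executors and keeps the result distributed until it is collected. A minimal sketch of the contrast, again assuming a local `test.txt` instead of the HDFS path:

```python
from operator import add
from pyspark import SparkContext

sc = SparkContext("local", "reduce-vs-reduceByKey-sketch")
lines = sc.textFile("test.txt")  # assumption: local test.txt, not hdfs:/spark/hw/test.txt

# reduce: one value for the whole RDD (string concatenation of the leading letters).
print(lines.map(lambda s: s.split(',')[0]).reduce(add))  # e.g. abcdef

# reduceByKey: one (token, count) pair per distinct token.
tokens = lines.flatMap(lambda s: s.split(','))
print(tokens.map(lambda w: (w, 1)).reduceByKey(add).collect())

# countByValue() returns the same tally as a dict on the driver.
print(dict(tokens.countByValue()))

sc.stop()
```

Because `reduceByKey` combines values map-side before the shuffle, it handles far more distinct keys than collecting all the pairs and tallying them on the driver would.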