From 0ae2c5747c1de2ced07f1656cb2e79da971d22ef Mon Sep 17 00:00:00 2001 From: Robin Hes Date: Fri, 20 Oct 2017 17:03:11 +0200 Subject: [PATCH] Removed unused dependencies; added output overwrite mode. --- get_dependencies.sh | 2 +- phone_number_analysis.py | 13 +++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/get_dependencies.sh b/get_dependencies.sh index a0cc740..6a51260 100755 --- a/get_dependencies.sh +++ b/get_dependencies.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash INSTALL_COMMAND="sudo pip-3.4 install" -dependencies="warcio requests requests_file boto3 botocore py4j" +dependencies="warcio boto3 botocore py4j" for dep in $dependencies; do $INSTALL_COMMAND $dep diff --git a/phone_number_analysis.py b/phone_number_analysis.py index 9e7aad6..9b49781 100755 --- a/phone_number_analysis.py +++ b/phone_number_analysis.py @@ -1,19 +1,15 @@ #!/usr/bin/env Python3 from warcio.archiveiterator import ArchiveIterator -from warcio.recordloader import ArchiveLoadFailed from tempfile import TemporaryFile import argparse -from pyspark import SparkContext, SparkConf +from pyspark import SparkContext from pyspark.sql import SQLContext from pyspark.sql.types import StructType, StructField, StringType, ArrayType -import requests -from requests_file import FileAdapter - import boto3 import botocore @@ -57,6 +53,7 @@ def run(self): sqlc.createDataFrame(phone_numb_agg_web, schema=self.output_schema) \ .write \ + .mode("overwrite") \ .format("parquet") \ .save(self.output_dir) @@ -64,7 +61,7 @@ def run(self): self.log(sc, "Failed parses: {}".format(self.failed_record_parse.value)) def log(self, sc, message, level="warn"): - log = sc._jvm.org.apache.log4j.LogManager.getLogger(self.name) + log = sc._jvm.org.apache.log4j.LogManager.getLogger(self.name) if level == "info": log.info(message) elif level == "warn": @@ -110,7 +107,7 @@ def process_file_warc(self, input_file): print("Error ocurred loading file: {}".format(input_file)) self.failed_segment.add(1) return None - + def process_records(self, stream): try: for rec in ArchiveIterator(stream): @@ -136,7 +133,7 @@ def find_phone_numbers(self, content): re.sub(self.replace_filter, "", num)) for num in numbers} for num in nums_filt: - yield num + yield num if __name__ == "__main__": parser = argparse.ArgumentParser("Phone number analysis using Apache Spark")