Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion get_dependencies.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env bash

INSTALL_COMMAND="sudo pip-3.4 install"
dependencies="warcio requests requests_file boto3 botocore py4j"
dependencies="warcio boto3 botocore py4j"

for dep in $dependencies; do
$INSTALL_COMMAND $dep
Expand Down
13 changes: 5 additions & 8 deletions phone_number_analysis.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
#!/usr/bin/env Python3

from warcio.archiveiterator import ArchiveIterator
from warcio.recordloader import ArchiveLoadFailed

from tempfile import TemporaryFile

import argparse

from pyspark import SparkContext, SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

import requests
from requests_file import FileAdapter

import boto3
import botocore

Expand Down Expand Up @@ -57,14 +53,15 @@ def run(self):

sqlc.createDataFrame(phone_numb_agg_web, schema=self.output_schema) \
.write \
.mode("overwrite") \
.format("parquet") \
.save(self.output_dir)

self.log(sc, "Failed segments: {}".format(self.failed_segment.value))
self.log(sc, "Failed parses: {}".format(self.failed_record_parse.value))

def log(self, sc, message, level="warn"):
log = sc._jvm.org.apache.log4j.LogManager.getLogger(self.name)
log = sc._jvm.org.apache.log4j.LogManager.getLogger(self.name)
if level == "info":
log.info(message)
elif level == "warn":
Expand Down Expand Up @@ -110,7 +107,7 @@ def process_file_warc(self, input_file):
print("Error ocurred loading file: {}".format(input_file))
self.failed_segment.add(1)
return None

def process_records(self, stream):
try:
for rec in ArchiveIterator(stream):
Expand All @@ -136,7 +133,7 @@ def find_phone_numbers(self, content):
re.sub(self.replace_filter, "", num))
for num in numbers}
for num in nums_filt:
yield num
yield num

if __name__ == "__main__":
parser = argparse.ArgumentParser("Phone number analysis using Apache Spark")
Expand Down