Binary file added .DS_Store
Binary file not shown.
11 changes: 10 additions & 1 deletion data_transformations/citibike/distance_transformer.py
@@ -1,13 +1,22 @@
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as fn
+from pyspark.sql import types as tp

METERS_PER_FOOT = 0.3048
FEET_PER_MILE = 5280
EARTH_RADIUS_IN_METERS = 6371e3
+EARTH_RADIUS_IN_MILES = 3963.0
+DISTANCE_SCALE = 2
METERS_PER_MILE = METERS_PER_FOOT * FEET_PER_MILE


def compute_distance(_spark: SparkSession, dataframe: DataFrame) -> DataFrame:
-    return dataframe
+    scale_factor = fn.pow(fn.lit(10), DISTANCE_SCALE).cast(tp.DoubleType())
+    return dataframe.withColumn("distance", fn.floor(fn.acos(
+        fn.sin(fn.radians("start_station_latitude")) * fn.sin(fn.radians("end_station_latitude")) +
+        fn.cos(fn.radians("start_station_latitude")) * fn.cos(fn.radians("end_station_latitude")) *
+        fn.cos(fn.radians("start_station_longitude") - fn.radians("end_station_longitude"))
+    ) * fn.lit(EARTH_RADIUS_IN_MILES) * scale_factor) / scale_factor)


def run(spark: SparkSession, input_dataset_path: str, transformed_dataset_path: str) -> None:
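Note on the new `compute_distance`: it computes great-circle distance with the spherical law of cosines, d = R * arccos(sin(lat1) * sin(lat2) + cos(lat1) * cos(lat2) * cos(lon1 - lon2)), then truncates (note: `fn.floor` truncates rather than rounds) to `DISTANCE_SCALE` decimal places. A minimal plain-Python sketch of the same formula for sanity-checking the Spark column expression; the two coordinates are made up for illustration:

```python
import math

EARTH_RADIUS_IN_MILES = 3963.0

def great_circle_miles(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    # Spherical law of cosines, mirroring the Spark column expression above.
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    delta_lambda = math.radians(lon1 - lon2)
    central_angle = math.acos(
        math.sin(phi1) * math.sin(phi2)
        + math.cos(phi1) * math.cos(phi2) * math.cos(delta_lambda)
    )
    return EARTH_RADIUS_IN_MILES * central_angle

# Made-up coordinates for two points in Manhattan, roughly a mile apart:
print(round(great_circle_miles(40.7453, -73.9888, 40.7587, -73.9787), 2))  # ~1.07
```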
13 changes: 12 additions & 1 deletion data_transformations/wordcount/word_count_transformer.py
@@ -1,12 +1,23 @@
import logging

from pyspark.sql import SparkSession
+from pyspark.sql import functions as f


def run(spark: SparkSession, input_path: str, output_path: str) -> None:
logging.info("Reading text file from: %s", input_path)
input_df = spark.read.text(input_path)

+    is_not_none = lambda t: ((t != "") & (t != "'"))
+    words_df = (input_df.select(f.split(input_df.value, r"[^'a-zA-Z]").alias("tokens"))
+                .withColumn("words", f.filter("tokens", is_not_none))
+                .select(f.explode("words").alias("word"))
+                .filter(~ f.col("word").like("'%"))
+                .select(f.lower(f.col("word")).alias("word"))
+                .groupBy("word")
+                .agg(f.count("*").alias("count"))
+                .orderBy("word"))

logging.info("Writing csv to directory: %s", output_path)

-    input_df.coalesce(1).write.csv(output_path, header=True)
+    words_df.coalesce(1).write.csv(output_path, header=True, mode="overwrite")
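The new pipeline splits each line on any character outside `'a-zA-Z` (so tokens are runs of letters and apostrophes), drops empty tokens and bare apostrophes, discards tokens that start with an apostrophe, lowercases, then groups and counts. One naming nit: `is_not_none` actually filters empty strings and lone apostrophes, not `None`. A sketch of the tokenization stage on a made-up sentence, assuming a local SparkSession (`f.filter` over array columns needs Spark 3.1+):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.master("local[1]").appName("wordcount-sketch").getOrCreate()

# One made-up line of input; "value" is the column spark.read.text produces.
input_df = spark.createDataFrame([("It's a test -- isn't it?",)], ["value"])

words = (input_df
         .select(f.split("value", r"[^'a-zA-Z]").alias("tokens"))
         .withColumn("words", f.filter("tokens", lambda t: (t != "") & (t != "'")))
         .select(f.explode("words").alias("word")))
words.show()
# Expected surviving tokens: It's, a, test, isn't, it
```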
6 changes: 3 additions & 3 deletions jobs/citibike_distance_calculation.py
@@ -12,12 +12,12 @@
    logging.basicConfig(filename=LOG_FILENAME, level=logging.INFO)
    arguments = sys.argv

-    if len(arguments) is not 3:
+    if len(arguments) != 3:
        logging.warning("Dataset file path and output path not specified!")
        sys.exit(1)

-    dataset_path = arguments[2]
-    output_path = arguments[3]
+    dataset_path = arguments[1]
+    output_path = arguments[2]

    spark = SparkSession.builder.appName(APP_NAME).getOrCreate()
    logging.info("Application Initialized: " + spark.sparkContext.appName)
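Two separate bugs are fixed in this file, and the first recurs in `jobs/citibike_ingest.py` and `jobs/word_count.py` below. First, `is not` tests object identity, not numeric equality; `len(arguments) is not 3` only behaved as intended because CPython happens to cache small integers, and Python 3.8+ emits `SyntaxWarning: "is not" with a literal` for it. Second, `sys.argv[0]` is the script path, so the two positional arguments live at indices 1 and 2, not 2 and 3. A quick illustration of the identity pitfall:

```python
# `is` compares object identity; `==` compares value.
a = int("257")  # constructed at runtime: a fresh int object
b = int("257")
print(a == b)   # True  -> the values are equal
print(a is b)   # False in CPython -> distinct objects (257 is outside the
                # small-int cache), so `is` on numbers is never reliable
```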
2 changes: 1 addition & 1 deletion jobs/citibike_ingest.py
@@ -12,7 +12,7 @@
    logging.basicConfig(filename=LOG_FILENAME, level=logging.INFO)
    logging.info(sys.argv)

-    if len(sys.argv) is not 3:
+    if len(sys.argv) != 3:
        logging.warning("Input source and output path are required")
        sys.exit(1)

2 changes: 1 addition & 1 deletion jobs/word_count.py
@@ -12,7 +12,7 @@
    logging.basicConfig(filename=LOG_FILENAME, level=logging.INFO)
    logging.info(sys.argv)

-    if len(sys.argv) is not 3:
+    if len(sys.argv) != 3:
        logging.warning("Input .txt file and output path are required")
        sys.exit(1)

17 changes: 14 additions & 3 deletions tests/integration/test_distance_transformer.py
@@ -4,6 +4,8 @@

import pytest
from pyspark.sql.types import StructField, DoubleType
+from pyspark.sql import DataFrame
+from pyspark.sql import functions as fn

from data_transformations.citibike import distance_transformer
from tests.integration import SPARK
@@ -92,11 +94,10 @@ def test_should_maintain_all_data_it_reads() -> None:
    expected_columns = set(given_dataframe.columns)
    expected_schema = set(given_dataframe.schema)

-    assert expected_columns == actual_columns
+    assert expected_columns.issubset(actual_columns)
    assert expected_schema.issubset(actual_schema)


-@pytest.mark.skip
def test_should_add_distance_column_with_calculated_distance() -> None:
    given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders()
    distance_transformer.run(SPARK, given_ingest_folder, given_transform_folder)
@@ -114,7 +115,7 @@ def test_should_add_distance_column_with_calculated_distance() -> None:
    actual_distance_schema = actual_dataframe.schema['distance']

    assert expected_distance_schema == actual_distance_schema
-    assert expected_dataframe.collect() == actual_dataframe.collect()
+    assert __check_dataframes_equality(expected_dataframe, actual_dataframe)


def __create_ingest_and_transform_folders() -> Tuple[str, str]:
@@ -124,3 +125,13 @@ def __create_ingest_and_transform_folders() -> Tuple[str, str]:
    ingest_dataframe = SPARK.createDataFrame(SAMPLE_DATA, BASE_COLUMNS)
    ingest_dataframe.write.parquet(ingest_folder, mode='overwrite')
    return ingest_folder, transform_folder


+def __check_dataframes_equality(df1: DataFrame, df2: DataFrame) -> bool:
+    df1_cols = sorted(df1.columns)
+    df2_cols = sorted(df2.columns)
+    df1_groupedby_cols = df1.groupby(df1_cols).agg(fn.count(df1_cols[1]))
+    df2_groupedby_cols = df2.groupby(df2_cols).agg(fn.count(df2_cols[1]))
+    if df1_groupedby_cols.subtract(df2_groupedby_cols).rdd.isEmpty():
+        return df2_groupedby_cols.subtract(df1_groupedby_cols).rdd.isEmpty()
+    return False
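How the new helper works: grouping each DataFrame by every column and counting collapses it into distinct (row, count) pairs, and `subtract` in both directions checks that the two grouped results match. The comparison is therefore order-insensitive, unlike the old `collect()` list comparison, but still fails when duplicate counts differ. (On Spark 3.3+, `DataFrame.isEmpty()` could replace the `.rdd.isEmpty()` calls.) A behaviour sketch, assuming a SparkSession named `spark`:

```python
# Made-up frames to illustrate the multiset comparison.
df_a = spark.createDataFrame([(1, "x"), (1, "x"), (2, "y")], ["id", "val"])
df_b = spark.createDataFrame([(2, "y"), (1, "x"), (1, "x")], ["id", "val"])  # same rows, reordered
df_c = spark.createDataFrame([(1, "x"), (2, "y")], ["id", "val"])            # one duplicate dropped

# Grouping by ["id", "val"] and counting turns df_a and df_b into the same
# (row, count) pairs, so both subtract() calls come back empty:
#   __check_dataframes_equality(df_a, df_b)  -> True
# df_c has a different count for (1, "x"), so one subtract() is non-empty:
#   __check_dataframes_equality(df_a, df_c)  -> False
```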
2 changes: 1 addition & 1 deletion tests/integration/test_ingest.py
@@ -38,7 +38,7 @@ def __create_ingest_and_transform_folders() -> Tuple[str, str]:


def __write_csv_file(file_path: str, content: List[List[str]]) -> None:
-    with open(file_path, 'w') as csv_file:
+    with open(file_path, 'w', encoding="utf8") as csv_file:
        input_csv_writer = csv.writer(csv_file)
        input_csv_writer.writerows(content)
        csv_file.close()
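The explicit `encoding="utf8"` here and in the next file matters because `open()` without an encoding falls back to the platform's preferred locale encoding, so fixture files written by the tests could differ between Linux and Windows; pylint flags the omission as `unspecified-encoding` (W1514). A quick way to see the platform default:

```python
import locale

# What open() falls back to when no encoding is passed - platform dependent:
print(locale.getpreferredencoding(False))  # e.g. "UTF-8" on Linux, "cp1252" on Windows
```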
3 changes: 1 addition & 2 deletions tests/integration/test_word_count.py
@@ -12,14 +12,13 @@ def _get_file_paths(input_file_lines: List[str]) -> Tuple[str, str]:
    base_path = tempfile.mkdtemp()

    input_text_path = "%s%sinput.txt" % (base_path, os.path.sep)
-    with open(input_text_path, 'w') as input_file:
+    with open(input_text_path, 'w', encoding="utf8") as input_file:
        input_file.writelines(input_file_lines)

    output_path = "%s%soutput" % (base_path, os.path.sep)
    return input_text_path, output_path


-@pytest.mark.skip
def test_should_tokenize_words_and_count_them() -> None:
    lines = [
        "In my younger and more vulnerable years my father gave me some advice that I've been "