diff --git a/data_transformations/citibike/distance_transformer.py b/data_transformations/citibike/distance_transformer.py
index fa3fb96..c0480f9 100644
--- a/data_transformations/citibike/distance_transformer.py
+++ b/data_transformations/citibike/distance_transformer.py
@@ -1,13 +1,24 @@
 from pyspark.sql import SparkSession, DataFrame
+from pyspark.sql import functions as fn
+from pyspark.sql import types as tp
 
 METERS_PER_FOOT = 0.3048
 FEET_PER_MILE = 5280
 EARTH_RADIUS_IN_METERS = 6371e3
+EARTH_RADIUS_IN_MILES = 3963.0
+DISTANCE_SCALE = 2
 METERS_PER_MILE = METERS_PER_FOOT * FEET_PER_MILE
 
 
 def compute_distance(_spark: SparkSession, dataframe: DataFrame) -> DataFrame:
-    return dataframe
+    # Scale factor used to truncate distances to DISTANCE_SCALE decimal places.
+    scale_factor = fn.pow(fn.lit(10), DISTANCE_SCALE).cast(tp.DoubleType())
+    # Great-circle distance in miles via the spherical law of cosines.
+    return dataframe.withColumn("distance", fn.floor(fn.acos(
+        fn.sin(fn.radians("start_station_latitude")) * fn.sin(fn.radians("end_station_latitude")) +
+        fn.cos(fn.radians("start_station_latitude")) * fn.cos(fn.radians("end_station_latitude")) *
+        fn.cos(fn.radians("start_station_longitude") - fn.radians("end_station_longitude"))
+    ) * fn.lit(EARTH_RADIUS_IN_MILES) * scale_factor) / scale_factor)
 
 
 def run(spark: SparkSession, input_dataset_path: str, transformed_dataset_path: str) -> None:
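For readers skimming the hunk above: the expression is the spherical law of cosines. Here is a minimal pure-Python sketch of the same calculation; the coordinates are illustrative values, not rows from the dataset:

```python
import math

EARTH_RADIUS_IN_MILES = 3963.0


def great_circle_miles(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    # Spherical law of cosines:
    # d = R * acos(sin(a) * sin(b) + cos(a) * cos(b) * cos(lon_a - lon_b))
    a, b = math.radians(lat1), math.radians(lat2)
    delta_lon = math.radians(lon1) - math.radians(lon2)
    central_angle = math.acos(
        math.sin(a) * math.sin(b) + math.cos(a) * math.cos(b) * math.cos(delta_lon)
    )
    return EARTH_RADIUS_IN_MILES * central_angle


# Two illustrative points in Manhattan, roughly a mile and a half apart.
print(great_circle_miles(40.7128, -74.0060, 40.7306, -73.9866))
```

The `fn.floor(... * scale_factor) / scale_factor` wrapping in the patch then truncates, rather than rounds, the result to `DISTANCE_SCALE` decimal places.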
diff --git a/data_transformations/wordcount/word_count_transformer.py b/data_transformations/wordcount/word_count_transformer.py
index ff270fd..2654a09 100644
--- a/data_transformations/wordcount/word_count_transformer.py
+++ b/data_transformations/wordcount/word_count_transformer.py
@@ -1,12 +1,26 @@
 import logging
 
 from pyspark.sql import SparkSession
+from pyspark.sql import functions as f
 
 
 def run(spark: SparkSession, input_path: str, output_path: str) -> None:
     logging.info("Reading text file from: %s", input_path)
     input_df = spark.read.text(input_path)
 
+    # Keep only tokens that are not empty and not a bare apostrophe.
+    is_not_empty = lambda t: ((t != "") & (t != "'"))
+    # Split each line on runs of non-letter, non-apostrophe characters,
+    # normalise to lower case, then count occurrences of each word.
+    words_df = (input_df.select(f.split(input_df.value, r"[^'a-zA-Z]").alias("tokens"))
+                .withColumn("words", f.filter("tokens", is_not_empty))
+                .select(f.explode("words").alias("word"))
+                .filter(~ f.col("word").like("'%"))
+                .select(f.lower(f.col("word")).alias("word"))
+                .groupBy("word")
+                .agg(f.count("*").alias("count"))
+                .orderBy("word"))
+
     logging.info("Writing csv to directory: %s", output_path)
-    input_df.coalesce(1).write.csv(output_path, header=True)
+    words_df.coalesce(1).write.csv(output_path, header=True, mode="overwrite")
 
diff --git a/jobs/citibike_distance_calculation.py b/jobs/citibike_distance_calculation.py
index 2e04803..ddfc028 100644
--- a/jobs/citibike_distance_calculation.py
+++ b/jobs/citibike_distance_calculation.py
@@ -12,12 +12,12 @@
     logging.basicConfig(filename=LOG_FILENAME, level=logging.INFO)
 
     arguments = sys.argv
-    if len(arguments) is not 3:
+    if len(arguments) != 3:
         logging.warning("Dataset file path and output path not specified!")
         sys.exit(1)
 
-    dataset_path = arguments[2]
-    output_path = arguments[3]
+    dataset_path = arguments[1]
+    output_path = arguments[2]
 
     spark = SparkSession.builder.appName(APP_NAME).getOrCreate()
     logging.info("Application Initialized: " + spark.sparkContext.appName)
diff --git a/jobs/citibike_ingest.py b/jobs/citibike_ingest.py
index 6d8ca9d..101a355 100644
--- a/jobs/citibike_ingest.py
+++ b/jobs/citibike_ingest.py
@@ -12,7 +12,7 @@
     logging.basicConfig(filename=LOG_FILENAME, level=logging.INFO)
 
     logging.info(sys.argv)
-    if len(sys.argv) is not 3:
+    if len(sys.argv) != 3:
         logging.warning("Input source and output path are required")
         sys.exit(1)
 
diff --git a/jobs/word_count.py b/jobs/word_count.py
index a148c67..32acc05 100644
--- a/jobs/word_count.py
+++ b/jobs/word_count.py
@@ -12,7 +12,7 @@
     logging.basicConfig(filename=LOG_FILENAME, level=logging.INFO)
 
     logging.info(sys.argv)
-    if len(sys.argv) is not 3:
+    if len(sys.argv) != 3:
         logging.warning("Input .txt file and output path are required")
         sys.exit(1)
 
diff --git a/tests/integration/test_distance_transformer.py b/tests/integration/test_distance_transformer.py
index 2105fa5..d368562 100644
--- a/tests/integration/test_distance_transformer.py
+++ b/tests/integration/test_distance_transformer.py
@@ -4,6 +4,8 @@
 import pytest
 
 from pyspark.sql.types import StructField, DoubleType
+from pyspark.sql import DataFrame
+from pyspark.sql import functions as fn
 
 from data_transformations.citibike import distance_transformer
 from tests.integration import SPARK
@@ -92,11 +94,10 @@
     expected_columns = set(given_dataframe.columns)
     expected_schema = set(given_dataframe.schema)
 
-    assert expected_columns == actual_columns
+    assert expected_columns.issubset(actual_columns)
     assert expected_schema.issubset(actual_schema)
 
 
-@pytest.mark.skip
 def test_should_add_distance_column_with_calculated_distance() -> None:
     given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders()
     distance_transformer.run(SPARK, given_ingest_folder, given_transform_folder)
@@ -114,7 +115,7 @@
     actual_distance_schema = actual_dataframe.schema['distance']
     assert expected_distance_schema == actual_distance_schema
 
-    assert expected_dataframe.collect() == actual_dataframe.collect()
+    assert __check_dataframes_equality(expected_dataframe, actual_dataframe)
 
 
 def __create_ingest_and_transform_folders() -> Tuple[str, str]:
@@ -124,3 +125,15 @@
     ingest_dataframe = SPARK.createDataFrame(SAMPLE_DATA, BASE_COLUMNS)
     ingest_dataframe.write.parquet(ingest_folder, mode='overwrite')
     return ingest_folder, transform_folder
+
+
+def __check_dataframes_equality(df1: DataFrame, df2: DataFrame) -> bool:
+    # Compare the frames as multisets: group by every column, count the
+    # duplicates, and require the difference to be empty in both directions.
+    df1_cols = sorted(df1.columns)
+    df2_cols = sorted(df2.columns)
+    df1_groupedby_cols = df1.groupby(df1_cols).agg(fn.count(df1_cols[1]))
+    df2_groupedby_cols = df2.groupby(df2_cols).agg(fn.count(df2_cols[1]))
+    if df1_groupedby_cols.subtract(df2_groupedby_cols).rdd.isEmpty():
+        return df2_groupedby_cols.subtract(df1_groupedby_cols).rdd.isEmpty()
+    return False
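The `__check_dataframes_equality` helper above treats each frame as a multiset of rows, which is why the test no longer depends on row order the way `collect() == collect()` did. A standalone sketch of the idea, assuming a local Spark session and made-up data:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn

spark = SparkSession.builder.appName("equality-sketch").getOrCreate()

df1 = spark.createDataFrame([("a", 1), ("b", 2)], ["key", "value"])
df2 = spark.createDataFrame([("b", 2), ("a", 1)], ["key", "value"])

# collect() preserves partition order, so identical data can compare unequal.
print(df1.collect() == df2.collect())  # typically False here

# Grouping by every column and subtracting in both directions ignores order
# while still catching missing rows and differing duplicate counts.
g1 = df1.groupby(df1.columns).agg(fn.count("*").alias("n"))
g2 = df2.groupby(df2.columns).agg(fn.count("*").alias("n"))
print(g1.subtract(g2).rdd.isEmpty() and g2.subtract(g1).rdd.isEmpty())  # True
```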
diff --git a/tests/integration/test_ingest.py b/tests/integration/test_ingest.py
index 03df25d..5e5e424 100644
--- a/tests/integration/test_ingest.py
+++ b/tests/integration/test_ingest.py
@@ -38,7 +38,7 @@
 
 
 def __write_csv_file(file_path: str, content: List[List[str]]) -> None:
-    with open(file_path, 'w') as csv_file:
+    with open(file_path, 'w', encoding="utf8") as csv_file:
         input_csv_writer = csv.writer(csv_file)
         input_csv_writer.writerows(content)
         csv_file.close()
diff --git a/tests/integration/test_word_count.py b/tests/integration/test_word_count.py
index 49c04c1..05758f0 100644
--- a/tests/integration/test_word_count.py
+++ b/tests/integration/test_word_count.py
@@ -12,14 +12,13 @@
 def _get_file_paths(input_file_lines: List[str]) -> Tuple[str, str]:
     base_path = tempfile.mkdtemp()
     input_text_path = "%s%sinput.txt" % (base_path, os.path.sep)
-    with open(input_text_path, 'w') as input_file:
+    with open(input_text_path, 'w', encoding="utf8") as input_file:
         input_file.writelines(input_file_lines)
 
     output_path = "%s%soutput" % (base_path, os.path.sep)
     return input_text_path, output_path
 
 
-@pytest.mark.skip
 def test_should_tokenize_words_and_count_them() -> None:
     lines = [
         "In my younger and more vulnerable years my father gave me some advice that I've been "