'''
Computes per-station, per-year summaries from the raw weather table:
- Average max temperature (°C)
- Average min temperature (°C)
- Total precipitation (cm)
and stores them in the weather_summary table.

The /10.0 and /100.0 conversions imply the raw table stores temperatures
in tenths of °C and precipitation in tenths of mm (consistent with the
-9999 sentinel handling in the ingestion scripts).
'''
# IMPORTS
import sqlite3
import logging
from datetime import datetime

DB_PATH = "../ctva_weather.db"  # Path to the SQLite DB
LOG_PATH = "../logs/etl.log"    # Log file for operations

# SUMMARY SETUP
CREATE_SUMMARY_SQL = """
CREATE TABLE IF NOT EXISTS weather_summary (
    station_id TEXT,
    year INTEGER,
    avg_max_temp_c REAL,
    avg_min_temp_c REAL,
    total_precip_cm REAL,
    PRIMARY KEY (station_id, year)
);
"""

INSERT_SUMMARY_SQL = """
INSERT OR REPLACE INTO weather_summary (
    station_id, year, avg_max_temp_c, avg_min_temp_c, total_precip_cm
)
SELECT
    station_id,
    strftime('%Y', date) AS year,
    AVG(max_temp_c) / 10.0 AS avg_max,
    AVG(min_temp_c) / 10.0 AS avg_min,
    SUM(precipitation_mm) / 100.0 AS total_precip
FROM weather
WHERE max_temp_c IS NOT NULL AND min_temp_c IS NOT NULL AND precipitation_mm IS NOT NULL
GROUP BY station_id, year;
"""


def _configure_logging():
    """Route log output to the shared ETL log file.

    Called only on script entry: configuring logging at import time both
    hijacked the root logger for any importer and raised FileNotFoundError
    when the logs/ directory did not exist.
    """
    logging.basicConfig(
        filename=LOG_PATH,
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s"
    )


def main(db_path=DB_PATH):
    """Build or refresh the weather_summary table.

    Args:
        db_path: Path to the SQLite database. Defaults to DB_PATH so the
            original zero-argument invocation keeps working.
    """
    start_time = datetime.now()
    logging.info("Starting weather summary aggregation...")

    conn = sqlite3.connect(db_path)
    try:
        # `with conn:` commits on success and rolls back on error...
        with conn:
            conn.execute(CREATE_SUMMARY_SQL)
            conn.execute(INSERT_SUMMARY_SQL)
    finally:
        # ...but it does NOT close the connection; do that explicitly even
        # when the SQL fails (the original leaked the handle on error).
        conn.close()

    logging.info("Weather summary aggregation complete.")
    logging.info(f"Duration: {datetime.now() - start_time}")


if __name__ == "__main__":
    _configure_logging()
    main()
'''
Monitors a watch directory (data/incoming/) for new .txt files
Cleans and ingests them into the database
Avoids duplicate entries
Moves processed files to an archive/ folder
Logs all activity
'''
# IMPORTS
import os
import shutil
import sqlite3
import logging
from datetime import datetime

# CONFIG
WATCH_DIR = "../data/incoming"
ARCHIVE_DIR = "../data/archive"
# decouples input and output for automation
DB_PATH = "../ctva_weather.db"
LOG_PATH = "../logs/etl.log"

# DATABASE & SCHEMA
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS weather (
    station_id TEXT NOT NULL,
    date DATE NOT NULL,
    max_temp_c INTEGER,
    min_temp_c INTEGER,
    precipitation_mm INTEGER,
    PRIMARY KEY (station_id, date)
);
"""

INSERT_SQL = """
INSERT OR IGNORE INTO weather (station_id, date, max_temp_c, min_temp_c, precipitation_mm)
VALUES (?, ?, ?, ?, ?);
"""

# FUNCTIONS
def _configure_logging():
    """Configure the shared ETL log file.

    Called only on script entry so that importing this module has no side
    effects (the original import-time basicConfig raised if logs/ was
    missing and reconfigured the root logger for every importer).
    """
    logging.basicConfig(
        filename=LOG_PATH,
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s"
    )

def connect_db(db_path=DB_PATH):
    """Open a SQLite connection (db_path defaults keep the no-arg call working)."""
    return sqlite3.connect(db_path)

def init_db(conn):
    """Create the weather table if it does not exist yet."""
    with conn:
        conn.execute(CREATE_TABLE_SQL)

def parse_line(line):
    """Parse one tab-separated record: date, max temp, min temp, precipitation.

    Returns a dict (with None where the source uses the -9999 sentinel),
    or None for any malformed line: wrong field count, bad date, or a
    non-numeric value.
    """
    parts = line.strip().split("\t")
    if len(parts) != 4:  # prevents bad data from entering the database
        return None
    date_str, max_temp, min_temp, precip = parts
    try:
        return {
            # Store ISO text explicitly instead of a date object: relying on
            # the implicit sqlite3 date adapter is deprecated since Py 3.12;
            # the stored value ('YYYY-MM-DD') is identical.
            "date": datetime.strptime(date_str, "%Y%m%d").date().isoformat(),
            "max_temp": None if max_temp == "-9999" else int(max_temp),
            "min_temp": None if min_temp == "-9999" else int(min_temp),
            "precip": None if precip == "-9999" else int(precip)
        }
    except ValueError:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt/SystemExit.
        return None

def process_file(filepath, conn):
    """Ingest one station file line by line; return (station_id, row_count)."""
    station_id = os.path.splitext(os.path.basename(filepath))[0]
    count = 0
    with open(filepath, "r") as file:
        for line in file:
            data = parse_line(line)
            if data:
                conn.execute(INSERT_SQL, (
                    station_id,
                    data["date"],
                    data["max_temp"],
                    data["min_temp"],
                    data["precip"]
                ))
                count += 1
    return station_id, count

def archive_file(filepath, archive_dir=ARCHIVE_DIR):
    """Move a processed file into the archive folder to prevent reprocessing."""
    os.makedirs(archive_dir, exist_ok=True)  # race-free vs. exists()+makedirs()
    shutil.move(filepath, os.path.join(archive_dir, os.path.basename(filepath)))

def main(watch_dir=WATCH_DIR, archive_dir=ARCHIVE_DIR, db_path=DB_PATH):
    """Run one ETL pass: ingest every *.txt in watch_dir, then archive it.

    All parameters default to the module constants, so the original
    zero-argument invocation is unchanged.
    """
    start_time = datetime.now()
    logging.info("ETL process started...")

    conn = connect_db(db_path)
    total_files = 0
    total_records = 0
    try:
        init_db(conn)

        for file in sorted(os.listdir(watch_dir)):  # deterministic order
            if not file.endswith(".txt"):
                continue
            file_path = os.path.join(watch_dir, file)
            try:
                station_id, count = process_file(file_path, conn)
                # Commit BEFORE archiving: the original committed once at the
                # very end, so a crash mid-run left already-archived files
                # whose rows had never been persisted (silent data loss).
                conn.commit()
                archive_file(file_path, archive_dir)
                logging.info(f"Ingested {count} records from {file} (station: {station_id})")
                total_records += count
                total_files += 1
            except Exception as e:
                logging.error(f"Failed to process {file}: {e}")
    finally:
        # Close even if listing/ingestion blows up (original leaked on error).
        conn.close()

    logging.info(f"ETL process complete. Files processed: {total_files}, Records ingested: {total_records}")
    logging.info(f"Duration: {datetime.now() - start_time}")

if __name__ == "__main__":
    _configure_logging()
    main()
'''
Performs a one-time bulk ingestion of historical weather data
from a specified directory (data/wx_data/) into a SQLite database.
'''
# IMPORTS
import os
import sqlite3
import logging
from datetime import datetime

# CONFIG
DATA_DIR = "../data/wx_data"
DB_PATH = "../ctva_weather.db"
LOG_PATH = "../logs/etl.log"

# DATABASE SETUP
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS weather (
    station_id TEXT NOT NULL,
    date DATE NOT NULL,
    max_temp_c INTEGER,
    min_temp_c INTEGER,
    precipitation_mm INTEGER,
    PRIMARY KEY (station_id, date)
);
"""

INSERT_SQL = """
INSERT OR IGNORE INTO weather (station_id, date, max_temp_c, min_temp_c, precipitation_mm)
VALUES (?, ?, ?, ?, ?);
"""

# FUNCTIONS
def _configure_logging():
    """Configure the shared ETL log file.

    Called only on script entry so importing this module has no side
    effects (the original import-time basicConfig raised when the logs/
    directory was missing).
    """
    logging.basicConfig(
        filename=LOG_PATH,
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s"
    )

def connect_db(db_path=DB_PATH):
    """Open a SQLite connection (db_path defaults keep the no-arg call working)."""
    return sqlite3.connect(db_path)

def init_db(conn):
    """Create the weather table if it does not exist yet."""
    with conn:
        conn.execute(CREATE_TABLE_SQL)

def parse_line(line):
    """Parse one tab-separated record: date, max temp, min temp, precipitation.

    Returns a dict (None where the source uses the -9999 sentinel), or None
    for a malformed line. The original version had NO error handling here —
    unlike its sibling in etl_runner.py — so a single bad date or
    non-numeric field aborted the entire bulk load.
    """
    parts = line.strip().split("\t")
    if len(parts) != 4:
        return None
    date_str, max_temp, min_temp, precip = parts
    try:
        return {
            # Explicit ISO text: the implicit sqlite3 date adapter is
            # deprecated since Python 3.12; stored value is identical.
            "date": datetime.strptime(date_str, "%Y%m%d").date().isoformat(),
            "max_temp": None if max_temp == "-9999" else int(max_temp),
            "min_temp": None if min_temp == "-9999" else int(min_temp),
            "precip": None if precip == "-9999" else int(precip)
        }
    except ValueError:
        return None

def ingest_file(filepath, station_id, conn):
    """Ingest one station file line by line; return the number of rows inserted."""
    count = 0
    with open(filepath, "r") as file:
        for line in file:
            data = parse_line(line)
            if data:
                conn.execute(INSERT_SQL, (
                    station_id,
                    data["date"],
                    data["max_temp"],
                    data["min_temp"],
                    data["precip"]
                ))
                count += 1
    return count

def main(data_dir=DATA_DIR, db_path=DB_PATH):
    """Bulk-load every *.txt station file in data_dir into the database.

    Parameters default to the module constants so the original
    zero-argument invocation is unchanged.
    """
    start_time = datetime.now()
    logging.info("Starting weather data ingestion...")

    conn = connect_db(db_path)
    total_records = 0
    try:
        init_db(conn)

        for filename in sorted(os.listdir(data_dir)):  # deterministic order
            if not filename.endswith(".txt"):
                continue
            station_id = os.path.splitext(filename)[0]
            file_path = os.path.join(data_dir, filename)
            count = ingest_file(file_path, station_id, conn)
            total_records += count
            # The original message had lost the filename ("records from (unknown)").
            logging.info(f"Ingested {count} records from {filename}")

        conn.commit()
    finally:
        # Close even on failure (original leaked the connection on error).
        conn.close()

    logging.info(f"Weather data ingestion complete. Total records: {total_records}")
    logging.info(f"Duration: {datetime.now() - start_time}")

if __name__ == "__main__":
    _configure_logging()
    main()