diff --git a/answers/aggregate_stats.py b/answers/aggregate_stats.py new file mode 100644 index 00000000..3626a30e --- /dev/null +++ b/answers/aggregate_stats.py @@ -0,0 +1,66 @@ +''' +Computes: +Average max temperature (°C) +Average min temperature (°C) +Total precipitation (cm) +Stores them in a new table weather_summary +''' +# IMPORTS +import sqlite3 +import logging +from datetime import datetime + +DB_PATH = "../ctva_weather.db" # Path to the SQLite DB +LOG_PATH = "../logs/etl.log" # Log file for operations + +# LOGGING +logging.basicConfig( + filename=LOG_PATH, + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s" +) + +# SUMMARY SETUP +CREATE_SUMMARY_SQL = """ +CREATE TABLE IF NOT EXISTS weather_summary ( + station_id TEXT, + year INTEGER, + avg_max_temp_c REAL, + avg_min_temp_c REAL, + total_precip_cm REAL, + PRIMARY KEY (station_id, year) +); +""" + +INSERT_SUMMARY_SQL = """ +INSERT OR REPLACE INTO weather_summary ( + station_id, year, avg_max_temp_c, avg_min_temp_c, total_precip_cm +) +SELECT + station_id, + strftime('%Y', date) AS year, + AVG(max_temp_c) / 10.0 AS avg_max, + AVG(min_temp_c) / 10.0 AS avg_min, + SUM(precipitation_mm) / 100.0 AS total_precip +FROM weather +WHERE max_temp_c IS NOT NULL AND min_temp_c IS NOT NULL AND precipitation_mm IS NOT NULL +GROUP BY station_id, year; +""" + +def main(): + start_time = datetime.now() + logging.info("Starting weather summary aggregation...") + + conn = sqlite3.connect(DB_PATH) + with conn: + conn.execute(CREATE_SUMMARY_SQL) + conn.execute(INSERT_SUMMARY_SQL) + + conn.close() + end_time = datetime.now() + + logging.info("Weather summary aggregation complete.") + logging.info(f"Duration: {end_time - start_time}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/answers/etl_runner.py b/answers/etl_runner.py new file mode 100644 index 00000000..3b7d7f9e --- /dev/null +++ b/answers/etl_runner.py @@ -0,0 +1,123 @@ +''' +Monitors a watch directory (data/incoming/) for new .txt files +Cleans and ingests them into the database +Avoids duplicate entries +Moves processed files to an archive/ folder +Logs all activity +''' +# IMPORTS +import os +import shutil +import sqlite3 +import logging +from datetime import datetime + +# CONFIG +WATCH_DIR = "../data/incoming" +ARCHIVE_DIR = "../data/archive" +# decouples input and output for automation +DB_PATH = "../ctva_weather.db" +LOG_PATH = "../logs/etl.log" + +# LOGGING +logging.basicConfig( + filename=LOG_PATH, + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s" +) + +# DATABASE & SCHEMA +CREATE_TABLE_SQL = """ +CREATE TABLE IF NOT EXISTS weather ( + station_id TEXT NOT NULL, + date DATE NOT NULL, + max_temp_c INTEGER, + min_temp_c INTEGER, + precipitation_mm INTEGER, + PRIMARY KEY (station_id, date) +); +""" + +INSERT_SQL = """ +INSERT OR IGNORE INTO weather (station_id, date, max_temp_c, min_temp_c, precipitation_mm) +VALUES (?, ?, ?, ?, ?); +""" + +# FUNCTIONS +def connect_db(): + return sqlite3.connect(DB_PATH) + +def init_db(conn): + with conn: + conn.execute(CREATE_TABLE_SQL) + + +def parse_line(line): + parts = line.strip().split("\t") + if len(parts) != 4: # prevents bad data from entering the database + return None + date_str, max_temp, min_temp, precip = parts + try: + return { + "date": datetime.strptime(date_str, "%Y%m%d").date(), + "max_temp": None if max_temp == "-9999" else int(max_temp), + "min_temp": None if min_temp == "-9999" else int(min_temp), + "precip": None if precip == "-9999" else int(precip) + } + except: + return None + +def process_file(filepath, conn): #line by line ingestion + station_id = os.path.splitext(os.path.basename(filepath))[0] + count = 0 + with open(filepath, "r") as file: + for line in file: + data = parse_line(line) + if data: + conn.execute(INSERT_SQL, ( + station_id, + data["date"], + data["max_temp"], + data["min_temp"], + data["precip"] + )) + count += 1 + return station_id, count + +def archive_file(filepath): # prevent reprocessing + if not os.path.exists(ARCHIVE_DIR): + os.makedirs(ARCHIVE_DIR) + filename = os.path.basename(filepath) + shutil.move(filepath, os.path.join(ARCHIVE_DIR, filename)) + +def main(): + start_time = datetime.now() + logging.info("ETL process started...") + + conn = connect_db() + init_db(conn) + + total_files = 0 + total_records = 0 + + for file in os.listdir(WATCH_DIR): + if file.endswith(".txt"): + file_path = os.path.join(WATCH_DIR, file) + try: + station_id, count = process_file(file_path, conn) + logging.info(f"Ingested {count} records from {file} (station: {station_id})") + total_records += count + archive_file(file_path) + total_files += 1 + except Exception as e: + logging.error(f"Failed to process {file}: {e}") + + conn.commit() + conn.close() + + end_time = datetime.now() + logging.info(f"ETL process complete. Files processed: {total_files}, Records ingested: {total_records}") + logging.info(f"Duration: {end_time - start_time}") + +if __name__ == "__main__": + main() diff --git a/answers/ingest_weather.py b/answers/ingest_weather.py new file mode 100644 index 00000000..2263e8ed --- /dev/null +++ b/answers/ingest_weather.py @@ -0,0 +1,101 @@ +''' +Performs a one-time bulk ingestion of historical weather data +from a specified directory (data/wx_data/) into a SQLite database. +''' +# IMPORTS +import os +import sqlite3 +import logging +from datetime import datetime + +# CONFIG +DATA_DIR = "../data/wx_data" +DB_PATH = "../ctva_weather.db" +LOG_PATH = "../logs/etl.log" + +# LOGGING +logging.basicConfig( + filename=LOG_PATH, + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s" +) + +# DATABASE SETUP +CREATE_TABLE_SQL = """ +CREATE TABLE IF NOT EXISTS weather ( + station_id TEXT NOT NULL, + date DATE NOT NULL, + max_temp_c INTEGER, + min_temp_c INTEGER, + precipitation_mm INTEGER, + PRIMARY KEY (station_id, date) +); +""" + +INSERT_SQL = """ +INSERT OR IGNORE INTO weather (station_id, date, max_temp_c, min_temp_c, precipitation_mm) +VALUES (?, ?, ?, ?, ?); +""" + +# FUNCTIONS +def connect_db(): + return sqlite3.connect(DB_PATH) + +def init_db(conn): + with conn: + conn.execute(CREATE_TABLE_SQL) + +def parse_line(line): + parts = line.strip().split("\t") + if len(parts) != 4: + return None + date_str, max_temp, min_temp, precip = parts + return { + "date": datetime.strptime(date_str, "%Y%m%d").date(), + "max_temp": None if max_temp == "-9999" else int(max_temp), + "min_temp": None if min_temp == "-9999" else int(min_temp), + "precip": None if precip == "-9999" else int(precip) + } + +def ingest_file(filepath, station_id, conn): + count = 0 + with open(filepath, "r") as file: + for line in file: + data = parse_line(line) + if data: + conn.execute(INSERT_SQL, ( + station_id, + data["date"], + data["max_temp"], + data["min_temp"], + data["precip"] + )) + count += 1 + return count + +def main(): + start_time = datetime.now() + logging.info("Starting weather data ingestion...") + + conn = connect_db() + init_db(conn) + + total_records = 0 + for filename in os.listdir(DATA_DIR): + if not filename.endswith(".txt"): + continue + station_id = os.path.splitext(filename)[0] + file_path = os.path.join(DATA_DIR, filename) + count = ingest_file(file_path, station_id, conn) + total_records += count + logging.info(f"Ingested {count} records from {filename}") + + conn.commit() + conn.close() + + end_time = datetime.now() + logging.info(f"Weather data ingestion complete. Total records: {total_records}") + logging.info(f"Duration: {end_time - start_time}") + +if __name__ == "__main__": + main() diff --git a/answers/requirements.txt b/answers/requirements.txt new file mode 100644 index 00000000..afc48ff4 --- /dev/null +++ b/answers/requirements.txt @@ -0,0 +1,3 @@ +# requirements.txt +# This project uses only the Python Standard Library. +# No external packages are needed. diff --git a/answers/scheduler_setup.txt b/answers/scheduler_setup.txt new file mode 100644 index 00000000..84d2e223 --- /dev/null +++ b/answers/scheduler_setup.txt @@ -0,0 +1,32 @@ +Setup in Task Scheduler + +1. Open Task Scheduler + +2. Create a New Task +click Create Task. + +3. General Tab +Name: Run Ingest Weather ETL +Runs the ingest_weather.py script daily. +Check: “Run with highest privileges” + +4. Triggers Tab +Click New… +Set: Begin the task: On a schedule +Choose Daily, Weekly, or At log on +Set the start time +Click OK + +5. Actions Tab +Click New… +Action: Start a program +Replace C:\Path\To\The\Script\ with the full path to the etl_runner.py script + +6. Settings Tab +Check “Allow task to be run on demand” +Check “Run task as soon as possible after a scheduled start is missed” +Check “Stop the task if it runs longer than 1 hour” + +8. Save + +