Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions answers/aggregate_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
'''
Computes:
Average max temperature (°C)
Average min temperature (°C)
Total precipitation (cm)
Stores them in a new table weather_summary
'''
# IMPORTS
import sqlite3
import logging
from datetime import datetime

DB_PATH = "../ctva_weather.db" # Path to the SQLite DB
LOG_PATH = "../logs/etl.log" # Log file for operations

# LOGGING
# Appends to the same etl.log used by the other ETL scripts so a single
# file records the whole pipeline's history.
logging.basicConfig(
    filename=LOG_PATH,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# SUMMARY SETUP
# One summary row per (station, year). Combined with INSERT OR REPLACE
# below, re-running the aggregation is idempotent.
CREATE_SUMMARY_SQL = """
CREATE TABLE IF NOT EXISTS weather_summary (
station_id TEXT,
year INTEGER,
avg_max_temp_c REAL,
avg_min_temp_c REAL,
total_precip_cm REAL,
PRIMARY KEY (station_id, year)
);
"""

# NOTE(review): the /10.0 and /100.0 divisors imply the raw `weather`
# values are stored in tenths of a unit (GHCN-style tenths of °C and
# tenths of mm), despite the column name `precipitation_mm` — confirm
# against the source data before changing either divisor.
# Rows with any NULL measurement are excluded from the aggregates.
INSERT_SUMMARY_SQL = """
INSERT OR REPLACE INTO weather_summary (
station_id, year, avg_max_temp_c, avg_min_temp_c, total_precip_cm
)
SELECT
station_id,
strftime('%Y', date) AS year,
AVG(max_temp_c) / 10.0 AS avg_max,
AVG(min_temp_c) / 10.0 AS avg_min,
SUM(precipitation_mm) / 100.0 AS total_precip
FROM weather
WHERE max_temp_c IS NOT NULL AND min_temp_c IS NOT NULL AND precipitation_mm IS NOT NULL
GROUP BY station_id, year;
"""

def main():
    """Aggregate yearly per-station stats into the weather_summary table.

    Creates the summary table if needed, then recomputes averages and
    totals from the `weather` table. INSERT OR REPLACE keeps repeated
    runs idempotent. Start/end and duration go to the shared ETL log.
    """
    start_time = datetime.now()
    logging.info("Starting weather summary aggregation...")

    conn = sqlite3.connect(DB_PATH)
    try:
        # `with conn` runs both statements in one transaction: commit on
        # success, rollback on error.
        with conn:
            conn.execute(CREATE_SUMMARY_SQL)
            conn.execute(INSERT_SUMMARY_SQL)
    finally:
        # BUG FIX: the original left the connection open if either SQL
        # statement raised; always release the handle.
        conn.close()

    end_time = datetime.now()
    logging.info("Weather summary aggregation complete.")
    logging.info(f"Duration: {end_time - start_time}")

if __name__ == "__main__":
    main()
123 changes: 123 additions & 0 deletions answers/etl_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
'''
Monitors a watch directory (data/incoming/) for new .txt files
Cleans and ingests them into the database
Avoids duplicate entries
Moves processed files to an archive/ folder
Logs all activity
'''
# IMPORTS
import os
import shutil
import sqlite3
import logging
from datetime import datetime

# CONFIG
WATCH_DIR = "../data/incoming"   # new .txt drops are picked up from here
ARCHIVE_DIR = "../data/archive"  # processed files are moved here
# decouples input and output for automation
DB_PATH = "../ctva_weather.db"
LOG_PATH = "../logs/etl.log"

# LOGGING
# Same etl.log as the other ETL scripts — one file, whole pipeline.
logging.basicConfig(
    filename=LOG_PATH,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# DATABASE & SCHEMA
# The composite primary key (station_id, date) plus INSERT OR IGNORE
# below is what de-duplicates re-delivered records.
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS weather (
station_id TEXT NOT NULL,
date DATE NOT NULL,
max_temp_c INTEGER,
min_temp_c INTEGER,
precipitation_mm INTEGER,
PRIMARY KEY (station_id, date)
);
"""

# NOTE(review): values are inserted raw; aggregate_stats.py divides by
# 10/100, which suggests tenths-of-unit source data — confirm.
INSERT_SQL = """
INSERT OR IGNORE INTO weather (station_id, date, max_temp_c, min_temp_c, precipitation_mm)
VALUES (?, ?, ?, ?, ?);
"""

# FUNCTIONS
def connect_db():
    """Open and return a new connection to the project SQLite database."""
    connection = sqlite3.connect(DB_PATH)
    return connection

def init_db(conn):
    """Ensure the `weather` table exists (no-op when already created)."""
    # The connection context manager commits the DDL on success.
    with conn:
        cursor = conn.cursor()
        cursor.execute(CREATE_TABLE_SQL)


def parse_line(line):
    """Parse one tab-separated weather record.

    Expected layout: YYYYMMDD<TAB>max_temp<TAB>min_temp<TAB>precipitation,
    where the sentinel -9999 marks a missing value.

    Returns a dict with keys "date", "max_temp", "min_temp", "precip"
    (missing values become None), or None for a malformed line.
    """
    parts = line.strip().split("\t")
    if len(parts) != 4: # prevents bad data from entering the database
        return None
    date_str, max_temp, min_temp, precip = parts
    try:
        return {
            "date": datetime.strptime(date_str, "%Y%m%d").date(),
            "max_temp": None if max_temp == "-9999" else int(max_temp),
            "min_temp": None if min_temp == "-9999" else int(min_temp),
            "precip": None if precip == "-9999" else int(precip)
        }
    except ValueError:
        # BUG FIX: the original bare `except:` also swallowed
        # KeyboardInterrupt/SystemExit. Only malformed dates or
        # non-integer fields (ValueError) should skip the row.
        return None

def process_file(filepath, conn):
    """Ingest every valid line of one station file.

    The file name (without extension) doubles as the station id.
    Returns (station_id, count) where count is the number of rows
    submitted (duplicate rows are silently ignored by INSERT OR IGNORE
    but are still counted here).
    """
    station_id = os.path.splitext(os.path.basename(filepath))[0]
    inserted = 0
    with open(filepath, "r") as handle:
        for raw_line in handle:
            record = parse_line(raw_line)
            if record is None:
                continue  # malformed line — skip
            params = (
                station_id,
                record["date"],
                record["max_temp"],
                record["min_temp"],
                record["precip"],
            )
            conn.execute(INSERT_SQL, params)
            inserted += 1
    return station_id, inserted

def archive_file(filepath):
    """Move a processed file into ARCHIVE_DIR so it is not re-ingested."""
    # exist_ok=True is atomic, unlike the original exists()/makedirs()
    # pair, which could race if two runs start concurrently.
    os.makedirs(ARCHIVE_DIR, exist_ok=True)
    filename = os.path.basename(filepath)
    shutil.move(filepath, os.path.join(ARCHIVE_DIR, filename))

def main():
    """Scan WATCH_DIR for .txt station files, ingest each, then archive it.

    Per-file failures are logged and skipped so one bad file cannot
    abort the whole run. Each file's rows are committed before the file
    is moved to the archive folder.
    """
    start_time = datetime.now()
    logging.info("ETL process started...")

    total_files = 0
    total_records = 0

    conn = connect_db()
    try:
        init_db(conn)
        for file in os.listdir(WATCH_DIR):
            if not file.endswith(".txt"):
                continue
            file_path = os.path.join(WATCH_DIR, file)
            try:
                station_id, count = process_file(file_path, conn)
                # BUG FIX: commit BEFORE archiving. The original committed
                # only once at the very end, so a crash mid-run could
                # archive files whose rows were never persisted.
                conn.commit()
                logging.info(f"Ingested {count} records from {file} (station: {station_id})")
                total_records += count
                archive_file(file_path)
                total_files += 1
            except Exception as e:
                # Best-effort: log and move on; the file stays in
                # WATCH_DIR for the next run.
                logging.error(f"Failed to process {file}: {e}")
    finally:
        # BUG FIX: always release the connection, even on early failure.
        conn.close()

    end_time = datetime.now()
    logging.info(f"ETL process complete. Files processed: {total_files}, Records ingested: {total_records}")
    logging.info(f"Duration: {end_time - start_time}")

if __name__ == "__main__":
    main()
101 changes: 101 additions & 0 deletions answers/ingest_weather.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
'''
Performs a one-time bulk ingestion of historical weather data
from a specified directory (data/wx_data/) into a SQLite database.
'''
# IMPORTS
import os
import sqlite3
import logging
from datetime import datetime

# CONFIG
DATA_DIR = "../data/wx_data"   # historical files for the one-time bulk load
DB_PATH = "../ctva_weather.db"
LOG_PATH = "../logs/etl.log"

# LOGGING
# Same etl.log as the other ETL scripts — one file, whole pipeline.
logging.basicConfig(
    filename=LOG_PATH,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# DATABASE SETUP
# The composite primary key (station_id, date) plus INSERT OR IGNORE
# below makes re-running the bulk load safe (duplicates are skipped).
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS weather (
station_id TEXT NOT NULL,
date DATE NOT NULL,
max_temp_c INTEGER,
min_temp_c INTEGER,
precipitation_mm INTEGER,
PRIMARY KEY (station_id, date)
);
"""

# NOTE(review): values are inserted raw; aggregate_stats.py divides by
# 10/100, which suggests tenths-of-unit source data — confirm.
INSERT_SQL = """
INSERT OR IGNORE INTO weather (station_id, date, max_temp_c, min_temp_c, precipitation_mm)
VALUES (?, ?, ?, ?, ?);
"""

# FUNCTIONS
def connect_db():
    """Open and return a new connection to the project SQLite database."""
    connection = sqlite3.connect(DB_PATH)
    return connection

def init_db(conn):
    """Ensure the `weather` table exists (no-op when already created)."""
    # The connection context manager commits the DDL on success.
    with conn:
        cursor = conn.cursor()
        cursor.execute(CREATE_TABLE_SQL)

def parse_line(line):
    """Parse one tab-separated weather record.

    Expected layout: YYYYMMDD<TAB>max_temp<TAB>min_temp<TAB>precipitation,
    where the sentinel -9999 marks a missing value.

    Returns a dict with keys "date", "max_temp", "min_temp", "precip"
    (missing values become None), or None for a malformed line.
    """
    parts = line.strip().split("\t")
    if len(parts) != 4:
        return None
    date_str, max_temp, min_temp, precip = parts
    try:
        return {
            "date": datetime.strptime(date_str, "%Y%m%d").date(),
            "max_temp": None if max_temp == "-9999" else int(max_temp),
            "min_temp": None if min_temp == "-9999" else int(min_temp),
            "precip": None if precip == "-9999" else int(precip)
        }
    except ValueError:
        # BUG FIX: the original had no error handling here, so a single
        # malformed date or non-integer field crashed the entire bulk
        # ingestion. Skip the bad row instead — this also matches the
        # behavior of parse_line in etl_runner.py. The caller already
        # handles a None return.
        return None

def ingest_file(filepath, station_id, conn):
    """Insert every parseable line of `filepath` under `station_id`.

    Returns the number of rows submitted (duplicates are silently
    skipped by INSERT OR IGNORE but are still counted here).
    """
    inserted = 0
    with open(filepath, "r") as handle:
        for raw_line in handle:
            record = parse_line(raw_line)
            if record is None:
                continue  # malformed line — skip
            params = (
                station_id,
                record["date"],
                record["max_temp"],
                record["min_temp"],
                record["precip"],
            )
            conn.execute(INSERT_SQL, params)
            inserted += 1
    return inserted

def main():
    """Bulk-ingest every .txt station file in DATA_DIR into SQLite.

    The file name (without extension) is used as the station id.
    Progress and totals are written to the shared ETL log.
    """
    start_time = datetime.now()
    logging.info("Starting weather data ingestion...")

    conn = connect_db()
    total_records = 0
    try:
        init_db(conn)
        for filename in os.listdir(DATA_DIR):
            if not filename.endswith(".txt"):
                continue
            station_id = os.path.splitext(filename)[0]
            file_path = os.path.join(DATA_DIR, filename)
            count = ingest_file(file_path, station_id, conn)
            total_records += count
            # BUG FIX: the message previously logged the literal text
            # "(unknown)" instead of the file name.
            logging.info(f"Ingested {count} records from {filename}")
        conn.commit()
    finally:
        # BUG FIX: always release the connection, even if a file fails
        # mid-run.
        conn.close()

    end_time = datetime.now()
    logging.info(f"Weather data ingestion complete. Total records: {total_records}")
    logging.info(f"Duration: {end_time - start_time}")

if __name__ == "__main__":
    main()
3 changes: 3 additions & 0 deletions answers/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# requirements.txt
# This project uses only the Python Standard Library.
# No external packages are needed.
32 changes: 32 additions & 0 deletions answers/scheduler_setup.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
Setup in Task Scheduler

1. Open Task Scheduler

2. Create a New Task
click Create Task.

3. General Tab
Name: Run Weather ETL
Description: Runs the etl_runner.py script on a schedule (this is the script configured in the Actions tab below).
Check: “Run with highest privileges”

4. Triggers Tab
Click New…
Set: Begin the task: On a schedule
Choose Daily, Weekly, or At log on
Set the start time
Click OK

5. Actions Tab
Click New…
Action: Start a program
Program/script: python
Add arguments: C:\Path\To\The\Script\etl_runner.py
Replace C:\Path\To\The\Script\ with the full path to the etl_runner.py script

6. Settings Tab
Check “Allow task to be run on demand”
Check “Run task as soon as possible after a scheduled start is missed”
Check “Stop the task if it runs longer than 1 hour”

7. Save