A production-ready PySpark ETL pipeline that extracts incremental data from MySQL (customers, policies, claims, payments), enriches & transforms the data, computes aggregated insurance metrics, writes results back to MySQL, and maintains progress using an offsets table.
This README provides complete setup, usage, architecture, and troubleshooting guidance.
- Project Overview
- Folder Structure
- Prerequisites
- Configuration (conf/config.yml)
- Database Setup
- Incremental Load & Offset Logic
- Run Locally
- Run in Production
- Logging & Monitoring
The ETL pipeline:
- Reads incremental data using offsets stored in MySQL.
- Enriches and joins customers, policies, claims, and payments.
- Computes aggregated metrics.
- Writes metrics to MySQL.
- Updates offsets to maintain incremental processing.
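A minimal sketch of how the JDBC read side of such a pipeline is typically wired up. The helper name and config keys below are illustrative assumptions, not the project's actual API:

```python
def jdbc_options(cfg: dict, dbtable: str) -> dict:
    """Assemble Spark JDBC options from a config dict (keys are assumed)."""
    return {
        "url": f"jdbc:mysql://{cfg['host']}:{cfg['port']}/{cfg['database']}",
        "driver": "com.mysql.cj.jdbc.Driver",
        "user": cfg["user"],
        "password": cfg["password"],
        "dbtable": dbtable,
    }

# Usage inside a Spark job (sketch):
# df = spark.read.format("jdbc").options(**jdbc_options(cfg, "claims")).load()
```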
insuranceETL/
├── conf/
│ └── config.yml
├── stages/
│ ├── ingest.py
│ ├── transform.py
│ └── writeback.py
├── utils/
│ ├── logger.py
│ ├── spark_session.py
│ ├── db_utils.py
│ └── offsets.py
├── main.py
├── scripts/
│ └── spark_submit.sh
├── sql/
│ └── db_scripts.sql
├── requirements.txt
└── README.md
- Java 8/11
- Python 3.8+
- Apache Spark 3.x
- MySQL server
- Network access to MySQL port 3306
Install dependencies:
pip install -r requirements.txt
Update the MySQL connection info in conf/config.yml.
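A hypothetical layout for conf/config.yml; the key names below are assumptions, so adjust them to match the real file:

```yaml
# Hypothetical conf/config.yml layout -- key names are assumptions
mysql:
  host: localhost
  port: 3306
  database: insurance
  user: etl_user
  password: change_me
```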
Tables for offsets & metrics:
CREATE TABLE IF NOT EXISTS etl_offsets (...);
CREATE TABLE IF NOT EXISTS insurance_metrics (...);
- Read the last offset.
- Load only new rows.
- Process, aggregate, write results.
- Update offsets (idempotent workflow).
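The steps above can be sketched as a pushdown query builder; the function and column names are illustrative, not the project's actual helpers in utils/offsets.py:

```python
def build_incremental_query(table: str, offset_col: str, last_offset: int) -> str:
    """Wrap a filtered SELECT so Spark's JDBC reader pulls only new rows."""
    return (
        f"(SELECT * FROM {table} "
        f"WHERE {offset_col} > {last_offset}) AS incr_{table}"
    )

# After a successful write, the new max offset is saved back to etl_offsets,
# so a re-run resumes where the last run finished (idempotent workflow).
```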
python main.py
spark-submit --master local[*] --packages mysql:mysql-connector-java:8.0.33 main.py
Integrated logging + Spark UI for monitoring.
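A minimal sketch of what a logger factory like utils/logger.py might provide; the exact format string and level are assumptions:

```python
import logging

def get_logger(name: str = "insuranceETL") -> logging.Logger:
    """Return a named logger with a single console handler (idempotent)."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s - %(message)s"))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger
```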
- Create a zip file named "Insure-pkg.zip" containing the Python package dependencies (stages, utils) and copy it into the deploy folder.
- Download the MySQL JDBC jar and copy it into the deploy folder.
- Run the command below to download the wheel file for the mysql-connector-python library, then copy the .whl file into the deploy folder:
pip download mysql-connector-python --platform any --only-binary=:all: --no-deps
- Copy the config.yml file into the deploy folder.
- Copy the main.py file into the deploy folder.
spark-submit --master local[*] --py-files Insure-pkg.zip,mysql_connector_python-9.5.0-py2.py3-none-any.whl --jars mysql-connector-j-8.0.33.jar --files conf/config.yml main.py