A robust daily ETL pipeline that simulates ingestion of retail sales data, cleans and transforms it, and stores the results in a PostgreSQL database. The entire process is orchestrated with Apache Airflow 3.x.
- Python 3.12
- Pandas 2.3
- PostgreSQL 16
- Apache Airflow 3.0.4
- SQLAlchemy 2.x
- psycopg 3
- Parquet (PyArrow)
```
data_engg_proj/
├── dags/
│   ├── etl_extract.py          # Picks latest CSV & archives it
│   ├── etl_transform.py        # Cleans & aggregates data
│   ├── etl_load.py             # Loads data to Postgres
│   └── retail_sales_etl_dag.py # Airflow DAG definition
├── data/
│   ├── incoming/               # Simulated raw CSVs
│   ├── processed/              # Cleaned & aggregated parquet
│   └── archive/                # Archived raw files post-extract
├── db/
│   └── init.sql                # SQL schema for Postgres
├── scripts/
│   └── generate_fake_csv.py    # Simulates schema-drifting CSVs
├── .env                        # PostgreSQL connection vars
├── requirements.txt
└── README.md
```
```bash
brew install pyenv postgresql@16
pyenv install 3.12.6
pyenv global 3.12.6
python -m venv venv
source venv/bin/activate
pip install "psycopg[binary]" "SQLAlchemy>=2.0,<3" "pandas>=2.3,<2.4" "pyarrow"
brew services start postgresql@16
createdb retaildb
psql retaildb -f db/init.sql
```
- Recommended: WSL (Windows Subsystem for Linux) + Ubuntu
- Not recommended: Docker-based Postgres for this project
```bash
sudo apt update && sudo apt install -y python3.12 python3.12-venv postgresql libpq-dev
python3.12 -m venv venv
source venv/bin/activate
pip install "psycopg[binary]" "SQLAlchemy>=2.0,<3" "pandas>=2.3,<2.4" "pyarrow"
sudo service postgresql start
createdb retaildb
psql retaildb -f db/init.sql
```
- Install PostgreSQL 16 for Windows
- Install Python 3.12 from https://www.python.org
- Then run:
```powershell
python -m venv venv
.\venv\Scripts\activate
pip install "psycopg[binary]" "SQLAlchemy>=2.0,<3" "pandas>=2.3,<2.4" "pyarrow"
```

- Use pgAdmin or the CLI to create a new DB called `retaildb`, then initialize the schema:

```powershell
psql -U postgres -d retaildb -f db/init.sql
```

Replace `postgres` with your DB username if different.
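The repository keeps connection settings in `.env`; its exact keys aren't listed here, so the variable names below (`PG_USER`, `PG_PASSWORD`, etc.) are assumptions. A minimal sketch of how the load scripts might build a SQLAlchemy URL for psycopg 3 from those variables:

```python
import os
from urllib.parse import quote_plus


def pg_url() -> str:
    """Build a SQLAlchemy connection URL for psycopg 3 from env vars.

    The variable names are assumptions about what .env contains;
    adjust them to match your file.
    """
    user = os.getenv("PG_USER", "postgres")
    password = quote_plus(os.getenv("PG_PASSWORD", ""))  # escape special chars
    host = os.getenv("PG_HOST", "localhost")
    port = os.getenv("PG_PORT", "5432")
    db = os.getenv("PG_DB", "retaildb")
    auth = f"{user}:{password}" if password else user
    return f"postgresql+psycopg://{auth}@{host}:{port}/{db}"


if __name__ == "__main__":
    print(pg_url())  # e.g. postgresql+psycopg://postgres@localhost:5432/retaildb
```

Pass the result to `sqlalchemy.create_engine(pg_url())`; the `postgresql+psycopg://` scheme selects the psycopg 3 driver rather than psycopg2.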
```bash
# Generate sample CSV with schema drift
python scripts/generate_fake_csv.py

# Clean & aggregate to parquet
python dags/etl_transform.py

# Load to Postgres
python dags/etl_load.py
```
```bash
export AIRFLOW_VERSION=3.0.4
PYVER=$(python -c 'import sys;print(f"{sys.version_info.major}.{sys.version_info.minor}")')
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYVER}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "$CONSTRAINT_URL"
export AIRFLOW__CORE__DAGS_FOLDER=$(pwd)/dags
airflow standalone
```

Save the admin password that `airflow standalone` prints. Note that in Airflow 3 the `airflow users create` command only applies when the FAB auth manager is enabled; with it enabled you can create your own user instead:

```bash
airflow users create -u admin -p admin -r Admin -f Admin -l User -e admin@example.com
```
`etl_extract.py`:
- Picks the latest CSV from `data/incoming/`
- Archives a copy to `data/archive/`
`etl_transform.py`:
- Handles schema drift (extra/missing columns)
- Converts to a typed DataFrame
- Outputs: `cleaned.parquet` (row-level) and `agg.parquet` (aggregated)
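The drift handling and aggregation might be sketched as below (names are illustrative; the aggregate columns mirror the `sales_daily_agg` schema further down):

```python
import pandas as pd

EXPECTED = ["sale_date", "product_id", "quantity", "price"]


def transform(raw: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Coerce a possibly drifted frame to the expected schema, then aggregate."""
    df = raw.copy()
    for col in EXPECTED:          # missing columns come back as NA
        if col not in df.columns:
            df[col] = pd.NA
    df = df[EXPECTED].copy()      # extra (drifted) columns are dropped
    df["sale_date"] = pd.to_datetime(df["sale_date"], errors="coerce")
    df["quantity"] = pd.to_numeric(df["quantity"], errors="coerce").fillna(0).astype("int64")
    df["price"] = pd.to_numeric(df["price"], errors="coerce")
    df = df.dropna(subset=["sale_date", "product_id"])
    agg = (
        df.assign(revenue=df["quantity"] * df["price"])
          .groupby(["sale_date", "product_id"], as_index=False)
          .agg(total_qty=("quantity", "sum"), total_revenue=("revenue", "sum"))
    )
    return df, agg
```

The two frames would then be written with `df.to_parquet(...)` (PyArrow backend) into `data/processed/` as `cleaned.parquet` and `agg.parquet`.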
`etl_load.py`:
- Inserts rows into `sales_clean`
- Upserts aggregates into `sales_daily_agg`
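The aggregate upsert could use SQLAlchemy's PostgreSQL `INSERT ... ON CONFLICT` support, assuming `db/init.sql` puts a primary/unique key on `(sale_date, product_id)` (adjust if the constraint is defined differently; the function name is illustrative):

```python
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import insert

metadata = sa.MetaData()
sales_daily_agg = sa.Table(
    "sales_daily_agg", metadata,
    sa.Column("sale_date", sa.Date, primary_key=True),
    sa.Column("product_id", sa.Text, primary_key=True),
    sa.Column("total_qty", sa.BigInteger),
    sa.Column("total_revenue", sa.Numeric),
)


def upsert_stmt(rows: list[dict]):
    """Build INSERT ... ON CONFLICT (sale_date, product_id) DO UPDATE."""
    stmt = insert(sales_daily_agg).values(rows)
    return stmt.on_conflict_do_update(
        index_elements=["sale_date", "product_id"],
        set_={
            "total_qty": stmt.excluded.total_qty,
            "total_revenue": stmt.excluded.total_revenue,
        },
    )
```

Usage would be along the lines of `with engine.begin() as conn: conn.execute(upsert_stmt(agg.to_dict("records")))`, so re-running a day's load overwrites that day's aggregates instead of duplicating them.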
```
┌────────────┐      ┌────────────┐      ┌────────────┐
│ incoming/  │ ───▶ │ transform  │ ───▶ │ Postgres DB│
│ CSV (raw)  │      │ .parquet   │      │ clean + agg│
└────────────┘      └────────────┘      └────────────┘
       │
       └──────▶ archive/
```
| Column | Type | Notes |
|---|---|---|
| id | BIGSERIAL | Auto-increment primary key |
| sale_date | DATE | Enforced schema column |
| product_id | TEXT | |
| quantity | INT | |
| price | NUMERIC | |
| revenue | NUMERIC | GENERATED ALWAYS AS (quantity * price) STORED |
| Column | Type |
|---|---|
| sale_date | DATE |
| product_id | TEXT |
| total_qty | BIGINT |
| total_revenue | NUMERIC |
- Add Great Expectations for data validation
- Replace CSV with S3/Cloud Storage
- Use Spark/Dask for large-scale data
- Slack/Email alerts on drift/failure
- Push logs to Prometheus/Grafana
GitHub: https://github.com/Bishaldgr8
For queries: bishalboroorborah@gmail.com