-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: ingest_data.py
More file actions
83 lines (66 loc) · 2.17 KB
/
ingest_data.py
File metadata and controls
83 lines (66 loc) · 2.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/usr/bin/env python
# coding: utf-8
import pandas as pd
from sqlalchemy import create_engine
from tqdm.auto import tqdm
import click
import time
# Define dtypes and datetime columns for the yellow-taxi trip CSV schema.
# Nullable pandas "Int64" (capital I) is used for integer columns so that
# missing values don't force a silent upcast to float64.
dtype = {
"VendorID": "Int64",
"passenger_count": "Int64",
"trip_distance": "float64",
"RatecodeID": "Int64",
"store_and_fwd_flag": "string",
"PULocationID": "Int64",
"DOLocationID": "Int64",
"payment_type": "Int64",
"fare_amount": "float64",
"extra": "float64",
"mta_tax": "float64",
"tip_amount": "float64",
"tolls_amount": "float64",
"improvement_surcharge": "float64",
"total_amount": "float64",
"congestion_surcharge": "float64"
}
# Columns pandas should parse into datetimes instead of leaving as strings.
parse_dates = [
"tpep_pickup_datetime",
"tpep_dropoff_datetime"
]
# Default CSV URL and chunk size.
# NOTE(review): path is relative to the working directory — presumably the
# gzipped Jan-2021 yellow-taxi export; pandas decompresses .gz transparently.
CSV_URL = "data/yellow_tripdata_2021-01.csv.gz"
# Rows per chunk when streaming the CSV, so the whole file never sits in memory.
CHUNK_SIZE = 100_000
@click.command()
@click.option('--pg-user', default='postgres', help='PostgreSQL user')
@click.option('--pg-pass', default='postgres', help='PostgreSQL password')
@click.option('--pg-host', default='localhost', help='PostgreSQL host')
@click.option('--pg-port', default=5432, type=int, help='PostgreSQL port')
@click.option('--pg-db', default='ny_taxi', help='PostgreSQL database name')
@click.option('--target-table', default='yellow_taxi_trips', help='Target table name')
@click.option('--csv-url', default=CSV_URL, help='Path or URL of the source CSV (may be .gz)')
@click.option('--chunk-size', default=CHUNK_SIZE, type=int, help='Rows per ingestion chunk')
def run(pg_user, pg_pass, pg_host, pg_port, pg_db, target_table,
        csv_url=CSV_URL, chunk_size=CHUNK_SIZE):
    """Ingest NYC Taxi CSV data into Postgres in chunks.

    Streams the CSV ``chunk_size`` rows at a time so the full file never
    has to fit in memory. The first chunk REPLACES the target table
    (dropping any data from a previous run); subsequent chunks append.
    """
    # Build the connection; psycopg (v3) driver via SQLAlchemy URL.
    engine = create_engine(
        f'postgresql+psycopg://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}'
    )
    print("🚀 Starting ingestion")
    # Lazy chunked reader — nothing is loaded until iteration begins.
    df_iter = pd.read_csv(
        csv_url,
        chunksize=chunk_size,
        dtype=dtype,
        parse_dates=parse_dates
    )
    start_time = time.time()
    try:
        for i, df_chunk in enumerate(tqdm(df_iter, desc="Ingesting chunks")):
            df_chunk.to_sql(
                name=target_table,
                con=engine,
                # First chunk recreates the table; later chunks append to it.
                if_exists='replace' if i == 0 else 'append',
                index=False,
            )
    finally:
        # Release pooled DB connections even if ingestion fails mid-stream.
        engine.dispose()
    elapsed = time.time() - start_time
    print(f"✅ Ingestion completed in {elapsed:.2f} seconds")
if __name__ == "__main__":
run()