-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbd_create_data_1_unify_multi.py
More file actions
81 lines (73 loc) · 3.22 KB
/
bd_create_data_1_unify_multi.py
File metadata and controls
81 lines (73 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import os
import polars as pl
from datetime import datetime
# Earliest candle timestamp (ISO date) retained from every raw CSV.
START_DATE = "2020-01-01"
# Binance USDT-quoted spot pairs; one "<ASSET>_<interval>_historical_data.csv"
# per pair is expected in each interval's input folder.
ASSET_LIST = [
"ADAUSDT","ALGOUSDT","ANKRUSDT","ARPAUSDT","ATOMUSDT","BANDUSDT",
"BATUSDT","BCHUSDT","BNBUSDT","BTCUSDT","CELRUSDT","CHZUSDT",
"COSUSDT","CTXCUSDT","CVCUSDT","DASHUSDT","DENTUSDT","DOGEUSDT",
"DUSKUSDT","ENJUSDT","EOSUSDT","ETCUSDT","ETHUSDT","FETUSDT",
"FTMUSDT","FTTUSDT","FUNUSDT","HBARUSDT","HOTUSDT","ICXUSDT",
"IOSTUSDT","IOTAUSDT","IOTXUSDT","KAVAUSDT","LINKUSDT","LTCUSDT",
"MTLUSDT","NEOUSDT","NKNUSDT","NULSUSDT","ONEUSDT","ONGUSDT",
"ONTUSDT","RLCUSDT","RVNUSDT","STXUSDT","TFUELUSDT","THETAUSDT",
"TROYUSDT","TRXUSDT","TUSDUSDT","USDCUSDT","VETUSDT","VITEUSDT",
"WANUSDT","WINUSDT","XLMUSDT","XRPUSDT","XTZUSDT","ZECUSDT",
"ZILUSDT","ZRXUSDT"
]
# Candlestick intervals processed; each produces its own unified dataset.
INTERVALS = ['5m', '15m', '30m', '1h', '2h', '4h', '6h', '8h', '12h']
def load_and_prepare(asset: str, folder: str, interval: str, cutoff: datetime) -> pl.DataFrame | None:
    """Load one asset's OHLCV CSV, keep rows at/after *cutoff*, and prefix
    the value columns with the asset name so frames can be joined later.

    Returns None (after printing a notice) when the file is absent or
    cannot be read/parsed — callers treat that as "skip this asset".
    """
    csv_path = os.path.join(folder, f"{asset}_{interval}_historical_data.csv")
    if not os.path.exists(csv_path):
        print(f"⚠️ Missing {asset} for {interval}")
        return None
    # Per-asset column names keep OHLCV columns distinct after the join.
    column_map = {
        "open": f"{asset}_open",
        "high": f"{asset}_high",
        "low": f"{asset}_low",
        "close": f"{asset}_close",
        "volume": f"{asset}_volume",
    }
    try:
        frame = pl.read_csv(csv_path)
        frame = frame.select(["timestamp", "open", "high", "low", "close", "volume"])
        frame = frame.with_columns([
            pl.col("timestamp").str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S")
        ])
        frame = frame.filter(pl.col("timestamp") >= cutoff)
        return frame.rename(column_map)
    except Exception as e:
        # Best-effort loader: report and skip rather than abort the whole run.
        print(f"❌ Error loading {asset} ({interval}): {e}")
        return None
def merge_all_assets(assets, folder, interval, cutoff_date):
    """Inner-join every loadable asset frame on "timestamp".

    Assets whose CSV is missing or unreadable are skipped. Returns the
    merged frame sorted by timestamp, or None when no asset loaded.
    """
    cutoff = datetime.fromisoformat(cutoff_date)
    result = None
    for asset in assets:
        frame = load_and_prepare(asset, folder, interval, cutoff)
        if frame is None:
            continue
        # First successful load seeds the result; later ones join onto it.
        result = frame if result is None else result.join(frame, on="timestamp", how="inner")
    if result is None:
        return None
    return result.sort("timestamp")
if __name__ == "__main__":
    # For each interval: merge all assets, drop the leading rows where any
    # asset still has missing data, and write one unified CSV.
    for interval in INTERVALS:
        input_dir = f"datasets/raw/full_scope/{interval}"
        output_dir = f"datasets/refined/unified_1_prep/{interval}"
        os.makedirs(output_dir, exist_ok=True)
        print(f"\n🔧 Processing {interval} interval...")
        ds = merge_all_assets(ASSET_LIST, input_dir, interval, START_DATE)
        if ds is None:
            print(f"🚫 No data merged for {interval}")
            continue
        pdf = ds.to_pandas()
        value_cols = [col for col in pdf.columns if col != "timestamp"]
        # True for rows where every asset column is populated.
        mask = pdf[value_cols].notna().all(axis=1)
        # BUG FIX: idxmax() returns 0 when mask contains no True, which would
        # silently write the frame untrimmed (NaN rows included). Guard first.
        if not mask.any():
            print(f"🚫 No fully-populated rows for {interval}")
            continue
        # First row where all assets have data; everything before is trimmed.
        first_valid_index = mask.idxmax()
        pdf_trimmed = pdf.iloc[first_valid_index:].reset_index(drop=True)
        output_path = os.path.join(output_dir, "unified_dataset.csv")
        pdf_trimmed.to_csv(output_path, index=False)
        print(f"✅ Saved {interval} to {output_path} — final shape: {pdf_trimmed.shape}")