-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtool_get_market_data_parallel.py
More file actions
118 lines (94 loc) · 4.26 KB
/
tool_get_market_data_parallel.py
File metadata and controls
118 lines (94 loc) · 4.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import time
import requests
import pandas as pd
from datetime import datetime
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
# API Setup: Binance REST API v3 base URL and the kline endpoint derived from it.
BASE_URL = "https://api.binance.com/api/v3"
KLINE_ENDPOINT = BASE_URL + "/klines"
def ensure_dir_exists(path):
    """Create directory *path* (and any missing parents) if it does not exist.

    An empty *path* is what ``os.path.dirname`` returns for a bare filename and
    refers to the current directory, which always exists. It is therefore
    treated as a no-op instead of letting ``os.makedirs("")`` raise
    ``FileNotFoundError``.

    Args:
        path: Directory path to create; may be empty.
    """
    if path:
        os.makedirs(path, exist_ok=True)
def fetch_historical_data(symbol, interval, start_date=None, end_date=None, timeout=30):
    """Download klines for *symbol*/*interval* between two datetimes.

    Pages through ``KLINE_ENDPOINT`` up to 1000 klines per request. Each page
    starts one millisecond after the first field (open time) of the last kline
    received, and the function sleeps 0.3 s between pages.

    Args:
        symbol: Trading pair, e.g. ``"ETHUSDT"``.
        interval: Kline interval string, e.g. ``"1m"``.
        start_date: Start of the range; defaults to ``datetime(2019, 1, 1)``.
            Naive datetimes are converted with ``datetime.timestamp()``, i.e.
            interpreted in the local timezone.
        end_date: Inclusive upper bound on kline open time; defaults to now.
        timeout: Per-request timeout in seconds. A stalled connection then
            fails (and can be retried by the caller) instead of hanging forever.

    Returns:
        DataFrame indexed by ``timestamp`` (parsed from milliseconds), with
        duplicate timestamps removed and price/volume columns converted to
        numbers. Returns an empty DataFrame if no klines were received.

    Raises:
        RuntimeError: If any HTTP request fails; the underlying
            ``requests`` exception is chained as the cause.
    """
    if start_date is None:
        start_date = datetime(2019, 1, 1)
    if end_date is None:
        end_date = datetime.now()
    start_timestamp = int(start_date.timestamp() * 1000)
    end_timestamp = int(end_date.timestamp() * 1000)
    tag = f"[{symbol}-{interval}]"

    all_klines = []
    current_timestamp = start_timestamp
    while current_timestamp < end_timestamp:
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": 1000,
            "startTime": current_timestamp,
        }
        try:
            response = requests.get(KLINE_ENDPOINT, params=params, timeout=timeout)
            response.raise_for_status()
            klines = response.json()
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"{tag} ❌ Fetch error: {e}") from e

        # Honor end_date: drop klines that open after it.
        # Without this filter, the last page would overshoot a past end_date.
        batch = [k for k in klines if k[0] <= end_timestamp]
        if not batch:
            break
        all_klines.extend(batch)
        current_timestamp = batch[-1][0] + 1
        print(f"{tag} ✅ Up to {datetime.fromtimestamp(current_timestamp / 1000)}")
        if len(batch) < len(klines):
            break  # this page already reached end_date; no further pages needed
        time.sleep(0.3)

    if not all_klines:
        return pd.DataFrame()

    df = pd.DataFrame(all_klines, columns=[
        'timestamp', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_asset_volume', 'number_of_trades',
        'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore'
    ])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df.set_index('timestamp', inplace=True)
    numeric_columns = ['open', 'high', 'low', 'close', 'volume', 'quote_asset_volume',
                       'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume']
    # Convert column-wise (vectorized per column). Row-wise apply(axis=1) is
    # orders of magnitude slower on minute-level history.
    df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric)
    df = df[~df.index.duplicated(keep='first')]
    return df
def save_to_csv(df, filename):
    """Write every row of *df* except the last one to *filename* as CSV.

    The final row is always dropped before writing. Presumably this excludes a
    kline that is still open at download time (TODO confirm intent). The
    caller's DataFrame is not modified.

    Args:
        df: DataFrame to persist; its index is written as the first column.
        filename: Destination CSV path; its directory must already exist.
    """
    without_last = df.head(-1)
    without_last.to_csv(filename)
    print(f"💾 Saved to {filename}")
def fetch_and_save(symbol, interval, max_retries=10, backoff_base=2,
                   base_dir="datasets/raw/full_scope"):
    """Download the full kline history for one symbol/interval and save it as CSV.

    Makes up to *max_retries* attempts. Between attempts it waits
    ``min(backoff_base ** attempt, 300)`` seconds, i.e. exponential backoff
    capped at five minutes. An empty result also triggers the backoff, so the
    API is not hammered. No time is wasted sleeping after the final attempt.
    Failures are reported on stdout rather than raised.

    Args:
        symbol: Trading pair forwarded to ``fetch_historical_data``.
        interval: Kline interval string, e.g. ``"1m"``.
        max_retries: Maximum number of download attempts.
        backoff_base: Base of the exponential backoff delay, in seconds.
        base_dir: Root output directory. The file is written to
            ``{base_dir}/{interval}/{symbol}_{interval}_historical_data.csv``.
    """
    tag = f"[{symbol}-{interval}]"
    filepath = f"{base_dir}/{interval}/{symbol}_{interval}_historical_data.csv"
    for attempt in range(max_retries):
        wait = min(backoff_base ** attempt, 300)  # Cap delay at 5 minutes
        try:
            df = fetch_historical_data(symbol, interval)
            if not df.empty:
                ensure_dir_exists(os.path.dirname(filepath))
                save_to_csv(df, filepath)
                return
            print(f"{tag} ⚠️ Empty dataset received. Retrying...")
        except Exception as e:
            # Deliberately broad: this is the per-task retry boundary, so any
            # failure (network, parsing, disk) leads to another attempt.
            print(f"{tag} ❌ Error: {e} — Retrying in {wait}s...")
        if attempt < max_retries - 1:
            time.sleep(wait)
    print(f"{tag} ❌ Final failure after {max_retries} attempts.")
def main():
    """Download kline history for a fixed list of USDT trading pairs.

    Every (interval, symbol) combination is submitted to a thread pool as one
    ``fetch_and_save`` task, with intervals in the outer loop and symbols in
    the inner one. Any exception that escapes a task is printed, so the
    remaining downloads still complete.
    """
    symbols = [
        "ZRXUSDT", "ZILUSDT", "ZECUSDT", "XTZUSDT", "XRPUSDT", "XLMUSDT",
        "WINUSDT", "WANUSDT", "VITEUSDT", "VETUSDT", "USDCUSDT", "TUSDUSDT",
        "TRXUSDT", "TROYUSDT", "THETAUSDT", "TFUELUSDT", "STXUSDT", "RVNUSDT",
        "RLCUSDT", "ONTUSDT", "ONGUSDT", "ONEUSDT", "NULSUSDT", "NKNUSDT",
        "NEOUSDT", "MTLUSDT", "LTCUSDT", "LINKUSDT", "KAVAUSDT", "IOTXUSDT",
        "IOTAUSDT", "IOSTUSDT", "ICXUSDT", "HOTUSDT", "HBARUSDT", "FUNUSDT",
        "FTTUSDT", "FTMUSDT", "FETUSDT", "ETHUSDT", "ETCUSDT", "EOSUSDT",
        "ENJUSDT", "DUSKUSDT"
    ]
    # Total Intervals: ['1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '6h', '8h', '12h', '1d', '3d', '1w']
    intervals = ['1m']
    worker_count = 1

    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        submitted = []
        for interval in intervals:
            for symbol in symbols:
                submitted.append(pool.submit(fetch_and_save, symbol, interval))

        for finished in as_completed(submitted):
            try:
                finished.result()
            except Exception as exc:
                print(f"🚨 Unexpected thread failure: {exc}")
# Run the download batch only when executed as a script, not when imported.
if __name__ == "__main__":
    main()