-
Notifications
You must be signed in to change notification settings - Fork 2
Open
Description
Hello,
for storing historical data you may be interested in using a timeseries database.
Here is some code to download data from Binance
# Download historical Binance klines to the current directory.
import datetime

from binance_historical_data import BinanceDataDumper

# Dumper configured for hourly spot klines, written under ".".
data_dumper = BinanceDataDumper(
    path_dir_where_to_dump=".",
    asset_class="spot",  # spot, um, cm
    data_type="klines",  # aggTrades, klines, trades
    data_frequency="1h",
)

data_dumper.dump_data(
    # The library documents `tickers` as a list of trading pairs, so pass a
    # one-element list rather than a bare string.
    tickers=["BTCUSDT"],
    date_start=datetime.date(2023, 7, 1),
    date_end=None,  # no explicit end date
    is_to_update_existing=False,
)
# using https://pypi.org/project/binance-historical-data/
Store data into an InfluxDB database
# Load one dumped Binance klines CSV file and write it to InfluxDB.
from influxdb_client import InfluxDBClient, WriteOptions
import datetime
import pandas as pd

# Values describing the data set; they locate the CSV file and are also
# written as tags on every point.
data_source = "binance"
asset_class = "spot"  # spot, um, cm
storage_frequency = "daily"  # daily, monthly
data_type = "klines"  # aggTrades, klines, trades
symbol = "BTCUSDT"
data_frequency = "1h"
dt = datetime.date(2023, 7, 1)

# Path of the CSV file for `dt`.
fname = (
    f"C:\\Users\\w4c\\data\\{data_source}\\{asset_class}\\{storage_frequency}"
    f"\\{data_type}\\{symbol}\\{data_frequency}"
    f"\\{symbol}-{data_frequency}-{dt.year}-{dt.month:02}-{dt.day:02}.csv"
)

# Column names applied to the CSV (it is read with `names=`, i.e. the file is
# treated as having no header row).
columns = [
    "OpenTime",
    "Open",
    "High",
    "Low",
    "Close",
    "Volume",
    "CloseTime",
    "Quote asset volume",
    "Number of trades",
    "Taker buy base asset volume",
    "Taker buy quote asset volume",
    "Ignore",
]

# Tag columns; every remaining column is written as a field.
tag_columns = [
    "data_source",
    "asset_class",
    "data_type",
    "symbol",
    "data_frequency",
]

# Connection settings come from the environment. A single write API is opened
# for the whole file instead of one per chunk; leaving the `with` closes it.
with InfluxDBClient.from_env_properties() as client, \
        client.write_api() as write_api:
    # Read in chunks of 1,000 rows so the whole file is never held in memory.
    for df in pd.read_csv(fname, chunksize=1_000, names=columns):
        # OpenTime is parsed as epoch milliseconds and used as the timestamp.
        df["OpenTime"] = pd.to_datetime(df["OpenTime"], unit="ms")
        # CloseTime stays numeric and is scaled by 1e6 (presumably
        # milliseconds -> nanoseconds); the retrieval snippet turns it back
        # into datetimes with pd.to_datetime().
        df["CloseTime"] *= 1_000_000.0

        df["data_source"] = data_source
        df["asset_class"] = asset_class
        df["data_type"] = data_type
        df["data_frequency"] = data_frequency
        df["symbol"] = symbol

        print(df)
        print(df.dtypes)

        try:
            write_api.write(
                record=df,
                bucket="data",
                data_frame_measurement_name="crypto",
                data_frame_tag_columns=tag_columns,
                data_frame_timestamp_column="OpenTime",
            )
        except Exception as e:
            # Best effort: report the failure and continue with the next chunk.
            print(e)
# using https://github.com/influxdata/influxdb-client-python
Retrieve data as Pandas DataFrame
# Query the stored klines back from InfluxDB as a pandas DataFrame.
from influxdb_client import InfluxDBClient, WriteOptions
import pandas as pd

pd.options.display.max_rows = 10
pd.options.display.max_columns = 20

# Tag values selecting the series to read.
data_source = "binance"
asset_class = "spot"  # spot, um, cm
data_type = "klines"  # aggTrades, klines, trades
symbol = "BTCUSDT"
data_frequency = "1h"

# Query window, converted to integer epoch seconds for range().
dt_from = pd.to_datetime("2023-07-01")
dt_to = pd.to_datetime("2023-07-02")
ts_from = int(dt_from.timestamp())
ts_to = int(dt_to.timestamp())

# Alternative: query a relative window instead of explicit bounds.
# query = 'from(bucket:\"data\") |> range(start:-30d)'

# Filter on every tag, then pivot so each field becomes a column keyed by
# `_time`.
query = f"""from(bucket:"data")
|> range(start: {ts_from}, stop: {ts_to})
|> filter(fn: (r) => r.data_source == "{data_source}"
and r.asset_class == "{asset_class}"
and r.data_type == "{data_type}"
and r.symbol == "{symbol}"
and r.data_frequency == "{data_frequency}"
)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")"""

with InfluxDBClient.from_env_properties() as client:
    df = client.query_api().query_data_frame(query=query)
    if not df.empty:
        # Drop the query bookkeeping columns and restore the original name of
        # the timestamp column.
        df.drop(columns=["result", "table", "_start", "_stop"], inplace=True)
        df.rename(columns={"_time": "OpenTime"}, inplace=True)
        # CloseTime was stored as a number; convert it back to datetimes.
        df["CloseTime"] = pd.to_datetime(df["CloseTime"])
    print(df)
# I'm still facing an issue influxdata/influxdb-client-python#592
Maybe another TSDB should be considered? TimescaleDB, for example.
Another approach could be to simply store the data as Parquet or Feather files (or another format) in a hierarchical directory
Kind regards
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels