-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbookquest.py
More file actions
98 lines (90 loc) · 3.48 KB
/
bookquest.py
File metadata and controls
98 lines (90 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""This product includes software developed by Bryant Moscon
(http://www.bryantmoscon.com/)"""
from cryptofeed import FeedHandler
from cryptofeed.defines import L2_BOOK
from cryptofeed.types import OrderBook
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.backends.aggregate import Throttle
from questdb.ingress import Sender, IngressError, TimestampNanos
import sys
import asyncio
from concurrent.futures import ProcessPoolExecutor
import logging
from sys import stdout
# Define logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logFormatter = logging.Formatter(\
"%(asctime)s %(levelname)-8s %(filename)s:%(funcName)s %(message)s")
consoleHandler = logging.StreamHandler(stdout)
consoleHandler.setFormatter(logFormatter)
logger.addHandler(consoleHandler)
QUEST_HOST = '127.0.0.1'
QUEST_PORT = 9009
def push_to_db(key: str, exchange: str, symbol: str, vals: str, receipt_timestamp: int, timestamp: int) -> None:
"""Insert new row into QuestDB table.
It will automatically create a new table if it doesn't exists yet.
Args:
key (str): _description_
exchange (str): _description_
symbol (str): _description_
vals (str): _description_
receipt_timestamp (int): _description_
timestamp (int): _description_
"""
logger.info(f"Pushing data to QuestDB table={key}")
try:
vals_split = vals.split(',')
orders_dict = dict((a.strip(), float(b.strip())) \
for a, b in (element.split('=') \
for element in vals_split))
with Sender(QUEST_HOST, QUEST_PORT) as sender:
task = sender.row(
key,
symbols={
'pair': symbol,
'exchange': exchange},
columns={
**orders_dict,
'receipt_timestamp': receipt_timestamp,
'timestamp': timestamp},
at=TimestampNanos.now())
sender.flush()
except IngressError as e:
sys.stderr.write(f'Got error: {e}\n')
async def callback(data: OrderBook, receipt: float, key: str='book', depth: int=1) -> None:
"""
Custom callback for L2_BOOK Cryptofeed channel that pushes data to QuestDB
Args:
data (OrderBook): _description_
receipt (float): _description_
key (str, optional): _description_. Defaults to 'book'.
depth (int, optional): _description_. Defaults to 1.
"""
loop = asyncio.get_event_loop()
book = data.book
vals = ','.join([f"bid_{i}_price={book.bids.index(i)[0]},bid_{i}_size={book.bids.index(i)[1]}" \
for i in range(depth)] + \
[f"ask{i}_price={book.asks.index(i)[0]},ask_{i}_size={book.asks.index(i)[1]}" \
for i in range(depth)])
receipt_timestamp = receipt
receipt_timestamp_int = int(receipt_timestamp * 1_000_000)
timestamp_int = int(receipt_timestamp_int * 1000)
with ProcessPoolExecutor() as p:
await loop.run_in_executor(p, \
push_to_db, key, data.exchange, data.symbol, vals, receipt_timestamp_int, timestamp_int)
def main() -> None:
"""Get top of the Binance Futures orderbook in a given time window."""
f = FeedHandler()
f.add_feed(
BinanceFutures(
symbols=['BTC-USDT-PERP', 'ETH-USDT-PERP'],
channels=[L2_BOOK],
callbacks={
L2_BOOK: Throttle(callback, window=60)
}
)
)
f.run()
if __name__ == '__main__':
main()