-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtrade_executor.py
More file actions
343 lines (295 loc) Β· 13.5 KB
/
trade_executor.py
File metadata and controls
343 lines (295 loc) Β· 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
import json, time, threading, datetime, os, sys, math, logging, signal
from collections import defaultdict, deque
import websocket
from pybit.unified_trading import HTTP
from dotenv import load_dotenv,find_dotenv
from colorama import init, Fore, Style
import redis
from bots.utils.logger import setup_logger
from bots.utils.redis_client import get_redis
from config_trade_executor import (
API_KEY, API_SECRET, ACCOUNT_TYPE,
LEVERAGE, ORDER_VALUE_USDT, MIN_BALANCE_USDT,
CONFIRM_SIGNALS, CONFIRMATION_REQUIRED,CONFIDENCE_MIN,
TRIGGER_CHANNEL, HEARTBEAT_CHANNEL,STOP_LOSS_PERCENT,TAKE_PROFIT_PERCENT,
SERVICE_STATUS_CH, BOT_NAME, LOG_LEVEL, LOG_FILE,LIMIT_OFFSET_PERCENT
)
# Quantity step per trading symbol. round_qty() floors order quantities to a
# multiple of this value, and place_limit_order() also uses it as the minimum
# order size. Symbols that are not listed fall back to 0.001 at the call sites.
SYMBOL_QTY_INCREMENT = dict(
    ETHUSDT=0.01,
    XRPUSDT=1,
    SOLUSDT=0.01,
    DOGEUSDT=1,
    ADAUSDT=1,
    # add more symbols as needed
)
# Load .env
# One call with the discovered path is enough; a second load_dotenv() of the
# same file adds nothing.
dotenv_path = find_dotenv()
loaded = load_dotenv(dotenv_path)
print("Dotenv loaded from:", dotenv_path, "Success:", loaded)
# Never echo credentials to stdout: report only whether each one is present.
print("BYBIT_API_KEY:", "<set>" if os.getenv("BYBIT_API_KEY") else "<missing>")
print("BYBIT_API_SECRET:", "<set>" if os.getenv("BYBIT_API_SECRET") else "<missing>")
print("Loaded LOG_FILE:", os.getenv("LOG_FILE"))

# Setup Logger
# An unrecognised LOG_LEVEL falls back to INFO instead of raising AttributeError.
lg = setup_logger(LOG_FILE, getattr(logging, LOG_LEVEL.upper(), logging.INFO))
# "\u2705" is the check-mark emoji, written as an escape so the literal stays
# intact regardless of how the file is decoded.
lg.info("\u2705 Logger initialized with level: %s", LOG_LEVEL)

# Ensure API credentials
if not (API_KEY and API_SECRET):
    lg.error("β API_KEY/API_SECRET missing β exiting.")
    sys.exit(1)
lg.debug("\u2705 API keys loaded.")

# Set up Bybit session
lg.debug("π§ Initializing Bybit HTTP session...")
session = HTTP(api_key=API_KEY, api_secret=API_SECRET)

# Set up Redis
lg.debug("π§ Connecting to Redis...")
redis_cli = get_redis()
# NOTE(review): the consumer loop reads TRIGGER_CHANNEL with BLPOP (a list),
# so this pub/sub subscription looks unused apart from being closed on
# shutdown - confirm whether it can be dropped.
pubsub = redis_cli.pubsub()
pubsub.subscribe(TRIGGER_CHANNEL)

# Symbol -> position/order record. Shared by the order functions and the
# position refresher, which all mutate this same dict object in place.
open_positions = {}
lg.info("\u2705 Connected to Redis queue: %s", TRIGGER_CHANNEL)

# NOTE(review): recent_dir is not referenced anywhere else in this file (the
# consumer loop uses recent_signals); kept for backward compatibility.
recent_dir = defaultdict(lambda: deque(maxlen=CONFIRM_SIGNALS))
lg.debug("π¦ Recent signals deque initialized (maxlen=%s)", CONFIRM_SIGNALS)
def round_qty(symbol: str, qty: float) -> float:
    """Floor ``qty`` to a whole multiple of the symbol's quantity increment.

    The increment comes from ``SYMBOL_QTY_INCREMENT`` (0.001 when the symbol is
    not listed). The arithmetic is done in decimal rather than binary floating
    point: with floats, ``0.29 / 0.01`` evaluates to 28.999999999999996 and
    would floor one step too low, and ``57 * 0.01`` would come back as
    0.5700000000000001.

    Args:
        symbol: Trading symbol used to look up the increment.
        qty: Raw, unrounded quantity.

    Returns:
        The largest multiple of the increment that is <= ``qty``. An integer
        increment yields an ``int`` and a float increment yields a ``float``,
        so ``str()`` of the result renders ``12`` rather than ``12.0`` for
        whole-unit symbols.
    """
    from decimal import Decimal, ROUND_FLOOR

    increment = SYMBOL_QTY_INCREMENT.get(symbol, 0.001)  # default to 0.001 if missing
    # str() gives the shortest repr of each float, i.e. the value the author
    # wrote (0.01), not its binary approximation.
    step = Decimal(str(increment))
    whole_steps = (Decimal(str(qty)) / step).to_integral_value(rounding=ROUND_FLOOR)
    floored = whole_steps * step
    return int(floored) if isinstance(increment, int) else float(floored)
# Reset colorama colours automatically after every print call.
init(autoreset=True)


def _as_float(value, default=0.0):
    """Return ``value`` as a float, or ``default`` if it cannot be converted.

    Covers ``None`` and non-numeric strings such as ``""``.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def display_positions(open_positions):
    """Print a coloured, human-readable summary of every open position.

    Args:
        open_positions: Mapping of symbol -> position record (a dict). The
            record is read with ``.get`` for the keys ``side``, ``size``,
            ``avgPrice``, ``markPrice``, ``unrealisedPnl``, ``cumRealisedPnl``,
            ``leverage``, ``liqPrice``, ``positionValue`` and
            ``positionStatus``.

    Numeric fields that are missing or not parseable (for example an empty
    string) are shown as 0 instead of raising, so one odd field cannot abort
    the whole display.
    """
    for symbol, data in open_positions.items():
        side = data.get('side', 'Unknown').capitalize()
        is_buy = side.lower() == "buy"
        side_icon = "π" if is_buy else "π"
        side_color = Fore.GREEN if is_buy else Fore.RED

        pnl = _as_float(data.get('unrealisedPnl'))
        pnl_color = Fore.GREEN if pnl >= 0 else Fore.RED
        # "\u2705" is the check-mark emoji, written as an escape so the
        # literal stays intact regardless of how the file is decoded.
        status_icon = "\u2705" if data.get('positionStatus') == 'Normal' else "β οΈ"

        avg_price = _as_float(data.get('avgPrice'))
        mark_price = _as_float(data.get('markPrice'))
        cum_realised = _as_float(data.get('cumRealisedPnl'))
        liq_price = _as_float(data.get('liqPrice'))
        position_value = _as_float(data.get('positionValue'))

        print(f"{Fore.CYAN}π Open Position: {Fore.YELLOW}{symbol} {side_icon} Side: {side_color}{side.upper()}")
        # symbol[:-4] drops the last four characters (the "USDT" suffix on the
        # symbols this file trades) to show the base asset.
        print(f" πΌ Size: {Fore.WHITE}{data.get('size')} {symbol[:-4]}")
        print(f" π° Avg Price: {Fore.WHITE}${avg_price:,.2f} π Mark Price: {Fore.WHITE}${mark_price:,.2f}")
        print(f" π Unrealized PnL: {pnl_color}${pnl:.4f} π Cum Realized PnL: {Fore.MAGENTA}${cum_realised:,.4f}")
        print(f" βοΈ Leverage: {Fore.WHITE}{data.get('leverage')}x π¨ Liquidation: {Fore.WHITE}${liq_price:,.2f}")
        print(f" π Position Value: {Fore.WHITE}${position_value:,.2f}")
        print(f" {status_icon} Status: {Fore.WHITE}{data.get('positionStatus')}\n")
# Heartbeat Thread
def heartbeat():
    """Publish a liveness message to ``HEARTBEAT_CHANNEL`` every 30 seconds.

    Runs forever and is meant to be the target of a daemon thread. Each
    message is a JSON object with ``bot_name``, ``heartbeat`` (always true)
    and ``time`` (naive UTC timestamp in ISO format).

    A failed publish is logged and retried on the next tick; without that, a
    single exception would end the thread and heartbeats would stop for the
    rest of the process lifetime.
    """
    lg.debug("π Heartbeat thread started.")
    while True:
        msg = {
            "bot_name": BOT_NAME,
            "heartbeat": True,
            "time": datetime.datetime.utcnow().isoformat(),
        }
        try:
            redis_cli.publish(HEARTBEAT_CHANNEL, json.dumps(msg))
        except Exception as e:
            # Thread boundary: log and keep the loop alive.
            lg.error("Heartbeat publish failed: %s", e)
        else:
            lg.debug("π Heartbeat sent: %s", msg)
        time.sleep(30)
def update_open_positions():
    """Refresh ``open_positions`` from the exchange and print the result.

    Fetches linear USDT-settled positions, keeps those whose ``size`` is
    greater than zero (keyed by symbol) and then calls ``display_positions``.
    Any exception is logged and swallowed.

    The new mapping is built completely before the shared dict is touched, so
    a row that fails to parse leaves the previous state intact instead of a
    half-filled dict.
    """
    try:
        positions = session.get_positions(category="linear", settleCoin='USDT')
        live = {
            pos["symbol"]: pos
            for pos in positions["result"]["list"]
            if float(pos["size"]) > 0
        }
        # Mutate in place: other functions hold a reference to this same
        # module-level dict.
        # NOTE(review): this also drops the records that place_market /
        # place_limit_order add locally - confirm that an unfilled limit
        # order is meant to stop blocking new signals after the next refresh.
        open_positions.clear()
        open_positions.update(live)
        display_positions(open_positions)
    except Exception as e:
        lg.error("β οΈ Position update error: %s", e)
# Get USDT Balance
def get_available_usdt() -> float:
    """Return the USDT balance of the configured account, or 0.0.

    Looks for the ``USDT`` entry in the first account of the wallet-balance
    response and returns its ``totalAvailableBalance`` when that is truthy,
    otherwise its ``walletBalance``. Returns 0.0 when there is no USDT entry
    or when the request or parsing fails (the failure is logged).
    """
    lg.debug("πͺ Fetching available USDT balance...")
    try:
        response = session.get_wallet_balance(accountType=ACCOUNT_TYPE)
        lg.debug("πͺ Balance response: %s", response)
        coins = response["result"]["list"][0]["coin"]
        usdt_entry = next((entry for entry in coins if entry["coin"] == "USDT"), None)
        if usdt_entry is not None:
            raw_balance = usdt_entry.get("totalAvailableBalance") or usdt_entry["walletBalance"]
            balance = float(raw_balance)
            lg.info("πͺ USDT Balance: %.2f", balance)
            return balance
    except Exception as e:
        lg.error("β οΈ Balance fetch failed: %s", e)
    return 0.0
# Set Leverage
def set_leverage(sym: str):
    """Set buy and sell leverage for ``sym`` to ``LEVERAGE`` on the exchange.

    Args:
        sym: Linear-contract symbol to configure.

    Errors are logged, never raised. An error whose text contains
    "leverage not modified" is treated as success, because it only means the
    requested leverage was already in effect.
    """
    lg.debug("π§ Setting leverage for %s to %d", sym, LEVERAGE)
    try:
        resp = session.set_leverage(
            category="linear",
            symbol=sym,
            buyLeverage=str(LEVERAGE),
            sellLeverage=str(LEVERAGE),
        )
    except Exception as e:
        if "leverage not modified" in str(e):
            lg.info("βΉοΈ Leverage already set β ignoring.")
        else:
            lg.error("β οΈ Leverage error: %s", e)
    else:
        # "\u2705" is the check-mark emoji, written as an escape so the
        # literal stays intact regardless of how the file is decoded.
        lg.info("\u2705 Leverage set response: %s", resp)
# Place Market Order
def place_market(sym: str, side: str, usdt: float):
    """Place a market order with attached stop-loss and take-profit.

    Args:
        sym: Linear-contract symbol.
        side: ``"long"`` places a Buy; any other value places a Sell.
        usdt: Margin to commit; the order notional is ``usdt * LEVERAGE``.

    Does nothing if ``sym`` is already in ``open_positions`` or if the rounded
    quantity is below the symbol's increment. On success the order is recorded
    in ``open_positions``. Errors are logged, never raised.

    Quantity is rounded with ``round_qty`` and the per-symbol increment from
    ``SYMBOL_QTY_INCREMENT``, the same rule ``place_limit_order`` uses,
    rather than a fixed 0.001 step that is wrong for coarser symbols.
    """
    from decimal import Decimal

    if sym in open_positions:
        lg.info("β οΈ %s already has open trade. Skipping...", sym)
        return
    lg.debug("π Preparing market order: %s %s %.2f USDT", sym, side, usdt)
    try:
        price_data = session.get_tickers(category="linear", symbol=sym)
        lg.debug("π Price data: %s", price_data)
        price = float(price_data["result"]["list"][0]["lastPrice"])

        min_qty = SYMBOL_QTY_INCREMENT.get(sym, 0.001)
        qty = round_qty(sym, (usdt * LEVERAGE) / price)
        lg.info("π Calculated qty: %.3f at price: %.2f", qty, price)
        if qty < min_qty:
            lg.warning("β οΈ Calculated qty %.3f below min order size β skipping.", qty)
            return
        # Render with exactly the increment's decimals so float artefacts
        # (e.g. 0.5700000000000001) never reach the exchange.
        qty_str = format(Decimal(str(qty)).quantize(Decimal(str(min_qty))), "f")

        # Calculate Stop Loss and Take Profit
        is_long = side == "long"
        if is_long:
            sl_price = price * (1 - STOP_LOSS_PERCENT / 100)
            tp_price = price * (1 + TAKE_PROFIT_PERCENT / 100)
        else:
            sl_price = price * (1 + STOP_LOSS_PERCENT / 100)
            tp_price = price * (1 - TAKE_PROFIT_PERCENT / 100)

        # NOTE(review): two decimals may be too coarse for low-priced symbols
        # (place_limit_order uses four) - confirm against each tick size.
        resp = session.place_order(
            category="linear",
            symbol=sym,
            side="Buy" if is_long else "Sell",
            orderType="Market",
            qty=qty_str,
            timeInForce="IOC",
            stopLoss=f"{sl_price:.2f}",
            takeProfit=f"{tp_price:.2f}",
        )
        lg.info("π Order placed: %s %s, Response: %s", sym, side, resp)
        open_positions[sym] = {
            "side": side,
            "order_id": resp["result"]["orderId"],
            "qty": qty,
            "sl_price": sl_price,
            "tp_price": tp_price,
            "timestamp": datetime.datetime.utcnow().isoformat(),
        }
        lg.info("π‘οΈ SL: %.2f | π― TP: %.2f", sl_price, tp_price)
    except Exception as e:
        lg.error("β οΈ Order error: %s", e)
def place_limit_order(sym: str, side: str, usdt: float):
    """Place a post-only limit order with attached stop-loss and take-profit.

    The limit price is the best bid lowered by ``LIMIT_OFFSET_PERCENT`` for a
    long, or the best ask raised by ``LIMIT_OFFSET_PERCENT`` for a short.
    Stop-loss and take-profit are derived from that limit price.

    Args:
        sym: Linear-contract symbol.
        side: ``"long"`` places a Buy; any other value places a Sell.
        usdt: Margin to commit; the order notional is ``usdt * LEVERAGE``.

    Does nothing if ``sym`` is already in ``open_positions`` or if the rounded
    quantity is below the symbol's increment. On success the order is recorded
    in ``open_positions``. Errors are logged, never raised.
    """
    from decimal import Decimal

    if sym in open_positions:
        lg.info("β οΈ %s already has open trade. Skipping...", sym)
        return
    lg.debug("π Preparing limit order: %s %s %.2f USDT", sym, side, usdt)
    try:
        is_long = side == "long"
        orderbook = session.get_orderbook(category="linear", symbol=sym)
        best_bid = float(orderbook["result"]["b"][0][0])
        best_ask = float(orderbook["result"]["a"][0][0])
        if is_long:
            limit_price = best_bid * (1 - LIMIT_OFFSET_PERCENT / 100)
        else:
            limit_price = best_ask * (1 + LIMIT_OFFSET_PERCENT / 100)

        raw_qty = (usdt * LEVERAGE) / limit_price
        qty = round_qty(sym, raw_qty)
        lg.info("π Qty rounded: %.3f at limit price: %.4f", qty, limit_price)
        min_qty = SYMBOL_QTY_INCREMENT.get(sym, 0.001)
        if qty < min_qty:
            lg.warning("β οΈ Qty %.3f below min order size (%.3f). Skipping.", qty, min_qty)
            return
        # str(qty) can expose float artefacts such as "0.5700000000000001";
        # render with exactly the increment's decimals instead.
        qty_str = format(Decimal(str(qty)).quantize(Decimal(str(min_qty))), "f")

        # Calculate SL & TP
        if is_long:
            sl_price = limit_price * (1 - STOP_LOSS_PERCENT / 100)
            tp_price = limit_price * (1 + TAKE_PROFIT_PERCENT / 100)
        else:
            sl_price = limit_price * (1 + STOP_LOSS_PERCENT / 100)
            tp_price = limit_price * (1 - TAKE_PROFIT_PERCENT / 100)

        resp = session.place_order(
            category="linear",
            symbol=sym,
            side="Buy" if is_long else "Sell",
            orderType="Limit",
            qty=qty_str,
            price=f"{limit_price:.4f}",
            timeInForce="PostOnly",
            stopLoss=f"{sl_price:.4f}",
            takeProfit=f"{tp_price:.4f}",
        )
        lg.info("π Limit order placed: %s %s, Response: %s", sym, side, resp)
        open_positions[sym] = {
            "side": side,
            "order_id": resp["result"]["orderId"],
            "qty": qty,
            "limit_price": limit_price,
            "sl_price": sl_price,
            "tp_price": tp_price,
            "timestamp": datetime.datetime.utcnow().isoformat(),
        }
        lg.info("π Limit: %.4f | π‘οΈ SL: %.4f | π― TP: %.4f", limit_price, sl_price, tp_price)
    except Exception as e:
        lg.error("β οΈ Limit order error: %s", e)
# Graceful shutdown handler
# Flag polled by the consumer loop; cleared by signal_handler to stop it.
running = True


def signal_handler(sig, frame):
    """SIGINT handler: log, clear ``running`` and close the pub/sub object.

    Args:
        sig: Signal number passed by the ``signal`` module (unused).
        frame: Interrupted stack frame passed by the ``signal`` module (unused).
    """
    global running
    lg.info("π¦ Received shutdown signal (Ctrl+C). Exiting gracefully...")
    running = False
    pubsub.close()


# Route Ctrl+C through the handler above.
signal.signal(signal.SIGINT, signal_handler)
# Start heartbeat
threading.Thread(target=heartbeat, daemon=True).start()

# Per-symbol rolling window of the most recent accepted signals.
recent_signals = defaultdict(lambda: deque(maxlen=CONFIRM_SIGNALS))
lg.debug("π¦ Recent signals initialized (maxlen=%s)", CONFIRM_SIGNALS)

# Main loop (Redis queue consumer)
# Each iteration refreshes open positions, then waits up to 5 seconds for one
# signal on the TRIGGER_CHANNEL list. A signal is traded only after passing
# the skip checks and once the last N signals for the symbol agree on
# direction, where N = CONFIRMATION_REQUIRED[interval] (default CONFIRM_SIGNALS).
lg.info("π’ Starting Redis queue consumer loop...")
try:
    while running:
        update_open_positions()
        msg = redis_cli.blpop(TRIGGER_CHANNEL, timeout=5)
        if not msg:
            lg.debug("β³ No message received, polling again...")
            continue

        queue_name, payload_raw = msg
        lg.debug("π© Popped message from '%s': %s", queue_name, payload_raw)

        # One malformed message (bad JSON, missing key, non-numeric field)
        # must not take the whole executor down: log it and move on.
        try:
            payload = json.loads(payload_raw)
            sym = payload["symbol"]
            interval = payload["interval"]
            direction = payload.get("direction")
            conf = float(payload.get("confidence", 0))
            signal_type = payload.get("signal_type")
            ctx = payload["context"]
            close, rsi, macd = ctx["close"], ctx["rsi"], ctx["macd"]
            volume, volume_ma = ctx["volume"], ctx["volume_ma"]
            # Characters 11..15 of each window timestamp.
            # NOTE(review): presumably the HH:MM part of an ISO timestamp -
            # confirm against the producer.
            window_start, window_end = payload["window_start"][11:16], payload["window_end"][11:16]
            # Comprehensive logging
            log_msg = (f"π₯ Signal: {sym} | {interval}m | {signal_type} | "
                       f"{'π Long' if direction=='long' else 'π Short'} | "
                       f"π― Conf: {conf:.2f}% (Min: {CONFIDENCE_MIN}%) | "
                       f"π Window: {window_start}->{window_end} | Close: ${close:.2f} | "
                       f"RSI: {rsi:.2f} | MACD: {macd:.4f} | Vol: {volume:.2f} (Avg: {volume_ma:.2f})")
        except (KeyError, TypeError, ValueError, AttributeError) as e:
            lg.error("Malformed signal skipped: %r | payload: %r", e, payload_raw)
            continue

        # First matching rule wins; the balance is only fetched when every
        # cheaper check has passed.
        reason_skipped = None
        if direction not in ("long", "short"):
            reason_skipped = "Invalid direction"
        elif sym in open_positions:
            reason_skipped = "Position already open"
        elif conf < CONFIDENCE_MIN:
            reason_skipped = f"Confidence below {CONFIDENCE_MIN}%"
        elif get_available_usdt() < MIN_BALANCE_USDT:
            reason_skipped = "Insufficient balance"
        if reason_skipped:
            lg.info(f"{log_msg} | β Skipped: {reason_skipped}")
            continue

        # Store detailed signal info
        recent_signals[sym].append({
            "direction": direction,
            "signal_type": signal_type,
            "confidence": conf,
            "timestamp": datetime.datetime.utcnow(),
        })
        lg.debug("π Recent signals for [%s]: %s", sym, list(recent_signals[sym]))

        # Check for required consecutive aligned signals
        required_signals = CONFIRMATION_REQUIRED.get(interval, CONFIRM_SIGNALS)
        signals_list = list(recent_signals[sym])
        if len(signals_list) < required_signals:
            lg.debug("π‘ Waiting for more signals (%d/%d)", len(signals_list), required_signals)
            continue

        last_signals = signals_list[-required_signals:]
        if not all(sig["direction"] == direction for sig in last_signals):
            lg.debug("π‘ Signals not aligned yet: %s", last_signals)
            continue

        bal = get_available_usdt()
        if bal < MIN_BALANCE_USDT:
            lg.warning("β οΈ Low balance (%.2f USDT). Skipping.", bal)
            continue

        set_leverage(sym)
        place_limit_order(sym, direction, ORDER_VALUE_USDT)
        recent_signals[sym].clear()
        lg.debug("β»οΈ Cleared signals after execution for %s", sym)
except KeyboardInterrupt:
    lg.info("π¦ Ctrl+C pressed. Exiting gracefully...")
finally:
    lg.info("π Shutdown complete.")