-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathanalysis.py
More file actions
329 lines (286 loc) · 14 KB
/
analysis.py
File metadata and controls
329 lines (286 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# Technical Analysis Module for Trading Signals
"""
Technical Analysis Module
=========================
Provides ``TechnicalAnalysis`` — the signal engine used by ``TradingBot``.
Responsibilities
----------------
- Maintains a rolling price history (up to 65 ticks) per pair, persisted to
``data/history_buffer.json`` so indicators survive a bot restart without a
warm-up gap.
- Pre-populates history from 15 m OHLC candles via ``seed_from_ohlc()`` on
startup so RSI/SMA are usable immediately.
- Generates a ``(signal, score)`` tuple via ``generate_signal_with_score()``:
*Mean-reversion path* (``enable_mr_signals=True``):
RSI oversold (< ``mr_rsi_buy``) → ``BUY``; RSI overbought
(> ``mr_rsi_sell``) → ``SELL``. Score driven by distance from 30/70.
*Trend/breakout path* (``enable_trend_signals=True``):
Price above Bollinger upper band + RSI ≥ 55 → ``BUY``; price below
lower band + RSI ≤ 45 → ``SELL``.
Score range: −50 … +50; positive = bullish bias.
The stronger path wins (highest |score| overrides the weaker one).
ATR and Williams %R provide a small additional boost when confirming.
- Computes ATR (Average True Range) from price history for dynamic stops.
- Multi-timeframe trend confirmation via ``check_mtf_trend()``.
Signal engine mode flags (pushed from ``TradingBot`` after config load)
-----------------------------------------------------------------------
- ``enable_mr_signals`` — enable mean-reversion path (default: ``True``)
- ``enable_trend_signals``— enable BB breakout path (default: ``True``)
- ``mr_rsi_buy`` — RSI buy threshold (default: 33)
- ``mr_rsi_sell`` — RSI sell threshold (default: 67)
"""
import logging
import numpy as np
import os
import json
from collections import deque
class TechnicalAnalysis:
"""
Technical analysis tool for generating trading signals based on market data.
Supports multi-pair analysis with separate price history per pair.
"""
def __init__(self, rsi_period=14, sma_short=20, sma_long=50, min_volatility_pct=0.15):
self.rsi_period = rsi_period
self.sma_short = sma_short
self.sma_long = sma_long
self.min_volatility_pct = min_volatility_pct
self.logger = logging.getLogger(__name__)
self.pair_price_history = {}
self.max_history = 200 # 200 ticks → SMA50 uses 50/200; better context than 65
self.buffer_path = os.path.join(os.path.dirname(__file__), 'data', 'history_buffer.json')
# Signal engine mode flags (pushed from TradingBot after config load)
self.enable_mr_signals = True # mean-reversion: RSI oversold/overbought
self.enable_trend_signals = True # trend/breakout: Bollinger Band momentum
self.mr_rsi_buy = 33.0 # RSI <= threshold triggers mean-reversion BUY
self.mr_rsi_sell = 67.0 # RSI >= threshold triggers mean-reversion SELL
self._load_history()
def _get_price_history(self, pair):
if pair not in self.pair_price_history:
self.pair_price_history[pair] = deque(maxlen=self.max_history)
return self.pair_price_history[pair]
def _load_history(self):
try:
if os.path.exists(self.buffer_path):
with open(self.buffer_path, 'r') as f:
data = json.load(f)
for pair, prices in data.items():
self.pair_price_history[pair] = deque(prices, maxlen=self.max_history)
self.logger.info(f"Loaded price history for {len(data)} pairs from buffer")
except Exception as e:
self.logger.error(f"Error loading price history buffer: {e}")
def _save_history(self):
"""Atomically write price history so a crash/power-loss never leaves a corrupted file."""
try:
import tempfile
os.makedirs(os.path.dirname(self.buffer_path), exist_ok=True)
data = {pair: list(prices) for pair, prices in self.pair_price_history.items()}
dir_path = os.path.dirname(self.buffer_path)
fd, tmp_path = tempfile.mkstemp(dir=dir_path, suffix='.tmp')
try:
with os.fdopen(fd, 'w') as f:
json.dump(data, f)
os.replace(tmp_path, self.buffer_path) # atomic on POSIX
except Exception:
try:
os.unlink(tmp_path)
except OSError:
pass
raise
except Exception as e:
self.logger.error(f"Error saving price history buffer: {e}")
def seed_from_nas_ohlc(self, pair, nas_root):
"""Seed price history from NAS 5-minute OHLC CSV files.
CSV format: ts,open,high,low,close,vwap,volume,count
NAS folder mapping: XBTEUR→XXBTZEUR, ETHEUR→XETHZEUR, XRPEUR→XXRPZEUR,
SOLEUR→SOLEUR (same).
Only seeds if history is still too sparse (< sma_long).
"""
import csv
from pathlib import Path
folder_map = {
'XBTEUR': 'XXBTZEUR',
'ETHEUR': 'XETHZEUR',
'XRPEUR': 'XXRPZEUR',
'SOLEUR': 'SOLEUR',
'ADAEUR': 'ADAEUR',
'DOTEUR': 'DOTEUR',
'LINKEUR': 'LINKEUR',
}
history = self._get_price_history(pair)
if len(history) >= self.max_history:
return # buffer already full
folder = folder_map.get(pair, pair)
import datetime
year = datetime.datetime.utcnow().year
csv_path = Path(nas_root) / str(year) / folder / 'ohlc_5m.csv'
if not csv_path.exists():
return
try:
closes = []
with open(csv_path, newline='') as f:
reader = csv.DictReader(f)
for row in reader:
try:
closes.append(float(row['close']))
except (KeyError, ValueError):
continue
if not closes:
return
# Prepend NAS data (older) before current live history
needed = self.max_history - len(history)
nas_closes = closes[-needed:] if needed < len(closes) else closes
existing = list(history)
history.clear()
for c in nas_closes:
history.append(c)
for c in existing:
history.append(c)
self._save_history()
self.logger.info(f"[NAS seed] {pair}: prepended {len(nas_closes)} closes → buffer={len(history)}/{self.max_history}")
except Exception as e:
self.logger.warning(f"[NAS seed] {pair}: failed to seed from {csv_path}: {e}")
"""Pre-populate price history from OHLC candle closes.
Only seeds if the current history is too sparse for reliable signals.
"""
history = self._get_price_history(pair)
if len(history) >= self.sma_long:
return # already warmed up
for c in closes[-self.max_history:]:
history.append(float(c))
self._save_history()
self.logger.info(f"[OHLC seed] {pair}: seeded {len(history)} closes from 15m candles")
def calculate_rsi(self, prices):
"""Compute the Relative Strength Index over the last ``rsi_period`` values.
Returns a float in [0, 100], or ``None`` if there are fewer than
``rsi_period + 1`` data points.
"""
if len(prices) < self.rsi_period + 1:
return None
prices = np.array(prices)
deltas = np.diff(prices)
gains = np.where(deltas > 0, deltas, 0)
losses = np.where(deltas < 0, -deltas, 0)
avg_gain = np.mean(gains[-self.rsi_period:])
avg_loss = np.mean(losses[-self.rsi_period:])
if avg_loss == 0:
return 100 if avg_gain > 0 else 0
rs = avg_gain / avg_loss
return 100 - (100 / (1 + rs))
def calculate_atr(self, pair, period=14):
"""Calculate Average True Range using price history buffer."""
prices = self._get_price_history(pair)
if len(prices) < period + 1:
return None
# Approximate TR using absolute difference of consecutive closes
prices_list = list(prices)
tr = [abs(prices_list[i] - prices_list[i-1]) for i in range(1, len(prices_list))]
return np.mean(tr[-period:])
def check_mtf_trend(self, prices, short_p=20, long_p=50):
"""Check if the general trend is bullish on the provided history."""
if len(prices) < long_p:
return True # Not enough data, don't block
sma_short = np.mean(prices[-short_p:])
sma_long = np.mean(prices[-long_p:])
return sma_short > sma_long
def generate_signal(self, market_data):
"""Convenience wrapper — returns only the signal string (ignores score)."""
signal, _ = self.generate_signal_with_score(market_data)
return signal
def generate_signal_with_score(self, market_data):
try:
if not market_data:
return "HOLD", 0
pair_key = list(market_data.keys())[0]
pair_data = market_data[pair_key]
if 'c' not in pair_data:
self.logger.warning("No closing price found in market data")
return "HOLD", 0
close_price = float(pair_data['c'][0])
price_history = self._get_price_history(pair_key)
price_history.append(close_price)
self._save_history()
if len(price_history) < self.sma_long:
return "HOLD", 0
prices = np.array(list(price_history))
# Bollinger Band Breakout Logic
# Use same parameters as backtest: SMA20, STD20, SMA50
sma20 = np.mean(prices[-20:])
std20 = np.std(prices[-20:])
sma50 = np.mean(prices[-50:])
upper_bb = sma20 + (2.0 * std20)
lower_bb = sma20 - (2.0 * std20)
current_price = prices[-1]
signal = "HOLD"
score = 0.0
# RSI confirmation (used by both signal paths)
rsi_confirm = self.calculate_rsi(list(price_history)[-20:]) if len(price_history) >= 20 else None
rsi_full = self.calculate_rsi(list(price_history)) if len(price_history) >= self.rsi_period + 1 else None
sma_ratio = (sma20 - sma50) / sma50 if sma50 > 0 else 0.0
# --- Mean-reversion signal path (reversion_bias variant) ---
# Buy extreme RSI oversold when not in strong downtrend; sell RSI overbought
if self.enable_mr_signals and rsi_full is not None:
rsi_s = 0.0
buy_t = getattr(self, 'mr_rsi_buy', 33)
sell_t = getattr(self, 'mr_rsi_sell', 67)
if rsi_full <= buy_t:
rsi_s = (buy_t - rsi_full) / max(buy_t, 1) * 50
elif rsi_full >= sell_t:
rsi_s = -((rsi_full - sell_t) / max(100 - sell_t, 1) * 50)
sma_s = max(-50.0, min(50.0, sma_ratio * 100 * 10))
mr_score = rsi_s + sma_s
if rsi_full <= self.mr_rsi_buy and sma_ratio > -0.01:
signal = "BUY"
score = mr_score
elif rsi_full >= self.mr_rsi_sell and sma_ratio < 0.01:
signal = "SELL"
score = mr_score
# --- Trend/breakout signal path (Bollinger Band momentum) ---
# Only overrides MR signal if trend signal is stronger
if self.enable_trend_signals:
if current_price > upper_bb:
# Bullish Breakout: require RSI >= 55 to confirm momentum
if current_price > sma50 and (rsi_confirm is None or rsi_confirm >= 55):
trend_score = min(50.0, 25.0 + (((current_price - upper_bb) / upper_bb) * 100 * 50.0))
if trend_score > score:
signal = "BUY"
score = trend_score
elif current_price > sma50 and score == 0.0:
score = 8.0 # weak, no signal override
elif current_price < lower_bb:
# Bearish Breakout: require RSI <= 45 to confirm downward momentum
if current_price < sma50 and (rsi_confirm is None or rsi_confirm <= 45):
trend_score = max(-50.0, -25.0 - (((lower_bb - current_price) / lower_bb) * 100 * 50.0))
if trend_score < score:
signal = "SELL"
score = trend_score
elif current_price < sma50 and score == 0.0:
score = -8.0 # weak, no signal override
# Cap score
score = max(-50.0, min(50.0, score))
# Additional indicators: ATR and Williams %R (approximate from closes)
atr = None
willr = None
try:
# approximate ATR from close diffs as fallback (we don't have full OHLC here)
tr = np.abs(np.diff(prices))
atr = float(np.mean(tr[-14:])) if len(tr) >= 14 else None
except Exception:
atr = None
try:
window = 14
if len(prices) >= window:
high_w = np.max(prices[-window:])
low_w = np.min(prices[-window:])
willr = (high_w - current_price) / (high_w - low_w) * -100 if (high_w - low_w) != 0 else None
except Exception:
willr = None
# Boost score slightly if ATR breakout and %R supports momentum
if atr is not None and willr is not None:
if current_price > upper_bb and willr < -20:
score += min(8.0, (atr / max(1e-6, sma20)) * 100.0)
if current_price < lower_bb and willr > -80:
score -= min(8.0, (atr / max(1e-6, sma20)) * 100.0)
return signal, score
except Exception as e:
self.logger.error(f"Error generating signal: {e}")
return "HOLD", 0