diff --git a/README.md b/README.md
index bc9744c..858e168 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@ Autonomous evolutionary crypto trading system. AI agents trade USDT perpetual fu
 
 ## Features
 - **Evolutionary lifecycle** — agents are born, trade, die, and pass learned knowledge to the next generation via DNA encoding
-- **Q-Learning brain** — 30-dimensional feature engineering with linear function approximation and experience replay
+- **Q-Learning brain** — 30-dimensional feature engineering with linear function approximation, experience replay, retrieval-style pattern memory, online trade-outcome ML, and explainable decision traces
 - **4 trading strategies** — Momentum (EMA crossover), Mean Reversion (Bollinger), Scalping (VWAP), Breakout (range)
 - **Adaptive regime detection** — automatically selects strategies based on market conditions (trending, choppy, ranging)
 - **Health system** — HP-based survival mechanic that enforces discipline and kills underperforming agents
diff --git a/darwin_agent/ml/brain.py b/darwin_agent/ml/brain.py
index cf5fbb7..6fb1095 100644
--- a/darwin_agent/ml/brain.py
+++ b/darwin_agent/ml/brain.py
@@ -1,13 +1,14 @@
 """ML Brain — Q-Learning with linear function approximation."""
-import numpy as np
 import json
 import os
-from typing import List, Dict, Optional, Tuple, Any
-from dataclasses import dataclass, field
-from datetime import datetime
 from collections import deque
-from darwin_agent.ml.features import N_FEATURES
+from dataclasses import dataclass, field
+from typing import Any, Dict, Optional, Tuple
+
+import numpy as np
+
+from darwin_agent.ml.features import FEATURE_NAMES, N_FEATURES
 
 
 @dataclass
@@ -42,7 +43,7 @@ def __init__(self, n_features: int = N_FEATURES, learning_rate: float = 0.01,
         self.epsilon_decay = epsilon_decay
         self.epsilon_min = epsilon_min
 
-        self.n_actions = len(self.STRATEGIES) * len(self.SIZINGS)  # 15
+        self.n_actions = len(self.STRATEGIES) * len(self.SIZINGS)
         self.weights = np.random.randn(self.n_actions, n_features) * 0.01
         self.bias = np.zeros(self.n_actions)
 
@@ -53,14 +54,108 @@ def __init__(self, n_features: int = N_FEATURES, learning_rate: float = 0.01,
         self.exploitation_count = 0
         self.regime_bonuses: Dict[str, np.ndarray] = {}
 
+        # Retrieval-style pattern memory (online prototypes + per-action quality stats).
+        self.pattern_memory: Dict[str, Dict[str, Any]] = {}
+        self.pattern_lr = 0.08
+        self.pattern_memory_max_regimes = 16
+
+    def _sanitize_state(self, state: np.ndarray) -> np.ndarray:
+        arr = np.array(state, dtype=np.float32).reshape(-1)
+        if arr.shape[0] != self.n_features:
+            fixed = np.zeros(self.n_features, dtype=np.float32)
+            n = min(self.n_features, arr.shape[0])
+            if n > 0:
+                fixed[:n] = arr[:n]
+            arr = fixed
+        arr = np.nan_to_num(arr, nan=0.0, posinf=5.0, neginf=-5.0)
+        return np.clip(arr, -5.0, 5.0)
+
     def predict_q(self, state: np.ndarray, regime: str = "unknown") -> np.ndarray:
+        state = self._sanitize_state(state)
         q = self.weights @ state + self.bias
         if regime in self.regime_bonuses:
             q += self.regime_bonuses[regime]
+        q += self._pattern_bias(state, regime)
         return q
 
+    def _pattern_bias(self, state: np.ndarray, regime: str) -> np.ndarray:
+        """Return additive Q-bias using learned success prototypes per action."""
+        bias = np.zeros(self.n_actions)
+        keys = [regime] if regime in self.pattern_memory else []
+        if "global" in self.pattern_memory:
+            keys.append("global")
+
+        for key in keys:
+            mem = self.pattern_memory.get(key, {})
+            for action_str, stats in mem.items():
+                try:
+                    action_idx = int(action_str)
+                except (TypeError, ValueError):
+                    continue
+                if not (0 <= action_idx < self.n_actions):
+                    continue
+
+                centroid = self._sanitize_state(np.array(stats.get("centroid", []), dtype=np.float32))
+                dist = float(np.linalg.norm(state - centroid))
+                similarity = 1.0 / (1.0 + dist)
+                win_rate = float(stats.get("win_rate", 0.5))
+                reward_avg = float(stats.get("avg_reward", 0.0))
+                count = max(0.0, float(stats.get("count", 0)))
+                support = min(1.0, count / 25.0)
+                memory_signal = ((win_rate - 0.5) * 2.0 + reward_avg * 0.08) * similarity * support
+                bias[action_idx] += memory_signal
+        return np.clip(bias, -2.5, 2.5)
+
+    def _prune_pattern_memory(self):
+        if len(self.pattern_memory) <= self.pattern_memory_max_regimes:
+            return
+        removable = [k for k in self.pattern_memory.keys() if k != "global"]
+        if not removable:
+            return
+
+        def score(regime_name: str) -> float:
+            entries = self.pattern_memory.get(regime_name, {})
+            return float(sum(float(v.get("count", 0)) for v in entries.values()))
+
+        removable.sort(key=score)
+        to_remove = len(self.pattern_memory) - self.pattern_memory_max_regimes
+        for regime_name in removable[:to_remove]:
+            self.pattern_memory.pop(regime_name, None)
+
+    def _remember_pattern(self, state, action, reward, regime):
+        """Online pattern learner (lightweight retrieval-style memory of trade outcomes)."""
+        state = self._sanitize_state(state)
+        keys = [regime, "global"] if regime and regime != "unknown" else ["global"]
+        for key in keys:
+            bucket = self.pattern_memory.setdefault(key, {})
+            entry = bucket.setdefault(str(action), {
+                "count": 0,
+                "wins": 0,
+                "losses": 0,
+                "avg_reward": 0.0,
+                "centroid": state.tolist(),
+            })
+
+            count = int(entry["count"]) + 1
+            wins = int(entry.get("wins", 0)) + (1 if reward > 0 else 0)
+            losses = int(entry.get("losses", 0)) + (1 if reward <= 0 else 0)
+            avg_reward = float(entry["avg_reward"]) + (float(reward) - float(entry["avg_reward"])) / count
+
+            old_centroid = self._sanitize_state(np.array(entry.get("centroid", state.tolist()), dtype=np.float32))
+            centroid = (1 - self.pattern_lr) * old_centroid + self.pattern_lr * state
+
+            entry["count"] = count
+            entry["wins"] = wins
+            entry["losses"] = losses
+            entry["avg_reward"] = avg_reward
+            entry["win_rate"] = wins / max(1, count)
+            entry["centroid"] = centroid.tolist()
+
+        self._prune_pattern_memory()
+
     def choose_action(self, state: np.ndarray, regime: str = "unknown",
                       health_pct: float = 1.0) -> Tuple[int, Action]:
+        state = self._sanitize_state(state)
         self.total_decisions += 1
         eff_eps = min(0.5, self.epsilon * 2) if health_pct < 0.3 else self.epsilon
 
@@ -68,9 +163,9 @@ def choose_action(self, state: np.ndarray, regime: str = "unknown",
             self.exploration_count += 1
             if health_pct < 0.4:
                 safe = self._safe_actions()
-                idx = np.random.choice(safe)
+                idx = int(np.random.choice(safe))
             else:
-                idx = np.random.randint(self.n_actions)
+                idx = int(np.random.randint(self.n_actions))
         else:
             self.exploitation_count += 1
             q = self.predict_q(state, regime)
@@ -82,10 +177,13 @@ def choose_action(self, state: np.ndarray, regime: str = "unknown",
         return idx, self._decode(idx)
 
     def learn(self, state, action, reward, next_state, done, regime="unknown"):
+        state = self._sanitize_state(state)
+        next_state = self._sanitize_state(next_state) if next_state is not None else None
         self.memory.append(Experience(state=state, action=action, reward=reward,
                                       next_state=next_state, done=done,
                                       metadata={"regime": regime}))
         self._update(state, action, reward, next_state, done, regime)
+        self._remember_pattern(state, action, reward, regime)
         if len(self.memory) >= self.batch_size:
             self._replay()
         self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
@@ -144,12 +242,47 @@ def export_brain(self):
             "weights": self.weights.tolist(), "bias": self.bias.tolist(),
             "epsilon": self.epsilon,
             "regime_bonuses": {k: v.tolist() for k, v in self.regime_bonuses.items()},
+            "pattern_memory": self.pattern_memory,
             "total_decisions": self.total_decisions,
             "exploration_count": self.exploration_count,
             "exploitation_count": self.exploitation_count,
             "n_features": self.n_features, "n_actions": self.n_actions,
         }
 
+    def _sanitize_pattern_memory(self, data: Any) -> Dict[str, Dict[str, Any]]:
+        if not isinstance(data, dict):
+            return {}
+
+        clean: Dict[str, Dict[str, Any]] = {}
+        for regime_name, entries in data.items():
+            if not isinstance(regime_name, str) or not isinstance(entries, dict):
+                continue
+            clean_entries: Dict[str, Any] = {}
+            for action_key, stats in entries.items():
+                try:
+                    action_idx = int(action_key)
+                except (TypeError, ValueError):
+                    continue
+                if not (0 <= action_idx < self.n_actions) or not isinstance(stats, dict):
+                    continue
+                centroid = self._sanitize_state(np.array(stats.get("centroid", []), dtype=np.float32)).tolist()
+                count = max(0, int(stats.get("count", 0)))
+                wins = max(0, int(stats.get("wins", 0)))
+                losses = max(0, int(stats.get("losses", 0)))
+                avg_reward = float(stats.get("avg_reward", 0.0))
+                win_rate = float(stats.get("win_rate", wins / max(1, count)))
+                clean_entries[str(action_idx)] = {
+                    "count": count,
+                    "wins": wins,
+                    "losses": losses,
+                    "avg_reward": avg_reward,
+                    "win_rate": max(0.0, min(1.0, win_rate)),
+                    "centroid": centroid,
+                }
+            if clean_entries:
+                clean[regime_name] = clean_entries
+        return clean
+
     def import_brain(self, data, mutation_rate=0.05):
         if "weights" in data:
             w = np.array(data["weights"])
@@ -161,13 +294,20 @@ def import_brain(self, data, mutation_rate=0.05):
             ma = min(w.shape[0], self.weights.shape[0])
             mf = min(w.shape[1], self.weights.shape[1])
             self.weights[:ma, :mf] = w[:ma, :mf]
+            mb = min(b.shape[0], self.bias.shape[0])
+            self.bias[:mb] = b[:mb]
         if "regime_bonuses" in data:
             for r, b in data["regime_bonuses"].items():
                 arr = np.array(b)
                 if len(arr) == self.n_actions:
                     self.regime_bonuses[r] = arr
+        if "pattern_memory" in data:
+            self.pattern_memory = self._sanitize_pattern_memory(data["pattern_memory"])
         if "epsilon" in data:
-            self.epsilon = min(0.3, data["epsilon"] * 1.5)
+            try:
+                self.epsilon = min(0.3, float(data["epsilon"]) * 1.5)
+            except (TypeError, ValueError):
+                pass
 
     def save(self, path):
         with open(path, "w") as f:
@@ -188,5 +328,33 @@ def get_stats(self):
             "current_epsilon": round(self.epsilon, 4),
             "memory_size": len(self.memory),
             "regimes_learned": list(self.regime_bonuses.keys()),
+            "pattern_memory_keys": list(self.pattern_memory.keys()),
             "weight_magnitude": round(float(np.mean(np.abs(self.weights))), 4),
         }
+
+    def explain_action(self, state: np.ndarray, action_idx: int, regime: str = "unknown") -> str:
+        """Human-readable explanation of why an action is preferred."""
+        state = self._sanitize_state(state)
+        if not (0 <= action_idx < self.n_actions):
+            return f"Q-explain[{regime}] invalid-action={action_idx}"
+
+        weights = self.weights[action_idx]
+        contrib = weights * state
+        top_idx = np.argsort(np.abs(contrib))[-3:][::-1]
+        top_features = ", ".join([
+            f"{FEATURE_NAMES[int(i)]}:{contrib[i]:+.3f}"
+            if int(i) < len(FEATURE_NAMES) else f"f{int(i)}:{contrib[i]:+.3f}"
+            for i in top_idx
+        ])
+
+        q_raw = float(self.weights[action_idx] @ state + self.bias[action_idx])
+        q_mem = float(self._pattern_bias(state, regime)[action_idx])
+
+        mem_txt = "no-memory"
+        mem = self.pattern_memory.get(regime, {}).get(str(action_idx))
+        if mem:
+            mem_txt = (
+                f"memory wr={mem.get('win_rate', 0.0):.2f} "
+                f"r={mem.get('avg_reward', 0.0):+.2f} n={mem.get('count', 0)}"
+            )
+        return f"Q-explain[{regime}] a={action_idx} q={q_raw:+.3f} mem={q_mem:+.3f} top={top_features} | {mem_txt}"
diff --git a/darwin_agent/ml/outcome_model.py b/darwin_agent/ml/outcome_model.py
new file mode 100644
index 0000000..a0c2b9a
--- /dev/null
+++ b/darwin_agent/ml/outcome_model.py
@@ -0,0 +1,80 @@
+"""Online trade outcome model for win-probability estimation."""
+
+from dataclasses import dataclass
+from typing import Dict, Any
+
+import numpy as np
+
+
+@dataclass
+class OnlineTradeOutcomeModel:
+    n_features: int
+    learning_rate: float = 0.02
+    l2: float = 1e-4
+
+    def __post_init__(self):
+        self.weights = np.zeros(self.n_features, dtype=np.float32)
+        self.bias = 0.0
+        self.samples = 0
+        self.wins = 0
+
+    def _sanitize(self, x: np.ndarray) -> np.ndarray:
+        arr = np.array(x, dtype=np.float32).reshape(-1)
+        if arr.shape[0] != self.n_features:
+            fixed = np.zeros(self.n_features, dtype=np.float32)
+            n = min(self.n_features, arr.shape[0])
+            if n > 0:
+                fixed[:n] = arr[:n]
+            arr = fixed
+        arr = np.nan_to_num(arr, nan=0.0, posinf=5.0, neginf=-5.0)
+        return np.clip(arr, -5.0, 5.0)
+
+    @staticmethod
+    def _sigmoid(z: float) -> float:
+        z = float(np.clip(z, -30.0, 30.0))
+        return float(1.0 / (1.0 + np.exp(-z)))
+
+    def predict_proba(self, x: np.ndarray) -> float:
+        x = self._sanitize(x)
+        return self._sigmoid(np.dot(self.weights, x) + self.bias)
+
+    def update(self, x: np.ndarray, win_label: int, sample_weight: float = 1.0):
+        x = self._sanitize(x)
+        y = 1.0 if win_label else 0.0
+        p = self.predict_proba(x)
+        err = (p - y) * max(0.2, min(2.0, float(sample_weight)))
+        self.weights -= self.learning_rate * (err * x + self.l2 * self.weights)
+        self.bias -= self.learning_rate * err
+        self.samples += 1
+        self.wins += int(y)
+
+    def get_stats(self) -> Dict[str, Any]:
+        return {
+            "samples": self.samples,
+            "win_rate": round(self.wins / max(1, self.samples), 4),
+            "weight_magnitude": round(float(np.mean(np.abs(self.weights))), 5),
+        }
+
+    def export(self) -> Dict[str, Any]:
+        return {
"n_features": self.n_features, + "learning_rate": self.learning_rate, + "l2": self.l2, + "weights": self.weights.tolist(), + "bias": self.bias, + "samples": self.samples, + "wins": self.wins, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any], n_features: int): + model = cls(n_features=n_features, + learning_rate=float(data.get("learning_rate", 0.02)), + l2=float(data.get("l2", 1e-4))) + weights = np.array(data.get("weights", []), dtype=np.float32) + if weights.shape[0] == n_features: + model.weights = weights + model.bias = float(data.get("bias", 0.0)) + model.samples = max(0, int(data.get("samples", 0))) + model.wins = max(0, int(data.get("wins", 0))) + return model diff --git a/darwin_agent/ml/selector.py b/darwin_agent/ml/selector.py index 45cfd2b..0719702 100644 --- a/darwin_agent/ml/selector.py +++ b/darwin_agent/ml/selector.py @@ -1,12 +1,15 @@ """Adaptive Strategy Selector — ML Brain decides which strategy to use.""" -import numpy as np -from typing import List, Dict, Optional from dataclasses import dataclass, field from datetime import datetime, timezone -from darwin_agent.ml.brain import QLearningBrain, Action -from darwin_agent.ml.features import FeatureEngineer, MarketFeatures +from typing import Dict, List, Optional + +import numpy as np + from darwin_agent.markets.base import Candle, MarketSignal, TimeFrame +from darwin_agent.ml.brain import Action, QLearningBrain +from darwin_agent.ml.features import FeatureEngineer, MarketFeatures +from darwin_agent.ml.outcome_model import OnlineTradeOutcomeModel from darwin_agent.strategies.base import STRATEGY_REGISTRY @@ -45,6 +48,30 @@ def __init__(self, brain: QLearningBrain): self.last_regime: str = "unknown" self.pending_trade: Optional[Dict] = None + # Extra trading ML layer: online per-(regime,strategy) win-prob model. 
+        self.outcome_models: Dict[str, OnlineTradeOutcomeModel] = {}
+
+    def _model_key(self, regime: str, strategy: str) -> str:
+        return f"{regime}::{strategy}"
+
+    def _get_outcome_model(self, regime: str, strategy: str) -> OnlineTradeOutcomeModel:
+        key = self._model_key(regime, strategy)
+        if key not in self.outcome_models:
+            self.outcome_models[key] = OnlineTradeOutcomeModel(n_features=self.brain.n_features)
+        return self.outcome_models[key]
+
+    def _build_ai_reason(self, features: MarketFeatures, action_idx: int,
+                         action: Action, signal: MarketSignal, ml_win_prob: float) -> str:
+        explanation = self.brain.explain_action(features.features, action_idx, features.regime)
+        trend = features.features[0] if len(features.features) > 0 else 0.0
+        momentum = features.features[6] if len(features.features) > 6 else 0.0
+        volatility = features.features[13] if len(features.features) > 13 else 0.0
+        regime_hint = f"regime={features.regime} trend={trend:+.2f} mom={momentum:+.2f} vol={volatility:+.2f}"
+        return (
+            f"AI decision: {action.strategy}/{action.sizing} conf={signal.confidence:.2f} "
+            f"ml_win={ml_win_prob:.2f} | {regime_hint} | {explanation}"
+        )
+
     def decide(self, candles: List[Candle], symbol: str,
                timeframe: TimeFrame, health_pct: float) -> TradeDecision:
         features = self.feature_engineer.extract(candles, symbol)
@@ -76,18 +103,33 @@ def decide(self, candles: List[Candle], symbol: str,
                 reason=f"{action.strategy}: no signal | {features.regime}")
 
         fit = self._regime_fit(features.regime, action.strategy)
-        signal.confidence = signal.confidence * 0.6 + action.confidence * 0.2 + fit * 0.2
+        outcome_model = self._get_outcome_model(features.regime, action.strategy)
+        ml_win_prob = outcome_model.predict_proba(features.features)
+
+        signal.confidence = (
+            signal.confidence * 0.45
+            + action.confidence * 0.15
+            + fit * 0.15
+            + ml_win_prob * 0.25
+        )
+        signal.confidence = float(np.clip(np.nan_to_num(signal.confidence, nan=0.0), 0.0, 1.0))
 
         self.pending_trade = {
-            "strategy": action.strategy, "sizing": action.sizing,
-            "regime": features.regime, "entry_time": datetime.now(timezone.utc),
-            "state": features.features.copy(), "action_idx": action_idx,
+            "strategy": action.strategy,
+            "sizing": action.sizing,
+            "regime": features.regime,
+            "entry_time": datetime.now(timezone.utc),
+            "state": features.features.copy(),
+            "action_idx": action_idx,
+            "model_key": self._model_key(features.regime, action.strategy),
+            "ml_win_prob": ml_win_prob,
         }
 
+        reason = self._build_ai_reason(features, action_idx, action, signal, ml_win_prob)
+
         return TradeDecision(
             should_trade=True, action=action, signal=signal, features=features,
-            brain_action_idx=action_idx,
-            reason=f"Brain: {action.strategy}/{action.sizing} | {features.regime} | conf:{signal.confidence:.2f}")
+            brain_action_idx=action_idx, reason=reason)
 
     def report_result(self, pnl_pct: float, health_change: float,
                       new_candles=None, symbol=""):
@@ -101,13 +143,29 @@ def report_result(self, pnl_pct: float, health_change: float,
         dur = 0
         strat = "unknown"
+        regime = self.last_regime
+        pending_state = self.last_state
         if self.pending_trade:
             dur = (datetime.now(timezone.utc) - self.pending_trade["entry_time"]).total_seconds() / 60
             strat = self.pending_trade["strategy"]
+            regime = self.pending_trade.get("regime", regime)
+            pending_state = self.pending_trade.get("state", pending_state)
 
         reward = self.brain.calculate_reward(pnl_pct, health_change, dur, strat)
-        self.brain.learn(self.last_state, self.last_action, reward, next_state, False, self.last_regime)
-        self._update_regime(self.last_regime, strat, pnl_pct)
+        done = strat == "unknown"
+        self.brain.learn(self.last_state, self.last_action, reward, next_state, done, self.last_regime)
+
+        if strat != "unknown":
+            self._update_regime(regime, strat, pnl_pct)
+            try:
+                model = self._get_outcome_model(regime, strat)
+                label = 1 if pnl_pct > 0 else 0
+                weight = min(2.0, max(0.25, abs(pnl_pct) + 0.25))
+                model.update(pending_state, label, sample_weight=weight)
+            except Exception:
+                # Keep selector resilient even if ML model update fails.
+                pass
+
         self.pending_trade = None
 
     def report_hold_result(self, candles, symbol):
@@ -158,11 +216,15 @@ def get_playbook(self):
         for regime, s in self.regime_stats.items():
             pb[regime] = {
                 "best_strategy": s.best_strategy,
-                "trades": s.trades, "win_rate": round(s.win_rate, 3),
+                "trades": s.trades,
+                "win_rate": round(s.win_rate, 3),
                 "total_pnl": round(s.total_pnl, 2),
                 "strategy_breakdown": {
-                    k: {"trades": v["trades"], "win_rate": round(v.get("win_rate", 0), 3),
-                        "pnl": round(v["total_pnl"], 2)}
+                    k: {
+                        "trades": v["trades"],
+                        "win_rate": round(v.get("win_rate", 0), 3),
+                        "pnl": round(v["total_pnl"], 2),
+                    }
                     for k, v in s.strategy_results.items()
                 }
             }
@@ -172,10 +234,18 @@ def export_for_dna(self):
         return {
             "brain": self.brain.export_brain(),
             "regime_stats": {
-                r: {"trades": s.trades, "wins": s.wins, "total_pnl": s.total_pnl,
-                    "best_strategy": s.best_strategy, "strategy_results": s.strategy_results}
+                r: {
+                    "trades": s.trades,
+                    "wins": s.wins,
+                    "total_pnl": s.total_pnl,
+                    "best_strategy": s.best_strategy,
+                    "strategy_results": s.strategy_results,
+                }
                 for r, s in self.regime_stats.items()
-            }
+            },
+            "outcome_models": {
+                k: v.export() for k, v in self.outcome_models.items()
+            },
         }
 
     def import_from_dna(self, data, mutation_rate=0.05):
@@ -184,8 +254,16 @@ def import_from_dna(self, data, mutation_rate=0.05):
         if "regime_stats" in data:
             for r, d in data["regime_stats"].items():
                 self.regime_stats[r] = RegimeStats(
-                    trades=d.get("trades", 0), wins=d.get("wins", 0),
+                    trades=d.get("trades", 0),
+                    wins=d.get("wins", 0),
                     total_pnl=d.get("total_pnl", 0),
                     best_strategy=d.get("best_strategy", "unknown"),
                     strategy_results=d.get("strategy_results", {}),
                 )
+        if "outcome_models" in data and isinstance(data["outcome_models"], dict):
+            for key, payload in data["outcome_models"].items():
+                if not isinstance(key, str) or not isinstance(payload, dict):
+                    continue
+                self.outcome_models[key] = OnlineTradeOutcomeModel.from_dict(
+                    payload, n_features=self.brain.n_features
+                )
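Reviewer note (not part of the patch): a minimal usage sketch of the new OnlineTradeOutcomeModel, assuming the 30-dimensional feature vector mentioned in the README. The N_FEATURES constant, the synthetic states, and the toy win rule below are illustrative stand-ins, not project code.

```python
# Illustrative sketch only. Exercises the online logistic win-probability
# model added in darwin_agent/ml/outcome_model.py and its DNA-style
# export()/from_dict() round trip.
import numpy as np

from darwin_agent.ml.outcome_model import OnlineTradeOutcomeModel

N_FEATURES = 30  # assumed here; the real value comes from darwin_agent.ml.features
rng = np.random.default_rng(0)
model = OnlineTradeOutcomeModel(n_features=N_FEATURES)

for _ in range(200):
    x = rng.normal(size=N_FEATURES)
    win = int(x[0] > 0)                      # toy label standing in for a real trade outcome
    model.update(x, win, sample_weight=1.0)  # pnl-based weights are clamped to [0.2, 2.0]

probe = np.zeros(N_FEATURES)
probe[0] = 1.5
print(model.predict_proba(probe))            # should drift above 0.5 as the toy rule is learned
print(model.get_stats())

# Round trip mirrors export_for_dna() / import_from_dna() in selector.py.
clone = OnlineTradeOutcomeModel.from_dict(model.export(), n_features=N_FEATURES)
assert abs(clone.predict_proba(probe) - model.predict_proba(probe)) < 1e-6
```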
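Reviewer note (not part of the patch): a sketch of how the new pattern memory and explain_action trace behave on a standalone QLearningBrain, assuming the constructor defaults and the existing epsilon-greedy behaviour in brain.py. The random states, the reward value, and the "trending" regime label are illustrative only.

```python
# Illustrative sketch only. Shows how one learn() call populates the
# retrieval-style pattern memory and how explain_action() reports the
# top feature contributions plus the memory bias for a chosen action.
import numpy as np

from darwin_agent.ml.brain import QLearningBrain

brain = QLearningBrain()                 # n_features defaults to N_FEATURES
rng = np.random.default_rng(1)
state = rng.normal(size=brain.n_features)

idx, action = brain.choose_action(state, regime="trending", health_pct=1.0)
print(action.strategy, action.sizing)    # may be exploratory while epsilon is high

# Feed back a synthetic winning outcome; this updates the linear weights,
# the replay buffer, and the per-regime pattern memory in one call.
next_state = rng.normal(size=brain.n_features)
brain.learn(state, idx, reward=1.0, next_state=next_state, done=False, regime="trending")

print(brain.explain_action(state, idx, regime="trending"))
print(sorted(brain.pattern_memory))      # expect ['global', 'trending']
print(brain.get_stats()["pattern_memory_keys"])
```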