README.md (2 changes: 1 addition & 1 deletion)

@@ -7,7 +7,7 @@ Autonomous evolutionary crypto trading system. AI agents trade USDT perpetual fu
## Features

- **Evolutionary lifecycle** — agents are born, trade, die, and pass learned knowledge to the next generation via DNA encoding
- - **Q-Learning brain** — 30-dimensional feature engineering with linear function approximation and experience replay
+ - **Q-Learning brain** — 30-dimensional feature engineering with linear approximation, experience replay, retrieval-style pattern memory, online trade-outcome ML, and explainable decision traces
- **4 trading strategies** — Momentum (EMA crossover), Mean Reversion (Bollinger), Scalping (VWAP), Breakout (range)
- **Adaptive regime detection** — automatically selects strategies based on market conditions (trending, choppy, ranging)
- **Health system** — HP-based survival mechanic that enforces discipline and kills underperforming agents
darwin_agent/ml/brain.py (186 changes: 177 additions & 9 deletions)
@@ -1,13 +1,14 @@
"""ML Brain — Q-Learning with linear function approximation."""

-import numpy as np
import json
import os
-from typing import List, Dict, Optional, Tuple, Any
-from dataclasses import dataclass, field
-from datetime import datetime
from collections import deque
-from darwin_agent.ml.features import N_FEATURES
+from dataclasses import dataclass, field
+from typing import Any, Dict, Optional, Tuple
+
+import numpy as np
+
+from darwin_agent.ml.features import FEATURE_NAMES, N_FEATURES


@dataclass
@@ -42,7 +43,7 @@ def __init__(self, n_features: int = N_FEATURES, learning_rate: float = 0.01,
self.epsilon_decay = epsilon_decay
self.epsilon_min = epsilon_min

-self.n_actions = len(self.STRATEGIES) * len(self.SIZINGS) # 15
+self.n_actions = len(self.STRATEGIES) * len(self.SIZINGS)
self.weights = np.random.randn(self.n_actions, n_features) * 0.01
self.bias = np.zeros(self.n_actions)

@@ -53,24 +54,118 @@ def __init__(self, n_features: int = N_FEATURES, learning_rate: float = 0.01,
self.exploitation_count = 0
self.regime_bonuses: Dict[str, np.ndarray] = {}

# LLM-like retrieval memory (online prototypes + quality stats).
self.pattern_memory: Dict[str, Dict[str, Any]] = {}
self.pattern_lr = 0.08
self.pattern_memory_max_regimes = 16

def _sanitize_state(self, state: np.ndarray) -> np.ndarray:
arr = np.array(state, dtype=np.float32).reshape(-1)
if arr.shape[0] != self.n_features:
fixed = np.zeros(self.n_features, dtype=np.float32)
n = min(self.n_features, arr.shape[0])
if n > 0:
fixed[:n] = arr[:n]
arr = fixed
arr = np.nan_to_num(arr, nan=0.0, posinf=5.0, neginf=-5.0)
return np.clip(arr, -5.0, 5.0)

def predict_q(self, state: np.ndarray, regime: str = "unknown") -> np.ndarray:
state = self._sanitize_state(state)
q = self.weights @ state + self.bias
if regime in self.regime_bonuses:
q += self.regime_bonuses[regime]
q += self._pattern_bias(state, regime)
return q

def _pattern_bias(self, state: np.ndarray, regime: str) -> np.ndarray:
"""Return additive Q-bias using learned success prototypes per action."""
bias = np.zeros(self.n_actions)
keys = [regime] if regime in self.pattern_memory else []
if "global" in self.pattern_memory:
keys.append("global")

for key in keys:
mem = self.pattern_memory.get(key, {})
for action_str, stats in mem.items():
try:
action_idx = int(action_str)
except (TypeError, ValueError):
continue
if not (0 <= action_idx < self.n_actions):
continue

centroid = self._sanitize_state(np.array(stats.get("centroid", []), dtype=np.float32))
dist = float(np.linalg.norm(state - centroid))
similarity = 1.0 / (1.0 + dist)
win_rate = float(stats.get("win_rate", 0.5))
reward_avg = float(stats.get("avg_reward", 0.0))
count = max(0.0, float(stats.get("count", 0)))
support = min(1.0, count / 25.0)
memory_signal = ((win_rate - 0.5) * 2.0 + reward_avg * 0.08) * similarity * support
bias[action_idx] += memory_signal
return np.clip(bias, -2.5, 2.5)
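
For intuition, here is a minimal standalone sketch of the memory-signal arithmetic above. The field names mirror what `_remember_pattern` stores below; the sample numbers are invented:

```python
import numpy as np

# Invented stats for one action in one regime, shaped like the entries
# _remember_pattern maintains.
stats = {"win_rate": 0.64, "avg_reward": 1.2, "count": 18,
         "centroid": np.zeros(30, dtype=np.float32)}
state = np.zeros(30, dtype=np.float32)  # identical to the centroid

dist = float(np.linalg.norm(state - stats["centroid"]))  # 0.0
similarity = 1.0 / (1.0 + dist)                          # 1.0 (closer state -> stronger vote)
support = min(1.0, stats["count"] / 25.0)                # 0.72 (full weight after ~25 trades)
signal = ((stats["win_rate"] - 0.5) * 2.0
          + stats["avg_reward"] * 0.08) * similarity * support
print(round(signal, 3))  # 0.271 -- a modest additive nudge toward this action
```

Note that edge above a 50% win rate dominates the signal (0.28 here) while `avg_reward` contributes only a small correction (0.096), so a high-reward but coin-flip action is not over-promoted.
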

def _prune_pattern_memory(self):
if len(self.pattern_memory) <= self.pattern_memory_max_regimes:
return
removable = [k for k in self.pattern_memory.keys() if k != "global"]
if not removable:
return

def score(regime_name: str) -> float:
entries = self.pattern_memory.get(regime_name, {})
return float(sum(float(v.get("count", 0)) for v in entries.values()))

removable.sort(key=score)
to_remove = len(self.pattern_memory) - self.pattern_memory_max_regimes
for regime_name in removable[:to_remove]:
self.pattern_memory.pop(regime_name, None)

def _remember_pattern(self, state, action, reward, regime):
"""Online pattern learner (lightweight memory inspired by LLM-style retrieval)."""
state = self._sanitize_state(state)
keys = [regime, "global"] if regime and regime != "unknown" else ["global"]
for key in keys:
bucket = self.pattern_memory.setdefault(key, {})
entry = bucket.setdefault(str(action), {
"count": 0,
"wins": 0,
"losses": 0,
"avg_reward": 0.0,
"centroid": state.tolist(),
})

count = int(entry["count"]) + 1
wins = int(entry.get("wins", 0)) + (1 if reward > 0 else 0)
losses = int(entry.get("losses", 0)) + (1 if reward <= 0 else 0)
avg_reward = float(entry["avg_reward"]) + (float(reward) - float(entry["avg_reward"])) / count

old_centroid = self._sanitize_state(np.array(entry.get("centroid", state.tolist()), dtype=np.float32))
centroid = (1 - self.pattern_lr) * old_centroid + self.pattern_lr * state

entry["count"] = count
entry["wins"] = wins
entry["losses"] = losses
entry["avg_reward"] = avg_reward
entry["win_rate"] = wins / max(1, count)
entry["centroid"] = centroid.tolist()

self._prune_pattern_memory()
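
Two standard online-update rules do the work in `_remember_pattern`: an incremental mean for `avg_reward` and an exponential moving average for the centroid. A minimal sketch of both, with invented numbers:

```python
import numpy as np

# Incremental mean: avg += (reward - avg) / count reproduces the exact
# arithmetic mean of all rewards seen so far, without storing them.
avg = 0.0
for count, reward in enumerate([1.0, -0.5, 2.0], start=1):
    avg += (reward - avg) / count
print(round(avg, 4))  # 0.8333 == (1.0 - 0.5 + 2.0) / 3

# EMA centroid: each new state pulls the stored prototype toward recent
# patterns at rate pattern_lr (0.08 above), so stale regimes fade out.
lr = 0.08
centroid = np.zeros(3)
state = np.array([1.0, 2.0, 3.0])
centroid = (1 - lr) * centroid + lr * state
print(centroid)  # [0.08 0.16 0.24]
```
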

def choose_action(self, state: np.ndarray, regime: str = "unknown",
health_pct: float = 1.0) -> Tuple[int, Action]:
state = self._sanitize_state(state)
self.total_decisions += 1
eff_eps = min(0.5, self.epsilon * 2) if health_pct < 0.3 else self.epsilon

if np.random.random() < eff_eps:
self.exploration_count += 1
if health_pct < 0.4:
safe = self._safe_actions()
-idx = np.random.choice(safe)
+idx = int(np.random.choice(safe))
else:
-idx = np.random.randint(self.n_actions)
+idx = int(np.random.randint(self.n_actions))
else:
self.exploitation_count += 1
q = self.predict_q(state, regime)
Expand All @@ -82,10 +177,13 @@ def choose_action(self, state: np.ndarray, regime: str = "unknown",
return idx, self._decode(idx)

def learn(self, state, action, reward, next_state, done, regime="unknown"):
state = self._sanitize_state(state)
next_state = self._sanitize_state(next_state) if next_state is not None else None
self.memory.append(Experience(state=state, action=action, reward=reward,
next_state=next_state, done=done,
metadata={"regime": regime}))
self._update(state, action, reward, next_state, done, regime)
self._remember_pattern(state, action, reward, regime)

Review comment (P2): Skip pattern-memory updates for non-filled decisions

The new retrieval memory is updated on every learn() call, including report_result(0, 0) invocations used for rejected (unfilled) trades. Because reward shaping adds a positive baseline (calculate_reward), these rejected decisions can be recorded as wins in _remember_pattern, which then feeds back into _pattern_bias and reinforces actions that were never actually executed, corrupting the memory signal used during action selection.

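One way to address this, sketched under the assumption that callers can tell the brain whether the order actually filled (the `filled` flag is not part of this diff):

```python
def learn(self, state, action, reward, next_state, done,
          regime="unknown", filled=True):
    state = self._sanitize_state(state)
    next_state = self._sanitize_state(next_state) if next_state is not None else None
    self.memory.append(Experience(state=state, action=action, reward=reward,
                                  next_state=next_state, done=done,
                                  metadata={"regime": regime}))
    self._update(state, action, reward, next_state, done, regime)
    if filled:
        # Only executed trades shape the retrieval memory; rejected or
        # unfilled decisions keep their TD update but are excluded from
        # the win/loss pattern statistics.
        self._remember_pattern(state, action, reward, regime)
```
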

if len(self.memory) >= self.batch_size:
self._replay()
self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
@@ -144,12 +242,47 @@ def export_brain():
"weights": self.weights.tolist(), "bias": self.bias.tolist(),
"epsilon": self.epsilon,
"regime_bonuses": {k: v.tolist() for k, v in self.regime_bonuses.items()},
"pattern_memory": self.pattern_memory,
"total_decisions": self.total_decisions,
"exploration_count": self.exploration_count,
"exploitation_count": self.exploitation_count,
"n_features": self.n_features, "n_actions": self.n_actions,
}

def _sanitize_pattern_memory(self, data: Any) -> Dict[str, Dict[str, Any]]:
if not isinstance(data, dict):
return {}

clean: Dict[str, Dict[str, Any]] = {}
for regime_name, entries in data.items():
if not isinstance(regime_name, str) or not isinstance(entries, dict):
continue
clean_entries: Dict[str, Any] = {}
for action_key, stats in entries.items():
try:
action_idx = int(action_key)
except (TypeError, ValueError):
continue
if not (0 <= action_idx < self.n_actions) or not isinstance(stats, dict):
continue
centroid = self._sanitize_state(np.array(stats.get("centroid", []), dtype=np.float32)).tolist()
count = max(0, int(stats.get("count", 0)))
wins = max(0, int(stats.get("wins", 0)))
losses = max(0, int(stats.get("losses", 0)))
avg_reward = float(stats.get("avg_reward", 0.0))
win_rate = float(stats.get("win_rate", wins / max(1, count)))
clean_entries[str(action_idx)] = {
"count": count,
"wins": wins,
"losses": losses,
"avg_reward": avg_reward,
"win_rate": max(0.0, min(1.0, win_rate)),
"centroid": centroid,
}
if clean_entries:
clean[regime_name] = clean_entries
return clean

def import_brain(self, data, mutation_rate=0.05):
if "weights" in data:
w = np.array(data["weights"])
@@ -161,13 +294,20 @@ def import_brain(self, data, mutation_rate=0.05):
ma = min(w.shape[0], self.weights.shape[0])
mf = min(w.shape[1], self.weights.shape[1])
self.weights[:ma, :mf] = w[:ma, :mf]
mb = min(b.shape[0], self.bias.shape[0])
self.bias[:mb] = b[:mb]
if "regime_bonuses" in data:
for r, b in data["regime_bonuses"].items():
arr = np.array(b)
if len(arr) == self.n_actions:
self.regime_bonuses[r] = arr
if "pattern_memory" in data:
self.pattern_memory = self._sanitize_pattern_memory(data["pattern_memory"])
if "epsilon" in data:
-self.epsilon = min(0.3, data["epsilon"] * 1.5)
+try:
+    self.epsilon = min(0.3, float(data["epsilon"]) * 1.5)
+except (TypeError, ValueError):
+    pass
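
For context, a minimal round-trip sketch of the generational hand-off these two methods enable. `Brain` stands in for the Q-learning class defined in this file (its actual name is not visible in the hunks shown):

```python
parent = Brain()
# ... parent trades and learns for a lifetime ...
dna = parent.export_brain()   # weights, bias, regime bonuses, pattern memory

child = Brain()
child.import_brain(dna, mutation_rate=0.05)
# The child inherits the parent's weights and pattern memory, with epsilon
# bumped to min(0.3, parent_eps * 1.5) so it re-explores rather than
# blindly trusting inherited behavior.
```
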

def save(self, path):
with open(path, "w") as f:
@@ -188,5 +328,33 @@ def get_stats():
"current_epsilon": round(self.epsilon, 4),
"memory_size": len(self.memory),
"regimes_learned": list(self.regime_bonuses.keys()),
"pattern_memory_keys": list(self.pattern_memory.keys()),
"weight_magnitude": round(float(np.mean(np.abs(self.weights))), 4),
}

def explain_action(self, state: np.ndarray, action_idx: int, regime: str = "unknown") -> str:
"""Human-readable explanation of why an action is preferred."""
state = self._sanitize_state(state)
if not (0 <= action_idx < self.n_actions):
return f"Q-explain [{regime}] invalid-action={action_idx}"

weights = self.weights[action_idx]
contrib = weights * state
top_idx = np.argsort(np.abs(contrib))[-3:][::-1]
top_features = ", ".join([
f"{FEATURE_NAMES[int(i)]}:{contrib[i]:+.3f}"
if int(i) < len(FEATURE_NAMES) else f"f{int(i)}:{contrib[i]:+.3f}"
for i in top_idx
])

q_raw = float(self.weights[action_idx] @ state + self.bias[action_idx])
q_mem = float(self._pattern_bias(state, regime)[action_idx])

mem_txt = "no-memory"
mem = self.pattern_memory.get(regime, {}).get(str(action_idx))
if mem:
mem_txt = (
f"memory wr={mem.get('win_rate', 0.0):.2f} "
f"r={mem.get('avg_reward', 0.0):+.2f} n={mem.get('count', 0)}"
)
return f"Q-explain[{regime}] a={action_idx} q={q_raw:+.3f} mem={q_mem:+.3f} top={top_features} | {mem_txt}"
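
A usage sketch for the decision trace (`Brain` again stands in for the class defined here; the printed numbers are invented, and the feature names come from `FEATURE_NAMES`):

```python
import numpy as np

brain = Brain()
state = np.random.randn(30)
idx, action = brain.choose_action(state, regime="trending")
print(brain.explain_action(state, idx, regime="trending"))
# e.g.  Q-explain[trending] a=7 q=+0.412 mem=+0.130
#       top=f3:+0.210, f11:+0.140, f27:-0.090 | no-memory
```
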
darwin_agent/ml/outcome_model.py (80 changes: 80 additions & 0 deletions)
@@ -0,0 +1,80 @@
"""Online trade outcome model for win-probability estimation."""

from dataclasses import dataclass
from typing import Dict, Any

import numpy as np


@dataclass
class OnlineTradeOutcomeModel:
n_features: int
learning_rate: float = 0.02
l2: float = 1e-4

def __post_init__(self):
self.weights = np.zeros(self.n_features, dtype=np.float32)
self.bias = 0.0
self.samples = 0
self.wins = 0

def _sanitize(self, x: np.ndarray) -> np.ndarray:
arr = np.array(x, dtype=np.float32).reshape(-1)
if arr.shape[0] != self.n_features:
fixed = np.zeros(self.n_features, dtype=np.float32)
n = min(self.n_features, arr.shape[0])
if n > 0:
fixed[:n] = arr[:n]
arr = fixed
arr = np.nan_to_num(arr, nan=0.0, posinf=5.0, neginf=-5.0)
return np.clip(arr, -5.0, 5.0)

@staticmethod
def _sigmoid(z: float) -> float:
z = float(np.clip(z, -30.0, 30.0))
return float(1.0 / (1.0 + np.exp(-z)))

def predict_proba(self, x: np.ndarray) -> float:
x = self._sanitize(x)
return self._sigmoid(np.dot(self.weights, x) + self.bias)

def update(self, x: np.ndarray, win_label: int, sample_weight: float = 1.0):
x = self._sanitize(x)
y = 1.0 if win_label else 0.0
p = self.predict_proba(x)
err = (p - y) * max(0.2, min(2.0, float(sample_weight)))
self.weights -= self.learning_rate * (err * x + self.l2 * self.weights)
self.bias -= self.learning_rate * err
self.samples += 1
self.wins += int(y)

def get_stats(self) -> Dict[str, Any]:
return {
"samples": self.samples,
"win_rate": round(self.wins / max(1, self.samples), 4),
"weight_magnitude": round(float(np.mean(np.abs(self.weights))), 5),
}

def export(self) -> Dict[str, Any]:
return {
"n_features": self.n_features,
"learning_rate": self.learning_rate,
"l2": self.l2,
"weights": self.weights.tolist(),
"bias": self.bias,
"samples": self.samples,
"wins": self.wins,
}

@classmethod
def from_dict(cls, data: Dict[str, Any], n_features: int):
model = cls(n_features=n_features,
learning_rate=float(data.get("learning_rate", 0.02)),
l2=float(data.get("l2", 1e-4)))
weights = np.array(data.get("weights", []), dtype=np.float32)
if weights.shape[0] == n_features:
model.weights = weights
model.bias = float(data.get("bias", 0.0))
model.samples = max(0, int(data.get("samples", 0)))
model.wins = max(0, int(data.get("wins", 0)))
return model
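
The model is plain online logistic regression: `predict_proba` computes sigma(w . x + b), and `update` takes one SGD step on the log-loss gradient (p - y) * x plus L2 shrinkage. A minimal usage sketch with synthetic labels:

```python
import numpy as np

model = OnlineTradeOutcomeModel(n_features=30)
rng = np.random.default_rng(0)

for _ in range(500):
    x = rng.normal(size=30)
    win = int(x[0] > 0)          # synthetic ground truth: feature 0 drives the outcome
    model.update(x, win_label=win)

x_new = rng.normal(size=30)
print(round(model.predict_proba(x_new), 3))  # estimated win probability in (0, 1)
print(model.get_stats())  # {'samples': 500, 'win_rate': ..., 'weight_magnitude': ...}
```
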