From b472c460113150b01210664815dd5fcebb8dbe15 Mon Sep 17 00:00:00 2001 From: cct0831 Date: Fri, 27 Mar 2026 21:58:15 +0800 Subject: [PATCH 1/2] fix/issue-483: concentration_guard infinite loop, HWM backfill, time stop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit concentration_guard.py (#483): - Extend dedup window from 360s to 3600s and include filled orders - Add daily sell cap of 3 per symbol to prevent infinite loop ticker_watcher.py (#484, #485): - Add EOD price deviation guard blocking buys >5% from last close - Sync positions table after proposal sells - Add time-based stop loss: held >5d + loss >5% → force sell pnl_engine.py (#485): - Include entry_trading_day in sync_positions_table - Add _backfill_high_water_mark from eod_prices Fixes #483, Fixes #484, Fixes #485 Branch: fix/483-484-485-strategy-bugs Co-Authored-By: Claude Opus 4.6 (1M context) --- src/openclaw/concentration_guard.py | 29 +++++++++++++-- src/openclaw/pnl_engine.py | 30 +++++++++++++-- src/openclaw/ticker_watcher.py | 57 +++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 8 deletions(-) diff --git a/src/openclaw/concentration_guard.py b/src/openclaw/concentration_guard.py index bfeb60e..677766f 100644 --- a/src/openclaw/concentration_guard.py +++ b/src/openclaw/concentration_guard.py @@ -19,7 +19,8 @@ _AUTO_REDUCE_THRESHOLD: float = 0.40 # 超過 40%:自動核准減倉 _WARN_THRESHOLD: float = 0.25 # 超過 25%:生成待審 proposal _TARGET_WEIGHT: float = 0.20 # 目標降至 20%(與 risk_engine max_symbol_weight 對齊) -_STALE_ORDER_SEC: int = 360 # 超過 6 分鐘的 submitted 賣單視為 stale +_STALE_ORDER_SEC: int = 3600 # 超過 1 小時的賣單視為 stale(#483: was 360s) +_MAX_DAILY_SELL_ORDERS: int = 3 # 同一 symbol 每日最多產生 3 筆 concentration sell(#483) class ConcentrationProposal(TypedDict): @@ -52,15 +53,16 @@ def check_concentration( if total_value <= 0: return [] - # Dedup: check pending sell orders per symbol, but only skip if - # the pending sell qty is sufficient to bring weight below target (#385) + # Dedup: check recent sell orders per symbol (submitted + filled), + # only skip if the pending/recent sell qty is sufficient to bring weight below target (#385) + # #483: include 'filled' status + extend window to 1 hour to prevent infinite loop in simulation stale_cutoff = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(time.time() - _STALE_ORDER_SEC)) pending_sell_qty: dict[str, int] = {} try: for r in conn.execute( """SELECT symbol, SUM(qty) FROM orders - WHERE side='sell' AND status='submitted' AND ts_submit > ? + WHERE side='sell' AND status IN ('submitted', 'filled') AND ts_submit > ? GROUP BY symbol""", (stale_cutoff,), ).fetchall(): @@ -69,6 +71,18 @@ def check_concentration( log.error("Dedup query failed, proceeding WITHOUT dedup — " "duplicate proposals may be generated: %s", e) + # #483: daily sell cap — count today's filled concentration sells per symbol + daily_sell_count: dict[str, int] = {} + try: + for r in conn.execute( + """SELECT symbol, COUNT(DISTINCT order_id) FROM orders + WHERE side='sell' AND status='filled' AND date(ts_submit) = date('now') + GROUP BY symbol""", + ).fetchall(): + daily_sell_count[r[0]] = int(r[1] or 0) + except sqlite3.Error as e: + log.warning("Daily sell count query failed: %s", e) + proposals: list[ConcentrationProposal] = [] for symbol, qty, price in rows: weight = (qty * (price or 0)) / total_value @@ -87,6 +101,13 @@ def check_concentration( log.info("Concentration %s: %.1f%% — pending sell %d insufficient (would be %.1f%%), generating additional proposal", symbol, weight * 100, pending_qty, remaining_weight * 100) + # #483: daily cap — skip if already hit max daily sells for this symbol + sym_daily_sells = daily_sell_count.get(symbol, 0) + if sym_daily_sells >= _MAX_DAILY_SELL_ORDERS: + log.info("Concentration %s: %.1f%% — skipped (daily cap %d/%d reached)", + symbol, weight * 100, sym_daily_sells, _MAX_DAILY_SELL_ORDERS) + continue + if locked_symbols and symbol in locked_symbols: log.warning("Concentration %s: %.1f%% — skipped (locked symbol, sell prohibited)", symbol, weight * 100) diff --git a/src/openclaw/pnl_engine.py b/src/openclaw/pnl_engine.py index 86ad40d..d217e9c 100644 --- a/src/openclaw/pnl_engine.py +++ b/src/openclaw/pnl_engine.py @@ -147,10 +147,11 @@ def sync_positions_table(conn: sqlite3.Connection) -> None: Does NOT update current_price or unrealized_pnl (requires live feed). """ conn.execute("DELETE FROM positions") + # #485: include entry_trading_day = earliest buy order date for current position if _table_exists(conn, "position_quarantine"): conn.execute( """ - INSERT INTO positions (symbol, quantity, avg_price) + INSERT INTO positions (symbol, quantity, avg_price, entry_trading_day) SELECT o.symbol, SUM(CASE WHEN o.side='buy' THEN f.qty ELSE 0 END) @@ -158,7 +159,8 @@ def sync_positions_table(conn: sqlite3.Connection) -> None: ROUND( SUM(CASE WHEN o.side='buy' THEN f.qty * f.price ELSE 0 END) / MAX(SUM(CASE WHEN o.side='buy' THEN f.qty ELSE 0 END), 1), - 4) AS avg_price + 4) AS avg_price, + MIN(CASE WHEN o.side='buy' THEN date(o.ts_submit) END) AS entry_trading_day FROM orders o JOIN fills f ON f.order_id = o.order_id LEFT JOIN position_quarantine q @@ -173,7 +175,7 @@ def sync_positions_table(conn: sqlite3.Connection) -> None: else: conn.execute( """ - INSERT INTO positions (symbol, quantity, avg_price) + INSERT INTO positions (symbol, quantity, avg_price, entry_trading_day) SELECT o.symbol, SUM(CASE WHEN o.side='buy' THEN f.qty ELSE 0 END) @@ -181,7 +183,8 @@ def sync_positions_table(conn: sqlite3.Connection) -> None: ROUND( SUM(CASE WHEN o.side='buy' THEN f.qty * f.price ELSE 0 END) / MAX(SUM(CASE WHEN o.side='buy' THEN f.qty ELSE 0 END), 1), - 4) AS avg_price + 4) AS avg_price, + MIN(CASE WHEN o.side='buy' THEN date(o.ts_submit) END) AS entry_trading_day FROM orders o JOIN fills f ON f.order_id = o.order_id WHERE o.status IN ('filled', 'partially_filled') @@ -191,6 +194,25 @@ def sync_positions_table(conn: sqlite3.Connection) -> None: ) conn.commit() + # #485: backfill high_water_mark from eod_prices for positions that have it NULL + _backfill_high_water_mark(conn) + + +def _backfill_high_water_mark(conn: sqlite3.Connection) -> None: + """Set high_water_mark from eod_prices max(close) since entry for positions missing it.""" + try: + conn.execute( + """UPDATE positions SET high_water_mark = ( + SELECT MAX(e.close) FROM eod_prices e + WHERE e.symbol = positions.symbol + AND e.trade_date >= COALESCE(positions.entry_trading_day, '2000-01-01') + ) + WHERE high_water_mark IS NULL AND quantity > 0""" + ) + conn.commit() + except sqlite3.Error as e: + log.warning("_backfill_high_water_mark failed: %s", e) + # ── API helpers ────────────────────────────────────────────────────────────── diff --git a/src/openclaw/ticker_watcher.py b/src/openclaw/ticker_watcher.py index a89f6a2..b7eb6b6 100644 --- a/src/openclaw/ticker_watcher.py +++ b/src/openclaw/ticker_watcher.py @@ -108,6 +108,9 @@ def _get_latest_committee_stance(conn: sqlite3.Connection) -> str: _TRAILING_PCT_BASE: float = float(_os.environ.get("TRAILING_PCT", "0.05")) # Trailing Stop 基礎 5% _TRAILING_PCT_TIGHT: float = float(_os.environ.get("TRAILING_PCT_TIGHT","0.03")) # 大獲利收緊至 3% _TRAILING_PROFIT_THRESHOLD: float = 0.50 # 獲利超過 50% 啟用收緊 trailing +# #485: 時間止損參數 +_TIME_STOP_DAYS: int = int(_os.environ.get("TIME_STOP_DAYS", "5")) # 持有 >N 個交易日 +_TIME_STOP_LOSS_PCT: float = float(_os.environ.get("TIME_STOP_LOSS_PCT", "0.05")) # 未實現虧損 >5% 觸發 # ── DB 連線(直接指向 data/sqlite/trades.db,與前端共用)──────────────────── _DEFAULT_DB = str(_REPO_ROOT / "data" / "sqlite" / "trades.db") @@ -1347,6 +1350,29 @@ def run_watcher() -> None: ) decision_id = str(uuid.uuid4()) + # ── #485: 時間止損 — 持有超過 N 天且虧損超過閾值 → 強制 sell ── + if sig == "flat" and pos_entry is not None and avg_price and avg_price > 0: + _unrealized_pct = (cur_close - avg_price) / avg_price + if _unrealized_pct < -_TIME_STOP_LOSS_PCT: + try: + _etd_row = conn.execute( + "SELECT entry_trading_day FROM positions WHERE symbol=?", + (symbol,), + ).fetchone() + if _etd_row and _etd_row[0]: + _entry_date = dt.datetime.strptime(_etd_row[0], "%Y-%m-%d").date() + _held_days = (dt.date.today() - _entry_date).days + if _held_days >= _TIME_STOP_DAYS: + sig = "sell" + _agg_meta["time_stop"] = { + "held_days": _held_days, + "unrealized_pct": round(_unrealized_pct * 100, 1), + } + log.warning("[%s] TIME STOP triggered — held %d days, loss %.1f%%", + symbol, _held_days, _unrealized_pct * 100) + except (sqlite3.Error, ValueError) as _ts_err: + log.debug("[%s] time stop check failed: %s", symbol, _ts_err) + # ── Mock 防護:mock 資料禁止開新倉(buy) ──────────────── if data_is_mock and sig == "buy": _log_trace(conn, symbol=symbol, signal=sig, snap=snap, @@ -1435,6 +1461,29 @@ def run_watcher() -> None: pass continue + # ── #484: EOD 價格偏離守衛 — buy order 價格交叉驗證 ──────── + _EOD_DEVIATION_THRESHOLD = 0.05 # 5% + if result.order.side == "buy": + try: + _eod_row = conn.execute( + "SELECT close FROM eod_prices WHERE symbol=? ORDER BY trade_date DESC LIMIT 1", + (symbol,), + ).fetchone() + if _eod_row and _eod_row[0]: + _eod_close = float(_eod_row[0]) + _dev = abs(result.order.price - _eod_close) / _eod_close if _eod_close > 0 else 0 + if _dev > _EOD_DEVIATION_THRESHOLD: + log.warning("[%s] BUY BLOCKED — price %.2f deviates %.1f%% from EOD close %.2f", + symbol, result.order.price, _dev * 100, _eod_close) + _log_trace(conn, symbol=symbol, signal=sig, snap=snap, + approved=False, reject_code="RISK_PRICE_EOD_DEVIATION", + decision_id=decision_id, + extra_meta={**_agg_meta, "eod_close": _eod_close, + "deviation_pct": round(_dev * 100, 1)}) + continue + except sqlite3.Error as _eod_err: + log.warning("[%s] EOD deviation check failed (proceeding): %s", symbol, _eod_err) + # ── 執行核准訂單 ────────────────────────────────────────── try: conn.execute("BEGIN IMMEDIATE") @@ -1579,6 +1628,14 @@ def run_watcher() -> None: log.error("[proposals] ROLLBACK failed — DB state may be inconsistent: %s", rb_err) if sell_intents or n_noted: log.info("[proposals] Processed %d sell intents, %d noted", len(sell_intents), n_noted) + # #483: sync positions table after proposal sells so concentration_guard + # sees updated qty (prevents infinite loop in simulation mode) + if sell_intents: + try: + sync_positions_table(conn) + log.info("[proposals] positions table synced after %d sell intents", len(sell_intents)) + except sqlite3.Error as sync_err: + log.warning("[proposals] sync_positions_table after sells failed: %s", sync_err) except Exception as pe: # noqa: BLE001 — dynamic import + multi-step pipeline log.error("[proposals] executor error: %s", pe, exc_info=True) From c186d544f1b3c75aabae926b9c53aa0f8316f041 Mon Sep 17 00:00:00 2001 From: cct0831 Date: Fri, 27 Mar 2026 22:07:38 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix/issue-485:=20update=20test=20fixtures?= =?UTF-8?q?=20=E2=80=94=20add=20entry=5Ftrading=5Fday,=20high=5Fwater=5Fma?= =?UTF-8?q?rk=20columns=20and=20logger?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - test_pnl_engine: add ts_submit to orders, entry_trading_day + high_water_mark to positions - test_position_quarantine: add entry_trading_day + high_water_mark columns, fix positional INSERT - pnl_engine: add missing logger for _backfill_high_water_mark warning Fixes #485 Branch: fix/483-484-485-strategy-bugs Co-Authored-By: Claude Opus 4.6 (1M context) --- src/openclaw/pnl_engine.py | 3 +++ src/tests/test_pnl_engine.py | 13 +++++++++---- src/tests/test_position_quarantine.py | 6 ++++-- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/openclaw/pnl_engine.py b/src/openclaw/pnl_engine.py index d217e9c..b595580 100644 --- a/src/openclaw/pnl_engine.py +++ b/src/openclaw/pnl_engine.py @@ -15,9 +15,12 @@ from __future__ import annotations import datetime +import logging import sqlite3 from typing import Tuple +log = logging.getLogger(__name__) + # ── Cost basis ────────────────────────────────────────────────────────────── diff --git a/src/tests/test_pnl_engine.py b/src/tests/test_pnl_engine.py index c0f5a96..f93e81e 100644 --- a/src/tests/test_pnl_engine.py +++ b/src/tests/test_pnl_engine.py @@ -20,7 +20,8 @@ def _conn() -> sqlite3.Connection: order_id TEXT PRIMARY KEY, symbol TEXT, side TEXT, - status TEXT + status TEXT, + ts_submit TEXT ); CREATE TABLE fills ( fill_id TEXT PRIMARY KEY, @@ -33,7 +34,9 @@ def _conn() -> sqlite3.Connection: CREATE TABLE positions ( symbol TEXT PRIMARY KEY, quantity INTEGER, - avg_price REAL + avg_price REAL, + entry_trading_day TEXT, + high_water_mark REAL ); CREATE TABLE daily_pnl_summary ( trade_date TEXT PRIMARY KEY, @@ -53,14 +56,16 @@ def _conn() -> sqlite3.Connection: def _insert_buy(conn, order_id, symbol, qty, price, fee=0): - conn.execute("INSERT INTO orders VALUES (?,?,?,?)", (order_id, symbol, "buy", "filled")) + conn.execute("INSERT INTO orders VALUES (?,?,?,?,?)", + (order_id, symbol, "buy", "filled", "2026-03-01T00:00:00+00:00")) conn.execute("INSERT INTO fills VALUES (?,?,?,?,?,?)", (f"f_{order_id}", order_id, qty, price, fee, 0)) conn.commit() def _insert_sell(conn, order_id, symbol, qty, price, fee=0, tax=0): - conn.execute("INSERT INTO orders VALUES (?,?,?,?)", (order_id, symbol, "sell", "filled")) + conn.execute("INSERT INTO orders VALUES (?,?,?,?,?)", + (order_id, symbol, "sell", "filled", "2026-03-02T00:00:00+00:00")) conn.execute("INSERT INTO fills VALUES (?,?,?,?,?,?)", (f"f_{order_id}", order_id, qty, price, fee, tax)) conn.commit() diff --git a/src/tests/test_position_quarantine.py b/src/tests/test_position_quarantine.py index f77c5f7..65261e7 100644 --- a/src/tests/test_position_quarantine.py +++ b/src/tests/test_position_quarantine.py @@ -23,7 +23,9 @@ def make_db() -> sqlite3.Connection: avg_price REAL, current_price REAL, unrealized_pnl REAL, - state TEXT + state TEXT, + entry_trading_day TEXT, + high_water_mark REAL ); CREATE TABLE orders ( order_id TEXT PRIMARY KEY, @@ -48,7 +50,7 @@ def make_db() -> sqlite3.Connection: ); """ ) - conn.execute("INSERT INTO positions VALUES ('2330', 100, 500.0, 510.0, 1000.0, 'HOLDING')") + conn.execute("INSERT INTO positions VALUES ('2330', 100, 500.0, 510.0, 1000.0, 'HOLDING', NULL, NULL)") return conn