Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions src/openclaw/concentration_guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
_AUTO_REDUCE_THRESHOLD: float = 0.40 # 超過 40%:自動核准減倉
_WARN_THRESHOLD: float = 0.25 # 超過 25%:生成待審 proposal
_TARGET_WEIGHT: float = 0.20 # 目標降至 20%(與 risk_engine max_symbol_weight 對齊)
_STALE_ORDER_SEC: int = 360 # 超過 6 分鐘的 submitted 賣單視為 stale
_STALE_ORDER_SEC: int = 3600 # 超過 1 小時的賣單視為 stale(#483: was 360s)
_MAX_DAILY_SELL_ORDERS: int = 3 # 同一 symbol 每日最多產生 3 筆 concentration sell(#483)


class ConcentrationProposal(TypedDict):
Expand Down Expand Up @@ -52,15 +53,16 @@ def check_concentration(
if total_value <= 0:
return []

# Dedup: check pending sell orders per symbol, but only skip if
# the pending sell qty is sufficient to bring weight below target (#385)
# Dedup: check recent sell orders per symbol (submitted + filled),
# only skip if the pending/recent sell qty is sufficient to bring weight below target (#385)
# #483: include 'filled' status + extend window to 1 hour to prevent infinite loop in simulation
stale_cutoff = time.strftime("%Y-%m-%dT%H:%M:%S",
time.gmtime(time.time() - _STALE_ORDER_SEC))
pending_sell_qty: dict[str, int] = {}
try:
for r in conn.execute(
"""SELECT symbol, SUM(qty) FROM orders
WHERE side='sell' AND status='submitted' AND ts_submit > ?
WHERE side='sell' AND status IN ('submitted', 'filled') AND ts_submit > ?
GROUP BY symbol""",
(stale_cutoff,),
).fetchall():
Expand All @@ -69,6 +71,18 @@ def check_concentration(
log.error("Dedup query failed, proceeding WITHOUT dedup — "
"duplicate proposals may be generated: %s", e)

# #483: daily sell cap — count today's filled concentration sells per symbol
daily_sell_count: dict[str, int] = {}
try:
for r in conn.execute(
"""SELECT symbol, COUNT(DISTINCT order_id) FROM orders
WHERE side='sell' AND status='filled' AND date(ts_submit) = date('now')
GROUP BY symbol""",
).fetchall():
daily_sell_count[r[0]] = int(r[1] or 0)
except sqlite3.Error as e:
log.warning("Daily sell count query failed: %s", e)

proposals: list[ConcentrationProposal] = []
for symbol, qty, price in rows:
weight = (qty * (price or 0)) / total_value
Expand All @@ -87,6 +101,13 @@ def check_concentration(
log.info("Concentration %s: %.1f%% — pending sell %d insufficient (would be %.1f%%), generating additional proposal",
symbol, weight * 100, pending_qty, remaining_weight * 100)

# #483: daily cap — skip if already hit max daily sells for this symbol
sym_daily_sells = daily_sell_count.get(symbol, 0)
if sym_daily_sells >= _MAX_DAILY_SELL_ORDERS:
log.info("Concentration %s: %.1f%% — skipped (daily cap %d/%d reached)",
symbol, weight * 100, sym_daily_sells, _MAX_DAILY_SELL_ORDERS)
continue

if locked_symbols and symbol in locked_symbols:
log.warning("Concentration %s: %.1f%% — skipped (locked symbol, sell prohibited)",
symbol, weight * 100)
Expand Down
33 changes: 29 additions & 4 deletions src/openclaw/pnl_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
from __future__ import annotations

import datetime
import logging
import sqlite3
from typing import Tuple

log = logging.getLogger(__name__)

# ── Cost basis ──────────────────────────────────────────────────────────────


Expand Down Expand Up @@ -147,18 +150,20 @@ def sync_positions_table(conn: sqlite3.Connection) -> None:
Does NOT update current_price or unrealized_pnl (requires live feed).
"""
conn.execute("DELETE FROM positions")
# #485: include entry_trading_day = earliest buy order date for current position
if _table_exists(conn, "position_quarantine"):
conn.execute(
"""
INSERT INTO positions (symbol, quantity, avg_price)
INSERT INTO positions (symbol, quantity, avg_price, entry_trading_day)
SELECT
o.symbol,
SUM(CASE WHEN o.side='buy' THEN f.qty ELSE 0 END)
- SUM(CASE WHEN o.side='sell' THEN f.qty ELSE 0 END) AS net_qty,
ROUND(
SUM(CASE WHEN o.side='buy' THEN f.qty * f.price ELSE 0 END)
/ MAX(SUM(CASE WHEN o.side='buy' THEN f.qty ELSE 0 END), 1),
4) AS avg_price
4) AS avg_price,
MIN(CASE WHEN o.side='buy' THEN date(o.ts_submit) END) AS entry_trading_day
FROM orders o
JOIN fills f ON f.order_id = o.order_id
LEFT JOIN position_quarantine q
Expand All @@ -173,15 +178,16 @@ def sync_positions_table(conn: sqlite3.Connection) -> None:
else:
conn.execute(
"""
INSERT INTO positions (symbol, quantity, avg_price)
INSERT INTO positions (symbol, quantity, avg_price, entry_trading_day)
SELECT
o.symbol,
SUM(CASE WHEN o.side='buy' THEN f.qty ELSE 0 END)
- SUM(CASE WHEN o.side='sell' THEN f.qty ELSE 0 END) AS net_qty,
ROUND(
SUM(CASE WHEN o.side='buy' THEN f.qty * f.price ELSE 0 END)
/ MAX(SUM(CASE WHEN o.side='buy' THEN f.qty ELSE 0 END), 1),
4) AS avg_price
4) AS avg_price,
MIN(CASE WHEN o.side='buy' THEN date(o.ts_submit) END) AS entry_trading_day
FROM orders o
JOIN fills f ON f.order_id = o.order_id
WHERE o.status IN ('filled', 'partially_filled')
Expand All @@ -191,6 +197,25 @@ def sync_positions_table(conn: sqlite3.Connection) -> None:
)
conn.commit()

# #485: backfill high_water_mark from eod_prices for positions that have it NULL
_backfill_high_water_mark(conn)


def _backfill_high_water_mark(conn: sqlite3.Connection) -> None:
"""Set high_water_mark from eod_prices max(close) since entry for positions missing it."""
try:
conn.execute(
"""UPDATE positions SET high_water_mark = (
SELECT MAX(e.close) FROM eod_prices e
WHERE e.symbol = positions.symbol
AND e.trade_date >= COALESCE(positions.entry_trading_day, '2000-01-01')
)
WHERE high_water_mark IS NULL AND quantity > 0"""
)
conn.commit()
except sqlite3.Error as e:
log.warning("_backfill_high_water_mark failed: %s", e)


# ── API helpers ──────────────────────────────────────────────────────────────

Expand Down
57 changes: 57 additions & 0 deletions src/openclaw/ticker_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ def _get_latest_committee_stance(conn: sqlite3.Connection) -> str:
_TRAILING_PCT_BASE: float = float(_os.environ.get("TRAILING_PCT", "0.05")) # Trailing Stop 基礎 5%
_TRAILING_PCT_TIGHT: float = float(_os.environ.get("TRAILING_PCT_TIGHT","0.03")) # 大獲利收緊至 3%
_TRAILING_PROFIT_THRESHOLD: float = 0.50 # 獲利超過 50% 啟用收緊 trailing
# #485: 時間止損參數
_TIME_STOP_DAYS: int = int(_os.environ.get("TIME_STOP_DAYS", "5")) # 持有 >N 個交易日
_TIME_STOP_LOSS_PCT: float = float(_os.environ.get("TIME_STOP_LOSS_PCT", "0.05")) # 未實現虧損 >5% 觸發

# ── DB 連線(直接指向 data/sqlite/trades.db,與前端共用)────────────────────
_DEFAULT_DB = str(_REPO_ROOT / "data" / "sqlite" / "trades.db")
Expand Down Expand Up @@ -1347,6 +1350,29 @@ def run_watcher() -> None:
)
decision_id = str(uuid.uuid4())

# ── #485: 時間止損 — 持有超過 N 天且虧損超過閾值 → 強制 sell ──
if sig == "flat" and pos_entry is not None and avg_price and avg_price > 0:
_unrealized_pct = (cur_close - avg_price) / avg_price
if _unrealized_pct < -_TIME_STOP_LOSS_PCT:
try:
_etd_row = conn.execute(
"SELECT entry_trading_day FROM positions WHERE symbol=?",
(symbol,),
).fetchone()
if _etd_row and _etd_row[0]:
_entry_date = dt.datetime.strptime(_etd_row[0], "%Y-%m-%d").date()
_held_days = (dt.date.today() - _entry_date).days
if _held_days >= _TIME_STOP_DAYS:
sig = "sell"
_agg_meta["time_stop"] = {
"held_days": _held_days,
"unrealized_pct": round(_unrealized_pct * 100, 1),
}
log.warning("[%s] TIME STOP triggered — held %d days, loss %.1f%%",
symbol, _held_days, _unrealized_pct * 100)
except (sqlite3.Error, ValueError) as _ts_err:
log.debug("[%s] time stop check failed: %s", symbol, _ts_err)

# ── Mock 防護:mock 資料禁止開新倉(buy) ────────────────
if data_is_mock and sig == "buy":
_log_trace(conn, symbol=symbol, signal=sig, snap=snap,
Expand Down Expand Up @@ -1435,6 +1461,29 @@ def run_watcher() -> None:
pass
continue

# ── #484: EOD 價格偏離守衛 — buy order 價格交叉驗證 ────────
_EOD_DEVIATION_THRESHOLD = 0.05 # 5%
if result.order.side == "buy":
try:
_eod_row = conn.execute(
"SELECT close FROM eod_prices WHERE symbol=? ORDER BY trade_date DESC LIMIT 1",
(symbol,),
).fetchone()
if _eod_row and _eod_row[0]:
_eod_close = float(_eod_row[0])
_dev = abs(result.order.price - _eod_close) / _eod_close if _eod_close > 0 else 0
if _dev > _EOD_DEVIATION_THRESHOLD:
log.warning("[%s] BUY BLOCKED — price %.2f deviates %.1f%% from EOD close %.2f",
symbol, result.order.price, _dev * 100, _eod_close)
_log_trace(conn, symbol=symbol, signal=sig, snap=snap,
approved=False, reject_code="RISK_PRICE_EOD_DEVIATION",
decision_id=decision_id,
extra_meta={**_agg_meta, "eod_close": _eod_close,
"deviation_pct": round(_dev * 100, 1)})
continue
except sqlite3.Error as _eod_err:
log.warning("[%s] EOD deviation check failed (proceeding): %s", symbol, _eod_err)

# ── 執行核准訂單 ──────────────────────────────────────────
try:
conn.execute("BEGIN IMMEDIATE")
Expand Down Expand Up @@ -1579,6 +1628,14 @@ def run_watcher() -> None:
log.error("[proposals] ROLLBACK failed — DB state may be inconsistent: %s", rb_err)
if sell_intents or n_noted:
log.info("[proposals] Processed %d sell intents, %d noted", len(sell_intents), n_noted)
# #483: sync positions table after proposal sells so concentration_guard
# sees updated qty (prevents infinite loop in simulation mode)
if sell_intents:
try:
sync_positions_table(conn)
log.info("[proposals] positions table synced after %d sell intents", len(sell_intents))
except sqlite3.Error as sync_err:
log.warning("[proposals] sync_positions_table after sells failed: %s", sync_err)
except Exception as pe: # noqa: BLE001 — dynamic import + multi-step pipeline
log.error("[proposals] executor error: %s", pe, exc_info=True)

Expand Down
13 changes: 9 additions & 4 deletions src/tests/test_pnl_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def _conn() -> sqlite3.Connection:
order_id TEXT PRIMARY KEY,
symbol TEXT,
side TEXT,
status TEXT
status TEXT,
ts_submit TEXT
);
CREATE TABLE fills (
fill_id TEXT PRIMARY KEY,
Expand All @@ -33,7 +34,9 @@ def _conn() -> sqlite3.Connection:
CREATE TABLE positions (
symbol TEXT PRIMARY KEY,
quantity INTEGER,
avg_price REAL
avg_price REAL,
entry_trading_day TEXT,
high_water_mark REAL
);
CREATE TABLE daily_pnl_summary (
trade_date TEXT PRIMARY KEY,
Expand All @@ -53,14 +56,16 @@ def _conn() -> sqlite3.Connection:


def _insert_buy(conn, order_id, symbol, qty, price, fee=0):
conn.execute("INSERT INTO orders VALUES (?,?,?,?)", (order_id, symbol, "buy", "filled"))
conn.execute("INSERT INTO orders VALUES (?,?,?,?,?)",
(order_id, symbol, "buy", "filled", "2026-03-01T00:00:00+00:00"))
conn.execute("INSERT INTO fills VALUES (?,?,?,?,?,?)",
(f"f_{order_id}", order_id, qty, price, fee, 0))
conn.commit()


def _insert_sell(conn, order_id, symbol, qty, price, fee=0, tax=0):
conn.execute("INSERT INTO orders VALUES (?,?,?,?)", (order_id, symbol, "sell", "filled"))
conn.execute("INSERT INTO orders VALUES (?,?,?,?,?)",
(order_id, symbol, "sell", "filled", "2026-03-02T00:00:00+00:00"))
conn.execute("INSERT INTO fills VALUES (?,?,?,?,?,?)",
(f"f_{order_id}", order_id, qty, price, fee, tax))
conn.commit()
Expand Down
6 changes: 4 additions & 2 deletions src/tests/test_position_quarantine.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ def make_db() -> sqlite3.Connection:
avg_price REAL,
current_price REAL,
unrealized_pnl REAL,
state TEXT
state TEXT,
entry_trading_day TEXT,
high_water_mark REAL
);
CREATE TABLE orders (
order_id TEXT PRIMARY KEY,
Expand All @@ -48,7 +50,7 @@ def make_db() -> sqlite3.Connection:
);
"""
)
conn.execute("INSERT INTO positions VALUES ('2330', 100, 500.0, 510.0, 1000.0, 'HOLDING')")
conn.execute("INSERT INTO positions VALUES ('2330', 100, 500.0, 510.0, 1000.0, 'HOLDING', NULL, NULL)")
return conn


Expand Down
Loading