Skip to content

Commit b96e16c

Browse files
committed
fix: resolve DuckDB lock conflicts with guard, read-only connections, and degraded mode
Three-part fix for the recurring duckdb.IOException when multiple processes try to open trader.duckdb in write mode: 1. Lock guard (db.py): _connect_with_lock_guard() distinguishes stale locks (dead PID → retry 10 s) from live conflicts (alive PID → immediate RuntimeError with exact `kill <PID>` command). open_db_readonly() added for processes that only need reads — multiple processes can hold read-only connections simultaneously without conflicting with the write owner. 2. MCP server degraded mode (mcp/server.py): lifespan() catches lock RuntimeError and falls back to :memory: context. Analysis tools (run_analysis, get_regime, get_system_status) keep working; 22 DB-backed tools return {"success": false, "degraded_mode": true} instead of crashing the Claude session. 3. FastAPI read-only (api/server.py): all GET endpoints now use get_portfolio_state_readonly() and get_decision_log_readonly() — the API server no longer competes with the MCP server for the write lock. Also adds get_portfolio_state_readonly() (portfolio_state.py) and get_decision_log_readonly() (decision_log.py) singletons, 12 new tests in tests/quant_pod/test_db_lock_guard.py (all pass), and updates packages/quant_pod/README.md with the multi-process connection model.
1 parent 421f45c commit b96e16c

File tree

7 files changed

+531
-43
lines changed

7 files changed

+531
-43
lines changed

packages/quant_pod/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,23 @@ All runtime state lives in a single ACID-safe DuckDB file. No scattered pickle f
103103
| `strategies` | Strategy registry |
104104
| `regime_strategy_matrix` | Regime → allocation weights |
105105

106+
### Multi-process access model
107+
108+
DuckDB enforces one write connection per file across OS processes.
109+
110+
| Process | Connection type | Function |
111+
|---------|----------------|---------|
112+
| **MCP server** | Write (owner) | `open_db()` — runs migrations, holds lock |
113+
| **FastAPI server** | Read-only | `open_db_readonly()` — GET endpoints only |
114+
| **Scripts / scheduler** | Read-only | `open_db_readonly()` |
115+
116+
**Lock conflict handling** (`packages/quant_pod/db.py`):
117+
- *Stale lock* (owning process died): retries for 10 s, then raises with a `.wal` cleanup hint
118+
- *Live conflict* (owner is alive): raises `RuntimeError` immediately with `kill <PID>` command
119+
- *Degraded mode*: if the MCP server encounters a lock on startup, it falls back to an in-memory
120+
context. Analysis tools (`run_analysis`, `get_regime`) still work; portfolio/execution tools
121+
return `{"success": false, "degraded_mode": true}`
122+
106123
## Regime Detection
107124

108125
`regime_detector_ic` uses real ADX and ATR indicators — no LLMs, no stubs:

packages/quant_pod/api/server.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,15 @@
4444
from loguru import logger
4545
from pydantic import BaseModel
4646

47-
from quant_pod.audit.decision_log import extract_indicator_attributions, get_decision_log
47+
from quant_pod.audit.decision_log import (
48+
extract_indicator_attributions,
49+
get_decision_log,
50+
get_decision_log_readonly,
51+
)
4852
from quant_pod.audit.models import AuditQuery
4953
from quant_pod.execution.broker_factory import get_broker, get_broker_mode
5054
from quant_pod.execution.kill_switch import get_kill_switch
51-
from quant_pod.execution.portfolio_state import get_portfolio_state
55+
from quant_pod.execution.portfolio_state import get_portfolio_state, get_portfolio_state_readonly
5256
from quant_pod.learning.calibration import get_calibration_tracker
5357
from quant_pod.monitoring.metrics import (
5458
get_metrics_content_type,
@@ -110,7 +114,7 @@ class ResetRequest(BaseModel):
110114
def health() -> dict[str, Any]:
111115
"""Health check — returns service status and kill switch state."""
112116
kill = get_kill_switch()
113-
portfolio = get_portfolio_state()
117+
portfolio = get_portfolio_state_readonly()
114118
snapshot = portfolio.get_snapshot()
115119

116120
return {
@@ -127,15 +131,15 @@ def health() -> dict[str, Any]:
127131
@app.get("/portfolio")
128132
def get_portfolio() -> dict[str, Any]:
129133
"""Current portfolio snapshot."""
130-
portfolio = get_portfolio_state()
134+
portfolio = get_portfolio_state_readonly()
131135
snapshot = portfolio.get_snapshot()
132136
return snapshot.model_dump()
133137

134138

135139
@app.get("/portfolio/positions")
136140
def get_positions() -> list[dict[str, Any]]:
137141
"""Current open positions."""
138-
portfolio = get_portfolio_state()
142+
portfolio = get_portfolio_state_readonly()
139143
return [p.model_dump() for p in portfolio.get_positions()]
140144

141145

@@ -219,7 +223,7 @@ def get_audit(
219223
limit: int = Query(default=50, le=500),
220224
) -> list[dict[str, Any]]:
221225
"""Recent audit log entries."""
222-
log = get_decision_log()
226+
log = get_decision_log_readonly()
223227
events = log.query(
224228
AuditQuery(
225229
symbol=symbol,
@@ -247,7 +251,7 @@ def get_audit(
247251
@app.get("/audit/{event_id}/trace")
248252
def get_audit_trace(event_id: str) -> list[dict[str, Any]]:
249253
"""Full decision trace for a specific event."""
250-
log = get_decision_log()
254+
log = get_decision_log_readonly()
251255
trace = log.get_decision_trace(event_id)
252256
if not trace:
253257
raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
@@ -264,7 +268,7 @@ def get_audit_attribution(event_id: str) -> dict[str, Any]:
264268
- Derived attributions from the market_data_snapshot (as fallback)
265269
- IC dissent signals (ICs that disagreed with the consensus)
266270
"""
267-
log = get_decision_log()
271+
log = get_decision_log_readonly()
268272
log.query(AuditQuery(limit=1)) # Query by event_id directly
269273
# query() doesn't filter by event_id, so fetch via trace (single-element trace = the event itself)
270274
trace = log.get_decision_trace(event_id)
@@ -295,7 +299,7 @@ def get_audit_attribution(event_id: str) -> dict[str, Any]:
295299
@app.get("/audit/session/{session_id}/summary")
296300
def get_session_summary(session_id: str) -> dict[str, Any]:
297301
"""High-level summary of a trading session."""
298-
log = get_decision_log()
302+
log = get_decision_log_readonly()
299303
return log.get_session_summary(session_id)
300304

301305

@@ -438,7 +442,7 @@ def get_heartbeat() -> dict[str, Any]:
438442
439443
Status: "ok" | "stale" | "never_seen"
440444
"""
441-
log = get_decision_log()
445+
log = get_decision_log_readonly()
442446
now = datetime.now()
443447
max_silence_hours = 25 # One session = one trading day + buffer
444448

@@ -499,7 +503,7 @@ def get_pnl_dashboard() -> dict[str, Any]:
499503
- total_equity: cash + positions_value
500504
- recent_trades: last 10 closed trades with P&L
501505
"""
502-
portfolio = get_portfolio_state()
506+
portfolio = get_portfolio_state_readonly()
503507
snapshot = portfolio.get_snapshot()
504508
positions = portfolio.get_positions()
505509

@@ -549,7 +553,7 @@ def get_anomalies() -> dict[str, Any]:
549553
"""
550554
anomalies: list[dict[str, Any]] = []
551555
broker = get_broker()
552-
log = get_decision_log()
556+
log = get_decision_log_readonly()
553557
now = datetime.now()
554558

555559
# ---- 1. Order size anomalies ----------------------------------------
@@ -579,7 +583,7 @@ def get_anomalies() -> dict[str, Any]:
579583
)
580584

581585
# ---- 2. Win rate degradation (uses ClosedTrade records, not fills) -----
582-
portfolio = get_portfolio_state()
586+
portfolio = get_portfolio_state_readonly()
583587
recent_closed = portfolio.conn.execute(
584588
"SELECT realized_pnl FROM closed_trades ORDER BY closed_at DESC LIMIT 20"
585589
).fetchall()
@@ -989,7 +993,7 @@ def prometheus_metrics() -> str:
989993
Also updates NAV and daily P&L gauges on each scrape so they stay current
990994
without requiring a dedicated background task.
991995
"""
992-
portfolio = get_portfolio_state()
996+
portfolio = get_portfolio_state_readonly()
993997
snap = portfolio.get_snapshot()
994998
record_nav(snap.total_equity)
995999
record_daily_pnl(snap.daily_pnl)

packages/quant_pod/audit/decision_log.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,8 +482,34 @@ def _attr(
482482

483483

484484
def get_decision_log(db_path: str | None = None) -> DecisionLog:
485-
"""Get the singleton DecisionLog instance."""
485+
"""Get the singleton DecisionLog instance (write connection)."""
486486
global _decision_log
487487
if _decision_log is None:
488488
_decision_log = DecisionLog(db_path=db_path)
489489
return _decision_log
490+
491+
492+
# Read-only singleton — for processes that must not compete for the write lock.
493+
_decision_log_ro: DecisionLog | None = None
494+
495+
496+
def get_decision_log_readonly() -> DecisionLog:
497+
"""
498+
Get a read-only DecisionLog singleton.
499+
500+
Use this in processes (FastAPI, scripts) that run alongside the MCP server
501+
and only need to QUERY the audit trail. The returned instance cannot
502+
record new events — those calls will raise at runtime.
503+
504+
DecisionLog.__init__ already skips schema creation when a connection is
505+
injected, so no DDL is attempted on the read-only connection.
506+
507+
Raises:
508+
FileNotFoundError: if trader.duckdb doesn't exist yet (MCP server not started).
509+
"""
510+
global _decision_log_ro
511+
if _decision_log_ro is None:
512+
from quant_pod.db import open_db_readonly
513+
514+
_decision_log_ro = DecisionLog(conn=open_db_readonly())
515+
return _decision_log_ro

packages/quant_pod/db.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,38 @@
1212
Schema ownership lives here. Services receive an injected connection;
1313
they do NOT open their own files.
1414
15+
## Connection Model
16+
17+
DuckDB allows only ONE write connection per file across OS processes.
18+
The MCP server is the canonical write owner — it holds the connection for
19+
its entire lifetime. Other processes (FastAPI, scripts) must connect
20+
read-only via open_db_readonly().
21+
22+
### Lock conflict handling
23+
24+
open_db() wraps duckdb.connect() with a lock guard (_connect_with_lock_guard):
25+
26+
- Stale lock (owning process is dead): retries for up to
27+
_STALE_LOCK_RETRY_SECS (10 s) until the OS releases the lock.
28+
29+
- Live conflict (owning process is alive): raises RuntimeError immediately
30+
with the PID and the exact `kill PID` command to resolve it.
31+
32+
The MCP server's lifespan() catches this RuntimeError and falls back to an
33+
in-memory context (degraded mode) so the Claude session is never crashed by
34+
a lock conflict. Analysis tools keep working; tools that need persistent
35+
state return {"success": False, "degraded_mode": True}.
36+
1537
Usage:
16-
# Production
38+
# Production (MCP server — write owner)
1739
from quant_pod.db import open_db, run_migrations
1840
conn = open_db("~/.quant_pod/trader.duckdb")
1941
run_migrations(conn)
2042
43+
# FastAPI / scripts — read-only, no lock competition
44+
from quant_pod.db import open_db_readonly
45+
conn = open_db_readonly() # FileNotFoundError if MCP server not started
46+
2147
# Tests — fully isolated in-memory DB
2248
conn = open_db(":memory:")
2349
run_migrations(conn)
@@ -156,6 +182,72 @@ def reset_connection() -> None:
156182
_conn = None
157183

158184

185+
# ---------------------------------------------------------------------------
186+
# Read-only connection — for processes that must not compete for the write lock
187+
# ---------------------------------------------------------------------------
188+
189+
_conn_ro: duckdb.DuckDBPyConnection | None = None
190+
_conn_ro_lock = Lock()
191+
192+
193+
def open_db_readonly(path: str = "") -> duckdb.DuckDBPyConnection:
194+
"""
195+
Open (or return a cached) read-only DuckDB connection.
196+
197+
Multiple processes can hold read-only connections simultaneously without
198+
conflicting with the write owner (MCP server). Does NOT run migrations —
199+
the write owner is always responsible for schema.
200+
201+
For ':memory:' paths (test compat) falls back to the regular write
202+
connection so test code that calls this function works identically to
203+
production code that calls open_db().
204+
205+
Raises:
206+
FileNotFoundError: if the DB file does not exist. The write owner
207+
(MCP server) must be started first.
208+
"""
209+
global _conn_ro
210+
211+
if not path:
212+
path = os.getenv("TRADER_DB_PATH", "~/.quant_pod/trader.duckdb")
213+
214+
if path == ":memory:":
215+
# Tests use :memory: — read-only has no meaning there; reuse write conn.
216+
return open_db(path)
217+
218+
resolved = Path(path).expanduser()
219+
if not resolved.exists():
220+
raise FileNotFoundError(
221+
f"DB not found at {resolved}. "
222+
"The MCP server must be started (and have run migrations) before "
223+
"read-only consumers can connect."
224+
)
225+
226+
path = str(resolved)
227+
with _conn_ro_lock:
228+
if _conn_ro is None:
229+
_conn_ro = duckdb.connect(path, read_only=True)
230+
logger.info(f"[DB] Opened read-only connection at {path}")
231+
return _conn_ro
232+
233+
234+
def reset_connection_readonly() -> None:
235+
"""
236+
Close and clear the cached read-only connection.
237+
238+
Call this in tests after any test that opens a file-backed read-only
239+
connection so the cached singleton doesn't bleed across tests.
240+
"""
241+
global _conn_ro
242+
with _conn_ro_lock:
243+
if _conn_ro is not None:
244+
try:
245+
_conn_ro.close()
246+
except Exception:
247+
pass
248+
_conn_ro = None
249+
250+
159251
# ---------------------------------------------------------------------------
160252
# Migrations — idempotent, append-only schema upgrades
161253
# ---------------------------------------------------------------------------

packages/quant_pod/execution/portfolio_state.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,12 @@ def __init__(
110110
initial_cash: float = 100_000.0,
111111
# Legacy parameter kept for backward compatibility — ignored when conn is provided
112112
db_path: str | None = None,
113+
read_only: bool = False,
113114
):
114115
# RLock (reentrant) because upsert_position() calls get_position() under the lock
115116
self._lock = RLock()
116117
self._initial_cash = initial_cash
118+
self._read_only = read_only
117119

118120
if conn is not None:
119121
# Injected connection (preferred — supports consolidated DB and in-memory tests)
@@ -127,7 +129,10 @@ def __init__(
127129
self._conn = open_db(db_path)
128130
run_migrations(self._conn)
129131

130-
self._seed_cash()
132+
# Skip seeding on read-only connections — the write owner seeds on first run,
133+
# and DDL/INSERT would raise duckdb.InvalidInputException on a read-only conn.
134+
if not self._read_only:
135+
self._seed_cash()
131136
logger.info("PortfolioState initialized")
132137

133138
# -------------------------------------------------------------------------
@@ -579,7 +584,7 @@ def get_portfolio_state(
579584
# Legacy parameter — ignored when conn is provided
580585
db_path: str | None = None,
581586
) -> PortfolioState:
582-
"""Get the singleton PortfolioState instance."""
587+
"""Get the singleton PortfolioState instance (write connection)."""
583588
global _portfolio_state
584589
if _portfolio_state is None:
585590
if conn is None:
@@ -589,3 +594,28 @@ def get_portfolio_state(
589594
run_migrations(conn)
590595
_portfolio_state = PortfolioState(conn=conn, initial_cash=initial_cash)
591596
return _portfolio_state
597+
598+
599+
# Read-only singleton — for processes that must not compete for the write lock
600+
# (e.g. the FastAPI server's GET endpoints).
601+
_portfolio_state_ro: PortfolioState | None = None
602+
603+
604+
def get_portfolio_state_readonly() -> PortfolioState:
605+
"""
606+
Get a read-only PortfolioState singleton.
607+
608+
Use this in processes (FastAPI, scripts) that run alongside the MCP server
609+
and only need to READ portfolio data. The returned instance cannot execute
610+
writes (upsert_position, adjust_cash, etc.) — those calls will raise a
611+
duckdb.InvalidInputException at runtime.
612+
613+
Raises:
614+
FileNotFoundError: if trader.duckdb doesn't exist yet (MCP server not started).
615+
"""
616+
global _portfolio_state_ro
617+
if _portfolio_state_ro is None:
618+
from quant_pod.db import open_db_readonly
619+
620+
_portfolio_state_ro = PortfolioState(conn=open_db_readonly(), read_only=True)
621+
return _portfolio_state_ro

0 commit comments

Comments
 (0)