Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 140 additions & 15 deletions api/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from typing import Generator, Optional


def _utc_now() -> datetime:
Expand All @@ -26,13 +26,16 @@ def _utc_now() -> datetime:
String,
Text,
create_engine,
event,
text,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, relationship, sessionmaker
from sqlalchemy.orm import DeclarativeBase, Session, relationship, sessionmaker
from sqlalchemy.types import JSON

Base = declarative_base()

class Base(DeclarativeBase):
"""SQLAlchemy 2.0 style declarative base."""
pass


class Feature(Base):
Expand Down Expand Up @@ -307,11 +310,11 @@ def _migrate_add_schedules_tables(engine) -> None:

# Create schedules table if missing
if "schedules" not in existing_tables:
Schedule.__table__.create(bind=engine)
Schedule.__table__.create(bind=engine) # type: ignore[attr-defined]

# Create schedule_overrides table if missing
if "schedule_overrides" not in existing_tables:
ScheduleOverride.__table__.create(bind=engine)
ScheduleOverride.__table__.create(bind=engine) # type: ignore[attr-defined]

# Add crash_count column if missing (for upgrades)
if "schedules" in existing_tables:
Expand All @@ -332,6 +335,41 @@ def _migrate_add_schedules_tables(engine) -> None:
conn.commit()


def _configure_sqlite_immediate_transactions(engine) -> None:
"""Configure engine for IMMEDIATE transactions via event hooks.

Per SQLAlchemy docs: https://docs.sqlalchemy.org/en/20/dialects/sqlite.html

This replaces fragile pysqlite implicit transaction handling with explicit
BEGIN IMMEDIATE at transaction start. Benefits:
- Acquires write lock immediately, preventing stale reads
- Works correctly regardless of prior ORM operations
- Future-proof: won't break when pysqlite legacy mode is removed in Python 3.16

Note: We only use IMMEDIATE for user transactions, not for PRAGMA statements.
The do_begin hook only fires when SQLAlchemy starts a transaction, which
doesn't happen for PRAGMA commands when using conn.exec_driver_sql() directly.
"""
@event.listens_for(engine, "connect")
def do_connect(dbapi_connection, connection_record):
# Disable pysqlite's implicit transaction handling
dbapi_connection.isolation_level = None

# Execute PRAGMAs immediately on raw connection before any transactions
# This is safe because isolation_level=None means no implicit transactions
cursor = dbapi_connection.cursor()
try:
# These PRAGMAs need to run outside of any transaction
cursor.execute("PRAGMA busy_timeout=30000")
finally:
cursor.close()

@event.listens_for(engine, "begin")
def do_begin(conn):
# Use IMMEDIATE for all transactions to prevent stale reads
conn.exec_driver_sql("BEGIN IMMEDIATE")


def create_database(project_dir: Path) -> tuple:
"""
Create database and return engine + session maker.
Expand All @@ -351,21 +389,37 @@ def create_database(project_dir: Path) -> tuple:
return _engine_cache[cache_key]

db_url = get_database_url(project_dir)
engine = create_engine(db_url, connect_args={
"check_same_thread": False,
"timeout": 30 # Wait up to 30s for locks
})
Base.metadata.create_all(bind=engine)

# Choose journal mode based on filesystem type
# WAL mode doesn't work reliably on network filesystems and can cause corruption
is_network = _is_network_path(project_dir)
journal_mode = "DELETE" if is_network else "WAL"

engine = create_engine(db_url, connect_args={
"check_same_thread": False,
"timeout": 30 # Wait up to 30s for locks
})

# Set journal mode BEFORE configuring event hooks
# PRAGMA journal_mode must run outside of a transaction, and our event hooks
# start a transaction with BEGIN IMMEDIATE on every operation
with engine.connect() as conn:
conn.execute(text(f"PRAGMA journal_mode={journal_mode}"))
conn.execute(text("PRAGMA busy_timeout=30000"))
conn.commit()
# Get raw DBAPI connection to execute PRAGMA outside transaction
raw_conn = conn.connection.dbapi_connection
if raw_conn is None:
raise RuntimeError("Failed to get raw DBAPI connection")
cursor = raw_conn.cursor()
try:
cursor.execute(f"PRAGMA journal_mode={journal_mode}")
cursor.execute("PRAGMA busy_timeout=30000")
finally:
cursor.close()

# Configure IMMEDIATE transactions via event hooks AFTER setting PRAGMAs
# This must happen before create_all() and migrations run
_configure_sqlite_immediate_transactions(engine)

Base.metadata.create_all(bind=engine)

# Migrate existing databases
_migrate_add_in_progress_column(engine)
Expand Down Expand Up @@ -417,7 +471,7 @@ def set_session_maker(session_maker: sessionmaker) -> None:
_session_maker = session_maker


def get_db() -> Session:
def get_db() -> Generator[Session, None, None]:
"""
Dependency for FastAPI to get database session.

Expand All @@ -431,3 +485,74 @@ def get_db() -> Session:
yield db
finally:
db.close()


# =============================================================================
# Atomic Transaction Helpers for Parallel Mode
# =============================================================================
# These helpers prevent database corruption when multiple processes access the
# same SQLite database concurrently. They use IMMEDIATE transactions which
# acquire write locks at the start (preventing stale reads) and atomic
# UPDATE ... WHERE clauses (preventing check-then-modify races).


from contextlib import contextmanager


@contextmanager
def atomic_transaction(session_maker, isolation_level: str = "IMMEDIATE"):
"""Context manager for atomic SQLite transactions.

Acquires a write lock immediately via BEGIN IMMEDIATE, preventing
stale reads in read-modify-write patterns. This is essential for
preventing race conditions in parallel mode.

Note: The engine is configured via _configure_sqlite_immediate_transactions()
to use BEGIN IMMEDIATE for all transactions. The isolation_level parameter
is kept for backwards compatibility and for EXCLUSIVE transactions when
blocking readers is required.

Args:
session_maker: SQLAlchemy sessionmaker
isolation_level: "IMMEDIATE" (default) or "EXCLUSIVE"
- IMMEDIATE: Acquires write lock at transaction start (default via event hooks)
- EXCLUSIVE: Also blocks other readers (requires explicit BEGIN EXCLUSIVE)

Yields:
SQLAlchemy session with automatic commit/rollback

Example:
with atomic_transaction(session_maker) as session:
# All reads in this block are protected by write lock
feature = session.query(Feature).filter(...).first()
feature.priority = new_priority
# Commit happens automatically on exit
"""
session = session_maker()
try:
# For EXCLUSIVE mode, override the default IMMEDIATE from event hooks
# For IMMEDIATE mode, the event hooks handle BEGIN IMMEDIATE automatically
if isolation_level == "EXCLUSIVE":
session.execute(text("BEGIN EXCLUSIVE"))
# Note: For IMMEDIATE, we don't issue BEGIN here - the event hook handles it
# This prevents the fragile "BEGIN on already-begun transaction" issue
yield session
session.commit()
except Exception:
try:
session.rollback()
except Exception:
pass # Don't let rollback failure mask original error
raise
finally:
session.close()
Comment on lines +531 to +548
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if EXCLUSIVE mode is actually used anywhere in the codebase
echo "=== Searching for EXCLUSIVE mode usage ==="
rg -n "EXCLUSIVE" --type=py -B 2 -A 2

echo -e "\n=== Searching for isolation_level assignments ==="
rg -n 'isolation_level' --type=py -B 2 -A 2

echo -e "\n=== Looking for atomic_transaction context manager ==="
rg -n "atomic_transaction\|@atomic_transaction" --type=py -B 2 -A 2 | head -50

Repository: leonvanzyl/autocoder

Length of output: 6215


🏁 Script executed:

#!/bin/bash
# Find the event hook implementation
echo "=== Looking for do_begin event hook ==="
rg -n "do_begin\|event.listen\|before_cursor_execute" --type=py -B 3 -A 5

Repository: leonvanzyl/autocoder

Length of output: 103


🏁 Script executed:

#!/bin/bash
# Check the database.py file structure to find event hook setup
echo "=== Examining api/database.py for event hooks and session setup ==="
head -100 api/database.py

Repository: leonvanzyl/autocoder

Length of output: 3621


🏁 Script executed:

#!/bin/bash
echo "=== Looking for _configure_sqlite_immediate_transactions function ==="
rg -n "_configure_sqlite_immediate_transactions" --type=py -A 20

echo -e "\n=== Searching for event.listen calls ==="
rg -n "event\.listen" --type=py -B 2 -A 5

Repository: leonvanzyl/autocoder

Length of output: 5510


🏁 Script executed:

#!/bin/bash
echo "=== Searching for BEGIN IMMEDIATE in database.py ==="
rg -n "BEGIN IMMEDIATE" api/database.py -B 3 -A 3

echo -e "\n=== Full context around lines 350-420 in api/database.py ==="
sed -n '350,420p' api/database.py

Repository: leonvanzyl/autocoder

Length of output: 5057


🏁 Script executed:

#!/bin/bash
echo "=== Check lines around the atomic_transaction function and event setup ==="
sed -n '460,520p' api/database.py

Repository: leonvanzyl/autocoder

Length of output: 2522


EXCLUSIVE mode causes "cannot start a transaction within a transaction" error.

When isolation_level == "EXCLUSIVE", the code executes session.execute(text("BEGIN EXCLUSIVE")) at line 501. However, SQLAlchemy's "begin" event (lines 367-370) fires and executes BEGIN IMMEDIATE before this statement runs. Since autocommit=False on the sessionmaker, attempting BEGIN EXCLUSIVE within an already-started transaction causes SQLite to error.

This affects the actual usage in feature_mcp.py line 564, which calls atomic_transaction(_session_maker, "EXCLUSIVE").

Proposed fix

Either:

  1. Disable EXCLUSIVE mode support and only allow IMMEDIATE:
     session = session_maker()
     try:
-        # For EXCLUSIVE mode, override the default IMMEDIATE from event hooks
-        # For IMMEDIATE mode, the event hooks handle BEGIN IMMEDIATE automatically
-        if isolation_level == "EXCLUSIVE":
-            session.execute(text("BEGIN EXCLUSIVE"))
-        # Note: For IMMEDIATE, we don't issue BEGIN here - the event hook handles it
-        # This prevents the fragile "BEGIN on already-begun transaction" issue
+        if isolation_level != "IMMEDIATE":
+            raise ValueError(f"Only IMMEDIATE isolation level is supported, got: {isolation_level}")
         yield session
  1. Or conditionally disable the event hook for EXCLUSIVE mode connections (requires connection-level isolation which adds complexity).
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
session = session_maker()
try:
# For EXCLUSIVE mode, override the default IMMEDIATE from event hooks
# For IMMEDIATE mode, the event hooks handle BEGIN IMMEDIATE automatically
if isolation_level == "EXCLUSIVE":
session.execute(text("BEGIN EXCLUSIVE"))
# Note: For IMMEDIATE, we don't issue BEGIN here - the event hook handles it
# This prevents the fragile "BEGIN on already-begun transaction" issue
yield session
session.commit()
except Exception:
try:
session.rollback()
except Exception:
pass # Don't let rollback failure mask original error
raise
finally:
session.close()
session = session_maker()
try:
if isolation_level != "IMMEDIATE":
raise ValueError(f"Only IMMEDIATE isolation level is supported, got: {isolation_level}")
yield session
session.commit()
except Exception:
try:
session.rollback()
except Exception:
pass # Don't let rollback failure mask original error
raise
finally:
session.close()
🤖 Prompt for AI Agents
In `@api/database.py` around lines 496 - 513, The code currently attempts
session.execute(text("BEGIN EXCLUSIVE")) when isolation_level == "EXCLUSIVE",
but the SQLAlchemy "begin" event hook already issues "BEGIN IMMEDIATE", causing
a nested-transaction error; fix by removing the explicit BEGIN EXCLUSIVE and do
not support starting EXCLUSIVE at the session level—either (preferred) map the
"EXCLUSIVE" argument to "IMMEDIATE" (treat isolation_level == "EXCLUSIVE" as
"IMMEDIATE") in the atomic_transaction/session creation logic, or immediately
raise a clear ValueError when callers pass "EXCLUSIVE"; update references to
isolation_level handling and remove session.execute(text("BEGIN EXCLUSIVE")) so
the existing begin event hook (which issues BEGIN IMMEDIATE) is the single
source of BEGIN semantics.



# Note: The following functions were removed as dead code (never imported/called):
# - atomic_claim_feature()
# - atomic_mark_passing()
# - atomic_update_priority_to_end()
# - atomic_get_next_priority()
#
# The MCP server reimplements this logic inline with proper atomic UPDATE WHERE
# clauses. See mcp_server/feature_mcp.py for the actual implementation.
Loading
Loading