Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions moonfish/engines/alpha_beta.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from multiprocessing.managers import DictProxy

import chess.syzygy
Expand All @@ -15,6 +16,9 @@
NEG_INF = float("-inf")
NULL_MOVE = Move.null()

# Maximum depth for iterative deepening time-based search
MAX_DEPTH = 100


class AlphaBeta:
"""
Expand All @@ -24,6 +28,8 @@ class AlphaBeta:
def __init__(self, config: Config):
self.config = config
self.nodes: int = 0
self._stop_time: float = 0.0 # Deadline for time-managed search
self._time_abort: bool = False # Set True when time runs out

# Open Syzygy tablebase once at initialization (not on every eval)
self.tablebase = None
Expand Down Expand Up @@ -98,6 +104,15 @@ def quiescence_search(

self.nodes += 1

# Check time limit periodically (every 512 nodes)
if self._stop_time and (self.nodes & 511) == 0:
if time.perf_counter() >= self._stop_time:
self._time_abort = True

# If time is up, return stand-pat immediately
if self._time_abort:
return self.eval_board(board)

if board.is_checkmate():
return -self.config.checkmate_score

Expand Down Expand Up @@ -210,6 +225,15 @@ def negamax(

self.nodes += 1

# Check time limit periodically (every 512 nodes)
if self._stop_time and (self.nodes & 511) == 0:
if time.perf_counter() >= self._stop_time:
self._time_abort = True

# If time is up, return immediately with a rough score
if self._time_abort:
return 0, None

# check if board was already evaluated
if cache_key in cache:
return cache[cache_key]
Expand Down Expand Up @@ -311,6 +335,8 @@ def negamax(

def search_move(self, board: Board) -> Move:
self.nodes = 0
self._time_abort = False
self._stop_time = 0.0
# create shared cache
cache: CACHE_KEY = {}

Expand All @@ -319,3 +345,51 @@ def search_move(self, board: Board) -> Move:
)[1]
assert best_move is not None, "Best move from root should not be None"
return best_move

def search_move_timed(self, board: Board, time_limit_s: float) -> Move:
"""
Search using iterative deepening with a time limit.

Searches from depth 1 upward. If a depth completes within the time
limit, its result is saved. If time runs out mid-search, the result
from the last completed depth is used.

Arguments:
- board: chess board state
- time_limit_s: maximum time in seconds for the entire search

Returns:
- best_move: the best move found within the time limit
"""
self.nodes = 0
self._time_abort = False
self._stop_time = time.perf_counter() + time_limit_s
cache: CACHE_KEY = {}

best_move = None

for depth in range(1, MAX_DEPTH + 1):
score, move = self.negamax(
board, depth, self.config.null_move, cache,
)

if self._time_abort:
# Time ran out during this depth — discard partial result
break

if move is not None:
best_move = move

# If time remaining is less than what we'd need for the next depth
# (rough heuristic: next depth takes ~4x longer), stop early
elapsed = time.perf_counter() - (self._stop_time - time_limit_s)
remaining = self._stop_time - time.perf_counter()
if remaining < elapsed * 3:
break

self._stop_time = 0.0
self._time_abort = False

if best_move is None:
best_move = self.random_move(board)
return best_move
78 changes: 74 additions & 4 deletions moonfish/mode/uci.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,68 @@
import sys

from chess import Board, STARTING_FEN
from chess import WHITE, Board, STARTING_FEN
from moonfish.config import Config
from moonfish.helper import find_best_move, get_engine

# UCI based on Sunfish Engine: https://github.com/thomasahle/sunfish/blob/master/uci.py


def _parse_go_params(params: list[str]) -> dict[str, int]:
"""Parse 'go' command parameters into a dict."""
result: dict[str, int] = {}
i = 0
while i < len(params):
key = params[i]
if key in ("wtime", "btime", "winc", "binc", "movetime", "movestogo", "depth"):
if i + 1 < len(params):
try:
result[key] = int(params[i + 1])
except ValueError:
pass
i += 2
continue
elif key == "infinite":
result["infinite"] = 1
i += 1
return result


def _calculate_time_limit(
go_params: dict[str, int], side_to_move_is_white: bool
) -> float | None:
"""
Calculate time limit in seconds from UCI go parameters.
Returns None if the engine should use fixed depth.
"""
# Fixed time per move (movetime)
if "movetime" in go_params:
# Use movetime minus a 1-second safety margin for Python/GC overhead
movetime_ms = go_params["movetime"]
return max((movetime_ms - 1000) / 1000.0, movetime_ms / 1000.0 * 0.5)

# Clock-based time management
time_key = "wtime" if side_to_move_is_white else "btime"
inc_key = "winc" if side_to_move_is_white else "binc"

if time_key in go_params:
remaining_ms = go_params[time_key]
increment_ms = go_params.get(inc_key, 0)
moves_to_go = go_params.get("movestogo", 30)

# Allocate time: remaining / moves_to_go + most of the increment
time_for_move_ms = remaining_ms / moves_to_go + increment_ms * 0.8

# Don't use more than 25% of remaining time on one move
time_for_move_ms = min(time_for_move_ms, remaining_ms * 0.25)

# Safety margin for Python/GC overhead
time_for_move_ms = max(time_for_move_ms - 500, 10)

return time_for_move_ms / 1000.0

return None


def main(config: Config):
"""
Start the command line user interface (UCI based).
Expand Down Expand Up @@ -66,8 +122,22 @@ def main(config: Config):
board.push_uci(move)

elif uci_command.startswith("go"):
best_move = find_best_move(
board=board,
engine=engine,
go_params = _parse_go_params(uci_parameters[1:])

# If depth is specified in go command, use it
if "depth" in go_params:
config.negamax_depth = go_params["depth"]

# Calculate time limit from go parameters
time_limit = _calculate_time_limit(
go_params, board.turn == WHITE
)

if time_limit is not None and hasattr(engine, "search_move_timed"):
best_move = engine.search_move_timed(board, time_limit)
else:
best_move = find_best_move(
board=board,
engine=engine,
)
print(f"bestmove {best_move}")
Loading