From 13f83ec51bc01ebe70d1c530993b6ecfcfda3db6 Mon Sep 17 00:00:00 2001 From: Lucca Bertoncini Date: Mon, 16 Feb 2026 01:16:12 -0800 Subject: [PATCH] Add UCI time management with iterative deepening The engine previously searched to a fixed depth (default 3) regardless of available time, wasting most of the allocated thinking time in timed games. With 60s per move, the engine would finish in <1s. Add search_move_timed() to AlphaBeta that uses iterative deepening under a time constraint: searches depth 1, 2, 3, ... until time runs out, keeping the best move from the last completed depth. Time is checked every 512 nodes to minimize overshoot. Update the UCI handler to parse go parameters (wtime, btime, winc, binc, movetime, movestogo, depth) and calculate appropriate time allocation per move with 1-second safety margins for Python overhead. --- moonfish/engines/alpha_beta.py | 74 ++++++++++++++++++++++++++++++++ moonfish/mode/uci.py | 78 ++++++++++++++++++++++++++++++++-- 2 files changed, 148 insertions(+), 4 deletions(-) diff --git a/moonfish/engines/alpha_beta.py b/moonfish/engines/alpha_beta.py index bc97538..fbf385f 100644 --- a/moonfish/engines/alpha_beta.py +++ b/moonfish/engines/alpha_beta.py @@ -1,3 +1,4 @@ +import time from multiprocessing.managers import DictProxy import chess.syzygy @@ -15,6 +16,9 @@ NEG_INF = float("-inf") NULL_MOVE = Move.null() +# Maximum depth for iterative deepening time-based search +MAX_DEPTH = 100 + class AlphaBeta: """ @@ -24,6 +28,8 @@ class AlphaBeta: def __init__(self, config: Config): self.config = config self.nodes: int = 0 + self._stop_time: float = 0.0 # Deadline for time-managed search + self._time_abort: bool = False # Set True when time runs out # Open Syzygy tablebase once at initialization (not on every eval) self.tablebase = None @@ -98,6 +104,15 @@ def quiescence_search( self.nodes += 1 + # Check time limit periodically (every 512 nodes) + if self._stop_time and (self.nodes & 511) == 0: + if time.perf_counter() >= self._stop_time: + self._time_abort = True + + # If time is up, return stand-pat immediately + if self._time_abort: + return self.eval_board(board) + if board.is_checkmate(): return -self.config.checkmate_score @@ -210,6 +225,15 @@ def negamax( self.nodes += 1 + # Check time limit periodically (every 512 nodes) + if self._stop_time and (self.nodes & 511) == 0: + if time.perf_counter() >= self._stop_time: + self._time_abort = True + + # If time is up, return immediately with a rough score + if self._time_abort: + return 0, None + # check if board was already evaluated if cache_key in cache: return cache[cache_key] @@ -311,6 +335,8 @@ def negamax( def search_move(self, board: Board) -> Move: self.nodes = 0 + self._time_abort = False + self._stop_time = 0.0 # create shared cache cache: CACHE_KEY = {} @@ -319,3 +345,51 @@ def search_move(self, board: Board) -> Move: )[1] assert best_move is not None, "Best move from root should not be None" return best_move + + def search_move_timed(self, board: Board, time_limit_s: float) -> Move: + """ + Search using iterative deepening with a time limit. + + Searches from depth 1 upward. If a depth completes within the time + limit, its result is saved. If time runs out mid-search, the result + from the last completed depth is used. + + Arguments: + - board: chess board state + - time_limit_s: maximum time in seconds for the entire search + + Returns: + - best_move: the best move found within the time limit + """ + self.nodes = 0 + self._time_abort = False + self._stop_time = time.perf_counter() + time_limit_s + cache: CACHE_KEY = {} + + best_move = None + + for depth in range(1, MAX_DEPTH + 1): + score, move = self.negamax( + board, depth, self.config.null_move, cache, + ) + + if self._time_abort: + # Time ran out during this depth — discard partial result + break + + if move is not None: + best_move = move + + # If time remaining is less than what we'd need for the next depth + # (rough heuristic: next depth takes ~4x longer), stop early + elapsed = time.perf_counter() - (self._stop_time - time_limit_s) + remaining = self._stop_time - time.perf_counter() + if remaining < elapsed * 3: + break + + self._stop_time = 0.0 + self._time_abort = False + + if best_move is None: + best_move = self.random_move(board) + return best_move diff --git a/moonfish/mode/uci.py b/moonfish/mode/uci.py index d147b9d..cf304ec 100644 --- a/moonfish/mode/uci.py +++ b/moonfish/mode/uci.py @@ -1,12 +1,68 @@ import sys -from chess import Board, STARTING_FEN +from chess import WHITE, Board, STARTING_FEN from moonfish.config import Config from moonfish.helper import find_best_move, get_engine # UCI based on Sunfish Engine: https://github.com/thomasahle/sunfish/blob/master/uci.py +def _parse_go_params(params: list[str]) -> dict[str, int]: + """Parse 'go' command parameters into a dict.""" + result: dict[str, int] = {} + i = 0 + while i < len(params): + key = params[i] + if key in ("wtime", "btime", "winc", "binc", "movetime", "movestogo", "depth"): + if i + 1 < len(params): + try: + result[key] = int(params[i + 1]) + except ValueError: + pass + i += 2 + continue + elif key == "infinite": + result["infinite"] = 1 + i += 1 + return result + + +def _calculate_time_limit( + go_params: dict[str, int], side_to_move_is_white: bool +) -> float | None: + """ + Calculate time limit in seconds from UCI go parameters. + Returns None if the engine should use fixed depth. + """ + # Fixed time per move (movetime) + if "movetime" in go_params: + # Use movetime minus a 1-second safety margin for Python/GC overhead + movetime_ms = go_params["movetime"] + return max((movetime_ms - 1000) / 1000.0, movetime_ms / 1000.0 * 0.5) + + # Clock-based time management + time_key = "wtime" if side_to_move_is_white else "btime" + inc_key = "winc" if side_to_move_is_white else "binc" + + if time_key in go_params: + remaining_ms = go_params[time_key] + increment_ms = go_params.get(inc_key, 0) + moves_to_go = go_params.get("movestogo", 30) + + # Allocate time: remaining / moves_to_go + most of the increment + time_for_move_ms = remaining_ms / moves_to_go + increment_ms * 0.8 + + # Don't use more than 25% of remaining time on one move + time_for_move_ms = min(time_for_move_ms, remaining_ms * 0.25) + + # Safety margin for Python/GC overhead + time_for_move_ms = max(time_for_move_ms - 500, 10) + + return time_for_move_ms / 1000.0 + + return None + + def main(config: Config): """ Start the command line user interface (UCI based). @@ -66,8 +122,22 @@ def main(config: Config): board.push_uci(move) elif uci_command.startswith("go"): - best_move = find_best_move( - board=board, - engine=engine, + go_params = _parse_go_params(uci_parameters[1:]) + + # If depth is specified in go command, use it + if "depth" in go_params: + config.negamax_depth = go_params["depth"] + + # Calculate time limit from go parameters + time_limit = _calculate_time_limit( + go_params, board.turn == WHITE ) + + if time_limit is not None and hasattr(engine, "search_move_timed"): + best_move = engine.search_move_timed(board, time_limit) + else: + best_move = find_best_move( + board=board, + engine=engine, + ) print(f"bestmove {best_move}")