diff --git a/moonfish/engines/alpha_beta.py b/moonfish/engines/alpha_beta.py index bc97538..fbf385f 100644 --- a/moonfish/engines/alpha_beta.py +++ b/moonfish/engines/alpha_beta.py @@ -1,3 +1,4 @@ +import time from multiprocessing.managers import DictProxy import chess.syzygy @@ -15,6 +16,9 @@ NEG_INF = float("-inf") NULL_MOVE = Move.null() +# Maximum depth for iterative deepening time-based search +MAX_DEPTH = 100 + class AlphaBeta: """ @@ -24,6 +28,8 @@ class AlphaBeta: def __init__(self, config: Config): self.config = config self.nodes: int = 0 + self._stop_time: float = 0.0 # Deadline for time-managed search + self._time_abort: bool = False # Set True when time runs out # Open Syzygy tablebase once at initialization (not on every eval) self.tablebase = None @@ -98,6 +104,15 @@ def quiescence_search( self.nodes += 1 + # Check time limit periodically (every 512 nodes) + if self._stop_time and (self.nodes & 511) == 0: + if time.perf_counter() >= self._stop_time: + self._time_abort = True + + # If time is up, return stand-pat immediately + if self._time_abort: + return self.eval_board(board) + if board.is_checkmate(): return -self.config.checkmate_score @@ -210,6 +225,15 @@ def negamax( self.nodes += 1 + # Check time limit periodically (every 512 nodes) + if self._stop_time and (self.nodes & 511) == 0: + if time.perf_counter() >= self._stop_time: + self._time_abort = True + + # If time is up, return immediately with a rough score + if self._time_abort: + return 0, None + # check if board was already evaluated if cache_key in cache: return cache[cache_key] @@ -311,6 +335,8 @@ def negamax( def search_move(self, board: Board) -> Move: self.nodes = 0 + self._time_abort = False + self._stop_time = 0.0 # create shared cache cache: CACHE_KEY = {} @@ -319,3 +345,51 @@ def search_move(self, board: Board) -> Move: )[1] assert best_move is not None, "Best move from root should not be None" return best_move + + def search_move_timed(self, board: Board, time_limit_s: float) -> Move: + """ + Search using iterative deepening with a time limit. + + Searches from depth 1 upward. If a depth completes within the time + limit, its result is saved. If time runs out mid-search, the result + from the last completed depth is used. + + Arguments: + - board: chess board state + - time_limit_s: maximum time in seconds for the entire search + + Returns: + - best_move: the best move found within the time limit + """ + self.nodes = 0 + self._time_abort = False + self._stop_time = time.perf_counter() + time_limit_s + cache: CACHE_KEY = {} + + best_move = None + + for depth in range(1, MAX_DEPTH + 1): + score, move = self.negamax( + board, depth, self.config.null_move, cache, + ) + + if self._time_abort: + # Time ran out during this depth — discard partial result + break + + if move is not None: + best_move = move + + # If time remaining is less than what we'd need for the next depth + # (rough heuristic: next depth takes ~4x longer), stop early + elapsed = time.perf_counter() - (self._stop_time - time_limit_s) + remaining = self._stop_time - time.perf_counter() + if remaining < elapsed * 3: + break + + self._stop_time = 0.0 + self._time_abort = False + + if best_move is None: + best_move = self.random_move(board) + return best_move diff --git a/moonfish/mode/uci.py b/moonfish/mode/uci.py index d147b9d..cf304ec 100644 --- a/moonfish/mode/uci.py +++ b/moonfish/mode/uci.py @@ -1,12 +1,68 @@ import sys -from chess import Board, STARTING_FEN +from chess import WHITE, Board, STARTING_FEN from moonfish.config import Config from moonfish.helper import find_best_move, get_engine # UCI based on Sunfish Engine: https://github.com/thomasahle/sunfish/blob/master/uci.py +def _parse_go_params(params: list[str]) -> dict[str, int]: + """Parse 'go' command parameters into a dict.""" + result: dict[str, int] = {} + i = 0 + while i < len(params): + key = params[i] + if key in ("wtime", "btime", "winc", "binc", "movetime", "movestogo", "depth"): + if i + 1 < len(params): + try: + result[key] = int(params[i + 1]) + except ValueError: + pass + i += 2 + continue + elif key == "infinite": + result["infinite"] = 1 + i += 1 + return result + + +def _calculate_time_limit( + go_params: dict[str, int], side_to_move_is_white: bool +) -> float | None: + """ + Calculate time limit in seconds from UCI go parameters. + Returns None if the engine should use fixed depth. + """ + # Fixed time per move (movetime) + if "movetime" in go_params: + # Use movetime minus a 1-second safety margin for Python/GC overhead + movetime_ms = go_params["movetime"] + return max((movetime_ms - 1000) / 1000.0, movetime_ms / 1000.0 * 0.5) + + # Clock-based time management + time_key = "wtime" if side_to_move_is_white else "btime" + inc_key = "winc" if side_to_move_is_white else "binc" + + if time_key in go_params: + remaining_ms = go_params[time_key] + increment_ms = go_params.get(inc_key, 0) + moves_to_go = go_params.get("movestogo", 30) + + # Allocate time: remaining / moves_to_go + most of the increment + time_for_move_ms = remaining_ms / moves_to_go + increment_ms * 0.8 + + # Don't use more than 25% of remaining time on one move + time_for_move_ms = min(time_for_move_ms, remaining_ms * 0.25) + + # Safety margin for Python/GC overhead + time_for_move_ms = max(time_for_move_ms - 500, 10) + + return time_for_move_ms / 1000.0 + + return None + + def main(config: Config): """ Start the command line user interface (UCI based). @@ -66,8 +122,22 @@ def main(config: Config): board.push_uci(move) elif uci_command.startswith("go"): - best_move = find_best_move( - board=board, - engine=engine, + go_params = _parse_go_params(uci_parameters[1:]) + + # If depth is specified in go command, use it + if "depth" in go_params: + config.negamax_depth = go_params["depth"] + + # Calculate time limit from go parameters + time_limit = _calculate_time_limit( + go_params, board.turn == WHITE ) + + if time_limit is not None and hasattr(engine, "search_move_timed"): + best_move = engine.search_move_timed(board, time_limit) + else: + best_move = find_best_move( + board=board, + engine=engine, + ) print(f"bestmove {best_move}")