diff --git a/libbenbot/src/engine/Printing.cpp b/libbenbot/src/engine/Printing.cpp index e3cf45b1..9890db36 100644 --- a/libbenbot/src/engine/Printing.cpp +++ b/libbenbot/src/engine/Printing.cpp @@ -151,14 +151,15 @@ void Engine::print_current_position(const string_view arguments) const println(""); - print_labeled_info("FEN: ", chess::notation::to_fen(pos)); + print_labeled_info("FEN: ", chess::notation::to_fen(pos)); + print_labeled_info("XFEN: ", chess::notation::to_fen(pos, false)); print_labeled_info("Zobrist key: ", std::format("{}", pos.hash)); - print_labeled_info("Static eval: ", std::format("{}", eval::evaluate(pos))); searcher.context.probe_transposition_table(pos) .transform([this](const TTData& data) { + println(""); print_labeled_info( "TT hit: ", std::format( diff --git a/libchess/include/libchess/uci/EngineBase.hpp b/libchess/include/libchess/uci/EngineBase.hpp index 39b1f4a2..a17a576a 100644 --- a/libchess/include/libchess/uci/EngineBase.hpp +++ b/libchess/include/libchess/uci/EngineBase.hpp @@ -263,6 +263,7 @@ struct EngineBase { void handle_quit(); void handle_setpos(string_view arguments); void handle_setoption(string_view arguments); + void handle_go(string_view arguments); static_assert( std::atomic_bool::is_always_lock_free, @@ -313,7 +314,7 @@ struct EngineBase { .argsHelp = "[startpos|fen ] [moves ]" }, EngineCommand { .name = "go", - .action = [this](const string_view args) { go(parse_go_options(args, position)); }, + .action = [this](const string_view args) { handle_go(args); }, .description = "Start a search", .argsHelp = { } }, EngineCommand { diff --git a/libchess/src/game/Position.cpp b/libchess/src/game/Position.cpp index 2a38fce4..13902313 100644 --- a/libchess/src/game/Position.cpp +++ b/libchess/src/game/Position.cpp @@ -363,6 +363,8 @@ auto Position::is_illegal() const -> std::optional magic_enum::enum_name(sideToMove)); } + // TODO: our king cannot be in check by more than 2 pieces + return std::nullopt; } diff --git a/libchess/src/uci/EngineBase.cpp b/libchess/src/uci/EngineBase.cpp index 39decc2d..7376ccb6 100644 --- a/libchess/src/uci/EngineBase.cpp +++ b/libchess/src/uci/EngineBase.cpp @@ -143,6 +143,18 @@ void EngineBase::respond_to_newgame() new_game(not wasInitialized); } +void EngineBase::handle_go(const string_view arguments) +{ + // if we haven't received a ucinewgame yet, make sure the subclass + // gets its expected new_game() call before its first go() call + if (not initialized.exchange(true, memory_order_relaxed)) { + new_game(true); + } + + go( + parse_go_options(arguments, position)); +} + void EngineBase::handle_quit() { abort_search(); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 76f7bfb1..32da7a30 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -38,6 +38,7 @@ endfunction () configure_file (CTestCustom.cmake "${BenBot_BINARY_DIR}/CTestCustom.cmake" @ONLY) +add_subdirectory (e2e) add_subdirectory (fastchess) add_subdirectory (perft) add_subdirectory (position-solver) diff --git a/tests/e2e/CMakeLists.txt b/tests/e2e/CMakeLists.txt new file mode 100644 index 00000000..032373f3 --- /dev/null +++ b/tests/e2e/CMakeLists.txt @@ -0,0 +1,32 @@ +# ====================================================================================== +# +# ░▒▓███████▓▒░░▒▓████████▓▒░▒▓███████▓▒░ ░▒▓███████▓▒░ ░▒▓██████▓▒░▒▓████████▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓███████▓▒░░▒▓██████▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓███████▓▒░░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓███████▓▒░░▒▓████████▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓███████▓▒░ ░▒▓██████▓▒░ ░▒▓█▓▒░ +# +# ====================================================================================== + +add_feature_info ( + benbot_e2e_testing "TARGET Python::Interpreter" "BenBot end-to-end & integration testing" +) + +if (NOT TARGET Python::Interpreter) + return () +endif () + +add_test (NAME ben_bot.one_shot_clis + COMMAND Python::Interpreter "${CMAKE_CURRENT_LIST_DIR}/OneShotCLIs.py" + "$" +) + +add_test (NAME ben_bot.e2e COMMAND Python::Interpreter "${CMAKE_CURRENT_LIST_DIR}/e2e.py" + "$" +) + +set_tests_properties ( + ben_bot.one_shot_clis ben_bot.e2e PROPERTIES REQUIRED_FILES "$" +) diff --git a/tests/e2e/OneShotCLIs.py b/tests/e2e/OneShotCLIs.py new file mode 100644 index 00000000..fb149ea1 --- /dev/null +++ b/tests/e2e/OneShotCLIs.py @@ -0,0 +1,67 @@ +# ====================================================================================== +# +# ░▒▓███████▓▒░░▒▓████████▓▒░▒▓███████▓▒░ ░▒▓███████▓▒░ ░▒▓██████▓▒░▒▓████████▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓███████▓▒░░▒▓██████▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓███████▓▒░░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓███████▓▒░░▒▓████████▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓███████▓▒░ ░▒▓██████▓▒░ ░▒▓█▓▒░ +# +# ====================================================================================== + +import subprocess +import sys +from pathlib import Path + + +def run_oneshot_cli_test_suite(commands: list[str], benbot_path: Path): + num_passed = 0 + num_failed = 0 + + def run_benbot_command(command: str): + nonlocal num_passed + nonlocal num_failed + + print(f"Running command: '{command}'") + + process = subprocess.run( + [benbot_path] + command.split(" ") + ["--no-loop"], + capture_output=True, + text=True, + ) + + if process.returncode == 0: + num_passed += 1 + return + + print(process.stdout) + print(process.stderr) + print(f"Process failed with return code {process.returncode}") + num_failed += 1 + + for cmd in commands: + run_benbot_command(cmd) + + print(f"{num_passed} tests passed") + print(f"{num_failed} tests failed") + + exit(num_failed) + + +TEST_COMMANDS = [ + "go nodes 1000", + "go depth 10", + "perft 4 json", + "go movetime 1000", + "go wtime 8000 btime 8000 winc 500 binc 500", + "go wtime 1000 btime 1000 winc 0 binc 0", + "go wtime 1000 btime 1000 winc 0 binc 0 movestogo 5", + "go movetime 200", + "go nodes 20000 searchmoves e2e4 d2d4", + "showpos", + "compiler", + "uci", +] + +run_oneshot_cli_test_suite(TEST_COMMANDS, Path(sys.argv[1])) diff --git a/tests/e2e/README.md b/tests/e2e/README.md new file mode 100644 index 00000000..22025786 --- /dev/null +++ b/tests/e2e/README.md @@ -0,0 +1,4 @@ +# End-to-end testing + +This directory defines tests that are scripted to test the engine +executable in various scenarios. diff --git a/tests/e2e/e2e.py b/tests/e2e/e2e.py new file mode 100644 index 00000000..7042e87c --- /dev/null +++ b/tests/e2e/e2e.py @@ -0,0 +1,350 @@ +# ====================================================================================== +# +# ░▒▓███████▓▒░░▒▓████████▓▒░▒▓███████▓▒░ ░▒▓███████▓▒░ ░▒▓██████▓▒░▒▓████████▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓███████▓▒░░▒▓██████▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓███████▓▒░░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓███████▓▒░░▒▓████████▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓███████▓▒░ ░▒▓██████▓▒░ ░▒▓█▓▒░ +# +# ====================================================================================== + +import argparse +import re +import sys +import pathlib +import os + +from testing import ( + BenBot as Engine, + MiniTestFramework, + OrderedClassMembers, +) + +PATH = pathlib.Path(__file__).parent.resolve() +CWD = os.getcwd() + + +def get_path(): + return os.path.abspath(os.path.join(CWD, args.engine_path)) + + +def BenBot(*args, **kwargs): + return Engine(get_path(), *args, **kwargs) + + +class TestInteractive(metaclass=OrderedClassMembers): + @staticmethod + def test_uci_command(): + engine = BenBot() + engine.send_command("uci") + engine.equals("uciok") + + @staticmethod + def test_set_threads_option(): + engine = BenBot() + engine.send_command(f"setoption name Threads value 1") + + @staticmethod + def test_ucinewgame_and_startpos_nodes_1000(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command("position startpos") + engine.send_command("go nodes 1000") + engine.starts_with("bestmove") + + @staticmethod + def test_ucinewgame_and_startpos_moves(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command("position startpos moves e2e4 e7e6") + engine.send_command("go nodes 1000") + engine.starts_with("bestmove") + + @staticmethod + def test_fen_position_1(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command("position fen 5rk1/1K4p1/8/8/3B4/8/8/8 b - - 0 1") + engine.send_command("go nodes 1000") + engine.starts_with("bestmove") + + @staticmethod + def test_fen_position_2_flip(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command("position fen 5rk1/1K4p1/8/8/3B4/8/8/8 b - - 0 1") + engine.send_command("flip") + engine.send_command("go nodes 1000") + engine.starts_with("bestmove") + + @staticmethod + def test_depth_5_with_callback(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command("position startpos") + engine.send_command("go depth 5") + + def callback(output): + regex = r"info depth \d+ seldepth \d+ score cp -?\d+ time \d+ hashfull \d+ nodes \d+ nps \d+ tbhits \d+ pv" + if output.startswith("info depth") and not re.match(regex, output): + assert False + if output.startswith("bestmove"): + return True + return False + + engine.check_output(callback) + + @staticmethod + def test_ucinewgame_and_go_depth_4(): + total_depth = 4 + + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command("position startpos") + engine.send_command(f"go depth {total_depth}") + + depth = 0 + + def callback(output): + nonlocal depth + nonlocal total_depth + + if output.startswith("info depth"): + depth += 1 + + regex = rf"info depth {depth} seldepth \d+ score cp -?\d+ time \d+ hashfull \d+ nodes \d+ nps \d+ tbhits \d+ pv" + + if not re.match(regex, output): + assert False + + if output.startswith("bestmove"): + assert depth == total_depth + return True + + return False + + engine.check_output(callback) + + @staticmethod + def test_clear_hash(): + engine = BenBot() + engine.send_command("setoption name Clear Hash") + + @staticmethod + def test_fen_position_mate_1(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command("position fen 5K2/8/2qk4/2nPp3/3r4/6B1/B7/3R4 w - e6") + engine.send_command("go depth 10") + + engine.expect("* score mate 1 * pv d5e6") + engine.equals("bestmove d5e6") + + @staticmethod + def test_fen_position_mate_minus_1(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command("position fen 2brrb2/8/p7/Q7/1p1kpPp1/1P1pN1K1/3P4/8 b - -") + engine.send_command("go depth 10") + engine.expect("* score mate -1 *") + engine.starts_with("bestmove") + + @staticmethod + def test_fen_position_fixed_node(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command("position fen 5K2/8/2P1P1Pk/6pP/3p2P1/1P6/3P4/8 w - - 0 1") + engine.send_command("go nodes 10000") + engine.starts_with("bestmove") + + @staticmethod + def test_fen_position_with_mate_go_depth(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command("position fen 8/5R2/2K1P3/4k3/8/b1PPpp1B/5p2/8 w - -") + engine.send_command("go depth 18 searchmoves c6d7") + engine.expect("* score mate 2 * pv c6d7 * f7f5") + + engine.starts_with("bestmove") + + @staticmethod + def test_fen_position_with_mate_go_mate(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command("position fen 8/5R2/2K1P3/4k3/8/b1PPpp1B/5p2/8 w - -") + engine.send_command("go mate 2 searchmoves c6d7") + engine.expect("* score mate 2 * pv c6d7 *") + + engine.starts_with("bestmove") + + @staticmethod + def test_fen_position_with_mate_go_nodes(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command("position fen 8/5R2/2K1P3/4k3/8/b1PPpp1B/5p2/8 w - -") + engine.send_command("go nodes 500000 searchmoves c6d7") + engine.expect("* score mate 2 * pv c6d7 * f7f5") + + engine.starts_with("bestmove") + + @staticmethod + def test_fen_position_depth_8(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command( + "position fen r1b2r1k/pp1p2pp/2p5/2B1q3/8/8/P1PN2PP/R4RK1 w - - 0 18" + ) + engine.send_command("go depth 8") + engine.contains("score mate 1") + + engine.starts_with("bestmove") + + @staticmethod + def test_fen_position_with_mate_go_depth_and_promotion(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command( + "position fen 8/5R2/2K1P3/4k3/8/b1PPpp1B/5p2/8 w - - moves c6d7 f2f1q" + ) + engine.send_command("go depth 13") + engine.expect("* score mate 1 * pv f7f5") + engine.starts_with("bestmove f7f5") + + @staticmethod + def test_fen_position_with_mate_go_depth_and_searchmoves(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command("position fen 8/5R2/2K1P3/4k3/8/b1PPpp1B/5p2/8 w - -") + engine.send_command("go depth 18 searchmoves c6d7") + engine.expect("* score mate 2 * pv c6d7 * f7f5") + + engine.starts_with("bestmove c6d7") + + @staticmethod + def test_fen_position_with_moves_with_mate_go_depth_and_searchmoves(): + engine = BenBot() + engine.send_command("ucinewgame") + engine.send_command( + "position fen 8/5R2/2K1P3/4k3/8/b1PPpp1B/5p2/8 w - - moves c6d7" + ) + engine.send_command("go depth 18 searchmoves e3e2") + engine.expect("* score mate -1 * pv e3e2 f7f5") + engine.starts_with("bestmove e3e2") + + +class TestEnPassantSanitization(metaclass=OrderedClassMembers): + @staticmethod + def test_position_1(): + engine = BenBot() + engine.send_command( + "position fen rnbqkbnr/ppp1p1pp/5p2/3pP3/8/8/PPPP1PPP/RNBQKBNR w kq d6 0 3" + ) + engine.send_command("showpos") + + engine.expect_for_line_matching( + "FEN*", "*rnbqkbnr/ppp1p1pp/5p2/3pP3/8/8/PPPP1PPP/RNBQKBNR w kq d6 0 3*" + ) + + @staticmethod + def test_position_2(): + engine = BenBot() + engine.send_command("position fen k7/8/8/1pP5/2K5/8/8/8 w - b6 0 1") + engine.send_command("showpos") + + engine.expect_for_line_matching("FEN*", "*k7/8/8/1pP5/2K5/8/8/8 w - b6 0 1*") + + # def test_position_3(self): + # self.engine.send_command("position fen k1r5/8/8/1pP5/2K5/8/8/8 w - b6 0 1") + # self.engine.send_command("showpos") + # + # self.engine.expect_for_line_matching( + # "XFEN*", "*k1r5/8/8/1pP5/2K5/8/8/8 w - - 0 1*" + # ) + + # def test_position_4(self): + # self.engine.send_command("position fen k1r5/8/8/1pP5/8/2K5/8/8 w - b6 0 1") + # self.engine.send_command("showpos") + # + # self.engine.expect_for_line_matching( + # "XFEN*", "*k1r5/8/8/1pP5/8/2K5/8/8 w - - 0 1*" + # ) + + @staticmethod + def test_position_5(): + engine = BenBot() + engine.send_command("position fen k1r5/8/8/PpP5/8/2K5/8/8 w - b6 0 1") + engine.send_command("showpos") + + engine.expect_for_line_matching("FEN*", "*k1r5/8/8/PpP5/8/2K5/8/8 w - b6 0 1*") + + @staticmethod + def test_position_6(): + engine = BenBot() + engine.send_command("position fen k1r5/8/8/PpP5/2K5/8/8/8 w - b6 0 1") + engine.send_command("showpos") + + engine.expect_for_line_matching("FEN*", "*k1r5/8/8/PpP5/2K5/8/8/8 w - b6 0 1*") + + @staticmethod + def test_position_7(): + engine = BenBot() + engine.send_command("position fen k7/4b3/8/PpP5/1K6/8/8/8 w - b6 0 1") + engine.send_command("showpos") + + engine.expect_for_line_matching("FEN*", "*k7/4b3/8/PpP5/1K6/8/8/8 w - b6 0 1*") + + # def test_position_8(self): + # self.engine.send_command("position fen k7/b5b1/8/2PpP3/3K4/8/8/8 w - d6 0 1") + # self.engine.send_command("showpos") + # + # self.engine.expect_for_line_matching( + # "XFEN*", "*k7/b5b1/8/2PpP3/3K4/8/8/8 w - - 0 1*" + # ) + + # def test_position_9(self): + # self.engine.send_command("position fen k7/8/8/r2pPK2/8/8/8/8 w - d6 0 1") + # self.engine.send_command("showpos") + # + # self.engine.expect_for_line_matching( + # "XFEN*", "*k7/8/8/r2pPK2/8/8/8/8 w - - 0 1*" + # ) + + @staticmethod + def test_position_10(): + engine = BenBot() + engine.send_command("position fen k7/8/8/r1PpPK2/8/8/8/8 w - d6 0 1") + engine.send_command("showpos") + + engine.expect_for_line_matching("FEN*", "*k7/8/8/r1PpPK2/8/8/8/8 w - d6 0 1*") + + @staticmethod + def test_position_11(): + engine = BenBot() + engine.send_command("position fen kb6/8/8/3pP3/5K2/8/8/8 w - d6 0 1") + engine.send_command("showpos") + + engine.expect_for_line_matching("FEN*", "*kb6/8/8/3pP3/5K2/8/8/8 w - d6 0 1*") + + +def parse_args(): + parser = argparse.ArgumentParser(description="Run BenBot end-to-end tests") + + parser.add_argument("engine_path", type=str, help="Path to BenBot binary") + + return parser.parse_args() + + +if __name__ == "__main__": + args = parse_args() + + framework = MiniTestFramework() + + # Each test suite will be run inside a temporary directory + framework.run([TestInteractive, TestEnPassantSanitization]) + + if framework.has_failed(): + sys.exit(1) + + sys.exit(0) diff --git a/tests/e2e/testing.py b/tests/e2e/testing.py new file mode 100644 index 00000000..cb566301 --- /dev/null +++ b/tests/e2e/testing.py @@ -0,0 +1,320 @@ +# ====================================================================================== +# +# ░▒▓███████▓▒░░▒▓████████▓▒░▒▓███████▓▒░ ░▒▓███████▓▒░ ░▒▓██████▓▒░▒▓████████▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓███████▓▒░░▒▓██████▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓███████▓▒░░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░░▒▓█▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓█▓▒░ +# ░▒▓███████▓▒░░▒▓████████▓▒░▒▓█▓▒░░▒▓█▓▒░ ░▒▓███████▓▒░ ░▒▓██████▓▒░ ░▒▓█▓▒░ +# +# ====================================================================================== + +import subprocess +from typing import List +import os +import collections +import time +import sys +import traceback +import fnmatch +from functools import wraps +from contextlib import redirect_stdout +import io +import concurrent.futures +import tempfile + +CYAN_COLOR = "\033[36m" +GRAY_COLOR = "\033[2m" +RED_COLOR = "\033[31m" +GREEN_COLOR = "\033[32m" +RESET_COLOR = "\033[0m" +WHITE_BOLD = "\033[1m" + +MAX_TIMEOUT = 60 * 5 + + +class OrderedClassMembers(type): + @classmethod + def __prepare__(self, name, bases): + return collections.OrderedDict() + + def __new__(self, name, bases, classdict): + classdict["__ordered__"] = [ + key for key in classdict.keys() if key not in ("__module__", "__qualname__") + ] + return type.__new__(self, name, bases, classdict) + + +class TimeoutException(Exception): + def __init__(self, message: str, timeout: int): + self.message = message + self.timeout = timeout + + +class UnexpectedOutputException(Exception): + def __init__(self, actual: str, expected: str): + self.actual = actual + self.expected = expected + + +def timeout_decorator(timeout: float): + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(func, *args, **kwargs) + try: + result = future.result(timeout=timeout) + except concurrent.futures.TimeoutError: + raise TimeoutException( + f"Function {func.__name__} timed out after {timeout} seconds", + int(timeout), + ) + return result + + return wrapper + + return decorator + + +class MiniTestFramework: + def __init__(self): + self.passed_test_suites = 0 + self.failed_test_suites = 0 + self.passed_tests = 0 + self.failed_tests = 0 + self.start_time = None + self.stop_on_failure = True + + def has_failed(self) -> bool: + return self.failed_test_suites > 0 + + def run(self, classes: List[type]) -> bool: + self.start_time = time.time() + + for test_class in classes: + with tempfile.TemporaryDirectory() as tmpdirname: + original_cwd = os.getcwd() + os.chdir(tmpdirname) + + try: + if self.__run(test_class): + self.failed_test_suites += 1 + else: + self.passed_test_suites += 1 + except Exception as e: + self.failed_test_suites += 1 + print(f"\n{RED_COLOR}Error: {e}{RESET_COLOR}") + finally: + os.chdir(original_cwd) + + self.__print_summary(round(time.time() - self.start_time, 2)) + return self.has_failed() + + def __run(self, test_class) -> bool: + test_instance = test_class() + test_name = test_instance.__class__.__name__ + test_methods = [m for m in test_instance.__ordered__ if m.startswith("test_")] + + print(f"\nTest Suite: {test_name}") + + fails = 0 + + for method in test_methods: + fails += self.__run_test_method(test_instance, method) + + self.failed_tests += fails + + return fails > 0 + + def __run_test_method(self, test_instance, method: str) -> int: + print(f" Running {method}... \r", end="", flush=True) + + buffer = io.StringIO() + fails = 0 + + try: + t0 = time.time() + + with redirect_stdout(buffer): + getattr(test_instance, method)() + + duration = time.time() - t0 + + self.print_success(f" {method} ({duration * 1000:.2f}ms)") + self.passed_tests += 1 + except Exception as e: + if isinstance(e, TimeoutException): + self.print_failure( + f" {method} (hit execution limit of {e.timeout} seconds)" + ) + + if isinstance(e, UnexpectedOutputException): + self.print_failure( + f' {method} encountered unexpected output: "{e.actual}" when output matching "{e.expected}" was expected' + ) + + if isinstance(e, AssertionError): + self.__handle_assertion_error(t0, method) + + if self.stop_on_failure: + self.__print_buffer_output(buffer) + raise e + + fails += 1 + finally: + self.__print_buffer_output(buffer) + + return fails + + def __handle_assertion_error(self, start_time, method: str): + duration = time.time() - start_time + self.print_failure(f" {method} ({duration * 1000:.2f}ms)") + traceback_output = "".join(traceback.format_tb(sys.exc_info()[2])) + + colored_traceback = "\n".join( + f" {CYAN_COLOR}{line}{RESET_COLOR}" + for line in traceback_output.splitlines() + ) + + print(colored_traceback) + + @staticmethod + def __print_buffer_output(buffer: io.StringIO): + output = buffer.getvalue() + if output: + indented_output = "\n".join(f" {line}" for line in output.splitlines()) + print(f" {RED_COLOR}⎯⎯⎯⎯⎯OUTPUT⎯⎯⎯⎯⎯{RESET_COLOR}") + print(f"{GRAY_COLOR}{indented_output}{RESET_COLOR}") + print(f" {RED_COLOR}⎯⎯⎯⎯⎯OUTPUT⎯⎯⎯⎯⎯{RESET_COLOR}") + + def __print_summary(self, duration: float): + print(f"\n{WHITE_BOLD}Test Summary{RESET_COLOR}\n") + print( + f" Test Suites: {GREEN_COLOR}{self.passed_test_suites} passed{RESET_COLOR}, {RED_COLOR}{self.failed_test_suites} failed{RESET_COLOR}, {self.passed_test_suites + self.failed_test_suites} total" + ) + print( + f" Tests: {GREEN_COLOR}{self.passed_tests} passed{RESET_COLOR}, {RED_COLOR}{self.failed_tests} failed{RESET_COLOR}, {self.passed_tests + self.failed_tests} total" + ) + print(f" Time: {duration}s\n") + + @staticmethod + def print_failure(add: str): + print(f" {RED_COLOR}✗{RESET_COLOR}{add}", flush=True) + + @staticmethod + def print_success(add: str): + print(f" {GREEN_COLOR}✓{RESET_COLOR}{add}", flush=True) + + +class BenBot: + def __init__(self, path: str, args: List[str] = []): + self.path = path + self.process = None + self.args = args + self.output = [] + + self.start() + + def __del__(self): + self.quit() + assert self.close() == 0 + + def _check_process_alive(self): + if not self.process or self.process.poll() is not None: + print("\n".join(self.output)) + raise RuntimeError("BenBot process has terminated") + + def start(self): + self.process = subprocess.Popen( + [self.path] + self.args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + bufsize=1, + ) + + def setoption(self, name: str, value: str): + self.send_command(f"setoption name {name} value {value}") + + def send_command(self, command: str): + if not self.process: + raise RuntimeError("BenBot process is completed or not started") + + self._check_process_alive() + + self.process.stdin.write(command + "\n") + self.process.stdin.flush() + + @timeout_decorator(MAX_TIMEOUT) + def equals(self, expected_output: str): + for line in self.readline(): + if line == expected_output: + return + + @timeout_decorator(MAX_TIMEOUT) + def expect(self, expected_output: str): + for line in self.readline(): + if fnmatch.fnmatch(line, expected_output): + return + + @timeout_decorator(MAX_TIMEOUT) + def contains(self, expected_output: str): + for line in self.readline(): + if expected_output in line: + return + + @timeout_decorator(MAX_TIMEOUT) + def starts_with(self, expected_output: str): + for line in self.readline(): + if line.startswith(expected_output): + return + + @timeout_decorator(MAX_TIMEOUT) + def check_output(self, callback): + if not callback: + raise ValueError("Callback function is required") + + for line in self.readline(): + if callback(line): + return + + @timeout_decorator(MAX_TIMEOUT) + def expect_for_line_matching(self, line_match: str, expected: str): + for line in self.readline(): + if fnmatch.fnmatch(line, line_match): + if fnmatch.fnmatch(line, expected): + break + else: + raise UnexpectedOutputException(line, expected) + + def readline(self): + if not self.process: + raise RuntimeError("BenBot process is completed or not started") + + while True: + self._check_process_alive() + line = self.process.stdout.readline().strip() + self.output.append(line) + + yield line + + def clear_output(self): + self.output = [] + + def get_output(self) -> List[str]: + return self.output + + def quit(self): + if self.process: + self.send_command("quit") + + def close(self): + if self.process: + self.process.stdin.close() + self.process.stdout.close() + return self.process.wait() + + return 0