Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 191 additions & 0 deletions tests/test_node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import itertools
import typing
from heapq import heappop, heappush
from queue import Queue

import pytest

Expand Down Expand Up @@ -86,13 +88,202 @@ def test_is_successor_of():
game.root.is_successor_of(game.players[0])


# OLD TEST SUITE
def test_is_subgame_root():
"""Test whether nodes are correctly labeled as roots of proper subgames."""
game = games.read_from_file("basic_extensive_game.efg")
assert game.root.is_subgame_root
assert not game.root.children[0].is_subgame_root


# NEW TEST SUITE
# connections between the legacy code and its refactoring (Python prototype below)
#
# helper function: a brute-force absent-minded checker
def is_absent_minded(infoset: gbt.Infoset) -> bool:
"""
Checks if an information set is absent-minded by seeing if it contains
a pair of nodes where one is a successor of the other.
"""
members = list(infoset.members)
if len(members) < 2:
return False

for n1, n2 in itertools.combinations(members, 2):
if n1.is_successor_of(n2) or n2.is_successor_of(n1):
return True

return False


# The Python prototype for the new, efficient subgame root finding algorithm.
# These classes are defined here to be tested before being ported to C++.

class MaxPriorityQueue(Queue):
"""Variant of Queue that retrieves open entries in priority order (highest first)."""
def _init(self, maxsize):
self.queue = []

def _qsize(self):
return len(self.queue)

def _put(self, item):
heappush(self.queue, -item)

def _get(self):
return -heappop(self.queue)


class SubgameRootFinder:

def __init__(self):
self.game = None
# two auxiliary fields
self.index_to_node = None
self.node_to_index = None
# infoset-to-subgame root dict
self.infoset_to_roots = None

def _build_mappings(self) -> tuple[list[gbt.Node], dict[gbt.Node, int]]:
index_to_node = list(self.game.nodes)
node_to_index = {node: index for (index, node) in enumerate(self.game.nodes)}
self.index_to_node = index_to_node
self.node_to_index = node_to_index

def _build_layer(self, nodes: set[int]) -> MaxPriorityQueue:
layer = MaxPriorityQueue()
for node in nodes:
layer.put(node)
return layer

def _explore_component(self, start_node: gbt.Node, infoset_to_roots):
visited_infosets = set()
visited_nodes = set() # set-ification of frontier
frontier = MaxPriorityQueue()

while True:

if start_node.infoset not in infoset_to_roots:
if start_node.infoset not in visited_infosets:
visited_infosets.add(start_node.infoset)
for member in start_node.infoset.members:
if member != start_node:
frontier.put(self.node_to_index[member])
visited_nodes.add(member)
if (
not frontier.empty()
and start_node.parent
and start_node.parent not in visited_nodes
):
frontier.put(self.node_to_index[start_node.parent])
visited_nodes.add(start_node.parent)
if frontier.empty():
for infoset in visited_infosets:
infoset_to_roots[infoset] = start_node
break

else:
reroot = infoset_to_roots[start_node.infoset]
if reroot not in visited_nodes:
frontier.put(self.node_to_index[reroot])
visited_nodes.add(reroot)
for (infoset, root) in infoset_to_roots.items():
if root == reroot:
visited_infosets.add(infoset)
infoset_to_roots = {
infoset: root for infoset, root in infoset_to_roots.items() if root != reroot
}

start_node = self.index_to_node[frontier.get()]

return infoset_to_roots

def _find_roots_layer(
self, layer: MaxPriorityQueue, infoset_to_roots: dict[gbt.Infoset, gbt.Node]
):
while not layer.empty():
node = self.index_to_node[layer.get()]
# check if the node's infoset was encountered and recorded in I2R
if node.infoset in infoset_to_roots:
continue
infoset_to_roots = self._explore_component(node, infoset_to_roots)
return infoset_to_roots

def find_roots(self, game: gbt.Game) -> dict[gbt.Infoset, gbt.Node]:
self.game = game
self._build_mappings()
infoset_to_roots = {}
leaves = {node for node in game.nodes if node.is_terminal}
# initialisation with preterminal decision nodes
# this can be initialised with members of min infosets; checking this may be an overkill?
exploration_layer = self._build_layer({self.node_to_index[leaf.parent] for leaf in leaves})

while not exploration_layer.empty():
infoset_to_roots_old = infoset_to_roots.copy()
infoset_to_roots = self._find_roots_layer(exploration_layer, infoset_to_roots)
exploration_layer = self._build_layer({
self.node_to_index[node.parent]
for node in infoset_to_roots.values()
if node.parent and node not in infoset_to_roots_old.values()
})
self.infoset_to_roots = infoset_to_roots
return infoset_to_roots


@pytest.mark.parametrize(
"game_file",
[
# Standard games where both algorithms are expected to return the same set of roots.
"basic_extensive_game.efg",
"e01.efg",
"e02.efg",
"cent3.efg",
"wichardt.efg",
# Games with absent-mindedness where the new algorithm correctly treats this property.
"noPR-AM-driver-one-player.efg",
"noPR-action-AM-two-hops.efg",
],
)
def test_subgame_root_finder_consistency_with_legacy(game_file: str):
"""
Verifies the new SubgameRootFinder against the legacy is_subgame_root.

This test ensures two conditions hold:
1. The set of the legacy roots is a subset of the new algorithm's roots.
2. Any root found by the new algorithm but not by the legacy one must be
a member of an absent-minded information set.
"""
game = games.read_from_file(game_file)

# --- Get the two sets of subgame roots ---

# Legacy roots.
legacy_roots = {node for node in game.nodes if node.is_subgame_root}

# New roots: the Python prototype algorithm.
finder = SubgameRootFinder()
finder.find_roots(game)
new_roots = set(finder.infoset_to_roots.values())

# --- Step 1: Superset Check ---
# Verifies that the new algorithm finds everything the old one did.
assert legacy_roots <= new_roots, (
"The new algorithm failed to find all roots identified by the legacy method."
)

# --- Step 2: Check the "Extra" Roots ---
# Find the roots that are unique to the new algorithm.
extra_roots = new_roots.difference(legacy_roots)

# Verify that every one of these extra roots is part of an absent-minded
# infoset, explicitly excluded by the legacy algorithm.
for root in extra_roots:
assert is_absent_minded(root.infoset), (
f"New algorithm found an extra root {root} that was excluded by the "
f"legacy method, but its infoset {root.infoset} is not absent-minded."
)


def test_append_move_error_player_actions():
"""Test to ensure there are actions when appending with a player"""
game = games.read_from_file("basic_extensive_game.efg")
Expand Down
Loading