From d389b6e877c18123c356096cf751107a55da1705 Mon Sep 17 00:00:00 2001 From: "Keyu(Frank) He" Date: Sun, 21 Sep 2025 01:10:35 -0400 Subject: [PATCH 01/23] werewolf game in progress with minor bugs, will fix in future iterations --- .../experimental/werewolves/game_rules.json | 215 ++++ examples/experimental/werewolves/main.py | 255 +++++ .../experimental/werewolves/role_actions.json | 75 ++ examples/experimental/werewolves/roster.json | 61 ++ sotopia/database/persistent_profile.py | 4 + sotopia/envs/__init__.py | 3 +- sotopia/envs/social_game.py | 933 ++++++++++++++++++ sotopia/samplers/uniform_sampler.py | 16 +- 8 files changed, 1559 insertions(+), 3 deletions(-) create mode 100644 examples/experimental/werewolves/game_rules.json create mode 100644 examples/experimental/werewolves/main.py create mode 100644 examples/experimental/werewolves/role_actions.json create mode 100644 examples/experimental/werewolves/roster.json create mode 100644 sotopia/envs/social_game.py diff --git a/examples/experimental/werewolves/game_rules.json b/examples/experimental/werewolves/game_rules.json new file mode 100644 index 000000000..02a602519 --- /dev/null +++ b/examples/experimental/werewolves/game_rules.json @@ -0,0 +1,215 @@ +{ + "initial_phase": "night_werewolves", + "phases": [ + { + "name": "night_werewolves", + "kind": "team_target", + "turn_mode": "simultaneous", + "acting_roles": [ + "Werewolf" + ], + "acting_teams": [ + "Werewolves" + ], + "speech_visibility": "team", + "action_visibility": "team", + "resolution": { + "operation": "store_target", + "state_key": "night_target", + "visibility": "team" + }, + "entry_messages": [ + "Night phase: werewolves pick a target." + ], + "exit_messages": [ + "Werewolves have chosen their move." + ], + "group": "night", + "instructions": [ + "Secret night phase. Only werewolves act here." + ], + "role_instructions": { + "Werewolf": [ + "Coordinate quietly with packmates and issue 'kill NAME'." + ] + } + }, + { + "name": "night_seer", + "kind": "single_target", + "turn_mode": "single", + "acting_roles": [ + "Seer" + ], + "speech_visibility": "private", + "action_visibility": "private", + "resolution": { + "operation": "seer_inspect", + "visibility": "private" + }, + "entry_messages": [ + "Seer, choose someone to inspect." + ], + "exit_messages": [ + "Seer's vision is complete." + ], + "group": "night", + "instructions": [ + "Seer takes a private action." + ], + "role_instructions": { + "Seer": [ + "Use 'inspect NAME' to learn their alignment." + ] + } + }, + { + "name": "night_witch", + "kind": "single_target", + "turn_mode": "single", + "acting_roles": [ + "Witch" + ], + "speech_visibility": "private", + "action_visibility": "private", + "resolution": { + "operation": "witch_phase", + "visibility": "private" + }, + "entry_messages": [ + "Witch, decide to save, poison, or pass." + ], + "exit_messages": [ + "Witch phase ends." + ], + "group": "night", + "instructions": [ + "Witch decides whether to intervene." + ], + "role_instructions": { + "Witch": [ + "Choose 'save NAME', 'poison NAME', or 'pass'. Each potion may be used once." + ] + } + }, + { + "name": "dawn_report", + "kind": "announcement", + "turn_mode": "simultaneous", + "resolution": { + "operation": "resolve_night", + "visibility": "public" + }, + "entry_messages": [ + "Dawn report:" + ], + "exit_messages": [], + "group": "night", + "instructions": [ + "Public summary of night outcomes." + ], + "role_instructions": {} + }, + { + "name": "day_discussion", + "kind": "discussion", + "turn_mode": "round-robin", + "acting_roles": [ + "Villager", + "Seer", + "Witch", + "Werewolf" + ], + "max_cycles": 2, + "max_turns": 12, + "speech_visibility": "public", + "action_visibility": "public", + "resolution": { + "operation": "noop" + }, + "entry_messages": [ + "Day discussion starts. Speak in turn." + ], + "exit_messages": [ + "Discussion ends." + ], + "group": "day", + "instructions": [ + "Each villager speaks in turn. Share concise reasoning tied to observations." + ], + "role_instructions": {} + }, + { + "name": "day_vote", + "kind": "vote", + "turn_mode": "simultaneous", + "acting_roles": [ + "Villager", + "Seer", + "Witch", + "Werewolf" + ], + "speech_visibility": "hidden", + "action_visibility": "public", + "resolution": { + "operation": "vote", + "visibility": "public" + }, + "entry_messages": [ + "Voting phase: use 'vote NAME' or 'vote none'." + ], + "exit_messages": [ + "Votes are tallied." + ], + "group": "day", + "instructions": [ + "Voting phase: respond with action 'vote NAME' or 'vote none'." + ], + "role_instructions": {} + }, + { + "name": "twilight_execution", + "kind": "announcement", + "turn_mode": "simultaneous", + "resolution": { + "operation": "post_vote_cleanup", + "visibility": "public" + }, + "entry_messages": [ + "Execution results:" + ], + "exit_messages": [ + "Night returns." + ], + "group": "day", + "instructions": [ + "Resolve the vote and announce results." + ], + "role_instructions": {} + } + ], + "phase_transitions": { + "night_werewolves": "night_seer", + "night_seer": "night_witch", + "night_witch": "dawn_report", + "dawn_report": "day_discussion", + "day_discussion": "day_vote", + "day_vote": "twilight_execution", + "twilight_execution": "night_werewolves" + }, + "end_conditions": [ + { + "operation": "team_eliminated", + "team": "Werewolves", + "winner": "Villagers", + "message": "[God] Villagers win; no werewolves remain." + }, + { + "operation": "parity", + "team": "Werewolves", + "other_team": "Villagers", + "winner": "Werewolves", + "message": "[God] Werewolves win; they now match the village." + } + ] +} diff --git a/examples/experimental/werewolves/main.py b/examples/experimental/werewolves/main.py new file mode 100644 index 000000000..50b9422c6 --- /dev/null +++ b/examples/experimental/werewolves/main.py @@ -0,0 +1,255 @@ +"""Launcher for the Duskmire Werewolves social game scenario.""" + +from __future__ import annotations + +import asyncio +import json +import os +from pathlib import Path +from typing import Any, Dict, List + +import redis + +from sotopia.agents import LLMAgent +from sotopia.database.persistent_profile import ( + AgentProfile, + EnvironmentProfile, + RelationshipType, +) +from sotopia.envs import SocialGameEnv +from sotopia.envs.evaluators import ( + EpisodeLLMEvaluator, + EvaluationForAgents, + RuleBasedTerminatedEvaluator, +) +from sotopia.server import arun_one_episode +from sotopia.database import SotopiaDimensions + +BASE_DIR = Path(__file__).resolve().parent +ROLE_ACTIONS_PATH = BASE_DIR / "role_actions.json" +RULEBOOK_PATH = BASE_DIR / "game_rules.json" +ROSTER_PATH = BASE_DIR / "roster.json" + +os.environ.setdefault("REDIS_OM_URL", "redis://:@localhost:6379") +redis.Redis(host="localhost", port=6379) + +COMMON_GUIDANCE = ( + "During your turn you must respond. If 'action' is available, use commands like 'kill NAME', " + "'inspect NAME', 'save NAME', 'poison NAME', or 'vote NAME'. Werewolf night speech is private to the pack. " + "Day discussion is public. Voting requires an 'action' beginning with 'vote'." +) + + +def load_json(path: Path) -> Dict[str, Any]: + return json.loads(path.read_text()) + + +def ensure_agent(player: Dict[str, Any]) -> AgentProfile: + try: + profile = AgentProfile.find( + AgentProfile.first_name == player["first_name"], + AgentProfile.last_name == player["last_name"], + ).all()[0] + return profile # type: ignore[return-value] + except IndexError: + profile = AgentProfile( + first_name=player["first_name"], + last_name=player["last_name"], + age=player.get("age", 30), + occupation="", + gender="", + gender_pronoun=player.get("pronouns", "they/them"), + public_info="", + personality_and_values="", + decision_making_style="", + secret=player.get("secret", ""), + ) + profile.save() + return profile + + +def build_agent_goal(player: Dict[str, Any], role_prompt: str) -> str: + return ( + f"You are {player['first_name']} {player['last_name']}, publicly known only as a villager.\n" + f"Primary directives: {player['goal']}\n" + f"Role guidance: {role_prompt}\n" + f"System constraints: {COMMON_GUIDANCE}" + ) + + +def prepare_scenario() -> tuple[EnvironmentProfile, List[AgentProfile], Dict[str, str]]: + role_actions = load_json(ROLE_ACTIONS_PATH) + roster = load_json(ROSTER_PATH) + + agents: List[AgentProfile] = [] + agent_goals: List[str] = [] + role_assignments: Dict[str, str] = {} + + for player in roster["players"]: + profile = ensure_agent(player) + agents.append(profile) + full_name = f"{player['first_name']} {player['last_name']}" + role = player["role"] + role_prompt = role_actions["roles"][role]["goal_prompt"] + agent_goals.append(build_agent_goal(player, role_prompt)) + role_assignments[full_name] = role + + scenario_text = ( + roster["scenario"] + + " Werewolves must be eliminated before they achieve parity with villagers." + ) + + env_profile = EnvironmentProfile( + scenario=scenario_text, + agent_goals=agent_goals, + relationship=RelationshipType.acquaintance, + game_metadata={ + "mode": "social_game", + "rulebook_path": str(RULEBOOK_PATH), + "actions_path": str(ROLE_ACTIONS_PATH), + "role_assignments": role_assignments, + }, + tag="werewolves", + ) + env_profile.save() + return env_profile, agents, role_assignments + + +def build_environment( + env_profile: EnvironmentProfile, + role_assignments: Dict[str, str], + model_name: str, +) -> SocialGameEnv: + return SocialGameEnv( + env_profile=env_profile, + rulebook_path=str(RULEBOOK_PATH), + actions_path=str(ROLE_ACTIONS_PATH), + role_assignments=role_assignments, + model_name=model_name, + action_order="round-robin", + evaluators=[RuleBasedTerminatedEvaluator(max_turn_number=40, max_stale_turn=2)], + terminal_evaluators=[ + EpisodeLLMEvaluator( + model_name, + EvaluationForAgents[SotopiaDimensions], + ) + ], + ) + + +def create_agents( + agent_profiles: List[AgentProfile], + env_profile: EnvironmentProfile, + model_names: List[str], +) -> List[LLMAgent]: + agents: List[LLMAgent] = [] + for profile, model_name, goal in zip( + agent_profiles, + model_names, + env_profile.agent_goals, + strict=True, + ): + agent = LLMAgent(agent_profile=profile, model_name=model_name) + agent.goal = goal + agents.append(agent) + return agents + + +def summarize_phase_log(phase_log: List[Dict[str, Any]]) -> None: + if not phase_log: + print("\nNo structured events recorded.") + return + + print("\nTimeline by Phase") + print("=" * 60) + + last_label: str | None = None + for entry in phase_log: + phase_name = entry["phase"] + meta = entry.get("meta", {}) + group = meta.get("group") + cycle = meta.get("group_cycle") + stage = meta.get("group_stage") + title = phase_name.replace("_", " ").title() + if group: + group_label = group.replace("_", " ").title() + if cycle and stage: + label = f"{group_label} {cycle}.{stage} – {title}" + elif cycle: + label = f"{group_label} {cycle} – {title}" + else: + label = f"{group_label}: {title}" + else: + label = title + + if label != last_label: + print(f"\n[{label}]") + last_label = label + instructions = entry.get("instructions", []) + for info_line in instructions: + print(f" Info: {info_line}") + role_instr = entry.get("role_instructions", {}) + for role, lines in role_instr.items(): + for line in lines: + print(f" Role {role}: {line}") + + for msg in entry.get("public", []): + print(f" Public: {msg}") + for team, messages in entry.get("team", {}).items(): + for msg in messages: + print(f" Team ({team}) private: {msg}") + for agent, messages in entry.get("private", {}).items(): + for msg in messages: + print(f" Private to {agent}: {msg}") + for actor, action in entry.get("actions", {}).items(): + print( + f" Action logged: {actor} -> {action['action_type']} {action['argument']}" + ) + + +def print_roster(role_assignments: Dict[str, str]) -> None: + print("Participants & roles:") + for name, role in role_assignments.items(): + print(f" - {name}: {role}") + + +async def main() -> None: + env_profile, agent_profiles, role_assignments = prepare_scenario() + env_model = "gpt-4o-mini" + agent_model_list = [ + "gpt-4o-mini", + "gpt-4o-mini", + "gpt-4o-mini", + "gpt-4o-mini", + "gpt-4o-mini", + "gpt-4o-mini", + ] + + env = build_environment(env_profile, role_assignments, env_model) + agents = create_agents(agent_profiles, env_profile, agent_model_list) + + print("🌕 Duskmire Werewolves — Structured Social Game") + print("=" * 60) + print_roster(role_assignments) + print("=" * 60) + + await arun_one_episode( + env=env, + agent_list=agents, + omniscient=False, + script_like=False, + json_in_script=False, + tag=None, + push_to_db=False, + ) + + summarize_phase_log(env.phase_log) + + if env._winner_payload: # noqa: SLF001 (internal inspection for demo) + print("\nGame Result:") + print(f"Winner: {env._winner_payload['winner']}") + print(f"Reason: {env._winner_payload['message']}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/experimental/werewolves/role_actions.json b/examples/experimental/werewolves/role_actions.json new file mode 100644 index 000000000..2c88851a9 --- /dev/null +++ b/examples/experimental/werewolves/role_actions.json @@ -0,0 +1,75 @@ +{ + "roles": { + "Villager": { + "name": "Villager", + "team": "Villagers", + "description": "Ordinary resident with no night power but vital voice in daytime debates.", + "goal_prompt": "Keep sharp notes about player behaviour and vote to execute suspected werewolves each day.", + "default_actions": ["speak"], + "phase_actions": { + "night_werewolves": ["none"], + "night_seer": ["none"], + "night_witch": ["none"], + "dawn_report": ["none"], + "day_discussion": ["speak"], + "day_vote": ["action"], + "twilight_execution": ["none"] + }, + "initial_state": {} + }, + "Seer": { + "name": "Seer", + "team": "Villagers", + "description": "Mystic who divines alignments during the night.", + "goal_prompt": "Inspect one player each night using an action like 'inspect NAME'; leak findings strategically without exposing yourself too early.", + "default_actions": ["speak"], + "phase_actions": { + "night_werewolves": ["none"], + "night_seer": ["action"], + "night_witch": ["none"], + "dawn_report": ["none"], + "day_discussion": ["speak"], + "day_vote": ["action"], + "twilight_execution": ["none"] + }, + "initial_state": {} + }, + "Witch": { + "name": "Witch", + "team": "Villagers", + "description": "Potion expert who may save one player per game and poison one player per game during the night.", + "goal_prompt": "During your witch phase, decide whether to 'save NAME', 'poison NAME', or pass. Use your limited potions wisely to keep villagers alive and remove wolves when confident.", + "default_actions": ["speak"], + "phase_actions": { + "night_werewolves": ["none"], + "night_seer": ["none"], + "night_witch": ["action"], + "dawn_report": ["none"], + "day_discussion": ["speak"], + "day_vote": ["action"], + "twilight_execution": ["none"] + }, + "initial_state": { + "save_available": true, + "poison_available": true + } + }, + "Werewolf": { + "name": "Werewolf", + "team": "Werewolves", + "description": "Predator hiding among villagers, coordinating nightly kills and sowing mistrust by day.", + "goal_prompt": "Confer quietly with fellow wolves at night. Use actions like 'kill NAME' to propose a victim. During the day, blend in while pushing misdirection.", + "default_actions": ["speak"], + "phase_actions": { + "night_werewolves": ["speak", "action"], + "night_seer": ["none"], + "night_witch": ["none"], + "dawn_report": ["none"], + "day_discussion": ["speak"], + "day_vote": ["action"], + "twilight_execution": ["none"] + }, + "initial_state": {} + } + } +} diff --git a/examples/experimental/werewolves/roster.json b/examples/experimental/werewolves/roster.json new file mode 100644 index 000000000..10aa70e57 --- /dev/null +++ b/examples/experimental/werewolves/roster.json @@ -0,0 +1,61 @@ +{ + "scenario": "In Duskmire, six villagers gather each night to expose the hidden werewolves among them before the pack reaches equal numbers.", + "players": [ + { + "first_name": "Aurora", + "last_name": "Harper", + "role": "Villager", + "public_role": "Villager", + "age": 54, + "pronouns": "she/her", + "goal": "Keep discussion orderly and support executions only when evidence feels solid." + }, + { + "first_name": "Bram", + "last_name": "Nightshade", + "role": "Werewolf", + "public_role": "Villager", + "age": 33, + "pronouns": "he/him", + "goal": "Blend in with confident speech while steering suspicion toward ordinary villagers.", + "secret": "You are a werewolf working with Dorian. Coordinate night kills." + }, + { + "first_name": "Celeste", + "last_name": "Moonseer", + "role": "Seer", + "public_role": "Villager", + "age": 29, + "pronouns": "she/her", + "goal": "Inspect one player per night and nudge the village toward the wolves." + }, + { + "first_name": "Dorian", + "last_name": "Blackwood", + "role": "Werewolf", + "public_role": "Villager", + "age": 38, + "pronouns": "he/him", + "goal": "Support Bram's stories and pressure outspoken villagers into missteps.", + "secret": "You are a werewolf working with Bram. Coordinate night kills." + }, + { + "first_name": "Elise", + "last_name": "Farrow", + "role": "Witch", + "public_role": "Villager", + "age": 41, + "pronouns": "she/her", + "goal": "Use your save and poison sparingly; protect confirmed villagers and strike when a wolf is exposed." + }, + { + "first_name": "Finn", + "last_name": "Alder", + "role": "Villager", + "public_role": "Villager", + "age": 36, + "pronouns": "he/him", + "goal": "Track inconsistencies and rally the town to execute the most suspicious player each day." + } + ] +} diff --git a/sotopia/database/persistent_profile.py b/sotopia/database/persistent_profile.py index ab2f78fcb..23e1871e9 100644 --- a/sotopia/database/persistent_profile.py +++ b/sotopia/database/persistent_profile.py @@ -88,6 +88,10 @@ class BaseEnvironmentProfile(BaseModel): agent_constraint: list[list[str]] | None = Field( default_factory=lambda: None, ) + game_metadata: dict[str, Any] | None = Field( + default_factory=lambda: None, + description="Optional metadata for structured social games (rulebooks, config paths, etc.).", + ) tag: str = Field( index=True, default_factory=lambda: "", diff --git a/sotopia/envs/__init__.py b/sotopia/envs/__init__.py index fa56ad757..30b8d8a37 100644 --- a/sotopia/envs/__init__.py +++ b/sotopia/envs/__init__.py @@ -1,3 +1,4 @@ from .parallel import ParallelSotopiaEnv +from .social_game import SocialGameEnv -__all__ = ["ParallelSotopiaEnv"] +__all__ = ["ParallelSotopiaEnv", "SocialGameEnv"] diff --git a/sotopia/envs/social_game.py b/sotopia/envs/social_game.py new file mode 100644 index 000000000..926e4ffc8 --- /dev/null +++ b/sotopia/envs/social_game.py @@ -0,0 +1,933 @@ +"""Social game environment that reads its rulebook and action space from JSON.""" + +from __future__ import annotations + +import asyncio +import json +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Iterable, Optional, Sequence + +from pydantic import BaseModel, Field, RootModel, ValidationError + +from sotopia.envs.parallel import ParallelSotopiaEnv, render_text_for_agent +from sotopia.messages import AgentAction, Observation, SimpleMessage + + +class RoleActionConfig(BaseModel): + """Declared abilities and messaging semantics for a specific role.""" + + name: str + team: str + description: str = "" + goal_prompt: str = "" + default_actions: list[str] = Field(default_factory=lambda: ["speak", "action"]) + phase_actions: dict[str, list[str]] = Field(default_factory=dict) + initial_state: dict[str, Any] = Field(default_factory=dict) + allow_team_private_speech: bool = False + allow_role_private_speech: bool = False + + +class RoleActionLibrary(RootModel[dict[str, RoleActionConfig]]): + """Pydantic wrapper for mapping roles to role metadata.""" + + def team_for_role(self, role: str) -> str: + return self.root[role].team + + +class PhaseResolution(BaseModel): + operation: str = Field( + default="noop", + description="Name of the builtin resolution handler to invoke at phase end.", + ) + state_key: str | None = None + visibility: str = Field( + default="public", + description="Default visibility for resolution feedback.", + ) + + +class PhaseDefinition(BaseModel): + name: str + kind: str = Field( + default="discussion", + description="Macro describing how the phase behaves (discussion, team_target, vote, single_target, announcement).", + ) + group: str | None = Field( + default=None, + description="Optional label used to cluster phases into higher-level cycles (e.g., 'night', 'day').", + ) + turn_mode: str = Field( + default="round-robin", + description="round-robin => sequential actors, simultaneous => everyone at once, single => one actor only.", + ) + acting_roles: list[str] | None = None + acting_teams: list[str] | None = None + max_cycles: int = Field( + default=1, + description="Number of complete round-robin passes required before the phase advances.", + ) + max_turns: int | None = Field( + default=None, + description="Optional cap on total turns inside the phase (overrides max_cycles when smaller).", + ) + speech_visibility: str = Field( + default="public", + description="Where speech is visible ('public', 'team', 'private', 'hidden').", + ) + action_visibility: str = Field( + default="public", + description="Where action outcomes are visible ('public', 'team', 'private', 'hidden').", + ) + instructions: list[str] = Field( + default_factory=list, + description="General prompts injected into agent observations for this phase.", + ) + role_instructions: dict[str, list[str]] = Field( + default_factory=dict, + description="Optional role-specific prompts keyed by role name.", + ) + resolution: PhaseResolution | None = None + entry_messages: list[str] = Field(default_factory=list) + exit_messages: list[str] = Field(default_factory=list) + description: str = "" + + +class EndConditionDefinition(BaseModel): + operation: str + team: str | None = None + other_team: str | None = None + winner: str | None = None + message: str | None = None + + +class RulebookConfig(BaseModel): + initial_phase: str + phases: list[PhaseDefinition] + phase_transitions: dict[str, str] + end_conditions: list[EndConditionDefinition] = Field(default_factory=list) + max_cycles: int | None = Field( + default=None, + description="Optional safety bound on day/night cycles to prevent infinite games.", + ) + + +@dataclass +class AgentState: + name: str + role: str + team: str + alive: bool = True + attributes: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class PhaseEvents: + public: list[str] = field(default_factory=list) + team: dict[str, list[str]] = field(default_factory=dict) + private: dict[str, list[str]] = field(default_factory=dict) + system: list[str] = field(default_factory=list) + + def extend(self, other: "PhaseEvents") -> None: + self.public.extend(other.public) + for team, messages in other.team.items(): + self.team.setdefault(team, []).extend(messages) + for agent, messages in other.private.items(): + self.private.setdefault(agent, []).extend(messages) + self.system.extend(other.system) + + @classmethod + def phase_entry(cls, phase_name: str, messages: list[str]) -> "PhaseEvents": + events = cls() + for msg in messages: + events.public.append(f"[God] Phase '{phase_name}' begins: {msg}") + if not messages: + events.public.append(f"[God] Phase '{phase_name}' begins.") + return events + + +class GameRulebook: + """Runtime state machine that enforces the JSON described social game.""" + + def __init__(self, rules: RulebookConfig, roles: RoleActionLibrary) -> None: + self.rules = rules + self.roles = roles + self.phase_lookup = {phase.name: phase for phase in rules.phases} + self.agent_states: dict[str, AgentState] = {} + self.agent_name_lookup: dict[str, str] = {} + self.current_phase: str = rules.initial_phase + self.phase_cycle_progress: int = 0 + self.turns_in_phase: int = 0 + self.current_actor_index: int = 0 + self.state_flags: dict[str, Any] = {} + self.group_cycle: dict[str, int] = {} + self.group_stage: dict[str, int] = {} + self.current_phase_meta: dict[str, Any] = {} + self.pending_events: PhaseEvents = PhaseEvents() + + # ------------------------------------------------------------------ + # Initialisation + # ------------------------------------------------------------------ + def assign_agents( + self, + agents: Sequence[str], + role_assignments: dict[str, str], + ) -> None: + self.agent_states = {} + self.agent_name_lookup = {} + for name in agents: + role = role_assignments[name] + role_cfg = self.roles.root.get(role) + if role_cfg is None: + raise ValueError(f"Unknown role '{role}' for agent '{name}'") + attrs = dict(role_cfg.initial_state) + state = AgentState( + name=name, + role=role, + team=role_cfg.team, + alive=True, + attributes=attrs, + ) + self.agent_states[name] = state + self.agent_name_lookup[name.lower()] = name + self.agent_name_lookup[name.split()[0].lower()] = name + + self.current_phase = self.rules.initial_phase + self.phase_cycle_progress = 0 + self.turns_in_phase = 0 + self.current_actor_index = 0 + self.state_flags = { + "day_execution": None, + "night_target": None, + "witch_saved": None, + "witch_poisoned": None, + "seer_result": "", + } + self.group_cycle.clear() + self.group_stage.clear() + self.current_phase_meta = {} + self._register_phase_entry(self.current_phase) + entry_phase = self.phase_lookup[self.current_phase] + self.pending_events = PhaseEvents.phase_entry( + self.current_phase, entry_phase.entry_messages + ) + + # ------------------------------------------------------------------ + # Accessors used by the environment + # ------------------------------------------------------------------ + def alive_agents(self) -> list[str]: + return [name for name, state in self.agent_states.items() if state.alive] + + def active_agents_for_phase(self) -> list[str]: + phase = self.phase_lookup[self.current_phase] + eligible = self._eligible_candidates(phase) + if not eligible: + return [] + if phase.turn_mode == "round-robin": + idx = self.current_actor_index + if idx >= len(eligible): + idx = len(eligible) - 1 + if idx < 0: + idx = 0 + return [eligible[idx]] + return eligible + + def available_actions(self, agent_name: str) -> list[str]: + agent_state = self.agent_states[agent_name] + if not agent_state.alive: + return ["none"] + role_cfg = self.roles.root[agent_state.role] + actions = role_cfg.phase_actions.get( + self.current_phase, role_cfg.default_actions + ) + if "none" not in actions: + actions = list(actions) + ["none"] + return actions + + def collect_pending_events(self) -> PhaseEvents: + events = self.pending_events + self.pending_events = PhaseEvents() + return events + + # ------------------------------------------------------------------ + # Core update logic + # ------------------------------------------------------------------ + def process_actions( + self, actions: dict[str, AgentAction] + ) -> tuple[PhaseEvents, bool, Optional[dict[str, str]]]: + phase = self.phase_lookup[self.current_phase] + acting_agents = self.active_agents_for_phase() + events = PhaseEvents() + + if phase.kind == "announcement": + events.extend(self._resolve_phase(phase, {})) + winner = self._check_end_conditions() + self._schedule_phase_exit(phase) + return events, True, winner + + if not acting_agents: + events.extend(self._resolve_phase(phase, {})) + winner = self._check_end_conditions() + self._schedule_phase_exit(phase) + return events, True, winner + + relevant = { + name: actions.get(name, AgentAction(action_type="none", argument="")) + for name in acting_agents + } + + if phase.turn_mode == "round-robin": + actor = acting_agents[0] + events.extend(self._record_speech(actor, relevant[actor], phase)) + events.extend(self._resolve_phase(phase, {actor: relevant[actor]})) + self._advance_round_robin(phase) + advance = self._should_advance(phase) + else: + for actor, action in relevant.items(): + events.extend(self._record_speech(actor, action, phase)) + events.extend(self._resolve_phase(phase, relevant)) + advance = True + + winner = self._check_end_conditions() + if winner: + self._schedule_phase_exit(phase) + return events, True, winner + + if advance: + self._schedule_phase_exit(phase) + return events, advance, winner + + def start_next_phase(self) -> PhaseEvents: + next_phase = self.rules.phase_transitions.get(self.current_phase) + if next_phase is None: + raise ValueError( + f"No transition defined after phase '{self.current_phase}'" + ) + self.current_phase = next_phase + self.phase_cycle_progress = 0 + self.turns_in_phase = 0 + self.current_actor_index = 0 + self._register_phase_entry(next_phase) + phase_def = self.phase_lookup[next_phase] + entry = PhaseEvents.phase_entry(next_phase, phase_def.entry_messages) + return entry + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + def _phase_group(self, phase: PhaseDefinition) -> str: + if phase.group: + return phase.group + return phase.name + + def _register_phase_entry(self, phase_name: str) -> None: + phase = self.phase_lookup[phase_name] + group = self._phase_group(phase) + previous_group = ( + self.current_phase_meta.get("group") if self.current_phase_meta else None + ) + cycle = self.group_cycle.get(group, 0) + stage = self.group_stage.get(group, 0) + if previous_group != group: + cycle += 1 + stage = 1 + else: + stage += 1 + self.group_cycle[group] = cycle + self.group_stage[group] = stage + self.current_phase_meta = { + "phase": phase_name, + "group": group, + "group_cycle": cycle, + "group_stage": stage, + "display_name": phase.name.replace("_", " ").title(), + } + + def current_phase_metadata(self) -> dict[str, Any]: + return dict(self.current_phase_meta) if self.current_phase_meta else {} + + def _eligible_candidates(self, phase: PhaseDefinition) -> list[str]: + names = [name for name, state in self.agent_states.items() if state.alive] + if phase.acting_roles: + names = [ + name + for name in names + if self.agent_states[name].role in phase.acting_roles + ] + if phase.acting_teams: + names = [ + name + for name in names + if self.agent_states[name].team in phase.acting_teams + ] + return names + + def _record_speech( + self, actor: str, action: AgentAction, phase: PhaseDefinition + ) -> PhaseEvents: + events = PhaseEvents() + if action.action_type not in {"speak", "non-verbal communication"}: + return events + utterance = action.argument.strip() + if not utterance: + return events + line = f'{actor} said: "{utterance}"' + if phase.speech_visibility == "team": + team = self.agent_states[actor].team + events.team.setdefault(team, []).append(line) + elif phase.speech_visibility == "private": + events.private.setdefault(actor, []).append(line) + elif phase.speech_visibility == "hidden": + pass + else: + events.public.append(line) + return events + + def _resolve_phase( + self, + phase: PhaseDefinition, + actions: dict[str, AgentAction], + ) -> PhaseEvents: + if phase.resolution is None: + return PhaseEvents() + handler = getattr(self, f"_resolve_{phase.resolution.operation}", None) + if handler is None: + raise ValueError( + f"Unsupported resolution operation '{phase.resolution.operation}'" + ) + return handler(phase, actions, phase.resolution) + + def _resolve_noop( + self, + phase: PhaseDefinition, + actions: dict[str, AgentAction], + resolution: PhaseResolution, + ) -> PhaseEvents: + return PhaseEvents() + + def _resolve_store_target( + self, + phase: PhaseDefinition, + actions: dict[str, AgentAction], + resolution: PhaseResolution, + ) -> PhaseEvents: + events = PhaseEvents() + target = self._extract_target(actions.values()) + if target: + self.state_flags[resolution.state_key or "night_target"] = target + teams = phase.acting_teams or [self.agent_states[a].team for a in actions] + for team in teams: + events.team.setdefault(team, []).append( + f"[God] Target locked: {target}." + ) + return events + + def _resolve_seer_inspect( + self, + phase: PhaseDefinition, + actions: dict[str, AgentAction], + resolution: PhaseResolution, + ) -> PhaseEvents: + events = PhaseEvents() + if not actions: + return events + actor, action = next(iter(actions.items())) + target = self._extract_target([action]) + if not target: + events.private.setdefault(actor, []).append( + "[God] Vision failed: unable to interpret your target." + ) + return events + team = self.agent_states[target].team + message = f"[God] Vision reveals {target} serves team {team}." + events.private.setdefault(actor, []).append(message) + self.state_flags["seer_result"] = message + return events + + def _resolve_witch_phase( + self, + phase: PhaseDefinition, + actions: dict[str, AgentAction], + resolution: PhaseResolution, + ) -> PhaseEvents: + events = PhaseEvents() + if not actions: + return events + actor, action = next(iter(actions.items())) + state = self.agent_states[actor] + text = action.argument.lower() + if "save" in text and state.attributes.get("save_available", True): + target = self._extract_target([action]) or self.state_flags.get( + "night_target" + ) + if target: + self.state_flags["witch_saved"] = target + state.attributes["save_available"] = False + events.private.setdefault(actor, []).append( + f"[God] You secretly saved {target} tonight." + ) + if "poison" in text and state.attributes.get("poison_available", True): + target = self._extract_target([action]) + if target: + self.state_flags["witch_poisoned"] = target + state.attributes["poison_available"] = False + events.private.setdefault(actor, []).append( + f"[God] You poisoned {target}." + ) + if not text.strip() or "pass" in text: + events.private.setdefault(actor, []).append( + "[God] You chose to remain idle." + ) + return events + + def _resolve_resolve_night( + self, + phase: PhaseDefinition, + actions: dict[str, AgentAction], + resolution: PhaseResolution, + ) -> PhaseEvents: + events = PhaseEvents() + saved = self.state_flags.get("witch_saved") + target = self.state_flags.get("night_target") + poison = self.state_flags.get("witch_poisoned") + casualties: list[str] = [] + if target and target != saved: + casualties.append(target) + if poison and poison not in casualties: + casualties.append(poison) + if not casualties: + events.public.append("[God] Dawn breaks peacefully. No one died.") + for victim in casualties: + if victim in self.agent_states and self.agent_states[victim].alive: + self.agent_states[victim].alive = False + events.public.append(f"[God] {victim} was found dead at dawn.") + self.state_flags["night_target"] = None + self.state_flags["witch_saved"] = None + self.state_flags["witch_poisoned"] = None + self.state_flags["seer_result"] = "" + return events + + def _resolve_vote( + self, + phase: PhaseDefinition, + actions: dict[str, AgentAction], + resolution: PhaseResolution, + ) -> PhaseEvents: + events = PhaseEvents() + tally: dict[str, int] = {} + for action in actions.values(): + target = self._extract_target([action]) + if target: + tally[target] = tally.get(target, 0) + 1 + elif "none" in action.argument.lower(): + tally.setdefault("none", 0) + tally["none"] += 1 + if not tally: + events.public.append("[God] No valid votes were cast.") + self.state_flags["day_execution"] = None + return events + winner, votes = max(tally.items(), key=lambda kv: kv[1]) + if winner == "none": + events.public.append("[God] The town decided to stay their hand.") + self.state_flags["day_execution"] = None + return events + if list(tally.values()).count(votes) > 1: + events.public.append("[God] The vote is tied. No execution today.") + self.state_flags["day_execution"] = None + return events + self.state_flags["day_execution"] = winner + events.public.append( + f"[God] Majority condemns {winner}. Execution will happen at twilight." + ) + return events + + def _resolve_post_vote_cleanup( + self, + phase: PhaseDefinition, + actions: dict[str, AgentAction], + resolution: PhaseResolution, + ) -> PhaseEvents: + events = PhaseEvents() + target = self.state_flags.get("day_execution") + if target and target in self.agent_states and self.agent_states[target].alive: + self.agent_states[target].alive = False + team = self.agent_states[target].team + events.public.append( + f"[God] {target} was executed. They belonged to team {team}." + ) + self.state_flags["day_execution"] = None + return events + + def _extract_target(self, actions: Iterable[AgentAction]) -> str | None: + for action in actions: + corpus = f"{action.action_type} {action.argument}".lower() + for name in self.agent_states: + if name.lower() in corpus: + return name + for name in self.agent_states: + first = name.split()[0].lower() + if first in corpus: + return name + return None + + def _advance_round_robin(self, phase: PhaseDefinition) -> None: + base = self._eligible_candidates(phase) + self.turns_in_phase += 1 + if not base: + self.current_actor_index = 0 + return + self.current_actor_index += 1 + if self.current_actor_index >= len(base): + self.phase_cycle_progress += 1 + self.current_actor_index = 0 + + def _should_advance(self, phase: PhaseDefinition) -> bool: + if phase.turn_mode != "round-robin": + return True + base = self._eligible_candidates(phase) + if not base: + return True + if phase.max_turns is not None and self.turns_in_phase >= phase.max_turns: + return True + if self.phase_cycle_progress >= phase.max_cycles: + return True + return False + + def _schedule_phase_exit(self, phase: PhaseDefinition) -> None: + exit_events = PhaseEvents() + for msg in phase.exit_messages: + exit_events.public.append(f"[God] {msg}") + self.pending_events.extend(exit_events) + + def _check_end_conditions(self) -> Optional[dict[str, str]]: + for cond in self.rules.end_conditions: + if cond.operation == "team_eliminated" and cond.team: + alive = sum( + 1 + for state in self.agent_states.values() + if state.alive and state.team == cond.team + ) + if alive == 0: + message = ( + cond.message or f"[God] Team {cond.team} has been eliminated." + ) + return { + "winner": cond.winner or cond.other_team or cond.team, + "message": message, + } + if cond.operation == "parity" and cond.team and cond.other_team: + team_count = sum( + 1 + for state in self.agent_states.values() + if state.alive and state.team == cond.team + ) + other_count = sum( + 1 + for state in self.agent_states.values() + if state.alive and state.team == cond.other_team + ) + if team_count >= other_count: + message = cond.message or ( + f"[God] Parity reached: {cond.team} now matches or exceeds {cond.other_team}." + ) + return { + "winner": cond.winner or cond.team, + "message": message, + } + return None + + +class SocialGameEnv(ParallelSotopiaEnv): + """Environment subclass that enforces multi-phase social game mechanics.""" + + def __init__( + self, + env_profile, + *, + rulebook_path: str, + actions_path: str, + role_assignments: dict[str, str], + **kwargs: Any, + ) -> None: + super().__init__(env_profile=env_profile, **kwargs) + self._rulebook_path = Path(rulebook_path) + self._actions_path = Path(actions_path) + self._role_assignments = role_assignments + self.game_rulebook: GameRulebook | None = None + self._last_events: PhaseEvents = PhaseEvents() + self._winner_payload: dict[str, str] | None = None + self.phase_log: list[dict[str, Any]] = [] + + # ------------------------------------------------------------------ + # Config loading helpers + # ------------------------------------------------------------------ + def _load_configs(self) -> tuple[RulebookConfig, RoleActionLibrary]: + try: + rules = RulebookConfig.model_validate_json(self._rulebook_path.read_text()) + except ValidationError as exc: + raise ValueError(f"Invalid rulebook config: {exc}") from exc + actions_raw = json.loads(self._actions_path.read_text()) + try: + roles = RoleActionLibrary.model_validate(actions_raw["roles"]) + except (KeyError, ValidationError) as exc: + raise ValueError(f"Invalid action-space config: {exc}") from exc + return rules, roles + + # ------------------------------------------------------------------ + # Overrides + # ------------------------------------------------------------------ + def reset( + self, + seed: int | None = None, + options: dict[str, str] | None = None, + agents=None, + omniscient: bool = False, + lite: bool = False, + ) -> dict[str, Observation]: + base_obs = super().reset( + seed=seed, + options=options, + agents=agents, + omniscient=omniscient, + lite=lite, + ) + rules, role_actions = self._load_configs() + self.game_rulebook = GameRulebook(rules, role_actions) + self.game_rulebook.assign_agents(self.agents, self._role_assignments) + self.phase_log = [] + self._apply_action_mask() + self._last_events = self.game_rulebook.collect_pending_events() + self._winner_payload = None + self._record_phase_history( + phase_name=self.game_rulebook.current_phase, + actions={}, + events=self._last_events, + ) + return self._augment_observations(base_obs, append_to_existing=True) + + def _phase_prompt_lines( + self, + *, + agent_name: str, + phase: PhaseDefinition, + acting: bool, + available: list[str], + ) -> list[str]: + assert self.game_rulebook is not None + meta = self.game_rulebook.current_phase_metadata() + group = meta.get("group") + cycle = meta.get("group_cycle") + stage = meta.get("group_stage") + title = phase.name.replace("_", " ").title() + if group: + group_label = group.replace("_", " ").title() + if cycle and stage: + label = f"{group_label} {cycle}.{stage} – {title}" + elif cycle: + label = f"{group_label} {cycle} – {title}" + else: + label = f"{group_label}: {title}" + else: + label = title + lines = [f"[God] Phase: {label}"] + if acting: + lines.append("[God] It is your turn to act in this phase.") + else: + lines.append("[God] You are observing while others act.") + lines.append(f"[God] Available actions right now: {', '.join(available)}") + lines.extend(f"[God] {text}" for text in phase.instructions) + role = self.game_rulebook.agent_states[agent_name].role + for text in phase.role_instructions.get(role, []): + lines.append(f"[God] {text}") + return lines + + def _record_phase_history( + self, + *, + phase_name: str, + actions: dict[str, AgentAction], + events: PhaseEvents, + ) -> None: + if self.game_rulebook is None: + return + if not (events.public or events.team or events.private): + if any(a.action_type != "none" for a in actions.values()): + pass + else: + return + action_summary = { + agent: {"action_type": action.action_type, "argument": action.argument} + for agent, action in actions.items() + if action.action_type != "none" + } + phase_def = ( + self.game_rulebook.phase_lookup.get(phase_name) + if self.game_rulebook + else None + ) + snapshot = { + "phase": phase_name, + "turn": self.turn_number, + "public": list(events.public), + "team": {team: list(msgs) for team, msgs in events.team.items()}, + "private": {agent: list(msgs) for agent, msgs in events.private.items()}, + "actions": action_summary, + "meta": self.game_rulebook.current_phase_metadata() + if self.game_rulebook + else {}, + "instructions": phase_def.instructions if phase_def else [], + "role_instructions": phase_def.role_instructions if phase_def else {}, + } + self.phase_log.append(snapshot) + + def _augment_observations( + self, + baseline: dict[str, Observation], + *, + append_to_existing: bool, + ) -> dict[str, Observation]: + assert self.game_rulebook is not None + acting = set(self.game_rulebook.active_agents_for_phase()) + events = self._last_events + phase_name = self.game_rulebook.current_phase + phase_def = self.game_rulebook.phase_lookup[phase_name] + new_obs: dict[str, Observation] = {} + for idx, agent_name in enumerate(self.agents): + current = baseline[agent_name] + available = ( + self.game_rulebook.available_actions(agent_name) + if agent_name in acting + else ["none"] + ) + phase_lines = self._phase_prompt_lines( + agent_name=agent_name, + phase=phase_def, + acting=agent_name in acting, + available=available, + ) + messages: list[str] = [] + messages.extend(events.public) + team = self.game_rulebook.agent_states[agent_name].team + messages.extend(events.team.get(team, [])) + messages.extend(events.private.get(agent_name, [])) + if not messages: + messages.append("[God] Await instructions from the host.") + segments: list[str] = [] + if append_to_existing: + prefix = current.last_turn.strip() + if prefix: + segments.append(prefix) + segments.extend(phase_lines) + segments.extend(messages) + combined = "\n".join(segment for segment in segments if segment) + new_obs[agent_name] = Observation( + last_turn=render_text_for_agent(combined, agent_id=idx), + turn_number=current.turn_number, + available_actions=available, + ) + return new_obs + + def _create_blank_observations(self) -> dict[str, Observation]: + assert self.game_rulebook is not None + acting = set(self.game_rulebook.active_agents_for_phase()) + blank: dict[str, Observation] = {} + for agent_name in self.agents: + available = ( + self.game_rulebook.available_actions(agent_name) + if agent_name in acting + else ["none"] + ) + blank[agent_name] = Observation( + last_turn="", + turn_number=self.turn_number, + available_actions=available, + ) + return blank + + def _apply_action_mask(self) -> None: + assert self.game_rulebook is not None + acting = set(self.game_rulebook.active_agents_for_phase()) + self.action_mask = [ + agent in acting and self.game_rulebook.agent_states[agent].alive + for agent in self.agents + ] + + async def astep( + self, actions: dict[str, AgentAction] | dict[str, dict[str, int | str]] + ) -> tuple[ + dict[str, Observation], + dict[str, float], + dict[str, bool], + dict[str, bool], + dict[str, dict[Any, Any]], + ]: + assert self.game_rulebook is not None + self._apply_action_mask() + self.turn_number += 1 + prepared = self._coerce_actions(actions) + self.recv_message( + "Environment", SimpleMessage(message=f"Turn #{self.turn_number}") + ) + for agent, action in prepared.items(): + self.recv_message(agent, action) + phase_name = self.game_rulebook.current_phase + events, advance, winner = self.game_rulebook.process_actions(prepared) + exit_events = self.game_rulebook.collect_pending_events() + events.extend(exit_events) + self._record_phase_history( + phase_name=phase_name, + actions=prepared, + events=events, + ) + self._last_events = events + if advance: + next_events = self.game_rulebook.start_next_phase() + self._record_phase_history( + phase_name=self.game_rulebook.current_phase, + actions={}, + events=next_events, + ) + self._last_events.extend(next_events) + self._apply_action_mask() + baseline = self._create_blank_observations() + observations = self._augment_observations(baseline, append_to_existing=False) + rewards = {agent_name: 0 for agent_name in self.agents} + terminated = {agent_name: bool(winner) for agent_name in self.agents} + truncations = {agent_name: False for agent_name in self.agents} + info = { + agent_name: { + "comments": winner["message"] if winner else "", + "complete_rating": 0, + } + for agent_name in self.agents + } + if winner: + self._winner_payload = winner + return observations, rewards, terminated, truncations, info + + def _coerce_actions( + self, actions: dict[str, AgentAction] | dict[str, dict[str, int | str]] + ) -> dict[str, AgentAction]: + prepared: dict[str, AgentAction] = {} + for agent, raw in actions.items(): + if isinstance(raw, AgentAction): + prepared[agent] = raw + else: + idx = int(raw.get("action_type", 0)) + action_type = self.available_action_types[idx] + prepared[agent] = AgentAction( + action_type=action_type, + argument=str(raw.get("argument", "")), + ) + return prepared + + def step( + self, actions: dict[str, AgentAction] | dict[str, dict[str, int | str]] + ) -> tuple[ + dict[str, Observation], + dict[str, float], + dict[str, bool], + dict[str, bool], + dict[str, dict[Any, Any]], + ]: + return asyncio.run(self.astep(actions)) diff --git a/sotopia/samplers/uniform_sampler.py b/sotopia/samplers/uniform_sampler.py index d519eee0d..bcc308ead 100644 --- a/sotopia/samplers/uniform_sampler.py +++ b/sotopia/samplers/uniform_sampler.py @@ -65,8 +65,20 @@ def sample( env_profile = random.choice(self.env_candidates) if isinstance(env_profile, str): env_profile = EnvironmentProfile.get(env_profile) - logger.info("Creating ParallelSotopiaEnv with %s agents", n_agent) - env = ParallelSotopiaEnv(env_profile=env_profile, **env_params) + logger.info("Creating environment with %s agents", n_agent) + game_meta = getattr(env_profile, "game_metadata", None) or {} + if game_meta.get("mode") == "social_game": + from sotopia.envs import SocialGameEnv + + env = SocialGameEnv( + env_profile=env_profile, + rulebook_path=game_meta["rulebook_path"], + actions_path=game_meta["actions_path"], + role_assignments=game_meta["role_assignments"], + **env_params, + ) + else: + env = ParallelSotopiaEnv(env_profile=env_profile, **env_params) agent_profile_candidates = self.agent_candidates if len(agent_profile_candidates) == n_agent: From 8b8850d9d9b265bb2f981a6743150ab120044ce9 Mon Sep 17 00:00:00 2001 From: "Keyu(Frank) He" Date: Sun, 21 Sep 2025 01:27:53 -0400 Subject: [PATCH 02/23] werewolf game in progress contain minor bugs, will fix in future iterations --- examples/experimental/werewolves/main.py | 4 ++-- sotopia/envs/social_game.py | 12 +++++++----- sotopia/samplers/uniform_sampler.py | 1 + uv.lock | 2 +- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/examples/experimental/werewolves/main.py b/examples/experimental/werewolves/main.py index 50b9422c6..e5f4dd062 100644 --- a/examples/experimental/werewolves/main.py +++ b/examples/experimental/werewolves/main.py @@ -6,7 +6,7 @@ import json import os from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, List, cast import redis @@ -41,7 +41,7 @@ def load_json(path: Path) -> Dict[str, Any]: - return json.loads(path.read_text()) + return cast(Dict[str, Any], json.loads(path.read_text())) def ensure_agent(player: Dict[str, Any]) -> AgentProfile: diff --git a/sotopia/envs/social_game.py b/sotopia/envs/social_game.py index 926e4ffc8..a0b2b29bc 100644 --- a/sotopia/envs/social_game.py +++ b/sotopia/envs/social_game.py @@ -6,11 +6,13 @@ import json from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Iterable, Optional, Sequence +from typing import Any, Iterable, Optional, Sequence, cast from pydantic import BaseModel, Field, RootModel, ValidationError from sotopia.envs.parallel import ParallelSotopiaEnv, render_text_for_agent +from sotopia.agents.llm_agent import Agents +from sotopia.database import EnvironmentProfile from sotopia.messages import AgentAction, Observation, SimpleMessage @@ -395,7 +397,7 @@ def _resolve_phase( raise ValueError( f"Unsupported resolution operation '{phase.resolution.operation}'" ) - return handler(phase, actions, phase.resolution) + return cast(PhaseEvents, handler(phase, actions, phase.resolution)) def _resolve_noop( self, @@ -642,7 +644,7 @@ class SocialGameEnv(ParallelSotopiaEnv): def __init__( self, - env_profile, + env_profile: EnvironmentProfile, *, rulebook_path: str, actions_path: str, @@ -680,7 +682,7 @@ def reset( self, seed: int | None = None, options: dict[str, str] | None = None, - agents=None, + agents: Agents | None = None, omniscient: bool = False, lite: bool = False, ) -> dict[str, Observation]: @@ -891,7 +893,7 @@ async def astep( self._apply_action_mask() baseline = self._create_blank_observations() observations = self._augment_observations(baseline, append_to_existing=False) - rewards = {agent_name: 0 for agent_name in self.agents} + rewards = {agent_name: 0.0 for agent_name in self.agents} terminated = {agent_name: bool(winner) for agent_name in self.agents} truncations = {agent_name: False for agent_name in self.agents} info = { diff --git a/sotopia/samplers/uniform_sampler.py b/sotopia/samplers/uniform_sampler.py index bcc308ead..38d1585f7 100644 --- a/sotopia/samplers/uniform_sampler.py +++ b/sotopia/samplers/uniform_sampler.py @@ -67,6 +67,7 @@ def sample( env_profile = EnvironmentProfile.get(env_profile) logger.info("Creating environment with %s agents", n_agent) game_meta = getattr(env_profile, "game_metadata", None) or {} + env: ParallelSotopiaEnv if game_meta.get("mode") == "social_game": from sotopia.envs import SocialGameEnv diff --git a/uv.lock b/uv.lock index a0d147290..2e38d6dd9 100644 --- a/uv.lock +++ b/uv.lock @@ -3163,7 +3163,7 @@ requires-dist = [ { name = "google-generativeai", marker = "extra == 'google-generativeai'" }, { name = "groq", marker = "extra == 'groq'" }, { name = "hiredis", specifier = ">=3.0.0" }, - { name = "json-repair", specifier = ">=0.35.0,<0.45.0" }, + { name = "json-repair", specifier = ">=0.35.0,<0.49.0" }, { name = "litellm", specifier = ">=1.65.0" }, { name = "lxml", specifier = ">=4.9.3,<6.0.0" }, { name = "modal", marker = "extra == 'api'" }, From 5a131bc5da06217a85b1f5e1e23c7a8d563a31a6 Mon Sep 17 00:00:00 2001 From: "Keyu(Frank) He" Date: Fri, 10 Oct 2025 13:21:52 -0400 Subject: [PATCH 03/23] werewolf with human player --- .../experimental/werewolves/main_human.py | 1176 +++++++++++++++++ .../experimental/werewolves/player_input.json | 1 + sotopia/agents/llm_agent.py | 79 +- 3 files changed, 1252 insertions(+), 4 deletions(-) create mode 100644 examples/experimental/werewolves/main_human.py create mode 100644 examples/experimental/werewolves/player_input.json diff --git a/examples/experimental/werewolves/main_human.py b/examples/experimental/werewolves/main_human.py new file mode 100644 index 000000000..983c67624 --- /dev/null +++ b/examples/experimental/werewolves/main_human.py @@ -0,0 +1,1176 @@ +"""Launcher for the Duskmire Werewolves social game scenario with human player.""" + +from __future__ import annotations + +import asyncio +import json +import os +from pathlib import Path +from typing import Any, Dict, List, Union, cast +from datetime import datetime +import webbrowser + +import redis + +from sotopia.agents import LLMAgent, HumanAgent +from sotopia.database.persistent_profile import ( + AgentProfile, + EnvironmentProfile, + RelationshipType, +) +from sotopia.envs import SocialGameEnv +from sotopia.envs.evaluators import ( + EpisodeLLMEvaluator, + EvaluationForAgents, + RuleBasedTerminatedEvaluator, +) +from sotopia.server import arun_one_episode +from sotopia.database import SotopiaDimensions + +BASE_DIR = Path(__file__).resolve().parent +ROLE_ACTIONS_PATH = BASE_DIR / "role_actions.json" +RULEBOOK_PATH = BASE_DIR / "game_rules.json" +ROSTER_PATH = BASE_DIR / "roster.json" +PLAYER_VIEW_HTML = BASE_DIR / "player_view.html" + +os.environ.setdefault("REDIS_OM_URL", "redis://:@localhost:6379") +redis.Redis(host="localhost", port=6379) + +COMMON_GUIDANCE = ( + "During your turn you must respond. If 'action' is available, use commands like 'kill NAME', " + "'inspect NAME', 'save NAME', 'poison NAME', or 'vote NAME'. Werewolf night speech is private to the pack. " + "Day discussion is public. Voting requires an 'action' beginning with 'vote'." +) + + +class PlayerView: + """Manages the HTML output for player-visible game information.""" + + def __init__( + self, + output_path: Path, + player_name: str, + role: str, + all_player_names: List[str], + ): + self.output_path = output_path + self.player_name = player_name + self.role = role + self.all_player_names = all_player_names + self.events: List[str] = [] + self.input_file = output_path.parent / "player_input.json" + self.waiting_for_input = False + self.available_actions: List[str] = [] + self._initialize_html() + + def _initialize_html(self) -> None: + """Create the initial HTML file.""" + player_list = "\n".join([f"
  • {name}
  • " for name in self.all_player_names]) + + html = f""" + + + + Duskmire Werewolves - {self.player_name} + + + + + +
    +
    +

    🌕 Duskmire Werewolves

    +
    + You are: {self.player_name} | Role: {self.role} +
    +
    +
    +
    +
    {datetime.now().strftime('%H:%M:%S')}
    +
    Game starting...
    +
    +
    +
    + + + +""" + self.output_path.write_text(html) + + def add_event(self, event_type: str, content: str, speaker: str = "") -> None: + """Add a new event to the player view.""" + timestamp = datetime.now().strftime("%H:%M:%S") + + speaker_html = f'
    {speaker}
    ' if speaker else "" + + event_html = f""" +
    +
    {timestamp}
    + {speaker_html} +
    {content}
    +
    """ + + self.events.append(event_html) + self._update_html() + + def enable_input(self, available_actions: List[str]) -> None: + """Enable the input controls with available actions.""" + self.waiting_for_input = True + self.available_actions = available_actions + # Clear any previous input + if self.input_file.exists(): + self.input_file.unlink() + self._update_html() + + def wait_for_input(self) -> dict[str, str]: + """Wait for player input from the HTML interface.""" + import time + + while not self.input_file.exists(): + time.sleep(0.5) + + try: + data = json.loads(self.input_file.read_text()) + self.waiting_for_input = False + self._update_html() + return data + except (json.JSONDecodeError, KeyError): + # If file is corrupt, wait and try again + time.sleep(0.5) + return self.wait_for_input() + + def _update_html(self) -> None: + """Update the HTML file with all events and dynamic input state.""" + events_html = "\n".join(self.events) + player_list = "\n".join([f"
  • {name}
  • " for name in self.all_player_names]) + + # Generate action buttons HTML based on available actions + action_buttons_html = "" + input_display = "none" + status_text = "Waiting for your turn..." + input_box_class = "" + + if self.waiting_for_input and self.available_actions: + input_display = "block" + status_text = "🎮 YOUR TURN! Select an action:" + input_box_class = "waiting" + action_buttons = [] + for action in self.available_actions: + action_label = action.replace("_", " ").title() + action_buttons.append( + f'' + ) + action_buttons_html = "\n".join(action_buttons) + + html = f""" + + + + Duskmire Werewolves - {self.player_name} + + + + + +
    +
    +

    🌕 Duskmire Werewolves

    +
    + You are: {self.player_name} | Role: {self.role} +
    +
    +
    +{events_html} +
    +
    + + + +""" + self.output_path.write_text(html) + + +class PlayerViewHumanAgent(HumanAgent): + """HumanAgent that also writes to PlayerView HTML and reads input from it.""" + + def __init__( + self, + agent_name: str | None = None, + uuid_str: str | None = None, + agent_profile: Any | None = None, + available_agent_names: list[str] | None = None, + player_view: PlayerView | None = None, + ) -> None: + super().__init__( + agent_name=agent_name, + uuid_str=uuid_str, + agent_profile=agent_profile, + available_agent_names=available_agent_names, + ) + self.player_view = player_view + + async def aact(self, obs: Any) -> Any: + """Act and update player view with relevant information.""" + from sotopia.messages import AgentAction + + self.recv_message("Environment", obs) + + # Parse observation to extract player-visible information + if self.player_view and hasattr(obs, "to_natural_language"): + obs_text = obs.to_natural_language() + + # Parse line by line to avoid duplicates and properly categorize events + lines = obs_text.split("\n") + for line in lines: + line = line.strip() + if not line: + continue + + # Check for game over / winner announcement + if ( + "GAME OVER" in line + or ("Werewolves win" in line) + or ("Villagers win" in line) + ): + self.player_view.add_event("phase", f"🎮 GAME OVER: {line}") + + # Check for voting results and eliminations + elif ( + "voted for" in line + or "has been eliminated" in line + or "was eliminated" in line + ): + self.player_view.add_event( + "action", line.replace("[God]", "").strip() + ) + + # Check for death announcements + elif "was found dead" in line or "died" in line: + self.player_view.add_event( + "death", line.replace("[God]", "").strip() + ) + + # Check for phase announcements + elif "Night phase begins" in line: + if not ( + self.player_view.events + and "Night phase begins" in self.player_view.events[-1] + ): + self.player_view.add_event( + "phase", "🌙 Night phase begins. Stay quiet..." + ) + + elif "Day discussion starts" in line or ( + "Phase: 'day_discussion' begins" in line + ): + if not ( + self.player_view.events + and "Day breaks" in self.player_view.events[-1] + ): + self.player_view.add_event( + "phase", "☀️ Day breaks. Time to discuss!" + ) + + elif "Voting phase" in line or ("Phase: 'voting' begins" in line): + if not ( + self.player_view.events + and "Voting phase" in self.player_view.events[-1] + ): + self.player_view.add_event( + "phase", "🗳️ Voting phase. Time to make your choice." + ) + + # Check for speech from players (avoid God messages and duplicates) + elif (" said:" in line or " says:" in line) and "[God]" not in line: + parts = line.split(" said:" if " said:" in line else " says:") + if len(parts) == 2: + speaker = parts[0].strip() + message = parts[1].strip().strip('"') + # Check if not duplicate + if not ( + self.player_view.events + and speaker in self.player_view.events[-1] + and message in self.player_view.events[-1] + ): + self.player_view.add_event("speak", message, speaker) + + # Get available actions from observation + available_actions = ( + obs.available_actions if hasattr(obs, "available_actions") else ["none"] + ) + + if available_actions != ["none"] and self.player_view: + # Enable HTML input and wait for player response + self.player_view.enable_input(available_actions) + print( + f"\n🎮 Waiting for {self.agent_name}'s input in the HTML interface..." + ) + + # Wait for input from HTML + input_data = self.player_view.wait_for_input() + action_type = input_data.get("action_type", "none") + argument = input_data.get("argument", "") + + # Enhanced voting support + if action_type == "action" and argument.lower().startswith("vote"): + name_part = argument[4:].strip() + if name_part and self.available_agent_names: + matched_name = self._find_matching_name(name_part) + if matched_name: + argument = f"vote {matched_name}" + print(f"✓ Voting for: {matched_name}") + + result = AgentAction(action_type=action_type, argument=argument) + else: + result = AgentAction(action_type="none", argument="") + + # Log player's own action to HTML + if self.player_view and result.action_type in ["speak", "action"]: + if result.action_type == "speak": + self.player_view.add_event( + "speak", result.argument, f"{self.agent_name} (You)" + ) + elif result.action_type == "action": + self.player_view.add_event( + "action", f"You performed action: {result.argument}" + ) + + return result + + +def load_json(path: Path) -> Dict[str, Any]: + return cast(Dict[str, Any], json.loads(path.read_text())) + + +def ensure_agent(player: Dict[str, Any]) -> AgentProfile: + try: + profile = AgentProfile.find( + AgentProfile.first_name == player["first_name"], + AgentProfile.last_name == player["last_name"], + ).all()[0] + return profile # type: ignore[return-value] + except IndexError: + profile = AgentProfile( + first_name=player["first_name"], + last_name=player["last_name"], + age=player.get("age", 30), + occupation="", + gender="", + gender_pronoun=player.get("pronouns", "they/them"), + public_info="", + personality_and_values="", + decision_making_style="", + secret=player.get("secret", ""), + ) + profile.save() + return profile + + +def build_agent_goal(player: Dict[str, Any], role_prompt: str) -> str: + return ( + f"You are {player['first_name']} {player['last_name']}, publicly known only as a villager.\n" + f"Primary directives: {player['goal']}\n" + f"Role guidance: {role_prompt}\n" + f"System constraints: {COMMON_GUIDANCE}" + ) + + +def prepare_scenario() -> tuple[EnvironmentProfile, List[AgentProfile], Dict[str, str]]: + role_actions = load_json(ROLE_ACTIONS_PATH) + roster = load_json(ROSTER_PATH) + + agents: List[AgentProfile] = [] + agent_goals: List[str] = [] + role_assignments: Dict[str, str] = {} + + for player in roster["players"]: + profile = ensure_agent(player) + agents.append(profile) + full_name = f"{player['first_name']} {player['last_name']}" + role = player["role"] + role_prompt = role_actions["roles"][role]["goal_prompt"] + agent_goals.append(build_agent_goal(player, role_prompt)) + role_assignments[full_name] = role + + scenario_text = ( + roster["scenario"] + + " Werewolves must be eliminated before they achieve parity with villagers." + ) + + env_profile = EnvironmentProfile( + scenario=scenario_text, + agent_goals=agent_goals, + relationship=RelationshipType.acquaintance, + game_metadata={ + "mode": "social_game", + "rulebook_path": str(RULEBOOK_PATH), + "actions_path": str(ROLE_ACTIONS_PATH), + "role_assignments": role_assignments, + }, + tag="werewolves", + ) + env_profile.save() + return env_profile, agents, role_assignments + + +def build_environment( + env_profile: EnvironmentProfile, + role_assignments: Dict[str, str], + model_name: str, +) -> SocialGameEnv: + return SocialGameEnv( + env_profile=env_profile, + rulebook_path=str(RULEBOOK_PATH), + actions_path=str(ROLE_ACTIONS_PATH), + role_assignments=role_assignments, + model_name=model_name, + action_order="round-robin", + evaluators=[RuleBasedTerminatedEvaluator(max_turn_number=40, max_stale_turn=2)], + terminal_evaluators=[ + EpisodeLLMEvaluator( + model_name, + EvaluationForAgents[SotopiaDimensions], + ) + ], + ) + + +def create_agents( + agent_profiles: List[AgentProfile], + env_profile: EnvironmentProfile, + model_names: List[str], +) -> List[Union[LLMAgent, HumanAgent]]: + agents: List[Union[LLMAgent, HumanAgent]] = [] + for profile, model_name, goal in zip( + agent_profiles, + model_names, + env_profile.agent_goals, + strict=True, + ): + agent = LLMAgent(agent_profile=profile, model_name=model_name) + agent.goal = goal + agents.append(agent) + return agents + + +def summarize_phase_log(phase_log: List[Dict[str, Any]]) -> None: + if not phase_log: + print("\nNo structured events recorded.") + return + + print("\nTimeline by Phase") + print("=" * 60) + + last_label: str | None = None + for entry in phase_log: + phase_name = entry["phase"] + meta = entry.get("meta", {}) + group = meta.get("group") + cycle = meta.get("group_cycle") + stage = meta.get("group_stage") + title = phase_name.replace("_", " ").title() + if group: + group_label = group.replace("_", " ").title() + if cycle and stage: + label = f"{group_label} {cycle}.{stage} – {title}" + elif cycle: + label = f"{group_label} {cycle} – {title}" + else: + label = f"{group_label}: {title}" + else: + label = title + + if label != last_label: + print(f"\n[{label}]") + last_label = label + instructions = entry.get("instructions", []) + for info_line in instructions: + print(f" Info: {info_line}") + role_instr = entry.get("role_instructions", {}) + for role, lines in role_instr.items(): + for line in lines: + print(f" Role {role}: {line}") + + for msg in entry.get("public", []): + print(f" Public: {msg}") + for team, messages in entry.get("team", {}).items(): + for msg in messages: + print(f" Team ({team}) private: {msg}") + for agent, messages in entry.get("private", {}).items(): + for msg in messages: + print(f" Private to {agent}: {msg}") + for actor, action in entry.get("actions", {}).items(): + print( + f" Action logged: {actor} -> {action['action_type']} {action['argument']}" + ) + + +def print_roster(role_assignments: Dict[str, str]) -> None: + print("Participants & roles:") + for name, role in role_assignments.items(): + print(f" - {name}: {role}") + + +def start_http_server(port: int = 8000) -> None: + """Start a simple HTTP server to handle player input.""" + from http.server import HTTPServer, SimpleHTTPRequestHandler + import threading + + class PlayerInputHandler(SimpleHTTPRequestHandler): + def do_POST(self): + if self.path == "/save_action": + content_length = int(self.headers["Content-Length"]) + post_data = self.rfile.read(content_length) + try: + data = json.loads(post_data.decode("utf-8")) + # Write to player_input.json in the same directory + input_file = BASE_DIR / "player_input.json" + input_file.write_text(json.dumps(data)) + self.send_response(200) + self.send_header("Content-type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + self.wfile.write(b'{"status": "success"}') + except Exception as e: + self.send_response(500) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write( + json.dumps({"status": "error", "message": str(e)}).encode() + ) + else: + self.send_response(404) + self.end_headers() + + def do_OPTIONS(self): + self.send_response(200) + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS") + self.send_header("Access-Control-Allow-Headers", "Content-Type") + self.end_headers() + + def log_message(self, _format, *_args): + # Suppress log messages + pass + + os.chdir(BASE_DIR) + server = HTTPServer(("localhost", port), PlayerInputHandler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + print(f"✓ HTTP server started on http://localhost:{port}") + + +async def main() -> None: + # Start HTTP server for handling player input from HTML + start_http_server(8000) + + env_profile, agent_profiles, role_assignments = prepare_scenario() + env_model = "gpt-4o-mini" + agent_model_list = [ + "gpt-4o-mini", + "gpt-4o-mini", + "gpt-4o-mini", + "gpt-4o-mini", + "gpt-4o-mini", + "gpt-4o-mini", + ] + + env = build_environment(env_profile, role_assignments, env_model) + agents = create_agents(agent_profiles, env_profile, agent_model_list) + + # Get all agent names for voting support + all_agent_names = [f"{p.first_name} {p.last_name}" for p in agent_profiles] + + # Get player info + player_name = f"{agent_profiles[0].first_name} {agent_profiles[0].last_name}" + player_role = list(role_assignments.values())[0] + + # Create PlayerView HTML for clean player-visible information + player_view = PlayerView( + PLAYER_VIEW_HTML, player_name, player_role, all_agent_names + ) + + # Replace first agent with human player that writes to PlayerView + human_agent = PlayerViewHumanAgent( + agent_profile=agent_profiles[0], + available_agent_names=all_agent_names, + player_view=player_view, + ) + human_agent.goal = env_profile.agent_goals[0] + agents[0] = human_agent + + print("\n🌕 Duskmire Werewolves — Interactive Social Game") + print("=" * 60) + print(f"You are playing as: {player_name}") + print(f"Your role: {player_role}") + print("=" * 60) + print( + "\n📖 PLAYER VIEW: Opens in your browser at http://localhost:8000/player_view.html" + ) + print(" This shows only what your character can see + interactive input.") + print("\n🔮 TERMINAL: Shows the full omniscient game state") + print(" (all agent actions and decisions for debugging)") + print("=" * 60) + + # Auto-open the HTML file in browser via HTTP server + try: + webbrowser.open("http://localhost:8000/player_view.html") + print("✓ Player view opened in your browser") + except Exception as e: + print(f"⚠ Could not auto-open browser: {e}") + print(" Please manually open: http://localhost:8000/player_view.html") + + print("=" * 60) + print("Other participants:") + for name in role_assignments.keys(): + if name != player_name: + print(f" - {name}") + print("=" * 60) + + await arun_one_episode( + env=env, + agent_list=agents, + omniscient=False, + script_like=False, + json_in_script=False, + tag=None, + push_to_db=False, + ) + + summarize_phase_log(env.phase_log) + + if env._winner_payload: # noqa: SLF001 (internal inspection for demo) + print("\n" + "=" * 60) + print("GAME RESULT") + print("=" * 60) + print(f"Winner: {env._winner_payload['winner']}") + print(f"Reason: {env._winner_payload['message']}") + player_view.add_event( + "phase", + f"🎮 Game Over! Winner: {env._winner_payload['winner']}. Reason: {env._winner_payload['message']}", + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/experimental/werewolves/player_input.json b/examples/experimental/werewolves/player_input.json new file mode 100644 index 000000000..42d333040 --- /dev/null +++ b/examples/experimental/werewolves/player_input.json @@ -0,0 +1 @@ +{"action_type": "none", "argument": "", "timestamp": "2025-10-10T17:10:45.842Z"} diff --git a/sotopia/agents/llm_agent.py b/sotopia/agents/llm_agent.py index 497954d7b..41315307d 100644 --- a/sotopia/agents/llm_agent.py +++ b/sotopia/agents/llm_agent.py @@ -140,6 +140,7 @@ def __init__( agent_name: str | None = None, uuid_str: str | None = None, agent_profile: AgentProfile | None = None, + available_agent_names: list[str] | None = None, ) -> None: super().__init__( agent_name=agent_name, @@ -147,6 +148,7 @@ def __init__( agent_profile=agent_profile, ) self.model_name = "human" + self.available_agent_names = available_agent_names or [] @property def goal(self) -> str: @@ -171,14 +173,53 @@ def act(self, obs: Observation) -> AgentAction: return AgentAction(action_type=action_type, argument=argument) + def _find_matching_name(self, user_input: str) -> str | None: + """Find a matching agent name from partial input (case-insensitive).""" + user_input_lower = user_input.lower().strip() + + # Try exact match first + for name in self.available_agent_names: + if name.lower() == user_input_lower: + return name + + # Try partial match on first name or last name + matches = [] + for name in self.available_agent_names: + name_parts = name.lower().split() + if any(part.startswith(user_input_lower) for part in name_parts): + matches.append(name) + + if len(matches) == 1: + return matches[0] + elif len(matches) > 1: + print("Ambiguous name. Did you mean one of these?") + for i, match in enumerate(matches): + print(f" {i}: {match}") + return None + + return None + async def aact(self, obs: Observation) -> AgentAction: self.recv_message("Environment", obs) - print("Available actions:") - for i, action in enumerate(obs.available_actions): - print(f"{i}: {action}") + # Only print if last_turn changed (avoid duplicate prompts) + should_prompt = True + if len(self.inbox) >= 2: + last_obs = self.inbox[-2][1] + if ( + isinstance(last_obs, Observation) + and last_obs.last_turn == obs.last_turn + ): + should_prompt = False if obs.available_actions != ["none"]: + if should_prompt: + print("\n" + "=" * 60) + print("YOUR TURN") + print("=" * 60) + print("Available actions:") + for i, action in enumerate(obs.available_actions): + print(f"{i}: {action}") action_type_number = await ainput( "Action type (Please only input the number): " ) @@ -194,8 +235,38 @@ async def aact(self, obs: Observation) -> AgentAction: action_type = obs.available_actions[action_type_number] else: action_type = "none" - if action_type in ["speak", "non-verbal communication"]: + + if action_type in ["speak", "non-verbal communication", "action"]: argument = await ainput("Argument: ") + + # Enhanced voting support + if action_type == "action" and argument.lower().startswith("vote"): + # Extract the name part after "vote" + name_part = argument[4:].strip() + if name_part and self.available_agent_names: + matched_name = self._find_matching_name(name_part) + if matched_name: + argument = f"vote {matched_name}" + print(f"✓ Voting for: {matched_name}") + else: + print(f"⚠ Could not find player matching '{name_part}'") + print("Available players:") + for i, name in enumerate(self.available_agent_names): + print(f" {i}: {name}") + retry = await ainput( + "Enter player number or name to vote for: " + ) + try: + idx = int(retry) + if 0 <= idx < len(self.available_agent_names): + matched_name = self.available_agent_names[idx] + argument = f"vote {matched_name}" + print(f"✓ Voting for: {matched_name}") + except ValueError: + matched_name = self._find_matching_name(retry) + if matched_name: + argument = f"vote {matched_name}" + print(f"✓ Voting for: {matched_name}") else: argument = "" From 3f062faaceb710ce44a9bfc92be351e7114773f1 Mon Sep 17 00:00:00 2001 From: "Keyu(Frank) He" Date: Thu, 23 Oct 2025 01:15:11 -0400 Subject: [PATCH 04/23] bug fixes --- .../experimental/werewolves/game_rules.json | 7 +- .../experimental/werewolves/main_human.py | 477 ++++++++++++++--- .../experimental/werewolves/player_input.json | 2 +- .../experimental/werewolves/player_view.html | 495 ++++++++++++++++++ 4 files changed, 892 insertions(+), 89 deletions(-) create mode 100644 examples/experimental/werewolves/player_view.html diff --git a/examples/experimental/werewolves/game_rules.json b/examples/experimental/werewolves/game_rules.json index 02a602519..77285fc97 100644 --- a/examples/experimental/werewolves/game_rules.json +++ b/examples/experimental/werewolves/game_rules.json @@ -120,8 +120,8 @@ "Witch", "Werewolf" ], - "max_cycles": 2, - "max_turns": 12, + "max_cycles": 1, + "max_turns": null, "speech_visibility": "public", "action_visibility": "public", "resolution": { @@ -135,7 +135,7 @@ ], "group": "day", "instructions": [ - "Each villager speaks in turn. Share concise reasoning tied to observations." + "Each villager speaks once in turn. Share concise reasoning tied to observations." ], "role_instructions": {} }, @@ -158,6 +158,7 @@ "entry_messages": [ "Voting phase: use 'vote NAME' or 'vote none'." ], + "max_turns": 1, "exit_messages": [ "Votes are tallied." ], diff --git a/examples/experimental/werewolves/main_human.py b/examples/experimental/werewolves/main_human.py index 983c67624..37faf87f3 100644 --- a/examples/experimental/werewolves/main_human.py +++ b/examples/experimental/werewolves/main_human.py @@ -61,6 +61,7 @@ def __init__( self.input_file = output_path.parent / "player_input.json" self.waiting_for_input = False self.available_actions: List[str] = [] + self._seen_event_keys: set[str] = set() self._initialize_html() def _initialize_html(self) -> None: @@ -144,9 +145,6 @@ def _initialize_html(self) -> None: background: #27ae60; animation: pulse 2s infinite; }} - .input-box.disabled {{ - opacity: 0.5; - }} .action-buttons {{ display: flex; gap: 10px; @@ -283,15 +281,17 @@ def _initialize_html(self) -> None: """ @@ -343,6 +372,14 @@ def _initialize_html(self) -> None: def add_event(self, event_type: str, content: str, speaker: str = "") -> None: """Add a new event to the player view.""" + # De-duplicate identical events across live updates and post-game backfill + content_norm = content.strip() + speaker_norm = speaker.strip() + event_key = f"{event_type}|{speaker_norm}|{content_norm}" + if event_key in self._seen_event_keys: + return + self._seen_event_keys.add(event_key) + timestamp = datetime.now().strftime("%H:%M:%S") speaker_html = f'
    {speaker}
    ' if speaker else "" @@ -377,7 +414,7 @@ def wait_for_input(self) -> dict[str, str]: data = json.loads(self.input_file.read_text()) self.waiting_for_input = False self._update_html() - return data + return cast(Dict[str, str], data) except (json.JSONDecodeError, KeyError): # If file is corrupt, wait and try again time.sleep(0.5) @@ -402,14 +439,14 @@ def _update_html(self) -> None: for action in self.available_actions: action_label = action.replace("_", " ").title() action_buttons.append( - f'' + f'' ) action_buttons_html = "\n".join(action_buttons) html = f""" - + Duskmire Werewolves - {self.player_name} -