From 00df3e73f3a568121c58a75d38dec313868b1621 Mon Sep 17 00:00:00 2001 From: fuzi233 Date: Thu, 24 Jul 2025 16:47:22 +0800 Subject: [PATCH] fix asynchronous bugs --- mas_arena/agents/EvoMAC.py | 505 ++++++++++++++-------------------- mas_arena/benchmark_runner.py | 24 +- 2 files changed, 217 insertions(+), 312 deletions(-) diff --git a/mas_arena/agents/EvoMAC.py b/mas_arena/agents/EvoMAC.py index 2f8034f..762f463 100644 --- a/mas_arena/agents/EvoMAC.py +++ b/mas_arena/agents/EvoMAC.py @@ -15,10 +15,7 @@ import os -import asyncio import re -import tempfile -import sys from collections import defaultdict, deque from typing import Dict, Any, List, Tuple, Optional @@ -26,9 +23,10 @@ from langchain_openai import ChatOpenAI from langchain_core.messages import SystemMessage, HumanMessage, AIMessage from mas_arena.agents.base import AgentSystem, AgentSystemRegistry +from asyncio import Lock # Load environment variables for configuration -load_dotenv() +load_dotenv(override=True) # SYSTEM PROMPTS - Define roles and behaviors for each agent type AGENT_SYSTEM_PROMPTS = { @@ -114,7 +112,8 @@ 1) The code should be fully functional. No placeholders (such as 'pass' in Python). 2) You should write the test file with 'unittest' python library. Import the functions you need to test if necessary. 3) The test case should be as simple as possible, and the test case number should be less than 5. -4) According to example test case in the Task description, please only write these test cases to locate the bugs. You should not add any other testcases by yourself except for the example test case given in the Task description""", +4) According to example test case in the Task description, please only write these test cases to locate the bugs. You should not add any other testcases by yourself except for the example test case given in the Task description +5) Be careful with triple-quoted strings (docstrings or multiline strings) to ensure they are always correctly closed.""", "updater": """You are a Senior Developer at EvoMAC. Your task is to organize a programmer team to solve current issues in the code. @@ -178,13 +177,12 @@ ```python def function_name(parameters): - \"\"\" - Function description - \"\"\" # Your implementation here return result ``` +Use docstrings where appropriate to explain complex logic or important sections of code. Be careful with triple-quoted strings (docstrings or multiline strings) to ensure they are always correctly closed.(if not necessary, do not use triple-quoted) + {format_prompt}""", "test_organization": """According to the function completion requirements listed below: @@ -271,13 +269,13 @@ def _extract_code_with_fallbacks(self, text: str) -> str: ) if validated_match: code = validated_match.group(1).strip() - return self._clean_extracted_code(code) + return code # Strategy 2: Extract from standard Python code blocks block_match = re.search(r"```python\s*([\s\S]*?)```", text, re.IGNORECASE) if block_match: code = block_match.group(1).strip() - return self._clean_extracted_code(code) + return code # Strategy 3: Find function definition patterns function_match = re.search( @@ -286,179 +284,19 @@ def _extract_code_with_fallbacks(self, text: str) -> str: ) if function_match: code = function_match.group(1).strip() - return self._clean_extracted_code(code) + return code # Strategy 4: Parse filename + code patterns (legacy support) filename_pattern = r'([a-z_]+\.py)\s*\n\s*```python\s*(.*?)```' filename_matches = re.findall(filename_pattern, text, re.DOTALL) if filename_matches: code = filename_matches[0][1].strip() - return self._clean_extracted_code(code) + return code # Strategy 5: Last resort - find Python-like content return self._extract_python_like_content(text) - def _clean_extracted_code(self, code: str) -> str: - """ - Clean extracted code to fix common syntax issues. - - Args: - code: Raw extracted code - - Returns: - Cleaned code string - """ - if not code: - return "" - - # 确保代码以函数定义开始,如果不是则尝试找到函数定义 - if not code.strip().startswith('def '): - # 尝试找到第一个函数定义 - lines = code.split('\n') - start_idx = -1 - for i, line in enumerate(lines): - if line.strip().startswith('def '): - start_idx = i - break - - if start_idx >= 0: - code = '\n'.join(lines[start_idx:]) - else: - # 如果没有找到函数定义,返回原代码 - return code.strip() - - # 修复三引号docstring问题 - lines = code.split('\n') - cleaned_lines = [] - in_triple_quote = False - quote_type = None - quote_start_line = -1 - - for i, line in enumerate(lines): - # 检测三引号的开始和结束 - if '"""' in line: - if not in_triple_quote: - # 开始三引号字符串 - in_triple_quote = True - quote_type = '"""' - quote_start_line = i - cleaned_lines.append(line) - else: - # 结束三引号字符串 - if quote_type == '"""': - in_triple_quote = False - quote_type = None - cleaned_lines.append(line) - else: - cleaned_lines.append(line) - elif "'''" in line: - if not in_triple_quote: - # 开始三引号字符串 - in_triple_quote = True - quote_type = "'''" - quote_start_line = i - cleaned_lines.append(line) - else: - # 结束三引号字符串 - if quote_type == "'''": - in_triple_quote = False - quote_type = None - cleaned_lines.append(line) - else: - cleaned_lines.append(line) - else: - cleaned_lines.append(line) - - # 如果docstring未正确关闭,添加关闭标记 - if in_triple_quote and quote_type: - # 找到正确的缩进级别 - indent = "" - if quote_start_line >= 0 and quote_start_line < len(cleaned_lines): - # 获取docstring开始行的缩进 - start_line = cleaned_lines[quote_start_line] - if '"""' in start_line or "'''" in start_line: - # 如果docstring在同一行开始,使用相同缩进 - indent = re.match(r'^(\s*)', start_line).group(1) - else: - # 否则找到函数体的缩进 - for line in cleaned_lines: - if line.strip().startswith('def '): - # 找到下一个非空行的缩进 - func_idx = cleaned_lines.index(line) - for j in range(func_idx + 1, len(cleaned_lines)): - next_line = cleaned_lines[j] - if next_line.strip(): - indent = re.match(r'^(\s*)', next_line).group(1) - break - break - - # 添加关闭的三引号 - cleaned_lines.append(f'{indent}{quote_type}') - print(f"[DEBUG] Added missing closing quote: {quote_type}") - - cleaned_code = '\n'.join(cleaned_lines) - - # 检查是否需要添加必要的导入语句 - cleaned_code = self._add_missing_imports(cleaned_code) - - # 移除多余的空行 - cleaned_code = re.sub(r'\n{3,}', '\n\n', cleaned_code) - - return cleaned_code.strip() - - def _add_missing_imports(self, code: str) -> str: - """ - Add missing import statements that are commonly needed. - - Args: - code: Code to check for missing imports - - Returns: - Code with necessary imports added - """ - if not code: - return code - - lines = code.split('\n') - imports_to_add = [] - - # 检查是否使用了typing相关的类型提示 - if any(re.search(r'\b(List|Dict|Tuple|Optional|Union|Any)\[', line) or re.search(r'\bAny\b', line) for line in lines): - if not any('from typing import' in line or 'import typing' in line for line in lines): - # 确定需要导入的类型 - needed_types = set() - for line in lines: - for type_hint in ['List', 'Dict', 'Tuple', 'Optional', 'Union']: - if re.search(rf'\b{type_hint}\[', line): - needed_types.add(type_hint) - # 单独检查Any,因为它可能不带方括号 - if re.search(r'\bAny\b', line): - needed_types.add('Any') - - if needed_types: - imports_to_add.append(f"from typing import {', '.join(sorted(needed_types))}") - - # 如果需要添加导入,将它们插入到函数定义之前 - if imports_to_add: - # 找到第一个函数定义的位置 - func_start_idx = -1 - for i, line in enumerate(lines): - if line.strip().startswith('def '): - func_start_idx = i - break - - if func_start_idx >= 0: - # 在函数定义前插入导入语句 - new_lines = (lines[:func_start_idx] + - imports_to_add + - [''] + # 空行分隔 - lines[func_start_idx:]) - return '\n'.join(new_lines) - else: - # 如果没有找到函数定义,在开头添加导入 - return '\n'.join(imports_to_add + [''] + lines) - - return code + def _extract_python_like_content(self, text: str) -> str: """ @@ -486,7 +324,7 @@ def _extract_python_like_content(self, text: str) -> str: break code_lines.append(line) - return '\n'.join(code_lines).strip() if code_lines else "" + return '\n'.join(code_lines) if code_lines else "" def get_formatted_codes(self) -> str: """ @@ -567,16 +405,16 @@ def _parse_composition(self, response: str) -> None: Args: response: LLM response text """ - # 首先尝试标准的COMPOSITION格式 + # First, try the standard COMPOSITION format composition_match = re.search( r'COMPOSITION[:\s]*\n?\s*(.*?)(?=WORKFLOW:|$)', response, re.DOTALL | re.IGNORECASE ) - # 如果标准格式失败,尝试其他格式 + # If the standard format fails, try other formats if not composition_match: - # 尝试查找标签内的COMPOSITION + # Try to find COMPOSITION inside tags answer_match = re.search(r'(.*?)', response, re.DOTALL | re.IGNORECASE) if answer_match: answer_content = answer_match.group(1) @@ -586,12 +424,13 @@ def _parse_composition(self, response: str) -> None: re.DOTALL | re.IGNORECASE ) - # 如果还是没有找到,检查是否是纯文本响应需要创建单个任务 + # If still not found, check if it's a plain text response and create a single task if not composition_match: - # 检查响应是否看起来像一个实现或解释 + # Check if the response looks like an implementation or explanation if ('implement' in response.lower() or 'function' in response.lower() or 'test' in response.lower() or 'case' in response.lower()): - # 创建单个任务 + + # Create a single task self.composition = {"Task_1": "Complete the implementation based on requirements"} return @@ -599,7 +438,7 @@ def _parse_composition(self, response: str) -> None: comp_text = composition_match.group(1).strip() self.composition = {} - # 移除可能的markdown代码块标记 + # Remove possible markdown code block markers comp_text = re.sub(r'^```.*?\n', '', comp_text, flags=re.MULTILINE) comp_text = re.sub(r'\n```$', '', comp_text) @@ -608,28 +447,30 @@ def _parse_composition(self, response: str) -> None: if not line: continue - # 支持多种格式: - # 1. "- Task_1: description" (原格式) - # 2. "Task 1: description" (CTO prompt格式) - # 3. "Programmer 1: description" (updater格式) + + # Support multiple formats: + # 1. "- Task_1: description" (original format) + # 2. "Task 1: description" (CTO prompt format) + # 3. "Programmer 1: description" (updater format) task_match = None if line.startswith('- '): - # 原格式: "- Task_1: description" + + # Original format: "- Task_1: description" task_match = re.match(r'- (Task_?\d+): (.+)', line) elif ':' in line and not line.endswith(':'): - # 新格式: "Task 1: description" 或 "Programmer 1: description" - # 但确保不是单独的"COMPOSITION:"这样的标题行 + # New format: "Task 1: description" or "Programmer 1: description" + # But make sure it's not a standalone "COMPOSITION:" title line task_match = re.match(r'((?:Task|Programmer)\s*\d+):\s*(.+)', line) if task_match: - task_name = task_match.group(1).replace(' ', '_') # 统一格式:Task_1 + task_name = task_match.group(1).replace(' ', '_') # Unified format: Task_1 task_desc = task_match.group(2).strip() - # 过滤掉无效的任务描述 + # Filter out invalid task descriptions if task_desc and task_desc != '[]' and len(task_desc) > 2: self.composition[task_name] = task_desc - # 如果没有解析到有效的composition,输出调试信息 + # If no valid composition is parsed, output debug info if not self.composition: print(f"[DEBUG] Failed to parse composition. Response preview: {response[:300]}...") @@ -640,20 +481,20 @@ def _parse_workflow(self, response: str) -> None: Args: response: LLM response text """ - # 首先尝试标准的WORKFLOW格式 + # First, try the standard WORKFLOW format workflow_match = re.search(r'WORKFLOW[:\s]*\n?\s*(.*)', response, re.DOTALL | re.IGNORECASE) - # 如果标准格式失败,尝试其他格式 + # If the standard format fails, try other formats if not workflow_match: - # 尝试查找标签内的WORKFLOW + # Try to find WORKFLOW inside tags answer_match = re.search(r'(.*?)', response, re.DOTALL | re.IGNORECASE) if answer_match: answer_content = answer_match.group(1) workflow_match = re.search(r'WORKFLOW[:\s]*\n?\s*(.*)', answer_content, re.DOTALL | re.IGNORECASE) - # 如果还是没有找到,为composition中的任务创建简单的workflow + # If still not found, create a simple workflow for the tasks in composition if not workflow_match and self.composition: - # 为现有的composition创建简单的线性workflow + # Create a simple linear workflow for the existing composition self.workflow = {} task_names = list(self.composition.keys()) for i, task_name in enumerate(task_names): @@ -667,7 +508,7 @@ def _parse_workflow(self, response: str) -> None: workflow_text = workflow_match.group(1).strip() self.workflow = {} - # 移除可能的markdown代码块标记 + # Remove possible markdown code block markers workflow_text = re.sub(r'^```.*?\n', '', workflow_text, flags=re.MULTILINE) workflow_text = re.sub(r'\n```$', '', workflow_text) @@ -678,12 +519,12 @@ def _parse_workflow(self, response: str) -> None: if ':' in line: task, deps_str = line.split(':', 1) - task = task.strip().replace(' ', '_') # 统一格式:Task_1 + task = task.strip().replace(' ', '_') # Unified format: Task_1 deps_str = deps_str.strip() # Parse dependencies from [dep1, dep2] format dependencies = self._parse_dependencies(deps_str) - # 同样统一依赖项的格式 + # Also unify the format of dependencies dependencies = [dep.replace(' ', '_') for dep in dependencies] self.workflow[task] = dependencies @@ -701,7 +542,7 @@ def _parse_dependencies(self, deps_str: str) -> List[str]: deps_content = deps_str[1:-1].strip() if deps_content: deps = [dep.strip() for dep in deps_content.split(',')] - # 统一格式化依赖项名称 (Task 1 -> Task_1) + # Unify dependency name format (Task 1 -> Task_1) normalized_deps = [] for dep in deps: if re.match(r'Task\s+\d+', dep): @@ -714,6 +555,7 @@ def _parse_dependencies(self, deps_str: str) -> List[str]: def _set_fallback_organization(self) -> None: """Set fallback organization structure when parsing fails.""" + # Set fallback organization structure self.composition = {"Task_1": "Complete the implementation"} self.workflow = {"Task_1": []} @@ -816,85 +658,127 @@ class TestExecutionEngine: Provides safe execution environment for running generated test cases against implementation code, with proper timeout and error handling. + Based on HumanEval evaluation approach. """ + class TimeoutError(Exception): + """Raised when execution exceeds the allowed time limit.""" + @staticmethod - async def execute_test_code(test_code: str, timeout: int = 30) -> Tuple[bool, str]: + def run_with_timeout(func, args, timeout: int = 60): """ - Execute test code and return results. + Execute func(*args) in a separate thread and abort if it does not finish within timeout seconds. Args: - test_code: Combined source and test code to execute + func: Function to execute + args: Arguments for the function timeout: Maximum execution time in seconds Returns: - Tuple of (has_bugs: bool, test_report: str) + Function result or raises exception """ - temp_file = None - try: - # Create temporary file for testing - with tempfile.NamedTemporaryFile( - mode='w', - suffix='.py', - delete=False - ) as f: - f.write(test_code) - temp_file = f.name - - # Determine appropriate Python executable - python_cmd = TestExecutionEngine._get_python_command() - - # Execute tests with timeout - process = await asyncio.create_subprocess_exec( - *python_cmd, '-m', 'unittest', - os.path.splitext(os.path.basename(temp_file))[0], - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - cwd=os.path.dirname(temp_file) - ) - + from threading import Thread + result: list = [] + exception: list = [] + + def target(): try: - stdout, stderr = await asyncio.wait_for( - process.communicate(), - timeout=timeout - ) - - stdout_text = stdout.decode() if stdout else "" - stderr_text = stderr.decode() if stderr else "" - - # Determine if tests passed - has_bugs = process.returncode != 0 - test_report = stdout_text + stderr_text - - return has_bugs, test_report - - except asyncio.TimeoutError: - process.kill() - return True, f"Test execution timeout after {timeout} seconds" - - except Exception as e: - return True, f"Test execution failed: {str(e)}" - - finally: - # Clean up temporary file - if temp_file: - try: - os.unlink(temp_file) - except OSError: - pass # Ignore cleanup errors + result.append(func(*args)) + except BaseException as e: + exception.append(e) + + thread = Thread(target=target, daemon=True) + thread.start() + thread.join(timeout) + + if thread.is_alive(): + raise TestExecutionEngine.TimeoutError("Execution timed out") + + if exception: + raise exception[0] + + return result[0] if result else None @staticmethod - def _get_python_command() -> List[str]: + def execute_test_code(test_code: str, timeout: int = 60) -> Tuple[bool, str]: """ - Get appropriate Python command for current platform. + Execute test code and return detailed results. + Args: + test_code: Combined source and test code to execute + timeout: Maximum execution time in seconds + Returns: - List of command components for subprocess execution + Tuple of (has_bugs: bool, detailed_test_report: str) """ - if sys.platform.startswith('win'): - return [sys.executable] - else: - return ['python3'] + try: + # Create an isolated namespace for execution + env = {} + + # Execute the test code in the isolated environment + TestExecutionEngine.run_with_timeout( + lambda code: exec(code, env), + (test_code,), + timeout=timeout + ) + + # If execution completed without exception, tests passed + return False, "All tests passed successfully" + + except TestExecutionEngine.TimeoutError as te: + msg = f"Test execution timeout after {timeout} seconds" + return True, msg + + except AssertionError as ae: + # Test assertion failed - provide detailed information + msg = f"Test assertion failed: {str(ae)}" + return True, msg + + except SyntaxError as se: + # Syntax error in code + msg = f"Syntax error in code: {str(se)}" + if hasattr(se, 'lineno') and se.lineno: + msg += f" at line {se.lineno}" + if hasattr(se, 'text') and se.text: + msg += f" in: {se.text.strip()}" + return True, msg + + except NameError as ne: + # Undefined variable or function + msg = f"Name error: {str(ne)}" + return True, msg + + except TypeError as te: + # Type error (wrong arguments, etc.) + msg = f"Type error: {str(te)}" + return True, msg + + except ValueError as ve: + # Value error (wrong values) + msg = f"Value error: {str(ve)}" + return True, msg + + except ImportError as ie: + # Import error + msg = f"Import error: {str(ie)}" + return True, msg + + except Exception as exc: + # Other runtime errors + import traceback + msg = f"Runtime error: {str(exc)}\n" + msg += f"Error type: {type(exc).__name__}\n" + # Add traceback information for debugging + tb_lines = traceback.format_exc().split('\n') + # Filter out internal execution lines and focus on user code + relevant_tb = [] + for line in tb_lines: + if 'exec(code, env)' not in line and 'run_with_timeout' not in line: + relevant_tb.append(line) + if relevant_tb: + msg += "Traceback (most recent call last):\n" + msg += '\n'.join(relevant_tb[-10:]) # Show last 10 lines of traceback + return True, msg # MAIN EVOMAC AGENT SYSTEM - Orchestrates the entire multi-agent workflow @@ -924,8 +808,13 @@ def __init__(self, name: str = "evomac", config: Dict[str, Any] = None): """ super().__init__(name, config) - # System configuration - self.config = config or {} + # System configuration (instance-specific) + self.config = config.copy() if config else {} + # Instance isolation: unique identifier for this EvoMAC instance + self.instance_id = f"evomac_{id(self)}" + # Lock to serialize state initialization per instance + self.task_lock = Lock() + self.max_iterations = self.config.get("iteration", 5) self.programming_language = self.config.get("language", "python") @@ -938,6 +827,8 @@ def __init__(self, name: str = "evomac", config: Dict[str, Any] = None): # Initialize LLM client self.model_name = self.config.get("model_name") or os.getenv("MODEL_NAME", "gpt-4o-mini") self.llm_client = self._initialize_llm_client() + # 初始化 conversation_history 用于记录所有 agent 消息 + self.conversation_history: List[AIMessage] = [] def _initialize_llm_client(self) -> ChatOpenAI: """ @@ -946,10 +837,11 @@ def _initialize_llm_client(self) -> ChatOpenAI: Returns: Configured ChatOpenAI client """ - return ChatOpenAI( + client = ChatOpenAI( model=self.model_name, temperature=0.7 ) + return client def _format_messages_for_llm(self, system_prompt: str, user_content: str) -> List: """ @@ -967,7 +859,7 @@ def _format_messages_for_llm(self, system_prompt: str, user_content: str) -> Lis HumanMessage(content=user_content) ] - async def _call_llm_async(self, messages: List) -> str: + async def _call_llm_async(self, messages: List) -> Tuple[str, Optional[Any]]: """ Call LLM asynchronously and return response. @@ -975,19 +867,19 @@ async def _call_llm_async(self, messages: List) -> str: messages: List of LangChain message objects Returns: - LLM response content + Tuple of (LLM response content, complete response object with metadata) """ try: response = await self.llm_client.ainvoke( messages, config={"temperature": 0.7} ) - return response.content + return response.content, response except Exception as e: print(f"LLM call failed: {e}") - return "" + return "", None - async def _generate_initial_implementation(self, problem_statement: str) -> str: + async def _generate_initial_implementation(self, problem_statement: str) -> Tuple[str, Optional[Any]]: """ Generate initial code implementation. @@ -995,7 +887,7 @@ async def _generate_initial_implementation(self, problem_statement: str) -> str: problem_statement: The coding problem to solve Returns: - Initial implementation response + Tuple of (Initial implementation response, response object with metadata) """ prompt = TASK_PROMPT_TEMPLATES["initial_coding"].format(task=problem_statement) messages = self._format_messages_for_llm( @@ -1004,7 +896,7 @@ async def _generate_initial_implementation(self, problem_statement: str) -> str: ) return await self._call_llm_async(messages) - async def _organize_workflow(self, problem_statement: str) -> str: + async def _organize_workflow(self, problem_statement: str) -> Tuple[str, Optional[Any]]: """ Generate workflow organization from CTO agent. @@ -1012,7 +904,7 @@ async def _organize_workflow(self, problem_statement: str) -> str: problem_statement: The coding problem to solve Returns: - Workflow organization response + Tuple of (Workflow organization response, response object with metadata) """ format_prompt = self.format_prompt or "" prompt = TASK_PROMPT_TEMPLATES["task_organization"].format( @@ -1085,10 +977,12 @@ async def _execute_single_task(self, problem_statement: str, task_name: str, tas AGENT_SYSTEM_PROMPTS["programmer"], prompt ) - response = await self._call_llm_async(messages) + response_content, response_obj = await self._call_llm_async(messages) + # 保存 programmer 阶段的消息 + self.conversation_history.append(self._create_message_record(response_content, 'programmer', response_obj)) # Update implementation with new code - self.code_manager.update_from_response(response) + self.code_manager.update_from_response(response_content) async def _execute_testing_workflow(self, problem_statement: str) -> Tuple[bool, str]: """ @@ -1101,8 +995,10 @@ async def _execute_testing_workflow(self, problem_statement: str) -> Tuple[bool, Tuple of (has_bugs: bool, test_reports: str) """ # Generate test organization - test_organization_response = await self._organize_testing(problem_statement) - self.test_workflow_organizer.update_from_response(test_organization_response) + test_organization_response_content, test_organization_response_obj = await self._organize_testing(problem_statement) + # save test_organizer phase message + self.conversation_history.append(self._create_message_record(test_organization_response_content, 'test_organizer', test_organization_response_obj)) + self.test_workflow_organizer.update_from_response(test_organization_response_content) # Execute test tasks test_composition = self.test_workflow_organizer.get_composition() @@ -1128,7 +1024,7 @@ async def _execute_testing_workflow(self, problem_statement: str) -> Tuple[bool, return has_any_bugs, "\n\n".join(all_test_reports) - async def _organize_testing(self, problem_statement: str) -> str: + async def _organize_testing(self, problem_statement: str) -> Tuple[str, Optional[Any]]: """ Generate test organization plan. @@ -1136,7 +1032,7 @@ async def _organize_testing(self, problem_statement: str) -> str: problem_statement: The coding problem to solve Returns: - Test organization response + Tuple of (Test organization response, response object with metadata) """ format_prompt = self.format_prompt or "" prompt = TASK_PROMPT_TEMPLATES["test_organization"].format( @@ -1178,14 +1074,16 @@ async def _execute_test_task(self, problem_statement: str, task_name: str, task_ AGENT_SYSTEM_PROMPTS["test_engineer"], prompt ) - test_response = await self._call_llm_async(messages) + test_response_content, test_response_obj = await self._call_llm_async(messages) + # save test_engineer phase message + self.conversation_history.append(self._create_message_record(test_response_content, 'test_engineer', test_response_obj)) # Extract and execute test code - test_code_match = re.search(r'```python\s*(.*?)```', test_response, re.DOTALL) + test_code_match = re.search(r'```python\s*(.*?)```', test_response_content, re.DOTALL) if test_code_match: test_code = test_code_match.group(1).strip() combined_code = self.code_manager.get_raw_code() + "\n\n" + test_code - return await TestExecutionEngine.execute_test_code(combined_code) + return TestExecutionEngine.execute_test_code(combined_code) return True, "Failed to extract test code" @@ -1204,8 +1102,10 @@ async def _perform_iterative_optimization(self, problem_statement: str, test_rep for iteration in range(self.max_iterations - 1): # Generate update organization - update_response = await self._organize_updates(problem_statement, current_reports) - self.workflow_organizer.update_from_response(update_response) + update_response_content, update_response_obj = await self._organize_updates(problem_statement, current_reports) + # save updater phase message + self.conversation_history.append(self._create_message_record(update_response_content, 'updater', update_response_obj)) + self.workflow_organizer.update_from_response(update_response_content) # Execute update workflow await self._execute_implementation_workflow(problem_statement) @@ -1220,7 +1120,7 @@ async def _perform_iterative_optimization(self, problem_statement: str, test_rep return True, current_reports - async def _organize_updates(self, problem_statement: str, test_reports: str) -> str: + async def _organize_updates(self, problem_statement: str, test_reports: str) -> Tuple[str, Optional[Any]]: """ Generate update organization to fix current issues. @@ -1229,7 +1129,7 @@ async def _organize_updates(self, problem_statement: str, test_reports: str) -> test_reports: Current test failure reports Returns: - Update organization response + Tuple of (Update organization response, response object with metadata) """ format_prompt = self.format_prompt or "" prompt = TASK_PROMPT_TEMPLATES["issue_resolution"].format( @@ -1285,24 +1185,31 @@ async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: - messages: List of all agent interactions - final_answer: Final implementation code """ - problem_statement = problem["problem"] - conversation_history = [] + # Ensure state initialization is serialized per instance + async with self.task_lock: + problem_statement = problem["problem"] + # initialize the state of this run + self.code_manager = CodeManager() + self.test_code_manager = CodeManager() + self.workflow_organizer = WorkflowOrganizer() + self.test_workflow_organizer = WorkflowOrganizer() + self.conversation_history = [] try: # Phase 1: Generate initial implementation print("Phase 1: Generating initial implementation...") - initial_response = await self._generate_initial_implementation(problem_statement) - self.code_manager.update_from_response(initial_response) - conversation_history.append( - self._create_message_record(initial_response, 'initial_coder') + initial_response_content, initial_response_obj = await self._generate_initial_implementation(problem_statement) + self.code_manager.update_from_response(initial_response_content) + self.conversation_history.append( + self._create_message_record(initial_response_content, 'initial_coder', initial_response_obj) ) # Phase 2: Organize workflow and task decomposition print("Phase 2: Organizing workflow...") - organization_response = await self._organize_workflow(problem_statement) - self.workflow_organizer.update_from_response(organization_response) - conversation_history.append( - self._create_message_record(organization_response, 'workflow_organizer') + organization_response_content, organization_response_obj = await self._organize_workflow(problem_statement) + self.workflow_organizer.update_from_response(organization_response_content) + self.conversation_history.append( + self._create_message_record(organization_response_content, 'workflow_organizer', organization_response_obj) ) # Phase 3: Execute implementation workflow @@ -1327,7 +1234,7 @@ async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: print(f"EvoMAC execution completed. Final implementation has {len(final_implementation)} characters.") return { - "messages": conversation_history, + "messages": self.conversation_history, "final_answer": final_implementation } @@ -1336,12 +1243,12 @@ async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: f"EvoMAC execution failed: {str(e)}", 'error_handler' ) - conversation_history.append(error_message) + self.conversation_history.append(error_message) print(f"EvoMAC execution failed: {e}") return { - "messages": conversation_history, + "messages": self.conversation_history, "final_answer": f"Error: {str(e)}" } diff --git a/mas_arena/benchmark_runner.py b/mas_arena/benchmark_runner.py index d54436e..d97f29c 100644 --- a/mas_arena/benchmark_runner.py +++ b/mas_arena/benchmark_runner.py @@ -1,24 +1,21 @@ #!/usr/bin/env python3 +# -*- coding: utf-8 -*- """ -Simple Benchmark Runner Interface +Benchmark Runner -This module provides a simplified interface for running benchmarks on multi-agent systems. +This module provides functionality for running benchmarks on agent systems. """ import os import json import random -import subprocess -import sys import shutil -import tempfile from pathlib import Path from datetime import datetime import asyncio from tqdm.asyncio import tqdm from openai.types.completion_usage import CompletionUsage import traceback -from concurrent.futures import ThreadPoolExecutor from rich import print as rprint from mas_arena.metrics import ( @@ -126,7 +123,7 @@ def _prepare_benchmark(self, benchmark_name, data_path, limit, agent_system, age agent.set_metrics_registry(self.metrics_registry) try: - with open(data_path, "r") as f: + with open(data_path, "r", encoding="utf-8") as f: problems = [json.loads(line) for line in f] except FileNotFoundError: raise FileNotFoundError(f"Data file not found: {data_path}") @@ -386,7 +383,8 @@ def run(self, benchmark_name="math", data_path=None, limit=None, agent_system="s )) async def arun(self, benchmark_name="math", data_path=None, limit=None, agent_system="single_agent", agent_config=None, verbose=True, concurrency=10): - agent, problems, benchmark_config, output_file = self._prepare_benchmark( + # Prepare benchmark; we only need problems and config here + _, problems, benchmark_config, output_file = self._prepare_benchmark( benchmark_name, data_path, limit, agent_system, agent_config, verbose ) @@ -401,12 +399,12 @@ async def arun(self, benchmark_name="math", data_path=None, limit=None, agent_sy async def process_with_semaphore(i, p): async with semaphore: - return await self._process_one_problem(i, p, agent, benchmark_config, verbose) + # Create a fresh agent instance per problem to isolate state + new_agent = create_agent_system(agent_system, self.agent_config) + new_agent.set_metrics_registry(self.metrics_registry) + return await self._process_one_problem(i, p, new_agent, benchmark_config, verbose) - tasks = [ - process_with_semaphore(i, p) - for i, p in enumerate(problems) - ] + tasks = [process_with_semaphore(i, p) for i, p in enumerate(problems)] all_results = await tqdm.gather(*tasks, desc="Processing Problems")