From 282519e946658923bc1de9ffc47ece31779e643e Mon Sep 17 00:00:00 2001 From: fuzi233 Date: Wed, 23 Jul 2025 20:18:31 +0800 Subject: [PATCH 1/3] modified: mas_arena/agents/jarvis.py modified: mas_arena/agents/llm_debate.py --- mas_arena/agents/jarvis.py | 1052 ++++++++++++++++++-------------- mas_arena/agents/llm_debate.py | 299 +++++---- 2 files changed, 798 insertions(+), 553 deletions(-) diff --git a/mas_arena/agents/jarvis.py b/mas_arena/agents/jarvis.py index 7f9b89b..ce3e1af 100644 --- a/mas_arena/agents/jarvis.py +++ b/mas_arena/agents/jarvis.py @@ -1,497 +1,665 @@ -import os -import time -import re +from __future__ import annotations import asyncio -from typing import Dict, Any, List -from dataclasses import dataclass +import copy +from typing import Dict, List +import json +import re +from abc import abstractmethod +from typing import Any, Dict, List, Optional, Union +import os + +from langchain.base_language import BaseLanguageModel +from langchain.chains import LLMChain +from langchain_core.callbacks.manager import Callbacks +from langchain_core.prompts import PromptTemplate +from langchain_core.prompts.chat import ( + AIMessagePromptTemplate, + ChatPromptTemplate, + HumanMessagePromptTemplate, + SystemMessagePromptTemplate, +) +from langchain_core.tools import BaseTool +from pydantic import BaseModel from langchain_openai import ChatOpenAI -from langchain_core.messages import SystemMessage, HumanMessage, AIMessage +from langchain_core.callbacks.base import BaseCallbackHandler +from langchain_core.outputs import LLMResult +from openai.types.completion_usage import CompletionUsage + + from mas_arena.agents.base import AgentSystem, AgentSystemRegistry +DEMONSTRATIONS: list[dict] = [] + + +class MessageCollectorCallback(BaseCallbackHandler): + """Callback to collect AIMessages and their usage metadata.""" + + def __init__(self, name: str = "jarvis_redesign"): + super().__init__() + self.messages: List[Any] = [] + self.name = name + + def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: + """Called at the end of an LLM call.""" + if response.generations: + generation = response.generations[0][0] + message = generation.message + message.name = self.name + + if response.llm_output and "token_usage" in response.llm_output: + token_usage_data = response.llm_output["token_usage"] + # Create a CompletionUsage object to match what base.py expects + message.usage_metadata = CompletionUsage( + completion_tokens=token_usage_data.get("completion_tokens", 0), + prompt_tokens=token_usage_data.get("prompt_tokens", 0), + total_tokens=token_usage_data.get("total_tokens", 0), + ) + + self.messages.append(message) + + class Task: - """Task execution representation.""" + """Task to be executed.""" - def __init__(self, task: str, id: int, dep: List[int], args: Dict, tool: str): + def __init__(self, task: str, id: int, dep: List[int], args: Dict, tool: BaseTool): self.task = task self.id = id self.dep = dep self.args = args self.tool = tool + self.status = "pending" + self.message = "" + self.result = "" + def __str__(self) -> str: + return f"{self.task}({self.args})" -@dataclass -class Agent: - """Represents an LLM agent""" - agent_id: str - name: str - model_name: str - system_prompt: str - chat_history: List[Dict[str, str]] = None - - def __post_init__(self): - self.chat_history = [] - self.llm = ChatOpenAI( - model=self.model_name - ) + def save_product(self) -> None: + """Save text-based products to result field.""" + # For text-based tasks, we directly store the result + # No file saving needed for text outputs + if hasattr(self, 'product'): + self.result = str(self.product) - async def generate_response(self, context: str) -> Dict[str, Any]: - """Generate agent response asynchronously""" - messages = [ - SystemMessage(content=self.system_prompt), - *[HumanMessage(content=msg["human"]) if msg.get("role") == "human" - else AIMessage(content=msg["ai"]) - for msg in self.chat_history], - HumanMessage(content=context) - ] - + def completed(self) -> bool: + return self.status == "completed" + + def failed(self) -> bool: + return self.status == "failed" + + def pending(self) -> bool: + return self.status == "pending" + + def run(self) -> str: + """Execute the task using the associated tool.""" try: - # Use standard output in async mode - response = await self.llm.ainvoke(messages) - # Save response name for source identification - response.name = self.name - - # Add to chat history - self.chat_history.append({ - "role": "human", - "human": context - }) - self.chat_history.append({ - "role": "ai", - "ai": response.content - }) + new_args = copy.deepcopy(self.args) + # For text-based tasks, execute tool and get result + result = self.tool(**new_args) - # Ensure we return the original response object to preserve usage_metadata - return { - "message": response, # Contains the complete AIMessage object with metadata - "content": response.content - } + # Store result directly for text-based outputs + if isinstance(result, str): + self.result = result + else: + # If tool returns complex object, store as product and convert to string + self.product = result + self.save_product() except Exception as e: - print(f"Response generation failed: {str(e)}") - - # Create simple response - error_content = f"Generation failed: {str(e)}" - - self.chat_history.append({ - "role": "human", - "human": context - }) - self.chat_history.append({ - "role": "ai", - "ai": error_content - }) - - # Create a minimal AIMessage object - error_message = AIMessage(content=error_content) - error_message.name = self.name - - return { - "message": error_message, - "content": error_content - } + self.status = "failed" + self.message = str(e) + return self.message + self.status = "completed" + return self.result -class JARVIS(AgentSystem): - """JARVIS multi-agent system""" +class Step: + """A step in the plan.""" - def __init__(self, name: str = "jarvis", config: Dict[str, Any] = None): - super().__init__(name, config) + def __init__( + self, task: str, id: int, dep: List[int], args: Dict[str, str], tool: BaseTool + ): + self.task = task + self.id = id + self.dep = dep + self.args = args + self.tool = tool + +class Plan: + """A plan to execute.""" + + def __init__(self, steps: List[Step]): + self.steps = steps + + def __str__(self) -> str: + return str([str(step) for step in self.steps]) + + def __repr__(self) -> str: + return str(self) + + +class BasePlanner(BaseModel): + """Base class for a planner.""" + + @abstractmethod + def plan(self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any) -> Plan: + """Given input, decide what to do.""" + + @abstractmethod + async def aplan( + self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any + ) -> Plan: + """Asynchronous Given input, decide what to do.""" + +class TaskPlanner(BasePlanner): + """Planner for tasks.""" + + llm_chain: LLMChain + output_parser: PlanningOutputParser + stop: Optional[List] = None + + class Config: + arbitrary_types_allowed = True + + def __init__(self, llm: BaseLanguageModel, verbose: bool = False): + """ + 初始化任务规划器。 - # Get model name - self.model_name = os.getenv("MODEL_NAME", "gpt-4o-mini") + Args: + llm: 基础语言模型。 + verbose: 是否启用详细日志。 + """ + llm_chain = TaskPlaningChain.from_llm(llm, verbose=verbose) + output_parser = PlanningOutputParser() + stop = None + + # 使用pydantic的正确初始化方式 + super().__init__( + llm_chain=llm_chain, + output_parser=output_parser, + stop=stop + ) + + def plan(self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any) -> Plan: + """Given input, decided what to do.""" + inputs["tools"] = [ + f"{tool.name}: {tool.description}" for tool in inputs["hf_tools"] + ] + llm_response = self.llm_chain.run(**inputs, stop=self.stop, callbacks=callbacks) + return self.output_parser.parse(llm_response, inputs["hf_tools"]) + + async def aplan( + self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any + ) -> Plan: + """Asynchronous Given input, decided what to do.""" + inputs["hf_tools"] = [ + f"{tool.name}: {tool.description}" for tool in inputs["hf_tools"] + ] + llm_response = await self.llm_chain.arun( + **inputs, stop=self.stop, callbacks=callbacks + ) + return self.output_parser.parse(llm_response, inputs["hf_tools"]) + +class ResponseGenerator: + """Generates a response based on the input.""" + + def __init__(self, llm_chain: LLMChain, stop: Optional[List] = None): + self.llm_chain = llm_chain + self.stop = stop + + def generate(self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any) -> str: + """Given input, decided what to do.""" + llm_response = self.llm_chain.run(**inputs, stop=self.stop, callbacks=callbacks) + # print(f"[Jarvis-Debug] ResponseGenerator.generate LLM response: {llm_response}") + return llm_response + + def run(self, problem: str, task_list: str, executed_task_list: str, format_prompt: str = None, **kwargs) -> str: + """ + 运行响应生成,兼容evaluation_framework中的调用方式 + + Args: + problem: 原始问题 + task_list: 任务列表 + executed_task_list: 执行结果列表 + format_prompt: 格式化提示,用于指导输出格式 + """ + # 构建基础输入 + task_execution = f"Problem: {problem}\nTasks: {task_list}\nResults: {executed_task_list}" - # Initialize agents - self.planner_agent = None - self.executor_agent = None - self.response_generator_agent = None + # 如果提供了格式化提示,将其作为指令,否则使用通用指令 + format_instructions = "Please summarize the results and generate a response." + if format_prompt: + format_instructions = format_prompt - # Initialize task list + inputs = { + "task_execution": task_execution, + "format_instructions": format_instructions + } + return self.generate(inputs, **kwargs) + +class TaskExecutor: + """Load tools and execute tasks.""" + + def __init__(self, plan: Plan): + self.plan = plan self.tasks = [] - self.task_results = {} - - def _create_agents(self): - """Create the system's required agents""" - # Task planning agent - self.planner_agent = Agent( - agent_id="planner", - name="Task Planner", - model_name=self.model_name, - system_prompt="""You are an expert task planner specialized in problem decomposition. -Your job is to analyze problems and create a detailed task plan for solving them. - -For each task, provide: -1. Task ID (starting from 1) -2. Task description (clearly stating what needs to be accomplished) -3. Dependency tasks (list of IDs of tasks that must be completed before this task can start) - -Ensure dependencies between tasks are clear, and the task sequence effectively solves the problem. -Keep tasks linear and avoid complex parallel processing. - -Be especially careful with: -- Mathematical problems: Break them down into clear computational steps -- Reasoning problems: Create steps for analysis, inference, and conclusion -- Complex problems: Decompose into smaller, manageable sub-problems - -Always ensure the final task synthesizes all previous task results.""" - ) - - # Task execution agent - self.executor_agent = Agent( - agent_id="executor", - name="Task Executor", - model_name=self.model_name, - system_prompt="""You are an expert task executor specialized in precise problem-solving. -Your job is to execute specified tasks and provide detailed execution results. - -When receiving a task description and related parameters, you must: -1. Understand the task objective -2. Perform necessary calculations or reasoning with absolute precision -3. Show your detailed step-by-step work -4. Provide clear execution results - -Ensure your results are accurate, your reasoning process is clear, and your output can be used for subsequent tasks.""" + self.id_task_map = {} + self.status = "pending" + for step in self.plan.steps: + task = Task(step.task, step.id, step.dep, step.args, step.tool) + self.tasks.append(task) + self.id_task_map[step.id] = task + + def completed(self) -> bool: + return all(task.completed() for task in self.tasks) + + def failed(self) -> bool: + return any(task.failed() for task in self.tasks) + + def pending(self) -> bool: + return any(task.pending() for task in self.tasks) + + def check_dependency(self, task: Task) -> bool: + for dep_id in task.dep: + if dep_id == -1: + continue + dep_task = self.id_task_map[dep_id] + if dep_task.failed() or dep_task.pending(): + return False + return True + + def update_args(self, task: Task) -> None: + for dep_id in task.dep: + if dep_id == -1: + continue + dep_task = self.id_task_map[dep_id] + for k, v in task.args.items(): + if f"" in v: + task.args[k] = task.args[k].replace( + f"", dep_task.result + ) + + def run(self) -> str: + # for task in self.tasks: + for task in self.tasks: + # print(f"running {task}") # noqa: T201 + if task.pending() and self.check_dependency(task): + self.update_args(task) + task.run() + if self.completed(): + self.status = "completed" + elif self.failed(): + self.status = "failed" + else: + self.status = "pending" + return self.status + + def __str__(self) -> str: + result = "" + for task in self.tasks: + result += f"{task}\n" + result += f"status: {task.status}\n" + if task.failed(): + result += f"message: {task.message}\n" + if task.completed(): + result += f"result: {task.result}\n" + return result + + def __repr__(self) -> str: + return self.__str__() + + def describe(self) -> str: + return self.__str__() + +class TaskPlaningChain(LLMChain): + """Chain to execute tasks.""" + + @classmethod + def from_llm( + cls, + llm: BaseLanguageModel, + demos: List[Dict] = DEMONSTRATIONS, + verbose: bool = False, # 改为False减少输出 + ) -> LLMChain: + """Get the response parser.""" + system_template = """#1 Task Planning Stage: The AI assistant can parse user input to several tasks: [{{"task": task, "id": task_id, "dep": dependency_task_id, "args": {{"input name": text may contain }}}}]. The special tag "dep_id" refer to the one generated text/image/audio in the dependency task (Please consider whether the dependency task generates resources of this type.) and "dep_id" must be in "dep" list. The "dep" field denotes the ids of the previous prerequisite tasks which generate a new resource that the current task relies on. The task MUST be selected from the following tools (along with tool description, input name and output type): {tools}. There may be multiple tasks of the same type. Think step by step about all the tasks needed to resolve the user's request. Parse out as few tasks as possible while ensuring that the user request can be resolved. Pay attention to the dependencies and order among tasks. If the user input can't be parsed, you need to reply empty JSON [].""" # noqa: E501 + human_template = """Now I input: {input}.""" + system_message_prompt = SystemMessagePromptTemplate.from_template( + system_template ) - - # Response generation agent - self.response_generator_agent = Agent( - agent_id="generator", - name="Response Generator", - model_name=self.model_name, - system_prompt=f"""You are an expert response generator specialized in synthesis and precision. -Your job is to generate a final comprehensive answer based on all previous task results. - -When receiving a problem description and all completed task results, you must: -- Synthesize and analyze all task results comprehensively -- Provide a clear, accurate final answer, and step-by-step solution -{self.format_prompt} -""" + human_message_prompt = HumanMessagePromptTemplate.from_template(human_template) + + demo_messages: List[ + Union[HumanMessagePromptTemplate, AIMessagePromptTemplate] + ] = [] + for demo in demos: + if demo["role"] == "user": + demo_messages.append( + HumanMessagePromptTemplate.from_template(demo["content"]) + ) + else: + demo_messages.append( + AIMessagePromptTemplate.from_template(demo["content"]) + ) + # demo_messages.append(message) + + prompt = ChatPromptTemplate.from_messages( + [system_message_prompt, *demo_messages, human_message_prompt] ) - def _parse_tasks_from_text(self, text: str) -> List[Task]: - """Parse task list from text""" - tasks = [] + return cls(prompt=prompt, llm=llm, verbose=verbose) + +class PlanningOutputParser(BaseModel): + """Parses the output of the planning stage.""" + + class Config: + arbitrary_types_allowed = True + + def parse(self, text: str, hf_tools: List[BaseTool]) -> Plan: + """Parse the output of the planning stage. + + Args: + text: The output of the planning stage. + hf_tools: The tools available. + + Returns: + The plan. + """ + steps = [] try: - # Try multiple regex patterns to catch different task formats - task_patterns = [ - r"Task\s*(\d+)[:\s]+\s*(.*?)(?=Task\s*\d+[:\s]+|$)", - r"(\d+)\.\s*Task[:\s]+\s*(.*?)(?=\d+\.\s*Task[:\s]+|$)", - r"(\d+)\.\s*(.*?)(?=\d+\.\s*|$)" - ] + # 尝试找到JSON数组 + json_match = re.findall(r"\[.*\]", text) + if not json_match: + # 如果没有找到JSON数组,返回空计划 + return Plan(steps=[]) - tasks_found = [] - for pattern in task_patterns: - tasks_found = re.findall(pattern, text, re.DOTALL) - if tasks_found: - break + # 尝试解析JSON + try: + task_list = json.loads(json_match[0]) + except json.JSONDecodeError: + # JSON解析失败,返回空计划 + return Plan(steps=[]) - for task_id_str, task_desc in tasks_found: - task_id = int(task_id_str) - - # Try to extract dependencies from description - dep_patterns = [ - r"Depends on[:\s]+\s*\[(.*?)\]", - r"Dependencies[:\s]+\s*\[(.*?)\]", - r"Dependency[:\s]+\s*\[(.*?)\]" - ] - - deps = [] - for pattern in dep_patterns: - dep_match = re.search(pattern, task_desc) - if dep_match: - deps_str = dep_match.group(1) - deps = [int(d.strip()) for d in deps_str.split(",") if d.strip().isdigit()] + # 处理任务列表 + for v in task_list: + if not isinstance(v, dict) or "task" not in v: + continue + + choose_tool = None + for tool in hf_tools: + if tool.name == v["task"]: + choose_tool = tool break - - # Create task - task = Task( - task=task_desc.strip(), - id=task_id, - dep=deps, - args={"description": task_desc.strip()}, - tool="reasoning" - ) - tasks.append(task) - - # If no tasks found, create a default task - if not tasks: - tasks.append(Task( - task="Solve the problem", - id=1, - dep=[], - args={"description": "Analyze and solve the given problem"}, - tool="reasoning" - )) - + + if choose_tool: + steps.append(Step(v["task"], v["id"], v["dep"], v["args"], tool)) + except Exception as e: - print(f"Error parsing tasks: {str(e)}") - tasks.append(Task( - task="Solve the problem", - id=1, - dep=[], - args={"description": "Analyze and solve the given problem"}, - tool="reasoning" - )) - - return tasks - - async def _plan_tasks(self, problem: str) -> List[Task]: - """Plan task list asynchronously""" - if not self.planner_agent: - self._create_agents() + # 任何其他错误,返回空计划 + pass - prompt = f""" - Please analyze the following problem and create a task plan for solving it: + return Plan(steps=steps) - Problem: {problem} - - Provide a list of tasks, each including: - 1. Task ID - 2. Task description - 3. List of dependency task IDs (empty list if no dependencies) - - Format example: - Task 1: Analyze the problem - Dependencies: [] - - Task 2: Solve sub-problem A - Dependencies: [1] - - Task 3: Solve sub-problem B - Dependencies: [1] - - Task 4: Merge results - Dependencies: [2, 3] - - Ensure tasks are linear and don't require complex parallel processing. - For mathematical problems, break down each computational step clearly. - """ - - response = await self.planner_agent.generate_response(prompt) - content = response.get("content", "") - - return self._parse_tasks_from_text(content) +class ResponseGenerationChain(LLMChain): + """Chain to execute tasks.""" - async def _execute_task(self, task: Task, problem: str) -> Dict[str, Any]: - """Execute a single task asynchronously""" - if not self.executor_agent: - self._create_agents() - - # Prepare results from dependent tasks - dependency_results = {} - for dep_id in task.dep: - if dep_id in self.task_results: - dependency_results[f"Task {dep_id} Result"] = self.task_results[dep_id].get("content", "") - - dependency_text = "\n\n".join([f"{name}:\n{result}" for name, result in dependency_results.items()]) - - prompt = f""" - Original Problem: {problem} - - Current Task: - ID: {task.id} - Description: {task.task} - - Results from Dependency Tasks: - {dependency_text if dependency_text else "No dependency tasks"} - - Please execute this task with precision and provide detailed step-by-step work and results. - For mathematical calculations, show all steps and verify your final answer. - For reasoning tasks, explain your logical process thoroughly. - """ - - response = await self.executor_agent.generate_response(prompt) - - execution_result = { - "task_id": task.id, - "content": response.get("content", ""), - "message": response.get("message", None) # Preserve complete message object with metadata - } - - return execution_result + @classmethod + def from_llm(cls, llm: BaseLanguageModel, verbose: bool = False) -> LLMChain: + execution_template = ( + "The AI assistant has parsed the user input into several tasks" + "and executed them. The results are as follows:\n" + "{task_execution}" + "\n{format_instructions}" + ) + prompt = PromptTemplate( + template=execution_template, + input_variables=["task_execution", "format_instructions"], + ) + return cls(prompt=prompt, llm=llm, verbose=verbose) - async def _generate_final_response(self, problem: str, task_results: Dict[int, Dict[str, Any]]) -> Dict[str, Any]: - """Generate final response asynchronously""" - if not self.response_generator_agent: - self._create_agents() - - # Format all task results - formatted_results = "\n\n".join([ - f"Task {task_id} Result:\n{result.get('content', '')}" - for task_id, result in task_results.items() - ]) - - prompt = f""" - Original Problem: {problem} - - Task Execution Results: - {formatted_results} - - Based on all the information above, please generate a comprehensive final answer. - - Your response must: - - Verify all calculations from the previous tasks - - {self.format_prompt} - - Make sure your answer is concise, logically clear, and directly answers the original problem. - Double-check all mathematical operations for accuracy. + +def load_response_generator(llm: BaseLanguageModel) -> ResponseGenerator: + """Load the ResponseGenerator.""" + + llm_chain = ResponseGenerationChain.from_llm(llm, verbose=False) + return ResponseGenerator( + llm_chain=llm_chain, + ) + +def load_chat_planner(llm: BaseLanguageModel) -> TaskPlanner: + """Load the chat planner.""" + + return TaskPlanner(llm = llm) + +class HuggingGPT: + """Agent for interacting with HuggingGPT - Text Processing Version.""" + + def __init__(self, llm: BaseLanguageModel, tools: List[BaseTool], name: str = "jarvis_redesign"): + self.llm = llm + self.tools = tools + self.name = name + self.task_planner = load_chat_planner(llm) + self.response_generator = load_response_generator(llm) + self.task_executor = None + + def run(self, input: str) -> str: + """Process text input through planning, execution, and response generation.""" + # Plan tasks based on input + plan = self.task_planner.plan(inputs={"input": input, "hf_tools": self.tools}) + + # Execute planned tasks + self.task_executor = TaskExecutor(plan) + execution_status = self.task_executor.run() + + # Generate response based on execution results + response = self.response_generator.generate( + {"task_execution": self.task_executor} + ) + return response + + def get_plan(self, input: str): + """Get the execution plan for debugging purposes.""" + return self.task_planner.plan(inputs={"input": input, "hf_tools": self.tools}) + + def get_execution_details(self): + """Get detailed execution information for analysis.""" + if hasattr(self, 'task_executor') and self.task_executor: + return self.task_executor.describe() + return "No execution performed yet." + + def run_with_trace(self, problem: str, format_prompt: str = None, **kwargs) -> Dict[str, Any]: """ + 运行HuggingGPT,并返回带有完整执行轨迹的结果。 - response = await self.response_generator_agent.generate_response(prompt) - content = response.get("content", "") - - return { - "final_answer": content, - "message": response.get("message", None) # Preserve complete message object with metadata - } + Args: + problem: 输入问题 + format_prompt: 格式化提示,用于指导输出格式 + **kwargs: 其他参数 + """ + message_collector = MessageCollectorCallback(name=self.name) + # Add collector to callbacks + callbacks = kwargs.get("callbacks", []) + if not isinstance(callbacks, list): + callbacks = [callbacks] + callbacks.append(message_collector) + kwargs["callbacks"] = callbacks - async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: - """Run the agent system to process a problem asynchronously""" - print("JARVIS: Starting problem processing...") - start_time = time.time() - - # Extract problem text - problem_text = problem.get("problem", "") - print(f"Problem: {problem_text[:100]}...") - - # Initialize agents - self._create_agents() - print("Agents created successfully") - - # Step 1: Task planning - print("Starting task planning...") - self.tasks = await self._plan_tasks(problem_text) - print(f"Task planning complete, {len(self.tasks)} tasks planned") - - # Collect all messages - containing complete AIMessage objects - all_messages = [] - planner_message = None - if self.planner_agent.chat_history: - planner_content = self.planner_agent.chat_history[-1]["ai"] - # If it's an AIMessage object, preserve it; otherwise create one - if hasattr(planner_content, "content"): - planner_message = planner_content - else: - # Create an AIMessage object - msg = AIMessage(content=str(planner_content)) - msg.name = "Task Planner" - planner_message = msg - - all_messages.append(planner_message) - print(f"Added planner agent response, length: {len(str(planner_message.content))}") - else: - print("Warning: Planner agent didn't generate history") - msg = AIMessage(content="Task planning didn't generate valid output") - msg.name = "Task Planner" - all_messages.append(msg) + # 临时禁用langchain详细输出 + import logging + import warnings + + # 禁用各种日志输出 + loggers_to_silence = [ + "langchain", + "langchain.chains", + "langchain.schema", + "httpx", + ] - # Step 2: Execute tasks with dependency-based parallelization - print("Starting task execution...") - self.task_results = {} + old_levels = {} + for logger_name in loggers_to_silence: + logger = logging.getLogger(logger_name) + old_levels[logger_name] = logger.level + logger.setLevel(logging.ERROR) - # Track which tasks are completed - completed_tasks = set() - pending_tasks = {task.id: task for task in self.tasks} + # 抑制警告 + warnings.filterwarnings("ignore", category=DeprecationWarning) - while pending_tasks: - # Find tasks that can be executed in parallel (all dependencies satisfied) - ready_tasks = [] - for task_id, task in list(pending_tasks.items()): - dependencies_satisfied = all(dep_id in completed_tasks for dep_id in task.dep) - if dependencies_satisfied: - ready_tasks.append(task) - del pending_tasks[task_id] - - if not ready_tasks: - # If no tasks are ready but there are pending tasks, there might be a dependency cycle - print("Warning: Possible dependency cycle detected. Breaking cycle...") - # Force-execute the first pending task - if pending_tasks: - first_pending = next(iter(pending_tasks.values())) - ready_tasks.append(first_pending) - del pending_tasks[first_pending.id] - else: - # All tasks are done - break - - print(f"Executing {len(ready_tasks)} tasks in parallel...") + try: + # 如果提供了格式化提示,则将其添加到问题中 + formatted_problem = problem + if format_prompt: + formatted_problem = f"{problem}\n\n{format_prompt}" - # Execute all ready tasks in parallel - tasks_execution = [self._execute_task(task, problem_text) for task in ready_tasks] - results = await asyncio.gather(*tasks_execution) + # 使用统一的task_planner + task_list = self.task_planner.plan(inputs={"input": formatted_problem, "hf_tools": self.tools}, **kwargs) + finally: + # 恢复原日志级别 + for logger_name, old_level in old_levels.items(): + logging.getLogger(logger_name).setLevel(old_level) + warnings.filterwarnings("default", category=DeprecationWarning) + + if not task_list.steps: + # 同样禁用response_generator的输出 + old_levels = {} + for logger_name in loggers_to_silence: + logger = logging.getLogger(logger_name) + old_levels[logger_name] = logger.level + logger.setLevel(logging.ERROR) - # Process results - for i, result in enumerate(results): - task = ready_tasks[i] - task_id = task.id + try: + # 为response_generator也传递格式化提示 + response_input = f"Problem: {problem}\nTasks: []\nResults: []" + if format_prompt: + response_input += f"\n\nFormat Requirements: {format_prompt}" - self.task_results[task_id] = result - completed_tasks.add(task_id) - print(f"Task {task_id} completed") - - # Add to message list - preserve complete AIMessage object - if "message" in result and result["message"] is not None: - all_messages.append(result["message"]) - print(f"Added execution message to results, length: {len(str(result['message'].content))}") - else: - # Create an AIMessage object - content = result.get("content", f"Task {task_id} execution result unavailable") - msg = AIMessage(content=content) - msg.name = f"Task Executor {task_id}" - all_messages.append(msg) - print(f"Added execution message to results, length: {len(content)}") - - # Step 3: Generate final response - print("Generating final response...") - final_response = await self._generate_final_response(problem_text, self.task_results) - - # Add final response to message list - preserve complete AIMessage object - if "message" in final_response and final_response["message"] is not None: - all_messages.append(final_response["message"]) - print(f"Added final response to results, length: {len(str(final_response['message'].content))}") - else: - # Create an AIMessage object - content = final_response.get("final_answer", "Final response generation failed") - msg = AIMessage(content=content) - msg.name = "Response Generator" - all_messages.append(msg) - print(f"Added final response to results, length: {len(content)}") + response = self.response_generator.run( + problem=problem, + task_list="[]", + executed_task_list="[]", + format_prompt=format_prompt, + **kwargs + ) + finally: + for logger_name, old_level in old_levels.items(): + logging.getLogger(logger_name).setLevel(old_level) + return { + "tasks": [], + "executed_tasks": [], + "response": response, + "messages": message_collector.messages or [("ai", response)] # 模拟消息历史 + } + + # 执行任务 + self.task_executor = TaskExecutor(task_list) + execution_status = self.task_executor.run() + + # 收集执行结果 + executed_task_list = [] + for task in self.task_executor.tasks: + executed_task_list.append({ + "task": task.task, + "id": task.id, + "status": task.status, + "result": task.result, + "message": task.message + }) - end_time = time.time() - duration_ms = (end_time - start_time) * 1000 - print(f"Processing complete, took {duration_ms:.2f}ms") + # 将Plan对象转换为可序列化的格式 + task_list_serializable = [] + for step in task_list.steps: + task_list_serializable.append({ + "task": step.task, + "id": step.id, + "dep": step.dep, + "args": step.args + }) - # Build result - result = { - "problem_id": problem.get("id", ""), - "problem": problem_text, - "messages": all_messages, # Preserve complete AIMessage object list - "final_answer": final_response.get("final_answer", ""), - "execution_time_ms": duration_ms, - } + # 再次禁用response_generator的输出 + old_levels = {} + for logger_name in loggers_to_silence: + logger = logging.getLogger(logger_name) + old_levels[logger_name] = logger.level + logger.setLevel(logging.ERROR) - # Record agent responses - pass complete AIMessage object list to preserve usage_metadata try: - print("Recording agent responses...") - self._record_agent_responses(result["problem_id"], all_messages) - except Exception as e: - print(f"Error recording responses: {str(e)}") - - print("JARVIS processing complete") - return result + response = self.response_generator.run( + problem=problem, + task_list=json.dumps(task_list_serializable), + executed_task_list=json.dumps(executed_task_list), + format_prompt=format_prompt, + **kwargs + ) + finally: + for logger_name, old_level in old_levels.items(): + logging.getLogger(logger_name).setLevel(old_level) + + # 收集所有消息以供评估 + messages = message_collector.messages + if not messages: + messages = [("ai", response)] + + return { + "tasks": task_list, + "executed_tasks": executed_task_list, + "response": response, + "messages": messages + } + +class JarvisRedesignAgent(AgentSystem): + """ + AgentSystem wrapper for the HuggingGPT implementation. + """ + def __init__(self, name: str = "jarvis_redesign", config: Dict[str, Any] | None = None): + super().__init__(name, config) + self.config = config or {} + self.model_name = self.config.get("model_name") or os.getenv("MODEL_NAME", "gpt-4o-mini") + self.llm = ChatOpenAI(model=self.model_name, temperature=0.0) -# Register agent system -AgentSystemRegistry.register("jarvis", JARVIS) + # As per user request, tools are initialized as an empty list + self.tools: list[BaseTool] = [] + self.agent = HuggingGPT(self.llm, self.tools, name=self.name) + # self.format_prompt is inherited from AgentSystem and set in super().__init__ + + async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: + """ + Runs the HuggingGPT agent. + """ + problem_text = problem["problem"] + # print(f"[Jarvis-Debug] JarvisRedesignAgent.run_agent received problem: {problem_text}") + + result = await asyncio.to_thread( + self.agent.run_with_trace, + problem=problem_text, + format_prompt=self.format_prompt, + **kwargs + ) + + # print(f"[Jarvis-Debug] JarvisRedesignAgent.run_agent final result: {result}") + + return { + "messages": result.get("messages", []), + "final_answer": result.get("response", "") + } + + def _create_agents(self) -> dict[str, list]: + """ + Create agents for tool integration. For HuggingGPT, we don't have separate agents, + but we can expose the main LLM instance for tool binding. + """ + # The wrapper expects a dictionary: {"workers": [worker1, worker2, ...]} + # Each worker should have a .name and .llm attribute. + # We can create a pseudo-worker representing the HuggingGPT planner. + planner_pseudo_worker = type("Worker", (object,), { + "name": "hugging_gpt_planner", + "llm": self.llm + }) + return {"workers": [planner_pseudo_worker]} -if __name__ == "__main__": - # Test the JARVIS agent - problem = { - "id": "1", - "problem": "What is the sum of the first 100 natural numbers?" - } - - jarvis = JARVIS() - result = jarvis.run_agent(problem) - print(result) +AgentSystemRegistry.register( + "jarvis_redesign", + JarvisRedesignAgent +) diff --git a/mas_arena/agents/llm_debate.py b/mas_arena/agents/llm_debate.py index 46ce314..ba6f80b 100644 --- a/mas_arena/agents/llm_debate.py +++ b/mas_arena/agents/llm_debate.py @@ -8,13 +8,14 @@ import os from typing import Dict, Any, List import contextlib +import asyncio +import time from openai import AsyncOpenAI from dotenv import load_dotenv from mas_arena.agents.base import AgentSystem, AgentSystemRegistry # Load environment variables -load_dotenv() - +load_dotenv(override=True) class LLMDebate(AgentSystem): """ @@ -30,8 +31,8 @@ def __init__(self, name: str = "llm_debate", config: Dict[str, Any] = None): self.config = config or {} # Configuration parameters - self.agents_num = self.config.get("agents_num", 3) # Number of debate agents - self.rounds_num = self.config.get("rounds_num", 2) # Number of debate rounds + self.agents_num = self.config.get("agents_num", 2) # Number of debate agents + self.rounds_num = self.config.get("rounds_num", 3) # Number of debate rounds self.model_name = self.config.get("model_name") or os.getenv("MODEL_NAME", "gpt-4o-mini") # System prompt with format requirements @@ -41,7 +42,8 @@ def __init__(self, name: str = "llm_debate", config: Dict[str, Any] = None): # Initialize OpenAI client self.client = AsyncOpenAI( api_key=os.getenv("OPENAI_API_KEY"), - base_url=os.getenv("OPENAI_API_BASE") + base_url=os.getenv("OPENAI_API_BASE"), + timeout=40 ) async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: @@ -56,48 +58,36 @@ async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: """ query = problem["problem"] - # Initialize agent contexts - each agent starts with the same query - initial_message = { - "role": "user", - "content": f"{query} Make sure to state your answer at the end of the response." - } - agent_contexts = [[initial_message] for _ in range(self.agents_num)] + # Use the gen_math logic for processing + agent_contexts = await self._process_single_question_async(query, self.agents_num, self.rounds_num) + + # Convert to evaluation framework format all_messages = [] + final_answers = [] - # Multi-round debate + # Extract messages from agent contexts for round_idx in range(self.rounds_num): - round_messages = [] - for agent_idx, agent_context in enumerate(agent_contexts): - # For rounds after the first, add other agents' opinions - if round_idx > 0: - # Get contexts from other agents (excluding current agent) - other_contexts = agent_contexts[:agent_idx] + agent_contexts[agent_idx+1:] - debate_message = self._construct_debate_message(other_contexts, query, round_idx) - agent_context.append(debate_message) - - # Get response from current agent - response = await self._call_llm(agent_context) - - # Create response message with usage metadata - ai_message = { - 'content': response['content'], - 'name': f'debate_agent_{agent_idx+1}', - 'role': 'assistant', - 'message_type': 'ai_response', - 'round': round_idx + 1, - 'agent_id': agent_idx + 1, - 'usage_metadata': response['usage'] - } - - # Add response to agent's context - agent_context.append({"role": "assistant", "content": response['content']}) - round_messages.append(ai_message) - - all_messages.extend(round_messages) + # Get the assistant response for this round + response_index = 2 * round_idx + 1 # Based on gen_math logic + if response_index < len(agent_context): + response_msg = agent_context[response_index] + + ai_message = { + 'content': response_msg['content'], + 'name': f'debate_agent_{agent_idx+1}', + 'role': 'assistant', + 'message_type': 'ai_response', + 'round': round_idx + 1, + 'agent_id': agent_idx + 1, + 'usage_metadata': None # gen_math doesn't track usage + } + all_messages.append(ai_message) # Extract final answers from each agent - final_answers = [context[-1]['content'] for context in agent_contexts] + for agent_context in agent_contexts: + if len(agent_context) >= 2: + final_answers.append(agent_context[-1]['content']) # Aggregate all answers into final result aggregated_answer = await self._aggregate_answers(query, final_answers) @@ -120,92 +110,179 @@ async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: "agents_participated": self.agents_num } - async def _call_llm(self, messages: List[Dict]) -> Dict[str, Any]: + async def _process_single_question_async(self, question: str, agents: int, rounds: int) -> List[List[Dict]]: """ - Call the LLM with given messages and return response with usage metadata. - - Args: - messages: List of message dictionaries - - Returns: - Dictionary containing response content and usage metadata + 异步处理单个问题的辩论过程 """ - # Prepare messages for API call - api_messages = [{"role": "system", "content": self.system_prompt}] + messages - - try: - response = await self.client.chat.completions.create( - model=self.model_name, - messages=api_messages - ) + # 为每个智能体创建初始上下文 + agent_contexts = [[{ + "role": "user", + "content": """Can you solve the following problem? {} Please explain your reasoning step by step. Make sure to state your final answer clearly at the end of your response.""".format(question), + "agent_id": i + 1, # 添加agent_id + "message_type": "user_query", # 添加消息类型 + "round": 0 # 添加轮次信息 + }] for i in range(agents)] + + # 多轮辩论 + for round in range(rounds): + # 收集所有智能体的任务 + tasks = [] - # Extract and clean content - content = response.choices[0].message.content - content = content.replace('\r\n', '\n').replace('\r', '\n').strip() - with contextlib.suppress(UnicodeDecodeError): - content = content.encode('utf-8').decode('utf-8-sig') # Remove BOM + for i, agent_context in enumerate(agent_contexts): + if round != 0: + agent_contexts_other = agent_contexts[:i] + agent_contexts[i+1:] + message = self._construct_message(agent_contexts_other, question, 2*round - 1) + # 添加元数据到message + message.update({ + "agent_id": i + 1, + "message_type": "debate_query", + "round": round + 1 + }) + agent_context.append(message) + + # 创建异步任务 + task = self._generate_answer_async(agent_context) + tasks.append((i, task)) - return { - 'content': content, - 'usage': response.usage - } + # 并行执行所有智能体的API调用 + print(f"第 {round + 1} 轮:并行调用 {len(tasks)} 个智能体...") - except Exception as e: - # Fallback response in case of API error - return { - 'content': f"Error calling LLM: {str(e)}", - 'usage': None - } + # 使用asyncio.gather真正并行执行所有任务 + task_results = await asyncio.gather(*[task for _, task in tasks], return_exceptions=True) + + # 处理结果 + for (i, _), result in zip(tasks, task_results): + if isinstance(result, Exception): + print(f"智能体 {i} 出现异常: {result}") + content = f"抱歉,智能体 {i} 出现错误: {str(result)}" + usage = None + else: + content = result.choices[0].message.content if hasattr(result, 'choices') else "无法获取回答内容" + usage = getattr(result, 'usage', None) + + assistant_message = { + "role": "assistant", + "content": content, + "agent_id": i + 1, + "message_type": "ai_response", + "round": round + 1, + "usage_metadata": usage + } + agent_contexts[i].append(assistant_message) + + return agent_contexts - def _construct_debate_message(self, other_agent_contexts: List[List[Dict]], - query: str, round_idx: int) -> Dict[str, str]: + def _construct_message(self, agents: List[List[Dict]], question: str, idx: int) -> Dict[str, str]: """ - Construct a message containing other agents' opinions for the current agent. - - Args: - other_agent_contexts: List of other agents' conversation contexts - query: Original query/problem - round_idx: Current round index - - Returns: - Message dictionary containing other agents' opinions + 构造包含其他智能体意见的消息 """ - # Handle case with no other agents (single agent introspection) - if len(other_agent_contexts) == 0: + # Use introspection in the case in which there are no other agents. + if len(agents) == 0: return { "role": "user", "content": "Can you verify that your answer is correct. Please reiterate your answer, making sure to state your answer at the end of the response." } - - # Build message with other agents' recent responses + prefix_string = "These are the recent/updated opinions from other agents: " - - for agent_idx, agent_context in enumerate(other_agent_contexts): - # Get the most recent assistant response (last response in context) - if len(agent_context) > 1: # Ensure there's at least one response - # Find the latest assistant message - latest_response = None - for msg in reversed(agent_context): - if msg.get("role") == "assistant": - latest_response = msg["content"] - break - - if latest_response: - response_text = f"\n\nAgent {agent_idx + 1} response: ```{latest_response}```" - prefix_string += response_text - - # Add instruction for using the opinions - suffix_string = ( - f"\n\nUse these opinions carefully as additional advice, can you provide an updated answer? " - f"Make sure to state your answer at the end of the response. " - f"\nThe original problem is: {query}" - ) - + + for agent in agents: + if idx < len(agent): + agent_response = agent[idx]["content"] + agent_id = agent[idx].get("agent_id", "unknown") + response = f"\n\n Agent {agent_id} response: ```{agent_response}```" + prefix_string = prefix_string + response + + prefix_string = prefix_string + "\n\n Use these opinions carefully as additional advice, can you provide an updated answer? Make sure to state your answer at the end of the response." return { "role": "user", - "content": prefix_string + suffix_string + "content": prefix_string } + def _construct_assistant_message(self, completion) -> Dict[str, str]: + """ + 构造assistant消息 + """ + # 检查 completion 是否为字符串(错误情况) + if isinstance(completion, str): + print(f"警告:收到字符串响应而非API对象: {completion[:100]}...") + return {"role": "assistant", "content": "API返回了错误的响应格式"} + + # 检查是否有 choices 属性 + if hasattr(completion, 'choices') and len(completion.choices) > 0: + content = completion.choices[0].message.content + return {"role": "assistant", "content": content} + else: + # 备用方案 + return {"role": "assistant", "content": "抱歉,无法获取回答内容。"} + + async def _generate_answer_async(self, answer_context: List[Dict]) -> Any: + """ + 异步版本的API调用函数 + """ + # 添加系统提示到消息开头,移除元数据字段 + api_messages = [{"role": "system", "content": self.system_prompt}] + for msg in answer_context: + api_messages.append({ + "role": msg["role"], + "content": msg["content"] + }) + + max_retries = 3 + for attempt in range(max_retries): + try: + completion = await self.client.chat.completions.create( + model=self.model_name, + messages=api_messages, + n=1 + ) + return completion + except Exception as e: + print(f"API调用出错,正在重试... (尝试 {attempt + 1}/{max_retries})") + print(f"错误类型: {type(e).__name__}") + print(f"错误详情: {str(e)}") + if attempt < max_retries - 1: + print("等待5秒后重试...") # 减少等待时间 + await asyncio.sleep(5) + else: + print("已达到最大重试次数,跳过此请求") + # 返回一个模拟的 completion 对象以避免程序崩溃 + class MockCompletion: + def __init__(self): + self.choices = [type('obj', (object,), { + 'message': type('obj', (object,), { + 'content': "抱歉,由于API错误无法生成回答。" + })() + })()] + return MockCompletion() + + return None + + async def _call_llm(self, messages: List[Dict]) -> Dict[str, Any]: + """ + Call the LLM with given messages and return response with usage metadata. + """ + # 使用新的异步生成答案方法 + completion = await self._generate_answer_async(messages) + + # 提取内容和使用信息 + if hasattr(completion, 'choices') and len(completion.choices) > 0: + content = completion.choices[0].message.content + content = content.replace('\r\n', '\n').replace('\r', '\n').strip() + with contextlib.suppress(UnicodeDecodeError): + content = content.encode('utf-8').decode('utf-8-sig') # Remove BOM + + usage = getattr(completion, 'usage', None) + + return { + 'content': content, + 'usage': usage + } + else: + return { + 'content': "抱歉,无法获取回答内容。", + 'usage': None + } + async def _aggregate_answers(self, query: str, answers: List[str]) -> Dict[str, Any]: """ Aggregate all agents' final answers into a single result. @@ -235,4 +312,4 @@ async def _aggregate_answers(self, query: str, answers: List[str]) -> Dict[str, # Register the agent system AgentSystemRegistry.register("llm_debate", LLMDebate, - agents_num=3, rounds_num=2) \ No newline at end of file + agents_num=2, rounds_num=3) \ No newline at end of file From 73c13cc5a4898b75dec3a8662fe1857ee3544bf4 Mon Sep 17 00:00:00 2001 From: fuzi233 Date: Thu, 24 Jul 2025 16:56:31 +0800 Subject: [PATCH 2/3] Redesign Jarvis --- mas_arena/agents/jarvis.py | 68 +++++++++++++++++++------------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/mas_arena/agents/jarvis.py b/mas_arena/agents/jarvis.py index ce3e1af..32bf360 100644 --- a/mas_arena/agents/jarvis.py +++ b/mas_arena/agents/jarvis.py @@ -164,17 +164,17 @@ class Config: def __init__(self, llm: BaseLanguageModel, verbose: bool = False): """ - 初始化任务规划器。 + Initialize task planner. Args: - llm: 基础语言模型。 - verbose: 是否启用详细日志。 + llm: base language model. + verbose: whether to enable detailed logging. """ llm_chain = TaskPlaningChain.from_llm(llm, verbose=verbose) output_parser = PlanningOutputParser() stop = None - # 使用pydantic的正确初始化方式 + # use pydantic's correct initialization method super().__init__( llm_chain=llm_chain, output_parser=output_parser, @@ -216,13 +216,13 @@ def generate(self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any) -> def run(self, problem: str, task_list: str, executed_task_list: str, format_prompt: str = None, **kwargs) -> str: """ - 运行响应生成,兼容evaluation_framework中的调用方式 + Run response generation, compatible with the calling method in evaluation_framework. Args: - problem: 原始问题 - task_list: 任务列表 - executed_task_list: 执行结果列表 - format_prompt: 格式化提示,用于指导输出格式 + problem: original problem + task_list: task list + executed_task_list: execution result list + format_prompt: format prompt, for guiding output format """ # 构建基础输入 task_execution = f"Problem: {problem}\nTasks: {task_list}\nResults: {executed_task_list}" @@ -320,7 +320,7 @@ def from_llm( cls, llm: BaseLanguageModel, demos: List[Dict] = DEMONSTRATIONS, - verbose: bool = False, # 改为False减少输出 + verbose: bool = False, # set to False to reduce output ) -> LLMChain: """Get the response parser.""" system_template = """#1 Task Planning Stage: The AI assistant can parse user input to several tasks: [{{"task": task, "id": task_id, "dep": dependency_task_id, "args": {{"input name": text may contain }}}}]. The special tag "dep_id" refer to the one generated text/image/audio in the dependency task (Please consider whether the dependency task generates resources of this type.) and "dep_id" must be in "dep" list. The "dep" field denotes the ids of the previous prerequisite tasks which generate a new resource that the current task relies on. The task MUST be selected from the following tools (along with tool description, input name and output type): {tools}. There may be multiple tasks of the same type. Think step by step about all the tasks needed to resolve the user's request. Parse out as few tasks as possible while ensuring that the user request can be resolved. Pay attention to the dependencies and order among tasks. If the user input can't be parsed, you need to reply empty JSON [].""" # noqa: E501 @@ -368,20 +368,20 @@ def parse(self, text: str, hf_tools: List[BaseTool]) -> Plan: """ steps = [] try: - # 尝试找到JSON数组 + # try to find JSON array json_match = re.findall(r"\[.*\]", text) if not json_match: - # 如果没有找到JSON数组,返回空计划 + # if no JSON array is found, return empty plan return Plan(steps=[]) - # 尝试解析JSON + # try to parse JSON try: task_list = json.loads(json_match[0]) except json.JSONDecodeError: - # JSON解析失败,返回空计划 + # JSON parsing failed, return empty plan return Plan(steps=[]) - # 处理任务列表 + # process task list for v in task_list: if not isinstance(v, dict) or "task" not in v: continue @@ -396,7 +396,7 @@ def parse(self, text: str, hf_tools: List[BaseTool]) -> Plan: steps.append(Step(v["task"], v["id"], v["dep"], v["args"], tool)) except Exception as e: - # 任何其他错误,返回空计划 + # any other error, return empty plan pass return Plan(steps=steps) @@ -470,12 +470,12 @@ def get_execution_details(self): def run_with_trace(self, problem: str, format_prompt: str = None, **kwargs) -> Dict[str, Any]: """ - 运行HuggingGPT,并返回带有完整执行轨迹的结果。 + Run HuggingGPT and return the result with the complete execution trace. Args: - problem: 输入问题 - format_prompt: 格式化提示,用于指导输出格式 - **kwargs: 其他参数 + problem: input problem + format_prompt: format prompt, for guiding output format + **kwargs: other parameters """ message_collector = MessageCollectorCallback(name=self.name) # Add collector to callbacks @@ -485,11 +485,11 @@ def run_with_trace(self, problem: str, format_prompt: str = None, **kwargs) -> D callbacks.append(message_collector) kwargs["callbacks"] = callbacks - # 临时禁用langchain详细输出 + # temporarily disable langchain detailed output import logging import warnings - # 禁用各种日志输出 + # disable various log outputs loggers_to_silence = [ "langchain", "langchain.chains", @@ -503,25 +503,25 @@ def run_with_trace(self, problem: str, format_prompt: str = None, **kwargs) -> D old_levels[logger_name] = logger.level logger.setLevel(logging.ERROR) - # 抑制警告 + # suppress warnings warnings.filterwarnings("ignore", category=DeprecationWarning) try: - # 如果提供了格式化提示,则将其添加到问题中 + # if format prompt is provided, add it to the problem formatted_problem = problem if format_prompt: formatted_problem = f"{problem}\n\n{format_prompt}" - # 使用统一的task_planner + # use unified task_planner task_list = self.task_planner.plan(inputs={"input": formatted_problem, "hf_tools": self.tools}, **kwargs) finally: - # 恢复原日志级别 + # restore original log levels for logger_name, old_level in old_levels.items(): logging.getLogger(logger_name).setLevel(old_level) warnings.filterwarnings("default", category=DeprecationWarning) if not task_list.steps: - # 同样禁用response_generator的输出 + # disable response_generator's output again old_levels = {} for logger_name in loggers_to_silence: logger = logging.getLogger(logger_name) @@ -529,7 +529,7 @@ def run_with_trace(self, problem: str, format_prompt: str = None, **kwargs) -> D logger.setLevel(logging.ERROR) try: - # 为response_generator也传递格式化提示 + # pass format prompt to response_generator response_input = f"Problem: {problem}\nTasks: []\nResults: []" if format_prompt: response_input += f"\n\nFormat Requirements: {format_prompt}" @@ -548,14 +548,14 @@ def run_with_trace(self, problem: str, format_prompt: str = None, **kwargs) -> D "tasks": [], "executed_tasks": [], "response": response, - "messages": message_collector.messages or [("ai", response)] # 模拟消息历史 + "messages": message_collector.messages or [("ai", response)] # simulate message history } - # 执行任务 + # execute tasks self.task_executor = TaskExecutor(task_list) execution_status = self.task_executor.run() - # 收集执行结果 + # collect execution results executed_task_list = [] for task in self.task_executor.tasks: executed_task_list.append({ @@ -566,7 +566,7 @@ def run_with_trace(self, problem: str, format_prompt: str = None, **kwargs) -> D "message": task.message }) - # 将Plan对象转换为可序列化的格式 + # convert Plan object to serializable format task_list_serializable = [] for step in task_list.steps: task_list_serializable.append({ @@ -576,7 +576,7 @@ def run_with_trace(self, problem: str, format_prompt: str = None, **kwargs) -> D "args": step.args }) - # 再次禁用response_generator的输出 + # disable response_generator's output again old_levels = {} for logger_name in loggers_to_silence: logger = logging.getLogger(logger_name) @@ -595,7 +595,7 @@ def run_with_trace(self, problem: str, format_prompt: str = None, **kwargs) -> D for logger_name, old_level in old_levels.items(): logging.getLogger(logger_name).setLevel(old_level) - # 收集所有消息以供评估 + # collect all messages for evaluation messages = message_collector.messages if not messages: messages = [("ai", response)] From f184a1ea4969910394d57c9a47f6cf4b7931bcde Mon Sep 17 00:00:00 2001 From: jiaqi Date: Fri, 25 Jul 2025 13:14:16 +0000 Subject: [PATCH 3/3] refactor: rename Jarvis agents and update initialization parameters --- mas_arena/agents/jarvis.py | 16 ++++++++-------- mas_arena/agents/llm_debate.py | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/mas_arena/agents/jarvis.py b/mas_arena/agents/jarvis.py index 32bf360..f3fbfdf 100644 --- a/mas_arena/agents/jarvis.py +++ b/mas_arena/agents/jarvis.py @@ -35,7 +35,7 @@ class MessageCollectorCallback(BaseCallbackHandler): """Callback to collect AIMessages and their usage metadata.""" - def __init__(self, name: str = "jarvis_redesign"): + def __init__(self, name: str = "jarvis"): super().__init__() self.messages: List[Any] = [] self.name = name @@ -435,7 +435,7 @@ def load_chat_planner(llm: BaseLanguageModel) -> TaskPlanner: class HuggingGPT: """Agent for interacting with HuggingGPT - Text Processing Version.""" - def __init__(self, llm: BaseLanguageModel, tools: List[BaseTool], name: str = "jarvis_redesign"): + def __init__(self, llm: BaseLanguageModel, tools: List[BaseTool], name: str = "jarvis"): self.llm = llm self.tools = tools self.name = name @@ -607,12 +607,12 @@ def run_with_trace(self, problem: str, format_prompt: str = None, **kwargs) -> D "messages": messages } -class JarvisRedesignAgent(AgentSystem): +class JarvisSingleAgent(AgentSystem): """ AgentSystem wrapper for the HuggingGPT implementation. """ - def __init__(self, name: str = "jarvis_redesign", config: Dict[str, Any] | None = None): + def __init__(self, name: str = "jarvis", config: Dict[str, Any] | None = None): super().__init__(name, config) self.config = config or {} self.model_name = self.config.get("model_name") or os.getenv("MODEL_NAME", "gpt-4o-mini") @@ -628,7 +628,7 @@ async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: Runs the HuggingGPT agent. """ problem_text = problem["problem"] - # print(f"[Jarvis-Debug] JarvisRedesignAgent.run_agent received problem: {problem_text}") + # print(f"[Jarvis-Debug] JarvisSingleAgent.run_agent received problem: {problem_text}") result = await asyncio.to_thread( self.agent.run_with_trace, @@ -637,7 +637,7 @@ async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: **kwargs ) - # print(f"[Jarvis-Debug] JarvisRedesignAgent.run_agent final result: {result}") + # print(f"[Jarvis-Debug] JarvisSingleAgent.run_agent final result: {result}") return { "messages": result.get("messages", []), @@ -660,6 +660,6 @@ def _create_agents(self) -> dict[str, list]: AgentSystemRegistry.register( - "jarvis_redesign", - JarvisRedesignAgent + "jarvis", + JarvisSingleAgent ) diff --git a/mas_arena/agents/llm_debate.py b/mas_arena/agents/llm_debate.py index ba6f80b..299f0d3 100644 --- a/mas_arena/agents/llm_debate.py +++ b/mas_arena/agents/llm_debate.py @@ -9,7 +9,6 @@ from typing import Dict, Any, List import contextlib import asyncio -import time from openai import AsyncOpenAI from dotenv import load_dotenv from mas_arena.agents.base import AgentSystem, AgentSystemRegistry @@ -79,9 +78,10 @@ async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: 'role': 'assistant', 'message_type': 'ai_response', 'round': round_idx + 1, - 'agent_id': agent_idx + 1, + 'agent_id': f'debate_agent_{agent_idx+1}', 'usage_metadata': None # gen_math doesn't track usage } + print(f"agent_name: {ai_message['name']}") all_messages.append(ai_message) # Extract final answers from each agent