diff --git a/mas_arena/agents/jarvis.py b/mas_arena/agents/jarvis.py index 7f9b89b..f3fbfdf 100644 --- a/mas_arena/agents/jarvis.py +++ b/mas_arena/agents/jarvis.py @@ -1,497 +1,665 @@ -import os -import time -import re +from __future__ import annotations import asyncio -from typing import Dict, Any, List -from dataclasses import dataclass +import copy +from typing import Dict, List +import json +import re +from abc import abstractmethod +from typing import Any, Dict, List, Optional, Union +import os + +from langchain.base_language import BaseLanguageModel +from langchain.chains import LLMChain +from langchain_core.callbacks.manager import Callbacks +from langchain_core.prompts import PromptTemplate +from langchain_core.prompts.chat import ( + AIMessagePromptTemplate, + ChatPromptTemplate, + HumanMessagePromptTemplate, + SystemMessagePromptTemplate, +) +from langchain_core.tools import BaseTool +from pydantic import BaseModel from langchain_openai import ChatOpenAI -from langchain_core.messages import SystemMessage, HumanMessage, AIMessage +from langchain_core.callbacks.base import BaseCallbackHandler +from langchain_core.outputs import LLMResult +from openai.types.completion_usage import CompletionUsage + + from mas_arena.agents.base import AgentSystem, AgentSystemRegistry +DEMONSTRATIONS: list[dict] = [] + + +class MessageCollectorCallback(BaseCallbackHandler): + """Callback to collect AIMessages and their usage metadata.""" + + def __init__(self, name: str = "jarvis"): + super().__init__() + self.messages: List[Any] = [] + self.name = name + + def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: + """Called at the end of an LLM call.""" + if response.generations: + generation = response.generations[0][0] + message = generation.message + message.name = self.name + + if response.llm_output and "token_usage" in response.llm_output: + token_usage_data = response.llm_output["token_usage"] + # Create a CompletionUsage object to match what base.py expects + message.usage_metadata = CompletionUsage( + completion_tokens=token_usage_data.get("completion_tokens", 0), + prompt_tokens=token_usage_data.get("prompt_tokens", 0), + total_tokens=token_usage_data.get("total_tokens", 0), + ) + + self.messages.append(message) + + class Task: - """Task execution representation.""" + """Task to be executed.""" - def __init__(self, task: str, id: int, dep: List[int], args: Dict, tool: str): + def __init__(self, task: str, id: int, dep: List[int], args: Dict, tool: BaseTool): self.task = task self.id = id self.dep = dep self.args = args self.tool = tool + self.status = "pending" + self.message = "" + self.result = "" + def __str__(self) -> str: + return f"{self.task}({self.args})" -@dataclass -class Agent: - """Represents an LLM agent""" - agent_id: str - name: str - model_name: str - system_prompt: str - chat_history: List[Dict[str, str]] = None - - def __post_init__(self): - self.chat_history = [] - self.llm = ChatOpenAI( - model=self.model_name - ) + def save_product(self) -> None: + """Save text-based products to result field.""" + # For text-based tasks, we directly store the result + # No file saving needed for text outputs + if hasattr(self, 'product'): + self.result = str(self.product) - async def generate_response(self, context: str) -> Dict[str, Any]: - """Generate agent response asynchronously""" - messages = [ - SystemMessage(content=self.system_prompt), - *[HumanMessage(content=msg["human"]) if msg.get("role") == "human" - else AIMessage(content=msg["ai"]) - for msg in self.chat_history], - HumanMessage(content=context) - ] - + def completed(self) -> bool: + return self.status == "completed" + + def failed(self) -> bool: + return self.status == "failed" + + def pending(self) -> bool: + return self.status == "pending" + + def run(self) -> str: + """Execute the task using the associated tool.""" try: - # Use standard output in async mode - response = await self.llm.ainvoke(messages) - # Save response name for source identification - response.name = self.name - - # Add to chat history - self.chat_history.append({ - "role": "human", - "human": context - }) - self.chat_history.append({ - "role": "ai", - "ai": response.content - }) + new_args = copy.deepcopy(self.args) + # For text-based tasks, execute tool and get result + result = self.tool(**new_args) - # Ensure we return the original response object to preserve usage_metadata - return { - "message": response, # Contains the complete AIMessage object with metadata - "content": response.content - } + # Store result directly for text-based outputs + if isinstance(result, str): + self.result = result + else: + # If tool returns complex object, store as product and convert to string + self.product = result + self.save_product() except Exception as e: - print(f"Response generation failed: {str(e)}") - - # Create simple response - error_content = f"Generation failed: {str(e)}" - - self.chat_history.append({ - "role": "human", - "human": context - }) - self.chat_history.append({ - "role": "ai", - "ai": error_content - }) - - # Create a minimal AIMessage object - error_message = AIMessage(content=error_content) - error_message.name = self.name - - return { - "message": error_message, - "content": error_content - } + self.status = "failed" + self.message = str(e) + return self.message + self.status = "completed" + return self.result -class JARVIS(AgentSystem): - """JARVIS multi-agent system""" +class Step: + """A step in the plan.""" - def __init__(self, name: str = "jarvis", config: Dict[str, Any] = None): - super().__init__(name, config) + def __init__( + self, task: str, id: int, dep: List[int], args: Dict[str, str], tool: BaseTool + ): + self.task = task + self.id = id + self.dep = dep + self.args = args + self.tool = tool + +class Plan: + """A plan to execute.""" + + def __init__(self, steps: List[Step]): + self.steps = steps + + def __str__(self) -> str: + return str([str(step) for step in self.steps]) + + def __repr__(self) -> str: + return str(self) + + +class BasePlanner(BaseModel): + """Base class for a planner.""" + + @abstractmethod + def plan(self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any) -> Plan: + """Given input, decide what to do.""" + + @abstractmethod + async def aplan( + self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any + ) -> Plan: + """Asynchronous Given input, decide what to do.""" + +class TaskPlanner(BasePlanner): + """Planner for tasks.""" + + llm_chain: LLMChain + output_parser: PlanningOutputParser + stop: Optional[List] = None + + class Config: + arbitrary_types_allowed = True + + def __init__(self, llm: BaseLanguageModel, verbose: bool = False): + """ + Initialize task planner. - # Get model name - self.model_name = os.getenv("MODEL_NAME", "gpt-4o-mini") + Args: + llm: base language model. + verbose: whether to enable detailed logging. + """ + llm_chain = TaskPlaningChain.from_llm(llm, verbose=verbose) + output_parser = PlanningOutputParser() + stop = None + + # use pydantic's correct initialization method + super().__init__( + llm_chain=llm_chain, + output_parser=output_parser, + stop=stop + ) + + def plan(self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any) -> Plan: + """Given input, decided what to do.""" + inputs["tools"] = [ + f"{tool.name}: {tool.description}" for tool in inputs["hf_tools"] + ] + llm_response = self.llm_chain.run(**inputs, stop=self.stop, callbacks=callbacks) + return self.output_parser.parse(llm_response, inputs["hf_tools"]) + + async def aplan( + self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any + ) -> Plan: + """Asynchronous Given input, decided what to do.""" + inputs["hf_tools"] = [ + f"{tool.name}: {tool.description}" for tool in inputs["hf_tools"] + ] + llm_response = await self.llm_chain.arun( + **inputs, stop=self.stop, callbacks=callbacks + ) + return self.output_parser.parse(llm_response, inputs["hf_tools"]) + +class ResponseGenerator: + """Generates a response based on the input.""" + + def __init__(self, llm_chain: LLMChain, stop: Optional[List] = None): + self.llm_chain = llm_chain + self.stop = stop + + def generate(self, inputs: dict, callbacks: Callbacks = None, **kwargs: Any) -> str: + """Given input, decided what to do.""" + llm_response = self.llm_chain.run(**inputs, stop=self.stop, callbacks=callbacks) + # print(f"[Jarvis-Debug] ResponseGenerator.generate LLM response: {llm_response}") + return llm_response + + def run(self, problem: str, task_list: str, executed_task_list: str, format_prompt: str = None, **kwargs) -> str: + """ + Run response generation, compatible with the calling method in evaluation_framework. + + Args: + problem: original problem + task_list: task list + executed_task_list: execution result list + format_prompt: format prompt, for guiding output format + """ + # 构建基础输入 + task_execution = f"Problem: {problem}\nTasks: {task_list}\nResults: {executed_task_list}" - # Initialize agents - self.planner_agent = None - self.executor_agent = None - self.response_generator_agent = None + # 如果提供了格式化提示,将其作为指令,否则使用通用指令 + format_instructions = "Please summarize the results and generate a response." + if format_prompt: + format_instructions = format_prompt - # Initialize task list + inputs = { + "task_execution": task_execution, + "format_instructions": format_instructions + } + return self.generate(inputs, **kwargs) + +class TaskExecutor: + """Load tools and execute tasks.""" + + def __init__(self, plan: Plan): + self.plan = plan self.tasks = [] - self.task_results = {} - - def _create_agents(self): - """Create the system's required agents""" - # Task planning agent - self.planner_agent = Agent( - agent_id="planner", - name="Task Planner", - model_name=self.model_name, - system_prompt="""You are an expert task planner specialized in problem decomposition. -Your job is to analyze problems and create a detailed task plan for solving them. - -For each task, provide: -1. Task ID (starting from 1) -2. Task description (clearly stating what needs to be accomplished) -3. Dependency tasks (list of IDs of tasks that must be completed before this task can start) - -Ensure dependencies between tasks are clear, and the task sequence effectively solves the problem. -Keep tasks linear and avoid complex parallel processing. - -Be especially careful with: -- Mathematical problems: Break them down into clear computational steps -- Reasoning problems: Create steps for analysis, inference, and conclusion -- Complex problems: Decompose into smaller, manageable sub-problems - -Always ensure the final task synthesizes all previous task results.""" - ) - - # Task execution agent - self.executor_agent = Agent( - agent_id="executor", - name="Task Executor", - model_name=self.model_name, - system_prompt="""You are an expert task executor specialized in precise problem-solving. -Your job is to execute specified tasks and provide detailed execution results. - -When receiving a task description and related parameters, you must: -1. Understand the task objective -2. Perform necessary calculations or reasoning with absolute precision -3. Show your detailed step-by-step work -4. Provide clear execution results - -Ensure your results are accurate, your reasoning process is clear, and your output can be used for subsequent tasks.""" + self.id_task_map = {} + self.status = "pending" + for step in self.plan.steps: + task = Task(step.task, step.id, step.dep, step.args, step.tool) + self.tasks.append(task) + self.id_task_map[step.id] = task + + def completed(self) -> bool: + return all(task.completed() for task in self.tasks) + + def failed(self) -> bool: + return any(task.failed() for task in self.tasks) + + def pending(self) -> bool: + return any(task.pending() for task in self.tasks) + + def check_dependency(self, task: Task) -> bool: + for dep_id in task.dep: + if dep_id == -1: + continue + dep_task = self.id_task_map[dep_id] + if dep_task.failed() or dep_task.pending(): + return False + return True + + def update_args(self, task: Task) -> None: + for dep_id in task.dep: + if dep_id == -1: + continue + dep_task = self.id_task_map[dep_id] + for k, v in task.args.items(): + if f"" in v: + task.args[k] = task.args[k].replace( + f"", dep_task.result + ) + + def run(self) -> str: + # for task in self.tasks: + for task in self.tasks: + # print(f"running {task}") # noqa: T201 + if task.pending() and self.check_dependency(task): + self.update_args(task) + task.run() + if self.completed(): + self.status = "completed" + elif self.failed(): + self.status = "failed" + else: + self.status = "pending" + return self.status + + def __str__(self) -> str: + result = "" + for task in self.tasks: + result += f"{task}\n" + result += f"status: {task.status}\n" + if task.failed(): + result += f"message: {task.message}\n" + if task.completed(): + result += f"result: {task.result}\n" + return result + + def __repr__(self) -> str: + return self.__str__() + + def describe(self) -> str: + return self.__str__() + +class TaskPlaningChain(LLMChain): + """Chain to execute tasks.""" + + @classmethod + def from_llm( + cls, + llm: BaseLanguageModel, + demos: List[Dict] = DEMONSTRATIONS, + verbose: bool = False, # set to False to reduce output + ) -> LLMChain: + """Get the response parser.""" + system_template = """#1 Task Planning Stage: The AI assistant can parse user input to several tasks: [{{"task": task, "id": task_id, "dep": dependency_task_id, "args": {{"input name": text may contain }}}}]. The special tag "dep_id" refer to the one generated text/image/audio in the dependency task (Please consider whether the dependency task generates resources of this type.) and "dep_id" must be in "dep" list. The "dep" field denotes the ids of the previous prerequisite tasks which generate a new resource that the current task relies on. The task MUST be selected from the following tools (along with tool description, input name and output type): {tools}. There may be multiple tasks of the same type. Think step by step about all the tasks needed to resolve the user's request. Parse out as few tasks as possible while ensuring that the user request can be resolved. Pay attention to the dependencies and order among tasks. If the user input can't be parsed, you need to reply empty JSON [].""" # noqa: E501 + human_template = """Now I input: {input}.""" + system_message_prompt = SystemMessagePromptTemplate.from_template( + system_template ) - - # Response generation agent - self.response_generator_agent = Agent( - agent_id="generator", - name="Response Generator", - model_name=self.model_name, - system_prompt=f"""You are an expert response generator specialized in synthesis and precision. -Your job is to generate a final comprehensive answer based on all previous task results. - -When receiving a problem description and all completed task results, you must: -- Synthesize and analyze all task results comprehensively -- Provide a clear, accurate final answer, and step-by-step solution -{self.format_prompt} -""" + human_message_prompt = HumanMessagePromptTemplate.from_template(human_template) + + demo_messages: List[ + Union[HumanMessagePromptTemplate, AIMessagePromptTemplate] + ] = [] + for demo in demos: + if demo["role"] == "user": + demo_messages.append( + HumanMessagePromptTemplate.from_template(demo["content"]) + ) + else: + demo_messages.append( + AIMessagePromptTemplate.from_template(demo["content"]) + ) + # demo_messages.append(message) + + prompt = ChatPromptTemplate.from_messages( + [system_message_prompt, *demo_messages, human_message_prompt] ) - def _parse_tasks_from_text(self, text: str) -> List[Task]: - """Parse task list from text""" - tasks = [] + return cls(prompt=prompt, llm=llm, verbose=verbose) + +class PlanningOutputParser(BaseModel): + """Parses the output of the planning stage.""" + + class Config: + arbitrary_types_allowed = True + + def parse(self, text: str, hf_tools: List[BaseTool]) -> Plan: + """Parse the output of the planning stage. + + Args: + text: The output of the planning stage. + hf_tools: The tools available. + + Returns: + The plan. + """ + steps = [] try: - # Try multiple regex patterns to catch different task formats - task_patterns = [ - r"Task\s*(\d+)[:\s]+\s*(.*?)(?=Task\s*\d+[:\s]+|$)", - r"(\d+)\.\s*Task[:\s]+\s*(.*?)(?=\d+\.\s*Task[:\s]+|$)", - r"(\d+)\.\s*(.*?)(?=\d+\.\s*|$)" - ] + # try to find JSON array + json_match = re.findall(r"\[.*\]", text) + if not json_match: + # if no JSON array is found, return empty plan + return Plan(steps=[]) - tasks_found = [] - for pattern in task_patterns: - tasks_found = re.findall(pattern, text, re.DOTALL) - if tasks_found: - break + # try to parse JSON + try: + task_list = json.loads(json_match[0]) + except json.JSONDecodeError: + # JSON parsing failed, return empty plan + return Plan(steps=[]) - for task_id_str, task_desc in tasks_found: - task_id = int(task_id_str) - - # Try to extract dependencies from description - dep_patterns = [ - r"Depends on[:\s]+\s*\[(.*?)\]", - r"Dependencies[:\s]+\s*\[(.*?)\]", - r"Dependency[:\s]+\s*\[(.*?)\]" - ] - - deps = [] - for pattern in dep_patterns: - dep_match = re.search(pattern, task_desc) - if dep_match: - deps_str = dep_match.group(1) - deps = [int(d.strip()) for d in deps_str.split(",") if d.strip().isdigit()] + # process task list + for v in task_list: + if not isinstance(v, dict) or "task" not in v: + continue + + choose_tool = None + for tool in hf_tools: + if tool.name == v["task"]: + choose_tool = tool break - - # Create task - task = Task( - task=task_desc.strip(), - id=task_id, - dep=deps, - args={"description": task_desc.strip()}, - tool="reasoning" - ) - tasks.append(task) - - # If no tasks found, create a default task - if not tasks: - tasks.append(Task( - task="Solve the problem", - id=1, - dep=[], - args={"description": "Analyze and solve the given problem"}, - tool="reasoning" - )) - + + if choose_tool: + steps.append(Step(v["task"], v["id"], v["dep"], v["args"], tool)) + except Exception as e: - print(f"Error parsing tasks: {str(e)}") - tasks.append(Task( - task="Solve the problem", - id=1, - dep=[], - args={"description": "Analyze and solve the given problem"}, - tool="reasoning" - )) - - return tasks - - async def _plan_tasks(self, problem: str) -> List[Task]: - """Plan task list asynchronously""" - if not self.planner_agent: - self._create_agents() + # any other error, return empty plan + pass - prompt = f""" - Please analyze the following problem and create a task plan for solving it: + return Plan(steps=steps) - Problem: {problem} - - Provide a list of tasks, each including: - 1. Task ID - 2. Task description - 3. List of dependency task IDs (empty list if no dependencies) - - Format example: - Task 1: Analyze the problem - Dependencies: [] - - Task 2: Solve sub-problem A - Dependencies: [1] - - Task 3: Solve sub-problem B - Dependencies: [1] - - Task 4: Merge results - Dependencies: [2, 3] - - Ensure tasks are linear and don't require complex parallel processing. - For mathematical problems, break down each computational step clearly. - """ - - response = await self.planner_agent.generate_response(prompt) - content = response.get("content", "") - - return self._parse_tasks_from_text(content) +class ResponseGenerationChain(LLMChain): + """Chain to execute tasks.""" - async def _execute_task(self, task: Task, problem: str) -> Dict[str, Any]: - """Execute a single task asynchronously""" - if not self.executor_agent: - self._create_agents() - - # Prepare results from dependent tasks - dependency_results = {} - for dep_id in task.dep: - if dep_id in self.task_results: - dependency_results[f"Task {dep_id} Result"] = self.task_results[dep_id].get("content", "") - - dependency_text = "\n\n".join([f"{name}:\n{result}" for name, result in dependency_results.items()]) - - prompt = f""" - Original Problem: {problem} - - Current Task: - ID: {task.id} - Description: {task.task} - - Results from Dependency Tasks: - {dependency_text if dependency_text else "No dependency tasks"} - - Please execute this task with precision and provide detailed step-by-step work and results. - For mathematical calculations, show all steps and verify your final answer. - For reasoning tasks, explain your logical process thoroughly. - """ - - response = await self.executor_agent.generate_response(prompt) - - execution_result = { - "task_id": task.id, - "content": response.get("content", ""), - "message": response.get("message", None) # Preserve complete message object with metadata - } - - return execution_result + @classmethod + def from_llm(cls, llm: BaseLanguageModel, verbose: bool = False) -> LLMChain: + execution_template = ( + "The AI assistant has parsed the user input into several tasks" + "and executed them. The results are as follows:\n" + "{task_execution}" + "\n{format_instructions}" + ) + prompt = PromptTemplate( + template=execution_template, + input_variables=["task_execution", "format_instructions"], + ) + return cls(prompt=prompt, llm=llm, verbose=verbose) - async def _generate_final_response(self, problem: str, task_results: Dict[int, Dict[str, Any]]) -> Dict[str, Any]: - """Generate final response asynchronously""" - if not self.response_generator_agent: - self._create_agents() - - # Format all task results - formatted_results = "\n\n".join([ - f"Task {task_id} Result:\n{result.get('content', '')}" - for task_id, result in task_results.items() - ]) - - prompt = f""" - Original Problem: {problem} - - Task Execution Results: - {formatted_results} - - Based on all the information above, please generate a comprehensive final answer. - - Your response must: - - Verify all calculations from the previous tasks - - {self.format_prompt} - - Make sure your answer is concise, logically clear, and directly answers the original problem. - Double-check all mathematical operations for accuracy. + +def load_response_generator(llm: BaseLanguageModel) -> ResponseGenerator: + """Load the ResponseGenerator.""" + + llm_chain = ResponseGenerationChain.from_llm(llm, verbose=False) + return ResponseGenerator( + llm_chain=llm_chain, + ) + +def load_chat_planner(llm: BaseLanguageModel) -> TaskPlanner: + """Load the chat planner.""" + + return TaskPlanner(llm = llm) + +class HuggingGPT: + """Agent for interacting with HuggingGPT - Text Processing Version.""" + + def __init__(self, llm: BaseLanguageModel, tools: List[BaseTool], name: str = "jarvis"): + self.llm = llm + self.tools = tools + self.name = name + self.task_planner = load_chat_planner(llm) + self.response_generator = load_response_generator(llm) + self.task_executor = None + + def run(self, input: str) -> str: + """Process text input through planning, execution, and response generation.""" + # Plan tasks based on input + plan = self.task_planner.plan(inputs={"input": input, "hf_tools": self.tools}) + + # Execute planned tasks + self.task_executor = TaskExecutor(plan) + execution_status = self.task_executor.run() + + # Generate response based on execution results + response = self.response_generator.generate( + {"task_execution": self.task_executor} + ) + return response + + def get_plan(self, input: str): + """Get the execution plan for debugging purposes.""" + return self.task_planner.plan(inputs={"input": input, "hf_tools": self.tools}) + + def get_execution_details(self): + """Get detailed execution information for analysis.""" + if hasattr(self, 'task_executor') and self.task_executor: + return self.task_executor.describe() + return "No execution performed yet." + + def run_with_trace(self, problem: str, format_prompt: str = None, **kwargs) -> Dict[str, Any]: """ + Run HuggingGPT and return the result with the complete execution trace. - response = await self.response_generator_agent.generate_response(prompt) - content = response.get("content", "") - - return { - "final_answer": content, - "message": response.get("message", None) # Preserve complete message object with metadata - } + Args: + problem: input problem + format_prompt: format prompt, for guiding output format + **kwargs: other parameters + """ + message_collector = MessageCollectorCallback(name=self.name) + # Add collector to callbacks + callbacks = kwargs.get("callbacks", []) + if not isinstance(callbacks, list): + callbacks = [callbacks] + callbacks.append(message_collector) + kwargs["callbacks"] = callbacks - async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: - """Run the agent system to process a problem asynchronously""" - print("JARVIS: Starting problem processing...") - start_time = time.time() - - # Extract problem text - problem_text = problem.get("problem", "") - print(f"Problem: {problem_text[:100]}...") - - # Initialize agents - self._create_agents() - print("Agents created successfully") - - # Step 1: Task planning - print("Starting task planning...") - self.tasks = await self._plan_tasks(problem_text) - print(f"Task planning complete, {len(self.tasks)} tasks planned") - - # Collect all messages - containing complete AIMessage objects - all_messages = [] - planner_message = None - if self.planner_agent.chat_history: - planner_content = self.planner_agent.chat_history[-1]["ai"] - # If it's an AIMessage object, preserve it; otherwise create one - if hasattr(planner_content, "content"): - planner_message = planner_content - else: - # Create an AIMessage object - msg = AIMessage(content=str(planner_content)) - msg.name = "Task Planner" - planner_message = msg - - all_messages.append(planner_message) - print(f"Added planner agent response, length: {len(str(planner_message.content))}") - else: - print("Warning: Planner agent didn't generate history") - msg = AIMessage(content="Task planning didn't generate valid output") - msg.name = "Task Planner" - all_messages.append(msg) + # temporarily disable langchain detailed output + import logging + import warnings + + # disable various log outputs + loggers_to_silence = [ + "langchain", + "langchain.chains", + "langchain.schema", + "httpx", + ] - # Step 2: Execute tasks with dependency-based parallelization - print("Starting task execution...") - self.task_results = {} + old_levels = {} + for logger_name in loggers_to_silence: + logger = logging.getLogger(logger_name) + old_levels[logger_name] = logger.level + logger.setLevel(logging.ERROR) - # Track which tasks are completed - completed_tasks = set() - pending_tasks = {task.id: task for task in self.tasks} + # suppress warnings + warnings.filterwarnings("ignore", category=DeprecationWarning) - while pending_tasks: - # Find tasks that can be executed in parallel (all dependencies satisfied) - ready_tasks = [] - for task_id, task in list(pending_tasks.items()): - dependencies_satisfied = all(dep_id in completed_tasks for dep_id in task.dep) - if dependencies_satisfied: - ready_tasks.append(task) - del pending_tasks[task_id] - - if not ready_tasks: - # If no tasks are ready but there are pending tasks, there might be a dependency cycle - print("Warning: Possible dependency cycle detected. Breaking cycle...") - # Force-execute the first pending task - if pending_tasks: - first_pending = next(iter(pending_tasks.values())) - ready_tasks.append(first_pending) - del pending_tasks[first_pending.id] - else: - # All tasks are done - break - - print(f"Executing {len(ready_tasks)} tasks in parallel...") + try: + # if format prompt is provided, add it to the problem + formatted_problem = problem + if format_prompt: + formatted_problem = f"{problem}\n\n{format_prompt}" - # Execute all ready tasks in parallel - tasks_execution = [self._execute_task(task, problem_text) for task in ready_tasks] - results = await asyncio.gather(*tasks_execution) + # use unified task_planner + task_list = self.task_planner.plan(inputs={"input": formatted_problem, "hf_tools": self.tools}, **kwargs) + finally: + # restore original log levels + for logger_name, old_level in old_levels.items(): + logging.getLogger(logger_name).setLevel(old_level) + warnings.filterwarnings("default", category=DeprecationWarning) + + if not task_list.steps: + # disable response_generator's output again + old_levels = {} + for logger_name in loggers_to_silence: + logger = logging.getLogger(logger_name) + old_levels[logger_name] = logger.level + logger.setLevel(logging.ERROR) - # Process results - for i, result in enumerate(results): - task = ready_tasks[i] - task_id = task.id + try: + # pass format prompt to response_generator + response_input = f"Problem: {problem}\nTasks: []\nResults: []" + if format_prompt: + response_input += f"\n\nFormat Requirements: {format_prompt}" - self.task_results[task_id] = result - completed_tasks.add(task_id) - print(f"Task {task_id} completed") - - # Add to message list - preserve complete AIMessage object - if "message" in result and result["message"] is not None: - all_messages.append(result["message"]) - print(f"Added execution message to results, length: {len(str(result['message'].content))}") - else: - # Create an AIMessage object - content = result.get("content", f"Task {task_id} execution result unavailable") - msg = AIMessage(content=content) - msg.name = f"Task Executor {task_id}" - all_messages.append(msg) - print(f"Added execution message to results, length: {len(content)}") - - # Step 3: Generate final response - print("Generating final response...") - final_response = await self._generate_final_response(problem_text, self.task_results) - - # Add final response to message list - preserve complete AIMessage object - if "message" in final_response and final_response["message"] is not None: - all_messages.append(final_response["message"]) - print(f"Added final response to results, length: {len(str(final_response['message'].content))}") - else: - # Create an AIMessage object - content = final_response.get("final_answer", "Final response generation failed") - msg = AIMessage(content=content) - msg.name = "Response Generator" - all_messages.append(msg) - print(f"Added final response to results, length: {len(content)}") + response = self.response_generator.run( + problem=problem, + task_list="[]", + executed_task_list="[]", + format_prompt=format_prompt, + **kwargs + ) + finally: + for logger_name, old_level in old_levels.items(): + logging.getLogger(logger_name).setLevel(old_level) + return { + "tasks": [], + "executed_tasks": [], + "response": response, + "messages": message_collector.messages or [("ai", response)] # simulate message history + } + + # execute tasks + self.task_executor = TaskExecutor(task_list) + execution_status = self.task_executor.run() + + # collect execution results + executed_task_list = [] + for task in self.task_executor.tasks: + executed_task_list.append({ + "task": task.task, + "id": task.id, + "status": task.status, + "result": task.result, + "message": task.message + }) - end_time = time.time() - duration_ms = (end_time - start_time) * 1000 - print(f"Processing complete, took {duration_ms:.2f}ms") + # convert Plan object to serializable format + task_list_serializable = [] + for step in task_list.steps: + task_list_serializable.append({ + "task": step.task, + "id": step.id, + "dep": step.dep, + "args": step.args + }) - # Build result - result = { - "problem_id": problem.get("id", ""), - "problem": problem_text, - "messages": all_messages, # Preserve complete AIMessage object list - "final_answer": final_response.get("final_answer", ""), - "execution_time_ms": duration_ms, - } + # disable response_generator's output again + old_levels = {} + for logger_name in loggers_to_silence: + logger = logging.getLogger(logger_name) + old_levels[logger_name] = logger.level + logger.setLevel(logging.ERROR) - # Record agent responses - pass complete AIMessage object list to preserve usage_metadata try: - print("Recording agent responses...") - self._record_agent_responses(result["problem_id"], all_messages) - except Exception as e: - print(f"Error recording responses: {str(e)}") - - print("JARVIS processing complete") - return result + response = self.response_generator.run( + problem=problem, + task_list=json.dumps(task_list_serializable), + executed_task_list=json.dumps(executed_task_list), + format_prompt=format_prompt, + **kwargs + ) + finally: + for logger_name, old_level in old_levels.items(): + logging.getLogger(logger_name).setLevel(old_level) + + # collect all messages for evaluation + messages = message_collector.messages + if not messages: + messages = [("ai", response)] + + return { + "tasks": task_list, + "executed_tasks": executed_task_list, + "response": response, + "messages": messages + } + +class JarvisSingleAgent(AgentSystem): + """ + AgentSystem wrapper for the HuggingGPT implementation. + """ + def __init__(self, name: str = "jarvis", config: Dict[str, Any] | None = None): + super().__init__(name, config) + self.config = config or {} + self.model_name = self.config.get("model_name") or os.getenv("MODEL_NAME", "gpt-4o-mini") + self.llm = ChatOpenAI(model=self.model_name, temperature=0.0) -# Register agent system -AgentSystemRegistry.register("jarvis", JARVIS) + # As per user request, tools are initialized as an empty list + self.tools: list[BaseTool] = [] + self.agent = HuggingGPT(self.llm, self.tools, name=self.name) + # self.format_prompt is inherited from AgentSystem and set in super().__init__ + + async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: + """ + Runs the HuggingGPT agent. + """ + problem_text = problem["problem"] + # print(f"[Jarvis-Debug] JarvisSingleAgent.run_agent received problem: {problem_text}") + + result = await asyncio.to_thread( + self.agent.run_with_trace, + problem=problem_text, + format_prompt=self.format_prompt, + **kwargs + ) + + # print(f"[Jarvis-Debug] JarvisSingleAgent.run_agent final result: {result}") + + return { + "messages": result.get("messages", []), + "final_answer": result.get("response", "") + } + + def _create_agents(self) -> dict[str, list]: + """ + Create agents for tool integration. For HuggingGPT, we don't have separate agents, + but we can expose the main LLM instance for tool binding. + """ + # The wrapper expects a dictionary: {"workers": [worker1, worker2, ...]} + # Each worker should have a .name and .llm attribute. + # We can create a pseudo-worker representing the HuggingGPT planner. + planner_pseudo_worker = type("Worker", (object,), { + "name": "hugging_gpt_planner", + "llm": self.llm + }) + return {"workers": [planner_pseudo_worker]} -if __name__ == "__main__": - # Test the JARVIS agent - problem = { - "id": "1", - "problem": "What is the sum of the first 100 natural numbers?" - } - - jarvis = JARVIS() - result = jarvis.run_agent(problem) - print(result) +AgentSystemRegistry.register( + "jarvis", + JarvisSingleAgent +) diff --git a/mas_arena/agents/llm_debate.py b/mas_arena/agents/llm_debate.py index 46ce314..299f0d3 100644 --- a/mas_arena/agents/llm_debate.py +++ b/mas_arena/agents/llm_debate.py @@ -8,13 +8,13 @@ import os from typing import Dict, Any, List import contextlib +import asyncio from openai import AsyncOpenAI from dotenv import load_dotenv from mas_arena.agents.base import AgentSystem, AgentSystemRegistry # Load environment variables -load_dotenv() - +load_dotenv(override=True) class LLMDebate(AgentSystem): """ @@ -30,8 +30,8 @@ def __init__(self, name: str = "llm_debate", config: Dict[str, Any] = None): self.config = config or {} # Configuration parameters - self.agents_num = self.config.get("agents_num", 3) # Number of debate agents - self.rounds_num = self.config.get("rounds_num", 2) # Number of debate rounds + self.agents_num = self.config.get("agents_num", 2) # Number of debate agents + self.rounds_num = self.config.get("rounds_num", 3) # Number of debate rounds self.model_name = self.config.get("model_name") or os.getenv("MODEL_NAME", "gpt-4o-mini") # System prompt with format requirements @@ -41,7 +41,8 @@ def __init__(self, name: str = "llm_debate", config: Dict[str, Any] = None): # Initialize OpenAI client self.client = AsyncOpenAI( api_key=os.getenv("OPENAI_API_KEY"), - base_url=os.getenv("OPENAI_API_BASE") + base_url=os.getenv("OPENAI_API_BASE"), + timeout=40 ) async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: @@ -56,48 +57,37 @@ async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: """ query = problem["problem"] - # Initialize agent contexts - each agent starts with the same query - initial_message = { - "role": "user", - "content": f"{query} Make sure to state your answer at the end of the response." - } - agent_contexts = [[initial_message] for _ in range(self.agents_num)] + # Use the gen_math logic for processing + agent_contexts = await self._process_single_question_async(query, self.agents_num, self.rounds_num) + + # Convert to evaluation framework format all_messages = [] + final_answers = [] - # Multi-round debate + # Extract messages from agent contexts for round_idx in range(self.rounds_num): - round_messages = [] - for agent_idx, agent_context in enumerate(agent_contexts): - # For rounds after the first, add other agents' opinions - if round_idx > 0: - # Get contexts from other agents (excluding current agent) - other_contexts = agent_contexts[:agent_idx] + agent_contexts[agent_idx+1:] - debate_message = self._construct_debate_message(other_contexts, query, round_idx) - agent_context.append(debate_message) - - # Get response from current agent - response = await self._call_llm(agent_context) - - # Create response message with usage metadata - ai_message = { - 'content': response['content'], - 'name': f'debate_agent_{agent_idx+1}', - 'role': 'assistant', - 'message_type': 'ai_response', - 'round': round_idx + 1, - 'agent_id': agent_idx + 1, - 'usage_metadata': response['usage'] - } - - # Add response to agent's context - agent_context.append({"role": "assistant", "content": response['content']}) - round_messages.append(ai_message) - - all_messages.extend(round_messages) + # Get the assistant response for this round + response_index = 2 * round_idx + 1 # Based on gen_math logic + if response_index < len(agent_context): + response_msg = agent_context[response_index] + + ai_message = { + 'content': response_msg['content'], + 'name': f'debate_agent_{agent_idx+1}', + 'role': 'assistant', + 'message_type': 'ai_response', + 'round': round_idx + 1, + 'agent_id': f'debate_agent_{agent_idx+1}', + 'usage_metadata': None # gen_math doesn't track usage + } + print(f"agent_name: {ai_message['name']}") + all_messages.append(ai_message) # Extract final answers from each agent - final_answers = [context[-1]['content'] for context in agent_contexts] + for agent_context in agent_contexts: + if len(agent_context) >= 2: + final_answers.append(agent_context[-1]['content']) # Aggregate all answers into final result aggregated_answer = await self._aggregate_answers(query, final_answers) @@ -120,92 +110,179 @@ async def run_agent(self, problem: Dict[str, Any], **kwargs) -> Dict[str, Any]: "agents_participated": self.agents_num } - async def _call_llm(self, messages: List[Dict]) -> Dict[str, Any]: + async def _process_single_question_async(self, question: str, agents: int, rounds: int) -> List[List[Dict]]: """ - Call the LLM with given messages and return response with usage metadata. - - Args: - messages: List of message dictionaries - - Returns: - Dictionary containing response content and usage metadata + 异步处理单个问题的辩论过程 """ - # Prepare messages for API call - api_messages = [{"role": "system", "content": self.system_prompt}] + messages - - try: - response = await self.client.chat.completions.create( - model=self.model_name, - messages=api_messages - ) + # 为每个智能体创建初始上下文 + agent_contexts = [[{ + "role": "user", + "content": """Can you solve the following problem? {} Please explain your reasoning step by step. Make sure to state your final answer clearly at the end of your response.""".format(question), + "agent_id": i + 1, # 添加agent_id + "message_type": "user_query", # 添加消息类型 + "round": 0 # 添加轮次信息 + }] for i in range(agents)] + + # 多轮辩论 + for round in range(rounds): + # 收集所有智能体的任务 + tasks = [] - # Extract and clean content - content = response.choices[0].message.content - content = content.replace('\r\n', '\n').replace('\r', '\n').strip() - with contextlib.suppress(UnicodeDecodeError): - content = content.encode('utf-8').decode('utf-8-sig') # Remove BOM + for i, agent_context in enumerate(agent_contexts): + if round != 0: + agent_contexts_other = agent_contexts[:i] + agent_contexts[i+1:] + message = self._construct_message(agent_contexts_other, question, 2*round - 1) + # 添加元数据到message + message.update({ + "agent_id": i + 1, + "message_type": "debate_query", + "round": round + 1 + }) + agent_context.append(message) + + # 创建异步任务 + task = self._generate_answer_async(agent_context) + tasks.append((i, task)) - return { - 'content': content, - 'usage': response.usage - } + # 并行执行所有智能体的API调用 + print(f"第 {round + 1} 轮:并行调用 {len(tasks)} 个智能体...") - except Exception as e: - # Fallback response in case of API error - return { - 'content': f"Error calling LLM: {str(e)}", - 'usage': None - } + # 使用asyncio.gather真正并行执行所有任务 + task_results = await asyncio.gather(*[task for _, task in tasks], return_exceptions=True) + + # 处理结果 + for (i, _), result in zip(tasks, task_results): + if isinstance(result, Exception): + print(f"智能体 {i} 出现异常: {result}") + content = f"抱歉,智能体 {i} 出现错误: {str(result)}" + usage = None + else: + content = result.choices[0].message.content if hasattr(result, 'choices') else "无法获取回答内容" + usage = getattr(result, 'usage', None) + + assistant_message = { + "role": "assistant", + "content": content, + "agent_id": i + 1, + "message_type": "ai_response", + "round": round + 1, + "usage_metadata": usage + } + agent_contexts[i].append(assistant_message) + + return agent_contexts - def _construct_debate_message(self, other_agent_contexts: List[List[Dict]], - query: str, round_idx: int) -> Dict[str, str]: + def _construct_message(self, agents: List[List[Dict]], question: str, idx: int) -> Dict[str, str]: """ - Construct a message containing other agents' opinions for the current agent. - - Args: - other_agent_contexts: List of other agents' conversation contexts - query: Original query/problem - round_idx: Current round index - - Returns: - Message dictionary containing other agents' opinions + 构造包含其他智能体意见的消息 """ - # Handle case with no other agents (single agent introspection) - if len(other_agent_contexts) == 0: + # Use introspection in the case in which there are no other agents. + if len(agents) == 0: return { "role": "user", "content": "Can you verify that your answer is correct. Please reiterate your answer, making sure to state your answer at the end of the response." } - - # Build message with other agents' recent responses + prefix_string = "These are the recent/updated opinions from other agents: " - - for agent_idx, agent_context in enumerate(other_agent_contexts): - # Get the most recent assistant response (last response in context) - if len(agent_context) > 1: # Ensure there's at least one response - # Find the latest assistant message - latest_response = None - for msg in reversed(agent_context): - if msg.get("role") == "assistant": - latest_response = msg["content"] - break - - if latest_response: - response_text = f"\n\nAgent {agent_idx + 1} response: ```{latest_response}```" - prefix_string += response_text - - # Add instruction for using the opinions - suffix_string = ( - f"\n\nUse these opinions carefully as additional advice, can you provide an updated answer? " - f"Make sure to state your answer at the end of the response. " - f"\nThe original problem is: {query}" - ) - + + for agent in agents: + if idx < len(agent): + agent_response = agent[idx]["content"] + agent_id = agent[idx].get("agent_id", "unknown") + response = f"\n\n Agent {agent_id} response: ```{agent_response}```" + prefix_string = prefix_string + response + + prefix_string = prefix_string + "\n\n Use these opinions carefully as additional advice, can you provide an updated answer? Make sure to state your answer at the end of the response." return { "role": "user", - "content": prefix_string + suffix_string + "content": prefix_string } + def _construct_assistant_message(self, completion) -> Dict[str, str]: + """ + 构造assistant消息 + """ + # 检查 completion 是否为字符串(错误情况) + if isinstance(completion, str): + print(f"警告:收到字符串响应而非API对象: {completion[:100]}...") + return {"role": "assistant", "content": "API返回了错误的响应格式"} + + # 检查是否有 choices 属性 + if hasattr(completion, 'choices') and len(completion.choices) > 0: + content = completion.choices[0].message.content + return {"role": "assistant", "content": content} + else: + # 备用方案 + return {"role": "assistant", "content": "抱歉,无法获取回答内容。"} + + async def _generate_answer_async(self, answer_context: List[Dict]) -> Any: + """ + 异步版本的API调用函数 + """ + # 添加系统提示到消息开头,移除元数据字段 + api_messages = [{"role": "system", "content": self.system_prompt}] + for msg in answer_context: + api_messages.append({ + "role": msg["role"], + "content": msg["content"] + }) + + max_retries = 3 + for attempt in range(max_retries): + try: + completion = await self.client.chat.completions.create( + model=self.model_name, + messages=api_messages, + n=1 + ) + return completion + except Exception as e: + print(f"API调用出错,正在重试... (尝试 {attempt + 1}/{max_retries})") + print(f"错误类型: {type(e).__name__}") + print(f"错误详情: {str(e)}") + if attempt < max_retries - 1: + print("等待5秒后重试...") # 减少等待时间 + await asyncio.sleep(5) + else: + print("已达到最大重试次数,跳过此请求") + # 返回一个模拟的 completion 对象以避免程序崩溃 + class MockCompletion: + def __init__(self): + self.choices = [type('obj', (object,), { + 'message': type('obj', (object,), { + 'content': "抱歉,由于API错误无法生成回答。" + })() + })()] + return MockCompletion() + + return None + + async def _call_llm(self, messages: List[Dict]) -> Dict[str, Any]: + """ + Call the LLM with given messages and return response with usage metadata. + """ + # 使用新的异步生成答案方法 + completion = await self._generate_answer_async(messages) + + # 提取内容和使用信息 + if hasattr(completion, 'choices') and len(completion.choices) > 0: + content = completion.choices[0].message.content + content = content.replace('\r\n', '\n').replace('\r', '\n').strip() + with contextlib.suppress(UnicodeDecodeError): + content = content.encode('utf-8').decode('utf-8-sig') # Remove BOM + + usage = getattr(completion, 'usage', None) + + return { + 'content': content, + 'usage': usage + } + else: + return { + 'content': "抱歉,无法获取回答内容。", + 'usage': None + } + async def _aggregate_answers(self, query: str, answers: List[str]) -> Dict[str, Any]: """ Aggregate all agents' final answers into a single result. @@ -235,4 +312,4 @@ async def _aggregate_answers(self, query: str, answers: List[str]) -> Dict[str, # Register the agent system AgentSystemRegistry.register("llm_debate", LLMDebate, - agents_num=3, rounds_num=2) \ No newline at end of file + agents_num=2, rounds_num=3) \ No newline at end of file