Idea: Paralize MCP tool call execution #335
Conversation
Commits:
- # Conflicts: # wingmen/open_ai_wingman.py
- … should only work on first load.
- …fore saving learning data
- … handling with preload for faster response on first request
- …in database operations and add preload data function
- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- … prevent google validation error
- … calls as they are most likely not dependent on exact order in which they are executed. For me it sometimes saved 2s on a large amount of requests.
Pull request overview
This PR introduces parallel execution of MCP (Model Context Protocol) tool calls to improve performance, particularly reducing startup time by approximately 2 seconds when multiple MCP calls are made. The implementation extracts tool call processing logic into a separate method and uses chunking to execute consecutive MCP tools in parallel while maintaining sequential execution for non-MCP tools.
Key Changes:
- Extracted tool call processing into a dedicated `_process_single_tool_call` method for code reusability
- Implemented a chunking algorithm to group consecutive MCP tool calls together
- Added parallel execution using `asyncio.gather` for MCP tool chunks while preserving sequential execution for non-MCP tools (a sketch of this approach follows below)
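For illustration, here is a minimal sketch of what such chunking and mixed sequential/parallel execution could look like. The names `chunk_tool_calls`, `is_mcp_tool`, and `process_tool_call` are illustrative assumptions, not necessarily the PR's actual identifiers:

```python
import asyncio

def chunk_tool_calls(tool_calls, is_mcp_tool):
    """Group consecutive tool calls into (is_mcp, [calls]) chunks."""
    chunks = []
    for tc in tool_calls:
        is_mcp = is_mcp_tool(tc)
        if chunks and chunks[-1][0] == is_mcp:
            chunks[-1][1].append(tc)
        else:
            chunks.append((is_mcp, [tc]))
    return chunks

async def run_tool_calls(tool_calls, is_mcp_tool, process_tool_call):
    results = []
    for is_mcp, chunk in chunk_tool_calls(tool_calls, is_mcp_tool):
        if is_mcp:
            # MCP calls within the same chunk run concurrently.
            results.extend(
                await asyncio.gather(*(process_tool_call(tc) for tc in chunk))
            )
        else:
            # Non-MCP calls keep their original sequential order.
            for tc in chunk:
                results.append(await process_tool_call(tc))
    return results
```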
```python
if is_mcp:
    # Execute MCP tools in parallel
    tasks = [self._process_single_tool_call(tc) for tc in chunk_tools]
    results = await asyncio.gather(*tasks)
```
Copilot AI (Jan 3, 2026):
Error handling gap in parallel execution: When using asyncio.gather without return_exceptions=True, if any MCP tool call raises an exception that isn't caught within _process_single_tool_call, it will propagate out of the gather call and the results of the remaining parallel tasks will never be collected. While _process_single_tool_call has a broad exception handler, consider adding return_exceptions=True to asyncio.gather for more robust error handling, or ensure that all possible exceptions are caught within _process_single_tool_call.
Suggested change:

```python
results = await asyncio.gather(*tasks, return_exceptions=True)
# Normalize any exceptions into empty results so a single failing
# MCP tool call does not cancel or break aggregation.
normalized_results = []
for idx, result in enumerate(results):
    if isinstance(result, Exception):
        printr.log(
            LogType.ERROR,
            LogSource.SYSTEM,
            f"Error in MCP tool call {idx + 1}/{len(results)}: {result}",
        )
        normalized_results.append((None, None, []))
    else:
        normalized_results.append(result)
results = normalized_results
```
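As a standalone illustration of the difference discussed in this comment (not project code): by default `asyncio.gather` propagates the first exception to the awaiting caller, while `return_exceptions=True` returns exceptions as ordinary results that can be normalized afterwards.

```python
import asyncio

async def ok(n):
    return n

async def boom():
    raise RuntimeError("tool failed")

async def main():
    # Default behavior: the first exception propagates out of gather().
    try:
        await asyncio.gather(ok(1), boom(), ok(3))
    except RuntimeError as exc:
        print("propagated:", exc)

    # With return_exceptions=True, exceptions are returned in place,
    # so the caller can log or normalize them per tool call.
    results = await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)
    print(results)  # [1, RuntimeError('tool failed'), 3]

asyncio.run(main())
```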
```python
if res_instant:
    instant_response = res_instant
if res_skill:
```
Copilot AI (Jan 3, 2026):
Ambiguous behavior when multiple tool calls return non-None values: The aggregation logic overwrites instant_response and skill whenever a non-None value is encountered. If multiple MCP tools executing in parallel return different instant_response or skill values, only the last one processed will be retained, which may not be the intended behavior. Consider documenting this behavior or implementing a more explicit strategy (e.g., keeping the first non-None value, or raising a warning when multiple values are found).
Suggested change:

```python
# Keep the first non-None instant_response and skill to avoid
# ambiguous behavior when multiple tools return values.
if res_instant is not None and instant_response is None:
    instant_response = res_instant
if res_skill is not None and skill is None:
```
```python
        return response_message, response_message.tool_calls

    async def _process_single_tool_call(self, tool_call):
```
Copilot AI (Jan 3, 2026):
The title contains a typo: "Paralize" should be "Parallelize". The word "parallelize" means to execute tasks in parallel.
Copilot AI (Jan 3, 2026):
The PR description contains a typo: "paralyze" should be "parallelize". The word "parallelize" means to execute tasks in parallel, whereas "paralyze" means to cause paralysis or inability to move.
```python
if is_mcp:
    # Execute MCP tools in parallel
    tasks = [self._process_single_tool_call(tc) for tc in chunk_tools]
    results = await asyncio.gather(*tasks)
else:
    # Execute non-MCP tools sequentially
    results = []
    for tc in chunk_tools:
        results.append(await self._process_single_tool_call(tc))
```
Copilot AI (Jan 3, 2026):
Potential race condition: When multiple MCP tool calls execute in parallel via asyncio.gather, they may concurrently call _add_tool_response or _update_tool_response, which both modify shared state (self.messages and self.pending_tool_calls lists). These methods are not thread-safe and concurrent modifications could lead to corrupted message history or lost updates. Consider adding a lock (asyncio.Lock) to protect these critical sections, or ensure that _update_tool_response and _add_tool_response are called sequentially after parallel execution completes.
Suggested change:

```python
# Execute all tools in this chunk sequentially to avoid concurrent
# mutations of shared state (e.g., self.messages, self.pending_tool_calls).
# Previously, MCP tools were executed in parallel via asyncio.gather.
results = []
for tc in chunk_tools:
    results.append(await self._process_single_tool_call(tc))
```
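The comment above also mentions an asyncio.Lock as an alternative to dropping parallelism entirely. A rough sketch of that option follows; the class, attribute, and method shapes here are assumptions for illustration, not the PR's actual code:

```python
import asyncio

class ToolResponseState:
    """Simplified stand-in for the wingman's shared message state."""

    def __init__(self):
        self.messages = []
        self.pending_tool_calls = []
        self._lock = asyncio.Lock()  # assumed new attribute guarding both lists

    async def add_tool_response(self, tool_call_id, content):
        # Serialize mutations so parallel MCP tool calls cannot interleave
        # their appends and leave the message history inconsistent.
        async with self._lock:
            self.messages.append(
                {"role": "tool", "tool_call_id": tool_call_id, "content": content}
            )
            self.pending_tool_calls.append(tool_call_id)
```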
My idea is to paralyze MCP tool calls, as the order in which they are executed is most likely not important.
Doing them in parallel could shave off some valuable seconds on a large number of requests (like I have on startup).
For me it saves around 2s on startup (~X MCP calls to load my current data).
I will keep that change locally, but if there is interest, here is the option to have it in Wingman AI Core officially.
So either accept it or close it, depending on preference :)