Merged
Reviewer's Guide

Adds robust token usage extraction for omni model responses and ensures multi-agent cluster token accounting includes master routing and merge overhead, while tightening HTTP status handling for MCP clients and enabling streaming usage reporting.

Sequence diagram for multi-agent streaming token accounting with sub_usage events:

sequenceDiagram
actor User
participant AppChatService
participant MultiAgentOrchestrator
participant MasterAgentRouter
participant SubAgent
User->>AppChatService: multi_agent_chat_stream()
AppChatService->>MultiAgentOrchestrator: _execute_supervisor_stream(...)
MultiAgentOrchestrator->>MasterAgentRouter: _analyze_task() / _call_master_agent_llm()
MasterAgentRouter-->>MultiAgentOrchestrator: routing_decision + _last_routing_tokens
MultiAgentOrchestrator->>MultiAgentOrchestrator: task_analysis["routing_tokens"] = _last_routing_tokens
loop supervisor_stream
MultiAgentOrchestrator-->>AppChatService: event: sub_usage (routing_tokens)
AppChatService->>AppChatService: detect "event: sub_usage" and parse data.total_tokens
AppChatService->>AppChatService: total_tokens += data.get("total_tokens", 0)
AppChatService-->>User: (no forward for sub_usage)
MultiAgentOrchestrator->>SubAgent: _execute_sub_agent_stream()
loop sub_agent_events
SubAgent-->>MultiAgentOrchestrator: SSE event (may include sub_usage)
MultiAgentOrchestrator-->>AppChatService: passthrough event
alt event is sub_usage
AppChatService->>AppChatService: accumulate total_tokens
else other event
AppChatService-->>User: forward event
end
end
end
MultiAgentOrchestrator->>MultiAgentOrchestrator: _master_merge_results()
MultiAgentOrchestrator->>MultiAgentOrchestrator: _last_merge_tokens = merge_tokens
MultiAgentOrchestrator-->>AppChatService: final events
AppChatService-->>User: final response stream
Sequence diagram for omni model streaming token extraction in LangChainAgent:

sequenceDiagram
participant Caller
participant LangChainAgent
participant LLMProvider
participant AIMessage
Caller->>LangChainAgent: chat_stream(..., files)
LangChainAgent->>LLMProvider: send streaming request
loop streaming chunks
LLMProvider-->>LangChainAgent: AIMessage chunk
LangChainAgent->>LangChainAgent: build content from chunk
end
LangChainAgent->>LangChainAgent: locate final AIMessage
LangChainAgent->>LangChainAgent: _extract_tokens_from_message(msg)
alt response_metadata.token_usage.total_tokens
LangChainAgent->>LangChainAgent: total = response_metadata["token_usage"]["total_tokens"]
else response_metadata.usage.total_tokens
LangChainAgent->>LangChainAgent: total = response_metadata["usage"]["total_tokens"]
else usage_metadata.total_tokens
LangChainAgent->>LangChainAgent: total = usage_metadata.total_tokens
else no tokens found
LangChainAgent->>LangChainAgent: total = 0
end
LangChainAgent-->>Caller: yield total_tokens as int in stream
LangChainAgent-->>Caller: yield content chunks as str (earlier in stream)
Class diagram for updated token usage and streaming handling:

classDiagram
class LangChainAgent {
+chat()
+chat_stream(end_user_id, message_chat, storage_type, user_rag_memory_id, memory_flag, files) AsyncGenerator~str|int~
-_prepare_messages()
-_build_multimodal_content(text, files) List~Dict~
-_extract_tokens_from_message(msg) int$
}
class MultiAgentOrchestrator {
+execute()
-_analyze_task(message, variables) Dict
-_execute_sequential()
-_execute_supervisor_stream(agent_data, message, end_user_id, storage_type, user_rag_memory_id, memory_flag) AsyncGenerator~str~
-_execute_sub_agent_stream()
-_master_merge_results(responses, api_key_config)
-_last_merge_tokens int
-router MasterAgentRouter
-config MultiAgentConfig
-db Session
}
class MasterAgentRouter {
-_call_master_agent_llm(prompt) str
-_last_routing_tokens int
-db Session
}
class BaseModel {
+get_model_params(config) Dict~str, Any~
}
class MCPClient {
-_initialize_sse_session()
-_send_sse_request(request) Dict~str, Any~
-_send_sse_notification(notification)
-_initialize_modelscope_session()
-_session ClientSession
-server_url str
-_endpoint_url str
}
class AppChatService {
+multi_agent_chat_stream()
}
class ModelApiKeyService {
+record_api_key_usage(db, api_key_id)
}
class RedBearModelConfig {
+model_name str
+base_url str
+api_key str
+temperature float
+max_retries int
+extra_params Dict~str, Any~
+provider ModelProvider
}
class ModelProvider {
<<enumeration>>
OPENAI
XINFERENCE
GPUSTACK
OLLAMA
VOLCANO
REDBEAR
DASHSCOPE
}
LangChainAgent ..> BaseModel : uses
MultiAgentOrchestrator --> MasterAgentRouter : has
MultiAgentOrchestrator ..> ModelApiKeyService : uses
MasterAgentRouter ..> ModelApiKeyService : uses
BaseModel --> RedBearModelConfig : takes
RedBearModelConfig --> ModelProvider : uses
AppChatService ..> MultiAgentOrchestrator : uses
MCPClient ..> MCPConnectionError : raises
Hey - I've left some high level feedback:
- The new `_extract_tokens_from_message` helper centralizes token extraction, but similar logic is now duplicated in `_master_merge_results` and `_call_master_agent_llm`; consider reusing the helper there for consistency and easier future maintenance.
- In `chat_stream` the generator type is changed to `AsyncGenerator[str | int, None]` and may now yield a bare `int` alongside string events; this mixed-type stream could be brittle for callers and might be safer if you always wrap token counts in a consistent SSE/event format as done with `sub_usage`.
- The `logger.info` call for `stream_total_tokens` inside the tight `chat_stream` loop could be very noisy under load; consider downgrading this to `debug` or adding rate limiting if you only need it for troubleshooting.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The new `_extract_tokens_from_message` helper centralizes token extraction, but similar logic is now duplicated in `_master_merge_results` and `_call_master_agent_llm`; consider reusing the helper there for consistency and easier future maintenance.
- In `chat_stream` the generator type is changed to `AsyncGenerator[str | int, None]` and may now yield a bare `int` alongside string events; this mixed-type stream could be brittle for callers and might be safer if you always wrap token counts in a consistent SSE/event format as done with `sub_usage`.
- The `logger.info` call for `stream_total_tokens` inside the tight `chat_stream` loop could be very noisy under load; consider downgrading this to `debug` or adding rate limiting if you only need it for troubleshooting.
# Conflicts:
#   api/app/core/agent/langchain_agent.py
#   api/app/core/tools/mcp/client.py
zhuwh
approved these changes
Apr 1, 2026
Summary by Sourcery
Improve token usage tracking for omni models and multi-agent clusters, and harden HTTP/MCP handling.
New Features:
Bug Fixes:
Enhancements: