mcp sse stateful session in contrib#12
Conversation
Signed-off-by: 爵银 <jueyin.hsl@alibaba-inc.com>
新增 MCP SSE 有状态会话支持变更概述
变更文件
时序图sequenceDiagram
participant C as Client
participant E as Envoy
participant U as Upstream
C->>E: GET /sse?sessionId=original_session_id
E->>E: 解析 sessionId,提取原始 session ID 和编码的主机地址
E->>U: 路由到指定的上游服务器
U->>E: 返回 SSE 数据流
E->>E: 在 SSE 数据中注入主机地址
E->>C: 返回包含主机地址的 SSE 数据流
💡 小贴士与 lingma-agents 交流的方式📜 直接回复评论
📜 在代码行处标记
📜 在讨论中提问
|
There was a problem hiding this comment.
🔎 代码评审报告
🎯 评审意见概览
| 严重度 | 数量 | 说明 |
|---|---|---|
| 🔴 Blocker | 0 | 阻断性问题,需立即修复。例如:系统崩溃、关键功能不可用或严重安全漏洞。 |
| 🟠 Critical | 0 | 严重问题,高优先级修复。例如:核心功能异常或性能瓶颈影响用户体验。 |
| 🟡 Major | 2 | 主要问题,建议修复。例如:非核心功能缺陷或代码维护性较差。 |
| 🟢 Minor | 2 | 次要问题,酬情优化。例如:代码格式不规范或注释缺失。 |
总计: 4 个问题
📋 评审意见详情
💡 代码实现建议
以下是文件级别的代码建议,聚焦于代码的可读性、可维护性和潜在问题。
📄 contrib/mcp_sse_stateful_session/filters/http/source/mcp_sse_stateful_session.cc (1 💬)
- 优化会话状态创建的逻辑 (L66-L75)
📐 contrib/mcp_sse_stateful_session/filters/http/source/mcp_sse_stateful_session.h (1 💬)
- 增强对会话状态的断言 (L36-L40)
📄 contrib/mcp_sse_stateful_session/filters/http/test/stateful_session_integration_test.cc (2 💬)
- 测试用例中存在注释和代码不一致的问题,可能导致测试逻辑错误。 (L260-L261)
- 测试用例中存在重复代码,影响代码可维护性。 (L302-L339)
🚀 架构设计建议
以下是对代码架构和设计的综合分析,聚焦于跨文件交互、系统一致性和潜在优化空间。
🔍1. 架构一致性问题:MCP SSE有状态会话实现与现有会话管理机制的整合
新增的MCP SSE有状态会话功能在Envoy中引入了新的会话状态管理方式,但未与现有的会话保持机制(如stateful_session)进行充分整合或复用。这可能导致未来维护成本增加,以及在不同会话管理实现之间出现不一致性。建议评估是否可以复用或扩展现有的会话管理接口和逻辑,以保持架构的一致性。
可能导致代码重复、维护困难,以及在不同会话管理机制之间出现不一致的行为。
🔍2. 跨文件问题:会话状态工厂配置的重复实现风险
在mcp_sse_stateful_session.cc中实现了EmptySessionStateFactory,而在其他会话管理实现中可能也有类似的空状态工厂。这种重复实现可能增加维护成本和代码不一致性。建议抽象出通用的空状态工厂实现,供所有会话管理扩展使用。
📌 关键代码
class EmptySessionStateFactory : public Envoy::Http::McpSseSessionStateFactory {
public:
Envoy::Http::McpSseSessionStatePtr create(Envoy::Http::RequestHeaderMap&) const override {
return nullptr;
}
};
增加代码重复,提高维护成本,可能导致不同会话管理实现之间的行为不一致。
🔍3. 技术债务:严格的路由模式可能影响系统弹性
引入的严格路由模式(strict mode)在上游主机不可用时会直接返回503错误,这虽然符合某些业务场景的需求,但可能降低系统的整体弹性和容错能力。建议在严格模式下增加重试机制或降级策略,以提高系统的鲁棒性。
📌 关键代码
if (auto upstream_address = session_state_->upstreamAddress(); upstream_address.has_value()) {
decoder_callbacks_->setUpstreamOverrideHost(
std::make_pair(upstream_address.value(), effective_config.isStrict()));
}
在严格模式下,系统弹性降低,可能导致更多的503错误,影响用户体验。
🔍4. 可维护性问题:测试用例中的重复代码
在stateful_session_integration_test.cc中存在多个相似的测试用例,它们都涉及创建SSE请求和验证响应。这些重复的代码块增加了维护成本,且容易在修改时出现遗漏。建议提取公共的测试逻辑到辅助函数中,以提高测试代码的可维护性和可读性。
📌 关键代码
// 测试用例1的代码块
{
codec_client_ = makeHttpConnection(lookupPort("http"));
...
cleanupUpstreamAndDownstream();
}
// 测试用例2的代码块
{
codec_client_ = makeHttpConnection(lookupPort("http"));
...
cleanupUpstreamAndDownstream();
}
增加维护成本,容易在修改时出现遗漏,降低测试代码的可读性。
🔍5. 业务逻辑一致性:SSE数据处理的边界情况未充分覆盖
当前实现主要关注了SSE数据流中的标准结束符(如\n\n),但在实际应用中可能存在非标准的结束符或数据格式。建议在EnvelopeSessionState中增加对自定义结束符的支持,并在测试中覆盖更多边界情况,以确保业务逻辑的一致性和健壮性。
📌 关键代码
// The list of patterns to match the chunk end in the SSE data stream.
// Any of these patterns matched will be considered as the end of a chunk.
// recommended value is ["\r\n\r\n", "\n\n", "\r\r"]
// according to the HTML standard, the end of a server-sent-events' chunk can be
// - \r\n\r\n (double Carriage-Return Line-Feed)
// - \n\n (double Line-Feed)
// - \r\r (double Carriage-Return)
// https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
// Customized patterns can be added to match the chunk end pattern.
repeated string chunk_end_patterns = 2 [(validate.rules).repeated = {
min_items: 1
items {string {min_len: 1}}
}];
在处理非标准SSE数据时可能出现错误,影响业务逻辑的一致性和系统的稳定性。
审查详情
📒 文件清单 (31 个文件)
✅ 新增: 24 个文件
📝 变更: 7 个文件
✅ 新增文件:
api/contrib/envoy/extensions/filters/http/mcp_sse_stateful_session/v3alpha/BUILDapi/contrib/envoy/extensions/filters/http/mcp_sse_stateful_session/v3alpha/mcp_sse_stateful_session.protoapi/contrib/envoy/extensions/http/mcp_sse_stateful_session/envelope.protoapi/contrib/envoy/extensions/http/mcp_sse_stateful_session/v3alpha/BUILDcontrib/mcp_sse_stateful_session/filters/http/source/BUILDcontrib/mcp_sse_stateful_session/filters/http/source/config.cccontrib/mcp_sse_stateful_session/filters/http/source/config.hcontrib/mcp_sse_stateful_session/filters/http/source/mcp_sse_stateful_session.cccontrib/mcp_sse_stateful_session/filters/http/source/mcp_sse_stateful_session.hcontrib/mcp_sse_stateful_session/filters/http/test/BUILDcontrib/mcp_sse_stateful_session/filters/http/test/config_test.cccontrib/mcp_sse_stateful_session/filters/http/test/mocks/BUILDcontrib/mcp_sse_stateful_session/filters/http/test/mocks/mcp_sse_stateful_session.cccontrib/mcp_sse_stateful_session/filters/http/test/mocks/mcp_sse_stateful_session.hcontrib/mcp_sse_stateful_session/filters/http/test/stateful_session_integration_test.cccontrib/mcp_sse_stateful_session/filters/http/test/stateful_session_test.cccontrib/mcp_sse_stateful_session/http/source/BUILDcontrib/mcp_sse_stateful_session/http/source/config.cccontrib/mcp_sse_stateful_session/http/source/config.hcontrib/mcp_sse_stateful_session/http/source/envelope.cccontrib/mcp_sse_stateful_session/http/source/envelope.hcontrib/mcp_sse_stateful_session/http/test/config_test.cccontrib/mcp_sse_stateful_session/http/test/envelope_test.ccenvoy/http/mcp_sse_stateful_session.h
📝 变更文件:
CODEOWNERSapi/BUILDapi/versioning/BUILDcontrib/contrib_build_config.bzlcontrib/extensions_metadata.yamlenvoy/http/BUILDtools/extensions/extensions_schema.yaml
💡 小贴士
与 lingma-agents 交流的方式
📜 直接回复评论
直接回复本条评论,lingma-agents 将自动处理您的请求。例如:
-
在当前代码中添加详细的注释说明。
-
请详细介绍一下你说的 LRU 改造方案,并使用伪代码加以说明。
📜 在代码行处标记
在文件的特定位置创建评论并 @lingma-agents。例如:
-
@lingma-agents 分析这个方法的性能瓶颈并提供优化建议。
-
@lingma-agents 对这个方法生成优化代码。
📜 在讨论中提问
在任何讨论中 @lingma-agents 来获取帮助。例如:
-
@lingma-agents 请总结上述讨论并提出解决方案。
-
@lingma-agents 请根据讨论内容生成优化代码。
| session_state_ = effective_config.createSessionState(headers); | ||
| if (session_state_ == nullptr) { | ||
| return Envoy::Http::FilterHeadersStatus::Continue; | ||
| } | ||
|
|
||
| if (auto upstream_address = session_state_->upstreamAddress(); upstream_address.has_value()) { | ||
| decoder_callbacks_->setUpstreamOverrideHost( | ||
| std::make_pair(upstream_address.value(), effective_config.isStrict())); | ||
| } | ||
| return Envoy::Http::FilterHeadersStatus::Continue; |
There was a problem hiding this comment.
优化会话状态创建的逻辑
🟢 Minor | 🧹 Code Smells
📋 问题详情
在 decodeHeaders 方法中,如果 session_state_ 为 nullptr,则直接返回 Continue。然而,如果 session_state_ 不为 nullptr 但 upstreamAddress 为空,仍然会执行 setUpstreamOverrideHost。这可能会导致不必要的操作。建议在调用 setUpstreamOverrideHost 前检查 upstreamAddress 是否为空。
💡 解决方案
当前代码已经正确处理了 upstreamAddress 为空的情况,无需进一步优化。建议保持现状。
您的反馈对我们很重要!(建议右键在新标签页中打开以下链接)
| Envoy::Http::McpSseSessionStatePtr | ||
| createSessionState(Envoy::Http::RequestHeaderMap& headers) const { | ||
| ASSERT(factory_ != nullptr); | ||
| return factory_->create(headers); | ||
| } |
There was a problem hiding this comment.
增强对会话状态的断言
🟡 Major | 🐞 Bugs
📋 问题详情
在 McpSseStatefulSessionConfig 类的 createSessionState 方法中,有一个 ASSERT(factory_ != nullptr); 断言。虽然这在调试时很有用,但在生产环境中可能会导致程序崩溃。建议添加一个运行时检查,以确保 factory_ 不为 nullptr,并在为空时返回 nullptr。
💡 解决方案
将 ASSERT 替换为运行时检查,以提高代码的健壮性:
Envoy::Http::McpSseSessionStatePtr
createSessionState(Envoy::Http::RequestHeaderMap& headers) const {
- ASSERT(factory_ != nullptr);
+ if (factory_ == nullptr) {
+ return nullptr;
+ }
return factory_->create(headers);
}您的反馈对我们很重要!(建议右键在新标签页中打开以下链接)
| // Expect that the selected upstream is index 1 | ||
| EXPECT_EQ(upstream_index.value(), 2); |
There was a problem hiding this comment.
测试用例中存在注释和代码不一致的问题,可能导致测试逻辑错误。
🟢 Minor | 🧹 Code Smells
📋 问题详情
在DownstreamRequestWithMcpSseStatefulSessionEnvelopeAndStrictMode测试用例中,注释表明期望选择的上游索引是1,但实际代码中设置的是索引2。这种不一致可能导致测试结果不符合预期,影响测试的准确性。
💡 解决方案
修改注释以匹配实际的期望值,确保注释和代码的一致性。
- // Expect that the selected upstream is index 1
+ // Expect that the selected upstream is index 2您的反馈对我们很重要!(建议右键在新标签页中打开以下链接)
| { | ||
| codec_client_ = makeHttpConnection(lookupPort("http")); | ||
|
|
||
| // Construct SSE request | ||
| Envoy::Http::TestRequestHeaderMapImpl sse_request_headers{ | ||
| {":method", "GET"}, | ||
| {":path", "/sse"}, | ||
| {":scheme", "http"}, | ||
| {":authority", "stateful.session.com"}}; | ||
|
|
||
| auto sse_response = codec_client_->makeRequestWithBody(sse_request_headers, 0); | ||
|
|
||
| // Wait for upstream request | ||
| auto upstream_index = waitForNextUpstreamRequest({0, 1, 2, 3}); | ||
| ASSERT(upstream_index.has_value()); | ||
|
|
||
| envoy::config::endpoint::v3::LbEndpoint endpoint; | ||
| setUpstreamAddress(upstream_index.value(), endpoint); | ||
| const std::string address_string = | ||
| fmt::format("127.0.0.1:{}", endpoint.endpoint().address().socket_address().port_value()); | ||
| const std::string encoded_host = | ||
| Envoy::Base64Url::encode(address_string.data(), address_string.size()); | ||
|
|
||
| // Set content type to text/event-stream (required for SSE) | ||
| default_response_headers_.addCopy(Envoy::Http::LowerCaseString("content-type"), | ||
| "text/event-stream"); | ||
| upstream_request_->encodeHeaders(default_response_headers_, false); // stream not closed yet | ||
|
|
||
| // Build and send initial SSE event data | ||
| const std::string original_session_id = "abcdefg"; | ||
| const std::string sse_data = | ||
| fmt::format("data: https://example.com/test?sessionId={}\n\n", original_session_id); | ||
| upstream_request_->encodeData(sse_data, true); | ||
| ASSERT_TRUE(sse_response->waitForEndStream()); | ||
|
|
||
| EXPECT_TRUE(upstream_request_->complete()); | ||
| EXPECT_TRUE(sse_response->complete()); | ||
|
|
There was a problem hiding this comment.
测试用例中存在重复代码,影响代码可维护性。
🟡 Major | 🧹 Code Smells
📋 问题详情
多个测试用例中存在重复的代码块,用于设置SSE请求和验证响应。这种重复增加了维护成本,并且容易在修改时引入错误。应该将重复的代码提取到辅助函数中。
💡 解决方案
将重复的代码提取到辅助函数中,以提高代码的可维护性和减少重复。
- codec_client_ = makeHttpConnection(lookupPort("http"));
-
- // Construct SSE request
- Envoy::Http::TestRequestHeaderMapImpl sse_request_headers{
- {":method", "GET"},
- {":path", "/sse"},
- {":scheme", "http"},
- {":authority", "stateful.session.com"}};
-
- auto sse_response = codec_client_->makeRequestWithBody(sse_request_headers, 0);
-
- // Wait for upstream request
- auto upstream_index = waitForNextUpstreamRequest({0, 1, 2, 3});
- ASSERT(upstream_index.has_value());
-
- envoy::config::endpoint::v3::LbEndpoint endpoint;
- setUpstreamAddress(upstream_index.value(), endpoint);
- const std::string address_string =
- fmt::format("127.0.0.1:{}", endpoint.endpoint().address().socket_address().port_value());
- const std::string encoded_host =
- Envoy::Base64Url::encode(address_string.data(), address_string.size());
-
- // Set content type to text/event-stream (required for SSE)
- default_response_headers_.addCopy(Envoy::Http::LowerCaseString("content-type"),
- "text/event-stream");
- upstream_request_->encodeHeaders(default_response_headers_, false); // stream not closed yet
-
- // Build and send initial SSE event data
- const std::string original_session_id = "abcdefg";
- const std::string sse_data =
- fmt::format("data: https://example.com/test?sessionId={}\\n\\n", original_session_id);
- upstream_request_->encodeData(sse_data, true);
- ASSERT_TRUE(sse_response->waitForEndStream());
-
- EXPECT_TRUE(upstream_request_->complete());
- EXPECT_TRUE(sse_response->complete());
+ // 提取到辅助函数中
+ auto sse_response = makeSseRequest();您的反馈对我们很重要!(建议右键在新标签页中打开以下链接)
cherry-pick strict mode from envoy proxy envoyproxy#30573 Signed-off-by: jue-yin <jueyin.hsl@alibaba-inc.com>
Signed-off-by: jue-yin <jueyin.hsl@alibaba-inc.com>
Additional Description:
Sync the feature in envoy-proxy.
This PR adds native support for the MCP 241105 spec in the envelope stateful session extension.
It enables Envoy to handle session context in SSE event streams, allowing session ID and upstream host to be encoded/decoded as required by the protocol.
Separator Description:
In SSE mode, the session ID and upstream host are joined using a separator . (dot).
The format is: sessionId=<original_session_id>.<base64url_encoded_host>
For example: sessionId=abcdefg.MTI3LjAuMC4xOjgw
When processing requests, Envoy splits the value at the last . to extract the original session ID and the upstream host (after Base64Url decoding).
The implementation covers both header-based and SSE (query param) session modes, and refactors the interface for better extensibility and clarity.
Comprehensive unit and integration tests are included to ensure correctness, including strict mode and various edge cases.
Risk Level:
Low
Testing:
Unit and integration tests added and enhanced for new SSE session mode.
Docs Changes:
Updated relevant documentation to describe the new SSE session mode, separator usage, and example formats.
Release Notes:
Added
Platform Specific Features:
N/A
Relevant Links:
Related issue: envoyproxy#40247
Related previous PR: envoyproxy#39004
MCP 241105 Spec
recommand use route level Envoyfilter to
istio EnvoyFilter example: